您的位置:首页 > 编程语言 > PHP开发

[PHP] 使用 pcntl 库实现PHP多进程

2017-06-13 13:27 483 查看
摘要: 最近因项目需要,需要大量同步数据,数据量基数在3000万条左右,因此想到了开启多进程来处理,下面是处理的完整代码,基于laravel 5.1框架。 这是经过实际环境验证过的,所以类似场景可以简单修改下就可使用。

最近因项目需要,需要大量同步数据,数据量基数在3000万条左右,因此想到了开启多进程来处理,下面是处理的完整代码,基于laravel 5.1框架。

这是经过实际环境验证过的,所以类似场景可以简单修改下就可使用。
/**
* ******数据同步脚本
*
* @author yedonghai
*/
namespace App\Console\Commands;

use DB;
use Illuminate\Console\Command;
use App\Services\ZzcService;

class ZzcSyncCommand extends Command
{
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'zzcsync:data';

/**
* The console command description.
*
* @var string
*/
protected $description = 'sync zzc apply info';

/**
* Execute the console command.
*
* @return mixed
*/
public function handle()
{
$userNum = 6500000;
$workers = 30;
$block = 50000;
$loop = 0;
$flag = 0;

$processIds = [];
do {

$flag = $loop * $workers * $block;

for ($i = 0; $i < $workers; $i++) {
$minUserId = ($block * $i) + $flag;
$maxUserId = $block * ($i + 1) + $flag;

if ($minUserId < $userNum) {
$processIds[$i] = pcntl_fork();
switch ($processIds[$i]) {
case -1 :
echo "fork failed : {$i} \r\n";
exit;
case 0 :
$this->_userReport($minUserId, $maxUserId);
exit;
default :
break;
}
} else {
break;
}
}

while(count($processIds) > 0) {
$mypid = pcntl_waitpid(-1, $status, WNOHANG);
foreach ($processIds as $key => $pid) {
if ($mypid == $pid || $mypid == -1) {
unset($processIds[$key]);
}
}
}

$loop++;

} while (empty($processIds) && $flag < $userNum);
}

/**
* 子进程获取指定数据
*
* @param integer  $minUserId  读取区间的下限
* @param integer  $maxUserId  读取区间的上限
*
* @return array
*/
private function _userReport($minUserId, $maxUserId)
{
$users = DB::table('users')->leftJoin('user_credits', 'user_credits.user_id', '=', 'users.id')
->select('users.id', 'users.user_name as mobile', 'users.id_number as pid', 'users.truename as name')
->where('user_credits.audit_limit', '>', 0)
->where('users.id', '>=', $minUserId)
->where('users.id', '<', $maxUserId)
->get();

foreach ($users as $userObj) {

$userExist = DB::table('zzc_apply')->where('user_id', $userObj->id)->first();
if (!empty((array)$userExist)) {
continue;
}

$userArr = [];

$userArr['loan_type'] = '消费贷';
$userArr['loan_term'] = '3';
$userArr['loan_purpose'] = '购物';
$userArr['applicant']['name']   = $userObj->name;
$userArr['applicant']['pid']    = $userObj->pid;
$userArr['applicant']['mobile'] = $userObj->mobile;

$user = json_encode($userArr);
$zzcService = new ZzcService();
$zzcService->createNezha($user);
}
}
    }
原文链接
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: