您的位置:首页 > 编程语言 > PHP开发

使用swoole的Process实现生产者消费者模型

2017-10-04 00:00 435 查看

零.源码

<?php
abstract class Schedule{
protected $_consumerList = array();
protected $_msgqkey = null;

protected $_consumerNum = 2;
protected $_finishFlag = 'ALLDONE';

public function __construct($cNum = 0){
if ($cNum){
$this->_consumerNum = $cNum;
}
}

public function setConsumerNum($num = 0){
if ($num){
$this->_consumerNum = $num;
return true;
}

return false;
}

public function setFinishFlag($flag = null){
if ($flag){
$this->_finishFlag = $flag;
return true;
}

return false;
}

public function run(){
$this->_consumerList = array();
for($i=0; $i<$this->_consumerNum; $i++){
$consumer = new swoole_process(function($worker){
$this->_consumerFunc($worker);
});

if ($this->_msgqkey){
$consumer->useQueue($this->_msgqkey);
}
else{
$consumer->useQueue();
}
$pid = $consumer->start();

$this->_consumerList[$pid] = $consumer;
}

$producer = new swoole_process(function($worker){
//echo "i'm passer\n";
exit(0);
});

if ($this->_msgqkey){
$producer->useQueue($this->_msgqkey);
}
else{
$producer->useQueue();
}

$pid = $producer->start();
echo "begin:\n";
echo sprintf("msgqkey:%s\n", $producer->msgQueueKey);

$this->_producerFunc($producer);
}

protected function _producerFunc($worker){
if ($this->_onlyConsume()){
return;
}

foreach ($this->doProduce($worker) as $data){
$worker->push($data);
}

//任务数据被取完
while(true){
$c = $worker->statQueue();
$n = $c['queue_num'];
if ($n === 0){
break;
}
}

//放入consumer进程程结束标识
foreach($this->_consumerList as $pid => $w){
$w->push($this->_finishFlag);
}

//确认结束
while(true){
$c = $worker->statQueue();
$n = $c['queue_num'];
if ($n === 0){
break;
}
}

$worker->freeQueue();
}

protected function _consumerFunc($worker){
while(1){
$data = $worker->pop();
$pid = $worker->pid;
if ($data == $this->_finishFlag){
echo "consumer $pid exit\n";
$worker->exit(0);
}
else{
$this->doConsume($data, $worker);
}
}
}

protected function _onlyConsume(){
return !! $this->_msgqkey;
}

abstract protected function doProduce($worker);

abstract protected function doConsume($data, $worker);
}


一.功能说明

实现了生产者消费者模型,一个生产者向任务队列写数据,N个消费者取数据做处理。

数据处理完后生产者与消费者自动退出

在消费者意外挂掉的情况下,允许单独运行消费者继续处理之前队列中的任务

二.使用说明

1. 生产者消费者demo

<?php
class Taskdemo extends Schedule{
protected $_consumerNum = 5;

protected function doProduce($worker){
$all = 100;
for($i=0; $i<$all; $i+=4){
yield json_encode(array('data'=>$i));
}
}

protected function doConsume($data, $worker){
//your process
sleep(1);
echo "consumer:{$worker->pid} redv {$data}\n";
}
}

说明
1. 要继承Schedule
2. _consumerNum为消费者个数,不设置,默认2个。
3. doProduce($worker)用于产生任务数据的函数,要求返回值必须是数组或迭代器,每一项为一条任务数据。$worker为swoole进程句柄。
4. doConsume($data,$worker)用于消费者处理数据的函数。$data为单条消息,$worker为swoole进程句柄。
5. 一般情况进程句柄$worker都不会用到,可以忽略

2. 处理程序中途挂掉的情况

步骤:

1.确认当前队列的key
程序运行时,会打出

msgqkey:1078263


也可以使用命令行

ipcs -q
------ Message Queues --------
key        msqid      owner      perms      used-bytes   messages
0x001073f7 2359298    ballqiu    666        165          15

key值即是所需要的

修改Taskdemo,加入一行代码

protected $_msgqkey = 0x001073f7;


重新运行程序

手动删除队列

ipcrm -q $msgqkey


三.实现原理

使用swoole的Process,主进程调用doProduce()向消息队列写任务数据,fork出的n个子进程从队列取数据。队列就是linux用于进程间通信的消息队列。

子进程从队列里不停取任务处理,如果拿到“完成标识串”(一个特定字符串),就退出。

主进程发现队列数据被处理完后,如果有n个子进程,就向队列发n个到“完成标识串”。然后再次检查队列,队列空时删除队列,自身退出。

四.注意事项

消息队列的一些使用上的限制,可以参见这里

默认的”完成标识串”是ALLDONE,如需修改,可在Taskdemo中增加

protected $_finishFlag = 'youflag';
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  swoole php