使用swoole的Process实现生产者消费者模型
2017-10-04 00:00
435 查看
零.源码
<?php abstract class Schedule{ protected $_consumerList = array(); protected $_msgqkey = null; protected $_consumerNum = 2; protected $_finishFlag = 'ALLDONE'; public function __construct($cNum = 0){ if ($cNum){ $this->_consumerNum = $cNum; } } public function setConsumerNum($num = 0){ if ($num){ $this->_consumerNum = $num; return true; } return false; } public function setFinishFlag($flag = null){ if ($flag){ $this->_finishFlag = $flag; return true; } return false; } public function run(){ $this->_consumerList = array(); for($i=0; $i<$this->_consumerNum; $i++){ $consumer = new swoole_process(function($worker){ $this->_consumerFunc($worker); }); if ($this->_msgqkey){ $consumer->useQueue($this->_msgqkey); } else{ $consumer->useQueue(); } $pid = $consumer->start(); $this->_consumerList[$pid] = $consumer; } $producer = new swoole_process(function($worker){ //echo "i'm passer\n"; exit(0); }); if ($this->_msgqkey){ $producer->useQueue($this->_msgqkey); } else{ $producer->useQueue(); } $pid = $producer->start(); echo "begin:\n"; echo sprintf("msgqkey:%s\n", $producer->msgQueueKey); $this->_producerFunc($producer); } protected function _producerFunc($worker){ if ($this->_onlyConsume()){ return; } foreach ($this->doProduce($worker) as $data){ $worker->push($data); } //任务数据被取完 while(true){ $c = $worker->statQueue(); $n = $c['queue_num']; if ($n === 0){ break; } } //放入consumer进程程结束标识 foreach($this->_consumerList as $pid => $w){ $w->push($this->_finishFlag); } //确认结束 while(true){ $c = $worker->statQueue(); $n = $c['queue_num']; if ($n === 0){ break; } } $worker->freeQueue(); } protected function _consumerFunc($worker){ while(1){ $data = $worker->pop(); $pid = $worker->pid; if ($data == $this->_finishFlag){ echo "consumer $pid exit\n"; $worker->exit(0); } else{ $this->doConsume($data, $worker); } } } protected function _onlyConsume(){ return !! $this->_msgqkey; } abstract protected function doProduce($worker); abstract protected function doConsume($data, $worker); }
一.功能说明
实现了生产者消费者模型,一个生产者向任务队列写数据,N个消费者取数据做处理。数据处理完后生产者与消费者自动退出
在消费者意外挂掉的情况下,允许单独运行消费者继续处理之前队列中的任务
二.使用说明
1. 生产者消费者demo
<?php class Taskdemo extends Schedule{ protected $_consumerNum = 5; protected function doProduce($worker){ $all = 100; for($i=0; $i<$all; $i+=4){ yield json_encode(array('data'=>$i)); } } protected function doConsume($data, $worker){ //your process sleep(1); echo "consumer:{$worker->pid} redv {$data}\n"; } }
说明
1. 要继承Schedule
2. _consumerNum为消费者个数,不设置,默认2个。
3. doProduce($worker)用于产生任务数据的函数,要求返回值必须是数组或迭代器,每一项为一条任务数据。$worker为swoole进程句柄。
4. doConsume($data,$worker)用于消费者处理数据的函数。$data为单条消息,$worker为swoole进程句柄。
5. 一般情况进程句柄$worker都不会用到,可以忽略
2. 处理程序中途挂掉的情况
步骤:1.确认当前队列的key
程序运行时,会打出
msgqkey:1078263
也可以使用命令行
ipcs -q ------ Message Queues -------- key msqid owner perms used-bytes messages 0x001073f7 2359298 ballqiu 666 165 15
key值即是所需要的
修改Taskdemo,加入一行代码
protected $_msgqkey = 0x001073f7;
重新运行程序
手动删除队列
ipcrm -q $msgqkey
三.实现原理
使用swoole的Process,主进程调用doProduce()向消息队列写任务数据,fork出的n个子进程从队列取数据。队列就是linux用于进程间通信的消息队列。子进程从队列里不停取任务处理,如果拿到“完成标识串”(一个特定字符串),就退出。
主进程发现队列数据被处理完后,如果有n个子进程,就向队列发n个到“完成标识串”。然后再次检查队列,队列空时删除队列,自身退出。
四.注意事项
消息队列的一些使用上的限制,可以参见这里默认的”完成标识串”是ALLDONE,如需修改,可在Taskdemo中增加
protected $_finishFlag = 'youflag';
相关文章推荐
- 使用swoole的Process实现生产者消费者模型
- 使用swoole实现生产者消费者模型(2)
- Java多线程使用wait和notify实现生产者消费者模型
- 使用semaphore实现生产者-消费者简单模型
- 使用wait和notifyAll实现生产者消费者模型
- 什么是阻塞队列? 如何使用阻塞队列来实现生产者-消费者模型?
- 使用BlockingQueue实现生产者和消费者模型
- Java多线程之~~~~使用wait和notify实现生产者消费者模型
- Java多线程之~~~~使用wait和notify实现生产者消费者模型
- 使用ReentrantLock和Condition实现生产者消费者模型
- 使用semaphore实现生产者-消费者简单模型
- 使用Lock和Condition实现生产者消费者模型
- 使用Boost.Tread库实现生产者消费者模型
- Java多线程之~~~~使用wait和notify实现生产者消费者模型
- Java并发编程笔记 使用阻塞队列实现生产者-消费者模型
- 使用blockingqueue实现的简单生产者消费者模型
- 什么是阻塞队列?如何使用阻塞队列来实现生产者-消费者模型?
- 进程间通信之信号量--使用信号实现生产者消费者问题
- java实现生产者消费者模型
- 简单实现带有数据缓冲池的生产者消费者模型