php消息队列处理实践 ,利用AMQP和redis两种方法
2013-04-03 14:12
1156 查看
一:利用AMQP方法
配置文件:
接收并处理文件:
加入队列文件:
二:利用redis做消息队列处理
类 amqp.php <?php class Amqp { public $e_name; public $q_name; public $k_route; public $channel; public function __construct($config,$e_name,$q_name,$k_route) { $this->e_name = $e_name; $this->q_name = $q_name; $this->k_route = $k_route; //创建连接和channel $this->conn = new AMQPConnection($config); if (!$this->conn->connect()) { return array('error_code' => 1,'msg'=>'Cannot connect to the broker!' ); } $this->channel = new AMQPChannel($this->conn); $this->CreateExchange(); $this->CreateQueue(); } //创建交换机 public function CreateExchange() { $ex = new AMQPExchange($this->channel); $ex->setName($this->e_name); $ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型 $ex->setFlags(AMQP_DURABLE | AMQP_AUTODELETE); //持久化 //echo "Exchange Status:".$ex->declare()."\n"; //队列内容总数 $ex->declare(); $this->ex = $ex; } //创建队列 public function CreateQueue() { $q = new AMQPQueue($this->channel); $q->setName($this->q_name); $q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE); //持久化 //echo "Message Total:".$this->q->declare()."\n"; //绑定交换机与队列,并指定路由键 //echo "queue status: ".$q->declare(); //echo "\n"; //echo 'Queue Bind: '.$q->bind($this->e_name, $this->k_route)."\n"; //echo "\n"; $q->bind($this->e_name, $this->k_route); } //发送消息 public function send($msg) { //$this->CreateExchange(); //$this->CreateQueue(); $message=json_encode($msg); $this->channel->startTransaction(); //echo "send: ".$this->ex->publish($message, $this->k_route); //将你的消息通过制定routingKey发送 $status = $this->ex->publish($message, $this->k_route); $this->channel->commitTransaction(); $this->conn->disconnect(); return array('status'=>$status); } //获取消息 public function get() { $q = new AMQPQueue($this->channel); $q->setName($this->q_name); $q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE); //$q->delete();删除队列 $return=array(); while($a=$q->declare()) { //echo "queue status: ".$a; //echo "==========\n"; $messages = $q->get(AMQP_AUTOACK); $return[]=json_decode($messages->getBody(),true); //echo "\n"; } $this->conn->disconnect(); return $return; } }
配置文件:
config.php return array( 'amqp'=>array( array( 'host' => 'localhost', 'port' => '5672', 'vhost' => '/', 'user' => 'guest', 'password' => 'guest' ) ), );
接收并处理文件:
get.php require_once('amqp.php'); $config = require('config.php'); $config_qmqp = $config['amqp']; $e_name = 'e_guest'; //交换机名 $k_route = 'k_route_sendemail'; //路由key $q_name = 'q_guest_sendemail'; //队列名 $amqp = new Amqp($config_qmqp,$e_name,$q_name,$k_route); $re = $amqp->get();
加入队列文件:
send.php require_once('amqp.php'); $e_name = 'e_guest'; //交换机名 $k_route = 'k_route_feedpush'; //路由key $q_name = 'q_guest_feedpush'; //队列名 $config = config('amqp'); $amqp = new Amqp(config('amqp'),$e_name,$q_name,$k_route); $msg = array('test','123'); $re = $amqp->send($msg);
二:利用redis做消息队列处理
//redis出队列POP function actionRedisPop() { $redis = new Redis; $redis->connect('cloud_redis',9002); while ($usr = $redis->rPop('list_test')) { $array = json_decode($usr,true); print_R($array); } } //redis入队列push function actionRedisPush() { $redis = new Redis; $redis->connect('cloud_redis',9002); $data = array('list_name'=>'usr','value'=>date('Y-m-d H:i:s')); $json = json_encode($data); var_dump($redis->lPush('list_test', $json)); }
相关文章推荐
- PHP中利用redis实现消息队列处理高并发请求
- PHP中利用redis实现消息队列处理高并发请求--简洁代码实现效果
- PHP中利用redis实现消息队列处理高并发请求
- phpredis提高消息队列的实时性方法
- PHP使用redis消息队列发布微博的方法示例
- PHP基于Redis消息队列实现发布微博的方法
- phpredis提高消息队列的实时性方法(推荐)
- SpringBoot利用redis集成消息队列的方法
- 利用Python操作消息队列RabbitMQ的方法教程
- [乐意黎转载]PHP+Redis 队列实践
- 基于Redis的消息队列php-resque
- 利用PHP操作Linux消息队列完成进程间通信
- java redis使用之利用jedis实现redis消息队列
- PHP下操作Linux消息队列完成进程间通信的方法
- handle处理主线程消息的两种方法
- SpringBoot利用redis集成消息队列
- 简介:如何利用IndexOf和SubString两种方法处理字符串
- 消息队列MQ实践----实现Queue(队列消息)和Topic(主题消息)两种模式
- php-共享内存以及利用共享内存实现消息队列
- (五)RabbitMQ消息队列-安装amqp扩展并订阅/发布Demo(PHP版)