php中RabbitMQ的使用
什么是队列
消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回。消息使用者再从MQ中取消息进行逻辑处理。对于消耗较大的请求,可以立马返回处理结果。减少服务器压力。为各个子系统之间解耦和异步处理。
rabbitmq的整体结构
rmq简单来说就是一个(生产/消费)的模型结构。消息生产者把数据丢到队列中,消息消费者从队列中取出数据进行逻辑处理。
那么如何确保,生产者添加的数据,能够到达指定的队列中呢?
rmq(消息队列)主要提供了三个概念(中间件?)来确保消息的分发,Exchange(交换机)、RoutingKey(路由)、Queue(队列)。
从上面的图也可以看出来, 处理消息的接收、分发,主要在Broker模块中。
Exchange 所有生产消息的入口都是到交换机这里。
exchange通过进来的路由(RoutingKey),去和已binding的规则进行匹配,找到指定的队列。
RoutingKey 我的理解,这里相当于一把钥匙。而binding的操作相当于一把锁头。
Queue 消息的存放区域,等到消费者来取。
Binding Exchange和Queue之间的一个绑定。
从这些概念来看,影响规则的主要是依赖Exchange。
那rmq提供了哪些类型,都有什么特点呢?
exchange类型
RabbitMQ提供了四种Exchange类型
- direct
- fanout
- topic
- header(header类型在实际使用中较少,所以在这里就不进行说明。)
Direct Exchange
direct 的规则比较简单。
在发布消息前,需要把exchange和queue做一个绑定。
如果发布消息的时候,RoutingKey 和绑定的值(key)一致。则将消息投递到该队列中。
如果不存在对应的队列,则消息会被丢弃。 (这时候访问rmq管理web时。可以看到消息进来,但是队列中没有值)
Fanout Exchange
fanout 类型则更简单一些。 只要exchange和队列做了绑定。发布的消息都会到队列中去。
Topic Exchange
相对来说 topic类型要复杂一些。 和direct类型相比。topic相当于模糊匹配,而direct为全等。类似mysql中 ‘like’关键词。
针对direct 类型写一个实例
实例分两部分 生产者、消费者(回调函数)
因为我的代码,对mq的部分做了封装,懒得拆分出来。 所以我只贴业务代码和封装的核心方法。
生产者代码
$mqModel = new Rabbitmq(); // 初始化(rmq连接操作) $newResult = ['tom','bill','jack']; if ($mqModel) { $mqRoute = 'push_data_to_crm_routing'; // 路由 $mqExchange = 'push_data_to_crm_exchange'; // 交换机 $mqQuery = 'push_data_to_crm_queue'; // 队列 // 建立连接,设置交换机,设置队列 $mqModel->setChannel()->setExchange($mqExchange,AMQP_EX_TYPE_DIRECT,AMQP_DURABLE)->setQueue($mqQuery,AMQP_DURABLE,$mqExchange,$mqRoute); foreach ($newResult as $k => $v){ $push_data = $v; $mqModel->publishMessage($push_data,$mqRoute); // 消息推送 } }
消费者代码
$mqModel = new Rabbitmq(); // $mqRoute = 'push_data_to_crm_routing'; 消费者用不上路由,因为不需要指定。 只要想取队列,消费即可。 $mqExchange = 'push_data_to_crm_exchange'; $mqQuery = 'push_data_to_crm_queue'; $mqModel->setChannel()->setExchange($mqExchange,'', AMQP_PASSIVE)->setQueue($mqQuery, AMQP_PASSIVE); $zmq->consume(function($msg){ var_dump($msg); return true; });
封装类中的核心方法
//设置交换机 public function setExchange($changeName = '', $changeType = '', $flags = false) { $errorMsg = ''; try{ if(!$this->channel){ throw new \AMQPQueueException("Error channel on method setExchange", 1); } $this->exchange = new \AMQPExchange($this->channel); if($changeName){ $this->changeName = $changeName; // 交换机名称 $this->exchange->setName($changeName); // 设置名称 $changeType = $changeType ? $changeType : AMQP_EX_TYPE_DIRECT; // 交换机类型 }else{ $this->changeName = ''; } if($changeType){ $this->changeType = $changeType; $this->exchange->settype($changeType); // 设置交换机类型 }else{ $this->changeType = ''; } if($flags){ $this->exchange->setFlags($flags); //交换机标志 } if($changeType || $flags){ $this->exchange->declareExchange(); // 创建 } } catch(AMQPQueueException $ex) { $errorMsg = "AMQPQueueException error exchange: {$ex->getMessage()},\r\nline: {$ex->getLine()}\r\n"; } catch(Exception $ex) { $errorMsg = "Exception error exchange: {$ex->getMessage()},\r\nline: {$ex->getLine()}\r\n"; } if($errorMsg){ throw new Exception($errorMsg, 1); } return $this; } // 设置队列 public function setQueue($queueName = '', $flags = '', $exchange_name = '', $routing_key = '', $arguments=[] ){ $errorMsg = ''; try{ if(!$this->channel){ throw new \AMQPQueueException("Error channel on method setQueue", 1); } $this->queue = new \AMQPQueue($this->channel); if(!$queueName){ return false; } $this->queueName = $queueName; // 队列名称 $this->queue->setName($queueName); if($flags){ $this->queue->setFlags($flags); // 队列标志。与消息持久化有关。 这篇文字不涉及这一块的说明 } if(is_array($arguments) && !empty($arguments)){ $this->queue->setArguments($arguments); // 参数配置 } $this->queue->declareQueue(); // 创建一个队列 $exchange_name = $exchange_name === false ? '' : ($exchange_name === true || !$exchange_name ? $this->changeName : $exchange_name); $routing_key = $routing_key ? $routing_key : $this->queueName; if($exchange_name && $routing_key ){ $this->queue->bind($exchange_name, $routing_key); // 交换机和队列的绑定操作 } } catch(AMQPQueueException $ex) { $errorMsg = "AMQPQueueException error queue: {$ex->getMessage()},\r\nline: {$ex->getLine()}\r\n"; } catch(Exception $ex) { $errorMsg = "Exception error queue: {$ex->getMessage()},\r\nline: {$ex->getLine()}\r\n"; } if($errorMsg){ throw new Exception($errorMsg, 1); } return $this; } // 发布消息 public function publishMessage($message = '', $routing_key = '', $flags = AMQP_NOPARAM, $attributes = []){ if(!$message){ return false; } $routing_key = $routing_key ? $routing_key : $this->queueName; // 发布消息,带有路由key。如果需要,则会用于关联。 $this->exchange->publish($message, $routing_key, $flags, $attributes); return true; } // 消费 public function consume($callback = null, $qos = 0, $isAct = true){ if($qos){ $this->channel->qos(0, $qos); } $errorMsg = ''; try{ if(!$this->queue){ throw new \AMQPQueueException("Error queue on method consume", 1); } $this->callBackFnc = $callback; $this->isAct = $isAct; $callback = function($envelope, $queue){ if(is_callable($this->callBackFnc)){ call_user_func($this->callBackFnc, $envelope->getBody()); if($this->isAct){ $queue->ack($envelope->getDeliveryTag()); }else{ $queue->nack($envelope->getDeliveryTag()); } } }; $this->queue->consume($callback); } catch(AMQPQueueException $ex) { $errorMsg = "AMQPQueueException error queue: {$ex->getMessage()},\r\nline: {$ex->getLine()}\r\n"; } catch(Exception $ex) { $errorMsg = "Exception error queue: {$ex->getMessage()},\r\nline: {$ex->getLine()}\r\n"; } if($errorMsg){ throw new \Exception($errorMsg, 1); } }
因为封装代码里写了很多 try catch 所以看起来特别乱。 还有部分兼容的逻辑。 看起来不舒服,就先删掉再看吧。
执行结果
先跑一遍生产者代码,这里可以用浏览器直接访问。 执行完了之后。 到rabbitmq 的web管理页面中查看。 发现消息已经正常添加到队列中。(web管理页面可查询别的文章开启)
这时候再执行消费者代码。 消费者代码需要在cli下执行。因为消费者为轮询等待,是死循环,无法在浏览器下执行。
终端输出结果,消息已经被消费掉了。 再次返回web管理页面,队列中的三条记录也没有了。
结尾
实际上,这样的一个流程,是可以直接在实际项目中使用的。
这就是我上一个项目上使用的rmq(仅限应用架构)。
只不过这是一个简略版的。为了确保子系统的独立。
消费者应该单独成一个小系统。
在回调函数上,使用curl请求业务应用。
这样两个系统分来。
业务逻辑还是在原应用中。
还没结束,到这里似乎缺少点人性化的东西。
因为,没建一个队列,都需要单独去运行一个消费者代码。
这样很繁琐,而且很多人都不会有权限直接操作服务器。
所以,这里还需要搭建一个平台,用来自动部署消费端代码运行,停止,更改。
如何去部署,那就需要思考了。
- 基于PHP使用rabbitmq实现消息队列
- 基于PHP使用rabbitmq实现消息队列
- rabbitmq php测试使用
- 使用PHP访问RabbitMQ消息队列
- 使用PHP访问RabbitMQ消息队列
- 在php中使用rabbitmq从概念到实战
- RabbitMQ队列使用 (PHP)
- rabbitmq 结合php的安装配置使用
- rabbitmq php测试使用
- PHP 使用 Rabbitmq Demo
- rabbitmq使用__php客户端(消息发送者)
- php-rabbit RabbitMQ 具体使用方法
- php 消息队列 rabbitmq 的安装使用
- PHP使用RabbitMQ
- 使用php-amqplib连接rabbitMQ 学习笔记及总结
- 基于PHP使用rabbitmq实现消息队列
- php RabbitMQ使用
- RabbitMQ php 使用
- PHP使用RabbitMQ实例
- RabbitMQ PHP版使用教程