RabbitMQ消息队列实现流程
2017-03-03 17:16
369 查看
首先通过ConnectionFactory配置消息队列服务端信息,mq server端涉及的配置信息主要有server端的地址、端口号、虚拟主机(安装完有个默认的是”/”)、用户名、密码(刚安装完有个默认的用户,用户名和密码都是guest)
笔者的项目是用SpringBoot搭的,获取ConnectionFactory的方式为
如果是java实现的普通maven工程,可以用
通过ConnectionFactory创建Connection连接池
通过连接池创建监听线程,也就是监听队列。创建一个监听队列涉及的配置信息有队列的id、类型、交换机等。这里的队列类型有直连、分发(速度最快)、主题(消息按一定的规则发送到对应的队列)、header四种。每种队列适用的场景不一样。监听线程里边通过在while(true){…}利用com.rabbitmq.client.QueueingConsumer.Delivery的getBody()不断地去获取客户端发送的消息,然后通过解析拿到的消息去执行对应的消费者,一般传过来的消息体会包含消费者的id或能拿到消费者执行路径的标识及传递给消费者的一些参数。
通过Connection创建Channel,连接消息与队列的通道。客户端发送消息给队列和消费者从队列里面拿消息都是通过Channel。
发送消息
笔者的项目是用SpringBoot搭的,获取ConnectionFactory的方式为
@Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory("127.0.0.1",5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setVirtualHost("/"); // connectionFactory.setUsername("lindl"); // connectionFactory.setPassword("passwd"); // connectionFactory.setVirtualHost("vhost"); connectionFactory.setPublisherConfirms(true); // 必须要设置 return connectionFactory; }
如果是java实现的普通maven工程,可以用
ConnectionFactory factory=new ConnectionFactory(); factory.setHost(url); factory.setPort(port); ...
通过ConnectionFactory创建Connection连接池
int MAX_QUEUQ_SIZE = 2000; long keepAliveTime=24*60*60*1000; ExecutorService es=new ThreadPoolExecutor(poolSize, poolSize, keepAliveTime, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(MAX_QUEUQ_SIZE), new ThreadPoolExecutor.CallerRunsPolicy()); try { connection = factory.newConnection(es); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); }
通过连接池创建监听线程,也就是监听队列。创建一个监听队列涉及的配置信息有队列的id、类型、交换机等。这里的队列类型有直连、分发(速度最快)、主题(消息按一定的规则发送到对应的队列)、header四种。每种队列适用的场景不一样。监听线程里边通过在while(true){…}利用com.rabbitmq.client.QueueingConsumer.Delivery的getBody()不断地去获取客户端发送的消息,然后通过解析拿到的消息去执行对应的消费者,一般传过来的消息体会包含消费者的id或能拿到消费者执行路径的标识及传递给消费者的一些参数。
@Bean public SimpleMessageListenerContainer messageContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory()); container.setQueues(new Queue("actQueue",true)); container.setExposeListenerChannel(true); container.setMaxConcurrentConsumers(1); container.setConcurrentConsumers(1); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认 container.setMessageListener(new ChannelAwareMessageListener() { public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception { byte[] body = message.getBody(); System.out.println("收到消息 : " + new String(body)); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费 } }); return container; }
通过Connection创建Channel,连接消息与队列的通道。客户端发送消息给队列和消费者从队列里面拿消息都是通过Channel。
发送消息
Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
// 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String mes = new String(body, "UTF-8"); logger.info("======Customer Received "+mes+"==========="); } }; //自动回复队列应答 -- RabbitMQ中的消息确认机制 channel.basicConsume(QUEUE_NAME, true, consumer);
相关文章推荐
- 基于PHP使用rabbitmq实现消息队列
- Python操作RabbitMQ服务器实现消息队列的路由功能
- spring boot 自学笔记(五) Rabbitmq集成,延时消息队列实现
- 消息队列的使用 RabbitMQ (二): Windows 环境下集群的实现与优化
- RabbitMQ开源企业级消息队列系统实现方案(单机版)
- rabbitmq结合spring实现消息队列优先级
- 基于PHP使用rabbitmq实现消息队列
- rabbitmq结合spring实现消息队列优先级的方法
- RabbitMQ(python实现)学习之二:Producer发送消息至多个消息队列queue(广播消息)
- Redis与RabbitMQ实现消息队列
- Redis和RabbitMQ实现消息队列
- Python操作RabbitMQ服务器实现消息队列的路由功能
- (十一)RabbitMQ消息队列-如何实现高可用
- 基于PHP使用rabbitmq实现消息队列
- 跨数据库分布式实时事务 - 基于RabbitMQ实时消息队列服务实现
- Python RabbitMQ消息队列实现rpc
- RabbitMQ(python实现)学习之二:Producer发送消息至多个消息队列queue(广播消息)
- RabbitMQ学习(十)之spring整合发送同步消息(注解实现)
- RabbitMQ消息队列(五):Routing 消息路由
- C#实现rabbitmq 延迟队列功能