您的位置:首页 > 其它

RabbitMQ消息队列实现流程

2017-03-03 17:16 369 查看
首先通过ConnectionFactory配置消息队列服务端信息,mq server端涉及的配置信息主要有server端的地址、端口号、虚拟主机(安装完有个默认的是”/”)、用户名、密码(刚安装完有个默认的用户,用户名和密码都是guest)

笔者的项目是用SpringBoot搭的,获取ConnectionFactory的方式为

@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("127.0.0.1",5672);

connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");

//        connectionFactory.setUsername("lindl");
//        connectionFactory.setPassword("passwd");
//        connectionFactory.setVirtualHost("vhost");

connectionFactory.setPublisherConfirms(true); // 必须要设置
return connectionFactory;
}


如果是java实现的普通maven工程,可以用

ConnectionFactory factory=new ConnectionFactory();
factory.setHost(url);
factory.setPort(port);
...


通过ConnectionFactory创建Connection连接池

int MAX_QUEUQ_SIZE = 2000;
long keepAliveTime=24*60*60*1000;
ExecutorService es=new ThreadPoolExecutor(poolSize, poolSize, keepAliveTime, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(MAX_QUEUQ_SIZE), new ThreadPoolExecutor.CallerRunsPolicy());
try {
connection = factory.newConnection(es);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}


通过连接池创建监听线程,也就是监听队列。创建一个监听队列涉及的配置信息有队列的id、类型、交换机等。这里的队列类型有直连、分发(速度最快)、主题(消息按一定的规则发送到对应的队列)、header四种。每种队列适用的场景不一样。监听线程里边通过在while(true){…}利用com.rabbitmq.client.QueueingConsumer.Delivery的getBody()不断地去获取客户端发送的消息,然后通过解析拿到的消息去执行对应的消费者,一般传过来的消息体会包含消费者的id或能拿到消费者执行路径的标识及传递给消费者的一些参数。

@Bean
public SimpleMessageListenerContainer messageContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
container.setQueues(new Queue("actQueue",true));
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认
container.setMessageListener(new ChannelAwareMessageListener() {

public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {
byte[] body = message.getBody();
System.out.println("收到消息 : " + new String(body));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费

}

});
return container;
}


通过Connection创建Channel,连接消息与队列的通道。客户端发送消息给队列和消费者从队列里面拿消息都是通过Channel。

发送消息

Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());


// 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String mes = new String(body, "UTF-8");

logger.info("======Customer Received "+mes+"===========");
}
};
//自动回复队列应答 -- RabbitMQ中的消息确认机制
channel.basicConsume(QUEUE_NAME, true, consumer);
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: