Rabbit mq订阅方式获取消息并可设置持久化
2016-02-01 00:00
351 查看
Rabbit 通过方式获取消息:订阅方式其实是向queue注册consumer,通过rpc向queue server发送注册consumer的消息,rabbitMQ Server在收到消息后,根据消息的内容类型判断这是一个订阅消息,这样当MQ 中queue有消息时,会自动把消息通过该socket(长连接)通道发送出去。
可以通过
整理代码如下:
Produce
Customer
可以通过
channel.basicQos(1);设置RabbitMQ调度分发消息的方式,也就是告诉RabbitMQ每次只给消费者处理一条消息,也就是等待消费者处理完并且已经对刚才处理的消息进行确认之后, 才发送下一条消息,防止消费者太过于忙碌。如下图所示:
整理代码如下:
Produce
public class RabbitMQProduce { public static void main(String[] args) throws IOException, InterruptedException { ConnectionFactory factory =new ConnectionFactory(); String routingKey="test"; String exchange="test"; factory.setHost("localhost"); Connection conn = factory.newConnection(); Channel channel =conn.createChannel(); //发送消息 for(int i=0;i<8000;i++){ if(i%5==0){ Thread.sleep(200); } byte[] messageBodyBytes =(i+"").getBytes(); //如果将队列设置为持久化之后,还需要将消息也设为可持久化的,MessageProperties.PERSISTENT_TEXT_PLAIN //也就是将队列设置为持久化之后,还需要将发送的消息也要设置为持久化才能保证队列和消息一直存在 //消费者在声明时也要做持久化声明 channel.basicPublish(exchange, routingKey, null, messageBodyBytes); System.out.println("发送.."+i); } channel.close(); conn.close(); } }
Customer
public class RabbitMqCustomer { private static ConnectionFactory factory; private static String QueryName="test"; private static Connection conn; private static Channel channel; private static String exchange="test"; private static String routingKey="test"; public static void main(String[] args) throws Exception { start(); /** * 采用订阅的方式获取消息 */ channel.basicConsume(QueryName, false, new DefaultConsumer(channel){ @Override public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { System.out.println("==="+consumerTag+"====="+sig.getMessage()); boolean isOpenConnect = conn!=null&&conn.isOpen(); boolean isOpenChannel = channel != null && channel.isOpen(); while(!isOpenChannel||!isOpenConnect){ try { System.out.println("连接失败重连接...."); start(); Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } } } @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { //消息序号 long deliveryTag = envelope.getDeliveryTag(); String mes = new String(body,"UTF-8"); System.out.println("接受到消息:"+mes); //确认收到,消息回执 channel.basicAck(deliveryTag, true); } }); } public static void start() throws IOException { factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setUsername("test"); factory.setPassword("test"); conn = factory.newConnection(); channel = conn.createChannel(); channel.exchangeDeclare(exchange, "topic"); channel.queueDeclare(QueryName, false, false, false, null);//声明消息队列,且为可持久化的 channel.queueBind(QueryName, exchange, routingKey); channel.basicQos(1); //消息分发处理 } }
相关文章推荐
- spring ioc原理(看完后大家可以自己写一个spring)
- maven中下载jar包源码和javadoc的命令介绍
- java 中Properties使用及 getResourceAsStream的用法
- ArrayList循环遍历并删除元素的常见陷阱
- iOS支付宝支付_第一节_签约与审核(签约到集成)
- iOS9 http设置时遇到的问题
- Golang, Swift, Nodejs 和 Rust 性能对比
- 判断集合是否为null或者空
- ADG(active dataguard)体系结构以及三种模式
- MarkDown使用
- 关于volist mod的一点疑问
- Apache中设置强制将http转换为https。
- Angular移除不必要的$watch之性能优化
- 解析APP开发的三大标准
- 11.Spark之运行模式及原理
- 读野蛮生长笔记
- 会声会影翻页转场视频制作教程
- MySQL_生成唯一GID函数
- windows下spark简单搭建
- iOS数据持久化——SQLite(数据库)