您的位置:首页 > 运维架构 > Linux

ActiveMQ:点对点队列消费者接收不到消息

2017-11-20 10:41 399 查看

一.环境说明

Windows 1709

阿里云ECS

CentOS 7.4

ActiveMQ 5.15.2

JDK 1.8

IDEA 2017.3

Maven 3.5.0

二.问题说明

远程消息服务器使用的是阿里云ECS,在windows上编写测试类测试消息队列的点对点的通信,却发现,无法消费生产者生产的消息,即接收不到消息.

三.代码

testProducer

@Test
public void testProducer() throws JMSException {
//创建ConnectionFactory对象.需要制定服务端ip和端口号
//ConnectionFactory是接口,使用其实现类
ConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://阿里云服务器公网IP地址:61616");
//开启连接,使用Connection对象的start方法
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//使用Connection对象创建Session对象
/*
参数说明:
arg0:boolean,是否开启事务
arg1:int,应答模式(1:自动应答,2:手动应答)
当使用事务时,第二个参数无效
*/
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//使用Session对象创建一个Destination对象(topic,queue,此处选择queue)
Queue queue = session.createQueue("testQueue");
//使用Session对象创建一个Producer对象
MessageProducer producer = session.createProducer(queue);
//创建一个Message对象,创建一个TextMessage对象
TextMessage textMessage = session.createTextMessage("生产者1号,为您服务");
//使用Producer对象发送消息
producer.send(textMessage);
System.err.println("消息生产完成");
//关闭资源
producer.close();
session.close();
connection.close();
}


testConsumer

@Test
public void testConsumer() throws Exception {
//创建ConnectionFactory对象.需要制定服务端ip和端口号
//ConnectionFactory是接口,使用其实现类
ConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://阿里云服务器公网IP地址:61616");
//开启连接,使用Connection对象的start方法
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//使用Connection对象创建Session对象
/*
参数说明:
arg0:boolean,是否开启事务
arg1:int,应答模式(1:自动应答,2:手动应答)
当使用事务时,第二个参数无效
*/
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//使用Session对象创建一个Destination对象(topic,queue,此处选择queue),与发送方保持一致
Queue queue = session.createQueue("testQueue");
//使用Session对象创建一个Consumer对象
MessageConsumer consumer = session.createConsumer(queue);
//接收消息
consumer.setMessageListener(message -> {
TextMessage textMessage = (TextMessage) message;
try {
//打印消息
System.err.println("消息接收完成...." + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
});
//关闭资源
consumer.close();
connection.close();
session.close();
}


:ConnectionFactory属于很重的类,不宜重复创建.这里由于只是做演示,所以未对其进行抽取封装成工具类,可自行完成

四.解决问题

经测试发现,这样写我的消费者无法消费消息队列中的消息,而且setMessageListener中的onMessage方法无法进入,就结束了.后来根据别人指点,才知道onMessage是异步接收消息的,所以有个时间差的问题.因为阿里云是远程服务器,无法做到像本机虚拟机一样的快速访问,导致还没接收消息,consumer等就被程序close掉了,也就无法消费消息,接收消息队列中的消息.

所以,为了解决异步问题,可从以下两个方向着手:

1. 强制线程休眠.等待异步执行完成

consumer.setMessageListener(message -> {
TextMessage textMessage = (TextMessage) message;
try {
//打印消息
System.err.println("消息接收完成...." + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
});
//让线程休眠,等待消息接收完毕
Thread.sleep(5000);


2.使用receive(阻塞式方法,在接到消息前会一直阻塞着)

Message message = consumer.receive();
TextMessage textMessage = (TextMessage) message;
System.out.println(textMessage.getText());


经测试,均可以达到接收消息的目的

2017/11/20

Lucifer
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  activemq 5-15-2 idea centos