Apache ActiveMQ - 4 - ActiveMQ生产消费Demo
2017-10-31 10:16
225 查看
这篇笔记,记录几种ActiveMQ生产和消费的实例Demo。
1. 使用JMS发布和订阅消息
2. Queue队列方式发送点对点消息数据
3. 普通方式
4. 普通方式(带监听回调功能)
废话不多说,下面直接看代码...
使用JMS发布和订阅消息
package com.activemq.activemq1_jms;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Message receiver: consumes messages from the {@code hoo.mq.queue} queue and
 * prints every text message to standard output.
 *
 * @author CYX
 * @time 2016-12-13 16:27:31
 */
public class MessageReceiver {

    /** TCP address of the broker. */
    public static final String BROKER_URL = "tcp://localhost:61616";

    /** Name of the queue to consume from. */
    public static final String DESTINATION = "hoo.mq.queue";

    public static void main(String[] args) throws Exception {
        MessageReceiver.run();
    }

    /**
     * Connects to the broker, prints every message received from
     * {@link #DESTINATION} until no message arrives within the receive timeout,
     * then commits the session.
     *
     * <p>Failures while receiving are printed and not rethrown.
     *
     * @throws Exception if closing the session or the connection fails
     */
    public static void run() throws Exception {
        Connection connection = null;
        Session session = null;
        try {
            // Create the connection factory.
            ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                    ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
            // Create a connection from the factory.
            connection = factory.createConnection();
            // Start the connection so that messages are delivered.
            connection.start();
            // Create a transacted session; the acknowledge-mode argument is
            // ignored by JMS when the session is transacted.
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // Look up the queue.
            Destination destination = session.createQueue(DESTINATION);
            // Create the message consumer.
            MessageConsumer consumer = session.createConsumer(destination);
            while (true) {
                // Wait up to 1000 * 1000 ms (1000 seconds) for the next message.
                Message message = consumer.receive(1000 * 1000);
                if (null == message) {
                    // Timed out: nothing more to read.
                    break;
                }
                if (message instanceof TextMessage) {
                    TextMessage text = (TextMessage) message;
                    System.out.println("text : " + text.getText());
                    System.out.println("接收到的 : " + text);
                } else {
                    // A blind cast would throw ClassCastException here, abort
                    // the loop and skip the commit below.
                    System.err.println("Skipping non-text message : " + message);
                }
            }
            // Commit the transaction so the consumed messages are acknowledged.
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Nested try/finally: the connection is closed even if closing the
            // session throws.
            try {
                if (null != session) {
                    session.close();
                }
            } finally {
                if (null != connection) {
                    connection.close();
                }
            }
        }
    }
}
package com.activemq.activemq1_jms;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Message sender: publishes {@link #SEND_NUM} text messages to the
 * {@code hoo.mq.queue} queue through the plain JMS API.
 *
 * @author CYX
 * @time 2016-12-13 16:13:58
 */
public class MessageSender {

    /** Number of messages to send. */
    public static final int SEND_NUM = 5;

    /** TCP address of the broker. */
    public static final String BROKER_URL = "tcp://localhost:61616";

    /** Name of the target queue (the misspelled identifier is kept for compatibility). */
    public static final String DESRINATION = "hoo.mq.queue";

    public static void main(String[] args) {
        try {
            MessageSender.run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Sends {@link #SEND_NUM} numbered text messages and echoes each payload to
     * standard output.
     *
     * @param session  session used to create the messages
     * @param producer producer used to send them
     * @throws Exception if creating or sending a message fails
     */
    public static void sendMessage(Session session, MessageProducer producer) throws Exception {
        for (int i = 0; i < SEND_NUM; i++) {
            String message = "我是消息 : " + (i + 1) + "....";
            TextMessage text = session.createTextMessage(message);
            System.out.println("message : " + message);
            producer.send(text);
        }
    }

    /**
     * Connects to the broker, sends the messages inside one transacted session
     * and commits it.
     *
     * <p>Failures while sending are printed and not rethrown.
     *
     * @throws Exception if closing the session or the connection fails
     */
    public static void run() throws Exception {
        Connection connection = null;
        Session session = null;
        try {
            // Create the connection factory.
            ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                    ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
            // Create a connection from the factory.
            connection = factory.createConnection();
            // Start the connection.
            connection.start();
            // Create a transacted session; the acknowledge-mode argument is
            // ignored by JMS when the session is transacted.
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // Look up the queue.
            Destination destination = session.createQueue(DESRINATION);
            // Create the message producer.
            MessageProducer producer = session.createProducer(destination);
            // Select NON-persistent delivery for everything this producer sends.
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            sendMessage(session, producer);
            // Commit the transaction so the sent messages become visible.
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Nested try/finally: the connection is closed even if closing the
            // session throws.
            try {
                if (session != null) {
                    session.close();
                }
            } finally {
                if (connection != null) {
                    connection.close();
                }
            }
        }
    }
}
Queue队列方式发送点对点消息数据
package com.activemq.activemq1_queue;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
/**
 * Queue (point-to-point) receiver: registers a listener on the
 * {@code hoo.mq.queue} queue and prints the {@code time} and {@code text}
 * entries of every map message delivered during a 100-second window.
 *
 * @author CYX
 * @time 2016-12-19 19:56:45
 */
public class QueueReceiverMessage {

    public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    public static final String BROKER = ActiveMQConnection.DEFAULT_BROKER_URL;
    public static final String TARGET = "hoo.mq.queue";

    public static void main(String[] args) throws Exception {
        QueueConnection connection = null;
        QueueSession session = null;
        try {
            // Create the connection factory.
            QueueConnectionFactory factory = new org.apache.activemq.ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER);
            // Create a connection from the factory.
            connection = factory.createQueueConnection();
            // Start the connection so that messages are delivered.
            connection.start();
            // Create a transacted session; the acknowledge-mode argument is
            // ignored by JMS when the session is transacted.
            session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // Look up the queue.
            Queue queue = session.createQueue(TARGET);
            // Create the receiver and attach the asynchronous listener.
            QueueReceiver receiver = session.createReceiver(queue);
            receiver.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message msg) {
                    if (!(msg instanceof MapMessage)) {
                        // A blind cast would throw ClassCastException out of the
                        // listener for any other message type on this queue.
                        if (null != msg) {
                            System.err.println("Ignoring non-map message : " + msg);
                        }
                        return;
                    }
                    MapMessage map = (MapMessage) msg;
                    try {
                        System.out.println(map.getLong("time") + " 接收 : " + map.getString("text"));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
            // Keep the main thread alive for 100 seconds while the listener runs.
            Thread.sleep(1000 * 100);
            // Pause delivery before committing: a JMS session must not be used
            // from the main thread while its listener may still be running.
            connection.stop();
            session.commit();
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of silently swallowing it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Nested try/finally: the connection is closed even if closing the
            // session throws.
            try {
                if (null != session) {
                    session.close();
                }
            } finally {
                if (null != connection) {
                    connection.close();
                }
            }
        }
    }
}
普通方式
普通方式(带监听回调功能)
package com.activemq.activemq5;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
 * Message producer: sends a batch of text messages to the {@code HelloWorld}
 * queue over the connection supplied by {@code ActiveMQPoolsUtil}.
 *
 * @author CYX
 * @time 2016-12-19 17:30:44
 */
public class JMSProducer {

    /** Number of messages to send. */
    private static final int SEND_NUM = 10;

    public static void main(String[] args) {
        // Connection to the broker.
        Connection connection = null;
        // Session used to create and send the messages.
        Session session = null;
        // Destination of the messages.
        Destination destination;
        // Producer bound to the destination.
        MessageProducer messageProducer;
        try {
            connection = ActiveMQPoolsUtil.getConnection();
            System.out.println(connection);
            // Start the connection.
            connection.start();
            // Create a transacted session; the acknowledge-mode argument is
            // ignored by JMS when the session is transacted.
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // Look up the queue named HelloWorld.
            destination = session.createQueue("HelloWorld");
            // Create the message producer.
            messageProducer = session.createProducer(destination);
            // Send the messages and commit the transaction.
            sendMessage(session, messageProducer);
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Close the session explicitly; it was previously never closed.
            if (session != null) {
                try {
                    session.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Sends the batch of numbered text messages.
     *
     * @param session         session used to create the messages
     * @param messageProducer producer used to send them
     * @throws Exception if creating or sending a message fails
     */
    public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception {
        for (int i = 0; i < SEND_NUM; i++) {
            TextMessage message = session.createTextMessage("ActiveMQ 发送消息 : " + i);
            // Hand the message to the producer.
            messageProducer.send(message);
        }
    }
}
package com.activemq.activemq5;
import java.io.IOException;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.transport.TransportListener;
/**
 * Message consumer for the {@code HelloWorld} queue that also implements
 * ActiveMQ's {@link TransportListener}.
 *
 * <p>A transport listener is notified of transport-level events through
 * {@code onCommand()}, {@code onException()}, {@code transportInterupted()} and
 * {@code transportResumed()}. An application can use these callbacks to notice
 * that the broker is unreachable and react, for example by storing messages
 * locally and sending them once the broker is back.
 *
 * <p>NOTE(review): {@link #main(String[])} never creates or registers an
 * instance of this class, and all four callbacks are empty, so the listener is
 * not active in this demo.
 *
 * @author CYX
 * @time 2016-12-24 15:14:49
 */
public class JMSConsumer implements TransportListener {

    public static void main(String[] args) {
        // Connection to the broker.
        Connection connection = null;
        // Session used to receive the messages.
        Session session = null;
        // Destination to consume from.
        Destination destination;
        // Consumer bound to the destination.
        MessageConsumer messageConsumer;
        try {
            connection = ActiveMQPoolsUtil.getConnection();
            // Start the connection so that messages are delivered.
            connection.start();
            // Create a non-transacted, auto-acknowledging session.
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            // Look up the queue named HelloWorld.
            destination = session.createQueue("HelloWorld");
            // Create the message consumer.
            messageConsumer = session.createConsumer(destination);
            while (true) {
                // Wait up to 100000 ms (100 seconds) for the next message.
                Message received = messageConsumer.receive(100000);
                if (null == received) {
                    // Timed out: nothing more to read.
                    break;
                }
                if (received instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) received;
                    System.out.println("收到的消息 : " + textMessage.getText());
                } else {
                    // A blind cast would throw ClassCastException and end the loop.
                    System.err.println("Skipping non-text message : " + received);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release the session and the connection; they were previously
            // never closed.
            if (session != null) {
                try {
                    session.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Called for each command received on the transport.
     */
    @Override
    public void onCommand(Object arg0) {
        // Intentionally empty in this demo.
    }

    /**
     * Called when the transport reports an exception.
     */
    @Override
    public void onException(IOException arg0) {
        // Intentionally empty in this demo.
    }

    /**
     * Called when the transport is interrupted (failover).
     */
    @Override
    public void transportInterupted() {
        // Intentionally empty in this demo.
    }

    /**
     * Called when the transport resumes after a failover.
     */
    @Override
    public void transportResumed() {
        // Intentionally empty in this demo.
    }
}
1. 使用JMS发布和订阅消息
2. Queue队列方式发送点对点消息数据
3. 普通方式
4. 普通方式(带监听回调功能)
废话不多说,下面直接看代码...
使用JMS发布和订阅消息
package com.activemq.activemq1_jms;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Message receiver: consumes messages from the {@code hoo.mq.queue} queue and
 * prints every text message to standard output.
 *
 * @author CYX
 * @time 2016-12-13 16:27:31
 */
public class MessageReceiver {

    /** TCP address of the broker. */
    public static final String BROKER_URL = "tcp://localhost:61616";

    /** Name of the queue to consume from. */
    public static final String DESTINATION = "hoo.mq.queue";

    public static void main(String[] args) throws Exception {
        MessageReceiver.run();
    }

    /**
     * Connects to the broker, prints every message received from
     * {@link #DESTINATION} until no message arrives within the receive timeout,
     * then commits the session.
     *
     * <p>Failures while receiving are printed and not rethrown.
     *
     * @throws Exception if closing the session or the connection fails
     */
    public static void run() throws Exception {
        Connection connection = null;
        Session session = null;
        try {
            // Create the connection factory.
            ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                    ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
            // Create a connection from the factory.
            connection = factory.createConnection();
            // Start the connection so that messages are delivered.
            connection.start();
            // Create a transacted session; the acknowledge-mode argument is
            // ignored by JMS when the session is transacted.
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // Look up the queue.
            Destination destination = session.createQueue(DESTINATION);
            // Create the message consumer.
            MessageConsumer consumer = session.createConsumer(destination);
            while (true) {
                // Wait up to 1000 * 1000 ms (1000 seconds) for the next message.
                Message message = consumer.receive(1000 * 1000);
                if (null == message) {
                    // Timed out: nothing more to read.
                    break;
                }
                if (message instanceof TextMessage) {
                    TextMessage text = (TextMessage) message;
                    System.out.println("text : " + text.getText());
                    System.out.println("接收到的 : " + text);
                } else {
                    // A blind cast would throw ClassCastException here, abort
                    // the loop and skip the commit below.
                    System.err.println("Skipping non-text message : " + message);
                }
            }
            // Commit the transaction so the consumed messages are acknowledged.
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Nested try/finally: the connection is closed even if closing the
            // session throws.
            try {
                if (null != session) {
                    session.close();
                }
            } finally {
                if (null != connection) {
                    connection.close();
                }
            }
        }
    }
}
package com.activemq.activemq1_jms;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Message sender: publishes {@link #SEND_NUM} text messages to the
 * {@code hoo.mq.queue} queue through the plain JMS API.
 *
 * @author CYX
 * @time 2016-12-13 16:13:58
 */
public class MessageSender {

    /** Number of messages to send. */
    public static final int SEND_NUM = 5;

    /** TCP address of the broker. */
    public static final String BROKER_URL = "tcp://localhost:61616";

    /** Name of the target queue (the misspelled identifier is kept for compatibility). */
    public static final String DESRINATION = "hoo.mq.queue";

    public static void main(String[] args) {
        try {
            MessageSender.run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Sends {@link #SEND_NUM} numbered text messages and echoes each payload to
     * standard output.
     *
     * @param session  session used to create the messages
     * @param producer producer used to send them
     * @throws Exception if creating or sending a message fails
     */
    public static void sendMessage(Session session, MessageProducer producer) throws Exception {
        for (int i = 0; i < SEND_NUM; i++) {
            String message = "我是消息 : " + (i + 1) + "....";
            TextMessage text = session.createTextMessage(message);
            System.out.println("message : " + message);
            producer.send(text);
        }
    }

    /**
     * Connects to the broker, sends the messages inside one transacted session
     * and commits it.
     *
     * <p>Failures while sending are printed and not rethrown.
     *
     * @throws Exception if closing the session or the connection fails
     */
    public static void run() throws Exception {
        Connection connection = null;
        Session session = null;
        try {
            // Create the connection factory.
            ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                    ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
            // Create a connection from the factory.
            connection = factory.createConnection();
            // Start the connection.
            connection.start();
            // Create a transacted session; the acknowledge-mode argument is
            // ignored by JMS when the session is transacted.
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // Look up the queue.
            Destination destination = session.createQueue(DESRINATION);
            // Create the message producer.
            MessageProducer producer = session.createProducer(destination);
            // Select NON-persistent delivery for everything this producer sends.
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            sendMessage(session, producer);
            // Commit the transaction so the sent messages become visible.
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Nested try/finally: the connection is closed even if closing the
            // session throws.
            try {
                if (session != null) {
                    session.close();
                }
            } finally {
                if (connection != null) {
                    connection.close();
                }
            }
        }
    }
}
Queue队列方式发送点对点消息数据
package com.activemq.activemq1_queue;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
/**
 * Queue (point-to-point) receiver: registers a listener on the
 * {@code hoo.mq.queue} queue and prints the {@code time} and {@code text}
 * entries of every map message delivered during a 100-second window.
 *
 * @author CYX
 * @time 2016-12-19 19:56:45
 */
public class QueueReceiverMessage {

    public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    public static final String BROKER = ActiveMQConnection.DEFAULT_BROKER_URL;
    public static final String TARGET = "hoo.mq.queue";

    public static void main(String[] args) throws Exception {
        QueueConnection connection = null;
        QueueSession session = null;
        try {
            // Create the connection factory.
            QueueConnectionFactory factory = new org.apache.activemq.ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER);
            // Create a connection from the factory.
            connection = factory.createQueueConnection();
            // Start the connection so that messages are delivered.
            connection.start();
            // Create a transacted session; the acknowledge-mode argument is
            // ignored by JMS when the session is transacted.
            session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // Look up the queue.
            Queue queue = session.createQueue(TARGET);
            // Create the receiver and attach the asynchronous listener.
            QueueReceiver receiver = session.createReceiver(queue);
            receiver.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message msg) {
                    if (!(msg instanceof MapMessage)) {
                        // A blind cast would throw ClassCastException out of the
                        // listener for any other message type on this queue.
                        if (null != msg) {
                            System.err.println("Ignoring non-map message : " + msg);
                        }
                        return;
                    }
                    MapMessage map = (MapMessage) msg;
                    try {
                        System.out.println(map.getLong("time") + " 接收 : " + map.getString("text"));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
            // Keep the main thread alive for 100 seconds while the listener runs.
            Thread.sleep(1000 * 100);
            // Pause delivery before committing: a JMS session must not be used
            // from the main thread while its listener may still be running.
            connection.stop();
            session.commit();
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of silently swallowing it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Nested try/finally: the connection is closed even if closing the
            // session throws.
            try {
                if (null != session) {
                    session.close();
                }
            } finally {
                if (null != connection) {
                    connection.close();
                }
            }
        }
    }
}
package com.activemq.activemq1_queue; import javax.jms.DeliveryMode; import javax.jms.MapMessage; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.QueueSender; import javax.jms.QueueSession; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionMetaData; /** * Queue队列方式发送点对点消息数据 * * Queue方式消息发送 * * @author CYX * @time 2016年12月19日下午7:29:07 */ public class QueueSenderMeesage { // 消息发送次数 public static final int SEND_SEND = 5; // tcp地址 public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;// tcp://localhost:61616 // 消息队列名称 public static final String DESTINATION = "hoo.mq.queue"; public static final String USERNAME = ActiveMQConnection.DEFAULT_USER; public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; /** * 发送消息 * * @param session * @param sender * @throws Exception */ public static void sendMessage(QueueSession session, QueueSender sender) throws Exception { for (int i = 0; i < SEND_SEND; i++) { String message = "消息发送者 : " + i; MapMessage map = session.createMapMessage(); map.setString("text", message); map.setLong("time", System.currentTimeMillis()); System.out.println(map); sender.send(map); } } public static void main(String[] args) { QueueConnection connection = null; QueueSession session = null; try { // 创建链接工厂 QueueConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL); // 通过工厂创建一个链接. 
connection = factory.createQueueConnection(); // 启动链接 connection.start(); // 创建一个session会话 session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 创建一个消息队列 Queue queue = session.createQueue(DESTINATION); // 创建消息发送者 QueueSender sender = session.createSender(queue); // 设置持久模式 sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 发送消息 sendMessage(session, sender); // 提交会话 session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != session) { session.close(); } if (null != connection) { connection.close(); } } catch (Exception e) { e.printStackTrace(); } } } }
普通方式
package com.activemq.activemq2; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Receiver { public static void main(String[] args) { // 连接工厂,JMS用它创建链接 ConnectionFactory connectionFactory; // JMS客户端到JMS Provider的连接. Connection connection = null; // 一个发送或接收消息的线程 Session session; // 消息目的地,消息发送给谁. Destination destination; // 消费者,消息接受者. MessageConsumer consumer; connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://localhost:61616"); try { // 从工厂方法中获取连接对象. connection = connectionFactory.createConnection(); // 启动连接. connection.start(); // 获取操作连接. session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 获取session. destination = session.createQueue("FirstQueue"); consumer = session.createConsumer(destination); while (true) { // 设置接收者接收消息时间,为了便与测试,设定100秒. TextMessage message = (TextMessage) consumer.receive(100000); if (null != message) { System.out.println("接收到消息 : " + message.getText()); } else { break; } } } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) { connection.close(); } } catch (Exception e) { e.printStackTrace(); } } } }
package com.activemq.activemq2; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Sender { private static final int SEND_NUMBER = 5; public static void main(String[] args) { // 连接工厂. ConnectionFactory connectionFactory; // JMS客户端到JMS Provider的连接. Connection connection = null; // 一个发送或者接受 Session session; // 消息的目的地,消息发送给谁. Destination destnation; // 消息发送者 MessageProducer producer; // 构造ConnectionFactory实例对象,此处采用ActiveMQ的实现jar. connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://localhost:61616"); try { // 构造工厂得到连接对象. connection = connectionFactory.createConnection(); // 启动. connection.start(); // 获取操作连接. session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 获取session,注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置 destnation = session.createQueue("FirstQueue"); // 得到消息生成者(发送者) producer = session.createProducer(destnation); // 设置不持久化,根据实际项目需求决定. producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); sendMessage(session, producer); session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) { connection.close(); } } catch (Exception e) { e.printStackTrace(); } } } /** * 发送消息 * * @param session * @param producer * @throws Exception */ public static void sendMessage(Session session, MessageProducer producer) throws Exception { for (int i = 0; i < SEND_NUMBER; i++) { TextMessage message = session.createTextMessage("ActiveMQ发送的消息 : " + i); // 发送消息到目的地. System.out.println("发送消息 : " + "ActiveMQ发送的消息 : " + i); producer.send(message); } } }
普通方式(带监听回调功能)
package com.activemq.activemq5;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
 * Message producer: sends a batch of text messages to the {@code HelloWorld}
 * queue over the connection supplied by {@code ActiveMQPoolsUtil}.
 *
 * @author CYX
 * @time 2016-12-19 17:30:44
 */
public class JMSProducer {

    /** Number of messages to send. */
    private static final int SEND_NUM = 10;

    public static void main(String[] args) {
        // Connection to the broker.
        Connection connection = null;
        // Session used to create and send the messages.
        Session session = null;
        // Destination of the messages.
        Destination destination;
        // Producer bound to the destination.
        MessageProducer messageProducer;
        try {
            connection = ActiveMQPoolsUtil.getConnection();
            System.out.println(connection);
            // Start the connection.
            connection.start();
            // Create a transacted session; the acknowledge-mode argument is
            // ignored by JMS when the session is transacted.
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // Look up the queue named HelloWorld.
            destination = session.createQueue("HelloWorld");
            // Create the message producer.
            messageProducer = session.createProducer(destination);
            // Send the messages and commit the transaction.
            sendMessage(session, messageProducer);
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Close the session explicitly; it was previously never closed.
            if (session != null) {
                try {
                    session.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Sends the batch of numbered text messages.
     *
     * @param session         session used to create the messages
     * @param messageProducer producer used to send them
     * @throws Exception if creating or sending a message fails
     */
    public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception {
        for (int i = 0; i < SEND_NUM; i++) {
            TextMessage message = session.createTextMessage("ActiveMQ 发送消息 : " + i);
            // Hand the message to the producer.
            messageProducer.send(message);
        }
    }
}
package com.activemq.activemq5;
import java.io.IOException;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.transport.TransportListener;
/**
 * Message consumer for the {@code HelloWorld} queue that also implements
 * ActiveMQ's {@link TransportListener}.
 *
 * <p>A transport listener is notified of transport-level events through
 * {@code onCommand()}, {@code onException()}, {@code transportInterupted()} and
 * {@code transportResumed()}. An application can use these callbacks to notice
 * that the broker is unreachable and react, for example by storing messages
 * locally and sending them once the broker is back.
 *
 * <p>NOTE(review): {@link #main(String[])} never creates or registers an
 * instance of this class, and all four callbacks are empty, so the listener is
 * not active in this demo.
 *
 * @author CYX
 * @time 2016-12-24 15:14:49
 */
public class JMSConsumer implements TransportListener {

    public static void main(String[] args) {
        // Connection to the broker.
        Connection connection = null;
        // Session used to receive the messages.
        Session session = null;
        // Destination to consume from.
        Destination destination;
        // Consumer bound to the destination.
        MessageConsumer messageConsumer;
        try {
            connection = ActiveMQPoolsUtil.getConnection();
            // Start the connection so that messages are delivered.
            connection.start();
            // Create a non-transacted, auto-acknowledging session.
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            // Look up the queue named HelloWorld.
            destination = session.createQueue("HelloWorld");
            // Create the message consumer.
            messageConsumer = session.createConsumer(destination);
            while (true) {
                // Wait up to 100000 ms (100 seconds) for the next message.
                Message received = messageConsumer.receive(100000);
                if (null == received) {
                    // Timed out: nothing more to read.
                    break;
                }
                if (received instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) received;
                    System.out.println("收到的消息 : " + textMessage.getText());
                } else {
                    // A blind cast would throw ClassCastException and end the loop.
                    System.err.println("Skipping non-text message : " + received);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release the session and the connection; they were previously
            // never closed.
            if (session != null) {
                try {
                    session.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Called for each command received on the transport.
     */
    @Override
    public void onCommand(Object arg0) {
        // Intentionally empty in this demo.
    }

    /**
     * Called when the transport reports an exception.
     */
    @Override
    public void onException(IOException arg0) {
        // Intentionally empty in this demo.
    }

    /**
     * Called when the transport is interrupted (failover).
     */
    @Override
    public void transportInterupted() {
        // Intentionally empty in this demo.
    }

    /**
     * Called when the transport resumes after a failover.
     */
    @Override
    public void transportResumed() {
        // Intentionally empty in this demo.
    }
}
package com.activemq.activemq5; import javax.jms.Connection; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.pool.PooledConnection; import org.apache.activemq.pool.PooledConnectionFactory; public class ActiveMQPoolsUtil { // 默认链接用户名 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认链接密码 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认链接地址 private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; // 链接 private static Connection connection; public ActiveMQPoolsUtil() { } static { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(); activeMQConnectionFactory.setUserName(USERNAME); activeMQConnectionFactory.setPassword(PASSWORD); activeMQConnectionFactory.setBrokerURL(BROKEURL); try { PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory); // session数量 int maximumActive = 200; pooledConnectionFactory.setMaximumActiveSessionPerConnection(maximumActive); pooledConnectionFactory.setIdleTimeout(120); pooledConnectionFactory.setMaxConnections(5); pooledConnectionFactory.setBlockIfSessionPoolIsFull(true); connection = pooledConnectionFactory.createConnection(); // 必须start,否则无法接收消息 connection.start(); } catch (Exception e) { e.printStackTrace(); } } /** * 关闭链接 */ public static void close() { try { if (null != connection) { connection.close(); } } catch (Exception e) { e.printStackTrace(); } } /** * 获取一个链接 * * @return */ public static Connection getConnection() { return connection; } /** * 链接设置. * * @param connection */ public static void setConnection(PooledConnection connection) { ActiveMQPoolsUtil.connection = connection; } }
相关文章推荐
- Kafka 使用Java实现数据的生产和消费demo
- kafka java 生产消费程序demo示例
- 初次尝试ActiveMQ,实现简单的消息生产和消息消费
- ActiveMQ使用线程池实现消息的生产与消费
- kafka java 生产消费程序demo示例
- ActiveMQ使用线程池实现消息的生产与消费
- ActiveMQ 由浅入深之二(生产消费模式)
- 初次尝试ActiveMQ,实现简单的消息生产和消息消费
- 生产消费模型demo
- ActiveMQ消息队列之java消息生产与消费
- Apache ActiveMQ DEMO的搭建流程
- kafka java 生产消费程序demo示例
- ActiveMQ五种消息类型生产消费
- Apache ActiveMQ启动DEMO错误
- kafka java 生产消费demo
- kafka java 生产消费程序demo示例
- ActiveMQ使用线程池实现消息的生产与消费
- kafka java 生产消费程序demo示例
- Kafka学习8_kafka java 生产消费程序demo示例
- kafka_2.11-0.8.2.1+java 生产消费程序demo示例