您的位置:首页 > 运维架构 > Apache

Apache ActiveMQ - 4 - ActiveMQ生产消费Demo

2017-10-31 10:16 225 查看
这篇笔记,记录几种ActiveMQ生产和消费的实例Demo。

1. 使用JMS发布和订阅消息

2. Queue队列方式发送点对点消息数据

3. 普通方式

4. 普通方式(带监听回调功能)

废话不多说,下面直接看代码...

使用JMS发布和订阅消息

package com.activemq.activemq1_jms;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Blocking consumer that drains messages from the {@link #DESTINATION} queue
 * and prints the text ones.
 *
 * <p>The session is transacted, so each message is committed individually
 * right after it has been handled; a crash therefore re-delivers at most the
 * message that was in flight instead of everything received so far.
 *
 * @author CYX
 * @time 2016-12-13 16:27:31
 */
public class MessageReceiver {

    /** Broker TCP address. */
    public static final String BROKER_URL = "tcp://localhost:61616";
    /** Name of the queue to consume from. */
    public static final String DESTINATION = "hoo.mq.queue";

    /** Maximum time a single receive call blocks, in milliseconds (1000 seconds). */
    private static final long RECEIVE_TIMEOUT_MS = 1000 * 1000;

    public static void main(String[] args) throws Exception {
        MessageReceiver.run();
    }

    /**
     * Connects to the broker and consumes messages until a receive call times
     * out (or the consumer is closed), then releases the session and connection.
     *
     * <p>Failures while consuming are printed, not rethrown.
     *
     * @throws Exception if closing the session or the connection fails
     */
    public static void run() throws Exception {

        Connection connection = null;
        Session session = null;

        try {
            // Build the connection factory with the default credentials.
            ConnectionFactory factory = new ActiveMQConnectionFactory(
                    ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
            connection = factory.createConnection();
            // Delivery does not begin until the connection is started.
            connection.start();
            // Transacted session: the acknowledge-mode argument is ignored in this mode.
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue(DESTINATION);
            MessageConsumer consumer = session.createConsumer(destination);

            while (true) {
                Message message = consumer.receive(RECEIVE_TIMEOUT_MS);
                if (message == null) {
                    // Timed out without a message: stop consuming.
                    break;
                }

                if (message instanceof TextMessage) {
                    TextMessage text = (TextMessage) message;
                    System.out.println("text : " + text.getText());
                    System.out.println("接收到的 : " + text);
                } else {
                    // Other producers may put non-text messages on this queue;
                    // report them instead of failing with a ClassCastException.
                    System.err.println("skipped non-text message : " + message);
                }

                // Commit per message so handled messages are not re-delivered
                // if the process stops before the loop ends.
                session.commit();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Close the connection even when closing the session fails.
            try {
                if (null != session) {
                    session.close();
                }
            } finally {
                if (null != connection) {
                    connection.close();
                }
            }
        }

    }

}

package com.activemq.activemq1_jms;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Message sender.<br>
 * Publishes a fixed number of text messages to a queue through the JMS API.<br>
 *
 * @author CYX
 * @time 2016-12-13 16:13:58
 */
public class MessageSender {

    /** Number of messages to send. */
    public static final int SEND_NUM = 5;
    /** Broker TCP address. */
    public static final String BROKER_URL = "tcp://localhost:61616";
    /** Name of the target queue. */
    public static final String DESRINATION = "hoo.mq.queue";

    public static void main(String[] args) {
        try {
            run();
        } catch (Exception failure) {
            failure.printStackTrace();
        }
    }

    /**
     * Sends {@link #SEND_NUM} numbered text messages, echoing each payload to stdout
     * before it is handed to the producer.
     *
     * @param session
     *            session used to create the text messages
     * @param producer
     *            producer the messages are sent through
     * @throws Exception
     *             if creating or sending a message fails
     */
    public static void sendMessage(Session session, MessageProducer producer) throws Exception {
        // Payload numbering is 1-based.
        for (int seq = 1; seq <= SEND_NUM; seq++) {
            String payload = "我是消息 : " + seq + "....";
            TextMessage outgoing = session.createTextMessage(payload);

            System.out.println("message : " + payload);
            producer.send(outgoing);
        }
    }

    /**
     * Opens a connection and a transacted session, sends the messages, commits,
     * and finally closes the session and the connection.
     *
     * <p>Failures while sending are printed, not rethrown.
     *
     * @throws Exception
     *             if closing the session or the connection fails
     */
    public static void run() throws Exception {

        Connection conn = null;
        Session txSession = null;

        try {
            ConnectionFactory connFactory = new ActiveMQConnectionFactory(
                    ActiveMQConnection.DEFAULT_USER,
                    ActiveMQConnection.DEFAULT_PASSWORD,
                    BROKER_URL);

            conn = connFactory.createConnection();
            conn.start();

            // Transacted session: sends take effect on commit.
            txSession = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);

            Destination queue = txSession.createQueue(DESRINATION);
            MessageProducer producer = txSession.createProducer(queue);
            // Messages are sent with non-persistent delivery.
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            sendMessage(txSession, producer);
            txSession.commit();

        } catch (Exception failure) {
            failure.printStackTrace();
        } finally {
            if (txSession != null) {
                txSession.close();
            }
            if (conn != null) {
                conn.close();
            }
        }
    }

}


Queue队列方式发送点对点消息数据

package com.activemq.activemq1_queue;

import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;

/**
 * Listener-based receiver for map messages on the {@link #TARGET} queue.
 *
 * <p>Messages are handled and committed on the JMS delivery thread inside the
 * listener; the main thread only keeps the process alive for a while and then
 * closes the session and connection.
 *
 * @author CYX
 * @time 2016-12-19 19:56:45
 */
public class QueueReceiverMessage {

    public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    public static final String BROKER = ActiveMQConnection.DEFAULT_BROKER_URL;
    public static final String TARGET = "hoo.mq.queue";

    /**
     * Registers a message listener and lets it run for 100 seconds.
     *
     * @param args unused
     * @throws Exception if closing the session or the connection fails
     */
    public static void main(String[] args) throws Exception {

        QueueConnection connection = null;
        QueueSession session = null;

        try {
            QueueConnectionFactory factory = new org.apache.activemq.ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER);
            connection = factory.createQueueConnection();
            connection.start();
            // Transacted session: the acknowledge-mode argument is ignored in this mode.
            session = connection.createQueueSession(true, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue(TARGET);
            QueueReceiver receiver = session.createReceiver(queue);

            // The anonymous listener needs a final reference to the session.
            final QueueSession listenerSession = session;

            receiver.setMessageListener(new MessageListener() {

                @Override
                public void onMessage(Message msg) {
                    try {
                        if (msg instanceof MapMessage) {
                            MapMessage map = (MapMessage) msg;
                            System.out.println(map.getLong("time") + " 接收 : " + map.getString("text"));
                        } else if (null != msg) {
                            // The queue may also carry other message types;
                            // report them instead of throwing ClassCastException
                            // out of the listener.
                            System.err.println("ignored non-map message : " + msg);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }

                    // Commit here, on the delivery thread: once a listener is
                    // registered, the session must not be used from other
                    // threads (except to close it).
                    try {
                        listenerSession.commit();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });

            // Keep the process alive so the listener can receive messages.
            Thread.sleep(1000 * 100);

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Close the connection even when closing the session fails.
            try {
                if (null != session) {
                    session.close();
                }
            } finally {
                if (null != connection) {
                    connection.close();
                }
            }
        }

    }

}

package com.activemq.activemq1_queue;

import javax.jms.DeliveryMode;
import javax.jms.MapMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQConnectionMetaData;

/**
 * Point-to-point sender built on the queue-specific JMS interfaces
 * ({@code QueueConnection}, {@code QueueSession}, {@code QueueSender}).
 *
 * @author CYX
 * @time 2016-12-19 19:29:07
 */
public class QueueSenderMeesage {

    /** Number of messages to send. */
    public static final int SEND_SEND = 5;
    /** Broker address, taken from the ActiveMQ default broker URL constant. */
    public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
    /** Name of the target queue. */
    public static final String DESTINATION = "hoo.mq.queue";
    public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;

    /**
     * Sends {@link #SEND_SEND} map messages, each carrying a {@code text} entry and
     * a {@code time} entry holding the current time in milliseconds.
     *
     * @param session
     *            session used to create the map messages
     * @param sender
     *            queue sender the messages are sent through
     * @throws Exception
     *             if building or sending a message fails
     */
    public static void sendMessage(QueueSession session, QueueSender sender) throws Exception {
        int index = 0;
        while (index < SEND_SEND) {
            MapMessage payload = session.createMapMessage();
            payload.setString("text", "消息发送者 : " + index);
            payload.setLong("time", System.currentTimeMillis());

            // Echo the message before sending it.
            System.out.println(payload);
            sender.send(payload);

            index++;
        }
    }

    public static void main(String[] args) {
        QueueConnection queueConnection = null;
        QueueSession queueSession = null;

        try {
            QueueConnectionFactory connFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);

            queueConnection = connFactory.createQueueConnection();
            queueConnection.start();

            // Transacted session: sends take effect on commit.
            queueSession = queueConnection.createQueueSession(true, Session.AUTO_ACKNOWLEDGE);

            Queue target = queueSession.createQueue(DESTINATION);
            QueueSender queueSender = queueSession.createSender(target);
            // Messages are sent with non-persistent delivery.
            queueSender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            sendMessage(queueSession, queueSender);
            queueSession.commit();

        } catch (Exception failure) {
            failure.printStackTrace();
        } finally {
            try {
                if (queueSession != null) {
                    queueSession.close();
                }
                if (queueConnection != null) {
                    queueConnection.close();
                }
            } catch (Exception closeFailure) {
                closeFailure.printStackTrace();
            }
        }

    }

}


普通方式

package com.activemq.activemq2;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Minimal blocking consumer: reads text messages from the "FirstQueue" queue
 * and prints them until a receive call returns nothing.
 */
public class Receiver {

    public static void main(String[] args) {

        // Factory is created outside the try block, exactly once.
        ConnectionFactory factory = new ActiveMQConnectionFactory(
                ActiveMQConnectionFactory.DEFAULT_USER,
                ActiveMQConnectionFactory.DEFAULT_PASSWORD,
                "tcp://localhost:61616");

        Connection conn = null;

        try {
            conn = factory.createConnection();
            conn.start();

            // Non-transacted session with automatic acknowledgement.
            Session autoAckSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination queue = autoAckSession.createQueue("FirstQueue");
            MessageConsumer queueConsumer = autoAckSession.createConsumer(queue);

            // Each receive waits up to 100 seconds; a null result ends the loop.
            TextMessage incoming;
            while ((incoming = (TextMessage) queueConsumer.receive(100000)) != null) {
                System.out.println("接收到消息 : " + incoming.getText());
            }

        } catch (Exception failure) {
            failure.printStackTrace();
        } finally {
            try {
                if (conn != null) {
                    conn.close();
                }
            } catch (Exception closeFailure) {
                closeFailure.printStackTrace();
            }
        }

    }

}

package com.activemq.activemq2;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Minimal producer: sends a fixed number of text messages to the "FirstQueue"
 * queue inside one transacted session.
 */
public class Sender {

    private static final int SEND_NUMBER = 5;

    public static void main(String[] args) {

        // Factory is created outside the try block, using the ActiveMQ implementation.
        ConnectionFactory factory = new ActiveMQConnectionFactory(
                ActiveMQConnectionFactory.DEFAULT_USER,
                ActiveMQConnectionFactory.DEFAULT_PASSWORD,
                "tcp://localhost:61616");

        Connection conn = null;

        try {
            conn = factory.createConnection();
            conn.start();

            // Transacted session: sends take effect on commit.
            Session txSession = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
            Destination queue = txSession.createQueue("FirstQueue");
            MessageProducer queueProducer = txSession.createProducer(queue);
            // Non-persistent delivery; choose according to project requirements.
            queueProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            sendMessage(txSession, queueProducer);
            txSession.commit();

        } catch (Exception failure) {
            failure.printStackTrace();
        } finally {
            try {
                if (conn != null) {
                    conn.close();
                }
            } catch (Exception closeFailure) {
                closeFailure.printStackTrace();
            }
        }
    }

    /**
     * Creates and sends {@code SEND_NUMBER} text messages, printing each payload
     * before it is sent.
     *
     * @param session
     *            session used to create the text messages
     * @param producer
     *            producer the messages are sent through
     * @throws Exception
     *             if creating or sending a message fails
     */
    public static void sendMessage(Session session, MessageProducer producer) throws Exception {
        int index = 0;
        while (index < SEND_NUMBER) {
            String body = "ActiveMQ发送的消息 : " + index;
            TextMessage outgoing = session.createTextMessage(body);

            System.out.println("发送消息 : " + body);
            producer.send(outgoing);

            index++;
        }
    }

}


普通方式(带监听回调功能)
package com.activemq.activemq5;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

/**
 * Message producer that obtains its connection from {@code ActiveMQPoolsUtil}
 * and sends text messages to the "HelloWorld" queue.
 *
 * @author CYX
 * @time 2016-12-19 17:30:44
 */
public class JMSProducer {

    /** Number of messages to send. */
    private static final int SENDUNM = 10;

    public static void main(String[] args) {

        Connection conn = null;

        try {
            conn = ActiveMQPoolsUtil.getConnection();
            System.out.println(conn);

            conn.start();

            // Transacted session: sends take effect on commit.
            Session txSession = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
            Destination helloQueue = txSession.createQueue("HelloWorld");
            MessageProducer queueProducer = txSession.createProducer(helloQueue);

            sendMessage(txSession, queueProducer);
            txSession.commit();

        } catch (Exception failure) {
            failure.printStackTrace();
        } finally {
            try {
                if (conn != null) {
                    conn.close();
                }
            } catch (Exception closeFailure) {
                closeFailure.printStackTrace();
            }
        }
    }

    /**
     * Sends {@code SENDUNM} text messages numbered from 0.
     *
     * @param session
     *            session used to create the text messages
     * @param messageProducer
     *            producer the messages are sent through
     * @throws Exception
     *             if creating or sending a message fails
     */
    public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception {
        int index = 0;
        while (index < SENDUNM) {
            TextMessage outgoing = session.createTextMessage("ActiveMQ 发送消息 : " + index);
            messageProducer.send(outgoing);
            index++;
        }
    }

}


package com.activemq.activemq5;

import java.io.IOException;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.transport.TransportListener;

/**
 * Blocking consumer for the "HelloWorld" queue that also implements
 * {@link TransportListener} (transport-level monitoring).
 *
 * <p>Author's note on TransportListener: if the whole ActiveMQ cluster goes
 * down, no messages can be sent. ActiveMQ provides a transport listener for
 * this case: a listener implementing {@code TransportListener} can be added to
 * the {@code ActiveMQConnectionFactory}. The interface offers
 * {@code onCommand()}, {@code onException()}, {@code transportResumed()} and
 * {@code transportInterupted()}, which is enough to track the broker state and
 * react when it becomes unreachable, e.g. store messages locally and send them
 * once the broker is back.
 *
 * <p>NOTE(review): the callbacks below are empty stubs, and {@code main} never
 * creates or registers an instance of this class as a listener.
 *
 * @author CYX
 * @time 2016-12-24 15:14:49
 */
public class JMSConsumer implements TransportListener {

    public static void main(String[] args) {

        // Connection to the broker.
        Connection connection = null;
        // Session used to receive messages.
        Session session;
        // Message destination.
        Destination destination;
        // Message consumer.
        MessageConsumer messageConsumer;

        try {

            connection = ActiveMQPoolsUtil.getConnection();

            // Start the connection so delivery can begin.
            connection.start();
            // Non-transacted session with automatic acknowledgement.
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            // Queue named "HelloWorld".
            destination = session.createQueue("HelloWorld");
            messageConsumer = session.createConsumer(destination);

            // Each receive waits up to 100 seconds; a null result ends the loop.
            while (true) {
                TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);
                if (null != textMessage) {
                    System.out.println("收到的消息 : " + textMessage.getText());
                } else {
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release the connection on both normal exit and failure;
            // previously it was never closed.
            if (null != connection) {
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

    }

    /**
     * Called for transport commands. Not implemented.
     */
    @Override
    public void onCommand(Object arg0) {
        // TODO Auto-generated method stub

    }

    /**
     * Called when a transport exception is observed. Not implemented.
     */
    @Override
    public void onException(IOException arg0) {
        // TODO Auto-generated method stub

    }

    /**
     * Called when the transport is interrupted (failover). Not implemented.
     */
    @Override
    public void transportInterupted() {
        // TODO Auto-generated method stub

    }

    /**
     * Called when the transport resumes after a failover. Not implemented.
     */
    @Override
    public void transportResumed() {
        // TODO Auto-generated method stub

    }

}
package com.activemq.activemq5;

import javax.jms.Connection;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.pool.PooledConnection;
import org.apache.activemq.pool.PooledConnectionFactory;

/**
 * Holder for a single, statically created pooled ActiveMQ connection.
 *
 * <p>The connection is created and started once in the static initializer;
 * {@link #getConnection()} always returns that same reference, which is
 * {@code null} if initialization failed.
 */
public class ActiveMQPoolsUtil {

    /** Default user name. */
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    /** Default password. */
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    /** Default broker URL. */
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;

    /** The shared connection. */
    private static Connection connection;

    public ActiveMQPoolsUtil() {

    }

    static {
        ActiveMQConnectionFactory brokerFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);

        try {
            PooledConnectionFactory pool = new PooledConnectionFactory(brokerFactory);

            // Pool sizing: up to 5 connections, up to 200 active sessions each;
            // callers block when the session pool is exhausted.
            pool.setMaxConnections(5);
            pool.setMaximumActiveSessionPerConnection(200);
            pool.setBlockIfSessionPoolIsFull(true);
            // NOTE(review): the idle-timeout unit appears to be milliseconds in
            // the ActiveMQ pool API — confirm that 120 ms is really intended.
            pool.setIdleTimeout(120);

            connection = pool.createConnection();
            // Must be started, otherwise no messages can be received.
            connection.start();

        } catch (Exception failure) {
            // Initialization failure is only printed; connection stays null.
            failure.printStackTrace();
        }
    }

    /**
     * Closes the shared connection if there is one; close failures are printed.
     */
    public static void close() {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (Exception failure) {
            failure.printStackTrace();
        }
    }

    /**
     * Returns the shared connection.
     *
     * @return the connection held by this class, possibly {@code null}
     */
    public static Connection getConnection() {
        return connection;
    }

    /**
     * Replaces the shared connection.
     *
     * @param connection
     *            the pooled connection to hold from now on
     */
    public static void setConnection(PooledConnection connection) {
        ActiveMQPoolsUtil.connection = connection;
    }

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: