ActiveMQ学习 第一篇 入门篇
2016-03-23 00:53
579 查看
ActiveMQ 学习一 入门篇
实际应用中有多种MQ,包括MSMQ,Active MQ,Jboss MQ等。其中ActiveMQ是一个开源的JMS服务器,本文简单介绍java环境下Active MQ入门。
准备工作
1、下载ActiveMQ
http://pan.baidu.com/s/1c1qwlJq 我下载的是apache-activemq-5.13.2,下载后解压缩即可。
2、启动ActiveMQ
在bin目录下,找到activemq.bat命令,双击启动即可。其中需要注意的是,如果是64位机器,需要启动win64文件夹下的activemq.bat命令。
3、测试是否启动成功
在浏览器输入http://localhost:8161/admin 用户名/密码:admin/admin。
这些都是默认配置,都可以在conf文件夹中进行配置。我们先用默认的进行测试就行了
4、简单java测试项目
/**
* 消息产生者向JMS发送消息的步骤
(1)创建连接使用的工厂类JMS ConnectionFactory
(2)使用管理对象JMS ConnectionFactory建立连接Connection
(3)使用连接Connection 建立会话Session
(4)使用会话Session和管理对象Destination创建消息生产者MessageSender
(5)使用消息生产者MessageSender发送消息
* @throws JMSException
*/
public class SendMessage {
protected String expectedBody = "<hello>world!</hello>";
public void sendMessage() throws JMSException{
//ConnectionFactory:连接工厂,JMS用它创建连接
ConnectionFactory connectionFactory = null;
//Connection:JMS客户端到JMS Provider的连接
Connection connection =null;
//Session:一个发送或接收信息的线程
Session session = null;
//Destination:消息的目的地,消息发送给谁
Destination destination = null;
//MessageProducer:消息发送者
MessageProducer producer = null;
try{
connectionFactory =new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,Constants.URL);
connection = (Connection)connectionFactory.createConnection();
connection.start();
session = (Session)connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(Constants.QUEUE_NAME);
producer = session.createProducer(destination);
//设置不持久化,此处可根据实际项目决定
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//TextMessage:消息体
// TextMessage message = session.createTextMessage(expectedBody);
// message.setStringProperty("headname", "remoteB");
// producer.send(message);
sendMessage(session,producer);
session.commit();
}catch(Exception e){
e.printStackTrace();
}finally{
if(null!=connection){
connection.close();
}
}
}
private static void sendMessage(Session session, MessageProducer producer) throws JMSException {
for(int i=1;i<=Constants.SEND_NUMBER;i++){
TextMessage message = session.createTextMessage("ActiveMq发送的消息"+i);
producer.send(message);
}
}
public static void main(String[] args){
SendMessage sndMsg = new SendMessage();
try{
sndMsg.sendMessage();
}catch(Exception ex){
System.out.println(ex.toString());
}
}
}
/**
* 消息消费者从JMS接受消息的步骤
(1)创建连接使用的工厂类JMS ConnectionFactory
(2)使用管理对象JMS ConnectionFactory建立连接Connection
(3)使用连接Connection 建立会话Session
(4)使用会话Session和管理对象Destination创建消息消费者MessageReceiver
(5)使用消息消费者MessageReceiver接受消息,需要用setMessageListener将MessageListener接口绑定到MessageReceiver
消息消费者必须实现了MessageListener接口,需要定义onMessage事件方法。
*/
public class ReceiveMessage {
public void receiveMessage() {
//JMS用它创建连接
ConnectionFactory connectionFactory = null;
//JMS客户端到JMS Provider的连接
Connection connection = null;
//一个发送或接收消息的线程
Session session = null;
//消息目的地
Destination destination = null;
//消费者,消息接收者
MessageConsumer consumer;
try {
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,Constants.URL);
connection = connectionFactory.createConnection();
connection.start();
/**在connection的基础上创建一个session,同时设置是否支持事务ACKNOWLEDGE标识。
• AUTO_ACKNOWLEDGE:自动确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收。
• CLIENT_ACKNOWLEDGE:客户端确认模式。会话对象依赖于应用程序对被接收的消息调用一个acknowledge()方法。一旦这个方法被调用,会话会确认最后一次确认之后所有接收到的消息。这种模式允许应用程序以一个调用来接收,处理并确认一批消息。注意:在管理控制台中,如果连接工厂的Acknowledge Policy(确认方针)属性被设置为"Previous"(提前),但是你希望为一个给定的会话确认所有接收到的消息,那么就用最后一条消息来调用acknowledge()方法。
• DUPS_OK_ACKNOWLEDGE:允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。在需要考虑资源使用时,这种模式非常有效。注意:如果你的应用程序无法处理重复的消息的话,你应该避免使用这种模式。如果发送消息的初始化尝试失败,那么重复的消息可以被重新发送。
• SESSION_TRANSACTED
**/
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(Constants.QUEUE_NAME);
consumer = session.createConsumer(destination);
consumeMessagesAndClose(connection, session, consumer);
} catch (Exception e) {
e.printStackTrace();
System.out.println(e.toString());
}
}
protected void consumeMessagesAndClose(Connection connection,Session session, MessageConsumer consumer)
throws JMSException {
for (int i = 0; i <=Constants.SEND_NUMBER;i++) {
Message message = consumer.receive(1000);
if (message != null) {
i++;
onMessage(message);
}
}
System.out.println("Closing connection");
consumer.close();
session.close();
connection.close();
}
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
TextMessage txtMsg = (TextMessage) message;
String msg = txtMsg.getText();
System.out.println("Received: " + msg);
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String args[]) {
ReceiveMessage rm = new ReceiveMessage();
rm.receiveMessage();
}
}
public class Constants {
public static final String URL = "tcp://localhost:61616";
public static final String QUEUE_NAME = "choice.queue";
public static int SEND_NUMBER = 5;
}
Jar包管理:
a.可以引入maven依赖
<!-- activeMQ -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.13.2</version>
</dependency>
b.导入 activemq-all-5.13.2.jar
5运行项目
首先,运行SendMessage的main主函数,没有错误后,刷新主页面,可以看到队列choice.queue中有5条消息。
第二,运行ReceiveMessage的Main主函数,效果如下图:
第三,刷新主页面,可以看到队列中为读消息变成2条:
后记:简单的activeMQ入门篇就到这里啦,下期第二篇开讲springmvc集成activeMQ,敬请关注。
实际应用中有多种MQ,包括MSMQ,Active MQ,Jboss MQ等。其中ActiveMQ是一个开源的JMS服务器,本文简单介绍java环境下Active MQ入门。
准备工作
1、下载ActiveMQ
http://pan.baidu.com/s/1c1qwlJq 我下载的是apache-activemq-5.13.2,下载后解压缩即可。
2、启动ActiveMQ
在bin目录下,找到activemq.bat命令,双击启动即可。其中需要注意的是,如果是64位机器,需要启动win64文件夹下的activemq.bat命令。
3、测试是否启动成功
在浏览器输入http://localhost:8161/admin 用户名/密码:admin/admin。
这些都是默认配置,都可以在conf文件夹中进行配置。我们先用默认的进行测试就行了
4、简单java测试项目
/**
* 消息产生者向JMS发送消息的步骤
(1)创建连接使用的工厂类JMS ConnectionFactory
(2)使用管理对象JMS ConnectionFactory建立连接Connection
(3)使用连接Connection 建立会话Session
(4)使用会话Session和管理对象Destination创建消息生产者MessageSender
(5)使用消息生产者MessageSender发送消息
* @throws JMSException
*/
public class SendMessage {
protected String expectedBody = "<hello>world!</hello>";
public void sendMessage() throws JMSException{
//ConnectionFactory:连接工厂,JMS用它创建连接
ConnectionFactory connectionFactory = null;
//Connection:JMS客户端到JMS Provider的连接
Connection connection =null;
//Session:一个发送或接收信息的线程
Session session = null;
//Destination:消息的目的地,消息发送给谁
Destination destination = null;
//MessageProducer:消息发送者
MessageProducer producer = null;
try{
connectionFactory =new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,Constants.URL);
connection = (Connection)connectionFactory.createConnection();
connection.start();
session = (Session)connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(Constants.QUEUE_NAME);
producer = session.createProducer(destination);
//设置不持久化,此处可根据实际项目决定
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//TextMessage:消息体
// TextMessage message = session.createTextMessage(expectedBody);
// message.setStringProperty("headname", "remoteB");
// producer.send(message);
sendMessage(session,producer);
session.commit();
}catch(Exception e){
e.printStackTrace();
}finally{
if(null!=connection){
connection.close();
}
}
}
private static void sendMessage(Session session, MessageProducer producer) throws JMSException {
for(int i=1;i<=Constants.SEND_NUMBER;i++){
TextMessage message = session.createTextMessage("ActiveMq发送的消息"+i);
producer.send(message);
}
}
public static void main(String[] args){
SendMessage sndMsg = new SendMessage();
try{
sndMsg.sendMessage();
}catch(Exception ex){
System.out.println(ex.toString());
}
}
}
/**
* 消息消费者从JMS接受消息的步骤
(1)创建连接使用的工厂类JMS ConnectionFactory
(2)使用管理对象JMS ConnectionFactory建立连接Connection
(3)使用连接Connection 建立会话Session
(4)使用会话Session和管理对象Destination创建消息消费者MessageReceiver
(5)使用消息消费者MessageReceiver接受消息,需要用setMessageListener将MessageListener接口绑定到MessageReceiver
消息消费者必须实现了MessageListener接口,需要定义onMessage事件方法。
*/
public class ReceiveMessage {
public void receiveMessage() {
//JMS用它创建连接
ConnectionFactory connectionFactory = null;
//JMS客户端到JMS Provider的连接
Connection connection = null;
//一个发送或接收消息的线程
Session session = null;
//消息目的地
Destination destination = null;
//消费者,消息接收者
MessageConsumer consumer;
try {
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,Constants.URL);
connection = connectionFactory.createConnection();
connection.start();
/**在connection的基础上创建一个session,同时设置是否支持事务ACKNOWLEDGE标识。
• AUTO_ACKNOWLEDGE:自动确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收。
• CLIENT_ACKNOWLEDGE:客户端确认模式。会话对象依赖于应用程序对被接收的消息调用一个acknowledge()方法。一旦这个方法被调用,会话会确认最后一次确认之后所有接收到的消息。这种模式允许应用程序以一个调用来接收,处理并确认一批消息。注意:在管理控制台中,如果连接工厂的Acknowledge Policy(确认方针)属性被设置为"Previous"(提前),但是你希望为一个给定的会话确认所有接收到的消息,那么就用最后一条消息来调用acknowledge()方法。
• DUPS_OK_ACKNOWLEDGE:允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。在需要考虑资源使用时,这种模式非常有效。注意:如果你的应用程序无法处理重复的消息的话,你应该避免使用这种模式。如果发送消息的初始化尝试失败,那么重复的消息可以被重新发送。
• SESSION_TRANSACTED
**/
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(Constants.QUEUE_NAME);
consumer = session.createConsumer(destination);
consumeMessagesAndClose(connection, session, consumer);
} catch (Exception e) {
e.printStackTrace();
System.out.println(e.toString());
}
}
protected void consumeMessagesAndClose(Connection connection,Session session, MessageConsumer consumer)
throws JMSException {
for (int i = 0; i <=Constants.SEND_NUMBER;i++) {
Message message = consumer.receive(1000);
if (message != null) {
i++;
onMessage(message);
}
}
System.out.println("Closing connection");
consumer.close();
session.close();
connection.close();
}
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
TextMessage txtMsg = (TextMessage) message;
String msg = txtMsg.getText();
System.out.println("Received: " + msg);
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String args[]) {
ReceiveMessage rm = new ReceiveMessage();
rm.receiveMessage();
}
}
public class Constants {
public static final String URL = "tcp://localhost:61616";
public static final String QUEUE_NAME = "choice.queue";
public static int SEND_NUMBER = 5;
}
Jar包管理:
a.可以引入maven依赖
<!-- activeMQ -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.13.2</version>
</dependency>
b.导入 activemq-all-5.13.2.jar
5运行项目
首先,运行SendMessage的main主函数,没有错误后,刷新主页面,可以看到队列choice.queue中有5条消息。
第二,运行ReceiveMessage的Main主函数,效果如下图:
第三,刷新主页面,可以看到队列中为读消息变成2条:
后记:简单的activeMQ入门篇就到这里啦,下期第二篇开讲springmvc集成activeMQ,敬请关注。
相关文章推荐
- java对世界各个时区(TimeZone)的通用转换处理方法(转载)
- java-注解annotation
- java-模拟tomcat服务器
- java-用HttpURLConnection发送Http请求.
- java-WEB中的监听器Lisener
- Android IPC进程间通讯机制
- Android Native 绘图方法
- Android java 与 javascript互访(相互调用)的方法例子
- 介绍一款信息管理系统的开源框架---jeecg
- 聚类算法之kmeans算法java版本
- java实现 PageRank算法
- PropertyChangeListener简单理解
- mongo实现消息队列
- c++11 + SDL2 + ffmpeg +OpenAL + java = Android播放器
- 插入排序
- 冒泡排序
- 堆排序
- 快速排序