您的位置:首页 > 其它

ActiveMQ入门示例

2015-11-03 14:39 351 查看
如果你是第一次使用ActiveMQ,可以先通过下面的链接了解一下JMS、MQ、ActiveMQ之间的关系。

http://blog.csdn.net/u011983531/article/details/49522691

下面,我们从ActiveMQ的下载安装、通信方式与收发步骤、入门示例、管理界面四个方面实现我们的第一个ActiveMQ项目。

一、ActiveMQ的下载安装

到官网下载ActiveMQ:http://activemq.apache.org/download.html

下载完成后,解压缩就可以完成安装了,如下图所示:



下面我们进入bin目录,双击activemq.bat就可以启动ActiveMQ了。启动后运行界面如下:



二、通信方式与收发步骤

点对点方式(point-to-point)

点对点的消息发送方式主要建立在 Message Queue,Sender,reciever上,Message Queue 存贮消息,Sneder 发送消息,receive接收消息。具体点就是Sender Client发送消息到Message Queue ,而 receiver Client从Queue中接收消息和”发送消息已接受”到Queue,确认消息接收。消息发送客户端与接收客户端没有时间上的依赖,发送客户端可以在任何时刻发送信息到Queue,而不需要知道接收客户端是不是在运行。

发布/订阅 方式(publish/subscriber Messaging)

发布/订阅方式用于多接收客户端的方式。作为发布订阅的方式,可能存在多个接收客户端,并且接收端客户端与发送客户端存在时间上的依赖。一个接收端只能接收他创建以后发送客户端发送的信息。作为subscriber ,在接收消息时有两种方法,destination的receive方法,和实现message listener 接口的onMessage 方法。

发送消息的基本步骤:

(1)创建连接使用的工厂类ConnectionFactory

(2)使用连接工厂建立连接Connection,并启动

(3)使用连接Connection 建立会话Session

(4)使用会话Session和管理对象Destination创建消息生产者MessageSender

(5)使用消息生产者MessageSender发送消息

接受消息的基本步骤:

(1)创建连接使用的工厂类ConnectionFactory

(2)使用连接工厂建立连接Connection,并启动

(3)使用连接Connection 建立会话Session

(4)使用会话Session和管理对象Destination创建消息接收者MessageReceiver

(5)使用消息接收者MessageReceiver接受消息,需要用setMessageListener将实现MessageListener接口的对象绑定到MessageReceiver上。

两种通信方式使用时类之间的区别:



三、入门示例

点对点入门示例

项目结构如下:



1、MessageSender(用于发送消息)

package com.ghs.jms;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

public class MessageSender {

public static void main(String[] args) {
Connection connection = null;
Session session = null;
Destination destination = null;
MessageProducer producer = null;
try {
connection = ConnUtils.createConnection(SystemSettings.BROKER_URL);
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(SystemSettings.QUEUE);
producer = session.createProducer(destination);
sendMessage(session,producer);
} catch (JMSException e) {
e.printStackTrace();
}finally{
ConnUtils.free(session, connection);
}
}

public static void sendMessage(Session session,MessageProducer producer){
for(int i=0; i<10; i++){
String text = "第"+i+"条消息";
try {
TextMessage message = session.createTextMessage(text);
System.out.println("发送消息:"+text);
producer.send(message);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}


2、MessageReceiver(用于接收消息)

package com.ghs.jms;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

public class MessageReceiver {

public static void main(String[] args) {
Connection connection = null;
Session session = null;
Destination destination = null;
MessageConsumer consumer = null;
try {
connection = ConnUtils.createConnection(SystemSettings.BROKER_URL);
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(SystemSettings.QUEUE);
consumer = session.createConsumer(destination);
receiverMessage(consumer);
} catch (JMSException e) {
e.printStackTrace();
}finally{
ConnUtils.free(session, connection);
}
}

public static void receiverMessage(MessageConsumer consumer){
while (true) {
TextMessage message;
try {
message = (TextMessage) consumer.receive(500000);
if (message != null) {
System.out.println("收到:" + message.getText());
} else {
break;
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}


另外,还有一个连接工具类和设置工具类。

//用于建立和释放连接
package com.ghs.jms;

import java.net.URI;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ConnUtils {

public static Connection createConnection(String user,String password,URI uri) throws JMSException{
ConnectionFactory factory = new ActiveMQConnectionFactory(user,password,uri);
return factory.createConnection();
}

public static Connection createConnection(URI uri) throws JMSException{
ConnectionFactory factory = new ActiveMQConnectionFactory(uri);
return factory.createConnection();
}

public static Connection createConnection(String brokerUrl) throws JMSException{
ConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
return factory.createConnection();
}

public static void free(Session session,Connection connection){
if(session!=null){
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}finally{
if(connection == null){
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
}

//定义了URI、队列名等常量
package com.ghs.jms;

public class SystemSettings {

public static final String BROKER_URL = "tcp://localhost:61616";
public static final String QUEUE = "FirstQueue";
}


我们先运行MessageReceiver,然后运行MessageSender,结果如下:





注意:我们是通过下面的控制台按钮来切换控制台的



源码下载:

http://download.csdn.net/detail/u011983531/9237079

四、ActiveMQ管理控制台

在下载安装部分启动ActiveMQ后,访问http://localhost:8161/admin/就可以进入ActiveMQ的管理控制台,如下:



进入菜单Queues,我们可以看到,里面有我们新建的队列FirstQueue。

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息