您的位置:首页 > 其它

activeMQ 示例

2015-12-11 15:33 337 查看
转自
这里

1.JMS介绍

JMS源于企业应用对于消息中间件的需求,使应用程序可以通过消息进行异步处理而互不影响。Sun公司和它的合作伙伴设计的JMS API定义了一组公共的应用程序接口和相应语法,使得Java程序能够和其他消息组件进行通信。JMS有四个组成部分:JMS服务提供者、消息管理对象、消息的生产者消费者和消息本身。

1)JMS服务提供者实现消息队列和通知,同时实现消息管理的API。JMS已经是J2EE API的一部分,J2EE服务器都提供JMS服务。

2) 消息管理对象提供对消息进行操作的API。JMS API中有两个消息管理对象:创建jms连接使用的工厂(ConnectionFactory)和目的地(Destination),根据消息的消费方式的不同ConnectionFactory可以分为QueueConnectionFactory和TopicConnectionFactory,目的地(Destination)可以分为队列(Queue)和主题(Topic)两种。

3)消息的生产者和消费者。消息的产生由JMS的客户端完成,JMS服务提供者负责管理这些消息,消息的消费者可以接收消息。消息的生产者可以分为――点对点消息发布者(P2P)和主题消息发布者(TopicPublisher)。所以,消息的消费者分为两类:主题消息的订阅者(TopicSubscriber)和点对点消息的接收者(queue
receiver)

4)消息。消息是服务提供者和客户端之间传递信息所使用的信息单元。JMS消息由以下三部分组成:

  消息头(header)――JMS消息头包含了许多字段,它们是消息发送后由JMS提供者或消息发送者产生,用来表示消息、设置优先权和失效时间等等,并且为消息确定路由。

  属性(property)――用来添加删除消息头以外的附加信息。

  消息体(body)――JMS中定义了5种消息体:ByteMessage、MapMessage、ObjectMessage、StreamMessage和TextMessage。

2.Messages 通信方式

上面提到JMS通信方式分为点对点通信和发布/订阅方式

1)点对点方式(point-to-point)

点对点的消息发送方式主要建立在 Message Queue,Sender,reciever上,Message Queue 存贮消息,Sneder 发送消息,receive接收消息.具体点就是Sender Client发送Message Queue
,而 receiver Cliernt从Queue中接收消息和"发送消息已接受"到Quere,确认消息接收。消息发送客户端与接收客户端没有时间上的依赖,发送客户端可以在任何时刻发送信息到Queue,而不需要知道接收客户端是不是在运行

2)发布/订阅 方式(publish/subscriber Messaging)

发布/订阅方式用于多接收客户端的方式.作为发布订阅的方式,可能存在多个接收客户端,并且接收端客户端与发送客户端存在时间上的依赖。一个接收端只能接收他创建以后发送客户端发送的信息。作为subscriber ,在接收消息时有两种方法,destination的receive方法,和实现message
listener 接口的onMessage 方法。

3.为什么选用ActiveMQ

1)ActiveMQ是一个开放源码

2)基于Apache 2.0 licenced 发布并实现了JMS 1.1。

3)ActiveMQ现在已经和作为很多项目的异步消息通信核心了

4)在很多中小型项目中采用ActiveMQ+SPRING+TOMCAT开发模式。

4.编程模式

4.1消息产生者向JMS发送消息的步骤

(1)创建连接使用的工厂类JMS ConnectionFactory

(2)使用管理对象JMS ConnectionFactory建立连接Connection

(3)使用连接Connection 建立会话Session

(4)使用会话Session和管理对象Destination创建消息生产者MessageSender

(5)使用消息生产者MessageSender发送消息

4.2消息消费者从JMS接受消息的步骤

(1)创建连接使用的工厂类JMS ConnectionFactory

(2)使用管理对象JMS ConnectionFactory建立连接Connection

(3)使用连接Connection 建立会话Session

(4)使用会话Session和管理对象Destination创建消息消费者MessageReceiver

(5)使用消息消费者MessageReceiver接受消息,需要用setMessageListener将MessageListener接口绑定到MessageReceiver

消息消费者必须实现了MessageListener接口,需要定义onMessage事件方法。

.下载ActiveMQ

去官方网站下载:http://activemq.apache.org/

我下载的时候是 ActiveMQ 5.8.0 Release版

2.运行ActiveMQ

解压缩apache-activemq-5.8.0-bin.zip,然后双击apache-activemq-5.5.1\bin\activemq.bat运行ActiveMQ程序。

启动ActiveMQ以后,登陆:http://localhost:8161/admin/,创建一个Queue,命名为FirstQueue。

3.创建Eclipse项目并运行

创建java project:ActiveMQ-5.8,新建lib文件夹

打开apache-activemq-5.8.0\lib目录

拷贝

activemq-broker-5.8.0.jar

activemq-client-5.8.0.jar

geronimo-j2ee-management_1.1_spec-1.0.1.jar

geronimo-jms_1.1_spec-1.1.1.jar

slf4j-api-1.6.6.jar

这5个jar文件到lib文件夹中,并Build Path->Add to Build Path

目录结构



Sender :

public class Sender {
private static final Integer SEND_NUMBER = 5;

public static void main(String[] args) {
ConnectionFactory connectionFactory;

// Provider 的连接
Connection connection = null;// Connection :JMS 客户端到JMS
Session session;  // Session: 一个发送或接收消息的线程
Destination destination; // Destination :消息的目的地;消息发送给谁.
MessageProducer producer; // MessageProducer:消息发送者

// 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); //初始化 connectionFactory

try {
connection = connectionFactory.createConnection();//创建connection
connection.start();

session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);//使用连接Connection 建立会话Session

destination = session.createQueue("FirstQueue");//使用会话Session和管理对象Destination
producer = session.createProducer(destination);//使用会话Session创建消息生产者MessageSender
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //设置发送类型

sendMessage(session,producer);

} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}

}

public static void sendMessage(Session session,MessageProducer producer) throws JMSException{
//		for(int i = 0 ; i < SEND_NUMBER ; i ++){
//			TextMessage message = session.createTextMessage("ActiveMq 发送的消息"  + i);
//			System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
//            producer.send(message);  // 发送消息到目的地方
//		}

Scanner input = new Scanner(System.in);
String val = null;       // 记录输入的字符串
do{
System.out.print("请输入:");
val = input.next();       // 等待输入值
TextMessage message = session.createTextMessage("ActiveMq 发送的消息"  + val);
System.out.println("发送消息:" + "ActiveMq 发送的消息" + val);
producer.send(message);  // 发送消息到目的地方
session.commit();
}while(!val.equals("#"));   // 如果输入的值不是#就继续输入
System.out.println("你输入了\"#\",程序已经退出!");
input.close(); // 关闭资源
}
}


Receiver

public class Receiver {
public static void main(String[] args) {
ConnectionFactory connectionFactory;

// Provider 的连接
Connection connection = null;// Connection :JMS 客户端到JMS
Session session; // Session: 一个发送或接收消息的线程
Destination destination; // Destination :消息的目的地;消息发送给谁.
// 消费者,消息接收者
MessageConsumer consumer;

// 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); // 初始化
// connectionFactory

try {
connection = connectionFactory.createConnection();
connection.start();

session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);// 使用连接Connection 建立会话Session
destination = session.createQueue("FirstQueue");
consumer = session.createConsumer(destination);
while (true) {
// 设置接收者接收消息的时间,为了便于测试,这里谁定为100s
TextMessage message = (TextMessage) consumer.receive(100000); // 使用消息消费者MessageReceiver接受消息
if (null != message) {
System.out.println("收到消息" + message.getText());
} else {
break;
}
}
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: