您的位置:首页 > 其它

2、ActiveMQ 学习记录 之 基本通信方式

2017-04-06 11:07 411 查看
在前面一篇文章里讨论过几种应用系统集成的方式,发现实际上面向消息队列的集成方案算是一个总体比较合理的选择。这里,我们先针对具体的一个消息队列Activemq的基本通信方式进行探讨。activemq是JMS消息通信规范的一个实现。总的来说,消息规范里面定义最常见的几种消息通信模式主要有发布-订阅、点对点这两种。另外,通过结合这些模式的具体应用,我们在处理某些应用场景的时候也衍生出来了一种请求应答的模式。下面,我们针对这几种方式一一讨论一下。

一、 基础流程

在讨论具体方式的时候,我们先看看使用activemq需要启动服务的主要过程。

    按照JMS的规范,我们首先需要获得一个JMS connection factory.,通过这个connection factory来创建connection.

在这个基础之上我们再创建session, destination, producer和consumer。因此主要的几个步骤如下:

1. 获得JMS connection factory. 通过我们提供特定环境的连接信息来构造factory。

2. 利用factory构造JMS connection

3. 启动connection

4. 通过connection创建JMS session.

5. 指定JMS destination.

6. 创建JMS producer或者创建JMS message并提供destination.

7. 创建JMS consumer或注册JMS message listener.

8. 发送和接收JMS message.

9. 关闭所有JMS资源,包括connection, session, producer, consumer等。



二、publish-subscribe发布订阅模式

   发布订阅模式有点类似于我们日常生活中订阅报纸。每年到年尾的时候,邮局就会发一本报纸集合让我们来选择订阅哪一个。

在这个表里头列了所有出版发行的报纸,那么对于我们每一个订阅者来说,我们可以选择一份或者多份报纸。比如北京日报、

潇湘晨报等。那么这些个我们订阅的报纸,就相当于发布订阅模式里的topic。有很多个人订阅报纸,也有人可能和我订阅了相同的

报纸。那么,在这里,相当于我们在同一个topic里注册了。对于一份报纸发行方来说,它和所有的订阅者就构成了一个1对多的关系。

这种关系如下图所示:



现在,假定我们用前面讨论的场景来写一个简单的示例。我们首先需要定义的是publisher.

publisher

      publisher是属于发布信息的一方,它通过定义一个或者多个topic,然后给这些topic发送消息。

    publisher的构造函数如下:

public class JMSPublisher {

private static final String USER=ActiveMQConnection.DEFAULT_USER;
private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD;
private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL;

private static final int SENDNUM=10;

public static void main(String[] args) throws JMSException {
ConnectionFactory factory;
Connection conn;
Session session;
Destination des;
MessageProducer producer;

//1、获取链接工厂
factory = new org.apache.activemq.ActiveMQConnectionFactory(JMSPublisher.
4000
USER, JMSPublisher.PASSWORD, JMSPublisher.BROKEURL);
//2、获取链接
conn = factory.createConnection();
//3、开启链接
conn.start();
//4、获取会话
session=conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
//5、创建队列
des=session.createTopic("topic1");
//6、创建消息生产者
producer=session.createProducer(des);

//7、发送消息
for(int i=0;i<JMSPublisher.SENDNUM;i++){
TextMessage message=session.createTextMessage("生产者生产消息:   第"+i+"条==========");
System.out.println("生产者生产消息:   第"+i+"条==========");
producer.send(message);
}

//8、提交
session.commit();
}

}


  Consumer1

Consumer的代码也很类似,具体的步骤无非就是1.初始化资源。 2. 接收消息。 3. 必要的时候关闭资源。

public class JMSConsumer {

private static final String USER=ActiveMQConnection.DEFAULT_USER;
private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD;
private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL;

private static final int SENDNUM=10;

public static void main(String[] args) throws JMSException {
ConnectionFactory factory;
Connection conn;
Session session;
Destination des;
MessageConsumer consumer;

//1、实例化连接工厂
factory=new ActiveMQConnectionFactory(JMSConsumer.USER, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);
//2、获取链接
conn=factory.createConnection();
//3、开启链接
conn.start();
//4、创建会话
session=conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
//5、消息目的地
des=session.createTopic("topic1");
//6、创建消息消费者
consumer = session.createConsumer(des);
//7、获取消息
consumer.setMessageListener(new MyTopicListener1());
}
}


Consumer2

public class JMSConsumer2 {

private static final String USER=ActiveMQConnection.DEFAULT_USER;
private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD;
private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL;

public static void main(String[] args) throws JMSException {
ConnectionFactory connFactory;
Connection conn;
Session session;
Destination des;
MessageConsumer consumer;

//1、连接工厂
connFactory = new ActiveMQConnectionFactory(JMSConsumer2.USER, JMSConsumer2.PASSWORD, JMSConsumer2.BROKEURL);
//2、连接
conn = connFactory.createConnection();
//3、开启连接
conn.start();
//4、会话
session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
//5、目的
des = session.createTopic("topic_2");
//6、消费者
consumer = session.createConsumer(des);

//7、监听
consumer.setMessageListener(new MyMessageListener2());

}
}

在发布订阅模式中,消费者要主动监听生产者的生产消息

MessageListener1:

 Listener对象的职责很简单,主要就是处理接收到的消息:

public class MyMessageListener implements MessageListener{

public void onMessage(Message message) {
if(message!=null){
try {
System.out.println("111111111111消费者 接收消息 : "+((TextMessage)message).getText()+"<<<<============");
} catch (JMSException e) {
e.printStackTrace();
}

}
}

}

MessageListener2:

public class MyMessageListener2 implements MessageListener{

public void onMessage(Message message) {
if(message!=null){
try {
System.out.println("22222222222消费者 接收消息 : "+((TextMessage)message).getText()+"<<<<============");
} catch (JMSException e) {
e.printStackTrace();
}

}
}

}
结果:







它实现了MessageListener接口,里面的onMessage方法就是在接收到消息之后会被调用的方法。

    现在,通过实现前面的publisher和consumer我们已经实现了pub-sub模式的一个实例。仔细回想它的步骤的话,

主要就是要两者设定一个共同的topic,有了这个topic之后他们可以实现一方发消息另外一方接收。另外,为了连接到具体的

message server,这里是使用了连接tcp://localhost:16161作为定义ActiveMQConnectionFactory的路径。在publisher端通过

session创建producer,根据指定的参数创建destination,然后将消息和destination作为producer.send()方法的参数发消息。

在consumer端也要创建类似的connection,session。通过session得到destination,再通过session.createConsumer(destination)

来得到一个MessageConsumer对象。有了这个MessageConsumer我们就可以自行选择是直接同步的receive消息还是注册listener了。




三、P2P

 

p2p的过程则理解起来更加简单。它好比是两个人打电话,这两个人是独享这一条通信链路的。一方发送消息,另外一方接收,

就这么简单。在实际应用中因为有多个用户对使用p2p的链路,它的通信场景如下图所示:



我们再来看看一个p2p的示例:

    在p2p的场景里,相互通信的双方是通过一个类似于队列的方式来进行交流。和前面pub-sub的区别在于一个topic有一个发送者

和多个接收者,而在p2p里一个queue只有一个发送者和一个接收者。

Producer:

public class JMSProducer {

private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认连接用户名
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接用户名
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址
private static final int SENDNUM = 10;//发送消息数量

public static void main(String[] args) throws Exception {
ConnectionFactory connFactory;//连接工厂
Connection conn;//连接
Session session;//会话	接受或者发送消息的线程
Destination destination;//消息目的地
MessageProducer producer;//消息生产者

//1、实例化连接工厂
connFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);
//2、获取连接
conn = connFactory.createConnection();
//3、启动连接
conn.start();
//4、创建会话--是否开启事务--消息确认方式
session = conn.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE );
//5、创建消息队列
destination = session.createQueue("FirstQueue1");
//6、创建消息生产者
producer = session.createProducer(destination);

//7、发送消息
for(int i=0;i<JMSProducer.SENDNUM;i++){
TextMessage textMessage = session.createTextMessage("ActiveMQ 发送消息:"+i);
System.out.println("生产者生产消息:"+textMessage.getText());
producer.send(textMessage);
}

session.commit();
if(conn!=null){
conn.close();
}
}
}


Consumer:

public class JMSConsumer {

private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认连接用户名
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接用户名
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址

public static void main(String[] args) throws Exception {
ConnectionFactory connFactory;//连接工厂
Connection conn;//连接
Session session;//会话	接受或者发送消息的线程
Destination destination;//消息目的地
MessageConsumer consumer;//消息生产者

//1、实例化连接工厂
connFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNA
d7e5
ME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);
//2、获取连接
conn = connFactory.createConnection();
//3、启动连接
conn.start();
//4、创建会话--是否开启事务--消息确认方式
session = conn.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE );
//5、创建消息队列
destination = session.createQueue("FirstQueue1");
//6、创建消费者
consumer = session.createConsumer(destination);

//7、注册消息监听
consumer.setMessageListener(new MyMessageListener());

}
}

Listener:

public class MyMessageListener implements javax.jms.MessageListener {

public void onMessage(Message message) {
if(message!=null){
try {
System.out.println("消费者监听消息:---->"+((TextMessage)message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}

}

  这里代码和前面pub-sub的具体实现代码非常相似,就不再赘述。

 

      现在如果我们比较一下pub-sub和p2p模式的具体实现步骤的话,我们会发现他们基本的处理流程都是类似的,

除了在pub-sub中要通过createTopic来设置topic,而在p2p中要通过createQueue来创建通信队列。他们之间存在着很多的重复之处,

在具体的开发过程中,我们是否可以进行一些工程上的优化呢?别急,后面我们会讨论到的。

  回顾前面三种基本的通信方式,我们会发现,他们都存在着一定的共同点,比如说都要初始化ConnectionFactory, Connection,

Session等。在使用完之后都要将这些资源关闭。如果每一个实现它的通信端都这么写一通的话,其实是一种简单的重复。

从工程的角度来看是完全没有必要的。那么,我们有什么办法可以减少这种重复呢?

    一种简单的方式就是通过工厂方法封装这些对象的创建和销毁,然后简单的通过调用工厂方法的方式得到他们。另外,

既然基本的流程都是在开头创建资源在结尾销毁,我们也可以采用Template Method模式的思路。通过继承一个抽象类,

在抽象类里提供了资源的封装。所有继承的类只要实现怎么去使用这些资源的方法就可以了。Spring中间的JMSTemplate就提供了

这种类似思想的封装。

在上诉的代码中几个常量需要写一下:

1、

ActiveMQConnection.DEFAULT_USER 表示默认连接用户名

ActiveMQConnection.DEFAULT_PASSWORD;表示默认连接用户名

ActiveMQConnection.DEFAULT_BROKER_URL;表示默认地址

2、

Session.AUTO_ACKNOWLEDGE

当客户端从
receive 或 onMessage成功返回时,Session 自动签收客户端的这条消息的收条。

Session.CLIENT_ACKNOWLEDGE

客户端通过调用消息的
acknowledge 方法签收消息。message.acknowledge();

客户通过消息的acknowledge 方法确认消

息。需要注意的是,在这种模式中,确认是在会话层上进行:确认一个被

消费的消息将自动确认所有已被会话消费的消息。例如,如果一个消息消

费者消费了10 个消息,然后确认第5 个消息,那么所有10 个消息都被确

认。

Session.DUPS_ACKNOWLEDGE。

该选择只是会话迟钝的确认消息的提交。如

果JMS provider 失败,那么可能会导致一些重复的消息。如果是重复的

消息,那么JMS provider 必须把消息头的JMSRedelivered 字段设置为

true。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: