您的位置:首页 > 其它

active MQ消息治理中间件

2018-02-22 11:02 344 查看
activeMQ的使用方式相对比较简单,有如下几个步骤:创建连接工厂
创建连接
创建会话
创建目的地
创建生产者或消费者
生产或消费消息
关闭生产或消费者、关闭会话、关闭连接
我们主要针对他的消息传播机制和持久化方式做一个简单的学习。在会用的同时我们也需要理解一些基本的概念,这样才不至于在出错后无从下手。

1.activemq服务器工作模型

我们先看一下消息发送的时序图:

ConnectionFactory 对象创建一个连接工厂,消息的发送和接受服务均由此进行;ConnectionFactory 创建一个活动Connection作为当前使用的连接;      Session 是一个用于生成和使用消息的单线程上下文,它用于创建发送的生产者和接收消息的消费者,并为所发送的消息定义发送顺序。会话通过大量确认选项或通过事务来支持可靠传送。      客户端使用 MessageProducer 向指定的物理目标发送消息,生产者可指定一个默认传送模式(持久性消息与非持久性消息)、优先级和有效期值,以控制生产者向物理目标发送的所有消息;     消费者可以支持同步或异步消息接收。异步使用可通过向消费者注册 MessageListener 来实现。当会话线程调用 MessageListener 对象的 onMessage 方法时,客户端将使用消息。

2.ActiveMQ消息传送模型

ActiveMQ 支持两种消息传送模型:PTP(即点对点模型)和Pub/Sub(即发布 /订阅模型)PTP(使用Queue即队列目标)           消息从一个生产者传送至一个消费者。在此传送模型中,目标是一个队列。消息首先被传送至队列目标,然后根据队列传送策略,从该队列将消息传送至向此队列进行注册的某一个消费者,一次只传送一条消息。可以向队列目标发送消息的生产者的数量没有限制,但每条消息只能发送至、并由一个消费者成功使用。如果没有已经向队列目标注册的消费者,队列将保留它收到的消息,并在某个消费者向该队列进行注册时将消息传送给该消费者。Pub/Sub(使用Topic即主题目标)          消息从一个生产者传送至任意数量的消费者。在此传送模型中,目标是一个主题。消息首先被传送至主题目标,然后传送至所有已订阅此主题的活动消费者。可以向主题目标发送消息的生产者的数量没有限制,并且每个消息可以发送至任意数量的订阅消费者。

3.消息选择器

      ActiveMQ提供了一种机制,使用它,消息服务可根据消息选择器中的标准来执行消息过滤。生产者可在消息中放入应用程序特有的属性,而消费者可使用基于这些属性的选择标准来表明对消息是否感兴趣。      消息选择器是根据 header 和 properties 允许客户端选择性的制定需要接收的消息,消息选择器是无法利用 消息主体(Body)进行过滤的。无论你的消息主题是什么类型, 文本、或者对象、或者键值对。下面我们讲一下消息选择器的语法以及使用规范:可接收的类型包括:byte,int,double,boolean,String;属性标识符定义:
变量名与java定义一样;

要么在heads中定义 要么在 properties中定义,如果在sender中是在heads中定义而receiver中却从properties中寻找的话,找不到的情况下他是不会自动去heads中寻找的,而是会返回null;

根据不同类型的变量选择不同的方法:

message.setIntProperty("test",14);

那么在接收端可以对该变量进行拦截:

session.createConsumer(destination,"test > 14");

属性标志符是区分大小写的;
拦截器中的部分表示方式:
可以是条件表达式

可以是算术表达式

可以是比较运算和逻辑运算组成的表达式
支持 () 左右括号;支持逻辑运算的优先顺序表达式 例如: NOT , AND , OR;比较运算符有: = , > , >= , < , <= , <> (not equal);Eg:
标识符是null
"prop_name IS NULL"
标识符非空 not null
"prop_name IS NOT NULL"
"age BETWEEN 15 AND 19" is equivalent to "age >= 15 AND age <= 19"
"Country NOT IN (' UK', 'US', 'France') "
代码很简单,只需要在Sender端做如下改写:
TextMessage message = session.createTextMessage();
message.setIntProperty("test",14);
message.setText("test");
Receiver端:
consumer = session.createConsumer(destination,"test > 14");
对发送端的特定字符做一个判断符合条件即被拦截

4.消息确认机制

jms消息只有在被确认之后才认为成功消费了这条消息。消息的成功消费通常包括三个步骤:(1)client接收消息(2)client处理消息(3)消息被确认(也就是client给一个确认消息)      在事务性会话中当一个事务被提交的时候,确认自动发生,和应答模式没关系,这个值可以随便写。(这里多提一句异步消息接收中不能使用事务性会话)。在非事务性会话中消息何时被确认取决于创建的session中设置的消息应答模式(acknowledge model)该参数有三个值:Session.AUTO_ACKNOWLEDGE:当client端成功的从receive方法或从onMessage(Message message) 方法返回的时候,会话自动确认client收到消息。
Session.CLIENT_ACKNOWLEDGE: 客户单通过调用acknowledge方法来确认客户端收到消息。但需要注意在这种应答模式下,确认是在会话层上进行的,确认一个被消费的消息将自动确认所有已消费的其他消息。比如一个消费者已经消费了10条消息,然后确认了第5条消息被消费,则这10条都被确认消费了。、
acknowledge()通知方法是在Message对象上,同步接收,调用acknowledge()方法进行确认如下所示:consumer = session.createConsumer(queue); 
Message message = consumer.receive(); 
message.acknowledge();异步接受,调用acknowledge()方法进行确认:
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
String value = textMessage.getText();
System.out.println("value: " + value);
message.acknowledge(); //消息消费确认通知
} catch (JMSException e) {
e.printStackTrace();
}
}
});
      3.Session.DUPS_ACKNOWLEDGE:不是必须签收,消息可能会重复发送。在第二次重新传送消息的时候,消息头的JmsDelivered会被置为true标示当前消息已经传送过一次,客户端需要进行消息的重复处理控制。

5. 持久化消息

JMS 支持以下两种消息提交模式:5.1 ERSISTENT 持久消息       持久消息是activemq默认的传送方式,此方式下的消息在配合activemq.xml中配置的消息存储方式,会被存储在特定的地方,直到有消费者将消息消费或者消息过期进入DLQ队列,消息生命周期才会结束。此模式下可以保证消息只会被成功传送一次和成功使用一次,消息具有可靠性。在消息传递到目标消费者,在消费者没有成功应答前,消息不会丢失。所以很自然的,需要一个地方来持久性存储。如果消息消费者在进行消费过程发生失败,则消息会被再次投递。       DeliveryMode.PERSISTENT 指示JMS provider持久保存消息,以保证消息不会因为JMS provider的失败而丢失。 消息持久化在硬盘中,ActiveMQ持久化有三种方式:AMQ、KahaDB、JDBC。
AMQ       AMQ是一种文件存储形式,它具有写入速度快和容易恢复的特点。消息存储在一个个文件中,文件的默认大小为32M,如果一条消息的大小超过了32M,那么这个值必须设置大一点。当一个存储文件中的消息已经全部被消费,那么这个文件将被标识为可删除,在下一个清除阶段,这个文件被删除。AMQ适用于ActiveMQ5.3之前的版本。
KahaDB        KahaDB是基于文件的本地数据库储存形式,虽然没有AMQ的速度快,但是它具有强扩展性,恢复的时间比AMQ短,从5.4版本之后KahaDB做为默认的持久化方式。
JDBC可以将消息存储到数据库中,例如:Mysql、SQL Server、Oracle、DB2。具体使用方式大家下去查一下,限于篇幅在此就不做太详细的介绍。
5.2 NON_PERSISTENT 非持久消息      非持久的消息适用于不重要的,可以接受消息丢失的哪一类消息,这种消息只会被投递一次,消息不会在持久性存储中存储,也不会保证消息丢失后的重新投递。      DeliveryMode.NON_PERSISTENT 不要求JMS provider持久保存消息,消息存放在内存中,读写速度快,在JMS服务停止后消息会消失,没有持久化到硬盘。

6. ActiveMQ消息过期设置

       允许消息过期 。默认情况下,消息永不会过期。如果消息在特定周期内失去意义,那么可以设置过期时间。 
有两种方法设置消息的过期时间,时间单位为毫秒:使用 setTimeToLive 方法为所有的消息设置过期时间;
使用 send 方法为每一条消息设置过期时间。
     消息过期时间,send 方法中的 timeToLive 值加上发送时刻的 GMT 时间值。如果 timeToLive 值等于零,则 JMSExpiration 被设为零,表示该消息永不过期。如果发送后,在消息过期时间之后消息还没有被发送到目的地,则该消息被清除。若有不同建议或者疑惑请入群讨论或者留言,谢谢!


内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息