您的位置:首页 > 产品设计 > UI/UE

activemq 应用实践——queue

2014-11-21 18:24 246 查看
首先创建发送端程序SenderTestBase和接收端程序ReceiveTestBase

发送端: SenderTestBase.java

package test;

import java.util.Date;

import javax.jms.Connection;

import javax.jms.DeliveryMode;

import javax.jms.Destination;

import javax.jms.JMSException;

import javax.jms.MessageProducer;

import javax.jms.Session;

import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class SenderTestBase {

private static int messageSize = 100 ;

/**

* @param args

*/

public static void main(String[] args) throws Exception {

ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();

Connection connection = factory.createConnection();

connection.start();

//使用事务 自动签收

Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);

//创建queue 如果改队列在activemq服务器上存在 那么就获取到该queue的实例

Destination destination = session.createQueue("test-queue");

MessageProducer producer = session.createProducer(destination);

producer.setDeliveryMode(DeliveryMode.PERSISTENT);

int i = 0 ;

TextMessage message = null ;

while(i<100){

i++;

message = session.createTextMessage(createMessageText(i));

producer.send(message);

Thread.sleep(80);

}

System.out.println("发送完毕!");

try {

session.commit();

session.close();

connection.close();

} catch (JMSException e) {

e.printStackTrace();

}

}

private static String createMessageText(int index) {

StringBuffer buffer = new StringBuffer(messageSize);

buffer.append("Message: " + index + " sent at: " + new Date());

if (buffer.length() > messageSize) {

return buffer.substring(0, messageSize);

}

for (int i = buffer.length(); i < messageSize; i++) {

buffer.append(' ');

}

return buffer.toString();

}

}

接收端程序: ReceiveTestBase.java

package test;

import javax.jms.Connection;

import javax.jms.Destination;

import javax.jms.JMSException;

import javax.jms.MessageConsumer;

import javax.jms.Session;

import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ReceiveTestBase {

/**

* @param args

*/

public static void main(String[] args) throws Exception {

ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();

Connection connection = factory.createConnection();

connection.start();

//使用事务 自动签收

Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);

Destination destination = session.createQueue("test-queue");

MessageConsumer consumer = session.createConsumer(destination);

int i = 0 ;

TextMessage message = null ;

while(true){

if(i < 100){

System.out.println("i = " + i);

}else{

break;

}

i++;

//该方法为阻塞式的方法 一直等待

message = (TextMessage) consumer.receive();

//该方法告诉服务器指定队列已经被签收

session.commit();

System.out.println("收到消息:" + message.getText());

// Thread.sleep(200);

}

try {

session.close();

connection.close();

} catch (JMSException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

}

其中发送端发送格式为 "Message: " + index + " sent at: " + new Date() 的信息100次,

接收端接受该信息。

其次、我们按照下面的操作运行程序。

1、运行ReceiveTestBase ,结果为:

2010-9-24 11:30:11 org.apache.activemq.transport.failover.FailoverTransport doReconnect

信息: Successfully connected to tcp://localhost:61616

i = 0

ReceiveTestBase的console中的光标停在“i = 0”的下一行,程序没有退出。

说明 consumer.receive(); 方法为阻塞式。

我们在打开浏览器,在地址栏中输入 “http://localhost:8161/admin”,点击进入 “Queues” 链接

我们能看到队列 test-queue 中的Numbe Of Consumer 中的数值为 1 。

Messages Enqueued(入列) 和 Messages Dequeued(出列) 的值均为 0 。

2、接着我们不停止 ReceiveTestBase 程序的阻塞,而是运行 SenderTestBase 程序, SenderTestBase的console结果为:

2010-9-24 11:39:08 org.apache.activemq.transport.failover.FailoverTransport doReconnect

信息: Successfully connected to tcp://localhost:61616

发送完毕!

在去观察ReceiveTestBase的console中的结果,发现

i = 99

收到消息:Message: 100 sent at: Fri Sep 24 11:39:16 CST 2010

这种类型的信息接受到了 100条 ,并且程序退出。

说明交互成功,且读取的顺序是和写入的顺序一致!

接着我们去服务器监控页面查看queue中test-queue队列的情况,

发现 Messages Enqueued(入列) 和 Messages Dequeued(出列) 的值均为 100 。

以上的操作时常规的发送消息和接受消息的过程,接下来我们做一下

session.createQueue("test-queue"); 和 session.commit(); 的作用。

3、快速的运行 SenderTestBase 程序三次,每个console都没有抛出错误,说明session.createQueue("test-queue");语句

在test-queue在服务器上没有的时候是创建,在服务器上有的时候是获取该queue的信息创建实例。

我们在打开服务器的监控页面看看queue中的test-queue的情况如何。

Messages Enqueued(入列) 和 Messages Dequeued(出列) 的值分别为300和0 ,

且Number Of Pending Messages (等待信息数)也为300。说明发送端发送出数据之后,

如果找不到客户端接受,那么activemq将把该信息存在服务器中。这个时候如果我们启动3个客户端情况会怎样?

快速运行 ReceiveTestBase 程序三次,我们看看每个客户端的console的结果如何,

每个客户端均得到了100条数据,但是这100条数据是无序的,说民activemq服务器端在queue中有多个客户端的情况下作了负载均衡。

4、我们在看看 接收端的 session.commit(); 有什么作用!

我们首先把ReceiveTestBase.java中的session.commit();行注释掉。

然后运行发送端程序 SenderTestBase ,在运行接收端程序ReceiveTestBase。

你会发送,接收端可以正常的接收到数据,但是当你打开activemq的监控系统时,

你会发现Messages Dequeued列中的值没有变化,说明只有在客户端告知服务器端接收到信息的时候,

Messages Dequeued列才会变化。所以有时候Messages Dequeued也不能反映真实的状况。

如果你运行多次接收端程序,你会发现它每次都能接收到正确的数据。只有把session.commit();注释去掉,

才能只接受到一次。

小结:引用网上的一段话“

JMS Queue执行load balancer语义:

一条消息仅能被一个consumer收到。如果在message发送的时候没有可用的consumer,那么它将被保存一直到能处理该message的consumer可用。

如果一个consumer收到一条message后却不响应它,那么这条消息将被转到另一个consumer那儿。

一个Queue可以有很多consumer,并且在多个可用的consumer中负载均衡。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: