您的位置:首页 > 其它

ActiveMQ发消息、收消息、持久化,查询队列剩余消息数、出队数的实现

2012-05-16 00:14 721 查看
http://my.oschina.net/u/264430/blog/57318

 

最近老大让我使用ActiveMQ实现这么个东东:1.查询消息队列中还有多少任务没有执行;2.消息队列的持久化;

真是愁杀我也,以前没见过啊,于是又看文档,又百度又google的,最终还是在一天半之后整出来鸟~~

首先向大家介绍一本书籍《ActiveMQ in Action》,我大部分代码都是参考这本书实现的。好了,废话少说,看代码:

1.首先启动activeMQ的服务

view source

print?

01
public
class
RunServer {
02
 
03
/** 启动activeMQ服务 */
04
public

static
void
main(String[] args)
throws
Exception {
05
RunServer rs =
new
RunServer();
06
BrokerService broker = rs.startServer();
07
}
08
09
public

BrokerService startServer()
throws

Exception{
10
// java代码调用activemq相关的类来构造并启动brokerService
11
BrokerService broker =
new
BrokerService();
12
 
13
// 以下是持久化的配置
14
// 持久化文件存储位置
15
File dataFilterDir =
new
File(
"targer/amq-in-action/kahadb"
);
16
KahaDBStore kaha =
new
KahaDBStore();
17
kaha.setDirectory(dataFilterDir);
18
// use a bigger journal file
19
kaha.setJournalMaxFileLength(
1024
*
100
);
20
// small batch means more frequent and smaller writes
21
kaha.setIndexWriteBatchSize(
100
);
22
// do the index write in a separate thread
23
kaha.setEnableIndexWriteAsync(
true
);
24
25
broker.setPersistenceAdapter(kaha);
26
// create a transport connector
27
broker.addConnector(
"tcp://localhost:61616"
);
28
broker.setUseJmx(
true
);
29
//broker.setDataDirectory("data/");
30
31
 
32
// 以下是ManagementContext的配置,从这个容器中可以取得消息队列中未执行的消息数、消费者数、出队数等等
33
// 设置ManagementContext
34
ManagementContext context = broker.getManagementContext();
35
context.setConnectorPort(
2011
);
36
context.setJmxDomainName(
"my-broker"
);
37
context.setConnectorPath(
"/jmxrmi"
);
38
broker.start();
39
System.in.read();
40
return

broker;
41
}
2.发送消息

view source

print?

01
public
class
Sender {
02
private

static
final
int
SEND_NUMBER =
1
;
03
 
04
public

static
void
main(String[] args){
05
// ConnectionFactory :连接工厂,JMS 用它创建连接
06
ConnectionFactory connectionFactory;
07
// Connection :JMS 客户端到JMS Provider 的连接
08
Connection connection =
null
;
09
// Session: 一个发送或接收消息的线程
10
Session session;
11
// Destination :消息的目的地;消息发送给谁.
12
Destination destination;
13
// MessageProducer:消息发送者
14
MessageProducer producer;
15
// TextMessage message;
16
// 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
17
 
18
connectionFactory =
new
ActiveMQConnectionFactory(
19
ActiveMQConnection.DEFAULT_USER,
20
ActiveMQConnection.DEFAULT_PASSWORD,
21
"tcp://localhost:61616"
);
22
try

{
23
// 构造从工厂得到连接对象
24
connection = connectionFactory.createConnection();
25
// 启动
26
connection.start();
27
// 获取操作连接
28
session = connection.createSession(Boolean.TRUE,
29
Session.AUTO_ACKNOWLEDGE);
30
// 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
31
destination = session.createQueue(
"test-persistence"
);
32
// 得到消息生成者【发送者】
33
producer = session.createProducer(destination);
34
// 设置不持久化,可以更改
35
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
36
// 构造消息
37
sendMessage(session, producer);
38
session.commit();
39
 
40
}

catch
(Exception e) {
41
e.printStackTrace();
42
}

finally
{
43
try

{
44
if

(
null
!= connection)
45
connection.close();
46
}

catch
(Throwable ignore) {
47
}
48
}
49
 
50
}
51
 
52
public

static
void
sendMessage(Session session, MessageProducer producer)
53
throws

Exception {
54
for

(
int
i =
1
; i <= SEND_NUMBER; i++) {
55
TextMessage message = session
56
.createTextMessage(
"ActiveMq 发送的消息"

+ i);
57
// 发送消息到目的地方
58
System.out.println(
"发送消息:"

+ i);
59
producer.send(message);
60
}
61
}
3.收消息

view source

print?

01
public
class
Receiver {
02
public

static
void
main(String[] args){
03
// ConnectionFactory :连接工厂,JMS 用它创建连接
04
ConnectionFactory connectionFactory;
05
// Connection :JMS 客户端到JMS Provider 的连接
06
Connection connection =
null
;
07
// Session: 一个发送或接收消息的线程
08
Session session;
09
// Destination :消息的目的地;消息发送给谁.
10
Destination destination;
11
// 消费者,消息接收者
12
MessageConsumer consumer;
13
 
14
connectionFactory =
new
ActiveMQConnectionFactory(
15
ActiveMQConnection.DEFAULT_USER,
16
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616"
);
17
try

{
18
// 构造从工厂得到连接对象
19
connection = connectionFactory.createConnection();
20
// 启动
21
connection.start();
22
// 获取操作连接
23
session = connection.createSession(Boolean.TRUE,
24
Session.AUTO_ACKNOWLEDGE);
25
//test-queue跟sender的保持一致,一个创建一个来接收
26
destination = session.createQueue(
"test-persistence"
);
27
consumer = session.createConsumer(destination);
28
29
consumer.setMessageListener(
new

MessageListener() {
30
public

void
onMessage(Message arg0) {
31
try

{
32
Thread.sleep(
1000
);
33
}

catch
(InterruptedException e1) {
34
e1.printStackTrace();
35
}
36
System.out.println(
"=================="
);
37
try

{
38
System.out.println(
"RECEIVE1第一个获得者:"
39
+ ((TextMessage) arg0).getText());
40
}

catch
(JMSException e) {
41
// TODO Auto-generated catch block
42
e.printStackTrace();
43
}
44
45
}
46
});
47
}

catch
(Exception e) {
48
e.printStackTrace();
49
}
50
finally

{
51
try

{
52
if

(
null
!= connection)
53
connection.close();
54
}

catch
(Throwable ignore) {
55
}
56
}
57
}
58
}
4.获取消息的状态,也就是上面所说的获得消息队列中未执行的消息数、消费者数、出队数等等

view source

print?

01
public
class
StateTest {
02
 
03
/**
04
* 获取状态
05
* @throws Exception
06
*/
07
public

static
void
main(String[] args)
throws
Exception {
08
JMXServiceURL url =
new
JMXServiceURL(
"service:jmx:rmi:///jndi/rmi://localhost:2011/jmxrmi"
);
09
JMXConnector connector = JMXConnectorFactory.connect(url,
null
);
10
connector.connect();
11
MBeanServerConnection connection = connector.getMBeanServerConnection();
12
 
13
// 需要注意的是,这里的my-broker必须和上面配置的名称相同
14
ObjectName name =
new
ObjectName(
"my-broker:BrokerName=localhost,Type=Broker"
);
15
BrokerViewMBean mBean = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(connection, name, BrokerViewMBean.
class
,
true
);
16
// System.out.println(mBean.getBrokerName());
17
18
for
(ObjectName queueName : mBean.getQueues()) {
19
QueueViewMBean queueMBean = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(connection, queueName, QueueViewMBean.
class
,
true
);
20
System.out.println(
"\n------------------------------\n"
);
21
 
22
// 消息队列名称
23
System.out.println(
"States for queue --- "

+ queueMBean.getName());
24
 
25
// 队列中剩余的消息数
26
System.out.println(
"Size --- "

+ queueMBean.getQueueSize());
27
 
28
// 消费者数
29
System.out.println(
"Number of consumers --- "

+ queueMBean.getConsumerCount());
30
 
31
// 出队数
32
System.out.println(
"Number of dequeue ---"

+ queueMBean.getDequeueCount() );
33
}
34
35
}
36
 
37
}
到此结束,希望可以为大家做个参考~~
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息