您的位置:首页 > 其它

ActiveMQ入门教程(四) - ActiveMQ Pub/Sub版的HelloWorld

2013-09-28 14:46 363 查看
在上一篇中,我们说了P2P版的HelloWorld,在这一篇,我们简要说一下,基于发布,订阅模式的HelloWorld。

基础知识就不在介绍了,需要的会一点一点讲。

1. pom.xml

这个和上一篇是一样的:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>

<groupId>org.ygy</groupId>
<artifactId>activemq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>activemq</name>
<url>http://maven.apache.org</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>

<!-- activemq,学习中 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.5.6</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.5.6</version>
</dependency>

</dependencies>
</project>


2. Pub/Sub版的HelloWorld

生产者:

package org.ygy.mq.lesson01;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.ygy.mq.constants.MQConstants;

public class HelloTopicProducer {

public void send(String msg) {
// 生产者的主要流程
Connection connection = null;

try {
// 1.初始化connection工厂,使用默认的URL
// failover://tcp://localhost:61616
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();

// 2.创建Connection
connection = connectionFactory.createConnection();

// 3.打开连接
connection.start();

// 4.创建Session,(是否支持事务)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// 5.创建消息目标
Destination destination_send = session.createTopic(MQConstants.DESTINATION_SEND);

// 6.创建生产者
MessageProducer producer = session.createProducer(destination_send);

// 7.配置消息是否持久化
/*
* DeliverMode有2种方式:
*
* public interface DeliveryMode { static final int NON_PERSISTENT =
* 1;//不持久化:服务器重启之后,消息销毁
*
* static final int PERSISTENT = 2;//持久化:服务器重启之后,该消息仍存在 }
*/
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

// 8.初始化要发送的消息
TextMessage message = session.createTextMessage(msg);

// 9.发送消息
producer.send(message);

connection.close();

} catch (JMSException e) {
e.printStackTrace();
}
}

public static void main(String[] args) {
new HelloTopicProducer().send("我来试一试发布/订阅...");
}

}


消费者:

package org.ygy.mq.lesson01;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.ygy.mq.constants.MQConstants;

public class HelloTopicConsumer implements MessageListener {

@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage txtMsg = (TextMessage) message;

try {
System.out.println("哈,我接收到了消息:" + txtMsg.getText());
} catch (JMSException e) {
e.printStackTrace();
}

}
}

public void receive() {
// 消费者的主要流程
Connection connection = null;

try {
// 1.初始化connection工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();

// 2.创建Connection
connection = connectionFactory.createConnection();

// 3.打开连接
connection.start();

// 4.创建session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// 5.创建消息目标
Destination destination = session.createTopic(MQConstants.DESTINATION_SEND);

// 6.创建消费者
MessageConsumer consumer = session.createConsumer(destination);

// 7.配置监听
consumer.setMessageListener(new HelloTopicConsumer());

} catch (JMSException e) {
e.printStackTrace();
}
}

public static void main(String[] args) {
new HelloTopicConsumer().receive();
}

}


3.测试

访问网页:http://localhost:8161/admin/topics.jsp



单击那个Topics连接。

这里显示的是服务器上的主题,这些显示的都没有用,可以都删掉。

Name:主题的名称

Number Of Consumers:正在运行的消费者

Message Enqueued:进入消息队列的

Message Dequeued:出消息队列的

Operations:操作

下面就可以开始运行程序了,

注意顺序:先运行消费者:



这里会产生好几个主题,我们只看我们自己用的那个,(其实,其他几个是干嘛的,暂时还不清楚,以后再研究吧.....)

我们的消费者一直在运行

接下来,运行生产者:

控制台会输出:



再一次,刷新界面:



消费者还在运行,只生产了一条消息,而且已经被消费了。

在这里遇到了一个问题,就是运行顺序的问题,

我们如果先运行生产者,再运行消费者,消费者是接收不到消息的,郁闷了好久

猜想,应该是对概念,规范的理解出了问题,就找了一下,发现了原因:



这是上一篇介绍的JMS消息模型,哎,

,对概念的理解不清晰。

至于,持久的订阅,会在以后的博客中分享,HelloWorld,就到此结束了。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: