您的位置:首页 > 其它

ActiveMQ 快速入门教程系列 第二章 发布-订阅者模式实现

2016-02-07 17:15 691 查看
第二章我们会介绍怎样实现一个发布者对多个订阅者的消息传递

Topic 和 Queue 的最大区别在于:Topic 以广播的形式通知所有在线监听的客户端有新的消息,没有监听的客户端(非持久订阅的情况下)将收不到消息;而 Queue 则以点对点的形式,只把每条消息交给多个处于监听状态的客户端中的一个。

首先,与第一章类似,我们为订阅者1、2分别创建监听器 MyMessageListener 和 MyMessageListener2,它们都实现 MessageListener 接口。

package cn.com.evan.Jms.activemq;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * Message listener for subscriber 1: prints the text of every
 * {@link TextMessage} it receives to standard output.
 */
public class MyMessageListener implements MessageListener{

	/**
	 * Prints the body of a received text message.
	 * Messages that are not {@link TextMessage} instances are reported on
	 * standard error and skipped instead of being cast blindly.
	 *
	 * @param msg the message delivered to this listener
	 */
	@Override
	public void onMessage(Message msg) {
		// Guard the cast: an unchecked cast of a non-text message would throw
		// ClassCastException out of the listener callback.
		if (!(msg instanceof TextMessage)) {
			System.err.println("订阅者1收到非文本消息,已忽略:" + msg);
			return;
		}

		try {
			System.out.println("订阅者1接受消息:"+((TextMessage)msg).getText());
		} catch (JMSException e) {
			// Reading the body failed; report it and keep the listener alive.
			e.printStackTrace();
		}
	}

}


package cn.com.evan.Jms.activemq;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * Message listener for subscriber 2: prints the text of every
 * {@link TextMessage} it receives to standard output.
 */
public class MyMessageListener2 implements MessageListener{

	/**
	 * Prints the body of a received text message.
	 * Messages that are not {@link TextMessage} instances are reported on
	 * standard error and skipped instead of being cast blindly.
	 *
	 * @param msg the message delivered to this listener
	 */
	@Override
	public void onMessage(Message msg) {
		// Guard the cast: an unchecked cast of a non-text message would throw
		// ClassCastException out of the listener callback.
		if (!(msg instanceof TextMessage)) {
			System.err.println("订阅者2收到非文本消息,已忽略:" + msg);
			return;
		}

		try {
			System.out.println("订阅者2接受消息:"+((TextMessage)msg).getText());
		} catch (JMSException e) {
			// Reading the body failed; report it and keep the listener alive.
			e.printStackTrace();
		}
	}

}


然后我们分别创建订阅者1,2,并创建一个Topic “MyTopic1”用于消息的订阅,订阅者1,2分别设置对应的监听器

package cn.com.evan.Jms.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Subscriber 1: connects to the broker, subscribes to the topic "MyTopic1"
 * and hands every received message to {@link MyMessageListener}.
 */
public class JmsComsumer {
	private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
	private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
	private static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;

	/**
	 * Creates the connection, session and topic consumer, registers the
	 * listener and then starts message delivery. On success the connection is
	 * intentionally left open so the listener keeps receiving messages.
	 *
	 * @param args command-line arguments (unused)
	 */
	public static void main(String[] args) {
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
				USERNAME, PASSWORD, BROKERURL);
		Connection connection = null;

		try {
			connection = connectionFactory.createConnection();
			// Non-transacted session; received messages are acknowledged automatically.
			Session session = connection.createSession(false,
					Session.AUTO_ACKNOWLEDGE);
			Destination destination = session.createTopic("MyTopic1");// create topic
			MessageConsumer messageConsumer = session.createConsumer(destination);// create consumer

			messageConsumer.setMessageListener(new MyMessageListener());// set listener which we created before

			// Start delivery only after the listener is in place, so setup is
			// complete before the first message can arrive.
			connection.start();
		} catch (Exception e) {
			e.printStackTrace();
			// Setup failed part-way: release the connection instead of leaking it.
			closeQuietly(connection);
		}

	}

	/**
	 * Closes the given connection if it is non-null, printing (not propagating)
	 * any failure raised while closing.
	 */
	private static void closeQuietly(Connection connection) {
		if (connection == null) {
			return;
		}
		try {
			connection.close();
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

}
package cn.com.evan.Jms.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Subscriber 2: connects to the broker, subscribes to the topic "MyTopic1"
 * and hands every received message to {@link MyMessageListener2}.
 */
public class JmsComsumer2 {
	private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
	private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
	private static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;

	/**
	 * Creates the connection, session and topic consumer, registers the
	 * listener and then starts message delivery. On success the connection is
	 * intentionally left open so the listener keeps receiving messages.
	 *
	 * @param args command-line arguments (unused)
	 */
	public static void main(String[] args) {
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
				USERNAME, PASSWORD, BROKERURL);
		Connection connection = null;

		try {
			connection = connectionFactory.createConnection();
			// Non-transacted session; received messages are acknowledged automatically.
			Session session = connection.createSession(false,
					Session.AUTO_ACKNOWLEDGE);
			Destination destination = session.createTopic("MyTopic1");// create topic
			MessageConsumer messageConsumer = session.createConsumer(destination);// create consumer

			messageConsumer.setMessageListener(new MyMessageListener2());// set listener which we created before

			// Start delivery only after the listener is in place, so setup is
			// complete before the first message can arrive.
			connection.start();
		} catch (Exception e) {
			e.printStackTrace();
			// Setup failed part-way: release the connection instead of leaking it.
			closeQuietly(connection);
		}

	}

	/**
	 * Closes the given connection if it is non-null, printing (not propagating)
	 * any failure raised while closing.
	 */
	private static void closeQuietly(Connection connection) {
		if (connection == null) {
			return;
		}
		try {
			connection.close();
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

}

接着我们创建发布者,发布10条消息

package cn.com.evan.Jms.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Publisher: connects to the broker and publishes {@code SENDNUM} text
 * messages to the topic "MyTopic1" inside a single transaction.
 */
public class JmsProducer {

	private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
	private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
	private static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;
	/** Number of messages published per run; primitive to avoid unboxing in the loop. */
	private static final int SENDNUM = 10;

	/**
	 * Opens a connection, publishes the messages in a transacted session,
	 * commits, and always closes the connection afterwards.
	 *
	 * @param args command-line arguments (unused)
	 */
	public static void main(String[] args) {
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
				USERNAME, PASSWORD, BROKERURL);
		Connection connection = null;

		try {
			connection = connectionFactory.createConnection();
			connection.start();
			// Transacted session: SESSION_TRANSACTED states the intent explicitly
			// rather than passing an acknowledge mode alongside transacted=true.
			Session session = connection.createSession(true,
					Session.SESSION_TRANSACTED);
			Destination destination = session.createTopic("MyTopic1");// Create topic
			MessageProducer messageProducer = session.createProducer(destination);// Create producer
			sendMessage(session, messageProducer);
			// The sends above belong to the session's transaction; commit it.
			session.commit();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if (connection != null) {
				try {
					connection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}

	}

	/**
	 * Creates and sends {@code SENDNUM} text messages ("ActiveMQ0", "ActiveMQ1", ...)
	 * through the given producer, echoing each one to standard output.
	 *
	 * @param session         session used to create the text messages
	 * @param messageProducer producer the messages are sent with
	 * @throws JMSException if creating, sending or reading a message fails
	 */
	public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException{
		for (int i = 0; i < SENDNUM; i++) {
			TextMessage message = session.createTextMessage("ActiveMQ"+i);
			messageProducer.send(message);
			System.out.println("发布者发布消息:"+message.getText());
		}
	}

}
以上发布者和2个订阅者都已经创建完毕,我们先运行订阅者1,2,再运行发布者(就像我们平时要接收到什么新闻,都得先订阅那频道,才可以接收到相关频道发布的消息),运行结果如下







我们打开 ActiveMQ 控制台,可以看到有2位消费者,消息被消费了20次(每位订阅者分别消费了10条消息)。

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: