消息队列篇—详谈ActiveMQ消息队列模式的分析及使用
2018-04-22 01:33
477 查看
消息队列(Message Queue)是分布式系统中重要的组件,通用使用场景可以简单地描述为当不需要立即获得结果,但是并发量需控制时就需要使用消息队列。消息列队有两种消息模式,一种是点对点的消息模式,另一种是订阅\发布的消息模式。
点对点的消息模式
点对点的模式主要建立在一个队列上,当连接一个列队时,发送方不需要知道接收方是否正在接收消息,可以直接向ActiveMQ发送消息,而发送的消息将直接进入队列中,如果接收方启动着监听,则会向接收方发送消息,若接收方没有接收到消息,则会保存在ActiveMQ服务器中,直到接收方接收消息为止。点对点的消息模式可以有多个接收方和发送方,但是一条消息只会被一个接收方接收到,先连上ActiveMQ接收方,则会先接收到消息,而之后的接收方则接收不到已被接收过的消息。
Java实现ActiveMQ点对点模式,使用ActiveMQ服务器版本为5.15.3,项目使用Maven构建,其中pom.xml增加ActiveMQ依赖jar配置如下:
点对点的发送方逻辑代码
点对点的接收方代码
订阅/发布的消息模式
订阅/发布模式有多个接收方和发送方,但是接收方与发送方存在时间上的依赖,如果发送方发送消息时接收方没有监听消息,那么ActiveMQ将不会保存该消息,认为消息已经发送。这个模式还有一个特点就是发送方发送的消息会被所有的接收方接收到,与点对点模式恰恰相反。
Java实现ActiveMQ订阅/发布模式,使用ActiveMQ服务器版本为5.15.3,项目使用Maven构建,其中pom.xml增加ActiveMQ依赖jar配置如下:
订阅/发布的发送方代码
订阅/发布的接收方代码
点对点的消息模式
点对点的模式主要建立在一个队列上,当连接一个列队时,发送方不需要知道接收方是否正在接收消息,可以直接向ActiveMQ发送消息,而发送的消息将直接进入队列中,如果接收方启动着监听,则会向接收方发送消息,若接收方没有接收到消息,则会保存在ActiveMQ服务器中,直到接收方接收消息为止。点对点的消息模式可以有多个接收方和发送方,但是一条消息只会被一个接收方接收到,先连上ActiveMQ接收方,则会先接收到消息,而之后的接收方则接收不到已被接收过的消息。
Java实现ActiveMQ点对点模式,使用ActiveMQ服务器版本为5.15.3,项目使用Maven构建,其中pom.xml增加ActiveMQ依赖jar配置如下:
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.7.0</version> </dependency>
点对点的发送方逻辑代码
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class MQSender { private String userName = "root"; private String password = "123456"; private String url = "tcp://127.0.0.1:61616"; public static void main(String[] args) { MQSender send = new MQSender(); send.start(); } public void start(){ try { ConnectionFactory factory = new ActiveMQConnectionFactory(userName, password, url); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //连接名是"textMsg"的队列,此会话将会到该队列中,若 该队列不存在,则被创建 Destination destination = session.createQueue("textMsg"); MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.PERSISTENT); TextMessage textMsg = session.createTextMessage("消息内容"); for(int i = 0 ; i < 10; i ++){ producer.send(textMsg); } producer.close(); } catch (JMSException e) { e.printStackTrace(); } } }
点对点的接收方代码
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class PTPReceive { private String userName = "root"; private String password = "123456"; private String url = "tcp://127.0.0.1:61616"; public static void main(String[] args) { PTPReceive receive = new PTPReceive(); receive.start(); } public void start(){ try { ConnectionFactory factory = new ActiveMQConnectionFactory(userName, password, url); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("textMsg"); MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { String text = ((TextMessage)message).getText(); } catch (JMSException e) { e.printStackTrace(); } } }); consumer.close(); } catch (JMSException e) { e.printStackTrace(); } } }
订阅/发布的消息模式
订阅/发布模式有多个接收方和发送方,但是接收方与发送方存在时间上的依赖,如果发送方发送消息时接收方没有监听消息,那么ActiveMQ将不会保存该消息,认为消息已经发送。这个模式还有一个特点就是发送方发送的消息会被所有的接收方接收到,与点对点模式恰恰相反。
Java实现ActiveMQ订阅/发布模式,使用ActiveMQ服务器版本为5.15.3,项目使用Maven构建,其中pom.xml增加ActiveMQ依赖jar配置如下:
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.7.0</version> </dependency>
订阅/发布的发送方代码
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class MQSender { private String userName = "root"; private String password = "123456"; private String url = "tcp://127.0.0.1:61616"; public static void main(String[] args) { MQSender send = new MQSender(); send.start(); } public void start(){ try { ConnectionFactory factory = new ActiveMQConnectionFactory(userName, password, url); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //连接名是"textMsg"的队列,此会话将会到该队列中,若 该队列不存在,则被创建 Destination destination = session.createTopic("textMsg"); MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.PERSISTENT); TextMessage textMsg = session.createTextMessage("消息内容"); for(int i = 0 ; i < 10; i ++){ producer.send(textMsg); } producer.close(); } catch (JMSException e) { e.printStackTrace(); } } }
订阅/发布的接收方代码
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class PTPReceive { private String userName = "root"; private String password = "123456"; private String url = "tcp://127.0.0.1:61616"; public static void main(String[] args) { PTPReceive receive = new PTPReceive(); receive.start(); } public void start(){ try { ConnectionFactory factory = new ActiveMQConnectionFactory(userName, password, url); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createTopic("textMsg"); MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { String text = ((TextMessage)message).getText(); } catch (JMSException e) { e.printStackTrace(); } } }); consumer.close(); } catch (JMSException e) { e.printStackTrace(); } } }
相关文章推荐
- Java消息中间件学习笔记四 -- ActiveMQ的使用,【队列模式】
- 消息队列:快速上手ActiveMQ消息队列的JMS方式使用(两种模式:Topic和Queue的消息推送和订阅)
- 【Java消息中间件】Java消息中间件( 第4章 使用activemq - 队列模式、主题模式的消息演示 )
- ActiveMQ消息队列的使用--点对点的消息模式
- 关于消息队列的使用----ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ
- MSMQ?不,太弱了。使用ActiveMQ实现消息队列服务
- 工业物联网或系统集成中应用消息队列(ActiveMQ,C#的demo)的场景全面分析
- 消息队列-推/拉模式学习 & ActiveMQ及JMS学习
- ActiveMQ常见的高可用架构模式及使用LevelDB、ZooKeeper进行高可用消息架构
- 关于消息队列的使用----ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ
- 消息队列(Message Queue)基本概念和使用场景分析
- java消息队列ActiveMQ的简单使用
- 【ActiveMq】ActiveMQ消息队列的使用及应用
- IPC消息队列使用详细分析
- ActiveMQ消息队列的使用及应用
- ActiveMQ两种消息模式以及为什么使用MQ
- Windows平台下的ActiveMQ消息队列的简单使用
- RabbitMq六种使用模式(1)_直接指定消息接收队列
- spring整合activemq消息队列之点对点模式
- ActiveMQ(二):使用队列Queue方式发送消息