您的位置:首页 > 编程语言 > Java开发

ActiveMQ在spring中整合实例讲解

2017-12-10 23:46 423 查看
一、ActiveMQ介绍

     
  (1)MQ:Message Queue就是消息队列。

(2)ActiveMQ是Apache出品,最流行的,能力强劲的开源消息总线。

(3)ActiveMQ是一个完全支持JMS1.1和J2EE1.4规范的JMS Provider实现,尽管JMS规范出台已经是很久的事
情了,但是JSM在当今的J2EE应用中仍然扮演者特殊的地位。

(4)ActiveMQ消息的传递有两种类型:

1)点对点:即一个生产者和一个消费者一一对应。

2)发布/订阅模式:即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。

(5)主要特点:

1)多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议:OpenWire,Stomp
REST,WS Notification,XMPP,AMQP。

2)完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)。

3)对Spring支持,ActiveMQ可以很容易内嵌到使用spring的系统里面去。

4)通过了常见J2EE服务器(如Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5resource
adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE1.4 商业服务器上。

5)支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA。

6)支持通过JDBC和journal提供高速的消息持久化。

7)从设计上保证了高性能的集群,客户端-服务器,点对点。

8)支持Ajax。

9)支持与Axis的整合。

10)可以很容易的调用内嵌JMS Provider,进行测试。

二、JSM介绍(番外篇)

JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件
(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。

JMS是一个与具体平台无关的API,具有跨平台性。

它主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息。把它应用到实
际的业务需求中的话我们可以在特定的时候利用生产者生成一消息,并进行发送,对应的消费者在接收到对应的
消息后去完成对应的业务逻辑。

JMS定义了五种不同的消息正文格式:

   • StreamMessage -- Java原始值的数据流

   • MapMessage--一套名称-值对
daa4

   • TextMessage--一个字符串对象

   •ObjectMessage--一个序列化的 Java对象

   • BytesMessage--一个字节的数据流
三、ActiveMQ下载

下载地址:http://activemq.apache.org/

四、ActiveMQ安装(Windows版本)

(1)安装环境

JDK1.7及以上版本,配置好环境变量。(不在赘述)

(2)将下载好的apache-activemq-5.14.5解压后(我放在了D盘下),打开目录

D:\apache-activemq-5.14.5\bin\win64,双击activemq.bat文件。如果你是Windows32位的操作系统,那么

你打开的目录是D:\apache-activemq-5.14.5\bin\win32,然后双击其下的activemq.bat文件。这样你的

ActiveMQ服务器就启动了。(注意:请不要双击D:\apache-activemq-5.14.5\bin目录下的activemq.bat

文件,会出现闪退。如果你下载的apache-activemq-5.15.2,那么请参考其他文章)。

(3)安装成功后我们就可以对activeMQ服务器进行访问了。

输入访问地址:http://192.168.37.161:8161/

     账号:admin

密码:admin

登录成功后出现以下页面:



五、 ActiveMQ整合进Spring

1)在eclipse中创建java项目,创建lib目录,导入jar包,jar包如下:



2)Sender---消息发送者代码

package com.alex.mq;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class Sender {

public static void main(String[] args) {
ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext-activeMQ.xml");
JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class);
Destination destination = context.getBean(Destination.class);
for (int i = 0; i < 10; i++) {
sendMessage(jmsTemplate,destination);
}
}

public static void sendMessage(JmsTemplate jmsTemplate,Destination destination){
jmsTemplate.send(destination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage message =  new ActiveMQTextMessage();

message.setText("##---我是被发送的消息内容---##");
return message;
}
});
}
}


3)MyMessageListener---消息监听器1(接收消息,消息消费者)

package com.alex.mq;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class MyMessageListener implements MessageListener {

@Override
public void onMessage(Message message) {
String name = Thread.currentThread().getName();
if (null != message) {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
String text = textMessage.getText();
System.out.println("线程:"+name+"...MyMessageListener接收到消息内容:"+text);
} catch (JMSException e) {
e.printStackTrace();
}
}
}

}
}


4)MySecondMessageListener---消息监听器2(接收消息,消息消费者)

package com.alex.mq;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class MySecondMessageListener implements MessageListener {

@Override
public void onMessage(Message message) {
String name = Thread.currentThread().getName();
if (message instanceof TextMessage) {
try {
String content = ((TextMessage) message).getText();
System.out.println("线程:"+name+"...MySecondMessageListener接收到消息内容:"+content);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}


5)spring配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xsi:schemalocation=""http://www.springframework.org/schema/beans ">

<!-- 真正可以提供connection的connectionFactory,由对应的JMS服务厂商提供 -->
<bean name="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"></property>
</bean>
<!-- spring真正用于管理ConnectionFactory的ConnectionFactory -->
<bean name="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="targetConnectionFactory"></property>
</bean>
<!-- spring提供的JMS工具类,它可以用于发送消息 -->
<bean name="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory是spring管理的那个对象 -->
<property name="connectionFactory" ref="connectionFactory"></property>
</bean>
<!-- 这个是队列目的地,点对点的 -->
<!-- <bean name="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>queue</value>
</constructor-arg>
-->
<!-- 这个主题目的地,一对多的 -->
<bean name="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="topic"></constructor-arg>
</bean>
<!-- messageListener实现类 -->
<bean name="messageListener" class="com.alex.mq.MyMessageListener"></bean>
<!-- messageListener实现类 -->
<bean name="secondMessageListener" class="com.alex.mq.MySecondMessageListener"></bean>
<!-- 消息监听器容器1 -->
<bean name="jsmContainer1" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"></property>
<property name="destination" ref="topicDestination"></property>
<property name="messageListener" ref="messageListener"></property>
</bean>
<!-- 消息监听器容器2 -->
<bean name="jsmContainer2" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"></property>
<property name="destination" ref="topicDestination"></property>
<property name="messageListener" ref="secondMessageListener"></property>
</bean>

</beans>


6)java目录图



六、执行结果(topic---发布/订阅模式)



七、执行结果(queue---点对点)



八、总结

1)activeMQ的点对点是一个生产消息,将消息放入queue队列,然后另一个消费消息,每一个消费者都是一个线程,它们消费的消息数量等于生产者生产的数量。

2)发布/订阅模式:生产者生产的所有消息都会被每一个的消费者消费。

3)生产者发送消息到MQ服务器,消费者监听MQ服务器,获取到消息后进行消费消息。

4)消费消息处理监听机制外还有receive(),接收消息,这种在项目中不常用,项目中常用的是监听机制。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: