您的位置:首页 > 编程语言 > Java开发

activemq订阅,队列Demo(maven,spring管理)(mysql持久化,失败重连机制)

2017-05-02 15:37 477 查看
activemq是基于JMS标准的通信方式;下面将简单的介绍activemq的两种模式:

1、订阅模式(多个接收者consumers在等待接收消息,一个producer生产消息,生产者发出消息后,所有连接到同一地址的消费者都能够得到刚发送的消息,适用于同步的业务逻辑)

2、队列模式(唯一消息生产者,唯一消息消费者,消息生产出来后会扔在队列里,等待消费者去慢慢的消费他们,适用于异步处理的业务逻辑)

具体步骤:1、从官网上下载activemq的适配包,我下载的是5.14.4windows64版本,解压后打开bin目录下的activemq.bat如截图位置,双击.bat批处理文件,启动相关服务,访问localhost:8186,如下图所示,则表示启动成功了;下面可以用生产者,消费者,连接到本机的这个类似于服务的中间容器中去发送,接收消息了;



登陆控制台:admin/admin,查看对应的queue和topic的消费生产情况;



发送和接收消息的Demo:

1、手动生产10条消息,放在queue中;



控制台:待消费的消息10条,没有消费者;



可以看出:第一个红框是待消费的消息条数;第二个红框中是一共生成的消息总数;

2、手动接收queue中的消息,每次5条:



控制台显示如下:被消费了5条消息,5条没有被消费的,队列中一共有10条消息,一个消费者;



订阅模式Demo:定时任务定时生成消息;



生成的消息都放在了名为subject的订阅号中,如果需要取消息,也要从subject中取;



发现subject名字的订阅号下生成了许多消息;没有消费者,没有已消费情况;



生成一个消费者来消费这些消息:



上面这个demo中有两种接收方式:1、手动接收,2、listener自动接收;手动接收的消费者在接受完消息后就死亡了;我们可以启动两个listener来看订阅的独特的一发多收的效果:



本初设计到的两个demo(maven,spring管理的工程)会上传到我的资源中,欢迎大家下载学习,有什么不懂的也可以联系我,共同探讨;谢谢;

最近又有研究下MQ的消息持久化,实现方式是修改其配置文件activemq.xml;

<!-- <persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>-->
<bean id="MySQL-DS" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://192.168.*.*:3306/*****?useUnicode=true&characterEncoding=UTF-8"/>
<property name="username" value="***"/>
<property name="password" value="***"/>
<property name="poolPreparedStatements" value="true"/>
</bean>


失败重连机制,实现方式:

package queue;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
* @description: [发送消息]
* @author: zxx
* @createDate: 2018/2/26 下午2:16
* @version: [v1.0]
*/
public class Sender {
private static final int SEND_NUMBER = 5;

public static void main(String[] args) {
// ConnectionFactory :连接工厂,JMS 用它创建连接
ConnectionFactory connectionFactory; // Connection :JMS 客户端到JMS
// Provider 的连接
// Session: 一个发送或接收消息的线程
Connection connection = null;
Session session; // Destination :消息的目的地;消息发送给谁.
Destination destination; // MessageProducer:消息发送者
MessageProducer producer; // TextMessage message;
// 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
//ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
ActiveMQConnection.DEFAULT_PASSWORD,     "failover:(tcp://localhost:61616,tcp://localhost:61617)?initialReconnectDelay=1000&maxReconnectDelay=30000");
//打印出用户和密码
System.out.println("ActiveMQConnection.DEFAULT_USER:" + ActiveMQConnection.DEFAULT_USER
+ ",ActiveMQConnection.DEFAULT_PASSWORD:" + ActiveMQConnection.DEFAULT_PASSWORD);
try { // 构造从工厂得到连接对象
connection = connectionFactory.createConnection();
// 启动
connection.start();
// 获取操作连接
session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 获取session注意参数值xx.queue是一个服务器的queue,须在在ActiveMq的console配置
destination = session.createQueue("xx.queue");
// 得到消息生成者【发送者】
producer = session.createProducer(destination);
// 设置不持久化,此处学习,实际根据项目决定(将消息持久化到mysql的Demo)
//producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 构造消息,此处写死,项目就是参数,或者方法获取
sendMessage(session, producer);
session.commit();
} catch (Exception e
) {
e.printStackTrace();
} finally {
try {
if (null != connection) {
connection.close();
}
} catch (Throwable ignore) {
}
}
}

public static void sendMessage(Session session, MessageProducer producer) throws Exception {
for (int i = 1; i <= SEND_NUMBER; i++) {
TextMessage message = session.createTextMessage("ActiveMq 发送的消息" + i);
// 发送消息到目的地方
System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
producer.send(message);
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息