activemq订阅,队列Demo(maven,spring管理)(mysql持久化,失败重连机制)
2017-05-02 15:37
477 查看
activemq是基于JMS标准的通信方式;下面将简单的介绍activemq的两种模式:
1、订阅模式(多个接收者consumers在等待接收消息,一个producer生产消息,生产者发出消息后,所有连接到同一地址的消费者都能够得到刚发送的消息,适用于同步的业务逻辑)
2、队列模式(唯一消息生产者,唯一消息消费者,消息生产出来后会扔在队列里,等待消费者去慢慢的消费他们,适用于异步处理的业务逻辑)
具体步骤:1、从官网上下载activemq的适配包,我下载的是5.14.4windows64版本,解压后打开bin目录下的activemq.bat如截图位置,双击.bat批处理文件,启动相关服务,访问localhost:8186,如下图所示,则表示启动成功了;下面可以用生产者,消费者,连接到本机的这个类似于服务的中间容器中去发送,接收消息了;
登陆控制台:admin/admin,查看对应的queue和topic的消费生产情况;
发送和接收消息的Demo:
1、手动生产10条消息,放在queue中;
控制台:待消费的消息10条,没有消费者;
可以看出:第一个红框是待消费的消息条数;第二个红框中是一共生成的消息总数;
2、手动接收queue中的消息,每次5条:
控制台显示如下:被消费了5条消息,5条没有被消费的,队列中一共有10条消息,一个消费者;
订阅模式Demo:定时任务定时生成消息;
生成的消息都放在了名为subject的订阅号中,如果需要取消息,也要从subject中取;
发现subject名字的订阅号下生成了许多消息;没有消费者,没有已消费情况;
生成一个消费者来消费这些消息:
上面这个demo中有两种接收方式:1、手动接收,2、listener自动接收;手动接收的消费者在接受完消息后就死亡了;我们可以启动两个listener来看订阅的独特的一发多收的效果:
本初设计到的两个demo(maven,spring管理的工程)会上传到我的资源中,欢迎大家下载学习,有什么不懂的也可以联系我,共同探讨;谢谢;
1、订阅模式(多个接收者consumers在等待接收消息,一个producer生产消息,生产者发出消息后,所有连接到同一地址的消费者都能够得到刚发送的消息,适用于同步的业务逻辑)
2、队列模式(唯一消息生产者,唯一消息消费者,消息生产出来后会扔在队列里,等待消费者去慢慢的消费他们,适用于异步处理的业务逻辑)
具体步骤:1、从官网上下载activemq的适配包,我下载的是5.14.4windows64版本,解压后打开bin目录下的activemq.bat如截图位置,双击.bat批处理文件,启动相关服务,访问localhost:8186,如下图所示,则表示启动成功了;下面可以用生产者,消费者,连接到本机的这个类似于服务的中间容器中去发送,接收消息了;
登陆控制台:admin/admin,查看对应的queue和topic的消费生产情况;
发送和接收消息的Demo:
1、手动生产10条消息,放在queue中;
控制台:待消费的消息10条,没有消费者;
可以看出:第一个红框是待消费的消息条数;第二个红框中是一共生成的消息总数;
2、手动接收queue中的消息,每次5条:
控制台显示如下:被消费了5条消息,5条没有被消费的,队列中一共有10条消息,一个消费者;
订阅模式Demo:定时任务定时生成消息;
生成的消息都放在了名为subject的订阅号中,如果需要取消息,也要从subject中取;
发现subject名字的订阅号下生成了许多消息;没有消费者,没有已消费情况;
生成一个消费者来消费这些消息:
上面这个demo中有两种接收方式:1、手动接收,2、listener自动接收;手动接收的消费者在接受完消息后就死亡了;我们可以启动两个listener来看订阅的独特的一发多收的效果:
本初设计到的两个demo(maven,spring管理的工程)会上传到我的资源中,欢迎大家下载学习,有什么不懂的也可以联系我,共同探讨;谢谢;
最近又有研究下MQ的消息持久化,实现方式是修改其配置文件activemq.xml;
<!-- <persistenceAdapter> <kahaDB directory="${activemq.data}/kahadb"/> </persistenceAdapter>--> <bean id="MySQL-DS" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://192.168.*.*:3306/*****?useUnicode=true&characterEncoding=UTF-8"/> <property name="username" value="***"/> <property name="password" value="***"/> <property name="poolPreparedStatements" value="true"/> </bean>
失败重连机制,实现方式:
package queue; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * @description: [发送消息] * @author: zxx * @createDate: 2018/2/26 下午2:16 * @version: [v1.0] */ public class Sender { private static final int SEND_NUMBER = 5; public static void main(String[] args) { // ConnectionFactory :连接工厂,JMS 用它创建连接 ConnectionFactory connectionFactory; // Connection :JMS 客户端到JMS // Provider 的连接 // Session: 一个发送或接收消息的线程 Connection connection = null; Session session; // Destination :消息的目的地;消息发送给谁. Destination destination; // MessageProducer:消息发送者 MessageProducer producer; // TextMessage message; // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, //ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); ActiveMQConnection.DEFAULT_PASSWORD, "failover:(tcp://localhost:61616,tcp://localhost:61617)?initialReconnectDelay=1000&maxReconnectDelay=30000"); //打印出用户和密码 System.out.println("ActiveMQConnection.DEFAULT_USER:" + ActiveMQConnection.DEFAULT_USER + ",ActiveMQConnection.DEFAULT_PASSWORD:" + ActiveMQConnection.DEFAULT_PASSWORD); try { // 构造从工厂得到连接对象 connection = connectionFactory.createConnection(); // 启动 connection.start(); // 获取操作连接 session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 获取session注意参数值xx.queue是一个服务器的queue,须在在ActiveMq的console配置 destination = session.createQueue("xx.queue"); // 得到消息生成者【发送者】 producer = session.createProducer(destination); // 设置不持久化,此处学习,实际根据项目决定(将消息持久化到mysql的Demo) //producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); producer.setDeliveryMode(DeliveryMode.PERSISTENT); // 构造消息,此处写死,项目就是参数,或者方法获取 sendMessage(session, producer); session.commit(); } catch (Exception e ) { e.printStackTrace(); } finally { try { if (null != connection) { connection.close(); } } catch (Throwable ignore) { } } } public static void sendMessage(Session session, MessageProducer producer) throws Exception { for (int i = 1; i <= SEND_NUMBER; i++) { TextMessage message = session.createTextMessage("ActiveMq 发送的消息" + i); // 发送消息到目的地方 System.out.println("发送消息:" + "ActiveMq 发送的消息" + i); producer.send(message); } } }
相关文章推荐
- Maven+Spring+Hibernate+Shiro+Mysql简单的demo框架(二)
- 基于spring mvc + spring+mybatis+easyui+jquery+maven+mysql的后台权限管理系统
- 整合SpringBoot+Mysql+Redis实现缓存机制的一个Demo
- Spring+Mybatis+MySql+Maven 简单的事务管理案例
- 博客园首页新随笔联系管理订阅 随笔- 70 文章- 22 评论- 5 Spring 事务机制详解
- SSM框架整合(Maven+Spring+SpringMVC+Mybatis+mysql 附demo实例)
- 消息队列入门(五)ActiveMQ的JDBC消息持久化机制
- ActiveMQ(三):ActiveMQ的安全机制、api及订阅模式demo
- Spring+Mybatis+MySql+Maven 简单的事务管理案例
- activemq 支持mysql持久化 消息队列使用
- Spring+Mybatis+MySql+Maven 简单的事务管理案例
- Spring+SpringMVC+Hibernate 基本Demo(注解、Maven管理)
- Spring+SpringMVC+Hibernate 基本Demo(注解、Maven管理)
- 发布一个demo maven+freemarker+shiro+springmvc+spring+mybatis+redis+mysql
- SpringMVC+Hibernate+Maven+MySQL实现增删改查的一个小Demo
- Spring整合MyBatis(Maven+spring+MyBatis+mysql实践附demo)
- Spring ActiveMQ 整合(二): 重发机制(消息发送失败后的重新发送)
- Spring+Mybatis+MySql+Maven 简单的事务管理案例
- Spring+Mybatis+MySql+Maven 简单的事务管理案例
- 基于注解的maven+spring+hibernate+mysql简单demo