RabbitMQ消息中间件入门
2018-05-10 12:51
405 查看
第一章:RabbitMQ起步
1.1 课程导航
- RabbitMQ简介及AMQP协议
- RabbitMQ安装与使用
- RabbitMQ核心概念
- 与SpringBoot整合
- 保障100%的消息可靠性投递方案落地实现
- 学习源码
1.2 RabbitMQ简介
初识RabbitMQ
- RabbitMQ是一个开源的消息代理和队列服务器
- 用来通过普通协议在完全不同的应用之间共享数据
- RabbitMQ是使用Erlang语言来编写的
- 并且RabbitMQ是基于AMQP协议的
RabbitMQ简介
- 目前很多互联网大厂都在使用RabbitMQ
- RabbitMQ底层采用Erlang语言进行编写
- 开源、性能优秀,稳定性保障
- 与SpringAMQP完美的整合、API丰富
- 集群模式丰富,表达式配置,HA模式,镜像队列模型
- 保证数据不丢失的前提做到高可靠性、可用性
- AMQP全称:Advanced Message Queuing Protocol
- AMQP翻译:高级消息队列协议
AMQP协议模型
1.3 RabbitMQ安装
0.安装准备 官网地址:http://www.rabbitmq.com/ 安装Linux必要依赖包<Linux7> 下载RabbitMQ安装包 yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz 1.下载: wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm 2.相关安装配置 三个软件包的安装 修改相关配置文件 vim /etc/hostname vim /etc/hosts (Linux防火墙) 3.修改RabbitMQ配置 vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.5.7/ebin/rabbit.app 比如修改密码、配置等等;例如:loopback_users中的<<"guest">>,只保留guest 服务启动:rabbitmq-server start & 默认进程号:29123 默认端口号:5672 服务停止:rabbitmqctl app_stop 4.安装RabbitMQ web管理插件 rabbitmq-plugins enable rabbitmq_management sudo systemctl restart rabbitmq-server 访问管控台地址:http://192.168.11.81:15672/ 默认用户名密码:guest/guest
1.4 RabbitMQ概念
RabbitMQ的整体架构
RabbitMQ核心概念
- Server:又称Broker,接受客户端的连接,实现AMQP实体服务
- Connection:连接,应用程序与Broker的网络连接
- Channel:网络信道
几乎所有的操作都在Channel中进行
Channel是进行消息读写的通道
客户端可建立多个Channel
每个Channel代表一个会话任务
- Message:消息
服务器和应用程序之间传送的数据,由Properties和Body组成
Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性
Body则就是消息体内容
- Virtual host:虚拟机
用于进行逻辑隔离,最上层的消息路由
一个Virtual host里面可以有若干个Exchange和Queue
同一个Virtual host里面不能有相同名称的Exchange或Queue
- Exchange:交换机,接收消息,根据路由键转发消息到绑定的队列
- Binding:Exchange和Queue之间的虚拟连接,binding中可以包含routing key
- Routing key:一个路由规则,虚拟机可用它来确定如何路由一个特定消息
- Queue:也称为Message Queue,消息队列,保存消息并将它们转发给消费者
RabbitMQ消息的流转过程
第二章:RabbitMQ整合SpringBoot2.x
2.1 发送消息Producer
SpringBoot与RabbitMQ集成
- 引入相关依赖
- 对application.properties进行配置
1、创建名为rabbitmq-producer的maven工程pom如下
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>47-rabbitmq</artifactId> <groupId>com.myimooc</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>rabbitmq-producer</artifactId> <properties> <spring.boot.version>2.0.4.RELEASE</spring.boot.version> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-parent</artifactId> <version>${spring.boot.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <!--RabbitMQ依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!--工具类依赖--> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.5</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.36</version> </dependency> <dependency> <groupId>javax.servlet</groupId> <artifactId>javax.servlet-api</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
2、编写application.properties类
# rabbitmq地址 spring.rabbitmq.addresses=10.1.195.196:5672 # rabbitmq用户名 spring.rabbitmq.username=guest # rabbitmq密码 spring.rabbitmq.password=guest # rabbitmq默认虚拟主机地址 spring.rabbitmq.virtual-host=/ # rabbitmq超时时间为15秒 spring.rabbitmq.connection-timeout=15000 #字符集 spring.http.encoding.charset=UTF-8 #格式化 spring.jackson.date-format=yyyy-MM-dd HH:mm:ss spring.jackson.time-zone=GMT+8 spring.jackson.default-property-inclusion=NON_NULL spring.datasource.url=jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull spring.datasource.username=root spring.datasource.password= spring.datasource.driver-class-name=com.mysql.jdbc.Driver spring.datasource.type= com.alibaba.druid.pool.DruidDataSource # 项目路径 server.servlet.context-path=/ # 服务端口号 server.port=8080
3、创建数据库
-- ---------------------------- -- Table structure for broker_message_log -- ---------------------------- DROP TABLE IF EXISTS `broker_message_log`; CREATE TABLE `broker_message_log` ( `message_id` varchar(255) NOT NULL COMMENT '消息唯一ID', `message` varchar(4000) NOT NULL COMMENT '消息内容', `try_count` int(4) DEFAULT '0' COMMENT '重试次数', `status` varchar(10) DEFAULT '' COMMENT '消息投递状态 0投递中,1投递成功,2投递失败', `next_retry` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' ON UPDATE CURRENT_TIMESTAMP COMMENT '下一次重试时间', `create_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' ON UPDATE CURRENT_TIMESTAMP, `update_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`message_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; -- ---------------------------- -- Table structure for t_order -- ---------------------------- DROP TABLE IF EXISTS `t_order`; CREATE TABLE `t_order` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(255) DEFAULT NULL, `message_id` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=2018091102 DEFAULT CHARSET=utf8;
4、编写Order类
package com.bfxy.springboot.entity; import java.io.Serializable; /** * 订单实体 * @author wangc */ public class Order implements Serializable { private static final long serialVersionUID = -1502291609049620042L; private String id; private String name; /** * 存储消息发送的唯一标识 */ private String messageId; public Order() { } public Order(String id, String name, String messageId) { this.id = id; this.name = name; this.messageId = messageId; } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getMessageId() { return messageId; } public void setMessageId(String messageId) { this.messageId = messageId; } }
5、编写OrderSender类
package com.bfxy.springboot.producer; import com.bfxy.springboot.entity.Order; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 订单消息发送者 * * @author wangc */ @Component public class OrderSender { @Autowired private RabbitTemplate rabbitTemplate; /** * 发送订单 * * @param order 订单 * @throws Exception 异常 */ public void send(Order order) throws Exception { CorrelationData correlationData = new CorrelationData(); correlationData.setId(order.getMessageId()); rabbitTemplate.convertAndSend("order-exchange", "order.abcd", order, correlationData); } }
6、编写OrderSenderTest类
package com.bfxy.springboot; import com.bfxy.springboot.entity.Order; import com.bfxy.springboot.producer.OrderSender; import org.junit.Test; import org.junit.runner.RunWith; import org.omg.CORBA.PUBLIC_MEMBER; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.util.UUID; @RunWith(SpringRunner.class) @SpringBootTest public class SpringbootProducerApplicationTests { @Test public void contextLoads() { } @Autowired private OrderSender orderSender; /** * 订单消息发送者测试 * @author wangc */ @Test public void testSend1() throws Exception { Order order = new Order(); order.setId("201808180000000001"); order.setName("测试订单1"); order.setMessageId(System.currentTimeMillis() + "$" + UUID.randomUUID().toString()); orderSender.send(order); } }
2.2 接收消息Cunsumer
1、编写application.properties类
# SpringBoot整合rabbitMQ的基本配置: # rabbitmq地址 spring.rabbitmq.addresses=10.1.195.196:5672 # rabbitmq用户名 spring.rabbitmq.username=guest # rabbitmq密码 spring.rabbitmq.password=guest # rabbitmq默认虚拟主机地址 spring.rabbitmq.virtual-host=/ # rabbitmq超时时间为15秒 spring.rabbitmq.connection-timeout=15000 #字符集 #spring.http.encoding.charset=UTF-8 #格式化 #spring.jackson.date-format=yyyy-MM-dd HH:mm:ss #spring.jackson.time-zone=GMT+8 #spring.jackson.default-property-inclusion=NON_NULL spring.datasource.url=jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull spring.datasource.username=root spring.datasource.password=123456 spring.datasource.driver-class-name=com.mysql.jdbc.Driver spring.datasource.type= com.alibaba.druid.pool.DruidDataSource # SpringBoot整合rabbitMQ 消费端配置: # 基本并发:5 spring.rabbitmq.listener.simple.concurrency=5 # 签收模式:手动签收 spring.rabbitmq.listener.simple.acknowledge-mode=manual # 最大并发:10 spring.rabbitmq.listener.simple.max-concurrency=10 # 限流策略:同一时间只有1条消息发送过来消费 spring.rabbitmq.listener.simple.prefetch=1 # Server配置: # 项目路径 server.servlet.context-path=/ # 服务端口号 server.port=8082
2、编写OrderReceiver类
package com.bfxy.springboot.consumer; import com.bfxy.springboot.entity.Order; import com.rabbitmq.client.Channel; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import java.util.Map; /** * 订单接收者 * * @author wangc */ @Component public class OrderReceiver { /** * 接收消息 * * @RabbitListener 绑定监听 * * @param order 消息体内容 * @param headers 消息头内容 * @param channel 网络信道 * @throws Exception 异常 */ @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "order-queue", durable = "true"), exchange = @Exchange(name = "order-exchange", durable = "true", type = "topic"), key = "order.*" ) ) @RabbitHandler public void onOrderMessage(@Payload Order order, @Headers Map<String, Object> headers, Channel channel) throws Exception { // 消费者操作 System.out.println("------------收到消息,开始收费-------------------"); System.out.println("订单ID" + order.getId()); Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); // 手动签收消息 // channel.basicAck(deliveryTag, false); } }
相关文章推荐
- 【消息中间件RabbitMQ】Spring AMQP分析与实战视频教程
- 【RabbitMQ消息中间件】2.安装RabbitMQ
- 分布式系统消息中间件——RabbitMQ的使用基础篇
- 深入理解消息中间件技术之RabbitMQ服务
- 快速入门分布式消息队列之 RabbitMQ(1)
- 消息中间件JMS发布/订阅入门小Demo
- 转:Kafka、RabbitMQ、RocketMQ消息中间件的对比 —— 消息发送性能 (阿里中间件团队博客)
- rabbitmq消息队列的简单入门
- 消息中间件--RabbitMQ
- 使用rabbitmq消息中间件
- RabbitMQ入门-消息派发那些事儿
- 分布式系统消息中间件——RabbitMQ的使用进阶篇
- 消息中间件RabbitMQ 初探
- 在 CentOS7 上安装 RabbitMQ 消息队列中间件
- 消息中间件的技术选型心得-RabbitMQ、ActiveMQ和ZeroMQ
- Spring Boot 入门之消息中间件篇(五)
- RabbitMQ入门-10-2(整合spring-发送同步消息注解实现)
- 消息中间件的技术选型心得-RabbitMQ、ActiveMQ和ZeroMQ
- 消息中间件-ActiveMQ入门实例
- 消息中间件的技术选型心得-RabbitMQ、ActiveMQ和ZeroMQ