您的位置:首页 > 其它

RabbitMQ消息中间件入门

2018-05-10 12:51 405 查看

第一章:RabbitMQ起步

1.1 课程导航

  • RabbitMQ简介及AMQP协议
  • RabbitMQ安装与使用
  • RabbitMQ核心概念
  • 与SpringBoot整合
  • 保障100%的消息可靠性投递方案落地实现
  • 学习源码

1.2 RabbitMQ简介

初识RabbitMQ

  • RabbitMQ是一个开源的消息代理和队列服务器
  • 用来通过普通协议在完全不同的应用之间共享数据
  • RabbitMQ是使用Erlang语言来编写的
  • 并且RabbitMQ是基于AMQP协议的

RabbitMQ简介

  • 目前很多互联网大厂都在使用RabbitMQ
  • RabbitMQ底层采用Erlang语言进行编写
  • 开源、性能优秀,稳定性保障
  • 与SpringAMQP完美的整合、API丰富
  • 集群模式丰富,表达式配置,HA模式,镜像队列模型
  • 保证数据不丢失的前提做到高可靠性、可用性
  • AMQP全称:Advanced Message Queuing Protocol
  • AMQP翻译:高级消息队列协议

AMQP协议模型

1.3 RabbitMQ安装

 

0.安装准备
官网地址:http://www.rabbitmq.com/
安装Linux必要依赖包<Linux7>
下载RabbitMQ安装包

yum install
build-essential openssl openssl-devel unixODBC unixODBC-devel
make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz

1.下载:
wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm
wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm

2.相关安装配置
三个软件包的安装
修改相关配置文件
vim /etc/hostname
vim /etc/hosts
(Linux防火墙)

3.修改RabbitMQ配置
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.5.7/ebin/rabbit.app
比如修改密码、配置等等;例如:loopback_users中的<<"guest">>,只保留guest
服务启动:rabbitmq-server start &
默认进程号:29123
默认端口号:5672
服务停止:rabbitmqctl app_stop

4.安装RabbitMQ web管理插件
rabbitmq-plugins enable rabbitmq_management
sudo systemctl restart rabbitmq-server
访问管控台地址:http://192.168.11.81:15672/
默认用户名密码:guest/guest

 

 

1.4 RabbitMQ概念

RabbitMQ的整体架构

RabbitMQ核心概念

  • Server:又称Broker,接受客户端的连接,实现AMQP实体服务
  • Connection:连接,应用程序与Broker的网络连接
  • Channel:网络信道

几乎所有的操作都在Channel中进行
Channel是进行消息读写的通道
客户端可建立多个Channel
每个Channel代表一个会话任务

  • Message:消息

服务器和应用程序之间传送的数据,由Properties和Body组成
Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性
Body则就是消息体内容

  • Virtual host:虚拟机

用于进行逻辑隔离,最上层的消息路由
一个Virtual host里面可以有若干个Exchange和Queue
同一个Virtual host里面不能有相同名称的Exchange或Queue

  • Exchange:交换机,接收消息,根据路由键转发消息到绑定的队列
  • Binding:Exchange和Queue之间的虚拟连接,binding中可以包含routing key
  • Routing key:一个路由规则,虚拟机可用它来确定如何路由一个特定消息
  • Queue:也称为Message Queue,消息队列,保存消息并将它们转发给消费者

RabbitMQ消息的流转过程

 

第二章:RabbitMQ整合SpringBoot2.x

2.1 发送消息Producer

SpringBoot与RabbitMQ集成

  • 引入相关依赖
  • 对application.properties进行配置

1、创建名为rabbitmq-producer的maven工程pom如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>47-rabbitmq</artifactId>
<groupId>com.myimooc</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>rabbitmq-producer</artifactId>

<properties>
<spring.boot.version>2.0.4.RELEASE</spring.boot.version>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-parent</artifactId>
<version>${spring.boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>

<!--RabbitMQ依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<!--工具类依赖-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.36</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>

 

2、编写application.properties类

# rabbitmq地址
spring.rabbitmq.addresses=10.1.195.196:5672
# rabbitmq用户名
spring.rabbitmq.username=guest
# rabbitmq密码
spring.rabbitmq.password=guest
# rabbitmq默认虚拟主机地址
spring.rabbitmq.virtual-host=/
# rabbitmq超时时间为15秒
spring.rabbitmq.connection-timeout=15000

#字符集
spring.http.encoding.charset=UTF-8
#格式化
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8
spring.jackson.default-property-inclusion=NON_NULL

spring.datasource.url=jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull
spring.datasource.username=root
spring.datasource.password=
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.type= com.alibaba.druid.pool.DruidDataSource

# 项目路径
server.servlet.context-path=/
# 服务端口号
server.port=8080

 

3、创建数据库

-- ----------------------------
-- Table structure for broker_message_log
-- ----------------------------
DROP TABLE IF EXISTS `broker_message_log`;
CREATE TABLE `broker_message_log` (
`message_id` varchar(255) NOT NULL COMMENT '消息唯一ID',
`message` varchar(4000) NOT NULL COMMENT '消息内容',
`try_count` int(4) DEFAULT '0' COMMENT '重试次数',
`status` varchar(10) DEFAULT '' COMMENT '消息投递状态 0投递中,1投递成功,2投递失败',
`next_retry` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' ON UPDATE CURRENT_TIMESTAMP COMMENT '下一次重试时间',
`create_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' ON UPDATE CURRENT_TIMESTAMP,
`update_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
-- Table structure for t_order
-- ----------------------------
DROP TABLE IF EXISTS `t_order`;
CREATE TABLE `t_order` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT NULL,
`message_id` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2018091102 DEFAULT CHARSET=utf8;

 

4、编写Order类

package com.bfxy.springboot.entity;

import java.io.Serializable;

/**
* 订单实体
* @author wangc
*/
public class Order implements Serializable {
private static final long serialVersionUID = -1502291609049620042L;
private String id;
private String name;

/**
* 存储消息发送的唯一标识
*/
private String messageId;

public Order() {
}

public Order(String id, String name, String messageId) {
this.id = id;
this.name = name;
this.messageId = messageId;
}

public String getId() {
return id;
}

public void setId(String id) {
this.id = id;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public String getMessageId() {
return messageId;
}

public void setMessageId(String messageId) {
this.messageId = messageId;
}
}

 

5、编写OrderSender类

package com.bfxy.springboot.producer;

import com.bfxy.springboot.entity.Order;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
* 订单消息发送者
*
* @author wangc
*/
@Component
public class OrderSender {
@Autowired
private RabbitTemplate rabbitTemplate;

/**
* 发送订单
*
* @param order 订单
* @throws Exception 异常
*/
public void send(Order order) throws Exception {
CorrelationData correlationData = new CorrelationData();
correlationData.setId(order.getMessageId());
rabbitTemplate.convertAndSend("order-exchange",
"order.abcd",
order,
correlationData);
}
}

 

6、编写OrderSenderTest类

package com.bfxy.springboot;

import com.bfxy.springboot.entity.Order;
import com.bfxy.springboot.producer.OrderSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.omg.CORBA.PUBLIC_MEMBER;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.UUID;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringbootProducerApplicationTests {

@Test
public void contextLoads() {
}

@Autowired
private OrderSender orderSender;

/**
* 订单消息发送者测试
* @author wangc
*/
@Test
public void testSend1() throws Exception {
Order order = new Order();
order.setId("201808180000000001");
order.setName("测试订单1");
order.setMessageId(System.currentTimeMillis() + "$" + UUID.randomUUID().toString());
orderSender.send(order);

}
}

 

2.2 接收消息Cunsumer

1、编写application.properties类

# SpringBoot整合rabbitMQ的基本配置:
# rabbitmq地址
spring.rabbitmq.addresses=10.1.195.196:5672
# rabbitmq用户名
spring.rabbitmq.username=guest
# rabbitmq密码
spring.rabbitmq.password=guest
# rabbitmq默认虚拟主机地址
spring.rabbitmq.virtual-host=/
# rabbitmq超时时间为15秒
spring.rabbitmq.connection-timeout=15000

#字符集
#spring.http.encoding.charset=UTF-8
#格式化
#spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
#spring.jackson.time-zone=GMT+8
#spring.jackson.default-property-inclusion=NON_NULL

spring.datasource.url=jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.type= com.alibaba.druid.pool.DruidDataSource

# SpringBoot整合rabbitMQ 消费端配置:
# 基本并发:5
spring.rabbitmq.listener.simple.concurrency=5
# 签收模式:手动签收
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 最大并发:10
spring.rabbitmq.listener.simple.max-concurrency=10
# 限流策略:同一时间只有1条消息发送过来消费
spring.rabbitmq.listener.simple.prefetch=1

# Server配置:
# 项目路径
server.servlet.context-path=/
# 服务端口号
server.port=8082

 

2、编写OrderReceiver类

package com.bfxy.springboot.consumer;

import com.bfxy.springboot.entity.Order;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
* 订单接收者
*
* @author wangc
*/
@Component
public class OrderReceiver {
/**
* 接收消息
*
* @RabbitListener 绑定监听
*
* @param order   消息体内容
* @param headers 消息头内容
* @param channel 网络信道
* @throws Exception 异常
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "order-queue", durable = "true"),
exchange = @Exchange(name = "order-exchange", durable = "true", type = "topic"),
key = "order.*"
)

)
@RabbitHandler
public void onOrderMessage(@Payload Order order, @Headers Map<String, Object> headers, Channel channel) throws Exception {
// 消费者操作
System.out.println("------------收到消息,开始收费-------------------");
System.out.println("订单ID" + order.getId());
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
// 手动签收消息
//        channel.basicAck(deliveryTag, false);
}

}

 

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: