您的位置:首页 > 编程语言 > Java开发

Springboot + Rabbitmq 实现简单延迟队列

2020-01-15 11:00 1016 查看

Springboot + Rabbitmq 实现简单延迟队列

一、了解Rabbitmq

RabbitMQ是一个实现了AMQP(Advanced Message Queuing Protocol)高级消息队列协议的消息队列服务,用Erlang语言的。

所以在下面使用Spring boot 整合 Rabbitmq之前,我们需要首先安装Erlang。

之后再来了解mq,MQ全称是Message Queue,可以理解为消息队列的意思,简单来说就是消息以管道的方式进行传递。

附上大神的一段话

为什么选择RabbitMQ

现在的市面上有很多MQ可以选择,比如ActiveMQ、ZeroMQ、Appche Qpid,那问题来了为什么要选择RabbitMQ?

  1. 除了Qpid,RabbitMQ是唯一一个实现了AMQP标准的消息服务器;
  2. 可靠性,RabbitMQ的持久化支持,保证了消息的稳定性;
  3. 高并发,RabbitMQ使用了Erlang开发语言,Erlang是为电话交换机开发的语言,天生自带高并发光环,和高可用特性;
  4. 集群部署简单,正是应为Erlang使得RabbitMQ集群部署变的超级简单;
  5. 社区活跃度高,根据网上资料来看,RabbitMQ也是首选;

使用场景

在我们秒杀抢购商品的时候,系统会提醒我们稍等排队中,而不是像几年前一样页面卡死或报错给用户。

像这种排队结算就用到了消息队列机制,放入通道里面一个一个结算处理,而不是某个时间断突然涌入大批量的查询新增把数据库给搞宕机,所以RabbitMQ本质上起到的作用就是削峰填谷,为业务保驾护航。

工作机制

生产者、消费者和代理

在了解消息通讯之前首先要了解3个概念:生产者、消费者和代理。

生产者:消息的创建者,负责创建和推送数据到消息服务器;

消费者:消息的接收方,用于处理数据和确认消息;

代理:就是RabbitMQ本身,用于扮演“快递”的角色,本身不生产消息,只是扮演“快递”的角色。

再次附上大佬画的图

ConnectionFactory(连接管理器):**应用程序与Rabbit之间建立连接的管理器,程序代码中使用;

Channel(信道):消息推送使用的通道;

Exchange(交换器):用于接受、分配消息;

Queue(队列):用于存储生产者的消息;

RoutingKey(路由键):用于把生成者的数据分配到交换器上;

BindingKey(绑定键):用于把交换器的消息绑定到队列上;

二丶配置环境

首先安装Erlang

链接

http://www.erlang.org/downloads

下载对应版本,一直下一步就好,之后配置环境变量。

就像配置java的环境变量一样,在此电脑的属性里配置。

怎么配置环境变量

最后效果如图

再在系统变量path里配置一下bin目录就好。

配置好之后,按住

win
+
R
,打开cmd,输入
erl
查看版本号

如果正确出现了版本号就算成功了。

之后再安装

Rabbitmq

下载:http://www.rabbitmq.com/download.html

安装完成后依旧要记得配置环境变量。

path变量的配置在上上图中可以看到。

之后在控制台中进入到sbin目录中安装RabbitMQ-Plugins。

命令是:

rabbitmq-plugins enable rabbitmq_management

成功后访问 http://localhost:15672

如果正确出现了需要输入账号密码的网站,说明成功了。

账号密码都是guest。

三丶简单实例

在idea中创建一个springboot项目,导入jar包

<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.2.0</version>
</dependency>

创建连接对象

public static Connection GetRabbitConnection() {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(Config.UserName);
factory.setPassword(Config.Password);
factory.setVirtualHost(Config.VHost);
factory.setHost(Config.Host);
factory.setPort(Config.Port);
Connection conn = null;
try {
conn = factory.newConnection();
} catch (Exception e) {
e.printStackTrace();
}
return conn;
}

这里咱们使用最简单的方式

public static void main(String[] args) {
Publisher(); // 推送消息

Consumer(); // 消费消息
}

/**
* 推送消息
*/
public static void Publisher() {
// 创建一个连接
Connection conn = ConnectionFactoryUtil.GetRabbitConnection();
if (conn != null) {
try {
// 创建通道
Channel channel = conn.createChannel();
// 声明队列【参数说明:参数一:队列名称,参数二:是否持久化;参数三:是否独占模式;参数四:消费者断开连接时是否删除队列;参数五:消息其他参数】
channel.queueDeclare(Config.QueueName, false, false, false, null);
String content = String.format("当前时间:%s", new Date().getTime());
// 发送内容【参数说明:参数一:交换机名称;参数二:队列名称,参数三:消息的其他属性-routing headers,此属性为MessageProperties.PERSISTENT_TEXT_PLAIN用于设置纯文本消息存储到硬盘;参数四:消息主体】
channel.basicPublish("", Config.QueueName, null, content.getBytes("UTF-8"));
System.out.println("已发送消息:" + content);
// 关闭连接
channel.close();
conn.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}

/**
* 消费消息
*/
public static void Consumer() {
// 创建一个连接
Connection conn = ConnectionFactoryUtil.GetRabbitConnection();
if (conn != null) {
try {
// 创建通道
Channel channel = conn.createChannel();
// 声明队列【参数说明:参数一:队列名称,参数二:是否持久化;参数三:是否独占模式;参数四:消费者断开连接时是否删除队列;参数五:消息其他参数】
channel.queueDeclare(Config.QueueName, false, false, false, null);

// 创建订阅器,并接受消息
channel.basicConsume(Config.QueueName, false, "", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey(); // 队列名称
String contentType = properties.getContentType(); // 内容类型
String content = new String(body, "utf-8"); // 消息正文
System.out.println("消息正文:" + content);
channel.basicAck(envelope.getDeliveryTag(), false); // 手动确认消息【参数说明:参数一:该消息的index;参数二:是否批量应答,true批量确认小于index的消息】
}
});

} catch (Exception e) {
e.printStackTrace();
}
}
}
  • 点赞 3
  • 收藏
  • 分享
  • 文章举报
colahhh 发布了5 篇原创文章 · 获赞 3 · 访问量 258 私信 关注
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: