您的位置:首页 > 其它

服务框架学习(一):消息队列的原理与实现

2019-02-21 23:31 337 查看
版权声明:本文为博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。 本文链接:https://blog.csdn.net/qq272437543/article/details/87868990

消息队列

1. 概念

消息队列是消息请求的队列,是承载消息请求的队列

在实际应用场景中,有队列,发送者和接收者
基本的工作原理是:发送者向指定的队列发送消息,该队列让消息入栈,然后当接收者监听指定的队列时,就会接收到来自发送者的消息。

2. 简单实现方法

消息队列在后端十分有用:

  1. 应用解除耦合:每次只用一个简单的方法,不需要和其他接口打交道
  2. 处理异步任务:发送者发布消息后就不用再管了,接受者需要接收消息就可以得到
  3. 减少访问峰值:发送端可能短时间有超大的访问量,接收端处理速度没那么快,不过没关系,接收者可以一件一件的完成,防止服务器因峰值访问量过高而宕机(广泛应用于抢购)

其实具体的实现代码是很简单的(拿SpringBoot下整合Kafka举例)
pom.xml 中添加依赖

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

发送端1.java

@Autowired // 用于执行高级操作的模块
private KafkaTemplate<String, String> kafkaTemplate;
public void sendChannelMessage(String channel, String message)
{
// 向指定channel发送message
kafkaTemplate.send(channel, message);
}

接收端1.java(监听topics为t1的队列)

@KafkaListener(topics={"t1"})
public void receive(String message)
{
System.out.printlb("received: " + message);
}

当发送端1向topics=t1的队列发送消息时
测试类.java

@Test
public void senderTest()
{
new Sender().sendChannelMessage("t1", "Hello Drake!");
}

接收端1就可以收打印

received: Hello Drake!

我们注意topics是数组,说明接收端可以监听多个队列

3. 简单手写实现

学习一个框架的好方法是尝试手写一个,能理解原理
首先写一个队列类,要求是能实现存入发送端的信息,并让接收端能得到消息
MyMessageQueue.java

// 简单的队列 (java.util.queue) 即可
private final Queue<String> queue = new LinkedList<>();
// 向本对象发送消息
public void send(String msg)
{
try{
synchronized(this)
{
// 恢复getMessage()方法开启的线程
nodify();
queue.add(msg);
}
}catch(Exception e){
}
}

public String getMessage()
{
// 开一个接收消息的
new Thread(new Runnable{
public void run()
{
while (true)
{
try{
synchronized(this)
{
if (!queue.isEmpty())
{
return queue.poll();
}
wait();
// 若队列没有东西,则将线程暂停,等待消息入队
}
}catch(Exception e){
}
}
}
}).start();
}

调用方法:
Main.java

// 省去主函数
int index = 0;
void sendTest()
{
MyMessageQueue mq = new MyMessageQueue();
new Thread(new Runnable{
public void run()
{
while(true)
{
// 每秒钟发送一次数据
String msg = "Drake " + index++;
mq.send(msg);
System.out.println("send: " + msg);
try{
Thread.sleep(1000);
}catch(Exception e){
}
}
}
}).start();
receiveTest(mq);
}
void receiveTest(MyMessageQueue mq)
{
new Thread(new Runnable{
public void run()
{
while(true)
{
// 每秒钟发送一次数据
String msg = mq.receive();
if (msg != null)
System.out.println("receive: " + msg);
try{
Thread.sleep(2000);
}catch(Exception e){
}
}
}
}).start();
}

OK,开始测试:

send: Drake 0
receive: Drake 0
send: Drake 1
send: Drake 2
receive: Drake 1
send: Drake 3
send: Drake 4
receive: Drake 2
send: Drake 5

可见这个效果和我们预期的差不对。当然我们的MQ只是简单的实现基本功能,要达到可用还差点,比如多线程队列调度,优化,网络服务等。
源代码:https://github.com/272437543/FrameWorkStudy
喜欢的star一下哟

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐