您的位置:首页 > 数据库 > Redis

利用redis实现消息队列之queue模式

2018-01-11 09:48 821 查看
可以利用redis存储数据类型的list类型实现消息发送与消费的一对一模式,使用lpush向list的左端推送数据(发送消息),使用rpop从右端接收数据(消费消息)。由于rpop需要周期性的从list中获取数据,可以考虑使用brpop代替rpop,brpop是一个阻塞方法,直到获取到数据。代码如下

生产者的pom文件

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>
<groupId>com.tansun</groupId>
<artifactId>ProducerTest</artifactId>
<version>0.0.1-SNAPSHOT</version>

<dependencies>
<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
</dependencies>

</project>
生产者的主方法
package com.tansun;

import redis.clients.jedis.Jedis;

public class ProducerTest {

@SuppressWarnings("resource")
public static void main(String[] args) {
Jedis jedis = new Jedis("192.168.229.128", 6379);
// 向键为“test queue”的值的左端推入数据
jedis.lpush("test queue", "message: hello redis queue");
}

}


消费者的pom文件与生产者相同

消费者的主方法

package com.tansun;

import java.util.List;

import redis.clients.jedis.Jedis;

public class ConsumerTest {

@SuppressWarnings("resource")
public static void main(String[] args) {
Jedis jedis = new Jedis("192.168.229.128", 6379);
while(true){
// 设置超时时间为0,表示无限期阻塞
List<String> message = jedis.brpop(0, "test queue");
System.out.println(message);
}
}

}

至此,已经实现了消息队列的queue模式发送和消费消息。

看一下brpop方法源码,发现还有一个重载的方法,源码如下:

public List<String> brpop(final int timeout, final String... keys) {
return brpop(getArgsAddTimeout(timeout, keys));
}
public List<String> brpop(String... args) {
checkIsInMultiOrPipeline();
client.brpop(args);
client.setTimeoutInfinite();
try {
return client.getMultiBulkReply();
} finally {
client.rollbackTimeout();
}
}
public void brpop(final String[] args) {
final byte[][] bargs = new byte[args.length][];
for (int i = 0; i < bargs.length; i++) {
bargs[i] = SafeEncoder.encode(args[i]);
}
brpop(bargs);
}


protected Connection sendCommand(final Command cmd, final byte[]... args) {
try {
connect();
Protocol.sendCommand(outputStream, cmd, args);
pipelinedCommands++;
return this;
} catch (JedisConnectionException ex) {
/*
* When client send request which formed by invalid protocol, Redis send back error message
* before close connection. We try to read it to provide reason of failure.
*/
try {
String errorMessage = Protocol.readErrorLineIfPossible(inputStream);
if (errorMessage != null && errorMessage.length()
4000
> 0) {
ex = new JedisConnectionException(errorMessage, ex.getCause());
}
} catch (Exception e) {
/*
* Catch any IOException or JedisConnectionException occurred from InputStream#read and just
* ignore. This approach is safe because reading error message is optional and connection
* will eventually be closed.
*/
}
// Any other exceptions related to connection?
broken = true;
throw ex;
}
}

从源码中可知这个方法可以从多个key获取元素,并且先从第一个key中获取元素,再依次向后获取,根据这个方法的特点,我们可以实现具有优先级的任务队列,代码不再赘述。

如果想了解redis实现消息队列的发布订阅模式,可以参考我的另一篇文章 http://blog.csdn.net/jia_costa/article/details/79033899
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: