您的位置:首页 > 数据库 > Redis

redis队列-生产消费模式-简单实现

2018-02-19 12:27 459 查看
生产消费,不外乎就是生产新的消息插入到队列尾巴,消费者从队列头部取消息。
基于此,简单实现如下:(还有一种稍微复杂的实现,是结合了spring的实现,复杂实现

往jedis队列尾部塞入消息

/**
* 往列表尾部插入数据
*
* @param key
* @param value
*/
public static void rpush(String key, String value) {
try(Jedis jedis = jedisPoolUtil.getJedis()) {
jedis.rpush(key, value);
}
}

从jedis队列头部取出数据

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;

public class RedisConsumer implements Runnable{
private Logger logger = LoggerFactory.getLogger(RedisConsumer.class);
private static int MAX_ERR_TIMES = 10;
private static int EXCEPTION_SLEEP_SECONDS = 3;
private String key;

public RedisConsumer(String key){
this.key = key;
logger.info("创建消费者");
}

@Override
public void run() {
logger.info("消费者执行,");
try (Jedis jedis = JedisUtil.getJedisPoolUtil().getJedis()) {
int errorTimes = 0;
boolean runFlag = true;
while(runFlag){
try {
String msg = jedis.blpop(0,key).get(1);
logger.info("消费消息,msg={}",msg);
//业务逻辑处理
...
errorTimes = 0;
}catch (Exception e){
errorTimes++;
if(errorTimes > MAX_ERR_TIMES){
logger.warn("消费者异常次数超过阈值,关闭线程,请查看redis服务是否关闭或异常");
runFlag = false;
break;
}
try {
Thread.sleep(EXCEPTION_SLEEP_SECONDS*1000);
} catch (InterruptedException e1) {
logger.warn("消费者异常,睡眠被打断",e1);
}
logger.warn("消费者异常",e);
}
}
}
}
}

jedis工具类

import redis.clients.jedis.Jedis;

public class JedisUtil {
private static JedisPoolUtil jedisPoolUtil = SpringUtils.getBean("jedisPoolUtil");

/**
* 返回连接池
*/
public static JedisPoolUtil getJedisPoolUtil() {
return jedisPoolUtil;
}

/**
* 设置有效期的字符串缓存
*
* @param key
* @param value
* @param seconds
*/
public static void set(String key, String value, int seconds) {
/**
* JDK 7新特性写法,实现了AutoCloseable接口,这个写法会自动调用close方法关闭
*/
try (Jedis jedis = jedisPoolUtil.getJedis()) {
jedis.setex(key, seconds, value);
}
}
/**
* 设置字符串缓存
*
* @param key
* @param value
*/
public static void set(String key, String value) {
try(Jedis jedis = jedisPoolUtil.getJedis()) {
jedis.set(key, value)
4000
;
}
}
/**
* 设置byte[]缓存
*
* @param key
* @param value
*/
public static void setByte(byte[] key, byte[] value,int expire) {
try(Jedis jedis = jedisPoolUtil.getJedis()) {
jedis.set(key, value);
expireBytesKey(key, expire);
}
}
/**
* 设置有效期的字符串缓存
*
* @param key
* @param value
* @param seconds
*/
public static void setex(String key, int seconds, String value) {
try (Jedis jedis = jedisPoolUtil.getJedis()) {
jedis.setex(key, seconds, value);
}
}
/**
* 获取字符串数据
*
* @param key
* @return
*/
public static String get(String key) {
try(Jedis jedis = jedisPoolUtil.getJedis()) {
return jedis.get(key);
}
}
/**
* 获取byte[]数据
*
* @param key
* @return
*/
public static byte[] getByte(byte[] key) {
try(Jedis jedis = jedisPoolUtil.getJedis()) {
return jedis.get(key);
}
}
/**
* 往列表头部插入数据
*
* @param key
* @param value
*/
public static void lpush(String key, String value) {
try(Jedis jedis = jedisPoolUtil.getJedis()) {
jedis.lpush(key, value);
}
}
/** * 往列表尾部插入数据 * * @param key * @param value */ public static void rpush(String key, String value) { try(Jedis jedis = jedisPoolUtil.getJedis()) { jedis.rpush(key, value); } }
/**
* 从列表头部获取数据
*
* @param key
* @return
*/
public static String lpop(String key) {
try(Jedis jedis = jedisPoolUtil.getJedis()) {
return jedis.lpop(key);
}
}
/**
* 从列表尾部获取数据
*
* @param key
* @return
*/
public static String rpop(String key) {
try(Jedis jedis = jedisPoolUtil.getJedis()) {
return jedis.rpop(key);
}
}
/**
* 获取列表长度
*
* @param key
* @return 列表长度,若返回-1则表示取值发生异常
*/
public static long llen(String key) {
try(Jedis jedis = jedisPoolUtil.getJedis()) {
return jedis.llen(key);
}
}
/**
* 计数器递增
* @param key
* @return 执行 incr 命令之后 key 的值
*/
public static long incr(String key) {
try(Jedis jedis = jedisPoolUtil.getJedis()) {
return jedis.incr(key);
}
}
/**
* 计数器递减
* @param key
* @return 执行 decr 命令之后 key 的值
*/
public static long decr(String key) {
try(Jedis jedis = jedisPoolUtil.getJedis()) {
return jedis.decr(key);
}
}
/**
* 删除键
* @param key
* @return 执行 decr 命令之后 key 的值
*/
public static long delByte(byte[] key) {
try(Jedis jedis = jedisPoolUtil.getJedis()) {
return jedis.del(key);
}
}
/**
* 设置key有效期
* @param key
* @param seconds
*/
public static void expire(String key,int seconds) {
try(Jedis jedis = jedisPoolUtil.getJedis()) {
jedis.expire(key, seconds);
}
}
public static void expireBytesKey(byte[] key, int expire){
try(Jedis jedis = jedisPoolUtil.getJedis()) {
jedis.expire(key, expire);
}
}
/**
* 判断member是否是集合key的成员
* @param key
* @param member
* @return
*/
public static boolean sismember(String key, String member) {
try(Jedis jedis = jedisPoolUtil.getJedis()) {
return jedis.sismember(key, member);
}
}
/**
* 往集合SET添加数据
* @param key
* @param member
*/
public static void sadd(String key,String member) {
try(Jedis jedis = jedisPoolUtil.getJedis()) {
jedis.sadd(key, member);
}
}

/**
* 获取键的剩余有效秒数
* 当 key 不存在时,返回 -2 。 当 key 存在但没有设置剩余生存时间时,返回 -1 。 否则,以毫秒为单位,返回 key 的剩余生存时间
* 注意:在 Redis 2.8 以前,当 key 不存在,或者 key 没有设置剩余生存时间时,命令都返回 -1
* @param key
*/
public static Long pttl(String key) {
try(Jedis jedis = jedisPoolUtil.getJedis()) {
return jedis.pttl(key);
}
}

/**
* 删除键
* @param key
*/
public static void del(String key) {
try(Jedis jedis = jedisPoolUtil.getJedis()) {
jedis.del(key);
}
}

/**
* 获取List第n个值
* @param key
*/
public static String lindex(String key,Long index) {
try(Jedis jedis = jedisPoolUtil.getJedis()) {
return jedis.lindex(key,index);
}
}
}

制造多个消费者

new Thread(new RedisConsumer(msgProcessor, MsgConstant.REDIS_QUEUE_KEY)).start();
logger.info("第一个消费者启动完成");
new Thread(new RedisConsumer(msgProcessor,MsgConstant.REDIS_QUEUE_KEY)).start();
logger.info("第二个消费者启动完成");

注意

blpop:阻塞式从redis消息队列头部取出消息。如果没有取到消息,就会导致链接阻塞,直到有新的消息生产并存进队列。
lpop:非阻塞式从redis消息队列头部取出消息。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: