利用redis简单实现消息订阅和发布
2017-03-19 13:01
756 查看
好久没写博客了,最近关于redis消费者,生产者的功能,看了很多的资料,个人觉得很多的MQ开源都很好用,redis相对小俏,简单实现,一下先介绍redis实现;
1, redis实现消息发布和订阅,
2 spring boot初始化;
subscribe 会阻塞进程,,,,
jedis.public(通道名,消息体…) 既可以发布消息
a,上面模式适合 单一服务器使用,,如果是分布式多服务器,很多其他的MQ很好实现,redis需要使用到队列功能了(因为消息发布和订阅是一对多的模式(redis)),,,不能仅仅使用消息的订阅和发布处理数据了,只能使用消息的发布用来通知其他的分布式节点,进行抢占时向队列里面取数据,
e redis.lpush(); 和 redis.rpop() 可实现
o 工具类:
1, redis实现消息发布和订阅,
/** * Created by SDingBa.xiong on 17-3-9. */ public class RedisMsgPubSubListener extends JedisPubSub { private static final Logger logger = LoggerFactory.getLogger(RedisMsgPubSubListener.class); @Resource private AdOriginalityMapper adOriginalityMapper; @Override public void unsubscribe() { super.unsubscribe(); } @Override public void unsubscribe(String... channels) { super.unsubscribe(channels); } @Override public void subscribe(String... channels) { super.subscribe(channels); } @Override public void psubscribe(String... patterns) { super.psubscribe(patterns); } @Override public void punsubscribe() { super.punsubscribe(); } @Override public void punsubscribe(String... patterns) { super.punsubscribe(patterns); } @Override public void onMessage(String channel, String message) { logger.info("channel={}, receives message={}", channel, message); code... } @Override public void onPMessage(String pattern, String channel, String message) { } @Override public void onSubscribe(String channel, int subscribedChannels) { logger.info("====start Sub ====== channel={}, receives subscribedChannels={}", channel, subscribedChannels); } @Override public void onPUnsubscribe(String pattern, int subscribedChannels) { } @Override public void onPSubscribe(String pattern, int subscribedChannels) { } @Override public void onUnsubscribe(String channel, int subscribedChannels) { logger.info("====end Sub ======= channel={}, receives subscribedChannels={}", channel, subscribedChannels); } }
2 spring boot初始化;
@Configuration @Component public class RedisMQSub implements ApplicationListener<ContextRefreshedEvent> { private final Logger logger = LoggerFactory.getLogger(RedisMQSub.class); @Resource private RedisService redisService; // 运行 监听的 主机(123.56.86.1), 可以使用队列解决 多个服务器处理一个通知请求(先通过 MQ -> 处理队列) // private static final String CRM_ACTIVE_HOSTNAME = "iZ25o9jw9hbZ"; private static final String CRM_ACTIVE_HOSTNAME = "su"; private void redisInit() { // 本地测试 ; // Jedis jedis = new Jedis("localhost"); // System.out.println((redisService == null) + " redis server"); // RedisMsgPubSubListener listener = new RedisMsgPubSubListener(); // new Thread(() -> jedis.subscribe(listener, CreativeData.CREATIVE_ACTIVE_REDIS_MESSAGE_MQ)).start(); // listener.unsubscribe();//关闭监听 } @Override public void onApplicationEvent(ContextRefreshedEvent event) { redisInit(); } }
subscribe 会阻塞进程,,,,
jedis.public(通道名,消息体…) 既可以发布消息
a,上面模式适合 单一服务器使用,,如果是分布式多服务器,很多其他的MQ很好实现,redis需要使用到队列功能了(因为消息发布和订阅是一对多的模式(redis)),,,不能仅仅使用消息的订阅和发布处理数据了,只能使用消息的发布用来通知其他的分布式节点,进行抢占时向队列里面取数据,
e redis.lpush(); 和 redis.rpop() 可实现
o 工具类:
@Scope("singleton") @Service public class RedisService implements InitializingBean { private static final Logger LOGGER = LoggerFactory.getLogger(RedisService.class); /** * Redis操作接口 * * @author 林计钦 * @version 1.0 2013-6-14 上午08:54:14 */ @Resource private SystemConfig systemConfig; private JedisPool pool = null; /** * 返还到连接池 * * @param redis */ public static void returnResource(Jedis redis) { if (redis != null) { redis.close(); } } public boolean setNX(String key, int expired) { Jedis jedis = null; int result = 0; try { jedis = pool.getResource(); result = jedis.setnx(key, "1").intValue(); jedis.expire(key, expired); } catch (CacheException e) { LOGGER.error("sedis_error.key:{}", key, e.getMessage()); } finally { returnResource(jedis); } return result > 0; } public List<String> brpop(String key) { Jedis jedis = null; List<String> listStr = Lists.newArrayList(); try { jedis = pool.getResource(); listStr = jedis.brpop(10, key); } catch (CacheException e) { LOGGER.error("sedis_error.key:{}", key, e.getMessage()); } finally { returnResource(jedis); } return listStr; } /** * 获取数据 * * @param key * @return */ public String getString(String key) { String value = null; Jedis jedis = null; try { jedis = pool.getResource(); value = jedis.get(key); } catch (Exception e) { // 释放redis对象 if (jedis != null) { jedis.close(); } LOGGER.error("jedis_pool_err", e); } finally { // 返还到连接池 returnResource(jedis); } return value; } /** * Strings结构:set command * * @param key * @param object 存储object * @param expired 过期时间(秒) */ public boolean setString(String key, Object object, int expired) { Jedis jedis = null; try { jedis = pool.getResource(); jedis.setex(key, expired, String.valueOf(object)); return true; } catch (Exception e) { LOGGER.error("redis error:key:{};value{}", key, object, e); } finally { returnResource(jedis); } return false; } /** * Hash结构:hset command * * @param key cache key * @param field second key * @param obj 直接存对象 */ public void hset(String key, String field, Object obj, int expired) { Jedis jedis = null; try { jedis = pool.getResource(); jedis.hset(key, field, JSON.toJSONString(obj)); jedis.expire(key.getBytes(), expired); } catch (CacheException e) { LOGGER.error("sedis_error.key:{}", key, e.getMessage()); } finally { returnResource(jedis); } } @Override public void afterPropertiesSet() throws Exception { JedisPoolConfig config = new JedisPoolConfig(); // 控制一个pool可分配多少个jedis实例,通过pool.getResource()来获取; // 如果赋值为-1,则表示不限制;如果pool已经分配了maxActive个jedis实例,则此时pool的状态为exhausted(耗尽)。 config.setMaxTotal(500); // 控制一个pool最多有多少个状态为idle(空闲的)的jedis实例。 config.setMaxIdle(5); // 表示当borrow(引入)一个jedis实例时,最大的等待时间,如果超过等待时间,则直接抛出JedisConnectionException; config.setMaxWaitMillis(1000 * 100); // 在borrow一个jedis实例时,是否提前进行validate操作;如果为true,则得到的jedis实例均是可用的; config.setTestOnBorrow(true); String redisIp = systemConfig.getRedisIp(); String redisPassword = systemConfig.getRedisPassword(); int redisPort = systemConfig.getRedisPort(); pool = new JedisPool(config, redisIp, redisPort, 2000, redisPassword); } /** * 获取数据 * * @param key * @return */ public Map<String, String> hgetAll(String key) { Map<String, String> value = null; Jedis jedis = null; try { jedis = pool.getResource(); value = jedis.hgetAll(key); } catch (Exception e) { // 释放redis对象 if (jedis != null) { jedis.close(); } LOGGER.error("jedis_pool_err", e); } finally { // 返还到连接池 returnResource(jedis); } return value; } public String hget(String key, String field) { Jedis jedis = null; try { jedis = pool.getResource(); return jedis.hget(key, field); } catch (CacheException e) { LOGGER.error("sedis_error.key:{}", key, e.getMessage()); } finally { returnResource(jedis); } return ""; } public Set<String> keys(String field) { Jedis jedis = null; try { jedis = pool.getResource(); return jedis.hkeys(field); } catch (CacheException e) { LOGGER.error("redis_error.key:{}", field, e.getMessage()); } finally { returnResource(jedis); } return null; } /** * Hash结构:hset command * * @param key cache key * @param field second key * @param obj 直接存对象 */ public void hset(String key, String field, Object obj) { Jedis jedis = null; try { jedis = pool.getResource(); jedis.hset(key, field, JSON.toJSONString(obj)); } catch (CacheException e) { LOGGER.error("sedis_error.key:{}", key, e.getMessage()); } finally { returnResource(jedis); } } /** * 存储REDIS队列 顺序存储 * * @param key reids键名 * @param value 键值 */ public void lpush(String key, String value) { Jedis jedis = null; try { jedis = pool.getResource(); jedis.lpush(key, value); } catch (Exception e) { LOGGER.error("sedis_error.key:{}", key, e); } finally { returnResource(jedis); } } /** * 获取队列数据 * * @param key 键名 */ public String rpop(String key) { String bytes = null; Jedis jedis = null; try { jedis = pool.getResource(); bytes = jedis.rpop(key); } catch (Exception e) { LOGGER.error("sedis_error.key:{}", key, e); } finally { returnResource(jedis); } return bytes; } /** * 获取 队列 长度 * * @param key 队列名 */ public long llen(String key) { long bytes = 0; Jedis jedis = null; try { jedis = pool.getResource(); bytes = jedis.llen(key); } catch (Exception e) { LOGGER.error("sedis_error.key:{}", key, e); } finally { returnResource(jedis); } return bytes; } /** * 接收消息。在main方法调用后,会一直执行下去。当有发布对应消息时,就会在jedisPubSub中接收到! * * 存在堵塞进程 * @param jedisPubSub * @param channels */ public void subscribeMsg(JedisPubSub jedisPubSub, String channels) { Jedis jedis = null; try { jedis = pool.getResource(); jedis.subscribe(jedisPubSub, channels); LOGGER.debug("subscribeMsg {} = {}", jedisPubSub, channels); } catch (Exception e) { LOGGER.error("subscribeMsg {} = {}", jedisPubSub, channels, e); } finally { returnResource(jedis); } } public void publishMsg(String channel, String message) { Jedis jedis = null; try { jedis = pool.getResource(); jedis.publish(channel, message); } catch (Exception e) { LOGGER.error("publishMsg {} = {}", channel, message, e); } finally { returnResource(jedis); } } }
相关文章推荐
- php 实现redis发布订阅消息及时通讯
- 利用redis的订阅和发布来实现实时监控的一个DEMO(Python版本)
- Java实现Redis的消息订阅和发布
- python 实现简单的redis 消息订阅推送
- ZooKeeper应用场景之消息发布订阅的简单代码实现
- 使用redis的发布订阅模式实现消息队列
- redis 消息队列发布订阅模式spring boot实现
- php 实现redis发布订阅消息及时通讯
- Java for Web学习笔记(九十):消息和集群(5)利用websocket实现订阅和发布(上)
- linux下使用hiredis异步API实现sub/pub消息订阅和发布的功能 标签: hiredishiredis异步APIhiredis事件处理redis消息订阅发布redis c接口 2016-
- Redis中的简单事物以及消息订阅发布
- 使用Spring-Redis实现消息的发布/订阅
- Java实现Redis的消息订阅和发布
- spring-redis实现消息生产者发布和消费者订阅
- 利用Thrift和zk简单实现服务治理框架中的订阅发布机制
- Java实现redis的消息订阅和发布
- Redis实现消息的发布/订阅
- redis利用pipline实现发布订阅机制
- Java for Web学习笔记(九五):消息和集群(10)利用RabbitMQ实现订阅和发布
- 【转】redis 消息队列发布订阅模式spring boot实现