您的位置:首页 > 数据库 > Redis

利用redis简单实现消息订阅和发布

2017-03-19 13:01 756 查看
好久没写博客了。最近研究了 redis 的消费者、生产者功能,看了很多资料,个人觉得很多开源的 MQ 都很好用;redis 相对小巧,实现简单,以下先介绍 redis 的实现:

1,redis 实现消息发布和订阅:

/**
* Created by SDingBa.xiong on 17-3-9.
*
* Jedis pub/sub listener. It extends JedisPubSub, logs incoming messages and
* subscribe/unsubscribe notifications, and leaves the pattern-based callbacks empty.
* The subscribe/unsubscribe methods are overridden only to delegate to the superclass.
*/
public class RedisMsgPubSubListener extends JedisPubSub {

private static final Logger logger = LoggerFactory.getLogger(RedisMsgPubSubListener.class);

// NOTE(review): not referenced by any code visible in this class; presumably used by the
// elided message-handling logic in onMessage. Injection via @Resource presumably only
// happens when the instance is container-managed, not when it is created with `new` — confirm.
@Resource
private AdOriginalityMapper adOriginalityMapper;

// Delegates unchanged to JedisPubSub.
@Override
public void unsubscribe() {
super.unsubscribe();
}

// Delegates unchanged to JedisPubSub.
@Override
public void unsubscribe(String... channels) {
super.unsubscribe(channels);
}

// Delegates unchanged to JedisPubSub.
@Override
public void subscribe(String... channels) {
super.subscribe(channels);
}

// Delegates unchanged to JedisPubSub.
@Override
public void psubscribe(String... patterns) {
super.psubscribe(patterns);
}

// Delegates unchanged to JedisPubSub.
@Override
public void punsubscribe() {
super.punsubscribe();
}

// Delegates unchanged to JedisPubSub.
@Override
public void punsubscribe(String... patterns) {
super.punsubscribe(patterns);
}

/**
* Callback for a message on a subscribed channel: logs the channel and the message,
* then runs the handling logic that the article elides.
*
* @param channel channel the message arrived on
* @param message message payload
*/
@Override
public void onMessage(String channel, String message) {
logger.info("channel={}, receives message={}", channel, message);

// NOTE(review): the next line is the article's placeholder for the real business logic;
// it is not compilable Java and must be replaced before this class can be built.
code...
}

/**
* Callback for a message matched by a pattern subscription. Intentionally empty here.
*/
@Override
public void onPMessage(String pattern, String channel, String message) {

}

/**
* Callback for a channel subscription notification: logs the channel and the
* subscription count passed in.
*/
@Override
public void onSubscribe(String channel, int subscribedChannels) {
logger.info("====start Sub ====== channel={}, receives subscribedChannels={}", channel, subscribedChannels);
}

/**
* Callback for a pattern unsubscription notification. Intentionally empty here.
*/
@Override
public void onPUnsubscribe(String pattern, int subscribedChannels) {

}

/**
* Callback for a pattern subscription notification. Intentionally empty here.
*/
@Override
public void onPSubscribe(String pattern, int subscribedChannels) {

}

/**
* Callback for a channel unsubscription notification: logs the channel and the
* subscription count passed in.
*/
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
logger.info("====end Sub ======= channel={}, receives subscribedChannels={}", channel, subscribedChannels);
}
}


2,spring boot 初始化:

/**
 * Application listener that runs the Redis subscriber start-up hook each time a
 * {@link ContextRefreshedEvent} is delivered.
 *
 * <p>The subscription code inside the hook is currently commented out, so handling
 * the event has no effect.
 */
@Configuration
@Component
public class RedisMQSub implements ApplicationListener<ContextRefreshedEvent> {

    // Host that is meant to run the listener (123.56.86.1). A queue can be used so that
    // several servers do not all process the same notification (notify through the MQ
    // first, then take the work from a queue).
    // private static final String CRM_ACTIVE_HOSTNAME = "iZ25o9jw9hbZ";
    private static final String CRM_ACTIVE_HOSTNAME = "su";

    private final Logger logger = LoggerFactory.getLogger(RedisMQSub.class);

    @Resource
    private RedisService redisService;

    /**
     * Triggers the subscriber start-up hook when the application context is refreshed.
     *
     * @param event the refresh event (not inspected)
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        startSubscriber();
    }

    /**
     * Start-up hook for the Redis subscription. The body is a commented-out local-test
     * example; it shows the subscription being run on its own thread.
     */
    private void startSubscriber() {
        // Local test:
        // Jedis jedis = new Jedis("localhost");
        // System.out.println((redisService == null) + " redis server");
        // RedisMsgPubSubListener listener = new RedisMsgPubSubListener();
        // new Thread(() -> jedis.subscribe(listener, CreativeData.CREATIVE_ACTIVE_REDIS_MESSAGE_MQ)).start();

        // listener.unsubscribe(); // stop listening
    }
}


subscribe 会阻塞调用它的线程,因此需要放在单独的线程中执行(参见上面注释中的 new Thread 写法)。

jedis.publish(通道名, 消息体) 即可发布消息。

a,上面的模式适合单一服务器使用。如果是分布式多服务器,很多其他的 MQ 都很容易实现;而 redis 的消息发布和订阅是一对多的模式(每个订阅者都会收到同一条消息),所以不能仅仅使用消息的订阅和发布来处理数据,还需要用到队列功能:只用消息发布来通知其他分布式节点,各节点再以抢占式的方式从队列里取数据。

b,redis.lpush() 和 redis.rpop() 可实现该队列。

c,工具类:

/**
 * Redis operations helper backed by a {@link JedisPool}.
 *
 * <p>Every operation borrows a connection from the pool, runs one or two commands and
 * hands the connection back in a {@code finally} block. The pool is built in
 * {@link #afterPropertiesSet()} from the injected {@code SystemConfig}.
 *
 * <p>NOTE(review): several methods catch only {@code CacheException}. Whether Jedis
 * failures are of that type cannot be told from this file; if they are not, they
 * propagate to the caller from those methods, while the methods that catch
 * {@code Exception} log and swallow them. The catch types are left unchanged so that
 * callers observe the same exceptions as before.
 *
 * @author 林计钦
 * @version 1.0 2013-6-14 08:54:14
 */
@Scope("singleton")
@Service
public class RedisService implements InitializingBean {

    private static final Logger LOGGER = LoggerFactory.getLogger(RedisService.class);

    @Resource
    private SystemConfig systemConfig;
    private JedisPool pool = null;

    /**
     * Hands a connection back to the pool by closing it.
     *
     * @param redis connection to release; {@code null} is ignored
     */
    public static void returnResource(Jedis redis) {
        if (redis != null) {
            redis.close();
        }
    }

    /**
     * Sets {@code key} to {@code "1"} only if it does not exist yet (SETNX) and, when the
     * key was created by this call, gives it a time-to-live.
     *
     * <p>The TTL is applied only on success, so a failed attempt no longer refreshes the
     * expiry of a key that already exists.
     * NOTE(review): SETNX and EXPIRE are still two separate commands; if the second one
     * fails the key is left without a TTL. A single atomic SET with NX/EX options would
     * close that gap if the Jedis version in use supports it.
     *
     * @param key     key to create
     * @param expired time-to-live in seconds
     * @return {@code true} if the key was created by this call
     */
    public boolean setNX(String key, int expired) {
        Jedis jedis = null;
        int result = 0;
        try {
            jedis = pool.getResource();
            result = jedis.setnx(key, "1").intValue();
            if (result > 0) {
                jedis.expire(key, expired);
            }
        } catch (CacheException e) {
            // Pass the exception itself so the logger records the stack trace.
            LOGGER.error("sedis_error.key:{}", key, e);
        } finally {
            returnResource(jedis);
        }
        return result > 0;
    }

    /**
     * Blocking right-pop (BRPOP) on a list with a fixed timeout argument of 10.
     *
     * @param key list key
     * @return whatever Jedis returns for the pop; the initial empty list is returned only
     *         when a {@code CacheException} was caught before the call completed
     */
    public List<String> brpop(String key) {
        Jedis jedis = null;
        List<String> listStr = Lists.newArrayList();
        try {
            jedis = pool.getResource();
            listStr = jedis.brpop(10, key);
        } catch (CacheException e) {
            LOGGER.error("sedis_error.key:{}", key, e);
        } finally {
            returnResource(jedis);
        }
        return listStr;
    }

    /**
     * Reads a string value (GET).
     *
     * @param key key to read
     * @return the stored value, or {@code null} if the call failed
     */
    public String getString(String key) {
        String value = null;

        Jedis jedis = null;
        try {
            jedis = pool.getResource();
            value = jedis.get(key);
        } catch (Exception e) {
            // The connection is released once, in finally; closing it here as well
            // would hand the same connection back twice.
            LOGGER.error("jedis_pool_err", e);
        } finally {
            // Return the connection to the pool.
            returnResource(jedis);
        }
        return value;
    }

    /**
     * Stores a string value with an expiry (SETEX).
     *
     * @param key     key to write
     * @param object  value to store; converted with {@code String.valueOf}
     * @param expired time-to-live in seconds
     * @return {@code true} on success, {@code false} if an exception was caught
     */
    public boolean setString(String key, Object object, int expired) {
        Jedis jedis = null;
        try {
            jedis = pool.getResource();
            jedis.setex(key, expired, String.valueOf(object));
            return true;
        } catch (Exception e) {
            LOGGER.error("redis error:key:{};value{}", key, object, e);
        } finally {
            returnResource(jedis);
        }
        return false;
    }

    /**
     * Stores an object as JSON in a hash field (HSET) and sets a TTL on the whole hash.
     *
     * @param key     cache key of the hash
     * @param field   field name inside the hash
     * @param obj     object to store; serialized with {@code JSON.toJSONString}
     * @param expired time-to-live of the hash key in seconds
     */
    public void hset(String key, String field, Object obj, int expired) {
        Jedis jedis = null;
        try {
            jedis = pool.getResource();
            jedis.hset(key, field, JSON.toJSONString(obj));
            // Use the String overload so the TTL targets exactly the key written above;
            // key.getBytes() would encode with the platform default charset.
            jedis.expire(key, expired);
        } catch (CacheException e) {
            LOGGER.error("sedis_error.key:{}", key, e);
        } finally {
            returnResource(jedis);
        }
    }

    /**
     * Builds the connection pool from the injected configuration.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        JedisPoolConfig config = new JedisPoolConfig();
        // Maximum number of Jedis instances the pool may hand out via pool.getResource().
        // -1 would mean unlimited; once the limit is reached the pool is exhausted.
        config.setMaxTotal(500);
        // Maximum number of idle Jedis instances kept in the pool.
        config.setMaxIdle(5);
        // Maximum time to wait when borrowing an instance (1000 * 100 ms = 100 s);
        // borrowing fails with an exception once the wait is exceeded.
        config.setMaxWaitMillis(1000 * 100);
        // Validate an instance before it is borrowed, so borrowed instances are usable.
        config.setTestOnBorrow(true);
        String redisIp = systemConfig.getRedisIp();
        String redisPassword = systemConfig.getRedisPassword();
        int redisPort = systemConfig.getRedisPort();
        pool = new JedisPool(config, redisIp, redisPort, 2000, redisPassword);
    }

    /**
     * Reads all fields and values of a hash (HGETALL).
     *
     * @param key key of the hash
     * @return field-to-value map, or {@code null} if the call failed
     */
    public Map<String, String> hgetAll(String key) {
        Map<String, String> value = null;

        Jedis jedis = null;
        try {
            jedis = pool.getResource();
            value = jedis.hgetAll(key);
        } catch (Exception e) {
            // The connection is released once, in finally; closing it here as well
            // would hand the same connection back twice.
            LOGGER.error("jedis_pool_err", e);
        } finally {
            // Return the connection to the pool.
            returnResource(jedis);
        }
        return value;
    }

    /**
     * Reads one hash field (HGET).
     *
     * @param key   key of the hash
     * @param field field name inside the hash
     * @return the value Jedis returns for the field, or {@code ""} if a
     *         {@code CacheException} was caught
     */
    public String hget(String key, String field) {
        Jedis jedis = null;
        try {
            jedis = pool.getResource();
            return jedis.hget(key, field);
        } catch (CacheException e) {
            LOGGER.error("sedis_error.key:{}", key, e);
        } finally {
            returnResource(jedis);
        }
        return "";
    }

    /**
     * Lists the field names of a hash (HKEYS). Despite its name, the {@code field}
     * parameter is passed to Jedis as the key of the hash.
     *
     * @param field key of the hash whose field names are requested
     * @return the field names, or {@code null} if a {@code CacheException} was caught
     */
    public Set<String> keys(String field) {
        Jedis jedis = null;
        try {
            jedis = pool.getResource();
            return jedis.hkeys(field);
        } catch (CacheException e) {
            LOGGER.error("redis_error.key:{}", field, e);
        } finally {
            returnResource(jedis);
        }
        return null;
    }

    /**
     * Stores an object as JSON in a hash field (HSET) without touching the key's TTL.
     *
     * @param key   cache key of the hash
     * @param field field name inside the hash
     * @param obj   object to store; serialized with {@code JSON.toJSONString}
     */
    public void hset(String key, String field, Object obj) {
        Jedis jedis = null;
        try {
            jedis = pool.getResource();
            jedis.hset(key, field, JSON.toJSONString(obj));
        } catch (CacheException e) {
            LOGGER.error("sedis_error.key:{}", key, e);
        } finally {
            returnResource(jedis);
        }
    }

    /**
     * Pushes a value onto a Redis list used as a queue (LPUSH).
     *
     * @param key   list key
     * @param value value to enqueue
     */
    public void lpush(String key, String value) {
        Jedis jedis = null;
        try {
            jedis = pool.getResource();
            jedis.lpush(key, value);
        } catch (Exception e) {
            LOGGER.error("sedis_error.key:{}", key, e);
        } finally {
            returnResource(jedis);
        }
    }

    /**
     * Pops a value from the queue (RPOP).
     *
     * @param key list key
     * @return the value Jedis returns for the pop, or {@code null} if the call failed
     */
    public String rpop(String key) {
        String value = null;
        Jedis jedis = null;
        try {
            jedis = pool.getResource();
            value = jedis.rpop(key);
        } catch (Exception e) {
            LOGGER.error("sedis_error.key:{}", key, e);
        } finally {
            returnResource(jedis);
        }
        return value;
    }

    /**
     * Returns the length of a queue (LLEN).
     *
     * @param key list key
     * @return the length reported by Redis, or {@code 0} if the call failed
     */
    public long llen(String key) {
        long length = 0;
        Jedis jedis = null;
        try {
            jedis = pool.getResource();
            length = jedis.llen(key);
        } catch (Exception e) {
            LOGGER.error("sedis_error.key:{}", key, e);
        } finally {
            returnResource(jedis);
        }
        return length;
    }

    /**
     * Subscribes the listener to a channel and receives messages through it.
     *
     * <p>This call blocks the calling thread: once invoked it keeps running, and
     * published messages are delivered to {@code jedisPubSub}. The debug log line and
     * the release of the connection therefore only run after the subscription ends.
     *
     * @param jedisPubSub listener that receives the messages
     * @param channels    channel name to subscribe to
     */
    public void subscribeMsg(JedisPubSub jedisPubSub, String channels) {
        Jedis jedis = null;
        try {
            jedis = pool.getResource();
            jedis.subscribe(jedisPubSub, channels);
            LOGGER.debug("subscribeMsg {} = {}", jedisPubSub, channels);
        } catch (Exception e) {
            LOGGER.error("subscribeMsg {} = {}", jedisPubSub, channels, e);
        } finally {
            returnResource(jedis);
        }
    }

    /**
     * Publishes a message to a channel (PUBLISH).
     *
     * @param channel channel name
     * @param message message payload
     */
    public void publishMsg(String channel, String message) {
        Jedis jedis = null;
        try {
            jedis = pool.getResource();
            jedis.publish(channel, message);
        } catch (Exception e) {
            LOGGER.error("publishMsg {} = {}", channel, message, e);
        } finally {
            returnResource(jedis);
        }
    }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  redis