使用jedis实现Redis消息队列(MQ)的发布(publish)和消息监听(subscribe)
2016-09-20 22:08
991 查看
前言:
本文基于jedis 2.9.0.jar、commons-pool2-2.4.2.jar以及json-20160810.jar其中jedis连接池需要依赖commons-pool2包,json包用于对象实例和json字符串的相互转换
1、jedis的消息队列方法简述
1.1、发布消息方法
(其中,channel是对应消息通道,message是对应消息体)jedis.publish(channel, message);
1.2、监听消息方法
(其中,jedisPubSub用于处理监听到的消息,channels是对应的通道)jedis.subscribe(jedisPubSub, channels);
2、发布消息
/*** 从jedis连接池获取jedis操作实例
* @return
*/
public static Jedis getJedis() {
return RedisPoolManager.getJedis();
}
/**
* 推入消息到redis消息通道
*
* @param String
* channel
* @param String
* message
*/
public static void publish(String channel, String message) {
Jedis jedis = null;
try {
jedis = getJedis();
jedis.publish(channel, message);
} finally {
jedis.close();
}
}
/**
* 推入消息到redis消息通道
*
* @param byte[]
* channel
* @param byte[]
* message
*/
public void publish(byte[] channel, byte[] message) {
Jedis jedis = null;
try {
jedis = getJedis();
jedis.publish(channel, message);
} finally {
jedis.close();
}
}
3、监听消息
3.1、监听消息主体方法
/*** 监听消息通道
* @param jedisPubSub - 监听任务
* @param channels - 要监听的消息通道
*/
public static void subscribe(BinaryJedisPubSub jedisPubSub, byte[]... channels) {
Jedis jedis = null;
try {
jedis = getJedis();
jedis.subscribe(jedisPubSub, channels);
} finally {
jedis.close();
}
}
/**
* 监听消息通道
* @param jedisPubSub - 监听任务
* @param channels - 要监听的消息通道
*/
public static void subscribe(JedisPubSub jedisPubSub, String... channels) {
Jedis jedis = null;
try {
jedis = getJedis();
jedis.subscribe(jedisPubSub, channels);
} finally {
jedis.close();
}
}
3.2、处理监听到的消息任务
class Tasker implements Runnable {private String[] channel = null;//监听的消息通道
private JedisPubSub jedisPubSub = null;//消息处理任务
public Tasker(JedisPubSub jedisPubSub, String ...channel) {
this.jedisPubSub = jedisPubSub;
this.channel = channel;
}
@Override
public void run() {
// 监听channel通道的消息
RedisMQ.subscribe(jedisPubSub, channel);
}
}
3.3、处理监听到的消息主体类实现
package cn.eguid.livePushServer.redisManager; import java.util.Map; import org.json.JSONObject; import cc.eguid.livepush.PushManager; import redis.clients.jedis.JedisPubSub; public class RedisMQHandler extends JedisPubSub{ PushManager pushManager = null; public RedisMQHandler(PushManager pushManager) { super(); this.pushManager = pushManager; } @Override // 接收到消息后进行分发执行 public void onMessage(String channel, String message) { JSONObject jsonObj = new JSONObject(message); System.out.println(channel+","+message); if ("push".equals(channel)) { Map<String,Object> map=jsonObj.toMap(); System.out.println("接收到一条推流消息,准备推流:"+map); // String appName=pushManager.push(map); //推流完成后还需要发布一个成功消息到返回队列 } else if ("close".equals(channel)) { String appName=jsonObj.getString("appName"); System.out.println("接收到一条关闭消息,准备关闭应用:"+appName); // pushManager.closePush(appName); } } }
4、测试消息队列发布和监听
public static void main(String[] args) throws InterruptedException {PushManager pushManager= new PushManagerImpl();
Thread t1 = new Thread(new Tasker(new RedisMQHandler (pushManager), "push"));
Thread t2 = new Thread(new Tasker(new RedisMQHandler (pushManager), "close"));
t1.start();
t2.start();
LivePushEntity livePushInfo=new LivePushEntity();
livePushInfo.setAppName("test1");
JSONObject json=new JSONObject(livePushInfo);
publish("push",json.toString());
publish("close", json.toString());
Thread.sleep(2000);
publish("push", json.toString());
publish("close",json.toString());
Thread.sleep(2000);
publish("push", json.toString());
publish("close",json.toString());
}
相关文章推荐
- jedis实现redis的消息队列、发布对象消息、字节数组与字符串相互转换
- PHP使用php-resque库配合Redis实现MQ消息队列的教程
- jedis实现redis的消息队列、发布对象消息、字节数组与字符串相互转换
- java redis使用之利用jedis实现redis消息队列
- redis实现消息队列&发布/订阅模式使用
- jedis实现redis的消息队列、发布对象消息、字节数组与字符串相互转换
- 使用redis的发布订阅模式实现消息队列
- java redis使用之利用jedis实现redis消息队列
- java redis使用之利用jedis实现redis消息队列
- PHP使用php-resque库配合Redis实现MQ消息队列的教程
- 使用redis实现消息发布订阅
- 如何使用Jedis操作Redis消息队列
- jedis实现订阅发布-publish/subscribe
- 使用Redis构建消息队列和发布订阅系统
- spring+activemq实战之配置监听多队列实现不同队列消息消费
- Redis 实现消息队列 MQ
- Jedis实现Publish/Subscribe功能(发布和订阅)
- PHP下使用Redis消息队列发布微博(复制)
- PHP使用redis消息队列发布微博的方法示例
- RabbitMQ .NET消息队列使用入门(三)【MVC实现RPC例子】