zeromq/jzmq pub/sub发布订阅java代码
2012-04-26 20:37
591 查看
引用:http://iyuan.iteye.com/blog/973013
这里的发布与订阅角色是绝对的,即发布者无法使用recv,订阅者不能使用send,并且订阅者需要设置订阅条件"setsockopt"。
按照官网的说法,在这种模式下很可能发布者刚启动时发布的数据出现丢失,原因是用zmq发送速度太快,在订阅者尚未与发布者建立联系时,已经开始了数据发布(内部局域网没这么夸张的)。官网给了两个解决方案;1,发布者sleep一会再发送数据(这个被标注成愚蠢的);2,(还没有看到那,在后续中发现的话会更新这里)。
上代码:
发布端:
订阅者:
这里的发布与订阅角色是绝对的,即发布者无法使用recv,订阅者不能使用send,并且订阅者需要设置订阅条件"setsockopt"。
按照官网的说法,在这种模式下很可能发布者刚启动时发布的数据出现丢失,原因是用zmq发送速度太快,在订阅者尚未与发布者建立联系时,已经开始了数据发布(内部局域网没这么夸张的)。官网给了两个解决方案;1,发布者sleep一会再发送数据(这个被标注成愚蠢的);2,(还没有看到那,在后续中发现的话会更新这里)。
上代码:
发布端:
package com.zeromq.test.Sync_pub_sub; import org.zeromq.ZMQ; import org.zeromq.ZMQ.Context; import org.zeromq.ZMQ.Socket; /** * @author 吕桂强 * @email larry.lv.word@gmail.com * @version 创建时间:2012-4-26 下午8:17:40 */ public class syncpub{ //等待10个订阅者 protected static int SUBSCRIBERS_EXPECTED = 10; public static void main (String[] args) { Context context = ZMQ.context(1); Socket publisher = context.socket(ZMQ.PUB); publisher.bind("tcp://*:5561"); try { //zmq发送速度太快,在订阅者尚未与发布者建立联系时,已经开始了数据发布 Thread.sleep (1000); } catch (InterruptedException e) { e.printStackTrace(); } int update_nbr; for (update_nbr = 0; update_nbr < 10; update_nbr++){ publisher.send("Rhubarb".getBytes(), ZMQ.NOBLOCK); } publisher.send("END".getBytes(), 0); publisher.close(); context.term(); } }
订阅者:
package com.zeromq.test.Sync_pub_sub; import org.zeromq.ZMQ; import org.zeromq.ZMQ.Context; import org.zeromq.ZMQ.Socket; /** * @author 吕桂强 * @email larry.lv.word@gmail.com * @version 创建时间:2012-4-26 下午8:36:42 */ public class syncsub { public static void main(String[] args) { Context context = ZMQ.context(1); Socket subscriber = context.socket(ZMQ.SUB); subscriber.connect("tcp://localhost:5561"); subscriber.subscribe("".getBytes()); int update_nbr = 0; while (true) { byte[] stringValue = subscriber.recv(0); String string = new String(stringValue); if (string.equals("END")) { break; } update_nbr++; System.out.println("Received " + update_nbr + " updates. :" + string); } subscriber.close(); context.term(); } }
相关文章推荐
- zeromq/jzmq 基于信封-内容的pub/sub发布订阅java代码
- 【Redis】Java实现redis消息订阅/发布(PubSub)
- zeroMQ初体验-2.发布订阅模式(pub/sub)
- Redis的pub/Sub(订阅与发布)在java中的实践
- NoSQL之Redis---PUB/SUB(订阅与发布)---JAVA实现
- 简单的WCF发布-订阅(Pub/Sub)服务(转)
- JMS(五):订阅/发布模式(SUB/PUB)
- 快速掌握Redis——第五招:pub/sub 发布/订阅
- ZeroMQ:订阅-发布模式的java程序示例
- redis 高级应用之二(Redis的持久化 和 消息的[pub/sub]发布和订阅)
- linux下使用hiredis异步API实现sub/pub消息订阅和发布的功能
- 开发创建XMPP“发布订阅”扩展(xmpp pubsub extend)
- redis 高级应用之二(Redis的持久化 和 消息的[pub/sub]发布和订阅)
- zeroMQ 简单的PUB-SUB 高性能模式,java 语言版本
- redis pub/sub 发布订阅
- redis的发布订阅模式pubsub
- Redis学习笔记(九) 命令进阶:Pub/Sub(发布/订阅)操作
- Redis命令学习-Pub/Sub(发布/订阅)
- Redis的发布/订阅(pub/sub)
- Redis--pub/sub(发布与订阅)