您的位置:首页 > 编程语言 > Java开发

zeromq/jzmq pub/sub发布订阅java代码

2012-04-26 20:37 591 查看
引用:http://iyuan.iteye.com/blog/973013

这里的发布与订阅角色是绝对的,即发布者无法使用recv,订阅者不能使用send,并且订阅者需要设置订阅条件"setsockopt"。 

按照官网的说法,在这种模式下很可能发布者刚启动时发布的数据出现丢失,原因是用zmq发送速度太快,在订阅者尚未与发布者建立联系时,已经开始了数据发布(内部局域网没这么夸张的)。官网给了两个解决方案;1,发布者sleep一会再发送数据(这个被标注成愚蠢的);2,(还没有看到那,在后续中发现的话会更新这里)。 

 上代码:
发布端:
package com.zeromq.test.Sync_pub_sub;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

/**
* @author 吕桂强
* @email larry.lv.word@gmail.com
* @version 创建时间:2012-4-26 下午8:17:40
*/
public class syncpub{
//等待10个订阅者
protected static int SUBSCRIBERS_EXPECTED = 10;

public static void main (String[] args) {
Context context = ZMQ.context(1);
Socket publisher = context.socket(ZMQ.PUB);
publisher.bind("tcp://*:5561");
try {
//zmq发送速度太快,在订阅者尚未与发布者建立联系时,已经开始了数据发布
Thread.sleep (1000);
} catch (InterruptedException e) {
e.printStackTrace();
}

int update_nbr;
for (update_nbr = 0; update_nbr < 10; update_nbr++){
publisher.send("Rhubarb".getBytes(), ZMQ.NOBLOCK);
}
publisher.send("END".getBytes(), 0);

publisher.close();
context.term();
}
}


订阅者:
package com.zeromq.test.Sync_pub_sub;

import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

/**
* @author 吕桂强
* @email larry.lv.word@gmail.com
* @version 创建时间:2012-4-26 下午8:36:42
*/
public class syncsub {
public static void main(String[] args) {
Context context = ZMQ.context(1);
Socket subscriber = context.socket(ZMQ.SUB);
subscriber.connect("tcp://localhost:5561");
subscriber.subscribe("".getBytes());
int update_nbr = 0;
while (true) {
byte[] stringValue = subscriber.recv(0);
String string = new String(stringValue);
if (string.equals("END")) {
break;
}
update_nbr++;
System.out.println("Received " + update_nbr + " updates. :" + string);
}

subscriber.close();
context.term();
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息