ZooKeeper应用场景之消息发布订阅的简单代码实现
2018-03-03 13:43
656 查看
这个是主类的代码,用两个线程模拟消息发布者和订阅者
XML Code
这个是用到的工具类(写的很简单)
XML Code
XML Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 | package com.lyzx.zk.test.multiThread; import java.util.List; import java.util.concurrent.TimeUnit; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.ZooKeeper; /** * 模拟消息发布订阅 * 消息发布者发布消息和删除消息 * 消息订阅者发现消息变换后做相应的逻辑处理 */ public class PublishAndSubscribe { public static void main(String[] args) throws InterruptedException { /* * 首先在新增/test下新增 z1、z2、z3节点、然后删除掉z2节点(对于子节点的修改未涉及) */ String[] paths = {"/test/z1","/test/z2","/test/z3","/test/z2"}; String[] datas = {"v_z1" ,"v_z2" ,"v_z3" ,""}; int[] operates = {1 ,1 ,1 ,2}; //一个发布者 new Thread(new Publicer(paths,datas,operates),"X-MAX").start(); //3个订阅者 new Thread(new Subscriber("/test"),"A").start(); new Thread(new Subscriber("/test"),"B").start(); new Thread(new Subscriber("/test"),"C").start(); Thread.sleep(Long.MAX_VALUE); } } /** * 消息的发布者 * 用于对指定路径的增删改 */ class Publicer implements Runnable{ private String[] paths; private String[] datas; private int[] operates; public Publicer(String[] paths,String[] datas,int[] operates){ this.paths = paths; this.datas = datas; this.operates = operates; } @Override public void run(){ ZkUtil zkUtil = new ZkUtil(); int count = datas.length; for(int i = 0;i < count; i++){ try { TimeUnit.SECONDS.sleep(2); }catch (InterruptedException e){ e.printStackTrace(); } zkUtil.operate(paths[i],datas[i],operates[i]); System.out.println("消息发布者["+Thread.currentThread().getName()+"],path:"+paths[i]+",data:"+datas[i]+",operate:"+operates[i]); } } } /** * 消息订阅者 */ class Subscriber implements Runnable{ private String path; private ZooKeeper zk = ZkUtil.getZk(); public Subscriber(String path){ this.path = path; } public void subScribe(final String name){ try { zk.getChildren(path,new Watcher() { @Override public void process(WatchedEvent event) { EventType t = event.getType(); //只有子节点被改变(子节点被删除或者新增)才会通知它,所以在这里不做判断 System.out.println("我是消息订阅者["+name+"],当前发生的事件是:"+t); try { List<String> children = zk.getChildren(path, false); System.out.println("我是消息订阅者["+name+"],当前的子节点是:"+children); } catch (KeeperException | InterruptedException e) { e.printStackTrace(); } //一次注册事件完成后再次注册,当然上面的代码只是演示注册事件接到通知后的demo,实际业务要比这个复杂的多 subScribe(name); } }); }catch (KeeperException | InterruptedException e) { e.printStackTrace(); } } @Override public void run(){ subScribe(Thread.currentThread().getName()); } } |
XML Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 | package com.lyzx.zk.test.multiThread; import java.io.IOException; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooDefs.Ids; public class ZkUtil { public static ZooKeeper zk = getZk(); public static ZooKeeper getZk(){ try { final String url = "192.168.29.167:2181,192.168.29.168:2181,192.168.29.169:2181"; ZooKeeper zk = new ZooKeeper(url,300,new Watcher() { @Override public void process(WatchedEvent event){ System.out.println(">>::"+event.getType()); } }); System.out.println("zk客户端初始化完毕....."); return zk; }catch (IOException e) { e.printStackTrace(); } return null; } public void closeZk(ZooKeeper zk){ try { if(null != zk) zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } } public void operate(String path,String data,int operateType){ if(operateType == 1){ //create try { zk.create(path,data.getBytes(),Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL); } catch (KeeperException | InterruptedException e) { e.printStackTrace(); } }else if(operateType == 2){ //delete try { zk.delete(path,-1); } catch (InterruptedException | KeeperException e) { e.printStackTrace(); } }else if(operateType == 3){ //setData try { zk.setData(path, data.getBytes(),-1); } catch (KeeperException | InterruptedException e) { e.printStackTrace(); } } } } |
相关文章推荐
- zookeeper应用场景-消息的订阅和发布
- zookeeper应用场景练习(数据发布/订阅)
- zookeeper 主要应用场景及代码实现
- 用zookeeper实现简单的发布订阅功能
- 利用redis简单实现消息订阅和发布
- ZooKeeper的典型应用场景之数据发布/订阅。
- Cocos2d-x简单游戏<植物大战僵尸>代码实现|第九部分:游戏场景GameScene.h<后续会提供源码下载链接>
- MQTT的学习研究(十二) MQTT moquette 的 Future API 消息发布订阅的实现
- 局部内部类的例子代码及实现最简单闹钟应用
- cocos2d-x简单游戏<打飞机>代码实现|第四部分:主场景<Helloworld.m>
- 分布式消息总线,基于.NET Socket Tcp的发布-订阅框架之离线支持,附代码下载
- Cocos2d-x简单游戏<植物大战僵尸>代码实现|第三部分:通关场景<后续会提供源码下载链接>
- Thrift 个人实战--RPC服务的发布订阅实现(基于Zookeeper服务)
- Thrift 个人实战--RPC服务的发布订阅实现(基于Zookeeper服务)
- Cocos2d-x简单游戏<植物大战僵尸>代码实现|第九部分:游戏场景GameScene.cpp<后续会提供源码下载链接>
- Cocos2d-x简单游戏<植物大战僵尸>代码实现|第一部分:开始场景<后续会提供源码下载链接>
- 使用php+Ajax实现唯一校验实现代码[简单应用]
- 用VBS代码实现简单的场景恢复实例
- Cocos2d-x简单游戏<捕鱼达人>代码实现|第四部分:加载场景类
- 动态分发,站内短信等web2.0应用的百万级消息机制简单实现