您的位置:首页 > 编程语言

ZooKeeper应用场景之消息发布订阅的简单代码实现

2018-03-03 13:43 656 查看
这个是主类的代码,用两个线程模拟消息发布者和订阅者

 XML Code 
1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

package com.lyzx.zk.test.multiThread;

import java.util.List;

import java.util.concurrent.TimeUnit;

import org.apache.zookeeper.KeeperException;

import org.apache.zookeeper.WatchedEvent;

import org.apache.zookeeper.Watcher;

import org.apache.zookeeper.Watcher.Event.EventType;

import org.apache.zookeeper.ZooKeeper;

/**

 *  模拟消息发布订阅

 *      消息发布者发布消息和删除消息

 *      消息订阅者发现消息变换后做相应的逻辑处理

 */

public class PublishAndSubscribe {

    

    public static void main(String[] args) throws InterruptedException {

        /*

         * 首先在新增/test下新增 z1、z2、z3节点、然后删除掉z2节点(对于子节点的修改未涉及)

         */

        String[] paths = {"/test/z1","/test/z2","/test/z3","/test/z2"};

        String[] datas = {"v_z1"    ,"v_z2"    ,"v_z3"    ,""};

        int[] operates = {1         ,1         ,1         ,2};

        //一个发布者

        new Thread(new Publicer(paths,datas,operates),"X-MAX").start();

        

        //3个订阅者

        new Thread(new Subscriber("/test"),"A").start();

        new Thread(new Subscriber("/test"),"B").start();

        new Thread(new Subscriber("/test"),"C").start();

        

        Thread.sleep(Long.MAX_VALUE);

    }

}

/**

 * 消息的发布者

 *  用于对指定路径的增删改

 */

class Publicer implements Runnable{

    private String[] paths;

    private String[] datas;

    private int[] operates;

    

    public Publicer(String[] paths,String[] datas,int[] operates){

        this.paths = paths;

        this.datas = datas;

        this.operates = operates;

    }

    

    @Override

    public void run(){

        ZkUtil zkUtil = new ZkUtil();

        int count = datas.length;

        for(int i = 0;i < count; i++){

            try {

                TimeUnit.SECONDS.sleep(2);

            }catch (InterruptedException e){

                e.printStackTrace();

            }

            zkUtil.operate(paths[i],datas[i],operates[i]);

            System.out.println("消息发布者["+Thread.currentThread().getName()+"],path:"+paths[i]+",data:"+datas[i]+",operate:"+operates[i]);

        }

    }

}

/**

 * 消息订阅者 

 */

class Subscriber implements Runnable{

    private String path;

    private ZooKeeper zk = ZkUtil.getZk();

    

    public Subscriber(String path){

        this.path = path;

    }

    

    public void subScribe(final String name){

        try {

            zk.getChildren(path,new Watcher() {

                @Override

                public void process(WatchedEvent event) {

                    EventType t = event.getType();

                    //只有子节点被改变(子节点被删除或者新增)才会通知它,所以在这里不做判断

                    System.out.println("我是消息订阅者["+name+"],当前发生的事件是:"+t);

                    try {

                        List<String> children = zk.getChildren(path, false);

                        System.out.println("我是消息订阅者["+name+"],当前的子节点是:"+children);

                    } catch (KeeperException | InterruptedException e) {

                        e.printStackTrace();

                    }

                    //一次注册事件完成后再次注册,当然上面的代码只是演示注册事件接到通知后的demo,实际业务要比这个复杂的多

                    subScribe(name);

                }

            });

        }catch (KeeperException | InterruptedException e) {

            e.printStackTrace();

        }

    }

    

    @Override

    public void run(){

        subScribe(Thread.currentThread().getName());

    }

}
这个是用到的工具类(写的很简单)

 XML Code 
1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

package com.lyzx.zk.test.multiThread;

import java.io.IOException;

import org.apache.zookeeper.CreateMode;

import org.apache.zookeeper.KeeperException;

import org.apache.zookeeper.WatchedEvent;

import org.apache.zookeeper.Watcher;

import org.apache.zookeeper.ZooKeeper;

import org.apache.zookeeper.ZooDefs.Ids;

public class ZkUtil {

    public static ZooKeeper zk = getZk();

    

    public static ZooKeeper getZk(){

        try {

            final String url = "192.168.29.167:2181,192.168.29.168:2181,192.168.29.169:2181";

            ZooKeeper zk = new ZooKeeper(url,300,new Watcher() {

                @Override

                public void process(WatchedEvent event){

                    System.out.println(">>::"+event.getType());

                }

            });

            System.out.println("zk客户端初始化完毕.....");

            return zk;

        }catch (IOException e) {

            e.printStackTrace();

        }

        return null;

    }

    public void closeZk(ZooKeeper zk){

            try {

                if(null != zk)

                zk.close();

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

    }

    

    public void operate(String path,String data,int operateType){

        if(operateType == 1){

            //create

            try {

                zk.create(path,data.getBytes(),Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL);

            } catch (KeeperException | InterruptedException e) {

                e.printStackTrace();

            }

        }else if(operateType == 2){

            //delete

            try {

                zk.delete(path,-1);

            } catch (InterruptedException | KeeperException e) {

                e.printStackTrace();

            }

        }else if(operateType == 3){

            //setData

            try {

                zk.setData(path, data.getBytes(),-1);

            } catch (KeeperException | InterruptedException e) {

                e.printStackTrace();

            }

        }

    }

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐