您的位置:首页 > 其它

zookeeper订阅与发布实现

2015-10-30 14:10 281 查看
方式一:

import org.apache.curator.RetryPolicy;

import org.apache.curator.framework.CuratorFramework;

import org.apache.curator.framework.CuratorFrameworkFactory;

import org.apache.curator.framework.CuratorFrameworkFactory.Builder;

import org.apache.curator.framework.api.BackgroundPathable;

import org.apache.curator.framework.api.CuratorWatcher;

import org.apache.curator.framework.recipes.cache.PathChildrenCache;

import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;

import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;

import org.apache.curator.retry.ExponentialBackoffRetry;

import org.apache.curator.utils.ZKPaths;

import org.apache.zookeeper.WatchedEvent;

public class CuratorWatch {

    static CuratorFramework zkclient = null;

    static String nameSpace = "mo";

    static {

        String zkhost = "10.144.5.218:2181";// zk的host

        RetryPolicy rp = new ExponentialBackoffRetry(1000, 3);// 重试机制

        Builder builder = CuratorFrameworkFactory.builder().connectString(zkhost)

                .connectionTimeoutMs(5000).sessionTimeoutMs(5000).retryPolicy(rp);

        builder.namespace(nameSpace);

        CuratorFramework zclient = builder.build();

        zkclient = zclient;

        zkclient.start();// 放在这前面执行

        zkclient.newNamespaceAwareEnsurePath("/" + nameSpace);

        CuratorWatcher callbackWatcher = new CuratorWatcher() {

            public void process(WatchedEvent event) throws Exception {

                

                //由于监听触发后即消失,若想持续监听,需要再次追加监听

                byte[] data = (byte[]) ((BackgroundPathable) zkclient.getData().usingWatcher(this))

                        .forPath("/mojianli");

                //TODO 具体业务逻辑

            }

        };

        try {

            //首次添加监听

            ((BackgroundPathable) zkclient.getData().usingWatcher(callbackWatcher))

                    .forPath("/mojianli");

        } catch (Exception e) {

            // TODO Auto-generated catch block

            e.printStackTrace();

        }

    }

    public static void main(String[] args) throws Exception {

        Thread.sleep(Long.MAX_VALUE);

    }



方式二:

import org.apache.curator.RetryPolicy;

import org.apache.curator.framework.CuratorFramework;

import org.apache.curator.framework.CuratorFrameworkFactory;

import org.apache.curator.framework.CuratorFrameworkFactory.Builder;

import org.apache.curator.framework.api.BackgroundPathable;

import org.apache.curator.framework.api.CuratorWatcher;

import org.apache.curator.framework.recipes.cache.PathChildrenCache;

import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;

import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;

import org.apache.curator.retry.ExponentialBackoffRetry;

import org.apache.curator.utils.ZKPaths;

import org.apache.zookeeper.WatchedEvent;

public class CuratorWatch {

    static CuratorFramework zkclient = null;

    static String nameSpace = "mo";

    static {

        String zkhost = "10.144.5.218:2181";// zk的host

        RetryPolicy rp = new ExponentialBackoffRetry(1000, 3);// 重试机制

        Builder builder = CuratorFrameworkFactory.builder().connectString(zkhost)

                .connectionTimeoutMs(5000).sessionTimeoutMs(5000).retryPolicy(rp);

        builder.namespace(nameSpace);

        CuratorFramework zclient = builder.build();

        zkclient = zclient;

        zkclient.start();// 放在这前面执行

        zkclient.newNamespaceAwareEnsurePath("/" + nameSpace);

       

    }

    public static void main(String[] args) throws Exception {

        watch();

        Thread.sleep(Long.MAX_VALUE);

    }

    /**

     *

     * 监听节点变化

     *

     */

    public static void watch() throws Exception {

        PathChildrenCache cache = new PathChildrenCache(zkclient, "/mo", false);

        cache.start();

        System.out.println("监听开始/zk........");

        PathChildrenCacheListener plis = new PathChildrenCacheListener() {

            @Override

            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)

                    throws Exception {

                switch (event.getType()) {

                    case CHILD_ADDED: {

                        System.out.println("Node added: "

                                + ZKPaths.getNodeFromPath(event.getData().getPath()));

                        break;

                    }

                    case CHILD_UPDATED: {

                        System.out.println("Node changed: "

                                + ZKPaths.getNodeFromPath(event.getData().getPath()));

                        break;

                    }

                    case CHILD_REMOVED: {

                        System.out.println("Node removed: "

                                + ZKPaths.getNodeFromPath(event.getData().getPath()));

                        break;

                    }

                }

            }

        };

        // 注册监听

        cache.getListenable().addListener(plis);

    }

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: