您的位置:首页 > 大数据 > Hadoop

Hadoop-Zookeeper环境搭建、Zookeeper的shell操作、Zookeeper的JavaAPI

2020-07-15 06:05 302 查看

环境搭建

1、下载Zookeeper安装包2、解压3、到Zookeeper解压后的文件夹中,将conf中zoo_sample.cfg复制并重命名为zoo.cfg4、将zoo.cfg中的autopurge.snapRetainCount=3autopurge.purgeInterval=1取消注释以及设置机器的消息传输和选举端口server.1=node01:2888:3888server.2=node02:2888:3888server.3=node03:2888:38885、在Zookeeper文件夹中新建zkdatas文件夹,在其中再新建myid向myid中添加机器idecho 1 > myid6、启动服务bin/zkServer.sh start查看状态bin/zkServer.sh status

Zookeeper的架构模型

特点节点具有文件和文件夹的特性原子性文件大小不超过1M采用绝对路径
节点PERSISTENT:永久节点EPHEMERAL:临时节点PERSISTENT_SEQUENTIAL:永久节点、序列化EPHEMERAL_SEQUENTIAL:临时节点、序列化永久节点永久无序节点create /temp aaa永久有序节点create -s /temp aaa临时节点  :不能含有子节点临时无序节点create -e /temp aaa临时有序节点create -e -s temp aaa

watch机制

监听器特点:一次性封装异步先注册再监听ls  /temp watchget /temp watch

Zookeeper的shell操作

1、进入zookeeper客户端bin/zkCli.sh2、相关操作createlssetdeletermr

Zookeeper的JavaAPI

public class ZkOperate {/*** 创建一个永久节点*/@Testpublic void createNode() throws Exception {//定义我们的重试机制ExponentialBackoffRetry exponentialBackoffRetry = new ExponentialBackoffRetry(3000,3);//得到客户端CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("node01:2181,node02:2181,node03:2181", exponentialBackoffRetry);//开启服务端curatorFramework.start();curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/bigdata/hello/abc","helloworld".getBytes());//关闭服务端curatorFramework.close();}/*** 创建临时节点*/@Testpublic  void  createTempNode() throws Exception {CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("node01:2181,node02:2181", new ExponentialBackoffRetry(5000, 5));curatorFramework.start();curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/mytempNodenew","tempNode".getBytes());Thread.sleep(8000);curatorFramework.close();}/*创建永久的序列化节点  创建临时的序列化节点  留给你们做*//*** 修改节点的数据*/@Testpublic  void  udpateNodeData() throws Exception {CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("node01:2181", new ExponentialBackoffRetry(5000, 5));curatorFramework.start();curatorFramework.setData().forPath("/abc", "bbb".getBytes());curatorFramework.close();}/*** 节点数据的查询*/@Testpublic void getData() throws Exception {CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("node01:2181", new ExponentialBackoffRetry(5000, 5));curatorFramework.start();byte[] bytes = curatorFramework.getData().forPath("/abc");String s = new String(bytes);System.out.println(s);curatorFramework.close();}/*zk的watch机制*/@Testpublic  void  watchNode() throws Exception {CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("node01:2181", new ExponentialBackoffRetry(5000, 5));curatorFramework.start();//通过使用TreeCache来监听我们的节点TreeCache treeCache = new TreeCache(curatorFramework, "/abc");treeCache.getListenable().addListener(new TreeCacheListener() {/*** 这个方法里面实现我们的监听的逻辑,所有的监听事件,都会回调这个方法* @param curatorFramework* @param event** @throws Exception*/@Overridepublic void childEvent(CuratorFramework curatorFramework, TreeCacheEvent event) throws Exception {ChildData data = event.getData();if(null != data){//这个type封装的是我们的时间,比如节点的新增,节点的修改,节点的删除等等各种类型的时间TreeCacheEvent.Type type = event.getType();switch (type){case NODE_ADDED:System.out.println("节点新增操作被我监听到了");break;case INITIALIZED:System.out.println("初始化操作我监听到了");break;case NODE_REMOVED:System.out.println("节点移除操作被我监听到了");System.out.println("给运维发邮件,有服务器宕机了");break;case NODE_UPDATED:System.out.println("节点的修改操作被我监听到了");break;default:System.out.println("打印一些东西");break;}}}});//调用start方法开始监听treeCache.start();Thread.sleep(500000000);curatorFramework.close();}}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: