您的位置:首页 > 大数据 > Hadoop

Hadoop_03_Zookeeper

2020-07-12 17:40 113 查看

文章目录

  • zk的watch机制
  • zk的JavaAPI操作
  • 网络编程
  • zookeeper安装

    安装leader+follower 模式的集群, 大致过程如下:

    1. 配置主机名称到 IP 地址映射配置
    2. 修改 ZooKeeper 配置文件
    3. 远程复制分发安装文件
    4. 设置 myid
    5. 启动 ZooKeeper 集群

    Observer 模式,可在对应节点的配置文件添加如下配置:
    peerType=observer
    其次,必须在配置文件指定哪些节点被指定为 Observer,如:
    server.1:localhost:2181:3181:observer

    步骤一:配置映射
    (个人配置)
    服务器IP
    192.168.190.3
    192.168.190.120
    192.168.190.130
    主机名
    node01
    node02
    node03
    myid的值
    1
    2
    3

    步骤二:解压文件
    cd /export/softwares
    tar -zxvf zookeeper-3.4.9.tar.gz -C …/servers/

    步骤三:修改配置文件
    cd /export/servers/zookeeper-3.4.9/conf/
    cp zoo_sample.cfg zoo.cfg
    mkdir -p /export/servers/zookeeper-3.4.9/zkdatas/
    vim zoo.cfg

    步骤四:添加myid配置
    echo 1 > /export/servers/zookeeper-3.4.9/zkdatas/myid

    步骤五:安装包分发并修改myid的值
    第一台机器执行:
    scp -r /export/servers/zookeeper-3.4.9/ node02:/export/servers/
    scp -r /export/servers/zookeeper-3.4.9/ node03:/export/servers/
    第二台修改:
    echo 2 > /export/servers/zookeeper-3.4.9/zkdatas/myid
    第三台修改:
    echo 3 > /export/servers/zookeeper-3.4.9/zkdatas/myid
    步骤六:分别启动每个机器
    /export/servers/zookeeper-3.4.9/bin/zkServer.sh start
    查看启动状态
    /export/servers/zookeeper-3.4.9/bin/zkServer.sh status

    zookeeper的shell操作

    zk的shell操作创建节点:永久节点
    create /abc helloworld

    创建永久的顺序节点
    create -s /bcd helloworld

    创建临时节点
    create -e /mytemp mytemp

    创建临时顺序节点
    create -s -e /mytemp2 mytemp2

    获取节点的值
    get /abc

    设置节点的值
    set /abc world

    删除节点的操作
    delete /hello 只能删除没有子节点的情况
    rmr /hello 递归的删除

    其他命令
    history : 列出命令历史
    redo:该命令可以重新执行指定命令编号的历史命令

    zk的数据模型

    ZooKeeper 的数据模型,在结构上和标准文件系统的非常相似,拥有一个层次的命名空间,都是采用树形层次结构,ZooKeeper 树中的每个节点被称为—Znode。和文件系统的目录树一样,ZooKeeper 树中的每个节点可以拥有子节点。

    不同之处:

    1. Znode 兼具文件和目录两种特点。
      既像文件一样维护着数据、元信息、ACL、 时间戳等数据结构,又像目录一样可以作为路径标识的一部分,并可以具有 子 Znode。用户对 Znode 具有增、删、改、查等操作(权限允许的情况下)。
    2. Znode 具有原子性操作。读操作将获取与节点相关的所有数据,写操作也将 替换掉节点的所有数据。
    3. Znode 存储数据大小有限制,通常以 KB 为大小单位,每个 Znode 的数据大小至多 1M,常规使用中应该远小于此值。
    4. Znode 通过路径引用,路径必须是绝对的,也是唯一的,必须由斜杠字符来开头。

    zk数据结构

    zookeeper 数据结构:

    每个节点称为一个 Znode。 每个 Znode 由 3 部分组成:

    • stat:此为状态信息, 描述该 Znode 的版本, 权限等信息
    • data:与该 Znode 关联的数据
    • children:该 Znode 下的子节点

    节点类型

    zk当中每一个节点都称之为一个znode,每个znode既具有文件夹的特性,也具有文件的特性

    节点类型

    • 永久节点
      有序的永久节点
      无序的永久节点
    • 临时节点
      有序的临时节点
      无序的临时节点

    临时节点:该节点的生命周期依赖于创建它们的会话。一旦会话结束,临时 节点将被自动删除,当然可以也可以手动删除。临时节点不允许拥有子节点。
    永久节点:该节点的生命周期不依赖于会话,并且只有在客户端显示执行删除操作的时候,他们才能被删除。

    zk当中节点的四种模型

    • PERSISTENT:永久节点
    • EPHEMERAL:临时节
    • PERSISTENT_SEQUENTIAL:永久节点、序列化
    • EPHEMERAL_SEQUENTIAL:临时节点、序列化

    节点属性

    • dataVersion:数据版本号,每次对节点进行 set 操作,dataVersion 的值都会增加 1(即使设置的是相同的数据),可有效避免了数据更新时出现的先后顺序问题。
    • cversion :子节点的版本号。当 znode 的子节点有变化时,cversion 的值就会增加 1。
    • aclVersion :ACL 的版本号。
    • cZxid :Znode 创建的事务 id。
    • mZxid :Znode 被修改的事务 id,即每次对 znode 的修改都会更新 mZxid。
    • ctime:节点创建时的时间戳.
    • mtime:节点最新一次更新发生时的时间戳
    • ephemeralOwner:如果该节点为临时节点, ephemeralOwner 值表示与该节点绑定的 session id. 如果不是, ephemeralOwner 值为 0.

    在 client 和 server 通信之前,首先需要建立连接,该连接称为 session。连接建立后,如果发生连接超时、授权失败,或者显式关闭连接,连接便处于 CLOSED
    状态, 此时 session 结束。

    zk的watch机制

    ZooKeeper 提供了分布式数据发布/订阅功能。
    一个典型的发布/订阅模型系统定义了一种一对多的订阅关系,能让多个订阅者同时监听某一个主题对象。

    ZooKeeper 中,引入了 Watcher 机制来实现这种分布式的通知功能 。
    触发事件种类如:
    节点创建,节点删除,节点改变,子节点改变等。

    临时节点与zk的watch机制一起搭配使用,可以监听我们临时节点什么时候消失

    Watcher 为以下三个过程

    1. 客户端向服务端注册 Watcher
    2. 服务端事件发生触发 Watcher
    3. 客户端回调 Watcher 得到触发事件情况

    watch机制特点

    • 一次性触发
      监听器是一次性的,触发一次就死了

    • 事件封装
      ZooKeeper 使用 WatchedEvent 对象来封装服务端事件并传递。
      WatchedEvent 包含了每一个事件的三个基本属性:
      通知状态(keeperState)
      事件类型(EventType)
      节点路径(path)

    • event异步发送
      监听器不影响节点之间的操作,是异步的线程。

    • 先注册再触发
      必须客户端先去服务端注册才能监听

    watch示例:

    1. 设置节点数据变动监听:
      get /abc watch

    2. 通过另一个客户端更改节点数据:
      set /abc hello22

    3. 此时设置监听的节点收到通知

    zk的JavaAPI操作

    这里介绍一个API:curator framework
    “ The Curator Framework is a high-level API that greatly simplifies using ZooKeeper. It adds many features that build on ZooKeeper and handles the complexity of managing connections to the ZooKeeper cluster and retrying operations. ”

    导入jar包

    <dependencies>
    <dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.12.0</version>
    </dependency>
    <dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.12.0</version>
    </dependency>
    <dependency>
    <groupId>com.google.collections</groupId>
    <artifactId>google-collections</artifactId>
    <version>1.0</version>
    </dependency>
    </dependencies>
    <build>
    <plugins>
    <!-- java编译插件 -->
    <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>3.2</version>
    <configuration>
    <source>1.8</source>
    <target>1.8</target>
    <encoding>UTF-8</encoding>
    </configuration>
    </plugin>
    </plugins>
    </build>

    创建永久节点

    @Test
    public void createNode() throws Exception {
    RetryPolicy retryPolicy = new  ExponentialBackoffRetry(1000, 1);
    //获取客户端对象
    CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.52.100:2181,192.168.52.110:2181,192.168.52.120:2181", 1000, 1000, retryPolicy);
    //调用start开启客户端操作
    client.start();
    //通过create来进行创建节点,并且需要指定节点类型
    client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/hello3/world");
    client.close();
    }

    创建临时节点

    @Test
    public void createNode2() throws Exception {
    RetryPolicy retryPolicy = new  ExponentialBackoffRetry(3000, 1);
    CuratorFramework client = CuratorFrameworkFactory.newClient("node01:2181,node02:2181,node03:2181", 3000, 3000, retryPolicy);
    client.start();
    client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/hello5/world");
    Thread.sleep(5000);
    client.close();
    }

    修改节点数据

    /**
    * 节点下面添加数据与修改是类似的,一个节点下面会有一个数据,新的数据会覆盖旧的数据
    * @throws Exception
    */
    @Test
    public void nodeData() throws Exception {
    RetryPolicy retryPolicy = new  ExponentialBackoffRetry(3000, 1);
    CuratorFramework client = CuratorFrameworkFactory.newClient("node01:2181,node02:2181,node03:2181", 3000, 3000, retryPolicy);
    client.start();
    client.setData().forPath("/hello5", "hello7".getBytes());
    client.close();
    }

    节点数据查询

    @Test
    public void updateNode() throws Exception {
    RetryPolicy retryPolicy = new  ExponentialBackoffRetry(3000, 1);
    CuratorFramework client = CuratorFrameworkFactory.newClient("node01:2181,node02:2181,node03:2181", 3000, 3000, retryPolicy);
    client.start();
    byte[] forPath = client.getData().forPath("/hello5");
    System.out.println(new String(forPath));
    client.close();
    }

    节点watch机制

    @Test
    public void watchNode() throws Exception {
    RetryPolicy policy = new ExponentialBackoffRetry(3000, 3);
    CuratorFramework client = CuratorFrameworkFactory.newClient("node01:2181,node02:2181,node03:2181", policy);
    client.start();
    // ExecutorService pool = Executors.newCachedThreadPool();
    //设置节点的cache
    TreeCache treeCache = new TreeCache(client, "/hello5");
    //设置监听器和处理过程
    treeCache.getListenable().addListener(new TreeCacheListener() {
    @Override
    public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
    ChildData data = event.getData();
    if(data !=null){
    switch (event.getType()) {
    case NODE_ADDED:
    System.out.println("NODE_ADDED : "+ data.getPath() +"  数据:"+ new String(data.getData()));
    break;
    case NODE_REMOVED:
    System.out.println("NODE_REMOVED : "+ data.getPath() +"  数据:"+ new String(data.getData()));
    break;
    case NODE_UPDATED:
    System.out.println("NODE_UPDATED : "+ data.getPath() +"  数据:"+ new String(data.getData()));
    break;
    
    default:
    break;
    }
    }else{
    System.out.println( "data is null : "+ event.getType());
    }
    }
    });
    //开始监听
    treeCache.start();
    Thread.sleep(50000000);
    }

    网络编程

    网络编程是指用来实现网络互联的不同计算机上运行的程序间可以进行数据交换。对我们来说即如何用编程语言 java 实现计算机网络中不同计算机之间的通信。

    网络通信三要素

    • IP地址
    • 端口号
    • 传输协议

    IP地址
    A类网址
    192 0-254 0-254 0-254 可以带:255255255台机器
    B类网址
    192.168 0-254 0-254 可以带:255*255
    C类网址
    192.168.52 0-254

    端口号
    用于标识进程的逻辑地址,不同进程的标识;
    有效端口:0-65535,其中 0-1024 系统使用或保留端口。

    传输协议:(常见协议)
    UDP(用户数据报协议)(不需要建立连接)
    TCP(传输控制协议)(需要建立连接)

    Socket 机制
    Socket,又称为套接字,用于描述 IP 地址和端口。应用程序通常通过 socket向网络发出请求或者应答网络请求。Socket 就是为网络编程提供的一种机制,通信两端都有 socket。

    阻塞和非阻塞
    研究的是如果程序没有准备好,服务端该怎么响应的问题

    同步和异步
    研究的是CPU如果没有准备好,该怎么响应程序的问题

    RPC
    远程过程调用
    屏蔽了调用的细节,就跟我们调用本地的程序是一样的,不需要关心我们的服务端在哪里

    内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
    标签: