您的位置:首页 > 理论基础 > 计算机网络

使用zmq,probuf,缓冲池实现序列化和反序列化框架(二)-ZMQ介绍

2017-09-04 15:10 861 查看
  上一篇博客已经介绍了实现的缓冲池的原理和代码,本篇继续介绍我们构造序列化框架的基本单元-ZMQ。

  一,ZMQ介绍:

  引用官方的说法: “ZMQ (以下 ZeroMQ 简称 ZMQ)是一个简单好用的传输层,像框架一样的一个 socket library,他使得 Socket 编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。ZMQ
的明确目标是“成为标准网络协议栈的一部分,之后进入 Linux 内核”。现在还未看到它们的成功。但是,它无疑是极具前景的、并且是人们更加需要的“传统”BSD 套接字之上的一层封装。ZMQ 让编写高性能网络应用程序极为简单和有趣。”

二,本篇文章关于ZMQ底层的介绍不多,主要是搞清楚如何去合理的使用zmq,zmq有三种进行通信的方式:a,应答模式;b,订阅模式(广播一样);c,push-pull模式。前两种模式比较简单,从概念就可知大概,而且局限性多,本文着重介绍zmq的push-pull(推送-拉取)模式。

 该模式需要一端绑定(bind)一个节点(ip+端点),而外n个节点端连接(connect)到此端点,不管是bind端还是connect端都是既可作为push又可作为pull。但是只能设置其中一种模式,分情况进行讨论:

  1,假设bind到某个节点设置的是push模式,那么此节点每次发送数据,connect到该节点的任意一个节点都有可能收到(pull)数据,做到了负载平衡。

  2,假设bind到某个节点设置的是pull模式,那么所有connect到该节点任意一个节点都能像该节点发送(push)数据,而该节点会按照收到信息的顺序作队列处理。

三,下面就上面讨论的两种情况模式进行代码实例:

  1,bind到某个节点(端口5557)设置push模式:

package com.zjq.zmq.ventilator;

import org.zeromq.ZMQ;

public class Sender{
public static void main (String[] args) throws Exception {
ZMQ.Context context = ZMQ.context(1);

ZMQ.Socket sender = context.socket(ZMQ.PUSH);
sender.bind("tcp://*:5557");

System.out.println("当确认node节点准备好后按下enter键:");
System.in.read();
System.out.println("开始发送信息到各个node\n");

for (int task_nbr = 0; task_nbr < 10; task_nbr++) {
System.out.print("把任务"+task_nbr+"分发出去了\n");
sender.send(Integer.toString(task_nbr),0);
}
Thread.sleep(1000);
sender.close();
context.term();
}

}


2 (端口5557)对应的connect节点设置为pull模式,接收端口5557发送(push)的信息,,同时该节点又connect到端口5558,并设置为push模式,向端口5558发送(push)信息。

 node1:

 

package com.zjq.zmq.worker;

import org.zeromq.ZMQ;

public class Node1 {

public static void main (String[] args) throws Exception {
        ZMQ.Context context = ZMQ.context(1);

        ZMQ.Socket receiver = context.socket(ZMQ.PULL);
        receiver.connect("tcp://localhost:5557");

        ZMQ.Socket sender = context.socket(ZMQ.PUSH);
        sender.connect("tcp://localhost:5558");

        while (!Thread.currentThread ().isInterrupted ()) {
            String address = new String(receiver.recv(0)).trim();
            System.out.flush();
            System.out.print("Node1接收到信息:参数1 "+ address  + "\n");
            sender.send("Node1发送的 "+address+",", 0);
        }
        sender.close();
        receiver.close();
        context.term();
    }
}
node2:

package com.zjq.zmq.worker;

import org.zeromq.ZMQ;

public class Node2 {

public static void main (String[] args) throws Exception {
ZMQ.Context context = ZMQ.context(1);

ZMQ.Socket receiver = context.socket(ZMQ.PULL);
receiver.connect("tcp://localhost:5557");

ZMQ.Socket sender = context.socket(ZMQ.PUSH);
sender.connect("tcp://localhost:5558");

while (!Thread.currentThread ().isInterrupted ()) {
String address = new String(receiver.recv(0)).trim();
System.out.flush();
System.out.print("Node2接收到信息:参数1 "+ address + "\n");
sender.send("Node2发送的 "+address+",", 0);
}
sender.close();
receiver.close();
context.term();
}
}
2 bind到5558节点,作为pull模式:

package com.zjq.zmq.sink;

import org.zeromq.ZMQ;

public class CollectInfo {

public static void main (String[] args) throws Exception {

ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket receiver = context.socket(ZMQ.PULL);
receiver.bind("tcp://*:5558");

int task_nbr = 0;
while(true){
byte[] b=receiver.recv(0);
if(b.length>0){
System.out.print(new String(b));
}else{
break;
}
Thread.sleep(300);
task_nbr++;
if(task_nbr%10==0){
System.out.println();
}
}

receiver.close();
context.term();
}
}


四 模型:



zmq的api中文文档
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  java网络通信