使用zmq,probuf,缓冲池实现序列化和反序列化框架(二)-ZMQ介绍
2017-09-04 15:10
861 查看
上一篇博客已经介绍了实现的缓冲池的原理和代码,本篇继续介绍我们构造序列化框架的基本单元-ZMQ。
一,ZMQ介绍:
引用官方的说法: “ZMQ (以下 ZeroMQ 简称 ZMQ)是一个简单好用的传输层,像框架一样的一个 socket library,他使得 Socket 编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。ZMQ
的明确目标是“成为标准网络协议栈的一部分,之后进入 Linux 内核”。现在还未看到它们的成功。但是,它无疑是极具前景的、并且是人们更加需要的“传统”BSD 套接字之上的一层封装。ZMQ 让编写高性能网络应用程序极为简单和有趣。”
二,本篇文章关于ZMQ底层的介绍不多,主要是搞清楚如何去合理的使用zmq,zmq有三种进行通信的方式:a,应答模式;b,订阅模式(广播一样);c,push-pull模式。前两种模式比较简单,从概念就可知大概,而且局限性多,本文着重介绍zmq的push-pull(推送-拉取)模式。
该模式需要一端绑定(bind)一个节点(ip+端点),而外n个节点端连接(connect)到此端点,不管是bind端还是connect端都是既可作为push又可作为pull。但是只能设置其中一种模式,分情况进行讨论:
1,假设bind到某个节点设置的是push模式,那么此节点每次发送数据,connect到该节点的任意一个节点都有可能收到(pull)数据,做到了负载平衡。
2,假设bind到某个节点设置的是pull模式,那么所有connect到该节点任意一个节点都能像该节点发送(push)数据,而该节点会按照收到信息的顺序作队列处理。
三,下面就上面讨论的两种情况模式进行代码实例:
1,bind到某个节点(端口5557)设置push模式:
package com.zjq.zmq.ventilator;
import org.zeromq.ZMQ;
public class Sender{
public static void main (String[] args) throws Exception {
ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket sender = context.socket(ZMQ.PUSH);
sender.bind("tcp://*:5557");
System.out.println("当确认node节点准备好后按下enter键:");
System.in.read();
System.out.println("开始发送信息到各个node\n");
for (int task_nbr = 0; task_nbr < 10; task_nbr++) {
System.out.print("把任务"+task_nbr+"分发出去了\n");
sender.send(Integer.toString(task_nbr),0);
}
Thread.sleep(1000);
sender.close();
context.term();
}
}
2 (端口5557)对应的connect节点设置为pull模式,接收端口5557发送(push)的信息,,同时该节点又connect到端口5558,并设置为push模式,向端口5558发送(push)信息。
node1:
四 模型:
zmq的api中文文档
一,ZMQ介绍:
引用官方的说法: “ZMQ (以下 ZeroMQ 简称 ZMQ)是一个简单好用的传输层,像框架一样的一个 socket library,他使得 Socket 编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。ZMQ
的明确目标是“成为标准网络协议栈的一部分,之后进入 Linux 内核”。现在还未看到它们的成功。但是,它无疑是极具前景的、并且是人们更加需要的“传统”BSD 套接字之上的一层封装。ZMQ 让编写高性能网络应用程序极为简单和有趣。”
二,本篇文章关于ZMQ底层的介绍不多,主要是搞清楚如何去合理的使用zmq,zmq有三种进行通信的方式:a,应答模式;b,订阅模式(广播一样);c,push-pull模式。前两种模式比较简单,从概念就可知大概,而且局限性多,本文着重介绍zmq的push-pull(推送-拉取)模式。
该模式需要一端绑定(bind)一个节点(ip+端点),而外n个节点端连接(connect)到此端点,不管是bind端还是connect端都是既可作为push又可作为pull。但是只能设置其中一种模式,分情况进行讨论:
1,假设bind到某个节点设置的是push模式,那么此节点每次发送数据,connect到该节点的任意一个节点都有可能收到(pull)数据,做到了负载平衡。
2,假设bind到某个节点设置的是pull模式,那么所有connect到该节点任意一个节点都能像该节点发送(push)数据,而该节点会按照收到信息的顺序作队列处理。
三,下面就上面讨论的两种情况模式进行代码实例:
1,bind到某个节点(端口5557)设置push模式:
package com.zjq.zmq.ventilator;
import org.zeromq.ZMQ;
public class Sender{
public static void main (String[] args) throws Exception {
ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket sender = context.socket(ZMQ.PUSH);
sender.bind("tcp://*:5557");
System.out.println("当确认node节点准备好后按下enter键:");
System.in.read();
System.out.println("开始发送信息到各个node\n");
for (int task_nbr = 0; task_nbr < 10; task_nbr++) {
System.out.print("把任务"+task_nbr+"分发出去了\n");
sender.send(Integer.toString(task_nbr),0);
}
Thread.sleep(1000);
sender.close();
context.term();
}
}
2 (端口5557)对应的connect节点设置为pull模式,接收端口5557发送(push)的信息,,同时该节点又connect到端口5558,并设置为push模式,向端口5558发送(push)信息。
node1:
package com.zjq.zmq.worker; import org.zeromq.ZMQ; public class Node1 { public static void main (String[] args) throws Exception { ZMQ.Context context = ZMQ.context(1); ZMQ.Socket receiver = context.socket(ZMQ.PULL); receiver.connect("tcp://localhost:5557"); ZMQ.Socket sender = context.socket(ZMQ.PUSH); sender.connect("tcp://localhost:5558"); while (!Thread.currentThread ().isInterrupted ()) { String address = new String(receiver.recv(0)).trim(); System.out.flush(); System.out.print("Node1接收到信息:参数1 "+ address + "\n"); sender.send("Node1发送的 "+address+",", 0); } sender.close(); receiver.close(); context.term(); } }node2:
package com.zjq.zmq.worker; import org.zeromq.ZMQ; public class Node2 { public static void main (String[] args) throws Exception { ZMQ.Context context = ZMQ.context(1); ZMQ.Socket receiver = context.socket(ZMQ.PULL); receiver.connect("tcp://localhost:5557"); ZMQ.Socket sender = context.socket(ZMQ.PUSH); sender.connect("tcp://localhost:5558"); while (!Thread.currentThread ().isInterrupted ()) { String address = new String(receiver.recv(0)).trim(); System.out.flush(); System.out.print("Node2接收到信息:参数1 "+ address + "\n"); sender.send("Node2发送的 "+address+",", 0); } sender.close(); receiver.close(); context.term(); } }2 bind到5558节点,作为pull模式:
package com.zjq.zmq.sink; import org.zeromq.ZMQ; public class CollectInfo { public static void main (String[] args) throws Exception { ZMQ.Context context = ZMQ.context(1); ZMQ.Socket receiver = context.socket(ZMQ.PULL); receiver.bind("tcp://*:5558"); int task_nbr = 0; while(true){ byte[] b=receiver.recv(0); if(b.length>0){ System.out.print(new String(b)); }else{ break; } Thread.sleep(300); task_nbr++; if(task_nbr%10==0){ System.out.println(); } } receiver.close(); context.term(); } }
四 模型:
zmq的api中文文档
相关文章推荐
- 使用zmq,probuf,缓冲池实现序列化和反序列化框架(一)-使用ConcurrentLinkedQueue实现缓冲池
- java序列化/反序列化之xml、protobuf、protostuff 的比较与使用例子
- java序列化/反序列化之xstream、protobuf、protostuff 的比较与使用例子
- java序列化/反序列化之xstream、protobuf、protostuff 的比较与使用例子
- java序列化/反序列化之xml、protobuf、protostuff 的比较与使用例子
- java序列化/反序列化之xstream、protobuf、protostuff 的比较与使用例子
- java序列化/反序列化之xstream、protobuf、protostuff 的比较与使用例子
- C#实现JSON序列化与反序列化介绍
- 使用工具类,轻松实现XML序列化、反序列化
- C#实现JSON序列化与反序列化介绍
- java中使用protobuf序列化(反序列化)
- Google Protobuf - 实现跨平台跨语言的序列化/反序列化
- java 常用序列化和反序列化框架使用demo
- 使用AjaxPro框架实现无刷新用户登录验证【原创】
- 在java中使用kryo框架来实现高效序列化与反序列化
- C#使用DataContractJsonSerializer实现Json格式的序列化和反序列化
- ASP.NET C#使用JavaScriptSerializer实现序列化与反序列化得到JSON
- Google Protobuf——实现跨平台跨语言的序列化/反序列化
- c#序列化与反序列化通用方法, 使用protobuf-net实现
- 【protobuf】 3.使用protobuf-csharp-port进行序列化和反序列化