代码笔记 | zeromq 的 router学习分析
2016-09-01 14:57
309 查看
// lbbroker_cpp.cpp : 定义控制台应用程序的入口点。 // /************************************************************** 技术博客 http://www.cnblogs.com/itdef/ **************************************************************/ #include "stdafx.h" #include "zhelpers.hpp" #include <thread> #include <queue> void client_thread(void* arg) { zmq::context_t context(1); zmq::socket_t client(context, ZMQ_REQ); s_set_id(client, (intptr_t)1); client.connect("tcp://localhost:5672"); s_send(client, "HELLO"); std::string reply = s_recv(client); std::cout << "Client: " << reply << std::endl; return; } void worker_thread(void* arg){ zmq::context_t context(1); zmq::socket_t worker(context, ZMQ_REQ); s_set_id(worker, (intptr_t)9); worker.connect("tcp://localhost:5673"); s_send(worker, "READY"); while (1) { std::string address = s_recv(worker); { std::string empty = s_recv(worker); assert(empty.size() == 0); } std::string request = s_recv(worker); std::cout << "Worker: " << request << std::endl; s_sendmore(worker, address); s_sendmore(worker, ""); s_send(worker, "OK"); } return; } int main() { zmq::context_t context(1); zmq::socket_t frontend(context, ZMQ_ROUTER); zmq::socket_t backend(context, ZMQ_ROUTER); frontend.bind("tcp://*:5672"); backend.bind("tcp://*:5673"); int client_nbr; std::thread client[1]; for (client_nbr = 0; client_nbr < 1; client_nbr++) { client[client_nbr] = std::thread(client_thread,(void *)(intptr_t)client_nbr); } int worker_nbr; std::thread worker[1]; for (worker_nbr = 0; worker_nbr < 1; worker_nbr++) { worker[worker_nbr] = std::thread(worker_thread, (void *)(intptr_t)worker_nbr); } std::queue<std::string> worker_queue; while (1) { zmq::pollitem_t items[] = { {backend,0,ZMQ_POLLIN,0}, {frontend,0,ZMQ_POLLIN,0} }; if (worker_queue.size()) zmq::poll(&items[0], 2, -1); else zmq::poll(&items[0], 1, -1); if (items[0].revents & ZMQ_POLLIN) { // Queue worker address for LRU routing worker_queue.push(s_recv(backend)); { // Second frame is empty std::string empty = s_recv(backend); assert(empty.size() == 0); } // Third frame is READY or else a client reply address std::string client_addr = s_recv(backend); // If client reply, send rest back to frontend if (client_addr.compare("READY") != 0) { { std::string empty = s_recv(backend); assert(empty.size() == 0); } std::string reply = s_recv(backend); s_sendmore(frontend, client_addr); s_sendmore(frontend, ""); s_send(frontend, reply); if (--client_nbr == 0) break; } } if (items[1].revents & ZMQ_POLLIN) { // Now get next client request, route to LRU worker // Client request is [address][empty][request] std::string client_addr = s_recv(frontend); { std::string empty = s_recv(frontend); assert(empty.size() == 0); } std::string request = s_recv(frontend); std::string worker_addr = worker_queue.front();//worker_queue [0]; worker_queue.pop(); s_sendmore(backend, worker_addr); s_sendmore(backend, ""); s_sendmore(backend, client_addr); s_sendmore(backend, ""); s_send(backend, request); } } for (client_nbr = 0; client_nbr < 1; client_nbr++) { client[client_nbr].join(); } for (worker_nbr = 0; worker_nbr < 1; worker_nbr++) { worker[worker_nbr].join(); } return 0; }
相关文章推荐
- ARM&LINUX学习笔记(6)------启动代码分析
- eCos学习笔记之中断处理代码分析(转)
- contiki学习笔记-UDP-Client原厂代码分析
- 传智播客c/c++公开课学习笔记--C语言与木马恶意代码分析和360安全防护揭秘
- 32位汇编语言学习笔记(12)--分析switch语句的汇编代码
- HADOOP的学习笔记 (第五期) hadoop示例代码分析
- Androidpn学习笔记-客户端代码分析
- 32位汇编语言学习笔记(8)--分析do-while循环的汇编代码
- Linux netfilter 学习笔记 之九 ip层netfilter的连接跟踪模块代码分析
- Linux netfilter 学习笔记 之十二 ip层netfilter的NAT模块代码分析
- 恶意代码分析实战(7-01实验学习笔记)
- OpenCV学习笔记(三)人脸检测的代码分析(1)
- 学习笔记 --- LINNUX 使用异步通讯机制实现按键驱动代码分析
- eCos学习笔记之启动代码分析
- EasyARM2200开发板学习笔记:启动代码分析
- 新增代码关联Sonar分析学习笔记一
- 学习笔记:1.Hello,World代码分析,函数
- 学习笔记 --- MINI2440 linux按键驱动代码分析
- Linux netfilter 学习笔记 之九 ip层netfilter的连接跟踪模块代码分析
- 32位汇编语言学习笔记(10)--分析for循环的汇编代码