zeromq 发布订阅 无数据丢失性能测试
2013-07-19 02:43
871 查看
sub接收端代码:
#include <zmq.hpp>
#include <assert.h>
#include <iostream>
#include <string>
using namespace std;
#define BAG_LARGE_NUM 14
long long int gettime(){
struct timeval tv = {0, 0};
gettimeofday(&tv, NULL);
return tv.tv_sec * 1000000 + tv.tv_usec;
}
int main(int vArgc, char** vArgv)
{
char* ProtocoltConSub = vArgv[1];
char* ProtocoltConReq = vArgv[2];
cout<<"ProtocoltConSub: "<<ProtocoltConSub<<endl;
cout<<"ProtocoltConReq: "<<ProtocoltConReq<<endl;
cout<<"***********************Client Start********************"<<endl;
zmq::context_t tContext(1);
zmq::socket_t tSocketSub(tContext, ZMQ_SUB);
zmq::socket_t tSocketReq(tContext, ZMQ_REQ);
tSocketSub.setsockopt(ZMQ_SUBSCRIBE, "", 0);
tSocketSub.connect(ProtocoltConSub);
tSocketReq.connect(ProtocoltConReq);
long long tTimeStart, tTimeEnd;
zmq::message_t tMsgReq(32), tMsgSub(32);
//同步pub sub
tSocketReq.send(tMsgReq);
tMsgReq.rebuild();
tSocketReq.recv(&tMsgReq);
tMsgSub.rebuild();
tSocketSub.recv(&tMsgSub); //等待pub端所有req
for(int i=0; i<BAG_LARGE_NUM; i++)
{
long long tLoopTimes = atoi((char*)tMsgReq.data());
tTimeStart = gettime();
for (int k=0; tLoopTimes; k++)
{
if(0 == k%500)
{//sub pub同步
tMsgReq.rebuild(32);
tSocketReq.send(tMsgReq);
tMsgReq.rebuild();
tSocketReq.recv(&tMsgReq);
}
tMsgSub.rebuild();
tSocketSub.recv(&tMsgSub);
}
tTimeEnd = gettime();
cout<<"bag large: "<<tMsgSub.size()<<" take time: "<<tTimeEnd-tTimeStart<<" tps: "<<tLoopTimes/(tTimeEnd-tTimeStart) <<endl;
//更新tLoopTimes
tMsgReq.rebuild(32);
tSocketReq.send(tMsgReq);
tMsgReq.rebuild();
tSocketReq.recv(&tMsgReq);
}
cout<<"finish"<<endl;
return 0;
}./Client tcp://localhost:5555 tcp://localhost:5556
pub发布端代码:
#include <zmq.hpp>
#include <assert.h>
#include <iostream>
using namespace std;
#define BAG_LARGE_NUM 14
long long int gettime(){
struct timeval tv = {0, 0};
gettimeofday(&tv, NULL);
return tv.tv_sec * 1000000 + tv.tv_usec;
}
int main(int vArgc, char** vArgv)
{
char* tProtocolPub = vArgv[1];
char* tProtocolRep = vArgv[2];
int tClientNum = atoi(vArgv[3]);
long long tLoopL = atoi(vArgv[4]);
long long tLoopB = atoi(vArgv[5]);
cout<<"tProtocolPub: "<<tProtocolPub<<endl;
cout<<"tProtocolRep: "<<tProtocolRep<<endl;
cout<<"tLoopL: "<<tLoopL<<endl;
cout<<"tLoopB: "<<tLoopB<<endl;
cout<<"ClientNum: "<<tClientNum<<endl;
cout<<"**********************Server Start*******************"<<endl;
zmq::context_t tContext(1);
zmq::socket_t tSocketPub(tContext, ZMQ_PUB);
zmq::socket_t tSocketRep(tContext, ZMQ_REP);
tSocketPub.bind(tProtocolPub);
tSocketRep.bind(tProtocolRep);
std::string tData;
zmq::message_t tMsgRecv(32), tMsgSend(32), tMsg(32);
long long tBagLarge[BAG_LARGE_NUM] = {1, 4, 8, 32, 64, 128, 256, 512,1024,1024*8, 1024*64, 1024*128, 1024*512, 1024*1024};
long long tChansportLoop = tLoopL;
long long tTimeStart, tTimeEnd;
for (int i=0; i<tClientNum; i++)
{//pub sub同步
tMsgRecv.rebuild();
tSocketRep.recv(&tMsgRecv, 0);
tMsgSend.rebuild(sizeof(tBagLarge[0]));
tSocketRep.send(tMsgSend);
}
for (int i = 0; i<BAG_LARGE_NUM; i++)
{
tTimeStart = gettime();
for (long long t=0; t<tChansportLoop; t++)
{
if(0 == t%500)
{
for (int j=0; j<tClientNum; j++)
{//pub sub同步
tMsg.rebuild();
tSocketRep.recv(&tMsg, 0);
tMsg.rebuild(32);
tSocketRep.send(tMsg, 0);
}
}
tMsgSend.rebuild(tBagLarge[i]);
tSocketPub.send(tMsgSend, 0);
}
tTimeEnd = gettime();
cout<<"bag large: "<<tBagLarge[i]<<" take time: "<<tTimeEnd-tTimeStart<<" tps: "<<tChansportLoop/(tTimeEnd-tTimeStart) <<endl;
//next chansport loop
tMsg.rebuild();
tSocketRep.recv(&tMsg);
tMsg.rebuild(tBagLarge[i+1]);
tSocketRep.send(tMsg);
}
cout<<"finish"<<endl;
return 0;
}
脚本命令:./Server tcp://*:5555 tcp://*:5556 3 10000 100000
#include <zmq.hpp>
#include <assert.h>
#include <iostream>
#include <string>
using namespace std;
#define BAG_LARGE_NUM 14
long long int gettime(){
struct timeval tv = {0, 0};
gettimeofday(&tv, NULL);
return tv.tv_sec * 1000000 + tv.tv_usec;
}
int main(int vArgc, char** vArgv)
{
char* ProtocoltConSub = vArgv[1];
char* ProtocoltConReq = vArgv[2];
cout<<"ProtocoltConSub: "<<ProtocoltConSub<<endl;
cout<<"ProtocoltConReq: "<<ProtocoltConReq<<endl;
cout<<"***********************Client Start********************"<<endl;
zmq::context_t tContext(1);
zmq::socket_t tSocketSub(tContext, ZMQ_SUB);
zmq::socket_t tSocketReq(tContext, ZMQ_REQ);
tSocketSub.setsockopt(ZMQ_SUBSCRIBE, "", 0);
tSocketSub.connect(ProtocoltConSub);
tSocketReq.connect(ProtocoltConReq);
long long tTimeStart, tTimeEnd;
zmq::message_t tMsgReq(32), tMsgSub(32);
//同步pub sub
tSocketReq.send(tMsgReq);
tMsgReq.rebuild();
tSocketReq.recv(&tMsgReq);
tMsgSub.rebuild();
tSocketSub.recv(&tMsgSub); //等待pub端所有req
for(int i=0; i<BAG_LARGE_NUM; i++)
{
long long tLoopTimes = atoi((char*)tMsgReq.data());
tTimeStart = gettime();
for (int k=0; tLoopTimes; k++)
{
if(0 == k%500)
{//sub pub同步
tMsgReq.rebuild(32);
tSocketReq.send(tMsgReq);
tMsgReq.rebuild();
tSocketReq.recv(&tMsgReq);
}
tMsgSub.rebuild();
tSocketSub.recv(&tMsgSub);
}
tTimeEnd = gettime();
cout<<"bag large: "<<tMsgSub.size()<<" take time: "<<tTimeEnd-tTimeStart<<" tps: "<<tLoopTimes/(tTimeEnd-tTimeStart) <<endl;
//更新tLoopTimes
tMsgReq.rebuild(32);
tSocketReq.send(tMsgReq);
tMsgReq.rebuild();
tSocketReq.recv(&tMsgReq);
}
cout<<"finish"<<endl;
return 0;
}./Client tcp://localhost:5555 tcp://localhost:5556
pub发布端代码:
#include <zmq.hpp>
#include <assert.h>
#include <iostream>
using namespace std;
#define BAG_LARGE_NUM 14
long long int gettime(){
struct timeval tv = {0, 0};
gettimeofday(&tv, NULL);
return tv.tv_sec * 1000000 + tv.tv_usec;
}
int main(int vArgc, char** vArgv)
{
char* tProtocolPub = vArgv[1];
char* tProtocolRep = vArgv[2];
int tClientNum = atoi(vArgv[3]);
long long tLoopL = atoi(vArgv[4]);
long long tLoopB = atoi(vArgv[5]);
cout<<"tProtocolPub: "<<tProtocolPub<<endl;
cout<<"tProtocolRep: "<<tProtocolRep<<endl;
cout<<"tLoopL: "<<tLoopL<<endl;
cout<<"tLoopB: "<<tLoopB<<endl;
cout<<"ClientNum: "<<tClientNum<<endl;
cout<<"**********************Server Start*******************"<<endl;
zmq::context_t tContext(1);
zmq::socket_t tSocketPub(tContext, ZMQ_PUB);
zmq::socket_t tSocketRep(tContext, ZMQ_REP);
tSocketPub.bind(tProtocolPub);
tSocketRep.bind(tProtocolRep);
std::string tData;
zmq::message_t tMsgRecv(32), tMsgSend(32), tMsg(32);
long long tBagLarge[BAG_LARGE_NUM] = {1, 4, 8, 32, 64, 128, 256, 512,1024,1024*8, 1024*64, 1024*128, 1024*512, 1024*1024};
long long tChansportLoop = tLoopL;
long long tTimeStart, tTimeEnd;
for (int i=0; i<tClientNum; i++)
{//pub sub同步
tMsgRecv.rebuild();
tSocketRep.recv(&tMsgRecv, 0);
tMsgSend.rebuild(sizeof(tBagLarge[0]));
tSocketRep.send(tMsgSend);
}
for (int i = 0; i<BAG_LARGE_NUM; i++)
{
tTimeStart = gettime();
for (long long t=0; t<tChansportLoop; t++)
{
if(0 == t%500)
{
for (int j=0; j<tClientNum; j++)
{//pub sub同步
tMsg.rebuild();
tSocketRep.recv(&tMsg, 0);
tMsg.rebuild(32);
tSocketRep.send(tMsg, 0);
}
}
tMsgSend.rebuild(tBagLarge[i]);
tSocketPub.send(tMsgSend, 0);
}
tTimeEnd = gettime();
cout<<"bag large: "<<tBagLarge[i]<<" take time: "<<tTimeEnd-tTimeStart<<" tps: "<<tChansportLoop/(tTimeEnd-tTimeStart) <<endl;
//next chansport loop
tMsg.rebuild();
tSocketRep.recv(&tMsg);
tMsg.rebuild(tBagLarge[i+1]);
tSocketRep.send(tMsg);
}
cout<<"finish"<<endl;
return 0;
}
脚本命令:./Server tcp://*:5555 tcp://*:5556 3 10000 100000
相关文章推荐
- 性能测试 PTS 铂金版来袭!阿里云发布T级数据压测的终极秘笈
- 性能测试 PTS 铂金版来袭!阿里云发布T级数据压测的终极秘笈
- 性能测试 PTS 铂金版来袭!阿里云发布T级数据压测的终极秘笈
- 使用SQLServer同义词和SQL邮件,解决发布订阅中订阅库丢失数据的问题
- 性能测试 PTS 铂金版来袭!阿里云发布T级数据压测的终极秘笈
- R显卡VR性能如何?AMD发布 VRMark Cyan Room 测试结果
- Emmagee 1.3.0 发布,Android 性能测试工具
- YDB与spark SQL在百亿级数据上的性能对比测试
- Unity 5.4 公开测试版发布:增强的视觉效果,更佳的性能表现
- SQL 2012 发布与订阅实现数据同步 图解(解决 错误22022)
- 高吞吐量的分布式发布订阅消息系统Kafka--安装及测试
- 新浪云计算数据缓存性能测试(文件、KVDB、Memcache)
- 如何获取手机性能测试数据FPS
- 最全的软件测试工具LR中性能数据翻译(一)
- HBase 高性能获取数据(多线程批量式解决办法) + MySQL和HBase性能测试比较
- 性能测试中批量数据制作实例的多种方法讨论
- 软件性能测试实践 PPT 发布——第九次广州软件测试交流会版本
- 消息订阅发布系统Apache Kafka分布式集群环境搭建和简单测试
- Mongodb亿级数据量的性能测试
- 百万数据下几种SQL性能测试