您的位置:首页 > 其它

zeromq 发布订阅 无数据丢失性能测试

2013-07-19 02:43 871 查看
sub接收端代码:

#include <assert.h>
#include <sys/time.h>

#include <cstdlib>
#include <iostream>
#include <string>

#include <zmq.hpp>

using namespace std;
// Number of benchmark rounds main() runs on the subscriber side.
// NOTE(review): presumably has to match the publisher's BAG_LARGE_NUM
// (one round per payload size) -- confirm against the server program.
#define BAG_LARGE_NUM 14

/**
 * Returns the current wall-clock time, as reported by gettimeofday(),
 * expressed in microseconds.
 *
 * The value is only used by callers to compute elapsed intervals.
 */
long long int gettime(){
    struct timeval tv = {0, 0};
    gettimeofday(&tv, NULL);
    // Widen the seconds to long long *before* multiplying: with a 32-bit
    // time_t the product tv_sec * 1000000 would overflow.
    return static_cast<long long>(tv.tv_sec) * 1000000LL + tv.tv_usec;
}

/**
 * Subscriber-side benchmark client.
 *
 * Usage:   ./Client <sub-endpoint> <req-endpoint>
 * Example: ./Client tcp://localhost:5555 tcp://localhost:5556
 *
 * Flow:
 *   1. Connect a SUB socket (subscribed to everything) and a REQ socket.
 *   2. Do one REQ/REP handshake; the reply is parsed as the decimal number
 *      of messages to receive in the first round.
 *   3. For each of BAG_LARGE_NUM rounds: receive that many published
 *      messages, doing a REQ/REP sync every 500 messages, then print the
 *      timing and do one more REQ/REP exchange whose reply is parsed as the
 *      message count of the next round.
 *
 * @return 0 on success, 1 when the endpoints are missing from the command line.
 */
int main(int vArgc, char** vArgv)
{
    // Both endpoints are mandatory; streaming a missing argv entry would
    // dereference a null/out-of-range pointer.
    if (vArgc < 3)
    {
        std::cerr << "usage: " << (vArgc > 0 ? vArgv[0] : "Client")
                  << " <sub-endpoint> <req-endpoint>" << std::endl;
        return 1;
    }
    const char* ProtocoltConSub = vArgv[1];
    const char* ProtocoltConReq = vArgv[2];
    std::cout << "ProtocoltConSub: " << ProtocoltConSub << std::endl;
    std::cout << "ProtocoltConReq: " << ProtocoltConReq << std::endl;
    std::cout << "***********************Client Start********************" << std::endl;

    zmq::context_t tContext(1);
    zmq::socket_t tSocketSub(tContext, ZMQ_SUB);
    zmq::socket_t tSocketReq(tContext, ZMQ_REQ);
    tSocketSub.setsockopt(ZMQ_SUBSCRIBE, "", 0); // empty prefix: receive every message
    tSocketSub.connect(ProtocoltConSub);
    tSocketReq.connect(ProtocoltConReq);

    zmq::message_t tMsgReq(32), tMsgSub(32);

    // Initial pub/sub handshake. The request payload is never inspected
    // here; only the reply matters.
    tSocketReq.send(tMsgReq);
    tMsgReq.rebuild();
    tSocketReq.recv(&tMsgReq);
    // The original code blocked on the SUB socket at this point. The
    // publisher program shown alongside this client publishes nothing until
    // every client has answered its first REP sync round, which this client
    // only sends inside the loop below -- so that extra recv could never
    // return. It has been removed.

    for (int i = 0; i < BAG_LARGE_NUM; i++)
    {
        // The last REP reply carries the message count as decimal text.
        // Copy it into a std::string first so parsing is bounded by the
        // message size and always sees a terminating NUL.
        const std::string tLoopText(static_cast<const char*>(tMsgReq.data()), tMsgReq.size());
        const long long tLoopTimes = atoi(tLoopText.c_str());

        const long long tTimeStart = gettime();
        for (long long k = 0; k < tLoopTimes; k++)
        {
            if (0 == k % 500)
            {
                // Periodic sub/pub sync.
                tMsgReq.rebuild(32);
                tSocketReq.send(tMsgReq);
                tMsgReq.rebuild();
                tSocketReq.recv(&tMsgReq);
            }
            tMsgSub.rebuild();
            tSocketSub.recv(&tMsgSub);
        }
        const long long tTimeEnd = gettime();

        // Elapsed time is in microseconds: scale to messages per second and
        // guard against a zero interval (e.g. an empty round).
        const long long tElapsed = tTimeEnd - tTimeStart;
        const long long tTps = (tElapsed > 0) ? (tLoopTimes * 1000000LL / tElapsed) : 0;
        std::cout << "bag large: " << tMsgSub.size() << " take time: " << tElapsed
                  << " tps: " << tTps << std::endl;

        // Fetch the message count for the next round.
        tMsgReq.rebuild(32);
        tSocketReq.send(tMsgReq);
        tMsgReq.rebuild();
        tSocketReq.recv(&tMsgReq);
    }
    std::cout << "finish" << std::endl;
    return 0;
}

pub发布端代码:
#include <zmq.hpp>
#include <assert.h>
#include <iostream>

using namespace std;
// Number of entries in the tBagLarge payload-size table in main(), and
// therefore the number of benchmark rounds the publisher runs.
#define BAG_LARGE_NUM 14

/**
 * Returns the current wall-clock time, as reported by gettimeofday(),
 * expressed in microseconds.
 *
 * The value is only used by callers to compute elapsed intervals.
 */
long long int gettime(){
    struct timeval tv = {0, 0};
    gettimeofday(&tv, NULL);
    // Widen the seconds to long long *before* multiplying: with a 32-bit
    // time_t the product tv_sec * 1000000 would overflow.
    return static_cast<long long>(tv.tv_sec) * 1000000LL + tv.tv_usec;
}

int main(int vArgc, char** vArgv)
{
char* tProtocolPub = vArgv[1];
char* tProtocolRep = vArgv[2];
int tClientNum = atoi(vArgv[3]);
long long tLoopL = atoi(vArgv[4]);
long long tLoopB = atoi(vArgv[5]);

cout<<"tProtocolPub: "<<tProtocolPub<<endl;
cout<<"tProtocolRep: "<<tProtocolRep<<endl;
cout<<"tLoopL: "<<tLoopL<<endl;
cout<<"tLoopB: "<<tLoopB<<endl;
cout<<"ClientNum: "<<tClientNum<<endl;
cout<<"**********************Server Start*******************"<<endl;
zmq::context_t tContext(1);
zmq::socket_t tSocketPub(tContext, ZMQ_PUB);
zmq::socket_t tSocketRep(tContext, ZMQ_REP);
tSocketPub.bind(tProtocolPub);
tSocketRep.bind(tProtocolRep);

std::string tData;
zmq::message_t tMsgRecv(32), tMsgSend(32), tMsg(32);
long long tBagLarge[BAG_LARGE_NUM] = {1, 4, 8, 32, 64, 128, 256, 512,1024,1024*8, 1024*64, 1024*128, 1024*512, 1024*1024};
long long tChansportLoop = tLoopL;
long long tTimeStart, tTimeEnd;
for (int i=0; i<tClientNum; i++)
{//pub sub同步
tMsgRecv.rebuild();
tSocketRep.recv(&tMsgRecv, 0);
tMsgSend.rebuild(sizeof(tBagLarge[0]));
tSocketRep.send(tMsgSend);
}

for (int i = 0; i<BAG_LARGE_NUM; i++)
{
tTimeStart = gettime();
for (long long t=0; t<tChansportLoop; t++)
{
if(0 == t%500)
{
for (int j=0; j<tClientNum; j++)
{//pub sub同步
tMsg.rebuild();
tSocketRep.recv(&tMsg, 0);
tMsg.rebuild(32);
tSocketRep.send(tMsg, 0);
}
}
tMsgSend.rebuild(tBagLarge[i]);
tSocketPub.send(tMsgSend, 0);
}
tTimeEnd = gettime();
cout<<"bag large: "<<tBagLarge[i]<<" take time: "<<tTimeEnd-tTimeStart<<" tps: "<<tChansportLoop/(tTimeEnd-tTimeStart) <<endl;
//next chansport loop
tMsg.rebuild();
tSocketRep.recv(&tMsg);
tMsg.rebuild(tBagLarge[i+1]);
tSocketRep.send(tMsg);
}

cout<<"finish"<<endl;
return 0;
}

运行命令(参数依次为:PUB 绑定地址、REP 绑定地址、客户端数量、tLoopL、tLoopB):./Server tcp://*:5555 tcp://*:5556 3 10000 100000
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息