您的位置:首页 > 理论基础 > 计算机网络

【分析总结框架记录】基于ZMQ的游戏网络层基础架构

2015-06-26 11:28 711 查看
对于内网服务器之间的通信采用zmq来进行,对于和客户端的通信采用boost的asio来进行。这里先来搭建zmq的基础结构。

zmq相关的知识可以去zmq官方网站查询。

这里使用zmq的push 和pull来进行通信。

先放一张结构图:



其中PushZmq是推管道, PullZmq是拉管道:

对于Push的流程是:

zmq_init()---->zmq_socket()---->zmq_connect()---->zmq_msg_init_size()---->zmq_msg_data()(memcpy填充数据)---->zmq_send()---->zmq_msg_close()---->zmq_close()---->zmq_term()

具体见代码:

PushZmq.h

[cpp] view
plaincopyprint?

#ifndef __PUSH_ZMQ_H__

#define __PUSH_ZMQ_H__

#include <zmq.h>

#include <string.h>

#include <iostream>

#include <glog/logging.h>

using namespace std;

/**
 * Sending end of a ZeroMQ PUSH/PULL pipe.
 *
 * The constructor creates a ZMQ_PUSH socket and connects it to the given
 * endpoint; the destructor closes the socket and terminates the context.
 */
class PushZmq
{
public:
    /**
     * @param url        Endpoint passed to zmq_connect(), e.g. "tcp://127.0.0.1:5555".
     * @param zmqContext ZeroMQ context to use. When NULL, a new context is
     *                   created with zmq_init(1).
     */
    PushZmq(const char* url, void* zmqContext = NULL);

    /** Closes the socket and terminates the context. */
    ~PushZmq();

    /**
     * Copies `length` bytes from `buffer` into a ZeroMQ message and sends it
     * without blocking.
     *
     * @return The value returned by zmq_send() on success, or
     *         static_cast<size_t>(-1) when sending fails.
     */
    size_t Send(const char* buffer, size_t length);

private:
    // Not copyable: a copy would share _socket/_ctx and both destructors
    // would close/terminate the same handles. Declared, never defined.
    PushZmq(const PushZmq&);
    PushZmq& operator=(const PushZmq&);

    std::string _strUrl;  // endpoint this socket connects to
    void* _ctx;           // ZeroMQ context
    void* _socket;        // ZMQ_PUSH socket, NULL if creation failed
};

#endif

PushZmq.cpp

[cpp] view
plaincopyprint?

#include "pushZmq.h"

#include "proto/hello.pb.h"

using namespace hello;

int main()

{

FLAGS_minloglevel = google::INFO;

google::InitGoogleLogging("");

google::SetLogDestination(google::INFO, "../");

google::SetLogFilenameExtension("log_");

google::LogToStderr();

string url = "tcp://127.0.0.1:5555";

PushZmq* push = new PushZmq(url.c_str());

//string sendContent = "Hello Pull.I am From Push!";

PbMsgHello helloMsg;

helloMsg.set_helloint(123456);

helloMsg.set_hellostring("ni hao wang peng ");

int length = helloMsg.ByteSize();

char* buffer = (char*)malloc(length);

helloMsg.SerializeToArray(buffer, length);

push->Send(buffer, length);

free(buffer);

return 0;

}

/**
 * Creates a ZMQ_PUSH socket and connects it to `url`.
 *
 * @param url        Endpoint for zmq_connect(); a NULL pointer is treated as
 *                   an empty string.
 * @param zmqContext Context to use; when NULL a new one is created with
 *                   zmq_init(1).
 *
 * Failures are logged and leave the object constructed: _socket stays NULL
 * when the context or socket could not be created.
 */
PushZmq::PushZmq( const char* url, void* zmqContext /*= NULL*/ )
    : _strUrl(url ? url : "")
    , _ctx(zmqContext)
    , _socket(NULL)
{
    if (!_ctx)
    {
        _ctx = zmq_init(1);
        if (!_ctx)
        {
            const int err = errno;  // capture before logging can change it
            LOG(ERROR) << "error in zmq_init:" << zmq_strerror(err);
            return;
        }
    }

    _socket = zmq_socket(_ctx, ZMQ_PUSH);
    if (!_socket)
    {
        const int err = errno;
        LOG(ERROR) << "Error in zmq_socket:" << zmq_strerror(err);
        return;
    }

    if (zmq_connect(_socket, _strUrl.c_str()) != 0)
    {
        const int err = errno;
        LOG(ERROR) << "error in zmq_connect:" << zmq_strerror(err);
    }
}

/**
 * Closes the socket and terminates the context.
 *
 * Handles that are NULL (construction failed part-way) are skipped, and
 * failures from zmq_close()/zmq_term() are logged instead of being ignored.
 *
 * NOTE(review): the context is terminated even when it was supplied by the
 * caller through the constructor; confirm that callers do not keep using a
 * shared context after this object is destroyed.
 */
PushZmq::~PushZmq()
{
    if (_socket)
    {
        if (zmq_close(_socket) != 0)
        {
            const int err = errno;
            LOG(ERROR) << "error in zmq_close:" << zmq_strerror(err);
        }
        _socket = NULL;
    }

    if (_ctx)
    {
        if (zmq_term(_ctx) != 0)
        {
            const int err = errno;
            LOG(ERROR) << "error in zmq_term:" << zmq_strerror(err);
        }
        _ctx = NULL;
    }
}

size_t PushZmq::Send( const char* buffer, size_t length )

{

zmq_msg_t msg;

int rc = zmq_msg_init_size(&msg, length);

memcpy((char*)zmq_msg_data(&msg), buffer, length);

rc = zmq_send(_socket, &msg, ZMQ_NOBLOCK);

if(rc < 0)

{

cout << "error in zmq_send:" << zmq_strerror(errno) << endl;

zmq_msg_close(&msg);

return -1;

}

zmq_msg_close(&msg);

LOG(INFO) << "Send Hello success: rc=" << rc;

return rc;

}

对于Pull的流程是:

zmq_init()--->zmq_socket()--->zmq_bind()--->zmq_poll()--->zmq_msg_init()--->zmq_recv()--->zmq_msg_data()--->zmq_msg_size()--->调用具体处理函数--->zmq_msg_close()--->zmq_close()--->zmq_term()

PullZmq.h

[cpp] view
plaincopyprint?

#ifndef __PULL_ZMQ_H__

#define __PULL_ZMQ_H__

#include <zmq.h>

#include <iostream>

#include <string.h>

#include <glog/logging.h>

#include <boost/bind.hpp>

#include <boost/function.hpp>

using namespace std;

/**
 * Receiving end of a ZeroMQ PUSH/PULL pipe.
 *
 * The constructor creates a ZMQ_PULL socket and binds it to the given
 * endpoint; Run() polls the socket and hands every received message to the
 * registered callback; the destructor closes the socket and terminates the
 * context.
 */
class PullZmq
{
public:
    /**
     * Callback invoked with the data pointer and size of each received
     * message. Its return value becomes Run()'s loop condition: returning
     * false makes Run() stop.
     */
    typedef boost::function<bool(const char*, size_t)> TypeOnMessage;

    /**
     * @param url           Endpoint passed to zmq_bind(), e.g. "tcp://*:5555".
     * @param onPipeMessage Handler called for every received message.
     * @param zmqContext    ZeroMQ context to use. When NULL, a new context is
     *                      created with zmq_init(1).
     */
    PullZmq(const char* url, TypeOnMessage onPipeMessage, void* zmqContext=NULL);

    /** Closes the socket and terminates the context. */
    ~PullZmq();

    /** Receive loop; see TypeOnMessage for how it is stopped. */
    void Run();

private:
    // Not copyable: a copy would share _socket/_ctx and both destructors
    // would close/terminate the same handles. Declared, never defined.
    PullZmq(const PullZmq&);
    PullZmq& operator=(const PullZmq&);

    void* _ctx;                // ZeroMQ context
    std::string _strUrl;       // endpoint this socket binds to
    void* _socket;             // ZMQ_PULL socket
    TypeOnMessage _onMessage;  // per-message handler
};

#endif

PullZmq.cpp

[cpp] view
plaincopyprint?

#include "PullZmq.h"

#include "proto/hello.pb.h"

using namespace hello;

// Message handler that main() binds into PullZmq; declared here because its
// definition appears later in this file, after the PullZmq member functions.
bool TestOnMessage( const char* buffer, size_t length );

/**
 * Demo receiver: binds a PullZmq to port 5555 on all interfaces and runs its
 * receive loop with TestOnMessage as the handler.
 */
int main()
{
    FLAGS_minloglevel = google::INFO;
    google::InitGoogleLogging("");
    google::SetLogDestination(google::INFO, "../");
    google::SetLogFilenameExtension("log_");
    google::LogToStderr();

    const std::string url = "tcp://*:5555";

    // Automatic storage so the destructor (zmq_close + zmq_term) runs when
    // Run() returns; the previous heap object was never deleted.
    PullZmq pull(url.c_str(), boost::bind(TestOnMessage, _1, _2));
    pull.Run();

    return 0;
}

/**
 * Creates a ZMQ_PULL socket and binds it to `url`.
 *
 * @param url           Endpoint for zmq_bind(); a NULL pointer is treated as
 *                      an empty string.
 * @param onPipeMessage Handler stored for use by Run().
 * @param zmqContext    Context to use; when NULL a new one is created with
 *                      zmq_init(1).
 *
 * Failures are logged and leave the object constructed. _socket is always
 * initialised (NULL when the context or socket could not be created), so the
 * destructor never sees an indeterminate pointer.
 */
PullZmq::PullZmq( const char* url, TypeOnMessage onPipeMessage, void* zmqContext )
    : _ctx(zmqContext)               // initialisers follow member declaration order
    , _strUrl(url ? url : "")
    , _socket(NULL)
    , _onMessage(onPipeMessage)
{
    if (!_ctx)
    {
        _ctx = zmq_init(1);
        if (!_ctx)
        {
            const int err = errno;  // capture before logging can change it
            LOG(ERROR) << "error in zmq_init:" << zmq_strerror(err);
            return;
        }
    }

    _socket = zmq_socket(_ctx, ZMQ_PULL);
    if (!_socket)
    {
        const int err = errno;
        LOG(ERROR) << "Error in zmq_socket:" << zmq_strerror(err);
        return;
    }

    if (zmq_bind(_socket, _strUrl.c_str()) != 0)
    {
        const int err = errno;
        LOG(ERROR) << "error in zmq_bind:" << zmq_strerror(err);
    }
}

/**
 * Closes the socket and terminates the context, logging any failure.
 *
 * NULL handles are skipped so that a partially constructed object does not
 * call zmq_close()/zmq_term() on handles that were never created.
 *
 * NOTE(review): the context is terminated even when it was supplied by the
 * caller through the constructor; confirm that callers do not keep using a
 * shared context after this object is destroyed.
 */
PullZmq::~PullZmq()
{
    if (_socket)
    {
        if (zmq_close(_socket) != 0)
        {
            const int err = errno;
            LOG(ERROR) << "error in zmq_close:" << zmq_strerror(err);
        }
        _socket = NULL;
    }

    if (_ctx)
    {
        if (zmq_term(_ctx) != 0)
        {
            const int err = errno;
            LOG(ERROR) << "error in zmq_term:" << zmq_strerror(err);
        }
        _ctx = NULL;
    }
}

void PullZmq::Run()

{

zmq_pollitem_t item;

item.socket = _socket;

item.events = ZMQ_POLLIN;

long pollWaitTime = 1000;

bool bLoop = true;

while(bLoop)

{

int rc = zmq_poll(&item, 1, -1);

if(rc < 0)

{

LOG(ERROR) << "error in zmq_poll:" << zmq_strerror(errno);

}else if(rc ==0)

{

//LOG(ERROR) << "On Idle!";

}else

{

int msgCount = rc;

while(msgCount--)

{

zmq_msg_t msg;

rc = zmq_msg_init(&msg);

if (rc !=0 )

{

LOG(ERROR) << "error in zmq_msg_init:" << zmq_strerror(errno);

return;

}

rc = zmq_recv(_socket, &msg, 0);

if(rc != 0)

{

LOG(ERROR) << "error in zmq_recv:" << zmq_strerror(errno);

zmq_msg_close(&msg);

continue;

}

void* buffer = zmq_msg_data(&msg);

size_t len = zmq_msg_size(&msg);

bLoop = _onMessage((const char*)buffer, len);

zmq_msg_close(&msg);

}

}

}

}

bool TestOnMessage( const char* buffer, size_t length )

{

LOG(INFO) << "TestOnMessage:";

PbMsgHello helloMsg;

helloMsg.ParseFromArray(buffer, length);

LOG(INFO) << " helloInt = " << helloMsg.helloint()

<< " helloString = " << helloMsg.hellostring();

//string content;

//content.append(buffer);

//LOG(INFO) << "buffer = " << content << " length = " << length;

return true;

}

对应Makefile为:

[cpp] view
plaincopyprint?

# Builds the two demo binaries: pullZmq (receiver) and pushZmq (sender).
# `pull` and `push` are kept as convenience aliases for the real outputs.
.PHONY: all pull push clean

all: pull push

pull: pullZmq

push: pushZmq

# Rebuild the protobuf object whenever the generated source changes.
hello.o: proto/hello.pb.cc
	g++ -c -o hello.o proto/hello.pb.cc

pullZmq: hello.o PullZmq.cpp
	g++ -o pullZmq hello.o PullZmq.cpp -lzmq -lglog -lboost_filesystem -lprotobuf

pushZmq: hello.o PushZmq.cpp
	g++ -o pushZmq hello.o PushZmq.cpp -lzmq -lglog -lboost_filesystem -lprotobuf

clean:
	rm -rf *.o
	rm -rf pullZmq
	rm -rf pushZmq

上文的cpp中使用了Protobuf,因此需要引入Protobuf的支持,对应的proto文件

hello.proto为:

[cpp] view
plaincopyprint?

// Schema of the demo message exchanged between the push and pull programs.
// Declared as proto2 explicitly because the fields use `required`.
syntax = "proto2";

package hello;

message PbMsgHello
{
    required string helloString = 1;
    required int32 helloInt = 2;
}

运行以上cpp,可以实现:在push端包装一个Protobuf的Message,序列化之后Push到Pull端;Pull端接收到消息后进行解析,并读取Message中的内容。

结果如下:

pull端:



Push端:



可见在Push端组装的 int 和string 在pull端成功解析。

下一步应该进行Message的包装,以及ProtoBuffer的反射解析。即根据类型来自动生成解析所需的Message类型。

1-6章节对应源码下载:http://download.csdn.net/detail/jcracker/6267125
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: