您的位置:首页 > 其它

使用protobuf和socket实现服务器间消息的传递

2016-10-22 20:30 555 查看
Protobuf产生于Google,是一种序列化和反序列化协议,具有空间开销小、解析速度快、兼容性好等优点,非常适合于对性能要求高的RPC(Remote Procedure Call)调用。网络上的两个程序通过一个双向的通信连接实现数据的交换,这个连接的一端称为一个Socket。本文旨在实现通过将消息使用protobuf序列化后经过Socket发送到对端,对端使用protobuf反序列化得到原始的信息。这个想法来源于在集群中从节点需要定时的发送心跳信息到主节点,以宣告自己的存在;本文旨在以自己的想法实现这种通信。

继续阅读文章之前,您需要了解protobuf的基本知识,以及unix环境中socket通信的基本知识,可以参考文章最后给出的几篇参考文档。下文代码的功能是client端会每隔5s向server端发送自己的状态信息,信息中包括client端发送此次心跳的时间和client端的地址。server端解析client端的心跳信息后输出该信息。本文的代码调试均在Linux环境下。

protobuf的安装,比较简单,如不确定是否已经安装,可以使用 protoc –version查看版本,如未安装,bash会提示找不到protoc命令.下载安装包(**.tar.gz),解压后,进入代码的根目录,分别执行:

# ./configure

# make

# make install

默认会安装到/usr/local。

首先,来定义protobuf数据结构Test.proto

//Test.proto
package Test.protobuf ;//包名:在生成对应的C++文件时,将被替换为名称空间,在代码中会有体现
option optimize_for = SPEED ;//文件级别的选项,Protobuf优化级别
//心跳信息数据结构
message HeartInfo
{
required int32 curtime = 1;
required string hostip = 2 ;
};


编译Test.proto生产对于的解析代码,并将其打包成和静态库,对应的makefile文件如下:

CC = g++ -g -D__LINUX__ -Wall -fPIC
AR = ar -r

PROJ_LIB = libTest.a
PROJ_OBJ = Test.pb.o

all:$(PROJ_LIB)

prepare:
protoc -I=./  --cpp_out=./  Test.proto

$(PROJ_LIB):$(PROJ_OBJ)
$(AR) $(PROJ_LIB) $(PROJ_OBJ)

.cc.o:
$(CC) -c $< -o $@

clean:
rm -f *.o
rm -f Test.pb.*


执行 make prepare,生成Test.proto对应的Test.pb.h和Test.pb.cc

执行make生成将生成的文件编译后并打包成libTest.a

其次,使用protobuf和socket进行双向通信的代码:

client端代码:

#include <iostream>
#include <string>
#include <ctime>
//for protobuf
#include "Test.pb.h"
//for socket
#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

using namespace std;
using namespace Test::protobuf ;

const int BUFFSIZE = 128;
int main()
{
//建立socket
int sock
ccff
etfd ;
struct sockaddr_in seraddr ;
string hostip = "127.0.0.1";
//链接,尝试3次
for(int i = 0 ; i < 3;++i)
{
if((socketfd = socket(AF_INET,SOCK_STREAM,0)) > 0)
{
cout<<"create socket success..."<<endl;
break;
}
sleep(2);
}
//地址置空
bzero( &seraddr, sizeof(seraddr) );
//
seraddr.sin_family = AF_INET ;
seraddr.sin_port = htons(9999);
seraddr.sin_addr.s_addr = inet_addr(hostip.c_str());
//尝试连接到服务端地址
if(connect(socketfd,(struct sockaddr *)&seraddr, sizeof(seraddr)) < 0)
{
cout<<"connect to server failed ..."<<endl;
close(socketfd);
return -1;
}

HeartInfo myprotobuf;
while(1)
{
int curtime = time(NULL) ;
//以下方法的实现可以Test.pb.h中找到
myprotobuf.set_curtime(curtime);
myprotobuf.set_hostip("127.0.0.1");
//protobuf的序列化方式之一
char buff[BUFFSIZE];
myprotobuf.SerializeToArray(buff,BUFFSIZE);

if(send(socketfd,buff,strlen(buff),0) < 0)
{
cout<<curtime<<": send failed ..."<<endl;
break;
}
cout<<curtime<<": send success ..."<<endl;
sleep(5); //每隔5s发送一次
}
close(socketfd);
return 0;
}


server端代码

#include <iostream>
#include <string>
#include <ctime>
//for protobuf
#include "Test.pb.h"
//for socket
#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <string.h>
#include <fcntl.h>
#include <sys/shm.h>

using namespace std;
using namespace Test::protobuf ;

const int BUFFSIZE = 128;
const int QLEN = 10 ;

int main()
{

int listenfd ;
int connfd ;
struct sockaddr_in seraddr ;
//建立socket
//AF_INET:IPv4因特网域
//SOCK_STREAM:TCP链接
//0:给定前两个参数,选择默认的协议
listenfd = socket(AF_INET,SOCK_STREAM,0);
if(listenfd < 0 )
{
cout<<"socket failed"<<endl;
}
//地址置空
bzero(&seraddr,sizeof(seraddr));
//
seraddr.sin_family = AF_INET ;
seraddr.sin_port = htons(9999);
seraddr.sin_addr.s_addr = htonl(INADDR_ANY);
//关联地址和套接字
if(bind(listenfd,(struct sockaddr *)&seraddr, sizeof(seraddr)) < 0)
{
cout<<"bind address with socket failed..."<<endl;
close(listenfd);
return -1;
}
//调用listen,宣告server愿意接受链接请求
if(listen(listenfd,QLEN) == -1)
{
cout<<"listen on socket failed..."<<endl;
close(listenfd);
return -1;
}
//获得连接请求,并建立连接
if( (connfd = accept(listenfd,(struct sockaddr *)NULL,NULL)) < 0 )
{
cout<<"accept the request failed"<<endl;
close(listenfd);
return -1;
}
HeartInfo myprotobuf;
char buff[BUFFSIZE];
while(1)
{
if(recv(connfd,buff,sizeof(buff),0) < 0)
{
cout<<"recv failed ..."<<endl;
break;
}
//protobuf反序列化
myprotobuf.ParseFromArray(buff,BUFFSIZE);
cout<<"last heart time:"<<myprotobuf.curtime()<<"\t"
<<"host ip:"<<myprotobuf.hostip()<<endl;
}
close(listenfd);
close(connfd);
return 0;
}


分别编译client端和server端代码,得到两个可执行文件,因为本人只有一台机器,代码中的服务端也为本地地址。

编译client端和server端时,请记得链接上静态库libTest.a以及libprotobuf.so。

最后,执行情况如下:

client端:

[root@localhost send]# ./run

connect success…

1477136553: send success …

1477136558: send success …

1477136563: send success …

1477136568: send success …

1477136573: send success …

1477136578: send success …

1477136583: send success …

…….

server端:

[root@localhost recv]# ./run

last heart time:1477136553 host ip:127.0.0.1

last heart time:1477136558 host ip:127.0.0.1

last heart time:1477136563 host ip:127.0.0.1

last heart time:1477136568 host ip:127.0.0.1

last heart time:1477136573 host ip:127.0.0.1

last heart time:1477136578 host ip:127.0.0.1

last heart time:1477136583 host ip:127.0.0.1

写在最后:实际集群中(如何zookeeper)主从节点是如何实现的,还有待进一步的学习,本文client和server之间的通信使用的是TCP协议,实际中是不是仅需UDP协议即可了?本文是闲暇之余的练习,旨在熟悉socket的通信,protobuf的使用。

[1].史蒂文斯(美),芬纳(美)等. 《UNIX网络编程卷1:套接字联网API 》中文第3版,人民邮电出版社,2010年7月.

[2].刘丁.《序列化和反序列化》. http://tech.meituan.com/serialization_vs_deserialization.html?utm_source=tuicool

[3].Stephen Liu.《Protocol Buffer技术详解》.http://www.cnblogs.com/stephen-liu74/archive/2013/01/04/2842533.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: