您的位置:首页 > 其它

基于protobuf service使用rpc入门教程

2017-05-24 21:38 525 查看

protobuf简介

什么是protobuf

Protocol Buffers (ProtocolBuffer/ protobuf )是Google公司开发的一种数据描述语言,类似于XML能够将结构化数据序列化,可用于数据存储、通信协议等方面。现阶段支持C++、JAVA、Python等三种编程语言。在解析速度、数据描述文件大小方面都相比xml有着很大的提高,同时,我们不用再写那些吃力不讨好的协议解析类的接口了,protobuf提供了统一的接口。

protobuf安装方式

由于笔者使用的是ubuntu 14.04系统,所以这里的安装方式是基于以上环境的:

// 源码安装的依赖库
sudo apt-get install autoconf automake libtool curl
//从github上下载
git clone https://github.com/google/protobuf //生成Makefile
cd protobuf
./autogen
./configure
//编译安装
make
make check
sudo make install
//刷新动态链接库缓存
sudo ldconfig


protobuf基本使用

安装完成后,我们开始介绍如何编写编译.proto文件,从而生成.pb文件,以及它的使用方式。这里我们使用protobuf官网的例子进行讲解。

定义一个.proto文件

syntax="proto3";
package tutorial;
message Person
{
string name = 1;
int32 id = 2;
string email = 3;

enum PhoneType
{
MOBILE = 0;
HOME = 1;
WORK = 2;
}

message PhoneNumber
{
string number = 1;
PhoneType type = 2;
}

repeated PhoneNumber phone = 4;
}

message AddressBook
{
repeated Person person =1;
}


我们来详细看看上面文件中内容:
syntax
=”proto3”代表版本,目前支持proto2和proto3,不写默认proto2;
package
类似于C++中的namespace概念;
message
是包含了各种类型字段的聚集,相当于struct,并且可以嵌套;proto3版本去掉了
required
optional
类型,保留了
repeated
(数组);
“=1”,“=2”
表示每个元素的标识号,它会用在二进制编码中对域的标识。标识号
1-15
由于使用时会比那些高的标识号少一个字节,从最优化角度考虑,可以将其使用在一些较常用的或repeated元素上,对于
16
以上的则使用在不常用的或optional的元素上。对于repeated的每个元素都需要重复编码该标识号,所以repeated的域进行优化来说是最显示的。

编译.proto文件

进入到源文件目录编译该源文件 ,执行

protoc -I=. --cpp_out=. Person.pro
,分别指定目录路径、输出路径和.proto文件路径。编译后生成Person.proto.pb.h 和Person.pro.pb.cc文件,这两个文件包含了Person以及AddressBook的类,以及对应操作类的方法,在你使用者这些类必须包含头文件。

Protocol Buffer API使用

add_person.cpp 将一个联系人的信息序列化到文件中。

#include "Person.pro.pb.h"
#include <fstream>
#include <iostream>
using namespace std;

void PromptForAddress(tutorial::Person*);

int main(int argc, char* argv[])
{
GOOGLE_PROTOBUF_VERIFY_VERSION;
if(2 != argc)
{
//必须指定电话本名称才执行程序
cerr << "Usage:  " << argv[0] << " ADDRESS_BOOK_FILE" << endl;
return -1;
}

tutorial::AddressBook address_book;

fstream in("ADDRESS_BOOK_FILE", ios::binary | ios::in);
if(!in)
{
cerr << "open file ADDRESS_BOOK_FILE failed!\n";
return -1;
}

if(!address_book.ParseFromIstream(&in))
{
cerr << "Parse File ADDRESS_BOOK_FILE failed!\n";
return -1;
}

in.close();
//增加一个Person
//具有repeated的属性可通过add_fieldname方法增加一个属性
PromptForAddress(address_book.add_person());

fstream out("ADDRESS_BOOK_FILE", ios::binary | ios::out | ios::trunc);
if(!address_book.SerializeToOstream(&out))
{
cerr << "Failed to Write Address Book!\n";
return -1;
}

//可选的,回收所有ProtoBuf分配的对象
google::protobuf::ShutdownProtobufLibrary();
return 0;
}

void PromptForAddress(tutorial::Person* person)
{
cout<<"Enter a Person ID number: ";
int id;
cin >> id;
person->set_id(id);
/*忽略CIN的前256个字符,或者忽略CIN的换行符之前的字符,包括换行符
这样的话不会将换行符之前的其他类型的数据保留在输入缓冲中
*/
cin.ignore(256, '\n');
cout<<"Enter name: ";
getline(cin, *person->mutable_name());

cout<< "Enter email address (blank for none): ";
string email;
getline(cin,email);
if(!email.empty())
person->set_email(email);
while(true)
{
cout<<"Enter a phone number (or leave blank to finish): ";
string number;
getline(cin, number);
if(number.empty())
break;
tutorial::Person::PhoneNumber* phone_number = person->add_phone();
phone_number->set_number(number);

cout<<"Is this a mobile, home, or work phone? ";
string type;
getline(cin, type);
if(type == "mobile")
phone_number->set_type(tutorial::Person::MOBILE);
else if( type == "home")
phone_number->set_type(tutorial::Person::HOME);
else if (type == "work")
phone_number->set_type(tutorial::Person::WORK);
else
{
cout << "Unknown phone type.  Using default." << endl;
phone_number->set_type(tutorial::Person::HOME);
}

}

}


list_person.cpp把文件中的信息反序列化成对象并输出。

#include "Person.pro.pb.h"
#include <iostream>
#include <fstream>

using namespace std;
void ListPeople(const tutorial::AddressBook& address_book);
int main(int argc, char * argv[])
{
if(2!=argc)
{
cerr << "Usage:  " << argv[0] << " ADDRESS_BOOK_FILE" << endl;
return -1;
}

fstream in("ADDRESS_BOOK_FILE", ios::in | ios::binary);
tutorial::AddressBook address_book;
if(!address_book.ParseFromIstream(&in))
{
cerr << "Parse Input File failed!"<< endl;
return -1;
}

ListPeople(address_book);
google::protobuf::ShutdownProtobufLibrary();
return 0;
}

void ListPeople(const tutorial::AddressBook& address_book)
{
//fieldName_size方法返回具有repeated属性的个数
for(int i=0; i< address_book.person_size();i++)
{
const tutorial::Person& person = address_book.person(i);
cout<<"Person ID: "<<person.id();
cout<<"Name: "<<person.name();
cout<<"Email: "<<person.email();

for(int j=0; j< person.phone_size();j++)
{
const tutorial::Person::PhoneNumber& phone_number = person.phone(j);
switch(phone_number.type())
{
case tutorial::Person::MOBILE:
cout << "  Mobile phone #: ";
break;
case tutorial::Person::HOME:
cout << "  Home phone #: ";
break;
case tutorial::Person::WORK:
cout << "  Work phone #: ";
break;
}
cout<<phone_number.number()<<endl;
}

}
}


Makefile

test:add_people list_people

add_people:add_person.cpp protoMid
c++  add_person.cpp Person.pro.pb.cc -o add_people `pkg-config --cflags --libs protobuf`

list_people: list_person.cpp protoMid
c++  list_person.cpp Person.pro.pb.cc -o list_people `pkg-config --cflags --libs protobuf`
protoMid: Person.pro
protoc -I=. --cpp_out=. ./Person.pro

clean:
rm -f add_people list_people protoMid
rm -f Person.pro.pb.cc Person.pro.pb.h


pkg-config 工具通过PKG_CONFIG_PATH环境变量指定的地址去找.pc文件,该文件记录了ProtoBuf安装时头文件和库文件所在的目录。

其实使用protoc编译生成.pb文件后,我们可以使用的API都在.pb.cc文件中,这里我们列出几个常用的API:

1. 简单的message

//name已经被设置返回true,否则false
bool has_name()const;
//执行后has_name()将返回false
void clear_name();
//返回当前值
const::std::string& name()const;
//设置新值,执行后has_name()返回true
void set_name(const::std::string& value);
void set_name(const char* value);
//可以通过返回值直接给name对象赋值。在调用该函数之后has_name将返回true
::std::string* mutable_name();
//释放当前对象对name字段的所有权,同时返回name字段对象指针。调用此函数之后,name字段对象 的所有权将移交给调用者。此后再调用has_name函数时将返回false。
inline ::std::string* release_name();

2. 嵌套的message

int phone_size()const;
void clear_phone();
//获取PhoneNumber字段所表示的容器,该函数返回的容器仅用于遍历并读取,不能直接修改
const::google::protobuf::RepeatedPtrField<::tutorial::Person_PhoneNumber>& phone()const;
//获取PhoneNumber字段所表示的容器指针,该函数返回的容器指针可用于遍历和直接修改
::google::protobuf::RepeatedPtrField<::tutorial::Person_PhoneNumber>* mutable_phone();

const::tutorial::Person_PhoneNumber& phone(int index)const;
// 编译器生成的嵌套类称为Person::PhoneNumber. 实际生成类为Person_PhoneNumber

//没有set_phone(),要使用mutable_phone()返回指针进行设置操作
::tutorial::Person_PhoneNumber* mutable_phone(int index);
::tutorial::Person_PhoneNumber* add_phone();

3. 其他操作

// 返回消息的可读表示,用于调试
string DebugString() const;
//等同于赋值操作符重载
void CopyFrom(const Person& from);
// 清空所有元素为空状态
void Clear();

//序列化消息
bool SerializeToString(string* output) const;
//解析字符串
bool ParseFromString(const string& data):
//写消息给输出流
bool SerializeToOstream(ostream* output) const;
//从输入流中解析消息
bool ParseFromIstream(istream* input);


其中,针对RepeatedPtrField<>遍历的使用有一个简单的示例:

tutorial::Person person = address_book.person(0);
::google::protobuf::RepeatedPtrField< ::tutorial::Person_PhoneNumber >* tempPhone = person.mutable_phone();
::google::protobuf::RepeatedPtrField< ::tutorial::Person_PhoneNumber >::iterator it  = tempPhone->begin();
for (; it != tempPhone->end(); ++it) {
cout << "hahah " <<  it->type() << " " << it->number()
<< endl;
}


通过protobuf service 使用rpc

Protocol Buffer仅仅是提供了一套序列化和反序列化结构数据的机制,本身不具有RPC功能,但是可以基于其实现一套RPC框架。之前总是对使用service进行rpc通信的流程搞不清楚,今天仔细研究了一下,特别深入的东西我还是没搞懂,但是基本的流程是OK的。

首先需要为这个service定义.proto协议文件

option cc_generic_services = true;

message FooRequest {
required string text = 1;
optional int32 times = 2;
}

message FooResponse {
required string text = 1;
optional bool result = 2;
}

service EchoService {
rpc Foo(FooRequest) returns(FooResponse);
}


这个service下只有一个服务:Foo, 它的请求由FooRequest结构体定义,回复由FooResponse定义。

那么该如何使用这个rpc接口?

服务端的流程如下:

class EchoServiceImpl : public EchoService {
public:
EchoServiceImpl() {}
virtual void Foo(::google::protobuf::RpcController* controller,
const ::FooRequest* request,
::FooResponse* response,
::google::protobuf::Closure* done) {
std::string str = request->text();

std::string tmp = str;
for (int i = 1; i < request->times(); i++)
str += (" " + tmp);
response->set_text(str);
response->set_result(true);
if (done)
done->Run();
}
};

int main(int argc, char *argv[]) {
EchoServiceImpl *impl = new EchoServiceImpl();
RpcServer rpc_server;
rpc_server.RegisterService(impl);
rpc_server.Start();
return 0;
}


服务端需要注册服务,也就是创建Service的实现类的对象。然后在一个网络端口上监听连接,解析网络数据包,根据service调用method,这些都是rpc框架内部需要实现的。对于我们使用来说只需要按照这个流程注册启动服务,同时重写定义的Foo()接口,这个函数接口里面用来实现我们的业务处理逻辑。

客户端的流程如下:

RpcClient client ;

RpcChannel channel(&client , "127.0.0.1:18669");

EchoService::Stub echo_clt(& channel);

FooRequest request ;

request.set_text ("test1");

request.set_times (1);

FooResponse response ;

RpcController controller ;

echo_clt.Foo (&controller, & request, &respose,google::protobuf::NewCallback(&FooDone, response2, controller2));


我们定义的service在proto文件编译后生成的.pb文件中,会生成两个类:EchoService和EchoService_Stub。

EchoService_Stub::EchoService_Stub(::google::protobuf::RpcChannel* channel)
: channel_(channel), owns_channel_(false) {}
EchoService_Stub::EchoService_Stub(
::google::protobuf::RpcChannel* channel,
::google::protobuf::Service::ChannelOwnership ownership)
: channel_(channel),
owns_channel_(ownership == ::google::protobuf::Service::STUB_OWNS_CHANNEL) {}
EchoService_Stub::~EchoService_Stub() {
if (owns_channel_) delete channel_;
}

void EchoService_Stub::Foo(::google::protobuf::RpcController* controller,
const ::FooRequest* request,
::FooResponse* response,
::google::protobuf::Closure* done) {
channel_->CallMethod(descriptor()->method(0),
controller, request, response, done);
}


客户端通过上面对应service的Stub类来发送请求,也就是EchoService::Stub。其中RpcChannel作为参数传入,是用来和服务端建立网络连接的。RpcController作为辅助类,用来处理记录一些网络错误。(这两个类属于rpc框架内部实现,是需要我们自己实现的)然后我们就可以通过在客户端使用Stub对象直接调用server端的Foo()方法,看起来就像调用本地函数一样,这也就是RPC框架的本质。Foo()中最后一个参数是一个回调函数, 在处理完业务逻辑后调用,也就是done->Run()处。(这里的实现是多种多样的,有的rpc框架会在CallMethod中调用done->Run()并区分同步调用和异步调用)

上面我尽量忽略rpc框架的内部的一些实现,来叙述client和server端通过protobuf来使用rpc的基本流程。现在,稍微看一下内部的实现。上面提到客户端是通过调用Stub对象的Foo方法来发出请求的。我们再把这段代码贴下来:

void EchoService_Stub::Foo(::google::protobuf::RpcController* controller,
const ::FooRequest* request,
::FooResponse* response,
::google::protobuf::Closure* done) {
channel_->CallMethod(descriptor()->method(0),
controller, request, response, done);
}


可以看到,通过Stub对象发出请求实际上是调用RpcChannel的CallMethod()方法。所以框架的实现重点就在于RpcChannel的CallMethod方法,发送请求就是在这个函数中完成的,基本实现和一般的网络客户端差不多,下面是给出两个rpc框架中的CallMethod的实现:

void RpcChannel::CallMethod(const MethodDescriptor *method, ::google::protobuf::RpcController *controller, const Message *request, Message *resp
onse, Closure *done) {
if (!_session_id) {
Connect(controller);
if (controller->Failed()) return;
}

const string &service_name = method->service()->name();
unsigned int service_id = SERVICE_NAME2ID::instance()->RpcServiceName2Id(service_name.c_str());
if (service_id == INVALID_SERVICE_ID) {
controller->SetFailed("The Service Not Support!");
return;
}
std::string * content =  new std::string;
request->SerializeToString(content);
_client->CallMsgEnqueue(_session_id, content, service_id, method->index(),
controller, response, done, _write_pipe);
//后台线程异步

if (!done) {
char buf;
read(_read_pipe, &buf, sizeof(buf));//同步处理
}
}


Muduo中的CallMethod的实现

void RpcChannel::CallMethod(const ::google::protobuf::MethodDescriptor* method,
google::protobuf::RpcController* controller,
const ::google::protobuf::Message* request,
::google::protobuf::Message* response,
::google::protobuf::Closure* done)
{
RpcMessage message;
message.set_type(REQUEST);
int64_t id = id_.incrementAndGet();
message.set_id(id);
message.set_service(method->service()->full_name());
message.set_method(method->name());
message.set_request(request->SerializeAsString()); // FIXME: error check

OutstandingCall out = { response, done };
{
MutexLockGuard lock(mutex_);
outstandings_[id] = out;
}
codec_.send(conn_, message);
}


细心的读者也许会发现,CallMethod中除了发送数据之外,参数还有一个MethodDescriptor类,这是用来标识哪一个service的。举个例子,如果定义了多个service,那么每个service的请求包和回复包都是protobuf中的message结构体,在这个例子中是EchoRequest和EchoResponse message。可是,它们仅仅是包体,也就是说,即使你发送了这些消息,在服务器端还需要一个包头来识别到底是哪个请求的包体,然后再进行具体的调用处理,像下面这样:

void EchoService::CallMethod(const ::google::protobuf::MethodDescriptor* method,
::google::protobuf::RpcController* controller,
const ::google::protobuf::Message* request,
::google::protobuf::Message* response,
::google::protobuf::Closure* done) {
GOOGLE_DCHECK_EQ(method->service(), EchoService_descriptor_);
switch(method->index()) {
case 0:
Echo(controller,
::google::protobuf::down_cast<CONST ::echo::EchoRequest*>(request),
::google::protobuf::down_cast< ::echo::EchoResponse*>(response),
done);
break;
default:
GOOGLE_LOG(FATAL) << "Bad method index; this should never happen.";
break;
}
}


大致先介绍到这里吧,总的来说要实现一个高效稳定的RPC框架还是有难度的,但是通过这里的讲解足够理解使用protobuf进行rpc通信的基本使用流程了。

最后附上两个rpc的实现的项目,第一个链接是网上的小demo,实现的不够好,但是可以更清晰的理解rpc框架。第二个是muduo中的实例,也是比较简单的,供大家参考。

https://github.com/persistentsnail/easy_pb_rpc

https://github.com/chenshuo/muduo/tree/master/examples/protobuf

参考资料

http://www.cnblogs.com/persistentsnail/p/3458342.html

http://blog.csdn.net/wvtear/article/details/50073763

muduo
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: