您的位置:首页 > 编程语言 > Python开发

python实现: protobuf解释器

2015-09-18 14:59 579 查看
之前的项目为了实现自动化,我写了一个protobuf的解释器,用来生成项目所需的格式。

当然,现在按照以下链接的指导,可以跳过手工分析,直接生成代码。

https://developers.google.com/protocol-buffers/docs/reference/cpp-generated

这次文档主要是描述如何分析protobuf格式,以及如何收集需要的符号。

使用python 2.7脚本进行文本的处理。

程序分成4个模块:


expression: 格式的解析



symbol:在protobuf中定义的message等对象以及它们的层次结构,在这里已经看不见protobuf的样子了。



typecollection:基础类型定义和收集message等对象。



builder:遍历symbol,根据需要创建适合的输出文件。typecollection起到索引的作用。这次就不演示了。



1 测试用protobuf文件。(来源于google示例)

package tutorial;

// One person record of the tutorial address book.
message Person {
required string name = 1;
required int32 id = 2 ;
optional string email = 3;

// Kinds of phone line a PhoneNumber can carry.
enum PhoneType {
MOBILE = 0;
HOME = 1;
WORK = 2;
}

// A phone number plus its kind; the kind defaults to HOME.
message PhoneNumber {
required string number = 1;
optional PhoneType type = 2 [default = HOME];
}

repeated PhoneNumber phone = 4;
}

// Top-level container: a list of Person records.
message AddressBook {
repeated Person person = 1;
}


2 expression实现---最简单的扫描方法,分析每一个word。

# -*- coding: UTF-8 -*-
# pb_expression.py
import sys
import os
import string
import shutil
import io
import pb_symbol

class StringBuffer(object):
    """Holds the full text of one source file for the parser.

    Attributes:
        src:  path of the file to read.
        Data: the file content; ``None`` until ``OpenFile`` has been called.
    """

    def __init__(self, src):
        """Remember the path ``src``; nothing is read yet."""
        self.src = src
        # Populated by OpenFile(); declared here so the attribute always exists.
        self.Data = None

    def OpenFile(self):
        """Read the whole file at ``self.src`` into ``self.Data``.

        Raises:
            IOError/OSError: if the file cannot be opened or read.
        """
        # The context manager closes the handle even when read() fails;
        # the previous open(...).read() left that to the garbage collector.
        with open(self.src) as handle:
            self.Data = handle.read()

class Expression(object):
    """Character-level scanner and parser for a small subset of protobuf.

    The instance walks ``sBuf.Data`` with one cursor (``self.index``) and
    reports what it recognises to ``self.symbol``.  The statements handled
    are ``package``, ``import``, ``message`` and ``enum``; ``//`` line
    comments are skipped.  Nested definitions inside a message are parsed by
    a child ``Expression`` that shares the buffer and writes into its own
    ``pb_symbol.Symbol``.

    NOTE(review): ``//`` inside a quoted string is still treated as a
    comment start; string literals are not tokenised.
    """

    # Field labels that introduce a message member.
    desc_set = set(['required', 'optional', 'repeated'])

    b_char_set = set('ABCDEFGHIJKLMNOPQRSTUVWXYZ')
    l_char_set = set('abcdefghijklmnopqrstuvwxyz')
    # Digits are stored as characters so they can match buffer content
    # (the set used to hold the integers 0..9, which never match a char).
    digit_set = set('0123456789')

    equals_char = '='
    space_char = ' '
    openbrace_char = '{'
    closebrace_char = '}'
    semicolon_char = ';'
    tab_char = chr(9)
    newline_char = chr(10)
    return_char = chr(13)
    slash_char = chr(47)

    # Characters that terminate a word.
    ctl_char_set = set([openbrace_char, closebrace_char, semicolon_char,
                        equals_char, '\n', '\r', '\t', '=', ';', space_char])

    # Whitespace characters.
    empty_char_set = set([space_char, tab_char, newline_char, return_char])

    symbol_char_set = b_char_set | l_char_set | digit_set
    all_char_set = symbol_char_set | ctl_char_set

    def __init__(self, sBuf, symbol):
        """Bind the parser to a loaded buffer and a symbol sink.

        Args:
            sBuf:   object exposing the text to parse as ``Data``.
            symbol: receiver of the parsed entities (``update_namespace``,
                    ``append_enum``, ``append_message``, ``append_symbol``,
                    ``tpDict`` and ``entity_full_path`` are used).
        """
        self.Buf = sBuf
        self.index = 0
        self.count = len(self.Buf.Data)
        self.symbol = symbol

    # ------------------------------------------------------------------
    # cursor helpers
    # ------------------------------------------------------------------
    def backup(self):
        """Return the current cursor position so it can be restored later."""
        return self.index

    def restore(self, prevIndex):
        """Move the cursor back to a position obtained from ``backup``."""
        self.index = prevIndex

    def forwardChar(self):
        """Advance the cursor by one character unless already at the end."""
        if self.index < self.count:
            self.index = self.index + 1

    def backChar(self):
        """Step the cursor back by one character unless already at the start."""
        if self.index > 0:
            self.index = self.index - 1

    def getchar(self):
        """Return the character under the cursor and advance; ``None`` at end."""
        if self.index < self.count:
            char = self.Buf.Data[self.index]
            self.forwardChar()
            return char
        return None

    # ------------------------------------------------------------------
    # scanning
    # ------------------------------------------------------------------
    def skipComment(self):
        """Skip a ``//`` comment that starts exactly at the cursor.

        The cursor is left on the terminating newline (or at end of input
        when the comment is the last line).  Leaving the newline in place
        keeps it available as a word delimiter, and every caller invokes
        this method again before each character, so consecutive comment
        lines are skipped one after another.  When the cursor is not on
        ``//`` nothing happens.
        """
        data = self.Buf.Data
        if not data.startswith(self.slash_char * 2, self.index):
            return
        line_end = data.find(self.newline_char, self.index)
        # A comment on the last line has no newline: consume up to the end.
        self.index = self.count if line_end < 0 else line_end

    def getSpecialChar(self, currentchar):
        """Consume input up to and including the next ``currentchar``.

        Returns:
            ``currentchar`` when found, ``None`` when the input ended first.
        """
        while True:
            self.skipComment()
            char = self.getchar()
            if char is None or char == currentchar:
                return char

    def getVisibleChar(self):
        """Consume and return the next non-whitespace character.

        Returns:
            The character, or ``None`` at end of input.
        """
        while True:
            self.skipComment()
            char = self.getchar()
            if char is None or char not in self.empty_char_set:
                return char

    def getNextword(self):
        """Return the next run of non-delimiter characters.

        Leading delimiters (``ctl_char_set``) and comments are skipped.  The
        delimiter that ends the word is left unread.

        Returns:
            The word, or ``None`` when the input ends before any word starts.
        """
        chars = []
        while True:
            self.skipComment()
            char = self.getchar()
            if char is None:
                break
            if char in self.ctl_char_set:
                if chars:
                    # Put the terminating delimiter back for the caller.
                    self.backChar()
                    break
                continue
            chars.append(char)
        return ''.join(chars) if chars else None

    # ------------------------------------------------------------------
    # statement handlers
    # ------------------------------------------------------------------
    def do_enum_item(self, pbEnum):
        """Parse one ``NAME = VALUE;`` entry and append it to ``pbEnum``."""
        memText = self.getNextword()
        self.getSpecialChar(self.equals_char)
        memValue = self.getNextword()
        self.getSpecialChar(self.semicolon_char)
        pbEnum.append_Member(memText, memValue)

    def do_enum_proc(self):
        """Parse ``NAME { ... }`` following an ``enum`` keyword.

        The closing brace is consumed and the resulting ``pb_symbol.PBEnum``
        is handed to ``self.symbol.append_enum``.
        """
        symbol = self.getNextword()
        pbEnum = pb_symbol.PBEnum(symbol)
        # Consume the opening brace explicitly so an empty body ``{}`` is
        # recognised instead of its ``}`` being skipped as a delimiter.
        self.getSpecialChar(self.openbrace_char)
        while True:
            mark = self.backup()
            char = self.getVisibleChar()
            if char is None or char == self.closebrace_char:
                break
            self.restore(mark)
            if self.getNextword() is None:
                break
            self.restore(mark)
            self.do_enum_item(pbEnum)
        self.symbol.append_enum(pbEnum)

    def do_message_proc(self):
        """Parse ``NAME { ... }`` following a ``message`` keyword.

        Members introduced by a label in ``desc_set`` are stored on a new
        ``pb_symbol.PBMessage``.  A keyword from ``token_set`` inside the
        body starts a run of nested definitions, parsed by a child
        ``Expression`` into a fresh ``pb_symbol.Symbol`` named after this
        message.  Any other word is skipped.  The closing brace is consumed
        and the message is handed to ``self.symbol.append_message``.
        """
        symbol = self.getNextword()
        pbMsg = pb_symbol.PBMessage(symbol)
        # See do_enum_proc: makes an empty body ``{}`` parse correctly.
        self.getSpecialChar(self.openbrace_char)
        while True:
            mark = self.backup()
            char = self.getVisibleChar()
            if char is None or char == self.closebrace_char:
                break
            self.restore(mark)
            word = self.getNextword()
            if word is None:
                break
            if word in self.token_set:
                subSymbol = pb_symbol.Symbol(self.symbol.tpDict,
                                             self.symbol.entity_full_path,
                                             False)
                subSymbol.update_namespace(symbol)
                # Rewind so the child parser sees the keyword itself.
                self.restore(mark)
                subExp = Expression(self.Buf, subSymbol)
                subExp.index = self.index
                subExp.do_expression()
                self.index = subExp.index
                self.symbol.append_symbol(subSymbol)
                pbMsg.enableSymbol = 1
            elif word in self.desc_set:
                memType = self.getNextword()
                memText = self.getNextword()
                pbMsg.append_Member(word, memType, memText)
                # Skips the tag number and any ``[...]`` options as well.
                self.getSpecialChar(self.semicolon_char)
        self.symbol.append_message(pbMsg)

    def do_import_proc(self):
        """Skip the rest of an ``import`` statement (up to ``;``)."""
        self.getSpecialChar(self.semicolon_char)

    def do_package_proc(self):
        """Parse ``NAME;`` after ``package`` and set it as the namespace."""
        word = self.getNextword()
        self.symbol.update_namespace(word)
        self.getSpecialChar(self.semicolon_char)

    # Keyword -> handler; handlers are plain functions called as proc(self).
    token_set = {'message': do_message_proc,
                 'enum': do_enum_proc,
                 'import': do_import_proc,
                 'package': do_package_proc}

    def do_expression(self):
        """Parse consecutive top-level statements starting at the cursor.

        Parsing stops at end of input, at a word that is not a key of
        ``token_set`` (the cursor is rewound to before it), or at a ``}``
        (left unread).  The ``}`` check matters for nested parsing: the
        brace belongs to the enclosing message, and skipping it would make
        the child parser swallow the definitions that follow the parent.
        """
        while True:
            mark = self.backup()
            char = self.getVisibleChar()
            if char is None:
                break
            self.restore(mark)
            if char == self.closebrace_char:
                break
            token = self.getNextword()
            if token is None:
                break
            proc = self.token_set.get(token)
            if proc is None:
                self.restore(mark)
                break
            proc(self)


3 symbol--定义对象类型以及层次

# -*- coding: UTF-8 -*-
# pb_symbol.py

import os
import string
import pb_typecollection

class PBEntity(object):
    """Base class for every named type known to the tool.

    Attributes:
        entName: name of the entity.
        orgName: copy of ``entName`` taken at construction time.
        rtname:  second name supplied by the caller and stored unchanged.
    """

    def __init__(self, entName, rtname):
        self.entName = entName
        self.orgName = entName
        self.rtname = rtname

    def outputDebug(self, ns=''):
        """Print nothing; subclasses override this to dump themselves.

        ``ns`` is accepted (and ignored) so the signature matches the
        subclasses, which are all called as ``outputDebug(ns)``.
        """
        pass

    def create_impl(self, entity_indent, top_ns):
        """Return an empty list; both arguments are ignored here."""
        return list()

    def mem_include(self, entName):
        """Return ``False``: a plain entity has no members."""
        return False

class PBMessageMember(object):
    """One member of a message: its label, its type name and its own name."""

    def __init__(self, option, memType, memText):
        self.option, self.memType, self.memText = option, memType, memText

    def outputDebug(self):
        """Print the label, type and name of this member."""
        print(self.option, self.memType, self.memText)

    # Read-only aliases for the three stored fields.
    mem_option = property(lambda self: self.option)
    mem_type = property(lambda self: self.memType)
    mem_text = property(lambda self: self.memText)

class PBMessage(PBEntity):
    """A parsed message: an ordered list of members plus bookkeeping.

    Attributes:
        members:      list of ``PBMessageMember`` in declaration order.
        enableSymbol: 0 by default; the parser sets it to 1 when the message
                      contains nested definitions.
        rt_ns:        prefix string assigned through ``set_rt_ns``.
        tpDict:       type dictionary attached through ``attach_tp_dict``.
    """

    def __init__(self, entName):
        PBEntity.__init__(self, entName, entName)
        self.members = []
        self.enableSymbol = 0
        self.rt_ns = ''
        self.tpDict = None

    @property
    def Members(self):
        """The list of members (same object as ``self.members``)."""
        return self.members

    def attach_tp_dict(self, tpDict):
        """Store the type dictionary this message was registered in."""
        self.tpDict = tpDict

    def append_Member(self, option, memType, memText):
        """Append a member built from label, type name and member name."""
        self.members.append(PBMessageMember(option, memType, memText))

    def enable_Symbol(self, enable):
        """Set the ``enableSymbol`` flag."""
        self.enableSymbol = enable

    def outputDebug(self, ns):
        """Print a header line for this message, then every member."""
        print(ns, 'message', self.entName)
        for entMsg in self.members:
            entMsg.outputDebug()
        print('')

    def set_rt_ns(self, rt_entity_full_path):
        """Store the prefix string in ``rt_ns``."""
        self.rt_ns = rt_entity_full_path

    def mem_include(self, entName):
        """Return True when some member has ``entName`` as its type."""
        return any(entName == entMsg.memType for entMsg in self.members)

    def detect_request(self):
        """Return True when the message has at least one member.

        The previous test compared the bound method ``list.count`` with 0,
        which is always true on Python 2 and a TypeError on Python 3.
        """
        return len(self.members) > 0

class PBEnumMember(object):
    """A single enum entry: its name and the value text from the source."""

    def __init__(self, memText, memValue):
        self.memText, self.memValue = memText, memValue

    def outputDebug(self):
        """Print the name and value of this entry."""
        print(self.memText, self.memValue)

class PBEnum(PBEntity):
    """A parsed enum: a named, ordered list of ``PBEnumMember`` entries."""

    def __init__(self, entName):
        # The same name is used for both names kept by PBEntity.
        PBEntity.__init__(self, entName, entName)
        self.members = list()

    def append_Member(self, memText, memValue):
        """Append one entry built from its name and value text."""
        entry = PBEnumMember(memText, memValue)
        self.members.append(entry)

    def outputDebug(self, ns):
        """Print a header line for this enum, then every entry."""
        print(ns, 'enum', self.entName)
        for entry in self.members:
            entry.outputDebug()
        print('')

class Symbol(object):
    """One scope of parsed definitions (the file, or the body of a message).

    Attributes:
        namespace:           name given through ``update_namespace``.
        tpDict:              type dictionary new entities are registered in.
        rooted:              when true, ``update_namespace`` leaves the two
                             path attributes untouched.
        entity_full_path:    ``_``-joined path of this scope.
        rt_entity_full_path: second ``_``-joined path, maintained in parallel
                             with ``entity_full_path``.
        entitylist:          messages, enums and child symbols, in the order
                             they were appended.
        containerlist:       the messages and child symbols only.
    """

    def __init__(self, tpDict, fullpath, rooted):
        self.namespace = ''
        self.tpDict = tpDict
        self.rooted = rooted
        self.entity_full_path = fullpath
        self.rt_entity_full_path = fullpath
        self.entitylist = []
        self.containerlist = []

    def update_namespace(self, namespace):
        """Set the scope name and, unless rooted, extend both paths with it.

        Each path is extended from its own previous value.  Previously
        ``rt_entity_full_path`` was derived from the already-extended
        ``entity_full_path``, which appended the name twice (``A_B_B``).
        """
        self.namespace = namespace
        if self.rooted:
            return
        if self.entity_full_path == '':
            self.entity_full_path = namespace
            self.rt_entity_full_path = namespace
        else:
            self.entity_full_path = '%s_%s' % (self.entity_full_path,
                                               namespace)
            self.rt_entity_full_path = '%s_%s' % (self.rt_entity_full_path,
                                                  namespace)

    def append_type_dict(self, entity, isMsg):
        """Register ``entity`` in ``tpDict`` under its ``entName``.

        The registered type name is ``rtname``, prefixed with
        ``rt_entity_full_path::`` when this scope has a path.  The last
        value passed to ``insert_type`` is empty for messages (``isMsg``
        true) and the ``entity_full_path``-qualified name otherwise.
        """
        if self.entity_full_path == '':
            rt_type = entity.rtname
            org_type = entity.rtname
        else:
            rt_type = '%s::%s' % (self.rt_entity_full_path, entity.rtname)
            org_type = '%s::%s' % (self.entity_full_path, entity.rtname)
        if isMsg:
            org_type = ''
        self.tpDict.insert_type(entity.entName, rt_type, entity, org_type)

    def append_message(self, msg):
        """Add a message to this scope and register it in ``tpDict``."""
        self.entitylist.append(msg)
        self.containerlist.append(msg)
        msg.attach_tp_dict(self.tpDict)
        # The message prefix is the scope path followed by '_' (or empty).
        prefix = self.rt_entity_full_path
        if prefix != '':
            prefix = prefix + '_'
        msg.set_rt_ns(prefix)
        self.append_type_dict(msg, True)

    def append_enum(self, enum):
        """Add an enum to this scope and register it in ``tpDict``."""
        self.entitylist.append(enum)
        self.append_type_dict(enum, False)

    def append_symbol(self, symbol):
        """Add a child scope (definitions nested inside a message)."""
        self.entitylist.append(symbol)
        self.containerlist.append(symbol)

    def outputDebug(self, ns):
        """Dump every entity, passing ``ns + '::' + namespace`` down."""
        scope = ns + '::' + self.namespace
        for entity in self.entitylist:
            entity.outputDebug(scope)

    def query_entitylist(self):
        """Return the list of all entities in this scope."""
        return self.entitylist

    def query_containerlist(self):
        """Return the list of messages and child scopes."""
        return self.containerlist

    def query_pb_ns(self):
        """Return the scope name set by ``update_namespace``."""
        return self.namespace

    def mem_include(self, entName):
        """Return True when any entity reports ``mem_include(entName)``."""
        for entity in self.entitylist:
            if entity.mem_include(entName) == True:
                return True
        return False

class PBProxy(object):
    """Thin wrapper that forwards attribute reads and calls to ``entity``.

    Every property below reads the attribute of the same name from the
    wrapped object, except ``Members``, which reads ``members``.
    """

    def __init__(self, entity):
        self.entity = entity

    def _forward(attr):
        # Class-body helper: build a read-only property for entity.<attr>.
        return property(lambda self: getattr(self.entity, attr))

    enableSymbol = _forward('enableSymbol')
    entName = _forward('entName')
    rtname = _forward('rtname')
    orgName = _forward('orgName')
    members = _forward('members')
    Members = _forward('members')
    rt_ns = _forward('rt_ns')
    namespace = _forward('namespace')
    rooted = _forward('rooted')
    entity_full_path = _forward('entity_full_path')
    rt_entity_full_path = _forward('rt_entity_full_path')
    entitylist = _forward('entitylist')
    containerlist = _forward('containerlist')
    tpDict = _forward('tpDict')
    mem_option = _forward('mem_option')
    mem_type = _forward('mem_type')
    mem_text = _forward('mem_text')

    # The helper is only needed while the class body executes.
    del _forward

    def mem_include(self, entName):
        """Forward to ``entity.mem_include``."""
        return self.entity.mem_include(entName)

    def create_impl(self, entity_indent, top_ns):
        """Forward to ``entity.create_impl``."""
        return self.entity.create_impl(entity_indent, top_ns)

    def detect_request(self):
        """Forward to ``entity.detect_request``."""
        return self.entity.detect_request()


4 typecollection

# -*- coding: UTF-8 -*-
# pb_typecollection.py

import os
import pb_symbol

class typeDict(object):
    """Registry mapping a type name to ``(rtType, entity, orgType)``.

    The constructor pre-registers the built-in scalar types; messages and
    enums found by the parser are added later through ``insert_type``.
    """

    # Field label keywords.
    op_req_desc = 'required'
    op_opt_desc = 'optional'
    op_rep_desc = 'repeated'

    def __init__(self):
        self.collection = dict()
        # (protobuf scalar name, registered type name), in registration order.
        builtin_types = (
            ('int32', '__int32'),
            ('int64', '__int64'),
            ('uint32', 'unsigned int'),
            ('bool', 'bool'),
            ('float', 'float'),
            ('double', 'double'),
            ('string', 'const char*'),
            ('bytes', 'const char*'),
        )
        for pb_name, rt_type in builtin_types:
            placeholder = pb_symbol.PBEntity(pb_name, pb_name)
            self.insert_type(pb_name, rt_type, placeholder, '')

    def insert_type(self, entName, rtType, entity, orgType):
        """Register (or overwrite) the entry stored under ``entName``."""
        record = (rtType, entity, orgType)
        self.collection[entName] = record

    def output_debug(self):
        """Print a header followed by every ``(name, record)`` pair."""
        print('type collection')
        for pair in self.collection.items():
            print(pair)


5 测试脚本

# -*- coding: UTF-8 -*-

import pb_symbol
import pb_expression
import pb_typecollection

if __name__ == '__main__':
    # Parse the sample .proto file and dump both the symbol tree and the
    # collected type dictionary.
    pb_file = 'google_tutorial.proto'
    sBuf = pb_expression.StringBuffer(pb_file)
    tpDict = pb_typecollection.typeDict()
    symbol = pb_symbol.Symbol(tpDict, '', True)
    try:
        sBuf.OpenFile()
        exp = pb_expression.Expression(sBuf, symbol)
        exp.do_expression()
        symbol.outputDebug('')
        tpDict.output_debug()
    except Exception as exc:
        # Top-level boundary of the script: report the failure and carry on.
        # The message is formatted with % so the exception text is printed;
        # print("%s", exc) emitted the literal "%s" next to the exception.
        print("%s" % (exc,))
    print("done")


6 输出

命名空间:::tutorial::Person

类型名称:PhoneType

('::tutorial::Person', 'enum', 'PhoneType')   
('MOBILE', '0')

('HOME', '1')

('WORK', '2')

('::tutorial::Person', 'message', 'PhoneNumber')

('required', 'string', 'number')

('optional', 'PhoneType', 'type')

('::tutorial', 'message', 'Person')

('required', 'string', 'name')

('required', 'int32', 'id')

('optional', 'string', 'email')

('repeated', 'PhoneNumber', 'phone')

('::tutorial', 'message', 'AddressBook')

('repeated', 'Person', 'person')

type collection

('PhoneNumber', ('Person::PhoneNumber', <pb_symbol.PBMessage object at 0x02B9DED0>, ''))

('int32', ('__int32', <pb_symbol.PBEntity object at 0x02BE3F70>, ''))

('string', ('const char*', <pb_symbol.PBEntity object at 0x02BEE0F0>, ''))

('double', ('double', <pb_symbol.PBEntity object at 0x02BEE0B0>, ''))

('float', ('float', <pb_symbol.PBEntity object at 0x02BEE070>, ''))

('bytes', ('const char*', <pb_symbol.PBEntity object at 0x02BEE130>, ''))

('Person', ('Person', <pb_symbol.PBMessage object at 0x02BEE210>, ''))

('bool', ('bool', <pb_symbol.PBEntity object at 0x02BEE050>, ''))

('PhoneType', ('Person::PhoneType', <pb_symbol.PBEnum object at 0x02BEE450>, 'Person::PhoneType'))

('int64', ('__int64', <pb_symbol.PBEntity object at 0x02BE3FB0>, ''))

('uint32', ('unsigned int', <pb_symbol.PBEntity object at 0x02BE3FF0>, ''))

('AddressBook', ('AddressBook', <pb_symbol.PBMessage object at 0x02BEE7B0>, ''))

参考

protobuf的git地址:https://github.com/google/protobuf
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: