您的位置:首页 > 理论基础 > 计算机网络

淘宝开源网络框架tbnet之buffer

2014-01-27 18:17 288 查看
在上篇博文中,我们讨论了tbnet库中有关数据包的转换和处理的部分,接下来我们将来看看tbnet库的buffer部分,这部分应该来说很重要也很底层,对于buffer的封装这块内容,在很多的场合都使用,尽管大家的封装实现方式不一样,但是思想应该是一致的:如何用一个buffer来完成上层packet的通讯,在底层这个层面,我们看到的buffer都是字节式的buffer,也可以称之为二进制buffer,因为在这个层面buffer是没有结构可言的,开场白说多了,下面我们就来看看tbnet中的buffer实现思想吧,代码如下:

class DataBuffer {

public:
/*
* ¹¹Ôì
*/
DataBuffer() {
_pend = _pfree = _pdata = _pstart = NULL;
}
char *getData() {
return (char*)_pdata;
}

int getDataLen() {
return static_cast<int32_t>(_pfree - _pdata);
}

char *getFree() {
return (char*)_pfree;
}

int getFreeLen() {
return static_cast<int32_t>(_pend - _pfree);
}

void drainData(int len) {
_pdata += len;

if (_pdata >= _pfree) {
clear();
}
}
void shrink() {
if (_pstart == NULL) {
return;
}
if ((_pend - _pstart) <= MAX_BUFFER_SIZE || (_pfree - _pdata) > MAX_BUFFER_SIZE) {
return;
}

int dlen = static_cast<int32_t>(_pfree - _pdata);
if (dlen < 0) dlen = 0;

unsigned char *newbuf = (unsigned char*)malloc(MAX_BUFFER_SIZE);
assert(newbuf != NULL);

if (dlen > 0) {
memcpy(newbuf, _pdata, dlen);
}
free(_pstart);

_pdata = _pstart = newbuf;
_pfree = _pstart + dlen;
_pend = _pstart + MAX_BUFFER_SIZE;

return;
}
...
void writeInt8(uint8_t n) {
expand(1);
*_pfree++ = (unsigned char)n;
}

void writeInt16(uint16_t n) {
expand(2);
_pfree[1] = (unsigned char)n;
n = static_cast<uint16_t>(n >> 8);
_pfree[0] = (unsigned char)n;
_pfree += 2;
}
...
void fillInt8(unsigned char *dst, uint8_t n) {
*dst = n;
}

void fillInt16(unsigned char *dst, uint16_t n) {
dst[1] = (unsigned char)n;
n = static_cast<uint16_t>(n >> 8);
dst[0] = (unsigned char)n;
}

void fillInt32(unsigned char *dst, uint32_t n) {
dst[3] = (unsigned char)n;
n >>= 8;
dst[2] = (unsigned char)n;
n >>= 8;
dst[1] = (unsigned char)n;
n >>= 8;
dst[0] = (unsigned char)n;
}
...
void writeString(const char *str) {
int len = (str ? static_cast<int32_t>(strlen(str)) : 0);
if (len>0) len ++;
expand(static_cast<int32_t>(len+sizeof(uint32_t)));
writeInt32(len);
if (len>0) {
memcpy(_pfree, str, len);
_pfree += (len);
}
}

void writeString(const std::string& str) {
writeString(str.c_str());
}
...
void writeVector(const std::vector<int32_t>& v) {
const uint32_t iLen = static_cast<uint32_t>(v.size());
writeInt32(iLen);
for (uint32_t i = 0; i < iLen; ++i) {
writeInt32(v[i]);
}
}

void writeVector(const std::vector<uint32_t>& v) {
const uint32_t iLen = static_cast<uint32_t>(v.size());
writeInt32(iLen);
for (uint32_t i = 0; i < iLen; ++i) {
writeInt32(v[i]);
}
}
...
uint8_t readInt8() {
return (*_pdata++);
}

uint16_t readInt16() {
uint16_t n = _pdata[0];
n = static_cast<uint16_t>(n << 8);
n = static_cast<uint16_t>(n | _pdata[1]);
_pdata += 2;
return n;
}

uint32_t readInt32() {
uint32_t n = _pdata[0];
n <<= 8;
n |= _pdata[1];
n <<= 8;
n |= _pdata[2];
n <<= 8;
n |= _pdata[3];
_pdata += 4;
assert(_pfree>=_pdata);
return n;
}
...
bool readString(char *&str, int len) {
if (_pdata + sizeof(int) > _pfree) {
return false;
}
int slen = readInt32();
if (_pfree - _pdata < slen) {
slen = static_cast<int32_t>(_pfree - _pdata);
}
if (str == NULL && slen > 0) {
str = (char*)malloc(slen);
len = slen;
}
if (len > slen) {
len = slen;
}
if (len > 0) {
memcpy(str, _pdata, len);
str[len-1] = '\0';
}
_pdata += slen;
assert(_pfree>=_pdata);
return true;
}
...
bool readVector(std::vector<int32_t>& v) {
const uint32_t len = readInt32();
for (uint32_t i = 0; i < len; ++i) {
v.push_back(readInt32());
}
return true;
}

bool readVector(std::vector<uint32_t>& v) {
const uint32_t len = readInt32();
for (uint32_t i = 0; i < len; ++i) {
v.push_back(readInt32());
}
return true;
}
...
private:
/*
* expand
*/
inline void expand(int need) {
if (_pstart == NULL) {
int len = 256;
while (len < need) len <<= 1;
_pfree = _pdata = _pstart = (unsigned char*)malloc(len);
_pend = _pstart + len;
} else if (_pend - _pfree < need) { // ¿Õ¼ä²»¹»
int flen = static_cast<int32_t>((_pend - _pfree) + (_pdata - _pstart));
int dlen = static_cast<int32_t>(_pfree - _pdata);

if (flen < need || flen * 4 < dlen) {
int bufsize = static_cast<int32_t>((_pend - _pstart) * 2);
while (bufsize - dlen < need)
bufsize <<= 1;

unsigned char *newbuf = (unsigned char *)malloc(bufsize);
if (newbuf == NULL)
{
TBSYS_LOG(ERROR, "expand data buffer failed, length: %d", bufsize);
}
assert(newbuf != NULL);
if (dlen > 0) {
memcpy(newbuf, _pdata, dlen);
}
free(_pstart);

_pdata = _pstart = newbuf;
_pfree = _pstart + dlen;
_pend = _pstart + bufsize;
} else {
memmove(_pstart, _pdata, dlen);
_pfree = _pstart + dlen;
_pdata = _pstart;
}
}
}

private:
unsigned char *_pstart;      // buffer¿ªÊ¼
unsigned char *_pend;        // buffer½áÊø
unsigned char *_pfree;        // free²¿·Ö
unsigned char *_pdata;        // data²¿·Ö
};

代码经过了部分删减,但是依旧很长,其实整个实现不是很难看懂,其主要是用了四个buffer指针和一个实际的buffer,所有的操作都是针对我们的buffer结构,而对于buffer的读和写操作,其实很简单,大家自己就慢慢的分析吧,我在这里之分析一点就是buffer的扩容部分,这部分其实在很多的开源库中都有所涉及,在此仅对tbnet所采用的方式来分析底层buffer的扩容,在一些对字节对齐很严格的地方,如果扩容没有考虑到字节对齐限制的话,很可能会造成服务器宕机的可能,而在我们的tbnet库里面其实有这部分的处理,我们就来看看expand函数的实现细节吧,在一开始expand就设置了一个len,这个len就是我们buffer的初始大小,但是随后会通过需要的空间大小need来按照2的指数来扩容,然后以调整后的len大小来分配内存,这只是针对buffer空间还没有开辟的情况,这种情况很简单,但是对于已经分配了buffer空间的情况来说,流程就不一样了,主体思想是通过buffer所剩的空间来扩容,首先判断need与所剩空间的大小,如果所剩空间不够的话,则需要扩容了,扩容的思想就是就是以实际buffer的大小的2次指数倍扩容,等扩容结束之后,就分配新的buffer结构,然后将之前的buffer内容copy到新buffer中,并且修正当前的四个指针的位置,随后释放掉之前的buffer,在对buffer结构进行扩容的时候,如果原始的buffer中总所剩空间大于need,而联系的所剩空间小于need时,这里面就涉及到了移动buffer内容的部分,调用的的函数就是memove,这个函数效率很高,目前在glibc-2.9版中对此采用了比较好的优化方式,有时间的话,大家可以去看看,在对buffer进行读写的时候,我们需要注意读写的顺序,其实buffer的操作我们可以对此进行优化,通过四个指针完全可以实现一个循环缓冲区,而不用每次当空闲空间太大时而移动数据内容,其实在一般的应用中我们完全可以采用这种方式,

另外在实际的读写buffer的时候,我们可以看到基本上就是针对pfree指针的移动,个人感觉tbnet库中的buffer的一大特色在用它的读写字节操作,这方面的东西自己也曾写过,但是对比tbnet库的buffer来讲,我的实现思路有些问题,这块内容也希望大家能够好好地体会,如果可以的话,完全可以将这部分内容实现为一个环形缓存区,然后分别针对读多写少的环境和读少写多的环境来对比下性能情况,尝试着自己去实现一个类似的buffer结构,有时重复造轮子也不是什么坏事,最起码你知道如何去实现一个简单的循环缓冲区,好了,这部分内容到此就技术了,其实也没有说什么,有些东西需要自己去体会体验,东西虽然简单,但是如果不注重细节的话,很可能写出的代码会产生严重的问题,接下来的博文我们将对tbnet中的socket和事件进行讨论,期待吧,呵呵,多谢了,

如果需要,请注明转载.多谢
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: