您的位置:首页 > 运维架构 > 网站架构

libeasy原理,架构和使用方法

2017-07-20 09:05 1411 查看
libeasy 原理、架构和使用方法

简介

libeasy提供一个处理tcp连接的事件驱动的网络框架。框架本身封装好了底层的网络操作,只需要开发者处理其中的各种事件。本文介绍libeasy的一些实现原理,整体框架,以及使用的样例。本文是经过一系列摸索,以及wireshark抓包,再结合一些互联网上一些仅有的资料整理完成,如有理解不当的地方,烦请指出。

基本概念

libeasy 的基本概念有:easy_connection_t(连接), easy_message_t(消息), easy_request_t(请求)。每个连接上有可以有多个消息,通过链表连起来,每个消息可以由多个请求组成,也通过链表连起来。

easy_request_t就相当于应用层的一个具体的包, 多个请求组合起来形成一个完整的消息。在一次长连接中,用户可以接受多次消息。每个request 只属于一个connection。

处理模型

libeasy是基于epoll的事件模型,程序收到事件后,回调注册的事件的函数。调用回调函数的线程池称为IO Thread , 线程的个数在创建eaay事件时指定。

extern easy_io_t *easy_eio_create(easy_io_t *eio, int io_thread_count);


一些简单的请求,可以直接在io thread中处理完成,并且直接返回,这种处理模型称为同步模型。

在一些情况下,处理逻辑比较复杂,比如需要读取磁盘数据,这种情况下,IO Thread会封装成一个请求,放入后端队列,经由另外一个线程池进行处理,同时IO Thread会把当前的事件挂起,等待后端线程唤醒后继续处理当前事件。这种模型称为异步模型。





注册事件的回调

开发者注册一系列回调函数,供libeasy在接受请求时回调。按照回调的顺序,回调函数包括:

on_connect

接受tcp连接时,回调该函数,可以在该事件中做密码验证等事情。

decode

从网络上读取一段文本,并且按照定义的协议,解析成数据结构,供之后处理。

process

处理从decode中解析出的结构,可以是同步处理,也可以是异步处理。

encode

把process的结果,转化成字符串(如果process的结果是要输出的结果,则不需要转化)。然后把结果挂载到request的输出上。r -> opacket = buf;

clean_up

在连接断开前执行的操作,如果在之前的操作中分配了一些内存,需要在这里释放。

on_disconnect

连接断开时的操作。

使用方法

libeasy有一些基本的数据结构

easy_pool_t:共享内存池。在一次请求中,一次性分配大块内存,当用户使用小内存的时候,从pool中分配,从而达到避免分配大量小块内存的情况。因为大量小块内存会非常浪费CPU。

void * ptr = easy_pool_alloc(req->ms->pool,1024);


分配好的内存可以用于初始化任何对象。例如:以下例子中的new不会分配新内存,而是使用ptr指向的内存,并且传入1作为构造函数的参数。

UserInfo * infoPtr = new (ptr)UserInfo(1);


easy_buf_t

管理输入输出缓冲区,用于逐段消费字符串,或分批把结果放入buf进行输出。其中pos指向当前位置,last指向已使用的内存的结束位置,end指向全部内存的结束位置。

easy_buf_t* buf = reinterpret_cast<easy_buf_t*>(easy_pool_alloc(req->ms->pool, 2*1024*1024));
char *data_buffer = reinterpret_cast<char *>(buf + sizeof(easy_buf_t));
buffer_length = 2*1024*1024 - sizeof(easy_buf_t);
buf -> pos = buf -> last =  data_buffer;
buf->end = buf->last + buffer_length;
buf->cleanup = NULL;
easy_list_init(&buf->node);


一个同步处理的样例

#include <iostream>
#include "easy/easy_io_struct.h"
#include "easy/easy_io.h"
using namespace std;
struct MyMessage
{
int mLen;
int mSeq;
string mMessage;
public :
MyMessage(int len,int seq,const string & msg):
mLen(len),mSeq(seq),mMessage(msg)
{}

};
int  my_on_connect (easy_connection_t *c)
{
return 0;
}

int my_easy_io_process_pt(easy_request_t *r)
{
MyMessage * msg = (MyMessage*)r -> ipacket;
char buffer[1024];
sprintf(buffer,"i got you message,len:%d,seq:%d,message:%s",msg ->mLen,msg ->mSeq,msg ->mMessage.c_str());
string ret(buffer);
MyMessage * outMsg = new MyMessage(ret.size(),msg ->mSeq+1,ret);
r -> opacket = outMsg;
delete msg;
return 0;
}
int my_on_disconnect (easy_connection_t * c)
{
return 0;
}
void * my_easy_decode_pt(easy_message_t *m)
{
int len = (*((uint32_t*)m -> input->pos)) >> 8;
int seq = (*((uint32_t*)m -> input->pos)) && 0xff;
MyMessage * msg = new MyMessage(len,seq,string(m ->input->pos+4,len));
m ->input->pos= m ->input -> last;
return msg;
}
int my_easy_encode_pt(easy_request_t *r, void *packet)
{
MyMessage * msg = (MyMessage*) packet;
easy_buf_t * buf =easy_buf_create(r ->  ms -> pool, msg ->mMessage.size());
easy_buf_set_
4000
data(r ->ms ->pool, buf, msg ->mMessage.c_str(),msg ->mMessage.size());
easy_request_addbuf(r, buf); //加入输出队列
delete msg;
return 0;
}
int main(int argc,char ** argv)
{
easy_io_handler_pt handler;
memset(&handler, 0, sizeof(easy_io_handler_pt));
handler.on_connect = my_on_connect;
handler.decode = my_easy_decode_pt;
handler.encode= my_easy_encode_pt;
handler.on_disconnect = my_on_disconnect;
handler.process = my_easy_io_process_pt;
easy_io_t *eio = new easy_io_t();
memset(eio,0,sizeof(easy_io_t));
eio = easy_eio_create(eio,10);//创建10个线程
eio->tcp_defer_accept = 0;
easy_listen_t* listen = easy_connection_add_listen(eio, NULL, 3308, &handler);//侦听3308端口
int rc = easy_eio_start(eio);
easy_eio_wait(eio);
}


异步使用方法

异步模型,最主要的区别是在process 函数中。



在process 中,把请求放入后端队列,并且return EASY_AGAIN 表示将请求挂起,不会继续调用接下来的 encode和clean_up ,直到被后端线程唤醒。encode函数和同步模型一致,把req -> opacket放入输出缓存中。

process再次回调的的函数实现如下:

if (EASY_AGAIN == r->retcode)  //wakeup request thread called when send result set sync
{
//EASY_AGAIN说明后续服务器端还有包需要发给客户端
if (NULL != r->client_wait)
{
if (r->ms->c->conn_has_error == 1)
{
r->client_wait->status = EASY_CONN_CLOSE;
}
easy_client_wait_wakeup_request(r);
ret = EASY_AGAIN;
}
//else no more data send
ret = EASY_OK;
}


后端线程的实现:

req->opacket=buf;//把结果挂载到opacket上
easy_client_wait_t wait_obj;
if(shouldWait)
{
wait_obj.done_count = 0;
//用于IO线程唤醒工作线程
easy_client_wait_init(&wait_obj);
req->client_wait = &wait_obj;
req->retcode = -11;
req->waiting = 1;
}
//io线程被唤醒,r->opacket被挂过去,send_response->easy_connection_request_done
easy_request_wakeup(req);
// IO线程回调 process(easy_request_t* r)的时候唤醒工作线程
if(shouldWait)
{
wait_client_obj(wait_obj);//工作线程在此处阻塞,等待唤醒
if(wait_obj.status==3){
ret=-124;
}
easy_client_wait_cleanup(&wait_obj);
req->client_wait = NULL;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  libeasy