您的位置:首页 > 其它

SPServer源码分析(四): 核心服务器类SP_Server分析

2010-09-26 12:09 399 查看
1。进入SP_Server的runForever函数,该函数最终转入start()函数执行

2。start函数是主要执行函数

函数内部使用了libevent.有关libevent编程参考官方主页。

函数内部涉及重要的类有:

SP_Executor(任务执行者,内部封装线程池)

/*
* Copyright 2007 Stephen Liu
* For license terms, see the file COPYING along with this library.
*/

#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <assert.h>
#include <errno.h>
#include <signal.h>

#include "spserver.hpp"
#include "speventcb.hpp"
#include "sphandler.hpp"
#include "spsession.hpp"
#include "spexecutor.hpp"
#include "sputils.hpp"
#include "spiochannel.hpp"
#include "spioutils.hpp"

#include "event_msgqueue.h"

/*
 * Construct a server for the given bind address and port.
 *
 * bindIP         -- address string, copied into mBindIP (bounded by sizeof( mBindIP ))
 * port           -- TCP port stored in mPort
 * handlerFactory -- stored as-is; the destructor deletes it
 *
 * Tunables start at: timeout 600, 4 worker threads, request queue of 128,
 * 256 connections, and a default "busy" refusal message.
 */
SP_Server :: SP_Server( const char * bindIP, int port,
		SP_HandlerFactory * handlerFactory )
{
	// Collaborating factories; the IO channel factory is filled in later
	// (by setIOChannelFactory or lazily inside start()).
	mHandlerFactory = handlerFactory;
	mIOChannelFactory = NULL;

	// Listening endpoint.
	mPort = port;
	snprintf( mBindIP, sizeof( mBindIP ), "%s", bindIP );

	// Lifecycle flags: neither running nor asked to stop yet.
	mIsRunning = 0;
	mIsShutdown = 0;

	// Default limits.
	mMaxThreads = 4;
	mMaxConnections = 256;
	mReqQueueSize = 128;
	mTimeout = 600;

	// Heap copy so that it can later be released with free().
	mRefusedMsg = strdup( "System busy, try again later." );
}

/*
 * Release everything the server holds: both factories (via delete) and the
 * refusal message (via free). Each pointer is reset to NULL afterwards.
 */
SP_Server :: ~SP_Server()
{
	// delete and free are both no-ops on NULL, so no explicit guards are needed.
	delete mHandlerFactory;
	mHandlerFactory = NULL;

	delete mIOChannelFactory;
	mIOChannelFactory = NULL;

	free( mRefusedMsg );
	mRefusedMsg = NULL;
}

/*
 * Install the IO channel factory that start() hands to the accept callback.
 * The pointer is stored as-is (any previously stored pointer is simply
 * overwritten); the destructor deletes whatever is stored at that time.
 */
void SP_Server :: setIOChannelFactory( SP_IOChannelFactory * ioChannelFactory )
{
	this->mIOChannelFactory = ioChannelFactory;
}

/*
 * Set the timeout value passed to SP_EventArg in start().
 * Non-positive values are ignored and the current setting is kept.
 */
void SP_Server :: setTimeout( int timeout )
{
	if( timeout > 0 ) {
		mTimeout = timeout;
	}
}

/*
 * Set the thread count used for the "work" executor created in start().
 * Non-positive values are ignored and the current setting is kept.
 */
void SP_Server :: setMaxThreads( int maxThreads )
{
	if( maxThreads > 0 ) {
		mMaxThreads = maxThreads;
	}
}

/*
 * Set the connection limit forwarded to the accept callback in start().
 * Non-positive values are ignored and the current setting is kept.
 */
void SP_Server :: setMaxConnections( int maxConnections )
{
	if( maxConnections > 0 ) {
		mMaxConnections = maxConnections;
	}
}

/*
 * Configure the request queue limit and the message forwarded to the accept
 * callback as the refusal text.
 *
 * reqQueueSize -- new queue size; non-positive values keep the current one
 * refusedMsg   -- new refusal text, copied with strdup(); NULL keeps the
 *                 current message
 */
void SP_Server :: setReqQueueSize( int reqQueueSize, const char * refusedMsg )
{
	if( reqQueueSize > 0 ) {
		mReqQueueSize = reqQueueSize;
	}

	// strdup( NULL ) is undefined behaviour, so a missing message means
	// "leave the existing one alone".
	if( NULL == refusedMsg ) return;

	// Duplicate before freeing: if the allocation fails we keep the old
	// message instead of ending up with a NULL refusal text.
	char * copy = strdup( refusedMsg );
	if( NULL != copy ) {
		free( mRefusedMsg );
		mRefusedMsg = copy;
	}
}

void SP_Server :: shutdown()
{
mIsShutdown = 1;
}

int SP_Server :: isRunning()
{
return mIsRunning;
}

/*
 * Start the server on a newly created detached thread that runs eventLoop().
 *
 * Returns the result of sp_thread_create(): 0 when the thread was created,
 * non-zero otherwise (a warning is logged and mIsRunning is reset to 0).
 */
int SP_Server :: run()
{
	sp_thread_attr_t attr;
	sp_thread_attr_init( &attr );

	// The call must live outside assert(): with NDEBUG defined the whole
	// assert expression is compiled out and the stack size would never be set.
	const int stackRet = sp_thread_attr_setstacksize( &attr, 1024 * 1024 );
	assert( 0 == stackRet );
	(void)stackRet; // silence "unused variable" in NDEBUG builds

	sp_thread_attr_setdetachstate( &attr, SP_THREAD_CREATE_DETACHED );

	sp_thread_t thread;
	const int ret = sp_thread_create( &thread, &attr, eventLoop, this );
	sp_thread_attr_destroy( &attr );

	if( 0 == ret ) {
		sp_syslog( LOG_NOTICE, "Thread #%ld has been created to listen on port [%d]", thread, mPort );
	} else {
		mIsRunning = 0;
		sp_syslog( LOG_WARNING, "Unable to create a thread for TCP server on port [%d], %s",
			mPort, strerror( errno ) ) ;
	}

	return ret;
}

/*
 * Run the server on the calling thread: invokes eventLoop() directly and
 * returns only when it does.
 */
void SP_Server :: runForever()
{
	SP_Server::eventLoop( this );
}

/*
 * Thread entry point (also called directly by runForever()).
 *
 * arg -- the SP_Server instance to drive.
 *
 * Marks the server as running for the duration of start() and clears the
 * flag once start() returns. Always returns 0.
 */
sp_thread_result_t SP_THREAD_CALL SP_Server :: eventLoop( void * arg )
{
	SP_Server * self = static_cast<SP_Server*>( arg );

	self->mIsRunning = 1;
	self->start();
	self->mIsRunning = 0;

	return 0;
}

void SP_Server :: sigHandler( int, short, void * arg )
{
SP_Server * server = (SP_Server*)arg;
server->shutdown();
}

void SP_Server :: outputCompleted( void * arg )
{
SP_CompletionHandler * handler = ( SP_CompletionHandler * ) ((void**)arg)[0];
SP_Message * msg = ( SP_Message * ) ((void**)arg)[ 1 ];

handler->completionMessage( msg );

free( arg );
}

/*
 * Main body of the server: open the listening socket, register the signal
 * and accept events on the event base owned by a local SP_EventArg, then
 * loop until mIsShutdown becomes non-zero.
 *
 * Each loop iteration runs one pass of the event loop and then drains two
 * queues: input results are executed as tasks on the "work" executor, and
 * output results are reported to the completion handler via the "act"
 * executor.
 *
 * Returns the value produced by SP_IOUtils::tcpListen(); everything else is
 * skipped unless that value is 0.
 */
int SP_Server :: start()
{
#ifdef SIGPIPE
/* Don't die with SIGPIPE on remote read shutdown. That's dumb. */
signal( SIGPIPE, SIG_IGN );
#endif

int ret = 0;
int listenFD = -1;

// Open the listening socket; listenFD receives the descriptor.
ret = SP_IOUtils::tcpListen( mBindIP, mPort, &listenFD, 0 );

if( 0 == ret ) {

// Supplies the event base and the input/output result queues used below.
SP_EventArg eventArg( mTimeout );

// Clean close on SIGINT or SIGTERM.
struct event evSigInt, evSigTerm;
signal_set( &evSigInt, SIGINT,  sigHandler, this );
event_base_set( eventArg.getEventBase(), &evSigInt );
signal_add( &evSigInt, NULL);
signal_set( &evSigTerm, SIGTERM, sigHandler, this );
event_base_set( eventArg.getEventBase(), &evSigTerm );
signal_add( &evSigTerm, NULL);

// Bundle of settings passed to SP_EventCallback::onAccept as its callback argument.
SP_AcceptArg_t acceptArg;
memset( &acceptArg, 0, sizeof( SP_AcceptArg_t ) );

// Fall back to the default IO channel factory when none was installed.
// It is stored in the member, so the destructor deletes it.
if( NULL == mIOChannelFactory ) {
mIOChannelFactory = new SP_DefaultIOChannelFactory();
}
acceptArg.mEventArg = &eventArg;
acceptArg.mHandlerFactory = mHandlerFactory;
acceptArg.mIOChannelFactory = mIOChannelFactory;
acceptArg.mReqQueueSize = mReqQueueSize;
acceptArg.mMaxConnections = mMaxConnections;
acceptArg.mRefusedMsg = mRefusedMsg;

// Persistent read event on the listening socket.
struct event evAccept;
event_set( &evAccept, listenFD, EV_READ|EV_PERSIST,
SP_EventCallback::onAccept, &acceptArg );
event_base_set( eventArg.getEventBase(), &evAccept );
event_add( &evAccept, NULL );

// workerExecutor runs the queued input tasks with mMaxThreads threads;
// actExecutor (constructed with 1) runs the completion notifications.
SP_Executor workerExecutor( mMaxThreads, "work" );
SP_Executor actExecutor( 1, "act" );
SP_CompletionHandler * completionHandler = mHandlerFactory->createCompletionHandler();

/* Start the event loop. */
// The shutdown flag is only re-checked after event_base_loop() returns.
while( 0 == mIsShutdown ) {
event_base_loop( eventArg.getEventBase(), EVLOOP_ONCE );

// Hand every queued input result to the worker executor.
for( ; NULL != eventArg.getInputResultQueue()->top(); ) {
SP_Task * task = (SP_Task*)eventArg.getInputResultQueue()->pop();
workerExecutor.execute( task );
}

// For every queued output result, schedule outputCompleted() with a
// { handler, message } pair; outputCompleted() frees the pair.
for( ; NULL != eventArg.getOutputResultQueue()->top(); ) {
SP_Message * msg = (SP_Message*)eventArg.getOutputResultQueue()->pop();

// NOTE(review): the malloc result is not checked before it is written to.
void ** arg = ( void** )malloc( sizeof( void * ) * 2 );
arg[ 0 ] = (void*)completionHandler;
arg[ 1 ] = (void*)msg;

actExecutor.execute( outputCompleted, arg );
}
}

// NOTE(review): actExecutor is still in scope here; confirm that no queued
// outputCompleted() task can still reference completionHandler after this delete.
delete completionHandler;

sp_syslog( LOG_NOTICE, "Server is shutdown." );

// Unregister the events before the local event structs go out of scope.
event_del( &evAccept );

signal_del( &evSigTerm );
signal_del( &evSigInt );

sp_close( listenFD );
}

return ret;
}


第160行初始化服务器监听(tcpListen),第188-192行初始化accept事件,第194行初始化任务执行者workerExecutor。

这里使用了半同步、半异步模式。

event_base_loop( eventArg.getEventBase(), EVLOOP_ONCE );运行在主线程当中,借助libevent抓取TCP异步事件。

class SP_EventCallback {
public:
static void onAccept( int fd, short events, void * arg ); // handles client connection (accept) events
static void onRead( int fd, short events, void * arg ); // handles client read-data events
static void onWrite( int fd, short events, void * arg ); // handles client write-data events

static void onResponse( void * queueData, void * arg );

static void addEvent( SP_Session * session, short events, int fd );

...}

SP_EventCallback处理各种事件,将对应的后续处理函数入队。由任务执行者对象出队,交给线程池去执行。

class SP_EventHelper {
public:
static void doStart( SP_Session * session ); // corresponds to the onAccept event
static void start( void * arg );

static void doError( SP_Session * session ); // corresponds to the error-handling path in onRead and onWrite
static void error( void * arg );

static void doTimeout( SP_Session * session ); // corresponds to the timeout-handling path in onRead and onWrite
static void timeout( void * arg );

static void doClose( SP_Session * session ); // corresponds to the normal close path of onWrite
static void myclose( void * arg );

static void doCompletion( SP_EventArg * eventArg, SP_Message * msg ); // corresponds to the completion-handling path in onResponse

...

}

该类当中doXXX函数的作用是配合事件处理函数将XXX函数入队;XXX函数则由线程池中的线程真正执行。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: