您的位置:首页 > 其它

epoll定时器实现系列文章:使用epoll+时间堆实现高性能定时器

2018-02-13 15:55 417 查看
http://blog.csdn.net/gbjj123/article/details/25155501
在开发Linux网络程序时,通常需要维护多个定时器,如维护客户端心跳时间、检查多个数据包的超时重传等。如果采用Linux的SIGALARM信号实现,则会带来较大的系统开销,且不便于管理。本文在应用层实现了一个基于时间堆的高性能定时器,同时考虑到定时的粒度问题,由于通过alarm系统调用设置的SIGALARM信号只能以秒为单位触发,因此需要采用其它手段实现更细粒度的定时操作,当然,这里不考虑使用多线程+sleep的实现方法,理由性能太低。通常的做法还有采用基于升序的时间链表,但升序时间链表的插入操作效率较低,需要遍历链表。因此本实现方案使用最小堆来维护多个定时器,插入O(logn)、删除O(1)、查找O(1)的效率较高。首先是每个定时器的定义:
[cpp] view plain copyclass heap_timer  
{  
public:  
    heap_timer( int ms_delay )  
    {  
        gettimeofday( &expire, NULL );  
        expire.tv_usec += ms_delay * 1000;  
        if ( expire.tv_usec > 1000000 )  
        {  
            expire.tv_sec += expire.tv_usec / 1000000;  
            expire.tv_usec %= 1000000;  
        }  
    }  
  
public:  
    struct timeval expire;  
    void (*cb_func)( client_data* );  
    client_data* user_data;  
    ~heap_timer()  
    {  
        delete user_data;  
    }  
};  

包括一个超时时间expire、超时回调函数cb_func以及一个user_data变量,user_data用于存储与定时器相关的用户数据,用户数据可以根据不同的应用场合进行修改,这里实现的是一个智能博物馆的网关,网关接收来自zigbee协调器的用户数据,并为每个用户维护一段等待时间T,在T到来之前,同一个用户的所有数据都存放到user_data的target_list中,当T到来时,根据target_list列表选择一个适当的target并发送到ip_address,同时删除定时器(有点扯远了=。=)。总之,要实现的功能就是给每个用户维护一个定时器,定时值到来时做一些操作。[cpp] view plain copyclass client_data  
{  
public:  
    client_data(char *address):target_count(0)  
    {  
        strcpy(ip_address,address);  
    }  
private:  
    char ip_address[32];  
    target target_list[64];  
    int target_count;  
    ......  
};  
以下是时间堆的类定义,包括了一些基本的堆操作:插入、删除、扩容,还包括了定时器溢出时的操作函数tick()[cpp] view plain copyclass time_heap  
{  
public:  
    time_heap( int cap  = 1) throw ( std::exception )  
        : capacity( cap ), cur_size( 0 )  
    {  
        array = new heap_timer* [capacity];  
        if ( ! array )  
        {  
            throw std::exception();  
        }  
        for( int i = 0; i < capacity; ++i )  
        {  
            array[i] = NULL;  
        }  
    }  
  
    ~time_heap()  
    {  
        for ( int i =  0; i < cur_size; ++i )  
        {  
            delete array[i];  
        }  
        delete [] array;  
    }  
  
public:  
    int get_cursize()  
    {  
        return cur_size;  
    }  
  
    void add_timer( heap_timer* timer ) throw ( std::exception )  
    {  
        if( !timer )  
        {  
            return;  
        }  
        if( cur_size >= capacity )  
        {  
            resize();  
        }  
        int hole = cur_size++;  
        int parent = 0;  
        for( ; hole > 0; hole=parent )  
        {  
            parent = (hole-1)/2;  
            if ( timercmp( &(array[parent]->expire), &(timer->expire), <= ) )  
            {  
                break;  
            }  
            array[hole] = array[parent];  
        }  
        array[hole] = timer;  
    }  
    void del_timer( heap_timer* timer )  
    {  
        if( !timer )  
        {  
            return;  
        }  
        // lazy delelte  
        timer->cb_func = NULL;  
    }  
    int top(struct timeval &time_top) const  
    {  
        if ( empty() )  
        {  
            return 0;  
        }  
        time_top = array[0]->expire;  
        return 1;  
    }  
    void pop_timer()  
    {  
        if( empty() )  
        {  
            return;  
        }  
        if( array[0] )  
        {  
            delete array[0];  
            array[0] = array[--cur_size];  
            percolate_down( 0 );  
        }  
    }  
    void tick()  
    {  
        heap_timer* tmp = array[0];  
        struct timeval cur;  
        gettimeofday( &cur, NULL );  
        while( !empty() )  
        {  
            if( !tmp )  
            {  
                break;  
            }  
            if( timercmp( &cur, &(tmp->expire), < ) )  
            {  
                break;  
            }  
            if( array[0]->cb_func )  
            {  
                array[0]->cb_func( array[0]->user_data );  
            }  
            pop_timer();  
            tmp = array[0];  
        }  
    }  
    bool empty() const  
    {  
        return cur_size == 0;  
    }  
    heap_timer** get_heap_array()  
    {  
        return array;  
    }  
  
private:  
    void percolate_down( int hole )  
    {  
        heap_timer* temp = array[hole];  
        int child = 0;  
        for ( ; ((hole*2+1) <= (cur_size-1)); hole=child )  
        {  
            child = hole*2+1;  
            if ( (child < (cur_size-1)) && timercmp( &(array[child+1]->expire), &(array[child]->expire), < ) )  
            {  
                ++child;  
            }  
            if ( timercmp( &(array[child]->expire), &(temp->expire), < ) )  
            {  
                array[hole] = array[child];  
            }  
            else  
            {  
                break;  
            }  
        }  
        array[hole] = temp;  
    }  
    void resize() throw ( std::exception )  
    {  
        heap_timer** temp = new heap_timer* [2*capacity];  
        for( int i = 0; i < 2*capacity; ++i )  
        {  
            temp[i] = NULL;  
        }  
        if ( ! temp )  
        {  
            throw std::exception();  
        }  
        capacity = 2*capacity;  
        for ( int i = 0; i < cur_size; ++i )  
        {  
            temp[i] = array[i];  
        }  
        delete [] array;  
        array = temp;  
    }  
  
  
private:  
    heap_timer** array;  
    int capacity;  
    int cur_size;  
};  
如何用epoll实现多个定时器的操作是本设计的关键,我们知道,epoll_wait的最后一个参数是阻塞等待的时候,单位是毫秒。可以这样设计:1、当时间堆中没有定时器时,epoll_wait的超时时间T设为-1,表示一直阻塞等待新用户的到来;2、当时间堆中有定时器时,epoll_wait的超时时间T设为最小堆堆顶的超时值,这样可以保证让最近触发的定时器能得以执行;3、在epoll_wait阻塞等待期间,若有其它的用户到来,则epoll_wait返回n>0,进行常规的处理,随后应重新设置epoll_wait为小顶堆堆顶的超时时间。为此,本实现对epoll_wait进行了封装,名为tepoll_wait,调用接口与epoll_wait差不多,但返回值有所不同:tepoll_wait不返回n=0的情况(即超时),因为超时事件在tepoll_wait中进行处理,只有等到n>0(即在等待过程中有用户数据到来)或者n<0(出现错误)才进行返回。废话不多说,看代码最清楚:[cpp] view plain copyvoid timer_handler()  
{  
    heap.tick();  
    //setalarm();  
}  
  
/* tselect - select with timers */  
int tepoll_wait( int epollfd, epoll_event *events, int max_event_number )  
{  
    struct timeval now;  
    struct timeval tv;  
    struct timeval *tvp;  
    //tevent_t *tp;  
    int n;  
  
    for ( ;; )  
    {  
        if ( gettimeofday( &now, NULL ) < 0 )  
            perror("gettimeofday");  
        struct timeval time_top;  
        if ( heap.top(time_top) )  
        {  
            tv.tv_sec = time_top.tv_sec - now.tv_sec;;  
            tv.tv_usec = time_top.tv_usec - now.tv_usec;  
            if ( tv.tv_usec < 0 )  
            {  
                tv.tv_usec += 1000000;  
                tv.tv_sec--;  
            }  
            tvp = &tv;  
        }  
        else  
            tvp = NULL;  
  
        if(tvp == NULL)  
            n = epoll_wait( epollfd, events, max_event_number, -1 );  
        else  
            n = epoll_wait( epollfd, events, max_event_number, tvp->tv_sec*1000 + tvp->tv_usec/1000 );  
        if ( n < 0 )  
            return -1;  
        if ( n > 0 )  
            return n;  
  
        timer_handler();  
    }  
}  
代码一目了然,在tepoll_wait中,是个死循环,只有等到上述两种情况发生时,才进行返回,此时在调用方进行处理,处理过程跟epoll_wait一样。[cpp] view plain copywhile( !stop_server )  
    {  
        number = tepoll_wait( epollfd, events, MAX_EVENT_NUMBER);  
        for ( i= 0; i < number; i++ )  
        {  
            int fd = events[i].data.fd;  
            if ( (events[i].events & EPOLLIN)&& (fd == uart_fd) )  
            {  
               //读取用户数据  
                if( (timer_id = find_exist_timer(ip_address)) != -1)  
                {  
                    //add to the exist timer  
                    heap_timer ** heap_array = heap.get_heap_array();  
                    heap_array[timer_id]->user_data->add_target(RSSI,target_id);  
                    continue;  
                }  
<span style="white-space:pre;">     </span>//new timer  
                heap_timer *timer = new heap_timer(200);  
                timer->cb_func = cb_func;  
                timer->user_data = new client_data(ip_address);  
                timer->user_data->add_target(RSSI,target_id);  
                heap.add_timer(timer);  
            }  
            else if( ( fd == pipefd[0] ) && ( events[i].events & EPOLLIN ) )  
            {  
                //此处进行了统一信号源处理,通过双向管道来获取SIGTERM以及SIGINT的信号,在主循环中进行统一处理  
<span style="white-space:pre;">     </span>char signals[1024];  
                ret = recv( pipefd[0], signals, sizeof( signals ), 0 );  
                if( ret == -1 )  
                {  
                    continue;  
                }  
                else if( ret == 0 )  
                {  
                    continue;  
                }  
                else  
                {  
                    for( int i = 0; i < ret; ++i )  
                    {  
                        switch( signals[i] )  
                        {  
                        case SIGTERM:  
                        case SIGINT:  
                        {  
                            stop_server = true;  
                        }  
  
                        }  
                    }  
                }  
            }  
        }  
    }  
实测效果非常不错,欢迎分享!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: