您的位置:首页 > 其它

freeDiameter源码阅读之消息队列和消息处理流程

2015-01-14 11:14 585 查看
http://gmd20.blog.163.com/blog/static/1684392320133312333840/

fifo消息队列的定义和基本操作

=============

include/freeDiameter/libfdproto.h

-----------

/*

* FUNCTION: fd_fifo_post

*

* PARAMETERS:

* queue : The queue in which the element must be posted.

* item : The element that is put in the queue.

*

* DESCRIPTION:

* An element is added in a queue. Elements are retrieved from the queue in FIFO order

* with the fd_fifo_get, fd_fifo_tryget, or fd_fifo_timedget functions.

*

* RETURN VALUE:

* 0 : The element is queued.

* EINVAL : A parameter is invalid.

* ENOMEM : Not enough memory to complete the operation.

*/

int
fd_fifo_post_int (
struct
fifo *
queue,
void
**
item );

#define
fd_fifo_post(queue,
item)
\

fd_fifo_post_int((queue),
(void
*)(item))

/*

* FUNCTION: fd_fifo_get

*

* PARAMETERS:

* queue : The queue from which the first element must be retrieved.

* item : On return, the first element of the queue is stored here.

*

* DESCRIPTION:

* This function retrieves the first element from a queue. If the queue is empty, the function will
block the

* thread until a new element is posted to the queue, or until the thread is canceled (in which case
the

* function does not return).

*

* RETURN VALUE:

* 0 : A new element has been retrieved.

* EINVAL : A parameter is invalid.

*/

int
fd_fifo_get_int (
struct
fifo *
queue,
void
**
item );

#define
fd_fifo_get(queue,
item)
\

fd_fifo_get_int((queue),
(void
*)(item))

------------------------------------------------------------

fifo.c

--------------

struct
fifo {

int
eyec;
/* An eye catcher, also used to check a queue is valid. FIFO_EYEC */

pthread_mutex_t
mtx;
/* Mutex protecting this queue */

pthread_cond_t
cond_pull;
/* condition variable for pulling threads */

pthread_cond_t
cond_push;
/* condition variable for pushing threads */

struct
fd_list list;
/* sentinel for the list of elements */

int
count;
/* number of objects in the list */

int
thrs;
/* number of threads waiting for a new element (when count is 0) */

int
max;
/* maximum number of items to accept if not 0 */

int
thrs_push;
/* number of threads waitnig to push an item */

uint16_t
high;
/* High level threshold (see libfreeDiameter.h for details) */

uint16_t
low;
/* Low level threshhold */

void
*data;
/* Opaque pointer for threshold callbacks */

void
(*h_cb)(struct
fifo *,
void
**);
/* The callbacks */

void
(*l_cb)(struct
fifo *,
void
**);

int
highest;/*
The highest count value for which h_cb has been called */

int
highest_ever;
/* The max count value this queue has reached (for tweaking) */

};

-----------------

fd_fifo_post_int

pthread_mutex_lock(
&queue->mtx
这个队列是由锁保护的链表。

ret
=
pthread_cond_wait(
&queue->cond_push,
&queue->mtx
);
//循环等待条件变量,等到可以插入

fd_list_insert_before

pthread_cond_signal(&queue->cond_pull
// 通知等待的读取线程

pthread_cond_signal(&queue->cond_push

pthread_mutex_unlock(
&queue->mtx
)

----------------

fd_fifo_get_int

fifo_tget

fifo_tget

pthread_mutex_lock(
&queue->mtx

*item
=
mq_pop(queue);
// 如有有就从链表取出元素

pthread_cond_timedwait(
&queue->cond_pull,
&queue->mtx,
abstime );
//有超时的等待

pthread_cond_wait(
&queue->cond_pull,
&queue->mtx
);

pthread_mutex_unlock(
&queue->mtx
)

=========================================

使用的几个例子

=========

evens.c

---------

fd_event_send

fd_fifo_post

fd_event_get

fd_fifo_get

====================

message.c

----------

fd_msg_send

fd_msg_send_timeout

fd_fifo_post
//把消息加到队列

===================

peers.c

----------

fd_peer_alloc

fd_fifo_new(&p->p_tosend

=======================

上一篇文章说到peer的状态机处理进程会把消息放到全局的消息队列里面去。

p_psm.c

p_psm_th

fd_fifo_post(fd_g_incoming,
&msg),
goto
psm_end );

下面看看这个消息队列是怎么使用和工作的

---------------

queue.c
全局的i消息队列的定义在这里

/* The global message queues */

struct
fifo *
fd_g_incoming =
NULL;
接收到的新消息

struct
fifo *
fd_g_outgoing =
NULL;

struct
fifo *
fd_g_local =
NULL;

/* Initialize the message queues. */

int
fd_queues_init(void)

{

TRACE_ENTRY();

CHECK_FCT(
fd_fifo_new (
&fd_g_incoming,
20
)
);

CHECK_FCT(
fd_fifo_new (
&fd_g_outgoing,
30
)
);

CHECK_FCT(
fd_fifo_new (
&fd_g_local,
25
)
);

return
0;

==============================

routing_dispatch.c

------------

/* The dispatch thread */

static
void
*
dispatch_thr(void
*
arg)

{

return
process_thr(arg,
msg_dispatch,
fd_g_local,
"Dispatch");

}

/* The (routing-in) thread -- see description in freeDiameter.h */

static
void
*
routing_in_thr(void
*
arg)

{

return
process_thr(arg,
msg_rt_in,
fd_g_incoming,
"Routing-IN");

}

/* The (routing-out) thread -- see description in freeDiameter.h */

static
void
*
routing_out_thr(void
*
arg)

{

return
process_thr(arg,
msg_rt_out,
fd_g_outgoing,
"Routing-OUT");

}

这几个线程在程序启动时由
main ()
->fd_core_parseconf
->
fd_rtdisp_init ()
函数里面创建

---------

process_thr

do
{
循环从消息队列中取出消息

ret
=
fd_fifo_timedget (
queue,
&msg,
&ts
);

//调用msg_rt_in
函数 处理消息

(*action_cb)(&msg),
goto
fatal_error)

}
while
(1)

-------------------

*
The
ROUTING-IN
message processing */

static int msg_rt_in(struct msg ** pmsg)

fd_app_check(&fd_g_config->cnf_apps, hdr->msg_appl, &app) //判断是不是bending能处理的app

fd_fifo_post(fd_g_local,
pmsg)
);
//本地的转交local 线程处理

fd_fifo_post(fd_g_incoming
// 有时需要转换,然后重新入队再走一下处理流程

/*
Call all registered callbacks for this message */

pthread_rwlock_rdlock(
&rt_fwd_lock
)

for
{

ret
=
(*rh->rt_fwd_cb)(rh->cbdata,
pmsg),

]

pthread_rwlock_unlock(
&rt_fwd_lock
)

------------------

本地的fd_g_local,
队列对应的处理函数

/* The DISPATCH message processing */

static
int
msg_dispatch(struct
msg **
pmsg

(1)
对于
answer消息,看看是不是对应request发送的时候注册了回调函数,

fd_msg_answ_getq

fd_msg_anscb_get

(*anscb)(data,
pmsg);
// 调完回调处理就完成了。

(2)

fd_msg_sess_get
获取对应的session

fd_msg_dispatch
//调用看看谁注册要这个消息的,调用扩展应用的回调

fd_fifo_post(fd_g_outgoing,
pmsg)
根据扩展的返回状态,决定是不是往外发消息
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: