freeDiameter源码阅读之消息队列和消息处理流程
2015-01-14 11:14
585 查看
http://gmd20.blog.163.com/blog/static/1684392320133312333840/
fifo消息队列的定义和基本操作
=============
include/freeDiameter/libfdproto.h
-----------
/*
* FUNCTION: fd_fifo_post
*
* PARAMETERS:
* queue : The queue in which the element must be posted.
* item : The element that is put in the queue.
*
* DESCRIPTION:
* An element is added in a queue. Elements are retrieved from the queue in FIFO order
* with the fd_fifo_get, fd_fifo_tryget, or fd_fifo_timedget functions.
*
* RETURN VALUE:
* 0 : The element is queued.
* EINVAL : A parameter is invalid.
* ENOMEM : Not enough memory to complete the operation.
*/
int
fd_fifo_post_int (
struct
fifo *
queue,
void
**
item );
#define
fd_fifo_post(queue,
item)
\
fd_fifo_post_int((queue),
(void
*)(item))
/*
* FUNCTION: fd_fifo_get
*
* PARAMETERS:
* queue : The queue from which the first element must be retrieved.
* item : On return, the first element of the queue is stored here.
*
* DESCRIPTION:
* This function retrieves the first element from a queue. If the queue is empty, the function will
block the
* thread until a new element is posted to the queue, or until the thread is canceled (in which case
the
* function does not return).
*
* RETURN VALUE:
* 0 : A new element has been retrieved.
* EINVAL : A parameter is invalid.
*/
int
fd_fifo_get_int (
struct
fifo *
queue,
void
**
item );
#define
fd_fifo_get(queue,
item)
\
fd_fifo_get_int((queue),
(void
*)(item))
------------------------------------------------------------
fifo.c
--------------
struct
fifo {
int
eyec;
/* An eye catcher, also used to check a queue is valid. FIFO_EYEC */
pthread_mutex_t
mtx;
/* Mutex protecting this queue */
pthread_cond_t
cond_pull;
/* condition variable for pulling threads */
pthread_cond_t
cond_push;
/* condition variable for pushing threads */
struct
fd_list list;
/* sentinel for the list of elements */
int
count;
/* number of objects in the list */
int
thrs;
/* number of threads waiting for a new element (when count is 0) */
int
max;
/* maximum number of items to accept if not 0 */
int
thrs_push;
/* number of threads waitnig to push an item */
uint16_t
high;
/* High level threshold (see libfreeDiameter.h for details) */
uint16_t
low;
/* Low level threshhold */
void
*data;
/* Opaque pointer for threshold callbacks */
void
(*h_cb)(struct
fifo *,
void
**);
/* The callbacks */
void
(*l_cb)(struct
fifo *,
void
**);
int
highest;/*
The highest count value for which h_cb has been called */
int
highest_ever;
/* The max count value this queue has reached (for tweaking) */
};
-----------------
fd_fifo_post_int
pthread_mutex_lock(
&queue->mtx
这个队列是由锁保护的链表。
ret
=
pthread_cond_wait(
&queue->cond_push,
&queue->mtx
);
//循环等待条件变量,等到可以插入
fd_list_insert_before
pthread_cond_signal(&queue->cond_pull
// 通知等待的读取线程
pthread_cond_signal(&queue->cond_push
pthread_mutex_unlock(
&queue->mtx
)
----------------
fd_fifo_get_int
fifo_tget
fifo_tget
pthread_mutex_lock(
&queue->mtx
*item
=
mq_pop(queue);
// 如有有就从链表取出元素
pthread_cond_timedwait(
&queue->cond_pull,
&queue->mtx,
abstime );
//有超时的等待
pthread_cond_wait(
&queue->cond_pull,
&queue->mtx
);
pthread_mutex_unlock(
&queue->mtx
)
=========================================
使用的几个例子
=========
evens.c
---------
fd_event_send
fd_fifo_post
fd_event_get
fd_fifo_get
====================
message.c
----------
fd_msg_send
fd_msg_send_timeout
fd_fifo_post
//把消息加到队列
===================
peers.c
----------
fd_peer_alloc
fd_fifo_new(&p->p_tosend
=======================
上一篇文章说到peer的状态机处理进程会把消息放到全局的消息队列里面去。
p_psm.c
p_psm_th
fd_fifo_post(fd_g_incoming,
&msg),
goto
psm_end );
下面看看这个消息队列是怎么使用和工作的
---------------
queue.c
全局的i消息队列的定义在这里
/* The global message queues */
struct
fifo *
fd_g_incoming =
NULL;
接收到的新消息
struct
fifo *
fd_g_outgoing =
NULL;
struct
fifo *
fd_g_local =
NULL;
/* Initialize the message queues. */
int
fd_queues_init(void)
{
TRACE_ENTRY();
CHECK_FCT(
fd_fifo_new (
&fd_g_incoming,
20
)
);
CHECK_FCT(
fd_fifo_new (
&fd_g_outgoing,
30
)
);
CHECK_FCT(
fd_fifo_new (
&fd_g_local,
25
)
);
return
0;
==============================
routing_dispatch.c
------------
/* The dispatch thread */
static
void
*
dispatch_thr(void
*
arg)
{
return
process_thr(arg,
msg_dispatch,
fd_g_local,
"Dispatch");
}
/* The (routing-in) thread -- see description in freeDiameter.h */
static
void
*
routing_in_thr(void
*
arg)
{
return
process_thr(arg,
msg_rt_in,
fd_g_incoming,
"Routing-IN");
}
/* The (routing-out) thread -- see description in freeDiameter.h */
static
void
*
routing_out_thr(void
*
arg)
{
return
process_thr(arg,
msg_rt_out,
fd_g_outgoing,
"Routing-OUT");
}
这几个线程在程序启动时由
main ()
->fd_core_parseconf
->
fd_rtdisp_init ()
函数里面创建
---------
process_thr
do
{
循环从消息队列中取出消息
ret
=
fd_fifo_timedget (
queue,
&msg,
&ts
);
//调用msg_rt_in
函数 处理消息
(*action_cb)(&msg),
goto
fatal_error)
}
while
(1)
-------------------
*
The
ROUTING-IN
message processing */
static int msg_rt_in(struct msg ** pmsg)
fd_app_check(&fd_g_config->cnf_apps, hdr->msg_appl, &app) //判断是不是bending能处理的app
fd_fifo_post(fd_g_local,
pmsg)
);
//本地的转交local 线程处理
fd_fifo_post(fd_g_incoming
// 有时需要转换,然后重新入队再走一下处理流程
/*
Call all registered callbacks for this message */
pthread_rwlock_rdlock(
&rt_fwd_lock
)
for
{
ret
=
(*rh->rt_fwd_cb)(rh->cbdata,
pmsg),
]
pthread_rwlock_unlock(
&rt_fwd_lock
)
------------------
本地的fd_g_local,
队列对应的处理函数
/* The DISPATCH message processing */
static
int
msg_dispatch(struct
msg **
pmsg
(1)
对于
answer消息,看看是不是对应request发送的时候注册了回调函数,
fd_msg_answ_getq
fd_msg_anscb_get
(*anscb)(data,
pmsg);
// 调完回调处理就完成了。
(2)
fd_msg_sess_get
获取对应的session
fd_msg_dispatch
//调用看看谁注册要这个消息的,调用扩展应用的回调
fd_fifo_post(fd_g_outgoing,
pmsg)
根据扩展的返回状态,决定是不是往外发消息
fifo消息队列的定义和基本操作
=============
include/freeDiameter/libfdproto.h
-----------
/*
* FUNCTION: fd_fifo_post
*
* PARAMETERS:
* queue : The queue in which the element must be posted.
* item : The element that is put in the queue.
*
* DESCRIPTION:
* An element is added in a queue. Elements are retrieved from the queue in FIFO order
* with the fd_fifo_get, fd_fifo_tryget, or fd_fifo_timedget functions.
*
* RETURN VALUE:
* 0 : The element is queued.
* EINVAL : A parameter is invalid.
* ENOMEM : Not enough memory to complete the operation.
*/
int
fd_fifo_post_int (
struct
fifo *
queue,
void
**
item );
#define
fd_fifo_post(queue,
item)
\
fd_fifo_post_int((queue),
(void
*)(item))
/*
* FUNCTION: fd_fifo_get
*
* PARAMETERS:
* queue : The queue from which the first element must be retrieved.
* item : On return, the first element of the queue is stored here.
*
* DESCRIPTION:
* This function retrieves the first element from a queue. If the queue is empty, the function will
block the
* thread until a new element is posted to the queue, or until the thread is canceled (in which case
the
* function does not return).
*
* RETURN VALUE:
* 0 : A new element has been retrieved.
* EINVAL : A parameter is invalid.
*/
int
fd_fifo_get_int (
struct
fifo *
queue,
void
**
item );
#define
fd_fifo_get(queue,
item)
\
fd_fifo_get_int((queue),
(void
*)(item))
------------------------------------------------------------
fifo.c
--------------
struct
fifo {
int
eyec;
/* An eye catcher, also used to check a queue is valid. FIFO_EYEC */
pthread_mutex_t
mtx;
/* Mutex protecting this queue */
pthread_cond_t
cond_pull;
/* condition variable for pulling threads */
pthread_cond_t
cond_push;
/* condition variable for pushing threads */
struct
fd_list list;
/* sentinel for the list of elements */
int
count;
/* number of objects in the list */
int
thrs;
/* number of threads waiting for a new element (when count is 0) */
int
max;
/* maximum number of items to accept if not 0 */
int
thrs_push;
/* number of threads waitnig to push an item */
uint16_t
high;
/* High level threshold (see libfreeDiameter.h for details) */
uint16_t
low;
/* Low level threshhold */
void
*data;
/* Opaque pointer for threshold callbacks */
void
(*h_cb)(struct
fifo *,
void
**);
/* The callbacks */
void
(*l_cb)(struct
fifo *,
void
**);
int
highest;/*
The highest count value for which h_cb has been called */
int
highest_ever;
/* The max count value this queue has reached (for tweaking) */
};
-----------------
fd_fifo_post_int
pthread_mutex_lock(
&queue->mtx
这个队列是由锁保护的链表。
ret
=
pthread_cond_wait(
&queue->cond_push,
&queue->mtx
);
//循环等待条件变量,等到可以插入
fd_list_insert_before
pthread_cond_signal(&queue->cond_pull
// 通知等待的读取线程
pthread_cond_signal(&queue->cond_push
pthread_mutex_unlock(
&queue->mtx
)
----------------
fd_fifo_get_int
fifo_tget
fifo_tget
pthread_mutex_lock(
&queue->mtx
*item
=
mq_pop(queue);
// 如有有就从链表取出元素
pthread_cond_timedwait(
&queue->cond_pull,
&queue->mtx,
abstime );
//有超时的等待
pthread_cond_wait(
&queue->cond_pull,
&queue->mtx
);
pthread_mutex_unlock(
&queue->mtx
)
=========================================
使用的几个例子
=========
evens.c
---------
fd_event_send
fd_fifo_post
fd_event_get
fd_fifo_get
====================
message.c
----------
fd_msg_send
fd_msg_send_timeout
fd_fifo_post
//把消息加到队列
===================
peers.c
----------
fd_peer_alloc
fd_fifo_new(&p->p_tosend
=======================
上一篇文章说到peer的状态机处理进程会把消息放到全局的消息队列里面去。
p_psm.c
p_psm_th
fd_fifo_post(fd_g_incoming,
&msg),
goto
psm_end );
下面看看这个消息队列是怎么使用和工作的
---------------
queue.c
全局的i消息队列的定义在这里
/* The global message queues */
struct
fifo *
fd_g_incoming =
NULL;
接收到的新消息
struct
fifo *
fd_g_outgoing =
NULL;
struct
fifo *
fd_g_local =
NULL;
/* Initialize the message queues. */
int
fd_queues_init(void)
{
TRACE_ENTRY();
CHECK_FCT(
fd_fifo_new (
&fd_g_incoming,
20
)
);
CHECK_FCT(
fd_fifo_new (
&fd_g_outgoing,
30
)
);
CHECK_FCT(
fd_fifo_new (
&fd_g_local,
25
)
);
return
0;
==============================
routing_dispatch.c
------------
/* The dispatch thread */
static
void
*
dispatch_thr(void
*
arg)
{
return
process_thr(arg,
msg_dispatch,
fd_g_local,
"Dispatch");
}
/* The (routing-in) thread -- see description in freeDiameter.h */
static
void
*
routing_in_thr(void
*
arg)
{
return
process_thr(arg,
msg_rt_in,
fd_g_incoming,
"Routing-IN");
}
/* The (routing-out) thread -- see description in freeDiameter.h */
static
void
*
routing_out_thr(void
*
arg)
{
return
process_thr(arg,
msg_rt_out,
fd_g_outgoing,
"Routing-OUT");
}
这几个线程在程序启动时由
main ()
->fd_core_parseconf
->
fd_rtdisp_init ()
函数里面创建
---------
process_thr
do
{
循环从消息队列中取出消息
ret
=
fd_fifo_timedget (
queue,
&msg,
&ts
);
//调用msg_rt_in
函数 处理消息
(*action_cb)(&msg),
goto
fatal_error)
}
while
(1)
-------------------
*
The
ROUTING-IN
message processing */
static int msg_rt_in(struct msg ** pmsg)
fd_app_check(&fd_g_config->cnf_apps, hdr->msg_appl, &app) //判断是不是bending能处理的app
fd_fifo_post(fd_g_local,
pmsg)
);
//本地的转交local 线程处理
fd_fifo_post(fd_g_incoming
// 有时需要转换,然后重新入队再走一下处理流程
/*
Call all registered callbacks for this message */
pthread_rwlock_rdlock(
&rt_fwd_lock
)
for
{
ret
=
(*rh->rt_fwd_cb)(rh->cbdata,
pmsg),
]
pthread_rwlock_unlock(
&rt_fwd_lock
)
------------------
本地的fd_g_local,
队列对应的处理函数
/* The DISPATCH message processing */
static
int
msg_dispatch(struct
msg **
pmsg
(1)
对于
answer消息,看看是不是对应request发送的时候注册了回调函数,
fd_msg_answ_getq
fd_msg_anscb_get
(*anscb)(data,
pmsg);
// 调完回调处理就完成了。
(2)
fd_msg_sess_get
获取对应的session
fd_msg_dispatch
//调用看看谁注册要这个消息的,调用扩展应用的回调
fd_fifo_post(fd_g_outgoing,
pmsg)
根据扩展的返回状态,决定是不是往外发消息
相关文章推荐
- freeDiameter源码阅读之 dictionary树和消息的合法性检查(rule)
- Spark修炼之道(高级篇)——Spark源码阅读:第十二节 Spark SQL 处理流程分析
- Volley源码阅读详解(一)---网络任务分发,处理和交付的核心流程
- 安卓异步消息处理机制源码流程
- MQTT---HiveMQ源码详解(十二)Netty-MQTT消息、事件处理(流程)
- DL4J源码阅读(六):LSTM信号前传处理流程
- [置顶] 【消息队列】MSMQ(二)——消息处理流程
- Yii源码阅读笔记 - 请求处理基本流程
- memched1.0源码阅读(4)——事件的处理流程
- Spark修炼之道(高级篇)——Spark源码阅读:第十二节 Spark SQL 处理流程分析
- netty源码深入研究(从客户端入手)第三篇(详解写消息的管道处理流程)
- 第二人生的源码分析(三十七)消息处理的完整流程
- Android源码阅读笔记二 消息处理机制
- 【Android】结合源码解析Android消息队列工作流程
- Nginx源码阅读(worker进程处理http请求流程)
- 蔡军生先生第二人生的源码分析(三十七)消息处理的完整流程
- 第二人生的源码分析(三十七)消息处理的完整流程
- Android系统源码阅读(10):Android 应用程序的消息处理机制
- freeDiameter源码阅读之 Extensions 的实现
- 小伙伴们的ceph源码分析三——monitor消息处理流程