PO Box简介
2016-06-13 15:00
127 查看
使用Erlang写程序的时候,经常会碰到一种情况:因为Erlang进程的mailbox是没有大小限制的,所以它会一直接受消息,直到Erlang节点内存溢出。在大多数情况下,我们可以通过限制消息生产者的频率来解决这个问题,而且也应该实现这一点。但是,有时候不太可能完全限制发给一个进程的所有消息,这时候,开发人员就需要通过丢弃消息来减轻负载。
PO Box就是一个可以减轻负载的工具。
这是Erlang现在实现的消息处理逻辑,所有的消息都是放在mailbox中的,当一条消息处理卡顿时,会导致后面的消息一直在排队,消耗内存。
PO Box实现了一个消息代理:
可以将一个PO Box进程看成是一个消息代理,它通过实现一个缓冲区,对消息进行缓存,并在缓冲区满了的时候丢弃消息。PO Box可以通知你有新的数据,这样你可以请求这些数据;或者你可以通知它直接把数据发给你,不用通知
更具体一些,就是PO Box是一个有owner进程的状态机,共有3种状态:
Active
Notify
Passive
Passive状态基本上除了接收消息保存在缓冲区和必要时丢弃消息,不做什么事。
用户可以通过调用PO Box的api来进入Notify状态。该状态的唯一任务是检查缓冲区内有没有消息。如果有,它会发一个
只有active状态可以将真实数据发给owner进程。用户同样可以通过调用PO Box的api告知它进入active状态。如果缓冲区内有消息,所有的消息会以一个list的状态发给owner进程。如果没有消息,PO Box会等待直到有消息。转发完消息后,PO Box会回到passive状态。
当考虑采取哪一种缓存类型时,要关注的地方是:
是否需要消息保持有序
是保留最新到达的消息,还是最先到达的消息
对时间上有没有要求?如果要求最低的时间延迟,选择stack。
当然,也可以自己开发自己想要的缓存类型。
将PO Box转入
消息发送的格式是:
转换成notify状态:
发送消息:
或者
一个进程可以有多个PO Box进程
可以看到,如果使用
会把所有消息都发送到owner进程的函数可以这样写
限制二进制消息大小的函数可以这样写:
丢掉空消息:
只读取一条消息:
PO Box就是一个可以减轻负载的工具。
设计原则
PO Box是一个实现了消息缓冲区的功能的库。因为Erlang进程需要同时接收消息并做自身的工作,所以很有可能因为消息过多,而使内存消耗急剧上升。这是Erlang现在实现的消息处理逻辑,所有的消息都是放在mailbox中的,当一条消息处理卡顿时,会导致后面的消息一直在排队,消耗内存。
messages | V +-----[Pid or Name]-----+ | | | | | | mailbox | | | +---------+ | | | | | receive | +-----------------------+
PO Box实现了一个消息代理:
messages | V +---------[Pid]---------+ +--------[POBox]--------+ | |<-- got mail ---| | | | | | | | mailbox | | | <important stuff> |--- send it! -->| +---------+ | | | | | | | |<---<messages>--|<---buffer | +-----------------------+ +-----------------------+
可以将一个PO Box进程看成是一个消息代理,它通过实现一个缓冲区,对消息进行缓存,并在缓冲区满了的时候丢弃消息。PO Box可以通知你有新的数据,这样你可以请求这些数据;或者你可以通知它直接把数据发给你,不用通知
更具体一些,就是PO Box是一个有owner进程的状态机,共有3种状态:
Active
Notify
Passive
Passive状态基本上除了接收消息保存在缓冲区和必要时丢弃消息,不做什么事。
用户可以通过调用PO Box的api来进入Notify状态。该状态的唯一任务是检查缓冲区内有没有消息。如果有,它会发一个
{mail, new_data}给owner进程。如果没有,PO Box会一直在notify状态等待,直到有新消息。在发送通知后,PO Box会返回passive状态。
只有active状态可以将真实数据发给owner进程。用户同样可以通过调用PO Box的api告知它进入active状态。如果缓冲区内有消息,所有的消息会以一个list的状态发给owner进程。如果没有消息,PO Box会等待直到有消息。转发完消息后,PO Box会回到passive状态。
,---->[passive]------(user makes active)----->[active] | | ^ | ^ | | | '---(sends message to user)--<-----' | | | (user makes notify) | | | | | | (user is notified) | | | | V | | '-----[notify]---------(user makes active)--------' | ^----------(user makes notify)<----------'
缓存类型
PO Box实现了消息的缓存机制,当前支持的缓存方式包括三种:queue、
stack和
keep_old queue。
queue按消息到达的顺序保存消息,当缓存满时,会丢弃最老的消息。例如,有6条消息,a,b,c,d,e,消息缓存的大小是3,最后会保留的消息是[c, d, e]。
keep_old queue也是一种
queue,不过当缓存满时,会阻止新的消息的到达。例如,有6条消息,a,b,c,d,e,缓存大小是3,最后会保留的消息是[a, b, c]。
stack并不能保证消息的顺序呢。当缓存满时,会丢弃栈顶的消息。对于前面两个例子,
stack最后保存的消息是[e, b, a]。
当考虑采取哪一种缓存类型时,要关注的地方是:
是否需要消息保持有序
是保留最新到达的消息,还是最先到达的消息
对时间上有没有要求?如果要求最低的时间延迟,选择stack。
当然,也可以自己开发自己想要的缓存类型。
使用用例
PO Box进程启动函数:start_link(OwnerPid, MaxSize, BufferType) start_link(OwnerPid, MaxSize, BufferType, InitialState) start_link(Name, OwnerPid, MaxSize, BufferType) start_link(Name, OwnerPid, MaxSize, BufferType, InitialState)
Name就是Po Box进程注册的名字
OwnerPid就是PO Box的owner进程的Pid。只有owner进程可以读取该PO Box进程的消息,也只有这个owner进程可以设置PO Box进程的state。
OwnerPid也可以是原子。两个进程之间会建立link关系,PO Box 不会trap exits。所以如果想要PO Box进程独立存活,应该手动取消link。
MaxSize就是缓冲区的大小
BufferType就是上面所说缓存类型
InitialState可以是
passive或者
notify。缺省是
notify。
将PO Box转入
active状态
pobox:active(BoxPid, FilterFun, FilterState)
FilterFun就是消息的读取过滤函数,返回值如下:
ok, Message, NewState这条消息会被发送到owner进程
{drop, NewState}这条消息会被丢弃
skip这条消息会被留在缓冲区,之前被遍历过的会被发送
消息发送的格式是:
{mail, BoxPid, Messages, MessageCount, MessageDropCount}
转换成notify状态:
pobox:notify(BoxPid)
发送消息:
pobox:post(BoxPid, Msg)
或者
BoxPid ! {post, Msg}
注意
FilterFun必须是轻量级,尤其是在处理消息到达速度非常快的时候。因为发送到PO Box进程的消息还是会先保存在Po Box自己的Mailbox中.
一个进程可以有多个PO Box进程
可以看到,如果使用
keep_old queue类型,一次处理一条消息,等价于拥有一个受限制的mailbox。
FilterFun/2
FilterFun/2的两个参数是message和state。会把所有消息都发送到owner进程的函数可以这样写
fun(Msg, _ ) -> {{ok, Msg}, nostate} end.
限制二进制消息大小的函数可以这样写:
fun(Msg, Allowed) -> case Allowed - byte_size(Msg) of N when N < 0 -> skip; N -> {{ok, Msg}, N} end end
丢掉空消息:
fun(<<>>, State) -> {drop, State}; (Msg, State) -> {{ok, Msg}, State} end.
只读取一条消息:
fun(Msg, 0) -> {{ok, Msg}, 1}; (_, _) -> skip end.
相关文章推荐
- 【iOS】UIButton 常用属性
- IT忍者神鬼之HTTP长连接和短连接
- ReflectASM,高性能的反射
- mongodb的find().pretty()方法的作用。
- 反射获取对象的属性名和对应的值并转为json字符串
- KVO 和 KVC 的使用和实现
- 第11周项目-阅读程序写结果1
- IOS Dev Intro - Object-C Call C C++
- Android 图片存储到指定路径和相册
- 你信吗?使用火狐和谷歌浏览器的员工业绩更好也更忠心
- input中blur失去焦点事件与点击事件冲突的解决方法
- linux中 epoll
- pos终端规范
- JdbcTemplate 版本4以上的queryforObject和queryforList的实现
- <meta http-equiv="X-UA-Compatible" content="IE=Edge">
- 5-7 12-24小时制 (15分)
- Oracle计算时间差函数
- HDU 4630 线段树+离线处理
- 链表面试题:判断链表是否相交(c语言)
- JS中Null与Undefined的区别