emqtt 3 (我要subscribe 这个topic)
2016-05-09 13:02
537 查看
这一次,主要分析client subscribe 某个topic 的处理流程。
subscribe 类型的packet的处理是:
1、过滤掉topictable 为空的情况
2、组装必要的client 信息,完成ACL检查
3、获取clientid 对应的session pid,并调用emqttd_session:subscribe/3 函数
接口函数的定义如上。
handle_cast callback 的实现如下:
emqttd_server 也是由pool 组织的gen_server进程,主要作用是subscription 的增删改查,subscription 信息是保存在 subscription mnesia table 中的,subscription mnesia table的字段信息如下:
其中,subid 即为subscriber id,也就是clientid,topic 即为topic的名称。
而,update subscription 的逻辑:
因为 subscription mnesia table的类型为bag,也就是一个clientid 可能会和多个topic 相对应,所以,不能依据key 进行delete,必须使用delete_object的方式。
首先,create subscribe的入口函数在emqttd module中,
在此调用emqttd_server:subscribe/3 函数,并请求emqttd_server 进程,emqttd_server 进程调用handle_call callback 函数,处理请求。
L26处,用了subscribed ets table,记录session pid subscribe 的所有topic,这样在 session pid DOWN的时候,就可以移除所有的topic 中session pid 相关的信息了。
而,emqttd_pubsub 同样是由pool 组织的gen_server 进程。
至此,subscribe 操作的处理逻辑就ok了。
由protocol开始
是的,还是要从protocol开始,至于为什么,之前就说过了。subscribe 类型的packet的处理是:
%% 直接过滤掉topic 为空的情况 process(?SUBSCRIBE_PACKET(PacketId, []), State) -> send(?SUBACK_PACKET(PacketId, []), State); process(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{session = Session}) -> %% 组装client 信息 Client = client(State), %% 检查ACL ... %% session 为clientid 对应的session pid %% TopicTable 为 [{TopicName, QoS}] emqttd_session:subscribe(Session, PacketId, TopicTable) ... ;
1、过滤掉topictable 为空的情况
2、组装必要的client 信息,完成ACL检查
3、获取clientid 对应的session pid,并调用emqttd_session:subscribe/3 函数
emqttd_session 模块处理
emqttd_session:subscribe/3 只是一个接口函数,实际的处理逻辑是在emqttd_session 模块的handle_cast callback 中实现。-spec(subscribe(pid(), mqtt_packet_id(), [{binary(), mqtt_qos()}]) -> ok). subscribe(SessPid, PacketId, TopicTable) -> From = self(), %%这里的self 是client process id AckFun = fun(GrantedQos) -> From ! {suback, PacketId, GrantedQos} end, gen_server2:cast(SessPid, {subscribe, TopicTable, AckFun}).
接口函数的定义如上。
handle_cast callback 的实现如下:
handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = ClientId, %% subscription 是dict subscriptions = Subscriptions}) -> %% rewrite topic name 对topic name 做一些处理 Subscriptions1 = lists:foldl( fun({Topic, Qos}, SubDict) -> case dict:find(Topic, SubDict) of {ok, Qos} -> %% 已经存在,并且QoS 未更新,所以什么都不需要做 SubDict; {ok, OldQos} -> %% 已经存在,但是QoS 更新,所以,需要更新一下 emqttd_server:update_subscription(ClientId, Topic, OldQos, Qos), dict:store(Topic, Qos, SubDict); error -> %% 不存在,直接添加 emqttd:subscribe(ClientId, Topic, Qos), dict:store(Topic, Qos, SubDict) end end, Subscriptions, TopicTable),
更新subscribe
更新subscribe,也就是调用emqttd_server:update_subscription/4 。emqttd_server 也是由pool 组织的gen_server进程,主要作用是subscription 的增删改查,subscription 信息是保存在 subscription mnesia table 中的,subscription mnesia table的字段信息如下:
-record(mqtt_subscription, {subid :: binary() | atom(), topic :: binary(), qos = 0 :: 0 | 1 | 2 }).
其中,subid 即为subscriber id,也就是clientid,topic 即为topic的名称。
而,update subscription 的逻辑:
%% 外部接口 update_subscription(ClientId, Topic, OldQos, NewQos) -> call(server(self()), {update_subscription, ClientId, Topic, ?QOS_I(OldQos), ?QOS_I(NewQos)}). handle_call({update_subscription, ClientId, Topic, OldQos, NewQos}, _From, State) -> if_subsciption(State, fun() -> OldSub = #mqtt_subscription{subid = ClientId, topic = Topic, qos = OldQos}, NewSub = #mqtt_subscription{subid = ClientId, topic = Topic, qos = NewQos}, %% 使用事物 mnesia:transaction(fun update_subscription_/2, [OldSub, NewSub]), set_subscription_stats() end), ok(State); update_subscription_(OldSub, NewSub) -> %% 删除旧的 subscription mnesia:delete_object(subscription, OldSub, write), %% 写入新的 subscription mnesia:write(subscription, NewSub, write).
因为 subscription mnesia table的类型为bag,也就是一个clientid 可能会和多个topic 相对应,所以,不能依据key 进行delete,必须使用delete_object的方式。
创建subscribe
create subscribe的处理略微有些绕,不知道是作者有意而为之,还是其他什么原因。首先,create subscribe的入口函数在emqttd module中,
-spec(subscribe(binary(), binary(), mqtt_qos()) -> {ok, mqtt_qos()}). subscribe(ClientId, Topic, Qos) -> emqttd_server:subscribe(ClientId, Topic, Qos).
在此调用emqttd_server:subscribe/3 函数,并请求emqttd_server 进程,emqttd_server 进程调用handle_call callback 函数,处理请求。
%% 外部接口 -spec(subscribe(binary(), binary(), mqtt_qos()) -> ok). subscribe(ClientId, Topic, Qos) -> %% 这里的self 是emqttd_session 进程,这个调用是在emqttd_session %% module 中的 handle_cast callback 发起的 From = self(), call(server(From), {subscribe, From, ClientId, Topic, ?QOS_I(Qos)}). handle_call({subscribe, SubPid, ClientId, Topic, Qos}, _From, State) -> %% call pubsub process pubsub_subscribe_(SubPid, Topic), if_subsciption(State, fun() -> %% 将subscription 信息写入到 subscription mnesia table 中 add_subscription_(ClientId, Topic, Qos), set_subscription_stats() end), %% monitor session pid,当起DOWN 之后,去掉subscribe 并移除相关信息 ok(monitor_subscriber_(ClientId, SubPid, State)); %% @private %% @doc Call pubsub to subscribe pubsub_subscribe_(SubPid, Topic) -> case ets:match(subscribed, {SubPid, Topic}) of [] -> emqttd_pubsub:async_subscribe(Topic, SubPid), ets:insert(subscribed, {SubPid, Topic}); [_] -> false end.
L26处,用了subscribed ets table,记录session pid subscribe 的所有topic,这样在 session pid DOWN的时候,就可以移除所有的topic 中session pid 相关的信息了。
而,emqttd_pubsub 同样是由pool 组织的gen_server 进程。
%% 外部接口,发起请求 -spec(async_subscribe(binary(), pid()) -> ok). async_subscribe(Topic, SubPid) when is_binary(Topic) -> cast(pick(Topic), {subscribe, Topic, SubPid}). handle_cast({subscribe, Topic, SubPid}, State) -> %% 实际的处理函数 add_subscriber_(Topic, SubPid), {noreply, setstats(State)}; add_subscriber_(Topic, SubPid) -> %% 检查该Topic 是否已经存在 %% 若不存在,则先增加{Topic,Node}信息,为多node 场景服务 ... ets:insert(subscriber, {Topic, SubPid}). %% 这里的subscriber 是一张ets table,接下来的publish 主要就是用的这张表 ********
至此,subscribe 操作的处理逻辑就ok了。
总结
应该也不需要,只是这代码贴的有点多了。(示意图待补)相关文章推荐
- QT5.4,VS2010
- Qt 5.7 亮瞎眼的更新
- Qt Model/View( 一)
- QT ini配置文件的读写(使用QSettings类)
- Qt路径问题
- Qt 编译时遇到 error: [debug/qrc_music.cpp] Error 1
- Qt PropertyAnimation动画残影
- Qt5+opencv程序打包发布
- 专访安晓辉:Qt是最佳的跨平台解决方案
- Qt5解析json文件
- 如何用Qt Designer快速设计产品的高保真原型
- Qt信号和槽
- QT串口模拟-基本界面
- Qt和Qml交互,及多线程
- 嵌入式开发中qt环境的搭建
- 第一个Qt程序
- 在Qt中使用C++代码创建界面
- QT 学习之路
- Ubuntu 下安装 QT 开发环境
- Ubuntu系统使用Qt Designer设计界面