emqtt 4 (我要publish消息了)
2016-05-09 13:03
537 查看
这次,分析处理publish msg的流程。
1、ACL 检查
2、处理packet 获得msg
3、调用session module进行处理
自己调用后续函数完成处理
call session process 完成后续处理
还是subscribe处理套路:
emqttd_protocol ---> emqttd_session ---> emqttd ---> emqttd_server ---> emqttd_pubsub
在emqttd_pubsub module中的处理是:
至此,msg 就已经以{dispatch, Topic, Msg}的形式发送给 clientid 对应的session process了。
那么,就需要在emqttd_session module中的handle_info callback 函数处进行处理:
继而,将信息发送给socket controlling process,然后根据QoS的不同,判断是否需要等待ack。
由protocol开始
publish 类型的packet的处理是:process(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload), State) -> %% ACL check ... publish(Packet, State); ... publish(Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId), #proto_state{client_id = ClientId, username = Username, session = Session}) -> %% 处理packet 获得msg Msg = emqttd_message:from_packet(Username, ClientId, Packet), %% 调用emqttd_session module的publish/2 函数 %% subscribe的时候,也是由protocol 进入的emqttd_session module emqttd_session:publish(Session, Msg);
1、ACL 检查
2、处理packet 获得msg
3、调用session module进行处理
emqttd_session 模块处理
和subscribe的处理流程类似,emqttd_session:publish/2 也只是一个接口函数,该函数要根据QoS的不同,决定是自己调用后续函数完成处理
call session process 完成后续处理
%% @doc Publish message -spec(publish(pid(), mqtt_message()) -> ok | {error, any()}). publish(_SessPid, Msg = #mqtt_message{qos = ?QOS_0}) -> %% publish qos0 directly emqttd:publish(Msg); publish(_SessPid, Msg = #mqtt_message{qos = ?QOS_1}) -> %% publish qos1 directly, and client will puback automatically emqttd:publish(Msg); publish(SessPid, Msg = #mqtt_message{qos = ?QOS_2}) -> %% publish qos2 by session gen_server2:call(SessPid, {publish, Msg}, ?PUBSUB_TIMEOUT).
直接处理
如果是自己调用后续函数完成处理的话,则继续调用emqttd:publish/2,则在emqttd module 中继续调用emqttd_server:publish/1:%% @doc Publish a Message -spec(publish(Msg :: mqtt_message()) -> any()). publish(Msg = #mqtt_message{from = From}) -> ... %% 处理topic ... %% pulish emqttd_pubsub:publish(Topic, Msg2), ...
还是subscribe处理套路:
emqttd_protocol ---> emqttd_session ---> emqttd ---> emqttd_server ---> emqttd_pubsub
在emqttd_pubsub module中的处理是:
%% @doc Publish message to Topic. -spec(publish(binary(), any()) -> any()). publish(Topic, Msg) -> lists:foreach( fun(#mqtt_route{topic = To, node = Node}) when Node =:= node() -> ?MODULE:dispatch(To, Msg); (#mqtt_route{topic = To, node = Node}) -> rpc:cast(Node, ?MODULE, dispatch, [To, Msg]) end, emqttd_router:lookup(Topic)). dispatch(Topic, Msg) -> case subscribers(Topic) of [] -> dropped(Topic); [SubPid] -> SubPid ! {dispatch, Topic, Msg}; SubPids -> lists:foreach(fun(SubPid) -> SubPid ! {dispatch, Topic, Msg} end, SubPids) end. %% @private %% @doc Find all subscribers subscribers(Topic) -> case ets:member(subscriber, Topic) of true -> %% faster then lookup? try ets:lookup_element(subscriber, Topic, 2) catch error:badarg -> [] end; false -> [] end.
至此,msg 就已经以{dispatch, Topic, Msg}的形式发送给 clientid 对应的session process了。
那么,就需要在emqttd_session module中的handle_info callback 函数处进行处理:
%% Dispatch Message handle_info({dispatch, Topic, Msg}, Session = #session{subscriptions = Subscriptions}) when is_record(Msg, mqtt_message) -> dispatch(tune_qos(Topic, Msg, Subscriptions), Session); %% Deliver qos0 message directly to client dispatch(Msg = #mqtt_message{qos = ?QOS0}, Session = #session{client_pid = ClientPid}) -> ClientPid ! {deliver, Msg}, hibernate(Session); dispatch(Msg = #mqtt_message{qos = QoS}, Session = #session{message_queue = MsgQ}) when QoS =:= ?QOS1 orelse QoS =:= ?QOS2 -> case check_inflight(Session) of true -> noreply(deliver(Msg, Session)); false -> hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)}) end.
继而,将信息发送给socket controlling process,然后根据QoS的不同,判断是否需要等待ack。
总结
(流程示意图待补)相关文章推荐
- emqtt 2 (我要连服务器)
- emqtt 3 (我要subscribe 这个topic)
- QT5.4,VS2010
- Qt 5.7 亮瞎眼的更新
- Qt Model/View( 一)
- QT ini配置文件的读写(使用QSettings类)
- Qt路径问题
- Qt 编译时遇到 error: [debug/qrc_music.cpp] Error 1
- Qt PropertyAnimation动画残影
- Qt5+opencv程序打包发布
- 专访安晓辉:Qt是最佳的跨平台解决方案
- Qt5解析json文件
- 如何用Qt Designer快速设计产品的高保真原型
- Qt信号和槽
- QT串口模拟-基本界面
- Qt和Qml交互,及多线程
- 嵌入式开发中qt环境的搭建
- 第一个Qt程序
- 在Qt中使用C++代码创建界面
- QT 学习之路