您的位置:首页 > 编程语言 > Qt开发

emqtt 4 (我要publish消息了)

2016-05-09 13:03 537 查看
这次,分析处理publish msg的流程。

由protocol开始

publish 类型的packet的处理是:

process(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload), State) ->
%% ACL check
...
publish(Packet, State);
...
publish(Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId),
#proto_state{client_id = ClientId, username = Username, session = Session}) ->
%% 处理packet 获得msg
Msg = emqttd_message:from_packet(Username, ClientId, Packet),
%% 调用emqttd_session module的publish/2 函数
%% subscribe的时候,也是由protocol 进入的emqttd_session module
emqttd_session:publish(Session, Msg);


1、ACL 检查

2、处理packet 获得msg

3、调用session module进行处理

emqttd_session 模块处理

和subscribe的处理流程类似,emqttd_session:publish/2 也只是一个接口函数,该函数要根据QoS的不同,决定是

自己调用后续函数完成处理

call session process 完成后续处理

%% @doc Publish message
-spec(publish(pid(), mqtt_message()) -> ok | {error, any()}).
publish(_SessPid, Msg = #mqtt_message{qos = ?QOS_0}) ->
%% publish qos0 directly
emqttd:publish(Msg);
publish(_SessPid, Msg = #mqtt_message{qos = ?QOS_1}) ->
%% publish qos1 directly, and client will puback automatically
emqttd:publish(Msg);
publish(SessPid, Msg = #mqtt_message{qos = ?QOS_2}) ->
%% publish qos2 by session
gen_server2:call(SessPid, {publish, Msg}, ?PUBSUB_TIMEOUT).


直接处理

如果是自己调用后续函数完成处理的话,则继续调用emqttd:publish/2,则在emqttd module 中继续调用emqttd_server:publish/1:

%% @doc Publish a Message
-spec(publish(Msg :: mqtt_message()) -> any()).
publish(Msg = #mqtt_message{from = From}) ->
...
%% 处理topic
...
%% pulish
emqttd_pubsub:publish(Topic, Msg2),
...


还是subscribe处理套路:

emqttd_protocol ---> emqttd_session ---> emqttd ---> emqttd_server ---> emqttd_pubsub

在emqttd_pubsub module中的处理是:

%% @doc Publish message to Topic.
-spec(publish(binary(), any()) -> any()).
publish(Topic, Msg) ->
lists:foreach(
fun(#mqtt_route{topic = To, node = Node}) when Node =:= node() ->
?MODULE:dispatch(To, Msg);
(#mqtt_route{topic = To, node = Node}) ->
rpc:cast(Node, ?MODULE, dispatch, [To, Msg])
end, emqttd_router:lookup(Topic)).

dispatch(Topic, Msg) ->
case subscribers(Topic) of
[] ->
dropped(Topic);
[SubPid] ->
SubPid ! {dispatch, Topic, Msg};
SubPids ->
lists:foreach(fun(SubPid) ->
SubPid ! {dispatch, Topic, Msg}
end, SubPids)
end.
%% @private
%% @doc Find all subscribers
subscribers(Topic) ->
case ets:member(subscriber, Topic) of
true -> %% faster then lookup?
try ets:lookup_element(subscriber, Topic, 2) catch error:badarg -> [] end;
false ->
[]
end.


至此,msg 就已经以{dispatch, Topic, Msg}的形式发送给 clientid 对应的session process了。

那么,就需要在emqttd_session module中的handle_info callback 函数处进行处理:

%% Dispatch Message
handle_info({dispatch, Topic, Msg}, Session = #session{subscriptions = Subscriptions})
when is_record(Msg, mqtt_message) ->
dispatch(tune_qos(Topic, Msg, Subscriptions), Session);

%% Deliver qos0 message directly to client
dispatch(Msg = #mqtt_message{qos = ?QOS0}, Session = #session{client_pid = ClientPid}) ->
ClientPid ! {deliver, Msg},
hibernate(Session);
dispatch(Msg = #mqtt_message{qos = QoS}, Session = #session{message_queue = MsgQ})
when QoS =:= ?QOS1 orelse QoS =:= ?QOS2 ->
case check_inflight(Session) of
true  ->
noreply(deliver(Msg, Session));
false ->
hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)})
end.


继而,将信息发送给socket controlling process,然后根据QoS的不同,判断是否需要等待ack。

总结

(流程示意图待补)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: