I. About EMQX
EMQX (Erlang/Enterprise/Elastic MQTT Broker) is an open-source IoT MQTT message broker built on the Erlang/OTP platform. Erlang/OTP is an excellent soft-realtime, low-latency, distributed language platform, and MQTT is a lightweight publish/subscribe messaging protocol for the Internet of Things.
II. Protocol Overview
MQTT is a lightweight publish/subscribe message transport protocol designed for IoT applications running over low-bandwidth, unreliable networks. MQTT website: http://mqtt.org MQTT v3.1.1 specification: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html To keep things simple here, let's go straight to the diagram:
Protocol flow diagram (Figure 1)
The MQTT control packet types are CONNECT, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL, PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK, PINGREQ, PINGRESP, and DISCONNECT. Almost all of them appear in the figure, and the way they flow through it covers the connect, disconnect, subscribe, unsubscribe, and publish behavior that MQTT defines.
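The numeric codes for these packet types are fixed by the MQTT 3.1.1 specification (section 2.2.1): the type lives in the high nibble of the first byte of every packet's fixed header. A small Python sketch as a quick reference:

```python
# MQTT 3.1.1 control packet type codes (spec section 2.2.1).
PACKET_TYPES = {
    1: "CONNECT", 2: "CONNACK", 3: "PUBLISH", 4: "PUBACK",
    5: "PUBREC", 6: "PUBREL", 7: "PUBCOMP", 8: "SUBSCRIBE",
    9: "SUBACK", 10: "UNSUBSCRIBE", 11: "UNSUBACK",
    12: "PINGREQ", 13: "PINGRESP", 14: "DISCONNECT",
}

def packet_type(first_byte: int) -> str:
    """Decode the control packet type from the first byte of a fixed header."""
    return PACKET_TYPES[first_byte >> 4]
```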
III. Reading the Code
These notes are only a first look at EMQX (version 3.0): the connection and session layers, the subscribe/unsubscribe flow, and publish routing. For reasons of length, each topic is only touched on briefly.
The Connection and Session Layers
There is a fair amount of code involved; we will mainly look at emqx_connection.erl, emqx_protocol.erl, emqx_session.erl, and emqx_broker.erl. Every device that connects gets two processes: a connection process (conn) and a session process (session). We call the connection process's pid ConnPid and the session process's pid SPid.
- Connection process: receives, sends, and parses packets.
- Session process: handles the MQTT publish/subscribe interaction flow; receives and delivers QoS 0/1/2 messages; handles retransmission of timed-out messages and storage of offline messages; and uses the inflight window to control downstream message throughput and guarantee ordering.
(1) Flow Diagrams
Below are the flow diagrams I drew from the code:
Connect, subscribe, unsubscribe, disconnect (Figure 2); publish (Figure 3)
Legend:
Device is the client of the service, Conn is the server-side connection process, and Session is the session process. Thick solid lines are packets the client sends to the server, and also synchronous calls inside the server. Thin dashed lines are packets the server sends to the client. Thick dashed lines are asynchronous calls inside the server.
1. Connect
After a device establishes a long-lived connection to the server, a Conn process is spawned to handle communication with that device.
When the server receives the CONNECT packet and the permission checks pass, it creates a session. Once the session process is up, it and the conn process bind to each other in both directions, and the conn process then returns a CONNACK packet to the device.
Let's look at the state of the two processes.
conn process
{status,<0.2190.0>,
{module,gen_server},
[[{incoming_bytes,166},
{{subscribe,<<"mqttbroker/xxx">>},{allow,1570872922716}},
{'$ancestors',[<0.1876.0>,<0.1875.0>,esockd_sup,<0.1555.0>]},
{force_shutdown_policy,#{max_heap_size => 0,message_queue_len => 0}},
{acl_keys_q,{[{subscribe,<<"mqttbroker/xxx">>}],[]}},
{rand_seed,{#{bits => 58,jump => #Fun<rand.8.10897371>,
next => #Fun<rand.5.10897371>,type => exrop,
uniform => #Fun<rand.6.10897371>,
uniform_n => #Fun<rand.7.10897371>,weak_low_bits => 1},
[123527204234062397|227463409239782656]}},
{'$logger_metadata$',#{client_id => <<"mqttbroker/slw">>,
peername => "127.0.0.1:55952",pid => <0.2190.0>}},
{guid,{1570872922688773,268564305021070,0}},
{'$initial_call',{emqx_connection,init,1}},
{acl_cache_size,1}],
running,<0.1876.0>,[],
[{header,"Status for generic server <0.2190.0>"},
{data,[{"Status",running},
{"Parent",<0.1876.0>},
{"Logged events",[]}]},
{data,[{"State",
#state{transport = esockd_transport,socket = #Port<0.29>,
peername = {{127,0,0,1},55952},
sockname = undefined,conn_state = running,active_n = 100,
proto_state = #pstate{zone = external,
sendfun = #Fun<emqx_connection.0.65258536>,
peername = {{127,0,0,1},55952},
peercert = nossl,proto_ver = 4,proto_name = <<"MQTT">>,
client_id = <<"mqttbroker/slw">>,is_assigned = false,
conn_pid = <0.2190.0>,conn_props = #{},
ack_props = undefined,username = <<"mqttbroker/slw">>,
session = <0.2192.0>,clean_start = true,topic_aliases = #{},
packet_size = 1048576,keepalive = 60,mountpoint = undefined,
is_super = false,is_bridge = false,
prod_key = <<"mqttbroker">>,dev_name = <<"slw">>,
enable_ban = true,enable_acl = true,
acl_deny_action = ignore,
recv_stats = #{msg => 0,pkt => 3},
send_stats = #{msg => 0,pkt => 3},
connected = true,
connected_at = {1570,872922,684761},
ignore_loop = false,
topic_alias_maximum = #{from_client => 0,to_client => 0}},
parser_state = {none,#{max_packet_size => 1048576,version => 4}},
gc_state = {emqx_gc,#{cnt => {1000,1000},
oct => {1048576,1048576}}},
keepalive = {keepalive,#Fun<emqx_connection.1.65258536>,164,
45,
{keepalive,check},
#Ref<0.3658081892.699924485.182073>,1},
enable_stats = true,stats_timer = undefined,
rate_limit = undefined,pub_limit = undefined,
limit_timer = undefined,idle_timeout = 15000}}]}]]}
session process
{status,<0.2192.0>,
{module,gen_server},
[[{'$ancestors',[emqx_session_sup,emqx_sm_sup,emqx_sup,
<0.1561.0>]},
{force_shutdown_policy,#{max_heap_size => 0,message_queue_len => 0}},
{'$logger_metadata$',#{client_id => <<"mqttbroker/slw">>}},
{guid,{1570872922717372,268564305021072,0}},
{'$initial_call',{emqx_session,init,1}}],
running,<0.1713.0>,[],
[{header,"Status for generic server <0.2192.0>"},
{data,[{"Status",running},
{"Parent",<0.1713.0>},
{"Logged events",[]}]},
{data,[{"State",
#state{idle_timeout = 15000,clean_start = true,
binding = local,client_id = <<"mqttbroker/slw">>,
username = <<"mqttbroker/slw">>,conn_pid = <0.2190.0>,
old_conn_pid = undefined,next_pkt_id = 1,
max_subscriptions = 0,
subscriptions = #{<<"mqttbroker/xxx">> =>
#{nl => 0,pktid => 1,qos => 2,rap => 0,rc => 0,rh => 0,
<<"r">> => <<"1561530245902">>,
<<"s">> => <<"EAB78B0E25D6C5A5EDCEEB78ABCB7B58">>}},
upgrade_qos = false,
inflight = {emqx_inflight,32,{0,nil}},
retry_interval = 20000,retry_timer = undefined,
mqueue = {mqueue,true,1000,0,0,none,infinity,
{queue,[],[],0}},
awaiting_rel = #{},max_awaiting_rel = 100,
await_rel_timeout = 300000,await_rel_timer = undefined,
expiry_interval = 0,expiry_timer = undefined,
enable_stats = true,stats_timer = undefined,
gc_state = {emqx_gc,#{cnt => {1000,1000},
oct => {1048576,1048576}}},
created_at = {1570,872922,687756},
will_msg = undefined,will_delay_timer = undefined}}]}]]}
The conn process's state has a session field and the session process's state has a conn_pid field, so each can always find the other for synchronous or asynchronous calls.
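The bidirectional binding can be pictured with a tiny sketch (plain Python, not EMQX code): each side records a reference to the other at session-creation time, so either one can initiate a call later.

```python
class Conn:
    """Stands in for the conn process; its state carries a `session` field."""
    def __init__(self):
        self.session = None   # filled in once the session is created

class Session:
    """Stands in for the session process; its state carries a `conn_pid` field."""
    def __init__(self, conn):
        self.conn = conn      # session remembers the conn
        conn.session = self   # and the conn remembers the session

conn = Conn()
sess = Session(conn)          # creating the session completes the binding
```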
2. Subscribe and Unsubscribe
The detailed subscribe steps come in a later section; here we only look at the conn and session logic. Seen at this level, subscribe and unsubscribe are nearly identical, so we take subscribe as the example.
After the conn process receives a SUBSCRIBE packet and the permission check passes, it calls emqx_session:subscribe/4, which actually casts the message {subscribe, self(), SubReq} to the session process. Once the session process finishes the subscribe logic, it sends a suback message back to the conn process: From ! {deliver, {suback, PacketId, ReasonCodes}}.
When the conn process receives this deliver message from the session process, it sends a SUBACK packet to the device.
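The round trip above can be sketched in a few lines of Python, with a Queue standing in for the conn process's mailbox. SubReq is assumed here to be a {PacketId, Topics} pair; that shape is illustrative, not the exact EMQX term.

```python
from queue import Queue

conn_mailbox = Queue()  # stand-in for the conn process's mailbox

def session_handle_cast(msg):
    """Toy version of the session's {subscribe, From, SubReq} cast handler."""
    tag, from_pid, (packet_id, topics) = msg
    assert tag == "subscribe"
    reason_codes = [0 for _ in topics]  # pretend every topic is granted QoS 0
    # Reply like From ! {deliver, {suback, PacketId, ReasonCodes}}:
    from_pid.put(("deliver", ("suback", packet_id, reason_codes)))

# conn "casts" the subscribe request to the session:
session_handle_cast(("subscribe", conn_mailbox, (1, ["mqttbroker/xxx"])))
```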
3. Heartbeat
The heartbeat logic involves only the conn process: on receiving a PINGREQ, conn immediately replies with a PINGRESP.
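Both heartbeat packets are fixed two-byte frames in MQTT 3.1.1 (a type nibble plus a zero remaining length), so this step is trivially cheap. A minimal sketch:

```python
# The two heartbeat packets are fixed two-byte frames in MQTT 3.1.1:
PINGREQ  = bytes([0xC0, 0x00])   # type 12, zero remaining length
PINGRESP = bytes([0xD0, 0x00])   # type 13, zero remaining length

def handle_heartbeat(packet: bytes) -> bytes:
    """The conn process answers a PINGREQ with a PINGRESP immediately."""
    if packet == PINGREQ:
        return PINGRESP
    raise ValueError("not a PINGREQ")
```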
4. Disconnect
Figure 2 only shows the graceful-disconnect path: the device sends DISCONNECT, the conn process performs a normal stop, and the session process exits automatically when it sees the EXIT signal.
5. Publish
To make Figure 3 easier to follow, it omits the detailed message routing and dispatch logic (covered in a later section) and deliberately shows a one-to-one message flow, which makes it easier to analyze the roles the conn and session processes play.
- QoS 0 publish
- QoS 0 is the simple case. On the sending side, ConnA receives the PUBLISH packet and calls emqx_session:publish/3, then passes the result to emqx_protocol:puback/2, which does nothing for QoS 0.
- On the receiving side, dispatching to subscriber B means sending a dispatch message to SessionB: SubPid ! {dispatch, Topic, Msg}. SessionB handles the dispatch in emqx_session:dispatch/2; for QoS 0 it goes straight to do_deliver and sends a deliver message to ConnB: ConnPid ! {deliver, {publish, PacketId, Msg}}. On receiving it, ConnB calls emqx_protocol:deliver/2 and sends the PUBLISH packet on to DeviceB.
- QoS 1 publish
- QoS 1 adds one PUBACK compared with QoS 0. On the sending side, ConnA receives the PUBLISH packet, calls emqx_session:publish/3 and then emqx_protocol:puback/2; for QoS 1 this delivers a PUBACK, which emqx_protocol:deliver/2 sends to DeviceA as a PUBACK packet.
- On the receiving side, QoS 1 adds an inflight step. When SessionB receives the SubPid ! {dispatch, Topic, Msg} message, it calls emqx_inflight:insert/3 at the same time as it delivers the ConnPid ! {deliver, {publish, PacketId, Msg}} message to ConnB. ConnB sends the PUBLISH to DeviceB, and DeviceB answers with a PUBACK. When ConnB receives the PUBACK it calls emqx_session:puback/2, which is really a cast to SessionB: gen_server:cast(SPid, {puback, PacketId, ReasonCode}). On handling that cast, SessionB calls emqx_inflight:delete/2.
- The inflight window is the mechanism that makes downstream delivery reliable and ordered. QoS 2 has the same mechanism, with slight differences.
- QoS 2 publish
- QoS 2 adds three more exchanges compared with QoS 0. On the sending side, when ConnA receives the PUBLISH packet, emqx_session:publish/3 first makes a synchronous call to SessionA, gen_server:call(SPid, {register_publish_packet_id, PacketId, Ts}, infinity), waiting for SessionA to insert the pending message into awaiting_rel in its state. It then publishes the message and passes the result to emqx_protocol:puback/2; for QoS 2, ConnA sends a PUBREC packet to DeviceA. DeviceA answers the PUBREC with a PUBREL packet; on receiving it, ConnA runs emqx_session:pubrel/3, which calls SessionA synchronously: gen_server:call(SPid, {pubrel, PacketId, ReasonCode}, infinity). SessionA removes the previously recorded message from awaiting_rel in its state, and once ConnA gets the result it sends a PUBCOMP packet to DeviceA.
- On the receiving side, the inflight handling differs slightly from QoS 1. emqx_inflight:insert/3 happens at the same point, but when SessionB later receives ConnB's synchronous call gen_server:call(SPid, {pubrec, PacketId, ReasonCode}, infinity) and asynchronous cast gen_server:cast(SPid, {pubcomp, PacketId, ReasonCode}), it runs emqx_inflight:update/3 and emqx_inflight:delete/2 respectively.
- For QoS 2, the inflight window again provides reliable, ordered downstream delivery.
- On the upstream side, QoS 2 adds the awaiting_rel handling that QoS 1 lacks; it is how delivery is ensured from the sender's side.
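The bookkeeping described in the bullets above boils down to two per-session structures: inflight for downstream QoS 1/2 delivery, and awaiting_rel for upstream QoS 2 publishes. A minimal Python sketch (illustrative only; the method names mirror the EMQX functions, but the implementation is invented):

```python
class SessionBookkeeping:
    """Toy model of the session's inflight window and awaiting_rel map."""

    def __init__(self, max_inflight=32, max_awaiting_rel=100):
        self.inflight = {}        # packet_id -> message (emqx_inflight:insert/3)
        self.awaiting_rel = {}    # packet_id -> timestamp (uplink QoS 2)
        self.max_inflight = max_inflight
        self.max_awaiting_rel = max_awaiting_rel

    # -- downstream (receiver side) -------------------------------------
    def deliver(self, packet_id, msg):
        """Before the PUBLISH goes downstream, remember it in the window."""
        if len(self.inflight) >= self.max_inflight:
            raise RuntimeError("inflight window full")
        self.inflight[packet_id] = msg

    def puback(self, packet_id):
        self.inflight.pop(packet_id, None)        # QoS 1: done on PUBACK

    def pubrec(self, packet_id):
        self.inflight[packet_id] = "pubrel_sent"  # QoS 2: emqx_inflight:update/3

    def pubcomp(self, packet_id):
        self.inflight.pop(packet_id, None)        # QoS 2: done on PUBCOMP

    # -- upstream (sender side, QoS 2 only) -----------------------------
    def register_publish_packet_id(self, packet_id, ts):
        if len(self.awaiting_rel) >= self.max_awaiting_rel:
            raise RuntimeError("too many awaiting_rel")
        self.awaiting_rel[packet_id] = ts         # before routing the message

    def pubrel(self, packet_id):
        self.awaiting_rel.pop(packet_id, None)    # PUBREL closes the exchange
```

The window sizes 32 and 100 match the inflight and max_awaiting_rel values visible in the session state dump above.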
(2) Reading the Conn and Session Processes
This section, too, looks only at the message flow between processes.
1. The Conn process
Start with the connection-layer code, focusing on its entry and exit points, in emqx_connection.erl:
process_incoming(Data, State) ->
Oct = iolist_size(Data),
?LOG(debug, "RECV ~p", [Data]),
emqx_pd:update_counter(incoming_bytes, Oct),
emqx_metrics:trans(inc, 'bytes/received', Oct),
case handle_packet(Data, State) of
{noreply, State1} ->
State2 = maybe_gc({1, Oct}, State1),
{noreply, ensure_stats_timer(State2)};
Shutdown -> Shutdown
end.
......
%% Parse and handle packets
......
handle_packet(Data, State = #state{proto_state = ProtoState,
parser_state = ParserState,
idle_timeout = IdleTimeout}) ->
try emqx_frame:parse(Data, ParserState) of
{more, ParserState1} ->
{noreply, State#state{parser_state = ParserState1}, IdleTimeout};
{ok, Packet = ?PACKET(Type), Rest} ->
emqx_metrics:received(Packet),
(Type == ?PUBLISH) andalso emqx_pd:update_counter(incoming_pubs, 1),
case emqx_protocol:received(Packet, ProtoState) of
{ok, ProtoState1} ->
handle_packet(Rest, reset_parser(State#state{proto_state = ProtoState1}));
{error, Reason} ->
?LOG(warning, "Process packet error - ~p", [Reason]),
shutdown(Reason, State);
{error, Reason, ProtoState1} ->
shutdown(Reason, State#state{proto_state = ProtoState1});
{stop, Error, ProtoState1} ->
stop(Error, State#state{proto_state = ProtoState1})
end;
{error, Reason} ->
?LOG(warning, "Parse frame error - ~p", [Reason]),
shutdown(Reason, State)
catch
_:Error ->
?LOG(warning, "Parse failed for ~p~nError data:~p", [Error, Data]),
shutdown(parse_error, State)
end.
emqx_protocol.erl (process_packet)
received(Packet = ?PACKET(Type), PState) ->
PState1 = set_protover(Packet, PState),
trace(recv, Packet),
try emqx_packet:validate(Packet) of
true ->
case preprocess_properties(Packet, PState1) of
{error, ReasonCode} ->
{error, ReasonCode, PState1};
{Packet1, PState2} ->
process_packet(Packet1, inc_stats(recv, Type, PState2))
end
catch
......
error : Reason ->
deliver({disconnect, ?RC_MALFORMED_PACKET}, PState1),
{error, Reason, PState1}
end.
After the conn process receives upstream data, it runs handle_packet: emqx_frame:parse/2 parses the protocol, turning the binary data into term-format packets (CONNECT, CONNACK, SUBSCRIBE, SUBACK, and so on); emqx_protocol:received/2 validates and preprocesses the packet; and emqx_protocol:process_packet/2 performs the protocol action for the packet that arrived. Remember this function:
emqx_protocol:process_packet/2 is the key function that handles packets arriving from the device side and carries out their actions.
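The shape of the parse loop in handle_packet/2 — feed bytes in, get back either "need more data" or one packet plus the unconsumed rest, then loop on the rest — can be sketched with a toy framing (newline-delimited here, standing in for MQTT's real fixed-header framing):

```python
def parse(buf: bytes):
    """Toy emqx_frame:parse/2: one newline-terminated frame per packet."""
    if b"\n" not in buf:
        return ("more", buf)            # incomplete frame: keep parser state
    packet, rest = buf.split(b"\n", 1)
    return ("ok", packet, rest)

def handle_packet(data: bytes, state: bytes = b""):
    """Loop like emqx_connection:handle_packet/2 until the buffer is drained."""
    packets, buf = [], state + data
    while True:
        result = parse(buf)
        if result[0] == "more":
            return packets, result[1]   # wait for the next TCP chunk
        _, packet, buf = result
        packets.append(packet)          # a complete packet: process it, recurse
```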
emqx_connection.erl
handle_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) ->
case emqx_protocol:deliver(PubOrAck, ProtoState) of
{ok, ProtoState1} ->
State1 = State#state{proto_state = ProtoState1},
{noreply, maybe_gc(PubOrAck, ensure_stats_timer(State1))};
{error, Reason} ->
shutdown(Reason, State)
end;
emqx_protocol.erl
deliver({connack, ReasonCode}, PState) ->
send(?CONNACK_PACKET(ReasonCode), PState);
......
send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = SendFun}) ->
trace(send, Packet),
case SendFun(Packet, #{version => Ver}) of
ok ->
emqx_metrics:sent(Packet),
{ok, inc_stats(send, Type, PState)};
{error, Reason} ->
{error, Reason}
end.
As we saw earlier, downstream messages either come from the Session (for example ConnPid ! {deliver, {publish, PacketId, Msg}}) or are replies from the Conn process itself (for example sending a PUBACK packet to DeviceA via emqx_protocol:deliver/2). Both paths end up in emqx_protocol:send/2.
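The send path can be pictured as a closure over the socket writer, which is roughly the role of the sendfun captured in pstate. The names below are illustrative, not the real EMQX API:

```python
sent = []  # stand-in for the bytes written to the socket

def make_sendfun(socket_write):
    """Build a sendfun closure like the #Fun<emqx_connection...> in pstate."""
    def sendfun(packet, opts):
        # Real code serializes the packet for opts["version"] before writing.
        socket_write(("serialized", packet, opts["version"]))
        return "ok"
    return sendfun

sendfun = make_sendfun(sent.append)

def deliver_connack(reason_code, sendfun, proto_ver=4):
    """Toy deliver({connack, ReasonCode}, PState) -> send(Packet, PState)."""
    return sendfun(("CONNACK", reason_code), {"version": proto_ver})
```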
2. The Session process
Here again we only follow the logic in the flow diagrams, mainly handle_info/2, handle_call/3, and handle_cast/2.
- Subscribe and unsubscribe from the Conn: gen_server:cast(SPid, {subscribe, self(), SubReq}), gen_server:cast(SPid, {unsubscribe, self(), UnsubReq})
- Downstream messages delivered by the router: SubPid ! {dispatch, Topic, Msg}
- The various acknowledgment packets from the Conn processes:
- From ConnA on the left: gen_server:call(SPid, {register_publish_packet_id, PacketId, Ts}, infinity), gen_server:call(SPid, {pubrel, PacketId, ReasonCode}, infinity)
- From ConnB on the right: gen_server:cast(SPid, {puback, PacketId, ReasonCode}), gen_server:call(SPid, {pubrec, PacketId, ReasonCode}, infinity), gen_server:cast(SPid, {pubcomp, PacketId, ReasonCode})
Messages from the left correspond to the awaiting_rel handling, and messages from the right to the inflight handling. Before I drew the flow diagrams this was a point of real confusion for me; now it is clear at a glance.
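The inventory of calls and casts above can be summarized as a dispatch table (message tag → gen_server API used, which side it comes from, which structure it touches). This is purely a reading aid, not EMQX code:

```python
# Summary of the session's message handlers seen in the flow diagrams.
# Format: tag -> (gen_server API, origin, state structure affected)
SESSION_HANDLERS = {
    "subscribe":                  ("cast", "conn",            "subscriptions"),
    "unsubscribe":                ("cast", "conn",            "subscriptions"),
    "dispatch":                   ("info", "router",          "inflight/mqueue"),
    "register_publish_packet_id": ("call", "publisher conn",  "awaiting_rel"),
    "pubrel":                     ("call", "publisher conn",  "awaiting_rel"),
    "puback":                     ("cast", "subscriber conn", "inflight"),
    "pubrec":                     ("call", "subscriber conn", "inflight"),
    "pubcomp":                    ("cast", "subscriber conn", "inflight"),
}
```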
That covers the message flow between the device, the Conn process, and the Session process.
A correction to the protocol flow diagram (Figure 1): its QoS 1 and QoS 2 publish flows are wrong. Publisher-to-broker and broker-to-subscriber are two separate links, so the broker should not wait for the subscriber's PUBACK before acknowledging the publisher; it replies PUBACK to the publisher as soon as it receives the message.