Source Code Reading: A First Look at EMQX

I. About the EMQX Project

EMQX (Erlang/Enterprise/Elastic MQTT Broker) is an open-source IoT MQTT message broker built on the Erlang/OTP platform. Erlang/OTP is an excellent soft-realtime, low-latency, distributed language platform. MQTT is a lightweight publish/subscribe (PubSub) messaging protocol for the Internet of Things.

II. About the Protocol

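MQTT is a lightweight publish/subscribe messaging protocol designed specifically for IoT applications running over low-bandwidth, unreliable networks. MQTT website: http://mqtt.org. MQTT V3.1.1 specification: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html. To keep things simple, let's go straight to a diagram: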

Protocol flow diagram (Figure 1)

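The MQTT control packet types are CONNECT, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL, PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK, PINGREQ, PINGRESP and DISCONNECT. Nearly all of them appear in the diagram. The way these packets flow through the diagram implements the connect, disconnect, subscribe, unsubscribe and publish functions that MQTT defines.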
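As a compact reference for the list above, the following sketch maps each packet type to its MQTT 3.1.1 numeric code, which is the high nibble of the first fixed-header byte. It also records which packet the peer answers with. The module name mqtt_packet_types is hypothetical and is not part of EMQX. The request/reply pairs follow the 3.1.1 specification, not EMQX source.

```erlang
-module(mqtt_packet_types).
-export([code/1, name/1, reply_to/2]).

%% MQTT 3.1.1 control packet type values: the high 4 bits of the first
%% byte of every packet's fixed header.
code(connect)     -> 1;
code(connack)     -> 2;
code(publish)     -> 3;
code(puback)      -> 4;
code(pubrec)      -> 5;
code(pubrel)      -> 6;
code(pubcomp)     -> 7;
code(subscribe)   -> 8;
code(suback)      -> 9;
code(unsubscribe) -> 10;
code(unsuback)    -> 11;
code(pingreq)     -> 12;
code(pingresp)    -> 13;
code(disconnect)  -> 14.

%% Reverse lookup; crashes with function_clause for codes outside 1..14.
name(Code) when is_integer(Code), Code >= 1, Code =< 14 ->
    lists:nth(Code, [connect, connack, publish, puback, pubrec, pubrel, pubcomp,
                     subscribe, suback, unsubscribe, unsuback,
                     pingreq, pingresp, disconnect]).

%% Which packet the peer answers with. QoS only matters for PUBLISH:
%% QoS 0 gets no reply, QoS 1 gets PUBACK, QoS 2 starts PUBREC/PUBREL/PUBCOMP.
reply_to(connect, _QoS)     -> connack;
reply_to(publish, 0)        -> none;
reply_to(publish, 1)        -> puback;
reply_to(publish, 2)        -> pubrec;
reply_to(pubrec, _QoS)      -> pubrel;
reply_to(pubrel, _QoS)      -> pubcomp;
reply_to(subscribe, _QoS)   -> suback;
reply_to(unsubscribe, _QoS) -> unsuback;
reply_to(pingreq, _QoS)     -> pingresp;
reply_to(_Other, _QoS)      -> none.
```

For example, `mqtt_packet_types:reply_to(publish, 2)` gives `pubrec`. This is the first of the three extra QoS 2 exchanges discussed later.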

III. Reading the Code

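This note is only a first look at EMQX (version 3.0). It covers the connection layer and session layer, the subscribe/unsubscribe process, and how publishes are routed. Space is limited, so each topic is only touched on briefly.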


Connection Layer and Session Layer

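There is quite a lot of related code here. The main files to focus on are emqx_connection.erl, emqx_protocol.erl, emqx_session.erl and emqx_broker.erl. Every connected device gets two processes: a connection process (conn) and a session process (session). We call the connection process's ID ConnPid and the session process's ID SPid.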

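  • Connection process: mainly responsible for sending, receiving and parsing packets.
  • Session process: handles the MQTT publish/subscribe interaction flow. It receives and delivers QoS 0/1/2 messages, retransmits on timeout, and stores offline messages. Through the inflight window, it controls downlink throughput and guarantees message ordering.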

(1) Flow Diagrams

Below are the flow diagrams I drew from the code:

Connect, subscribe, unsubscribe, disconnect (Figure 2); Publish (Figure 3)

Legend:

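Device is the client connected to the server, Conn is the server-side connection process, and Session is the session process. Thick solid lines are protocol packets sent by the client to the server; they also denote synchronous processing inside the server. Thin dashed lines are protocol packets sent by the server to the client. Thick dashed lines are asynchronous calls inside the server.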

1. Connect

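Once the device establishes a long-lived connection with the server, a Conn process is spawned to maintain communication between the device and the server.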

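When the Conn process receives the CONNECT packet and the permission check passes, it creates the session. Once the session process is up, it is bound to the conn process in both directions. The conn process then returns a CONNACK packet to the device.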

Let's look at the state of the two processes.

conn process:

{status,<0.2190.0>,
        {module,gen_server},
        [[{incoming_bytes,166},
          {{subscribe,<<"mqttbroker/xxx">>},{allow,1570872922716}},
          {'$ancestors',[<0.1876.0>,<0.1875.0>,esockd_sup,<0.1555.0>]},
          {force_shutdown_policy,#{max_heap_size => 0,message_queue_len => 0}},
          {acl_keys_q,{[{subscribe,<<"mqttbroker/xxx">>}],[]}},
          {rand_seed,{#{bits => 58,jump => #Fun<rand.8.10897371>,
                        next => #Fun<rand.5.10897371>,type => exrop,
                        uniform => #Fun<rand.6.10897371>,
                        uniform_n => #Fun<rand.7.10897371>,weak_low_bits => 1},
                      [123527204234062397|227463409239782656]}},
          {'$logger_metadata$',#{client_id => <<"mqttbroker/slw">>,
                                 peername => "127.0.0.1:55952",pid => <0.2190.0>}},
          {guid,{1570872922688773,268564305021070,0}},
          {'$initial_call',{emqx_connection,init,1}},
          {acl_cache_size,1}],
         running,<0.1876.0>,[],
         [{header,"Status for generic server <0.2190.0>"},
          {data,[{"Status",running},
                 {"Parent",<0.1876.0>},
                 {"Logged events",[]}]},
          {data,[{"State",
                  #state{transport = esockd_transport,socket = #Port<0.29>,
                         peername = {{127,0,0,1},55952},
                         sockname = undefined,conn_state = running,active_n = 100,
                         proto_state = #pstate{zone = external,
                                               sendfun = #Fun<emqx_connection.0.65258536>,
                                               peername = {{127,0,0,1},55952},
                                               peercert = nossl,proto_ver = 4,proto_name = <<"MQTT">>,
                                               client_id = <<"mqttbroker/slw">>,is_assigned = false,
                                               conn_pid = <0.2190.0>,conn_props = #{},
                                               ack_props = undefined,username = <<"mqttbroker/slw">>,
                                               session = <0.2192.0>,clean_start = true,topic_aliases = #{},
                                               packet_size = 1048576,keepalive = 60,mountpoint = undefined,
                                               is_super = false,is_bridge = false,
                                               prod_key = <<"mqttbroker">>,dev_name = <<"slw">>,
                                               enable_ban = true,enable_acl = true,
                                               acl_deny_action = ignore,
                                               recv_stats = #{msg => 0,pkt => 3},
                                               send_stats = #{msg => 0,pkt => 3},
                                               connected = true,
                                               connected_at = {1570,872922,684761},
                                               ignore_loop = false,
                                               topic_alias_maximum = #{from_client => 0,to_client => 0}},
                         parser_state = {none,#{max_packet_size => 1048576,version => 4}},
                         gc_state = {emqx_gc,#{cnt => {1000,1000},
                                               oct => {1048576,1048576}}},
                         keepalive = {keepalive,#Fun<emqx_connection.1.65258536>,164,
                                                45,
                                                {keepalive,check},
                                                #Ref<0.3658081892.699924485.182073>,1},
                         enable_stats = true,stats_timer = undefined,
                         rate_limit = undefined,pub_limit = undefined,
                         limit_timer = undefined,idle_timeout = 15000}}]}]]}

session process:

{status,<0.2192.0>,
        {module,gen_server},
        [[{'$ancestors',[emqx_session_sup,emqx_sm_sup,emqx_sup,
                         <0.1561.0>]},
          {force_shutdown_policy,#{max_heap_size => 0,message_queue_len => 0}},
          {'$logger_metadata$',#{client_id => <<"mqttbroker/slw">>}},
          {guid,{1570872922717372,268564305021072,0}},
          {'$initial_call',{emqx_session,init,1}}],
         running,<0.1713.0>,[],
         [{header,"Status for generic server <0.2192.0>"},
          {data,[{"Status",running},
                 {"Parent",<0.1713.0>},
                 {"Logged events",[]}]},
          {data,[{"State",
                  #state{idle_timeout = 15000,clean_start = true,
                         binding = local,client_id = <<"mqttbroker/slw">>,
                         username = <<"mqttbroker/slw">>,conn_pid = <0.2190.0>,
                         old_conn_pid = undefined,next_pkt_id = 1,
                         max_subscriptions = 0,
                         subscriptions = #{<<"mqttbroker/xxx">> =>
                                               #{nl => 0,pktid => 1,qos => 2,rap => 0,rc => 0,rh => 0,
                                                 <<"r">> => <<"1561530245902">>,
                                                 <<"s">> => <<"EAB78B0E25D6C5A5EDCEEB78ABCB7B58">>}},
                         upgrade_qos = false,
                         inflight = {emqx_inflight,32,{0,nil}},
                         retry_interval = 20000,retry_timer = undefined,
                         mqueue = {mqueue,true,1000,0,0,none,infinity,
                                          {queue,[],[],0}},
                         awaiting_rel = #{},max_awaiting_rel = 100,
                         await_rel_timeout = 300000,await_rel_timer = undefined,
                         expiry_interval = 0,expiry_timer = undefined,
                         enable_stats = true,stats_timer = undefined,
                         gc_state = {emqx_gc,#{cnt => {1000,1000},
                                               oct => {1048576,1048576}}},
                         created_at = {1570,872922,687756},
                         will_msg = undefined,will_delay_timer = undefined}}]}]]}

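The conn process state has a session field and the session process state has a conn_pid field. Each can therefore always find the other to make synchronous or asynchronous calls.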
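The two-way binding can be illustrated with two plain processes, each of which stores the other's pid. This is a toy sketch. The module pair_sketch and its messages are hypothetical, and EMQX's real binding happens inside session creation. The sketch only mirrors the conn-to-session and session-to-conn relationship visible in the two state dumps above.

```erlang
-module(pair_sketch).
-export([start/0, peer_of/1, stop/1]).

%% Hypothetical toy processes standing in for Conn and Session (not EMQX code).
start() ->
    Conn = spawn(fun() -> loop(undefined) end),
    Session = spawn(fun() -> loop(Conn) end),  % session knows its conn when created
    Conn ! {bind, Session},                    % conn records the session pid afterwards
    {Conn, Session}.

%% Ask a process which peer pid it has recorded. Because messages from one
%% sender arrive in order, {bind, ...} is always processed before this query.
peer_of(Pid) ->
    Pid ! {peer, self()},
    receive {peer_is, Pid, Peer} -> Peer after 1000 -> timeout end.

stop(Pid) -> Pid ! stop, ok.

loop(Peer) ->
    receive
        {bind, NewPeer} -> loop(NewPeer);
        {peer, From}    -> From ! {peer_is, self(), Peer}, loop(Peer);
        stop            -> ok
    end.
```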

2. Subscribe and Unsubscribe

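I'll cover the detailed subscription steps in a later section. Here we look only at the conn and session process logic. From that angle, subscribe and unsubscribe are nearly identical, so we use subscribe as the example.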

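When the conn process receives a SUBSCRIBE packet and the permission check passes, it calls emqx_session:subscribe/4. This actually casts the message {subscribe, self(), SubReq} to the session process. After handling the subscription, the session process sends a suback message back to the conn process: From ! {deliver, {suback, PacketId, ReasonCodes}}.

When the conn process receives this deliver message from the session process, it sends a SUBACK packet to the device.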
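The cast-then-deliver round trip can be sketched with a minimal gen_server in the session role. The module sub_sketch, its request shape {PacketId, Topics}, and the choice to grant exactly the requested QoS are all assumptions made for illustration. The real emqx_session does ACL, hook and reason-code work that is omitted here. The calling process plays the conn role.

```erlang
-module(sub_sketch).
-behaviour(gen_server).
-export([start/0, subscribe/3]).
-export([init/1, handle_call/3, handle_cast/2]).

start() -> gen_server:start(?MODULE, [], []).

%% Conn side: fire-and-forget cast carrying our own pid, as in the text.
subscribe(SPid, PacketId, Topics) ->
    gen_server:cast(SPid, {subscribe, self(), {PacketId, Topics}}).

init([]) -> {ok, #{}}.   % state: Topic => QoS

handle_call(_Req, _From, Subs) -> {reply, ignored, Subs}.

%% Session side: record the subscriptions, then mail the SUBACK data back to
%% the conn, which would turn it into a SUBACK packet for the device.
handle_cast({subscribe, From, {PacketId, Topics}}, Subs) ->
    Subs1 = lists:foldl(fun({Topic, QoS}, Acc) -> maps:put(Topic, QoS, Acc) end,
                        Subs, Topics),
    ReasonCodes = [QoS || {_Topic, QoS} <- Topics],  % assumption: grant requested QoS
    From ! {deliver, {suback, PacketId, ReasonCodes}},
    {noreply, Subs1}.
```

Because the conn only casts, it is never blocked while the session works. The SUBACK goes out whenever the {deliver, ...} message arrives.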

3. Heartbeat

The heartbeat logic involves only the conn process: when conn receives a PINGREQ, it immediately returns a PINGRESP.
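Below is a small sketch of the conn-only heartbeat. It also includes the keep-alive rule from the MQTT 3.1.1 specification (section 3.1.2.10): with a non-zero Keep Alive, the server must disconnect a client that sends no control packet within 1.5 times the Keep Alive period. The module ping_sketch is hypothetical. EMQX's own keepalive timer (the keepalive tuple in the conn state above) is not reproduced here.

```erlang
-module(ping_sketch).
-export([handle/1, keepalive_expired/3]).

%% The conn answers PINGREQ by itself; the session is never involved.
handle(pingreq) -> {reply, pingresp};
handle(_Other)  -> noreply.

%% MQTT 3.1.1: disconnect if nothing arrived within 1.5 x Keep Alive.
%% KeepAliveSecs = 0 disables the check. Times are in milliseconds.
keepalive_expired(KeepAliveSecs, LastRecvMs, NowMs) ->
    KeepAliveSecs > 0 andalso (NowMs - LastRecvMs) > KeepAliveSecs * 1500.
```

With the keepalive of 60 seen in the conn state dump, the limit works out to 90 000 ms of silence.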

4. Disconnect

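Figure 2 only shows an active disconnect: the device sends DISCONNECT, the conn process stops normally, and the session process exits on its own once it receives the EXIT message.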
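The "session exits after it sees EXIT" behaviour depends on a detail of Erlang links. A process that is linked to one that exits with reason `normal` is not killed. It only learns about the exit if it traps exits, in which case the exit signal arrives as an {'EXIT', Pid, Reason} message. The toy sketch below assumes a link plus trap_exit. The module exit_sketch is hypothetical, and EMQX's session may differ in detail.

```erlang
-module(exit_sketch).
-export([run/0]).

%% Returns the exit reason the toy session observed for its conn.
run() ->
    Parent = self(),
    %% Toy conn: stops normally on 'disconnect' (like a DISCONNECT packet).
    Conn = spawn(fun() -> receive disconnect -> ok end end),
    %% Toy session: without trap_exit, a 'normal' exit of Conn would be ignored;
    %% with it, the exit becomes a message and the session can stop itself.
    spawn(fun() ->
              process_flag(trap_exit, true),
              link(Conn),
              Parent ! linked,
              receive
                  {'EXIT', Conn, Reason} -> Parent ! {session_saw_exit, Reason}
              end
          end),
    receive linked -> ok end,   % make sure the link exists before conn stops
    Conn ! disconnect,
    receive {session_saw_exit, R} -> R after 1000 -> timeout end.
```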

5. Publish

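To keep Figure 3 easy to follow, it omits the details of message routing and dispatch (they are covered in a later section). It also deliberately shows a one-to-one publish so that the roles of the conn and session processes stand out.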

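  • QoS 0 publish
  1. QoS 0 is the simplest case. On the sender side, ConnA receives the PUBLISH packet and calls emqx_session:publish/3. It then passes the result to emqx_protocol:puback/2, which does nothing for QoS 0.
  2. On the receiver side, the message reaches subscriber B of that topic as a dispatch message sent to SessionB: SubPid ! {dispatch, Topic, Msg}. SessionB handles dispatch in emqx_session:dispatch/2. A QoS 0 message goes straight to do_deliver, which sends a deliver message to ConnB: ConnPid ! {deliver, {publish, PacketId, Msg}}. When ConnB receives the deliver message, it calls emqx_protocol:deliver/2 to send the PUBLISH packet to DeviceB.
  • QoS 1 publish
  1. Compared with QoS 0, QoS 1 adds a PUBACK. On the sender side, ConnA receives the PUBLISH packet, calls emqx_session:publish/3 and then emqx_protocol:puback/2. For QoS 1 this delivers a PUBACK, and emqx_protocol:deliver/2 sends the PUBACK packet to DeviceA.
  2. On the receiver side, QoS 1 adds an inflight operation compared with QoS 0. When SessionB receives SubPid ! {dispatch, Topic, Msg}, it sends ConnB ConnPid ! {deliver, {publish, PacketId, Msg}} and at the same time performs emqx_inflight:insert/3. ConnB sends PUBLISH to DeviceB, and DeviceB replies with PUBACK. When ConnB receives the PUBACK it calls emqx_session:puback/2, which is really a cast to SessionB: gen_server:cast(SPid, {puback, PacketId, ReasonCode}). On that cast, SessionB performs emqx_inflight:delete/2.
  3. The inflight window is the logic that ensures downlink messages are delivered and stay in order. QoS 2 has the same logic, with slight differences.
  • QoS 2 publish
  1. Compared with QoS 0, QoS 2 adds three more exchanges. On the sender side, ConnA receives the PUBLISH packet, and emqx_session:publish/3 first makes a synchronous call to SessionA: gen_server:call(SPid, {register_publish_packet_id, PacketId, Ts}, infinity). It waits for SessionA to record the packet ID of the message to be published in the awaiting_rel field of its state. Only then is the message published, and the result is passed to emqx_protocol:puback/2. For QoS 2, ConnA sends a PUBREC packet to DeviceA, and DeviceA answers the PUBREC with a PUBREL packet. On receiving it, ConnA calls emqx_session:pubrel/3, which synchronously calls SessionA with gen_server:call(SPid, {pubrel, PacketId, ReasonCode}, infinity). SessionA removes the entry it recorded earlier in awaiting_rel. Once ConnA has the result, it sends a PUBCOMP packet to DeviceA.
  2. On the receiver side, the QoS 2 inflight handling is slightly different. emqx_inflight:insert/3 happens at the same point as for QoS 1. SessionB then receives the synchronous call from ConnB, gen_server:call(SPid, {pubrec, PacketId, ReasonCode}, infinity), and later the asynchronous cast gen_server:cast(SPid, {pubcomp, PacketId, ReasonCode}). It performs emqx_inflight:update/3 and emqx_inflight:delete/2 respectively.
  3. For QoS 2, the inflight window likewise ensures that downlink messages are delivered and stay in order.
  4. On the uplink, QoS 2 adds the awaiting_rel handling compared with QoS 1, which secures the sender-to-broker leg of the delivery.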
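The two pieces of session bookkeeping described above can be sketched side by side. On the receiver side, an inflight window is filled on dispatch, updated on PUBREC and emptied on PUBACK/PUBCOMP. On the sender side, an awaiting_rel map is filled on register_publish_packet_id and emptied on PUBREL. The module qos_sketch and its function names are hypothetical and are not the emqx_inflight API. The window size passed to new/1 plays the role of the 32 seen in the session state dump.

```erlang
-module(qos_sketch).
-export([new/1, inflight_insert/3, inflight_update/3, inflight_delete/2,
         await_rel/2, release/2, sizes/1]).

%% Hypothetical session bookkeeping (not emqx_inflight itself).
-record(q, {max_inflight, inflight = #{}, awaiting_rel = #{}}).

new(MaxInflight) -> #q{max_inflight = MaxInflight}.

%% Receiver side, QoS 1/2: remember a PUBLISH sent to the device until it is acked.
inflight_insert(PacketId, Msg, Q = #q{max_inflight = Max, inflight = In}) ->
    case maps:size(In) >= Max of
        true  -> {error, window_full};  % a real session would queue the message
        false -> {ok, Q#q{inflight = maps:put(PacketId, {publish, Msg}, In)}}
    end.

%% Receiver side, QoS 2: after PUBREC the entry waits for PUBCOMP instead.
inflight_update(PacketId, pubrel, Q = #q{inflight = In}) ->
    case maps:is_key(PacketId, In) of
        true  -> {ok, Q#q{inflight = maps:put(PacketId, pubrel, In)}};
        false -> {error, packet_id_not_found}
    end.

%% Receiver side: PUBACK (QoS 1) or PUBCOMP (QoS 2) frees the window slot.
inflight_delete(PacketId, Q = #q{inflight = In}) ->
    Q#q{inflight = maps:remove(PacketId, In)}.

%% Sender side, QoS 2: record the packet id with a timestamp until PUBREL.
%% Another PUBLISH with the same id while it is pending is rejected.
await_rel(PacketId, Q = #q{awaiting_rel = AR}) ->
    case maps:is_key(PacketId, AR) of
        true  -> {error, packet_id_in_use};
        false ->
            Ts = erlang:monotonic_time(millisecond),
            {ok, Q#q{awaiting_rel = maps:put(PacketId, Ts, AR)}}
    end.

%% Sender side, QoS 2: PUBREL releases the packet id.
release(PacketId, Q = #q{awaiting_rel = AR}) ->
    case maps:take(PacketId, AR) of
        {_Ts, AR1} -> {ok, Q#q{awaiting_rel = AR1}};
        error      -> {error, packet_id_not_found}
    end.

%% {number of inflight entries, number of awaiting_rel entries}
sizes(#q{inflight = In, awaiting_rel = AR}) -> {maps:size(In), maps:size(AR)}.
```

The window bound is what gives throughput control: once it is full, new downlink messages must wait.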

(2) A Closer Look at the Conn and Session Processes

This section still focuses only on the message flow between processes.

1. The Conn process

First, the connection-layer code in emqx_connection.erl, focusing on its entry and exit points:

process_incoming(Data, State) ->
    Oct = iolist_size(Data),
    ?LOG(debug, "RECV ~p", [Data]),
    emqx_pd:update_counter(incoming_bytes, Oct),
    emqx_metrics:trans(inc, 'bytes/received', Oct),
    case handle_packet(Data, State) of
        {noreply, State1} ->
            State2 = maybe_gc({1, Oct}, State1),
            {noreply, ensure_stats_timer(State2)};
        Shutdown -> Shutdown
    end.
......
%% Parse and handle packets
......
handle_packet(Data, State = #state{proto_state  = ProtoState,
                                   parser_state = ParserState,
                                   idle_timeout = IdleTimeout}) ->
    try emqx_frame:parse(Data, ParserState) of
        {more, ParserState1} ->
            {noreply, State#state{parser_state = ParserState1}, IdleTimeout};
        {ok, Packet = ?PACKET(Type), Rest} ->
            emqx_metrics:received(Packet),
            (Type == ?PUBLISH) andalso emqx_pd:update_counter(incoming_pubs, 1),
            case emqx_protocol:received(Packet, ProtoState) of
                {ok, ProtoState1} ->
                    handle_packet(Rest, reset_parser(State#state{proto_state = ProtoState1}));
                {error, Reason} ->
                    ?LOG(warning, "Process packet error - ~p", [Reason]),
                    shutdown(Reason, State);
                {error, Reason, ProtoState1} ->
                    shutdown(Reason, State#state{proto_state = ProtoState1});
                {stop, Error, ProtoState1} ->
                    stop(Error, State#state{proto_state = ProtoState1})
            end;
        {error, Reason} ->
            ?LOG(warning, "Parse frame error - ~p", [Reason]),
            shutdown(Reason, State)
    catch
        _:Error ->
            ?LOG(warning, "Parse failed for ~p~nError data:~p", [Error, Data]),
            shutdown(parse_error, State)
    end.

emqx_protocol.erl (received/2, which hands off to process_packet/2)

received(Packet = ?PACKET(Type), PState) ->
    PState1 = set_protover(Packet, PState),
    trace(recv, Packet),
    try emqx_packet:validate(Packet) of
        true ->
            case preprocess_properties(Packet, PState1) of
                {error, ReasonCode} ->
                    {error, ReasonCode, PState1};
                {Packet1, PState2} ->
                    process_packet(Packet1, inc_stats(recv, Type, PState2))
            end
    catch
        ......
        error : Reason ->
            deliver({disconnect, ?RC_MALFORMED_PACKET}, PState1),
            {error, Reason, PState1}
    end.

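After the conn process receives uplink data, it runs handle_packet/2. First, emqx_frame:parse/2 parses the binary data into protocol packets as Erlang terms (CONNECT, CONNACK, SUBSCRIBE, SUBACK, and so on). Next, emqx_protocol:received/2 validates and preprocesses each packet. Finally, emqx_protocol:process_packet/2 performs the protocol action for the received packet. Remember this function:

emqx_protocol:process_packet/2 is the key function that handles packets sent by the device and carries out the corresponding actions.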
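To illustrate what a frame parser like emqx_frame:parse/2 has to do first, here is a sketch that decodes the MQTT fixed header. The header consists of a type nibble, a flags nibble, and a Remaining Length of 1 to 4 bytes, each carrying 7 value bits with the top bit meaning "more bytes follow". It then slices out the body. The module frame_sketch is hypothetical. Its return values only loosely echo the {more, _} and {ok, Packet, Rest} cases matched in handle_packet/2 above (the real parser returns a parser state rather than the raw buffer). Variable-header and payload decoding are omitted.

```erlang
-module(frame_sketch).
-export([parse/1]).

%% Parse one packet's fixed header and body from Bin.
parse(<<>>) ->
    {more, <<>>};
parse(Bin = <<Type:4, Flags:4, Rest/binary>>) ->
    case remaining_length(Rest, 1, 0) of
        more ->
            {more, Bin};
        {error, _} = Error ->
            Error;
        {Len, Rest1} ->
            case Rest1 of
                <<Body:Len/binary, Rest2/binary>> -> {ok, {Type, Flags, Body}, Rest2};
                _ -> {more, Bin}   % body not fully received yet
            end
    end.

%% Remaining Length: at most 4 bytes, so the multiplier never exceeds 128^3.
remaining_length(_Bin, Mult, _Acc) when Mult > 128 * 128 * 128 ->
    {error, malformed_remaining_length};
remaining_length(<<1:1, Digit:7, Rest/binary>>, Mult, Acc) ->
    remaining_length(Rest, Mult * 128, Acc + Digit * Mult);
remaining_length(<<0:1, Digit:7, Rest/binary>>, Mult, Acc) ->
    {Acc + Digit * Mult, Rest};
remaining_length(<<>>, _Mult, _Acc) ->
    more.
```

For example, the two bytes <<192, 0>> are a PINGREQ (type 12, no body). Leftover bytes come back as Rest, which is why handle_packet/2 recurses on Rest after each packet.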

emqx_connection.erl

handle_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) ->
    case emqx_protocol:deliver(PubOrAck, ProtoState) of
        {ok, ProtoState1} ->
            State1 = State#state{proto_state = ProtoState1},
            {noreply, maybe_gc(PubOrAck, ensure_stats_timer(State1))};
        {error, Reason} ->
            shutdown(Reason, State)
    end;

emqx_protocol.erl

deliver({connack, ReasonCode}, PState) ->
    send(?CONNACK_PACKET(ReasonCode), PState);
......
send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = SendFun}) ->
    trace(send, Packet),
    case SendFun(Packet, #{version => Ver}) of
        ok ->
            emqx_metrics:sent(Packet),
            {ok, inc_stats(send, Type, PState)};
        {error, Reason} ->
            {error, Reason}
    end.

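As shown earlier, downlink messages come from two sources. Some are sent by the Session (for example ConnPid ! {deliver, {publish, PacketId, Msg}}). Others are replies the Conn process produces itself (for example the PUBACK packet sent to DeviceA via emqx_protocol:deliver/2). Both paths end up calling emqx_protocol:send/2.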
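The funnel into a single send function can be reduced to a few lines. The module deliver_sketch is hypothetical. Its deliver/2 and send/2 are simplified stand-ins for the emqx_protocol functions quoted above. The SendFun here mails packets to the calling process instead of writing to a socket, so the example can run without a network.

```erlang
-module(deliver_sketch).
-export([demo/0]).

%% Both kinds of downlink packet go through deliver/2 and then one send/2.
deliver({connack, RC}, S)            -> send({connack, RC}, S);
deliver({puback, PacketId, RC}, S)   -> send({puback, PacketId, RC}, S);
deliver({publish, PacketId, Msg}, S) -> send({publish, PacketId, Msg}, S).

%% Call the transport closure and count the packet on success.
send(Packet, S = #{sendfun := SendFun, sent := N}) ->
    case SendFun(Packet) of
        ok              -> {ok, S#{sent := N + 1}};
        {error, Reason} -> {error, Reason}
    end.

demo() ->
    Self = self(),
    %% Hypothetical SendFun: "write" the packet by messaging ourselves.
    S0 = #{sendfun => fun(Pkt) -> Self ! {wire, Pkt}, ok end, sent => 0},
    {ok, S1} = deliver({puback, 1, 0}, S0),             % reply built by conn itself
    {ok, S2} = deliver({publish, 2, <<"hello">>}, S1),  % came via {deliver, ...} from session
    Wire = [receive {wire, P} -> P end || _ <- [1, 2]],
    {maps:get(sent, S2), Wire}.
```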

2. The Session process

Again, we focus only on the logic shown in the flow diagrams, mainly handle_info/2, handle_call/3 and handle_cast/2:

  • Subscribe and unsubscribe from Conn: gen_server:cast(SPid, {subscribe, self(), SubReq}) and gen_server:cast(SPid, {unsubscribe, self(), UnsubReq})
  • Downlink messages delivered by routing: SubPid ! {dispatch, Topic, Msg}
  • Acknowledgement packets of various kinds from Conn:
    • From ConnA on the left: gen_server:call(SPid, {register_publish_packet_id, PacketId, Ts}, infinity) and gen_server:call(SPid, {pubrel, PacketId, ReasonCode}, infinity)
    • From ConnB on the right: gen_server:cast(SPid, {puback, PacketId, ReasonCode}), gen_server:call(SPid, {pubrec, PacketId, ReasonCode}, infinity) and gen_server:cast(SPid, {pubcomp, PacketId, ReasonCode})

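Messages from the left side deal with awaiting_rel, and messages from the right side deal with inflight. Before I drew the flow diagrams this confused me a lot; now it is obvious at a glance.

That completes the picture of how messages flow between the device, the Conn process and the Session process.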
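The left/right split can be written down as a lookup from each session message listed above to the part of the session state it touches. The module session_route_sketch is hypothetical and only restates the classification in this section. The {dispatch, Topic, Msg} message is deliberately left out, because where it goes depends on QoS: QoS 0 is delivered directly, while QoS 1/2 enter the inflight window.

```erlang
-module(session_route_sketch).
-export([touches/1]).

%% Which session state each message from Conn touches.
%% Left side (ConnA, the publisher): awaiting_rel.
%% Right side (ConnB, the subscriber): inflight.
touches({subscribe, _ConnPid, _SubReq})               -> subscriptions;
touches({unsubscribe, _ConnPid, _UnsubReq})           -> subscriptions;
touches({register_publish_packet_id, _PacketId, _Ts}) -> awaiting_rel;
touches({pubrel, _PacketId, _ReasonCode})             -> awaiting_rel;
touches({puback, _PacketId, _ReasonCode})             -> inflight;
touches({pubrec, _PacketId, _ReasonCode})             -> inflight;
touches({pubcomp, _PacketId, _ReasonCode})            -> inflight.
```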

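Comments

  1. 小甜甜
    2021-5-25 10:53:33

    In the protocol flow diagram (Figure 1), the QoS 1 and QoS 2 publish flows are wrong. Publisher-to-broker and broker-to-subscriber are two separate links, so the broker should not wait for the subscriber's PUBACK before replying PUBACK to the publisher; it should reply PUBACK to the publisher as soon as it has received the message.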
