钩子

    简单来讲,该机制目的在于增强软件系统的扩展性、方便与其他三方系统的集成、或者改变其系统原有的默认行为。如:

    当系统中不存在 钩子 (Hooks) 机制时,整个事件处理流程 从 事件 (Event) 的输入,到 处理 (Handler),再到完成后的返回 结果 (Result) 对于系统外部而讲,都是不可见、且无法修改的。

    而在这个过程中加入一个可挂载函数的点 (HookPoint),允许外部插件挂载多个回调函数,形成一个调用链。达到对内部事件处理过程的扩展和修改。

    系统中常用到的认证插件则是按照该逻辑进行实现的。以最简单的 为例:

    在只开启 认证插件,且关闭匿名用户登录时。按照上图对事件的处理逻辑可知,此时认证模块的逻辑为:

    1. 收到用户认证请求 (Authenticate)
    2. 读取 是否允许匿名登录 参数,得到 拒绝登录
    3. 执行 认证事件的钩子,即回调到 emqx_auth_username 插件中,假设其认为此次登录合法,得到 允许登录
    4. 返回 认证成功,成功接入系统

    即,如下图所示:

    因此,在 EMQ X 中,钩子 (Hooks) 这种机制极大地方便了系统的扩展。我们不需要修改 emqx 核心代码,仅需要在特定的位置埋下 挂载点 (HookPoint) ,便能允许外部插件扩展 EMQ X 的各种行为。

    对于实现者来说仅需要关注:

    1. 挂载点 (HookPoint) 的位置:包括其作用、执行的时机、和如何挂载和取消挂载。
    2. 了解回调函数在 上执行的机制:包括回调函数执行的顺序,及如何提前终止链的执行。

    单个 挂载点 上可能会存在多个插件都需要关心该事件并执行相应操作,所以每个 挂载点 上都可能会存在多个回调函数。

    我们称这种由多个回调函数顺序执行所构成的链为 回调链 (Callback Functions Chain)

    回调链 目前按照 的理念进行实现。为了满足钩子的功能和使用的灵活性,它必须具有以下属性:

    • 回调链 上的回调函数必须按某种先后顺序执行。
    • 回调链 一定会存在一个输入、和输出 (在通知类事件输出则是非必须的,例如 “某客户端已成功登陆”)。
    • 回调链 具有传递性,意思是指,链会将输入给链的入参输入给第一个回调函数,第一个回调函数的返回值会传递给第二个回调函数,直到最后一个函数,最后一个函数的返回值则为整个链的返回值。
    • 回调链 需要允许其上面的函数 提前终止链忽略本次操作

      • 提前终止:本函数执行完成后,直接终止链的执行。忽略链上后续所有的回调函数。例如:某认证插件认为,此类客户端允许登录后便不需要再检查其他认证插件,所以需要提前终止。
      • 忽略本次操作:不修改链上的处理结果,直接透传给下一个回调函数。例如:存在多个认证插件的情况下,某认证插件认为,此类客户端不属于其认证范围,所以我不需要修改认证结果,应当忽略本次操作,直接将前一个函数的返回值传递给链上的下一个函数。

    由此,我们可以得到一个链的设计简图:

    Callback Functions Chain Design

    该图的含义是指:

    1. 链的入参为只读的 Args 与用于链上的函数修改的参数 Acc
    2. 链无论以何种方式终止执行,其返回值均为新的 Acc
    3. 图中链上一共注册了三个回调函数;分别为 Fun1 Fun2 Fun3 并按所表示的顺序执行
    4. 回调函数通过返回:
      • ok:忽略本次操作,以只读的 Args 和上个函数返回的 Acc 继续链的执行
      • {ok, NewAcc}:执行了某些操作,修改了 Acc 内容,以只读的 Args 和新的 NewAcc 继续链的执行
    5. 回调函数也可通过返回:
      • stop:表示终止链的传递,立即返回上个函数的结果 Acc
      • {stop, NewAcc}:表示终止链的传递,立即返回本次修改的结果 NewAcc

    以上为回调链的主要设计理念,它规范了钩子上的回调函数的执行逻辑。

    接下来 挂载点, 两节中,对于钩子的所有操作都是依赖于 emqx 提供的 Erlang 代码级的 API。他们是整个钩子逻辑实现的基础。如需寻求:

    • 会话被移除 是指:当客户端以 清除会话 的方式登入时,如果服务端中已存在该客户端的会话,那么旧的会话就会被丢弃。

    • 会话被接管 是指:当客户端以 保留会话 的方式登入时,如果服务端中已存在该客户端的会话,那么旧的会话就会被新的连接所接管。

    EMQ X 提供了 API 进行钩子的挂载与取消挂载的操作。

    挂载

    挂载完成后,回调函数会按优先级从大到小执行,同一优先级按挂载的先后顺序执行。所有官方插件挂载的钩子优先级都为 0

    取消挂载

    回调函数的入参及返回值要求,见下表:

    (参数数据结构参见:)

    名称入参返回
    client.connectConnInfo:客户端连接层参数
    :MQTT v5.0 连接报文的 Properties 属性
    新的 Props
    client.connackConnInfo:客户端连接层参数
    Rc:返回码
    Props: MQTT v5.0 连接应答报文的 Properties 属性
    新的 Props
    client.connectedClientInfo: 客户端信息参数
    ConnInfo: 客户端连接层参数
    -
    client.disconnectedClientInfo:客户端信息参数
    ConnInfo:客户端连接层参数
    ReasonCode:错误码
    -
    client.authenticateClientInfo:客户端信息参数
    AuthResult:认证结果
    新的 AuthResult
    client.check_aclClientInfo:客户端信息参数
    Topic:发布/订阅的主题
    PubSub: 发布或订阅
    ACLResult:鉴权结果
    新的 ACLResult
    client.subscribeClientInfo:客户端信息参数
    Props:MQTT v5.0 订阅报文的 Properties 参数
    TopicFilters:需订阅的主题列表
    新的 TopicFilters
    client.unsubscribeClientInfo:客户端信息参数
    Props:MQTT v5.0 取消订阅报文的 Properties 参数
    TopicFilters:需取消订阅的主题列表
    新的 TopicFilters
    session.created:客户端信息参数
    SessInfo:会话信息
    -
    session.subscribedClientInfo:客户端信息参数
    Topic:订阅的主题
    SubOpts:订阅操作的配置选项
    -
    session.unsubscribedClientInfo:客户端信息参数
    Topic:取消订阅的主题
    SubOpts:取消订阅操作的配置选项
    -
    session.resumedClientInfo:客户端信息参数
    SessInfo:会话信息
    -
    session.discardedClientInfo:客户端信息参数
    SessInfo:会话信息
    -
    session.takeoveredClientInfo:客户端信息参数
    SessInfo:会话信息
    session.terminatedClientInfo:客户端信息参数
    Reason:终止原因
    SessInfo:会话信息
    -
    message.publishMessage:消息对象新的 Message
    message.deliveredClientInfo:客户端信息参数
    Message:消息对象
    新的 Message
    message.ackedClientInfo:客户端信息参数
    Message:消息对象
    -
    message.droppedMessage:消息对象
    By:被谁丢弃
    Reason:丢弃原因
    -

    具体对于这些钩子的应用,参见:emqx_plugin_template