RocketMQ bridge plugin configuration file: etc/plugins/emqx_bridge_rocket.conf.

    Configure RocketMQ bridge rules

    ## ${topic}: the RocketMQ topic to which the messages will be published.
    ## ${filter}: the MQTT topic (may contain wildcards) on which the action will be performed.
    ## Client Connected Record Hook
    bridge.rocket.hook.client.connected.1 = {"topic": "ClientConnected"}
    ## Client Disconnected Record Hook
    bridge.rocket.hook.client.disconnected.1 = {"topic": "ClientDisconnected"}
    ## Session Subscribed Record Hook
    bridge.rocket.hook.session.subscribed.1 = {"filter": "#", "topic": "SessionSubscribed"}
    ## Session Unsubscribed Record Hook
    bridge.rocket.hook.session.unsubscribed.1 = {"filter": "#", "topic": "SessionUnsubscribed"}
    ## Message Publish Record Hook
    bridge.rocket.hook.message.publish.1 = {"filter": "#", "topic": "MessagePublish"}
    ## Message Delivered Record Hook
    bridge.rocket.hook.message.delivered.1 = {"filter": "#", "topic": "MessageDeliver"}
    ## Message Acked Record Hook
    bridge.rocket.hook.message.acked.1 = {"filter": "#", "topic": "MessageAcked"}
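To make the rule format above concrete, here is a minimal Python sketch that reads hook lines of this shape and checks whether an MQTT topic falls under a rule's `filter`. It is only an illustration: the plugin does this internally, and the names `parse_hook_rules` and `topic_matches` are hypothetical. The wildcard handling follows the standard MQTT rules (`+` for one level, `#` for the remaining levels) and ignores the special treatment of `$`-prefixed topics.

```python
import json
import re

# Matches lines such as:
#   bridge.rocket.hook.message.publish.1 = {"filter": "#", "topic": "MessagePublish"}
HOOK_LINE = re.compile(r"^bridge\.rocket\.hook\.([a-z_.]+)\.(\d+)\s*=\s*(\{.*\})\s*$")


def parse_hook_rules(conf_text):
    """Return a list of (hook, index, rule) tuples found in the config text."""
    rules = []
    for raw in conf_text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        match = HOOK_LINE.match(line)
        if match:
            hook, index, body = match.groups()
            rules.append((hook, int(index), json.loads(body)))
    return rules


def topic_matches(topic_filter, topic):
    """Check an MQTT topic against a filter that may contain + and # wildcards."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            return True  # multi-level wildcard covers everything that remains
        if i >= len(topic_levels):
            return False  # filter is longer than the topic
        if level != "+" and level != topic_levels[i]:
            return False  # literal level does not match
    return len(filter_levels) == len(topic_levels)
```

With the default rules, every hook that carries `"filter": "#"` applies to all topics, since `topic_matches("#", any_topic)` is always true.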

    RocketMQ bridge rule description

    When a device comes online, EMQ X forwards a connected event message to RocketMQ:

    topic = "ClientConnected",
    value = {
        "username": ${username},
        "node": ${node},
        "ts": ${ts}
    }

    When a device goes offline, EMQ X forwards a disconnected event message to RocketMQ:

    topic = "ClientDisconnected",
    value = {
        "client_id": ${clientid},
        "username": ${username},
        "reason": ${reason},
        "node": ${node},
        "ts": ${ts}
    }
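As a rough illustration of how the placeholders in the disconnected record are filled in, the following Python sketch assembles the same topic and value pair. The function name `build_disconnected_event` is hypothetical, and treating `ts` as integer Unix seconds is an assumption; the source does not state the timestamp unit.

```python
import json
import time


def build_disconnected_event(clientid, username, reason, node, ts=None):
    """Return (topic, value) shaped like the ClientDisconnected record."""
    value = {
        "client_id": clientid,
        "username": username,
        "reason": reason,
        "node": node,
        # Assumption: the timestamp is expressed in Unix seconds.
        "ts": int(time.time()) if ts is None else ts,
    }
    return "ClientDisconnected", json.dumps(value)
```

For example, `build_disconnected_event("c1", "u1", "normal", "emqx@127.0.0.1")` returns the topic name together with a JSON string carrying the five fields listed above.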

    Forwarding client subscribe events to RocketMQ

    Forwarding client unsubscribe events to RocketMQ

    topic = "SessionUnsubscribed"
    value = {
        "client_id": ${clientid},
        "topic": ${topic},
        "qos": ${qos},
        "node": ${node},
        "ts": ${timestamp}
    }

    Forwarding MQTT message publish events to RocketMQ

    topic = "MessagePublish"
    value = {
        "client_id": ${clientid},
        "topic": ${topic},
        "payload": ${payload},
        "qos": ${qos},
        "node": ${node},
        "ts": ${timestamp}
    }

    Forwarding MQTT message delivery (Deliver) events to RocketMQ

    topic = "MessageDeliver"
    value = {
        "client_id": ${clientid},
        "username": ${username},
        "from": ${fromClientId},
        "topic": ${topic},
        "payload": ${payload},
        "qos": ${qos},
        "node": ${node},
        "ts": ${timestamp}
    }
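A consumer that reads the MessageDeliver topic may want to confirm that a record carries every field listed above before processing it. The sketch below shows one way to do that in Python; `missing_deliver_fields` is a hypothetical helper, and it assumes the record value arrives as a JSON string.

```python
import json

# Field names taken from the MessageDeliver record layout.
DELIVER_FIELDS = (
    "client_id", "username", "from", "topic",
    "payload", "qos", "node", "ts",
)


def missing_deliver_fields(raw_value):
    """Return the names of expected fields that are absent from the record."""
    record = json.loads(raw_value)
    return [name for name in DELIVER_FIELDS if name not in record]
```

An empty list means the record is complete; any names returned can be logged or used to reject the record.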

    Forwarding MQTT message acknowledgement (Ack) events to RocketMQ

    Read MQTT client connect and disconnect event messages from RocketMQ:

    sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer ClientConnected
    sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer ClientDisconnected

    Read MQTT subscribe and unsubscribe event messages from RocketMQ:

    sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer SessionSubscribed
    sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer SessionUnsubscribed

    Read MQTT published messages from RocketMQ:

    sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer MessagePublish

    Read MQTT message delivery (Deliver) and acknowledgement (Ack) events from RocketMQ:

    By default the payload is base64-encoded. To use a different payload format, set bridge.rocket.encode_payload_type in the configuration.
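With the default base64 setting, a consumer has to decode the `payload` field before using it. A minimal Python sketch, assuming the record value is a JSON string shaped like the records above (the helper name `decode_payload` is hypothetical):

```python
import base64
import json


def decode_payload(raw_value):
    """Parse a bridged record and replace its base64 payload with raw bytes."""
    record = json.loads(raw_value)
    record["payload"] = base64.b64decode(record["payload"])
    return record
```

The payload is returned as bytes because MQTT payloads are arbitrary binary data; call `.decode("utf-8")` on it only when the publisher is known to send text.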

    Enable the RocketMQ bridge plugin