RocketMQ bridge plugin configuration file: etc/plugins/emqx_bridge_rocket.conf.
Configuring RocketMQ bridge rules
## ${topic}: the RocketMQ topic to which the messages will be published.
## ${filter}: the MQTT topic filter (may contain wildcards) on which the action will be performed.
## Client Connected Record Hook
bridge.rocket.hook.client.connected.1 = {"topic": "ClientConnected"}
## Client Disconnected Record Hook
bridge.rocket.hook.client.disconnected.1 = {"topic": "ClientDisconnected"}
## Session Subscribed Record Hook
bridge.rocket.hook.session.subscribed.1 = {"filter": "#", "topic": "SessionSubscribed"}
## Session Unsubscribed Record Hook
bridge.rocket.hook.session.unsubscribed.1 = {"filter": "#", "topic": "SessionUnsubscribed"}
## Message Publish Record Hook
bridge.rocket.hook.message.publish.1 = {"filter": "#", "topic": "MessagePublish"}
## Message Delivered Record Hook
bridge.rocket.hook.message.delivered.1 = {"filter": "#", "topic": "MessageDeliver"}
## Message Acked Record Hook
bridge.rocket.hook.message.acked.1 = {"filter": "#", "topic": "MessageAcked"}
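As an illustration of how the rules above can be read, the following Python sketch parses the `bridge.rocket.hook.*` lines and selects the RocketMQ topics that apply to an event. It is not the plugin's own implementation: the helper names `parse_hooks`, `topic_matches` and `rocket_topics` are hypothetical, and the wildcard handling follows the general MQTT rules (`+` for one level, `#` for all remaining levels) without the special treatment of topics that start with `$`.

```python
import json

PREFIX = "bridge.rocket.hook."


def parse_hooks(conf_text):
    """Parse bridge.rocket.hook.* lines into a list of rule dicts."""
    rules = []
    for raw in conf_text.splitlines():
        line = raw.strip()
        # Skip blank lines and "##" comment lines.
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        key = key.strip()
        if not sep or not key.startswith(PREFIX):
            continue
        # Key layout: bridge.rocket.hook.<hook name>.<index>
        hook, _, index = key[len(PREFIX):].rpartition(".")
        spec = json.loads(value)
        rules.append({
            "hook": hook,
            "index": int(index),
            "filter": spec.get("filter"),  # None for the client.* hooks
            "topic": spec["topic"],
        })
    return rules


def topic_matches(topic_filter, topic):
    """Check an MQTT topic against a filter with + and # wildcards."""
    f_levels = topic_filter.split("/")
    t_levels = topic.split("/")
    for i, level in enumerate(f_levels):
        if level == "#":
            # "#" is only valid as the last level and matches the rest.
            return i == len(f_levels) - 1
        if i >= len(t_levels):
            return False
        if level != "+" and level != t_levels[i]:
            return False
    return len(f_levels) == len(t_levels)


def rocket_topics(rules, hook, mqtt_topic=None):
    """Return the RocketMQ topics configured for an event on this hook."""
    selected = []
    for rule in rules:
        if rule["hook"] != hook:
            continue
        if rule["filter"] is None:
            selected.append(rule["topic"])
        elif mqtt_topic is not None and topic_matches(rule["filter"], mqtt_topic):
            selected.append(rule["topic"])
    return selected


SAMPLE_CONF = """
## Client Connected Record Hook
bridge.rocket.hook.client.connected.1 = {"topic": "ClientConnected"}
## Message Publish Record Hook
bridge.rocket.hook.message.publish.1 = {"filter": "#", "topic": "MessagePublish"}
"""

rules = parse_hooks(SAMPLE_CONF)
```

With the sample above, `rocket_topics(rules, "message.publish", "sensors/room1/temp")` selects the `MessagePublish` rule because the filter `#` matches every topic.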
RocketMQ bridge rule descriptions
When a device comes online, EMQ X forwards the online event message to RocketMQ:
topic = "ClientConnected",
value = {
"username": ${username},
"node": ${node},
"ts": ${ts}
}
When a device goes offline, EMQ X forwards the offline event message to RocketMQ:
topic = "ClientDisconnected",
value = {
"client_id": ${clientid},
"username": ${username},
"reason": ${reason},
"node": ${node},
"ts": ${ts}
}
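On the consuming side, an offline event can be mapped onto a small typed record. The sketch below assumes the value arrives as a JSON object with the five keys listed above. The serialization format, the field types (in particular `ts` as an integer) and the sample values are assumptions, not something this document specifies.

```python
import json
from dataclasses import dataclass


@dataclass
class ClientDisconnected:
    client_id: str
    username: str
    reason: str
    node: str
    ts: int  # assumed to be an integer timestamp


def parse_client_disconnected(body):
    """Build a ClientDisconnected record from a JSON message body."""
    data = json.loads(body)
    return ClientDisconnected(
        client_id=data["client_id"],
        username=data["username"],
        reason=data["reason"],
        node=data["node"],
        ts=int(data["ts"]),
    )


# Hypothetical message body, for illustration only.
sample_body = json.dumps({
    "client_id": "device-001",
    "username": "alice",
    "reason": "normal",
    "node": "emqx@127.0.0.1",
    "ts": 1700000000,
}).encode("utf-8")

event = parse_client_disconnected(sample_body)
```

A missing key raises `KeyError`, which makes malformed records visible early instead of silently producing partial events.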
Client subscribe events forwarded to RocketMQ
Client unsubscribe events forwarded to RocketMQ
topic = "SessionUnsubscribed"
value = {
"client_id": ${clientid},
"topic": ${topic},
"qos": ${qos},
"node": ${node},
"ts": ${timestamp}
}
MQTT message publish events forwarded to RocketMQ
topic = "MessagePublish"
value = {
"client_id": ${clientid},
"topic": ${topic},
"payload": ${payload},
"qos": ${qos},
"node": ${node},
"ts": ${timestamp}
}
MQTT message delivered (Deliver) events forwarded to RocketMQ
topic = "MessageDeliver"
value = {
"client_id": ${clientid},
"username": ${username},
"from": ${fromClientId},
"topic": ${topic},
"payload": ${payload},
"qos": ${qos},
"node": ${node},
"ts": ${timestamp}
}
MQTT message acknowledged (Ack) events forwarded to RocketMQ
Reading MQTT client online and offline event messages from RocketMQ:
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer ClientConnected
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer ClientDisconnected
Reading MQTT subscribe and unsubscribe event messages from RocketMQ:
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer SessionSubscribed
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer SessionUnsubscribed
Reading MQTT published messages from RocketMQ:
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer MessagePublish
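Reading MQTT message delivered (Deliver) and acknowledged (Ack) events from RocketMQ:
By default the payload is base64-encoded. The payload data format can be changed through the bridge.rocket.encode_payload_type configuration item.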
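Because the payload is base64-encoded by default, a consumer has to decode it before use. A minimal Python sketch follows, assuming the record is a JSON object with a `payload` key holding standard base64 text. The helper name `decode_payload` and the sample record are hypothetical.

```python
import base64
import json


def decode_payload(record_body):
    """Return the original MQTT payload bytes from a bridged record."""
    record = json.loads(record_body)
    # Assumes the default setting, where the payload is base64 text.
    return base64.b64decode(record["payload"])


# Hypothetical MessagePublish record, for illustration only.
sample_record = json.dumps({
    "client_id": "device-001",
    "topic": "sensors/room1/temp",
    "payload": base64.b64encode(b"hello").decode("ascii"),
    "qos": 1,
    "node": "emqx@127.0.0.1",
    "ts": 1700000000,
}).encode("utf-8")

payload = decode_payload(sample_record)
```

If bridge.rocket.encode_payload_type is set to a different format, the decoding step has to change accordingly.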