Multiple Redis server connection pools can be configured:

    Configuring Redis storage rules

    backend.redis.hook.session.created.1 = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}
    backend.redis.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}
    backend.redis.hook.session.subscribed.1 = {"topic": "queue/#", "action": {"function": "on_message_fetch_for_queue"}, "pool": "pool1"}
    backend.redis.hook.session.subscribed.2 = {"topic": "pubsub/#", "action": {"function": "on_message_fetch_for_pubsub"}, "pool": "pool1"}
    backend.redis.hook.session.subscribed.3 = {"action": {"function": "on_retain_lookup"}, "pool": "pool1"}
    backend.redis.hook.session.unsubscribed.1 = {"topic": "#", "action": {"commands": ["DEL mqtt:acked:${clientid}:${topic}"]}, "pool": "pool1"}
    backend.redis.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "expired_time": 3600, "pool": "pool1"}
    backend.redis.hook.message.publish.2 = {"topic": "#", "action": {"function": "on_message_retain"}, "expired_time": 3600, "pool": "pool1"}
    backend.redis.hook.message.publish.3 = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}
    backend.redis.hook.message.acked.1 = {"topic": "queue/#", "action": {"function": "on_message_acked_for_queue"}, "pool": "pool1"}
    backend.redis.hook.message.acked.2 = {"topic": "pubsub/#", "action": {"function": "on_message_acked_for_pubsub"}, "pool": "pool1"}
    ## backend.redis.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch_for_keep_latest"}, "pool": "pool1"}
    ## backend.redis.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_store_keep_latest"}, "expired_time": 3600, "pool": "pool1"}
    ## backend.redis.hook.message.acked.1 = {"topic": "#", "action": {"function": "on_message_acked_for_keep_latest"}, "pool": "pool1"}
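Each rule above has the shape `backend.redis.hook.<hook>.<index> = <JSON object>`. As a rough illustration of that shape only (this is not how the plugin itself reads its configuration), the following Python sketch splits a rule line into its hook name, index, and JSON settings. The function name `parse_rule` is hypothetical.

```python
import json

PREFIX = "backend.redis.hook."

def parse_rule(line):
    """Split one rule line into (hook, index, settings).

    Returns None for blank lines and for lines commented out with '#'.
    """
    line = line.strip()
    if not line or line.startswith("#"):
        return None
    # The key never contains '=', so the first '=' separates key and value.
    key, _, value = line.partition("=")
    key = key.strip()
    if not key.startswith(PREFIX):
        raise ValueError("not a Redis hook rule: " + key)
    # The last dotted component is the rule index, the rest is the hook name.
    hook, _, index = key[len(PREFIX):].rpartition(".")
    return hook, int(index), json.loads(value)

rule = parse_rule(
    'backend.redis.hook.message.publish.1 = {"topic": "#", '
    '"action": {"function": "on_message_publish"}, '
    '"expired_time": 3600, "pool": "pool1"}'
)
print(rule[0], rule[1], rule[2]["action"])
```

A check of this kind can help catch malformed JSON in a rule before the configuration is deployed.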

    Redis storage rule description

    Redis command parameter description

    hook                 | Available parameters                          | Example (fields must be separated by exactly one space)
    client.connected     | clientid                                      | SET conn:${clientid} ${clientid}
    client.disconnected  | clientid                                      | SET disconn:${clientid} ${clientid}
    session.subscribed   | clientid, topic, qos                          | HSET sub:${clientid} ${topic} ${qos}
    session.unsubscribed | clientid, topic                               | SET unsub:${clientid} ${topic}
    message.publish      | message, msgid, topic, payload, qos, clientid | RPUSH pub:${topic} ${msgid}
    message.acked        | msgid, topic, clientid                        | HSET ack:${clientid} ${topic} ${msgid}
    message.deliver      | msgid, topic, clientid                        | HSET deliver:${clientid} ${topic} ${msgid}
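To make the placeholder syntax concrete, here is a minimal Python sketch of how a command template such as those in the examples could be expanded. It assumes that `${name}` is replaced by the hook parameter of the same name and that fields are split on single spaces, as the examples state. The helper `render_command` is hypothetical and is not the plugin's actual implementation.

```python
import re

PLACEHOLDER = re.compile(r"\$\{(\w+)\}")

def render_command(template, params):
    """Expand ${name} placeholders and return the Redis command as a list of arguments."""
    def substitute(match):
        name = match.group(1)
        if name not in params:
            raise KeyError("hook does not provide parameter: " + name)
        return str(params[name])
    # Split on single spaces first, so a substituted value that itself
    # contains spaces still ends up as exactly one argument.
    return [PLACEHOLDER.sub(substitute, field) for field in template.split(" ")]

args = render_command(
    "HSET sub:${clientid} ${topic} ${qos}",
    {"clientid": "test", "topic": "topic1", "qos": 1},
)
print(args)  # ['HSET', 'sub:test', 'topic1', '1']
```

Referencing a parameter that the hook does not provide (for example `${qos}` in a `session.unsubscribed` template) raises a `KeyError` in this sketch.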

    The Redis backend lets users configure an Action with Redis commands, for example:

    ## After a client connects to the EMQ X server, execute one Redis command

    Redis device online status Hash

    The mqtt:client Hash stores the device online status:

    hmset
    key = mqtt:client:${clientid}
    value = {state:int, online_at:timestamp, offline_at:timestamp}
    hset
    key = mqtt:node:${node}
    field = ${clientid}
    value = ${ts}

    Querying the device online status:

    For example, when the client with ClientId test comes online:

    HGETALL mqtt:client:test
    1) "state"
    2) "1"
    3) "online_at"
    4) "1481685802"
    5) "offline_at"
    6) "undefined"
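HGETALL returns a flat list that alternates field names and values. The Python sketch below pairs them up into a dictionary and reads the online state from it. It assumes that state "1" means online, which is inferred from the example above rather than stated by the source. The function names are hypothetical.

```python
def pairs_to_dict(reply):
    """Convert a flat [field, value, field, value, ...] reply into a dict."""
    if len(reply) % 2 != 0:
        raise ValueError("expected an even number of items in the reply")
    return dict(zip(reply[0::2], reply[1::2]))

def is_online(status):
    # Assumption: state "1" marks an online client, as in the example reply.
    return status.get("state") == "1"

# The reply shown above for the client with ClientId test.
reply = ["state", "1", "online_at", "1481685802", "offline_at", "undefined"]
status = pairs_to_dict(reply)
print(status["online_at"], is_online(status))
```

Client libraries such as redis-py typically perform this pairing already and return a mapping from `hgetall`, so the first helper is only needed when working with raw replies.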

    For example, when the client with ClientId test goes offline:

    Redis retained message Hash

    hmset
    key = mqtt:retain:${topic}
    value = {id: string, from: string, qos: int, topic: string, retain: int, payload: string, ts: timestamp}

    Querying a retained message:

    HGETALL "mqtt:retain:${topic}"

    For example, to view the retained message for the topic named topic:

    HGETALL mqtt:retain:topic
     1) "id"
     2) "6P9NLcJ65VXBbC22sYb4"
     3) "from"
     4) "test"
     5) "qos"
     6) "1"
     7) "topic"
     8) "topic"
     9) "retain"
    10) "true"
    11) "payload"
    12) "Hello world!"
    13) "ts"
    Redis message storage Hash

    The mqtt:msg Hash stores MQTT messages:

    hmset
    key = mqtt:msg:${msgid}
    value = {id: string, from: string, qos: int, topic: string, retain: int, payload: string, ts: timestamp}
    zadd
    key = mqtt:msg:${topic}
    field = 1
    value = ${msgid}
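The layout above amounts to two Redis commands per message: an HMSET for the message body and a ZADD that indexes the message id under its topic. The Python sketch below builds both argument lists. It reads the `field = 1` entry of the zadd description as the sorted-set score, which is an interpretation and not something the source states. The function name and the sample values (including the timestamp) are illustrative only.

```python
MESSAGE_FIELDS = ("id", "from", "qos", "topic", "retain", "payload", "ts")

def message_store_commands(msg, score=1):
    """Build the HMSET and ZADD argument lists for one MQTT message."""
    missing = [f for f in MESSAGE_FIELDS if f not in msg]
    if missing:
        raise ValueError("message is missing fields: " + ", ".join(missing))
    # Message body: one hash per message id.
    hmset = ["HMSET", "mqtt:msg:" + str(msg["id"])]
    for field in MESSAGE_FIELDS:
        hmset += [field, str(msg[field])]
    # Topic index: the message id is a member of a sorted set keyed by topic.
    # Assumption: "field = 1" in the layout is the score.
    zadd = ["ZADD", "mqtt:msg:" + str(msg["topic"]), str(score), str(msg["id"])]
    return [hmset, zadd]

# Illustrative message; the timestamp is a made-up value.
msg = {
    "id": "6P9NLcJ65VXBbC22sYb4", "from": "test", "qos": 1, "topic": "topic",
    "retain": 0, "payload": "Hello world!", "ts": 1481690000,
}
commands = message_store_commands(msg)
for command in commands:
    print(" ".join(command))
```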

    The mqtt:acked SET stores client message acknowledgements:

    set
    key = mqtt:acked:${clientid}:${topic}
    value = ${msgid}

    Redis subscription storage Hash

    The mqtt:sub Hash stores subscriptions:

    Subscribing a client to a topic:

    HSET mqtt:sub:${clientid} ${topic} ${qos}

    HSET "mqtt:sub:test" "topic1" 1
    HSET "mqtt:sub:test" "topic2" 2

    Querying the topics that the client with ClientId test has subscribed to:

    HGETALL mqtt:sub:test
    1) "topic1"
    2) "1"
    3) "topic2"
    4) "2"

    Redis SUB/UNSUB event publishing

    When a device needs to subscribe to or unsubscribe from a topic, the business server publishes an event message to Redis:

    PUBLISH
    channel = "mqtt_channel"
    message = {type: string, topic: string, clientid: string, qos: int}
    *type: [subscribe/unsubscribe]

    For example, to subscribe the client with ClientId test to the topic topic0:

    PUBLISH "mqtt_channel" "{\"type\": \"subscribe\", \"topic\": \"topic0\", \"clientid\": \"test\", \"qos\": \"0\"}"
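Escaping the JSON payload by hand is error-prone, so a business server would more likely build it with a JSON library. The Python sketch below produces the message body for the PUBLISH command. The function name `build_event` is hypothetical. Note that the message layout lists qos as an int while the example passes it as the string "0"; this sketch follows the example, which is an assumption about what the plugin accepts.

```python
import json

CHANNEL = "mqtt_channel"
EVENT_TYPES = ("subscribe", "unsubscribe")

def build_event(event_type, topic, clientid, qos=0):
    """Return the JSON message body for a SUB/UNSUB event."""
    if event_type not in EVENT_TYPES:
        raise ValueError("type must be subscribe or unsubscribe")
    return json.dumps({
        "type": event_type,
        "topic": topic,
        "clientid": clientid,
        # Sent as a string to match the example command above (assumption).
        "qos": str(qos),
    })

payload = build_event("subscribe", "topic0", "test", 0)
print(CHANNEL, payload)
```

With a Redis client library the payload would then be published to `mqtt_channel`, for example through the `publish(channel, message)` method in redis-py.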

    For example, to unsubscribe the client with ClientId test from a topic:

    Tip

    The Redis PUB/SUB feature cannot be used with Redis Cluster.

    Enabling the Redis data storage plugin