基本语法举例

    • 从 topic 为 “t/a” 的消息中提取所有字段:
    • 从 topic 为 “t/a” 或 “t/b” 的消息中提取所有字段:
    • 从 topic 能够匹配到 ‘t/#’ 的消息中提取所有字段。
    1. SELECT * FROM "t/#"
    • 从 topic 能够匹配到 ‘t/#’ 的消息中提取 qos, username 和 clientid 字段:
    1. SELECT qos, username, clientid FROM "t/#"
    • 从任意 topic 的消息中提取 username 字段,并且筛选条件为 username = ‘Steven’:
    1. SELECT username FROM "#" WHERE username='Steven'
    • 从任意 topic 的 JSON 消息体(payload) 中提取 x 字段,并创建别名 x 以便在 WHERE 子句中使用。WHERE 子句限定条件为 x = 1。下面这个 SQL 语句可以匹配到消息体 {“x”: 1}, 但不能匹配到消息体 {“x”: 2}:
    1. SELECT payload as p FROM "#" WHERE p.x = 1
    • 类似于上面的 SQL 语句,但嵌套地提取消息体中的数据,下面的 SQL 语句可以匹配到 JSON 消息体 {“x”: {“y”: 1}}:
    1. SELECT payload as a FROM "#" WHERE a.x.y = 1
    • 在 clientid = ‘c1’ 尝试连接时,提取其来源 IP 地址和端口号:
    1. SELECT peername as ip_port FROM "$events/client_connected" WHERE clientid = 'c1'
    • 筛选所有订阅 ‘t/#’ 主题且订阅级别为 QoS1 的 clientid:
    1. SELECT clientid FROM "$events/session_subscribed" WHERE topic = 't/#' and qos = 1
    • 筛选所有订阅主题能匹配到 ‘t/#’ 且订阅级别为 QoS1 的 clientid。注意与上例不同的是,这里用的是主题匹配操作符 ‘=~’,所以会匹配订阅 ‘t’ 或 ‘t/+/a’ 的订阅事件:
    1. SELECT clientid FROM "$events/session_subscribed" WHERE topic =~ 't/#' and qos = 1
    • FROM 子句后面的主题需要用双引号 "" 引起来。
    • WHERE 子句后面接筛选条件,如果使用到字符串需要用单引号 '' 引起来。
    • FROM 子句里如有多个主题,需要用逗号 "," 分隔。例如 SELECT * FROM “t/1”, “t/2” 。
    • 可以使用使用 "." 符号对 payload 进行嵌套选择。

    遍历语法(FOREACH-DO-INCASE) 举例

    1. {
    2. "date": "2020-04-24",
    3. "sensors": [
    4. {"name": "a", "idx":0},
    5. {"name": "b", "idx":1},
    6. {"name": "c", "idx":2}
    7. ]
    8. }

    示例1: 要求将 sensors 里的各个对象,分别作为数据输入重新发布消息到 sensors/${idx} 主题,内容为 ${name}。即最终规则引擎将会发出 3 条消息:

    1) 主题:sensors/0 内容:a 2) 主题:sensors/1 内容:b 3) 主题:sensors/2 内容:c

    要完成这个规则,我们需要配置如下动作:

    • 动作类型:消息重新发布 (republish)
    • 目的主题:sensors/${idx}
    • 目的 QoS:0
    • 消息内容模板:${name}

    以及如下 SQL 语句:

    示例解析:

    这个 SQL 中,FOREACH 子句指定需要进行遍历的数组 sensors,则选取结果为:

    1. [
    2. {
    3. "name": "a",
    4. "idx": 0
    5. },
    6. {
    7. "name": "b",
    8. "idx": 1
    9. },
    10. {
    11. "name": "c",
    12. "idx": 2
    13. }

    FOREACH 语句将会对于结果数组里的每个对象分别执行 “消息重新发布” 动作,所以将会执行重新发布动作 3 次。

    示例2: 要求将 sensors 里的 idx 值大于或等于 1 的对象,分别作为数据输入重新发布消息到 sensors/${idx} 主题,内容为 clientid=${clientid},name=${name},date=${date}。即最终规则引擎将会发出 2 条消息:

    1) 主题:sensors/1 内容:clientid=c_steve,name=b,date=2020-04-24 2) 主题:sensors/2 内容:clientid=c_steve,name=c,date=2020-04-24

    要完成这个规则,我们需要配置如下动作:

    • 动作类型:消息重新发布 (republish)
    • 目的主题:sensors/${idx}
    • 目的 QoS:0
    • 消息内容模板:clientid=${clientid},name=${name},date=${date}
    1. FOREACH
    2. payload.sensors
    3. DO
    4. clientid,
    5. item.name as name,
    6. item.idx as idx
    7. INCASE
    8. item.idx >= 1
    9. FROM "t/#"

    示例解析:

    这个 SQL 中,FOREACH 子句指定需要进行遍历的数组 ; DO 子句选取每次操作需要的字段,这里我们选了外层的 clientid 字段,以及当前 sensor 对象的 nameidx 两个字段,注意 item 代表 sensors 数组中本次循环的对象。INCASE 子句是针对 DO 语句中字段的筛选条件,仅仅当 idx >= 1 满足条件。所以 SQL 的选取结果为:

    1. [
    2. {
    3. "name": "b",
    4. "idx": 1,
    5. "clientid": "c_emqx"
    6. },
    7. {
    8. "name": "c",
    9. "idx": 2,
    10. "clientid": "c_emqx"
    11. }
    12. ]

    FOREACH 语句将会对于结果数组里的每个对象分别执行 “消息重新发布” 动作,所以将会执行重新发布动作 2 次。

    在 DO 和 INCASE 语句里,可以使用 item 访问当前循环的对象,也可以通过在 FOREACH 使用 as 语法自定义一个变量名。所以本例中的 SQL 语句又可以写为:

    1. FOREACH
    2. payload.sensors as s
    3. DO
    4. clientid,
    5. s.name as name,
    6. s.idx as idx
    7. INCASE
    8. s.idx >= 1
    9. FROM "t/#"

    示例3: 在示例2 的基础上,去掉 clientid 字段 c_steve 中的 c_ 前缀

    在 FOREACH 和 DO 语句中可以调用各类 SQL 函数,若要将 c_steve 变为 steve,则可以把例2 中的 SQL 改为:

    1. FOREACH
    2. payload.sensors as s
    3. DO
    4. nth(2, tokens(clientid,'_')) as clientid,
    5. s.name as name,
    6. s.idx as idx
    7. INCASE
    8. s.idx >= 1
    9. FROM "t/#"

    另外,FOREACH 子句中也可以放多个表达式,只要最后一个表达式是指定要遍历的数组即可。比如我们将消息体改一下,sensors 外面多套一层 Object:

    1. {
    2. "date": "2020-04-24",
    3. "data": {
    4. "sensors": [
    5. {"name": "a", "idx":0},
    6. {"name": "b", "idx":1},
    7. {"name": "c", "idx":2}
    8. ]
    9. }

    则 FOREACH 中可以在决定要遍历的数组之前把 data 选取出来:

    1. payload.data as data
    2. data.sensors as s
    3. ...

    CASE-WHEN 语法示例

    示例1: 将消息中 x 字段的值范围限定在 0~7 之间。

    1. SELECT
    2. CASE WHEN payload.x < 0 THEN 0
    3. WHEN payload.x > 7 THEN 7
    4. ELSE payload.x
    5. END as x
    6. FROM "t/#"

    假设消息为:

    1. {"x": 8}
    1. {"x": 7}

    数组操作语法举例

    示例1: 创建一个数组,赋值给变量 a:

    下标从 1 开始,上面的 SQL 输出为:

    1. {
    2. "a": [1, 2, 3]
    3. }

    示例2: 从数组中取出第 N 个元素。下标为负数时,表示从数组的右边取:

    1. SELECT
    2. [1,2,3] as a,
    3. a[2] as b,
    4. a[-2] as c
    5. FROM
    6. "t/#"

    上面的 SQL 输出为:

    1. {
    2. "b": 2,
    3. "c": 2,
    4. "a": [1, 2, 3]
    5. }

    示例3: 从 JSON 格式的 payload 中嵌套的获取值:

    1. SELECT
    2. payload.data[1].id as id
    3. FROM
    4. "t/#"

    假设消息为:

    1. {"data": [
    2. {"id": 1, "name": "steve"},
    3. {"id": 2, "name": "bill"}
    4. ]}

    则上面的 SQL 输出为:

    1. {"id": 1}

    示例4: 数组范围(range)操作:

    1. SELECT
    2. [1..5] as a,
    3. a[2..4] as b
    4. FROM
    5. "t/#"

    上面的 SQL 输出为:

    1. {
    2. "b": [2, 3, 4],
    3. "a": [1, 2, 3, 4, 5]
    4. }

    示例5: 使用下标语法修改数组中的某个元素:

    1. SELECT
    2. payload,
    3. 'STEVE' as payload.data[1].name
    4. FROM
    5. "t/#"
    1. {"data": [
    2. {"id": 1, "name": "steve"},

    则上面的 SQL 输出为: