扩展插件(Plugins)

    emqtt项目组开发维护的插件包括:

    emqttd插件实际是一个Erlang应用,带自身的配置文件'etc/plugin.config',置于'emqttd/plugins'目录下。

    plugins/emqttd_plugin_template是一个模版插件,典型目录结构:

    管理命令行’./bin/emqttd_ctl’加载卸载插件。

    加载插件:

    1. ./bin/emqttd_ctl plugins load <Plugin>

    卸载插件:

    1. ./bin/emqttd_ctl plugins unload <Plugin>

    查询插件:

    1. ./bin/emqttd_ctl plugins list

    emqttd_dashboard: Dashboard插件

    emqttd消息服务器的Web管理控制台。插件项目地址: https://github.com/emqtt/emqttd_dashboard

    emqttd消息服务器默认加载Dashboard插件。URL地址: http://localhost:18083 ,缺省用户名/密码: admin/public。

    Dashboard插件可查询emqttd基本信息、统计数据、度量数据,查询系统客户端(Client)、会话(Session)、主题(Topic)、订阅(Subscription)。

    Dashboard插件设置

    plugins/emqttd_dashboard/etc/plugin.config:

    1. [
    2. {emqttd_dashboard, [
    3. {listener,
    4. {emqttd_dashboard, 18083, [
    5. {acceptors, 4},
    6. {max_clients, 512}]}
    7. }
    8. ]}
    9. ].

    emqttd_auth_http: HTTP认证/访问控制插件

    HTTP认证/访问控制插件: https://github.com/emqtt/emqttd_auth_http

    Note

    1.1版本支持

    配置插件

    plugins/emqttd_auth_http/etc/plugin.config:

    1. [
    2. {emqttd_auth_http, [
    3. %% Variables: %u = username, %c = clientid, %a = ipaddress, %t = topic, %P = password, %A = access
    4. {super_req, [
    5. {method, post},
    6. {url, "http://localhost:8080/mqtt/superuser"},
    7. {params, [
    8. {username, "%u"},
    9. {clientid, "%c"}
    10. ]}
    11. ]},
    12. {auth_req, [
    13. {method, post},
    14. {url, "http://localhost:8080/mqtt/auth"},
    15. {params, [
    16. {clientid, "%c"},
    17. {username, "%u"},
    18. {password, "%P"}
    19. ]}
    20. ]},
    21. %% 'access' parameter: sub = 1, pub = 2
    22. {acl_req, [
    23. {method, post},
    24. {url, "http://localhost:8080/mqtt/acl"},
    25. {params, [
    26. {access, "%A"},
    27. {username, "%u"},
    28. {clientid, "%c"},
    29. {ipaddr, "%a"},
    30. {topic, "%t"}
    31. ]}
    32. ]}
    33. ]}
    34. ].

    HTTP API

    认证/ACL成功,API返回200

    认证/ACL失败,API返回4xx

    加载插件

    ./bin/emqttd_ctl plugins load emqttd_auth_http

    emqttd_plugin_mysql: MySQL认证/访问控制插件

    MySQL认证/访问控制插件,基于MySQL库表认证鉴权: https://github.com/emqtt/emqttd_plugin_mysql

    MQTT用户表

    1. CREATE TABLE `mqtt_user` (
    2. `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
    3. `username` varchar(100) DEFAULT NULL,
    4. `password` varchar(100) DEFAULT NULL,
    5. `salt` varchar(20) DEFAULT NULL,
    6. `is_superuser` tinyint(1) DEFAULT 0,
    7. `created` datetime DEFAULT NULL,
    8. PRIMARY KEY (`id`),
    9. UNIQUE KEY `mqtt_username` (`username`)
    10. ) ENGINE=MyISAM DEFAULT CHARSET=utf8;

    Note

    MQTT访问控制表

    1. CREATE TABLE `mqtt_acl` (
    2. `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
    3. `allow` int(1) DEFAULT NULL COMMENT '0: deny, 1: allow',
    4. `ipaddr` varchar(60) DEFAULT NULL COMMENT 'IpAddress',
    5. `username` varchar(100) DEFAULT NULL COMMENT 'Username',
    6. `clientid` varchar(100) DEFAULT NULL COMMENT 'ClientId',
    7. `access` int(2) NOT NULL COMMENT '1: subscribe, 2: publish, 3: pubsub',
    8. `topic` varchar(100) NOT NULL DEFAULT '' COMMENT 'Topic Filter',
    9. PRIMARY KEY (`id`)
    10. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    11. INSERT INTO `mqtt_acl` (`id`, `allow`, `ipaddr`, `username`, `clientid`, `access`, `topic`)
    12. VALUES
    13. (1,1,NULL,'$all',NULL,2,'#'),
    14. (2,0,NULL,'$all',NULL,1,'$SYS/#'),
    15. (3,0,NULL,'$all',NULL,1,'eq #'),
    16. (5,1,'127.0.0.1',NULL,NULL,2,'$SYS/#'),
    17. (6,1,'127.0.0.1',NULL,NULL,2,'#'),
    18. (7,1,NULL,'dashboard',NULL,1,'$SYS/#');

    配置插件

    plugins/emqttd_plugin_mysql/etc/plugin.config:

    1. [
    2. {emqttd_plugin_mysql, [
    3. {mysql_pool, [
    4. %% ecpool options
    5. {pool_size, 8},
    6. {auto_reconnect, 3},
    7. %% mysql options
    8. {host, "localhost"},
    9. {port, 3306},
    10. {user, ""},
    11. {password, ""},
    12. {database, "mqtt"},
    13. {encoding, utf8}
    14. ]},
    15. %% Variables: %u = username, %c = clientid, %a = ipaddress
    16. %% Superuser Query
    17. {superquery, "select is_superuser from mqtt_user where username = '%u' limit 1"},
    18. {authquery, "select password from mqtt_user where username = '%u' limit 1"},
    19. %% hash algorithm: plain, md5, sha, sha256, pbkdf2?
    20. {password_hash, sha256},
    21. %% select password with salt
    22. %% {authquery, "select password, salt from mqtt_user where username = '%u'"},
    23. %% sha256 with salt prefix
    24. %% {password_hash, {salt, sha256}},
    25. %% sha256 with salt suffix
    26. %% {password_hash, {sha256, salt}},
    27. %% '%a' = ipaddress, '%u' = username, '%c' = clientid
    28. %% Comment this query, the acl will be disabled
    29. {aclquery, "select allow, ipaddr, username, clientid, access, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'"},
    30. %% If no ACL rules matched, return...
    31. {acl_nomatch, allow}
    32. ]}
    33. ].

    加载插件

    ./bin/emqttd_ctl plugins load emqttd_plugin_mysql

    emqttd_plugin_pgsql: PostgreSQL认证/访问控制插件

    PostgreSQL认证/访问控制插件,基于PostgreSQL库表认证鉴权: https://github.com/emqtt/emqttd_plugin_pgsql

    MQTT用户表

    1. CREATE TABLE mqtt_user (
    2. id SERIAL primary key,
    3. is_superuser boolean,
    4. username character varying(100),
    5. password character varying(100),
    6. salt character varying(40)
    7. );

    MQTT访问控制表

    1. CREATE TABLE mqtt_acl (
    2. id SERIAL primary key,
    3. allow integer,
    4. ipaddr character varying(60),
    5. username character varying(100),
    6. clientid character varying(100),
    7. access integer,
    8. topic character varying(100)
    9. );
    10. INSERT INTO mqtt_acl (id, allow, ipaddr, username, clientid, access, topic)
    11. VALUES
    12. (1,1,NULL,'$all',NULL,2,'#'),
    13. (2,0,NULL,'$all',NULL,1,'$SYS/#'),
    14. (3,0,NULL,'$all',NULL,1,'eq #'),
    15. (5,1,'127.0.0.1',NULL,NULL,2,'$SYS/#'),
    16. (6,1,'127.0.0.1',NULL,NULL,2,'#'),
    17. (7,1,NULL,'dashboard',NULL,1,'$SYS/#');

    配置插件

    plugins/emqttd_plugin_pgsql/etc/plugin.config:

    1. [
    2. {emqttd_plugin_pgsql, [
    3. {pgsql_pool, [
    4. %% ecpool options
    5. {pool_size, 8},
    6. {auto_reconnect, 3},
    7. %% pgsql options
    8. {host, "localhost"},
    9. {port, 5432},
    10. {ssl, false},
    11. {username, "feng"},
    12. {password, ""},
    13. {database, "mqtt"},
    14. {encoding, utf8}
    15. ]},
    16. %% Variables: %u = username, %c = clientid, %a = ipaddress
    17. %% Superuser Query
    18. {superquery, "select is_superuser from mqtt_user where username = '%u' limit 1"},
    19. %% Authentication Query: select password only
    20. {authquery, "select password from mqtt_user where username = '%u' limit 1"},
    21. %% hash algorithm: plain, md5, sha, sha256, pbkdf2?
    22. {password_hash, sha256},
    23. %% select password with salt
    24. %% {authquery, "select password, salt from mqtt_user where username = '%u'"},
    25. %% sha256 with salt prefix
    26. %% {password_hash, {salt, sha256}},
    27. %% sha256 with salt suffix
    28. %% {password_hash, {sha256, salt}},
    29. %% Comment this query, the acl will be disabled. Notice: don't edit this query!
    30. {aclquery, "select allow, ipaddr, username, clientid, access, topic from mqtt_acl
    31. where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'"},
    32. %% If no rules matched, return...
    33. {acl_nomatch, allow}
    34. ]}
    35. ].

    加载插件

    1. ./bin/emqttd_ctl plugins load emqttd_plugin_pgsql

    emqttd_plugin_redis: Redis认证/访问控制插件

    基于Redis认证/访问控制: https://github.com/emqtt/emqttd_plugin_redis

    配置插件

    plugins/emqttd_plugin_redis/etc/plugin.config:

    1. [
    2. {emqttd_plugin_redis, [
    3. {eredis_pool, [
    4. %% ecpool options
    5. {pool_size, 8},
    6. {auto_reconnect, 2},
    7. %% eredis options
    8. {host, "127.0.0.1"},
    9. {port, 6379},
    10. {database, 0},
    11. {password, ""}
    12. ]},
    13. %% Variables: %u = username, %c = clientid
    14. %% HGET mqtt_user:%u is_superuser
    15. {supercmd, ["HGET", "mqtt_user:%u", "is_superuser"]},
    16. %% HGET mqtt_user:%u password
    17. {authcmd, ["HGET", "mqtt_user:%u", "password"]},
    18. %% Password hash algorithm: plain, md5, sha, sha256, pbkdf2?
    19. {password_hash, sha256},
    20. %% SMEMBERS mqtt_acl:%u
    21. {aclcmd, ["SMEMBERS", "mqtt_acl:%u"]},
    22. %% If no rules matched, return...
    23. {acl_nomatch, deny},
    24. %% Load Subscriptions from Redis when client connected.
    25. {subcmd, ["HGETALL", "mqtt_subs:%u"]}
    26. ]}
    27. ].

    用户Hash

    默认基于用户Hash认证:

    1. HSET mqtt_user:<username> is_superuser 1
    2. HSET mqtt_user:<username> password "passwd"

    ACL规则SET

    默认采用SET存储ACL规则:

    1. SADD mqtt_acl:<username> "publish topic1"
    2. SADD mqtt_acl:<username> "subscribe topic2"
    3. SADD mqtt_acl:<username> "pubsub topic3"

    订阅Hash

    插件还支持Redis中创建MQTT订阅。当MQTT客户端连接成功,会自动从Redis加载订阅:

    1. HSET mqtt_subs:<username> topic1 0
    2. HSET mqtt_subs:<username> topic2 1
    3. HSET mqtt_subs:<username> topic3 2

    加载插件

    1. ./bin/emqttd_ctl plugins load emqttd_plugin_redis

    emqttd_plugin_mongo: MongoDB认证/访问控制插件

    基于MongoDB认证/访问控制: https://github.com/emqtt/emqttd_plugin_mongo

    配置插件

    plugins/emqttd_plugin_mongo/etc/plugin.config:

    1. [
    2. {emqttd_plugin_mongo, [
    3. {mongo_pool, [
    4. {pool_size, 8},
    5. {auto_reconnect, 3},
    6. %% Mongodb Driver Opts
    7. {host, "localhost"},
    8. {port, 27017},
    9. %% {login, ""},
    10. %% {password, ""},
    11. {database, "mqtt"}
    12. ]},
    13. %% Variables: %u = username, %c = clientid
    14. %% Superuser Query
    15. {superquery, [
    16. {collection, "mqtt_user"},
    17. {super_field, "is_superuser"},
    18. {selector, {"username", "%u"}}
    19. ]},
    20. %% Authentication Query
    21. {authquery, [
    22. {collection, "mqtt_user"},
    23. {password_field, "password"},
    24. %% Hash Algorithm: plain, md5, sha, sha256, pbkdf2?
    25. {password_hash, sha256},
    26. {selector, {"username", "%u"}}
    27. ]},
    28. %% ACL Query: "%u" = username, "%c" = clientid
    29. {aclquery, [
    30. {collection, "mqtt_acl"},
    31. {selector, {"username", "%u"}}
    32. ]},
    33. %% If no ACL rules matched, return...
    34. {acl_nomatch, deny}
    35. ]}
    36. ].

    MongoDB数据库

    1. use mqtt
    2. db.createCollection("mqtt_user")
    3. db.createCollection("mqtt_acl")
    4. db.mqtt_user.ensureIndex({"username":1})

    Note

    数据库、集合名称可自定义

    用户集合(User Collection)

    1. {
    2. username: "user",
    3. password: "password hash",
    4. is_superuser: boolean (true, false),
    5. created: "datetime"
    6. }

    示例:

    1. db.mqtt_user.insert({username: "test", password: "password hash", is_superuser: false})
    2. db.mqtt_user.insert({username: "root", is_superuser: true})

    ACL集合(ACL Collection)

    1. {
    2. username: "username",
    3. clientid: "clientid",
    4. publish: ["topic1", "topic2", ...],
    5. subscribe: ["subtop1", "subtop2", ...],
    6. pubsub: ["topic/#", "topic1", ...]
    7. }

    示例:

    1. db.mqtt_acl.insert({username: "test", publish: ["t/1", "t/2"], subscribe: ["user/%u", "client/%c"]})
    2. db.mqtt_acl.insert({username: "admin", pubsub: ["#"]})

    加载插件

    1. ./bin/emqttd_ctl plugins load emqttd_plugin_mongo

    emqttd_stomp: Stomp协议插件

    Stomp协议插件。支持STOMP 1.0/1.1/1.2协议客户端连接emqttd,发布订阅MQTT消息。

    配置插件

    Note

    Stomp协议端口: 61613

    plugins/emqttd_stomp/etc/plugin.config:

    1. [
    2. {emqttd_stomp, [
    3. {default_user, [
    4. {login, "guest"},
    5. {passcode, "guest"}
    6. ]},
    7. {allow_anonymous, true},
    8. %%TODO: unused...
    9. {frame, [
    10. {max_headers, 10},
    11. {max_header_length, 1024},
    12. {max_body_length, 8192}
    13. ]},
    14. {listeners, [
    15. {emqttd_stomp, 61613, [
    16. {acceptors, 4},
    17. {max_clients, 512}
    18. ]}
    19. ]}
    20. ]}
    21. ].

    加载插件

    1. ./bin/emqttd_ctl plugins load emqttd_stomp

    emqttd_sockjs: Stomp/SockJS插件

    配置插件

    Note

    缺省端口: 61616

    加载插件

    Note

    需先加载emqttd_stomp插件

    1. ./bin/emqttd_ctl plugins load emqttd_stomp
    2. ./bin/emqttd_ctl plugins load emqttd_sockjs

    插件演示页面

    emqttd_recon: Recon性能调试插件

    emqttd_recon插件集成recon性能调测库,’./bin/emqttd_ctl’命令行注册recon命令。

    加载插件

    1. ./bin/emqttd_ctl plugins load emqttd_recon

    recon命令

    1. ./bin/emqttd_ctl recon
    2. recon memory #recon_alloc:memory/2
    3. recon allocated #recon_alloc:memory(allocated_types, current|max)
    4. recon bin_leak #recon:bin_leak(100)
    5. recon node_stats #recon:node_stats(10, 1000)
    6. recon remote_load Mod #recon:remote_load(Mod)

    emqttd_reloader: 代码热加载插件

    用于开发调试的代码热升级插件。加载该插件后,emqttd会自动热升级更新代码。

    Note

    加载插件

    1. ./bin/emqttd_ctl plugins load emqttd_reloader

    reload命令

    1. ./bin/emqttd_ctl reload
    2. reload <Module> # Reload a Module

    emqttd插件开发

    创建插件项目

    github下载emqttd源码库,plugins/目录下创建插件应用。

    模版代码请参考: emqttd_plugin_template

    注册认证/访问控制模块

    认证演示模块 - emqttd_auth_demo.erl

    1. -module(emqttd_auth_demo).
    2. -behaviour(emqttd_auth_mod).
    3. -include("../../../include/emqttd.hrl").
    4. -export([init/1, check/3, description/0]).
    5. init(Opts) -> {ok, Opts}.
    6. check(#mqtt_client{client_id = ClientId, username = Username}, Password, _Opts) ->
    7. io:format("Auth Demo: clientId=~p, username=~p, password=~p~n",
    8. [ClientId, Username, Password]),
    9. ok.
    10. description() -> "Demo Auth Module".

    访问控制演示模块 - emqttd_acl_demo.erl

    1. -module(emqttd_acl_demo).
    2. -include("../../../include/emqttd.hrl").
    3. %% ACL callbacks
    4. -export([init/1, check_acl/2, reload_acl/1, description/0]).
    5. init(Opts) ->
    6. {ok, Opts}.
    7. check_acl({Client, PubSub, Topic}, _Opts) ->
    8. io:format("ACL Demo: ~p ~p ~p~n", [Client, PubSub, Topic]),
    9. allow.
    10. reload_acl(_Opts) ->
    11. ok.
    12. description() -> "ACL Module Demo".

    注册认证、访问控制模块 - emqttd_plugin_template_app.erl

    1. ok = emqttd_access_control:register_mod(auth, emqttd_auth_demo, []),
    2. ok = emqttd_access_control:register_mod(acl, emqttd_acl_demo, []),

    注册扩展钩子(Hooks)

    通过钩子(Hook)处理客户端上下线、主题订阅、消息收发。

    emqttd_plugin_template.erl:

    1. %% Called when the plugin application start
    2. load(Env) ->
    3. emqttd:hook('client.connected', fun ?MODULE:on_client_connected/3, [Env]),
    4. emqttd:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]),
    5. emqttd:hook('client.subscribe', fun ?MODULE:on_client_subscribe/3, [Env]),
    6. emqttd:hook('client.subscribe.after', fun ?MODULE:on_client_subscribe_after/3, [Env]),
    7. emqttd:hook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/3, [Env]),
    8. emqttd:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]),
    9. emqttd:hook('message.delivered', fun ?MODULE:on_message_delivered/3, [Env]),
    10. emqttd:hook('message.acked', fun ?MODULE:on_message_acked/3, [Env]).

    扩展钩子(Hook):

    钩子

    说明

    client.connected

    客户端上线

    client.subscribe

    客户端订阅主题前

    client.subscribe.after

    客户端订阅主题后

    client.unsubscribe

    客户端取消订阅主题

    message.publish

    MQTT消息发布

    message.delivered

    MQTT消息送达

    message.acked

    MQTT消息回执

    client.disconnected

    客户端连接断开

    注册扩展命令行

    扩展命令行演示模块 - emqttd_cli_demo.erl

    1. -module(emqttd_cli_demo).
    2. -include("../../../include/emqttd_cli.hrl").
    3. -export([cmd/1]).
    4. cmd(["arg1", "arg2"]) ->
    5. ?PRINT_MSG("ok");
    6. cmd(_) ->
    7. ?USAGE([{"cmd arg1 arg2", "cmd demo"}]).

      插件加载后,’./bin/emqttd_ctl’新增命令行: