MySQL 数据存储

EMQX 客服发表于:2022年05月18日 16:43:53更新于:2022年06月02日 11:02:19

MySQL 数据存储

提示:EMQX 3.1 版本后推出强大的规则引擎用于替换插件,建议您前往使用保存数据到 MySQL规则引擎中创建 保存数据到 MySQL

配置文件: emqx_backend_mysql.conf

配置 MySQL 服务器

支持配置多台 MySQL 服务器连接池:

## Mysql 服务器地址
backend.mysql.pool1.server = 127.0.0.1:3306

## Mysql 连接池大小
backend.mysql.pool1.pool_size = 8

## Mysql 用户名
backend.mysql.pool1.user = root

## Mysql 密码
backend.mysql.pool1.password = public

## Mysql 数据库名称
backend.mysql.pool1.database = mqtt

配置 MySQL 存储规则

backend.mysql.hook.client.connected.1    = {"action": {"function": "on_client_connected"}, "pool": "pool1"}
backend.mysql.hook.client.connected.2     = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}
backend.mysql.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}
backend.mysql.hook.session.subscribed.1  = {"topic": "#", "action": {"function": "on_message_fetch"}, "pool": "pool1"}
backend.mysql.hook.session.subscribed.2  = {"topic": "#", "action": {"function": "on_retain_lookup"}, "pool": "pool1"}
backend.mysql.hook.session.unsubscribed.1= {"topic": "#", "action": {"sql": ["delete from mqtt_acked where clientid = ${clientid} and topic = ${topic}"]}, "pool": "pool1"}
backend.mysql.hook.message.publish.1     = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
backend.mysql.hook.message.publish.2     = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}
backend.mysql.hook.message.publish.3     = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}
backend.mysql.hook.message.acked.1       = {"topic": "#", "action": {"function": "on_message_acked"}, "pool": "pool1"}

## 获取离线消息
### "offline_opts": 获取离线消息的配置
####   - max_returned_count: 单次拉去的最大离线消息数目
####   - time_range: 仅拉去在当前时间范围的消息
## backend.mysql.hook.session.subscribed.1  = {"topic": "#", "action": {"function": "on_message_fetch"}, "offline_opts": {"max_returned_count": 500, "time_range": "2h"}, "pool": "pool1"}

## 如果需要存储 Qos0 消息, 可开启以下配置
## 警告: 当开启以下配置时, 需关闭 'on_message_fetch', 否则 qos1, qos2 消息会被存储俩次
## backend.mysql.hook.message.publish.4     = {"topic": "#", "action": {"function": "on_message_store"}, "pool": "pool1"}

MySQL 存储规则说明


hooktopicaction说明
client.connected
on_client_connected存储客户端在线状态
session.created
on_subscribe_lookup订阅主题
client.disconnected
on_client_disconnected存储客户端离线状态
session.subscribed#on_message_fetch获取离线消息
session.subscribed#on_retain_lookup获取retain消息
message.publish#on_message_publish存储发布消息
message.publish#on_message_retain存储retain消息
message.publish#on_retain_delete删除retain消息
message.acked#on_message_acked消息ACK处理

SQL 语句参数说明


hook可用参数示例(sql语句中${name} 表示可获取的参数)
client.connectedclientidinsert into conn(clientid) values(${clientid})
client.disconnectedclientidinsert into disconn(clientid) values(${clientid})
session.subscribedclientid, topic, qosinsert into sub(topic, qos) values(${topic}, ${qos})
session.unsubscribedclientid, topicdelete from sub where topic = ${topic}
message.publishmsgid, topic, payload, qos, clientidinsert into msg(msgid, topic) values(${msgid}, ${topic})
message.ackedmsgid, topic, clientidinsert into ack(msgid, topic) values(${msgid}, ${topic})
message.delivermsgid, topic, clientidinsert into deliver(msgid, topic) values(${msgid}, ${topic})

SQL 语句配置 Action

MySQL 存储支持用户采用 SQL 语句配置 Action:

## 在客户端连接到 EMQX 服务器后,执行一条 sql 语句(支持多条 sql 语句)
backend.mysql.hook.client.connected.3 = {"action": {"sql": ["insert into conn(clientid) values(${clientid})"]}, "pool": "pool1"}

创建 MySQL 数据库表

create database mqtt;

导入 MySQL 库表结构

mysql -u root -p mqtt < etc/sql/emqx_backend_mysql.sql

提示:数据库名称可自定义

MySQL 设备在线状态表

mqtt_client 存储设备在线状态:

DROP TABLE IF EXISTS `mqtt_client`;
CREATE TABLE `mqtt_client` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `clientid` varchar(64) DEFAULT NULL,
  `state` varchar(3) DEFAULT NULL,
  `node` varchar(64) DEFAULT NULL,
  `online_at` datetime DEFAULT NULL,
  `offline_at` datetime DEFAULT NULL,
  `created` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  KEY `mqtt_client_idx` (`clientid`),
  UNIQUE KEY `mqtt_client_key` (`clientid`),
  INDEX topic_index(`id`, `clientid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8MB4;

查询设备在线状态:

select * from mqtt_client where clientid = ${clientid};

例如 ClientId 为 test 客户端上线:

 select * from mqtt_client where clientid = "test";

+----+----------+-------+----------------+---------------------+---------------------+---------------------+
| id | clientid | state | node           | online_at           | offline_at          | created             |
+----+----------+-------+----------------+---------------------+---------------------+---------------------+
   |  1 | test     | 1     | emqx@127.0.0.1 | 2016-11-15 09:40:40 | NULL                | 2022-05-24 09:40:22 |
+----+----------+-------+----------------+---------------------+---------------------+---------------------+
1 rows in set (0.00 sec)

例如 ClientId 为 test 客户端下线:

select * from mqtt_client where clientid = "test";

+----+----------+-------+----------------+---------------------+---------------------+---------------------+
| id | clientid | state | node           | online_at           | offline_at          | created             |
+----+----------+-------+----------------+---------------------+---------------------+---------------------+
|  1 | test     | 0     | emqx@127.0.0.1 | 2016-11-15 09:40:40 | 2016-11-15 09:46:10 | 2022-05-24 09:40:22 |
+----+----------+-------+----------------+---------------------+---------------------+---------------------+
1 rows in set (0.00 sec)

MySQL 主题订阅表

mqtt_sub 存储设备的主题订阅关系:

DROP TABLE IF EXISTS `mqtt_sub`;
CREATE TABLE `mqtt_sub` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `clientid` varchar(64) DEFAULT NULL,
  `topic` varchar(180) DEFAULT NULL,
  `qos` tinyint(1) DEFAULT NULL,
  `created` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  KEY `mqtt_sub_idx` (`clientid`,`topic`,`qos`),
  UNIQUE KEY `mqtt_sub_key` (`clientid`,`topic`),
  INDEX topic_index(`id`, `topic`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8MB4;

例如 ClientId 为 test 客户端订阅主题 test_topic1 test_topic2:

insert into mqtt_sub(clientid, topic, qos) values("test", "test_topic1", 1);
insert into mqtt_sub(clientid, topic, qos) values("test", "test_topic2", 2);

某个客户端订阅主题:

 select * from mqtt_sub where clientid = ${clientid};

查询 ClientId 为 test 的客户端已订阅主题:

select * from mqtt_sub where clientid = "test";

+----+--------------+-------------+------+---------------------+
| id | clientId     | topic       | qos  | created             |
+----+--------------+-------------+------+---------------------+
|  1 | test         | test_topic1 |    1 | 2022-05-24 17:09:05 |
|  2 | test         | test_topic2 |    2 | 2022-05-24 17:12:51 |
+----+--------------+-------------+------+---------------------+
2 rows in set (0.00 sec)

MySQL 消息存储表

mqtt_msg 存储 MQTT 消息:

DROP TABLE IF EXISTS `mqtt_msg`;
CREATE TABLE `mqtt_msg` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `msgid` varchar(64) DEFAULT NULL,
  `topic` varchar(180) NOT NULL,
  `sender` varchar(64) DEFAULT NULL,
  `node` varchar(64) DEFAULT NULL,
  `qos` tinyint(1) NOT NULL DEFAULT '0',
  `retain` tinyint(1) DEFAULT NULL,
  `payload` blob,
  `arrived` datetime NOT NULL,
  PRIMARY KEY (`id`),
  INDEX topic_index(`id`, `topic`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8MB4;

查询某个客户端发布的消息:

select * from mqtt_msg where sender = ${clientid};

查询 ClientId 为 test 的客户端发布的消息:

select * from mqtt_msg where sender = "test";

+----+-------------------------------+----------+--------+------+-----+--------+---------+---------------------+
| id | msgid                         | topic    | sender | node | qos | retain | payload | arrived             |
+----+-------------------------------+----------+--------+------+-----+--------+---------+---------------------+
| 1  | 53F98F80F66017005000004A60003 | hello    | test   | NULL |   1 |      0 | hello   | 2022-05-24 17:25:12 |
| 2  | 53F98F9FE42AD7005000004A60004 | world    | test   | NULL |   1 |      0 | world   | 2022-05-24 17:25:45 |
+----+-------------------------------+----------+--------+------+-----+--------+---------+---------------------+
2 rows in set (0.00 sec)

MySQL 保留消息表

mqtt_retain 存储 retain 消息:

DROP TABLE IF EXISTS `mqtt_retain`;
CREATE TABLE `mqtt_retain` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `topic` varchar(180) DEFAULT NULL,
  `msgid` varchar(64) DEFAULT NULL,
  `sender` varchar(64) DEFAULT NULL,
  `node` varchar(64) DEFAULT NULL,
  `qos` tinyint(1) DEFAULT NULL,
  `payload` blob,
  `arrived` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `mqtt_retain_key` (`topic`),
  INDEX topic_index(`id`, `topic`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8MB4;

查询 retain 消息:

select * from mqtt_retain where topic = ${topic};

查询 topic 为 retain 的 retain 消息:

select * from mqtt_retain where topic = "retain";

+----+----------+-------------------------------+---------+------+------+---------+---------------------+
| id | topic    | msgid                         | sender  | node | qos  | payload | arrived             |
+----+----------+-------------------------------+---------+------+------+---------+---------------------+
|  1 | retain   | 53F33F7E4741E7007000004B70001 | test    | NULL |    1 | www     | 2022-05-24 16:55:18 |
+----+----------+-------------------------------+---------+------+------+---------+---------------------+


>    1 rows in set (0.00 sec)

MySQL 消息确认表

mqtt_acked 存储客户端消息确认:

DROP TABLE IF EXISTS `mqtt_acked`;
CREATE TABLE `mqtt_acked` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `clientid` varchar(64) DEFAULT NULL,
  `topic` varchar(180) DEFAULT NULL,
  `mid` int(11) unsigned DEFAULT NULL,
  `created` timestamp NULL DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `mqtt_acked_key` (`clientid`,`topic`),
  INDEX topic_index(`id`, `topic`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8MB4;

启用 MySQL 数据存储插件

./bin/emqx_ctl plugins load emqx_backend_mysql


    您需要登录后才可以回复