规则引擎
EMQX Rule Engine (以下简称规则引擎) 用于配置 EMQX 消息流与设备事件的处理、响应规则。规则引擎不仅提供了清晰、灵活的 "配置式" 的业务集成方案,简化了业务开发流程,提升用户易用性,降低业务系统与 EMQX 的耦合度;也为 EMQX 的私有功能定制提供了一个更优秀的基础架构。
EMQX 在 消息发布或事件触发 时将触发规则引擎,满足触发条件的规则将执行各自的 SQL 语句筛选并处理消息和事件的上下文信息。
提示:
1.适用版本:EMQX v3.1.0+
2.兼容提示:EMQX v4.0 对规则引擎 SQL 语法做出较大调整,v3.x 升级用户请参照 迁移指南 进行适配。
EMQX 规则引擎快速入门
此处包含规则引擎的简介与实战,演示使用规则引擎结合华为云 RDS 上的 MySQL 服务,进行物联网 MQTT 设备在线状态记录、消息存储入库。
从本视频中可以快速了解规则引擎解决的问题和基础使用方法,视频链接:https://www.bilibili.com/video/BV19T4y1w7Nj?zw
消息发布
规则引擎借助响应动作可将特定主题的消息处理结果存储到数据库,发送到 HTTP Server,转发到消息队列 Kafka 或 RabbitMQ,重新发布到新的主题甚至是另一个 Broker 集群中,每个规则可以配置多个响应动作。
选择发布到 t/# 主题的消息,并筛选出全部字段:
SELECT * FROM "t/#"
选择发布到 t/a 主题的消息,并从 JSON 格式的消息内容中筛选出 "x" 字段:
SELECT payload.x as x FROM "t/a"
事件触发
规则引擎使用 $events/ 开头的虚拟主题(事件主题)处理 EMQX 内置事件,内置事件提供更精细的消息控制和客户端动作处理能力,可用在 QoS 1 QoS 2 的消息抵达记录、设备上下线记录等业务中。
选择客户端连接事件,筛选 Username 为 'emqx' 的设备并获取连接信息:
SELECT clientid, connected_at FROM "$events/client_connected" WHERE username = 'emqx'
规则引擎数据和 SQL 语句格式,事件主题 列表详细教程参见 SQL 手册。
规则引擎组成
规则描述了 数据从哪里来、如何筛选并处理数据、处理结果到哪里去 三个配置,即一条可用的规则包含三个要素:
触发事件:规则通过事件触发,触发时事件给规则注入事件的上下文信息(数据源),通过 SQL 的 FROM 子句指定事件类型;
处理规则(SQL):使用 SELECT 子句 和 WHERE 子句以及内置处理函数, 从上下文信息中过滤和处理数据;
响应动作:如果有处理结果输出,规则将执行相应的动作,如持久化到数据库、重新发布处理后的消息、转发消息到消息队列等。一条规则可以配置多个响应动作。
如图所示是一条简单的规则,该条规则用于处理 消息发布 时的数据,将全部主题消息的 msg
字段,消息 topic
、qos
筛选出来,发送到 Web Server 与 /uplink 主题:
使用 EMQX 的规则引擎可以灵活地处理消息和事件。使用规则引擎可以方便地实现诸如将消息转换成指定格式,然后存入数据库表,或者发送到消息队列等。
与 EMQX 规则引擎相关的概念包括: 规则(rule)、动作(action)、资源(resource) 和 资源类型(resource-type)。
规则、动作、资源的关系:
规则: { SQL 语句, 动作列表: [ { 动作1, 动作参数, 绑定资源: { 资源配置 } }, { 动作2, 动作参数, 绑定资源: { 资源配置 } } ] }
规则(Rule): 规则由 SQL 语句和动作列表组成。动作列表包含一个或多个动作及其参数。
SQL 语句用于筛选或转换消息中的数据。
动作(Action) 是 SQL 语句匹配通过之后,所执行的任务。动作定义了一个针对数据的操作。 动作可以绑定资源,也可以不绑定。例如,“inspect” 动作不需要绑定资源,它只是简单打印数据内容和动作参数。而 “data_to_webserver” 动作需要绑定一个 web_hook 类型的资源,此资源中配置了 URL。
资源(Resource): 资源是通过资源类型为模板实例化出来的对象,保存了与资源相关的配置(比如数据库连接地址和端口、用户名和密码等) 和系统资源(如文件句柄,连接套接字等)。
资源类型 (Resource Type): 资源类型是资源的静态定义,描述了此类型资源需要的配置项。
提示:动作和资源类型是由 emqx 或插件的代码提供的,不能通过 API 和 CLI 动态创建。
规则引擎典型应用场景举例
动作监听:智慧家庭智能门锁开发中,门锁会因为网络、电源故障、人为破坏等原因离线导致功能异常,使用规则引擎配置监听离线事件向应用服务推送该故障信息,可以在接入层实现第一时间的故障检测的能力;
数据筛选:车辆网的卡车车队管理,车辆传感器采集并上报了大量运行数据,应用平台仅关注车速大于 40 km/h 时的数据,此场景下可以使用规则引擎对消息进行条件过滤,向业务消息队列写入满足条件的数据;
消息路由:智能计费应用中,终端设备通过不同主题区分业务类型,可通过配置规则引擎将计费业务的消息接入计费消息队列并在消息抵达设备端后发送确认通知到业务系统,非计费信息接入其他消息队列,实现业务消息路由配置;
消息编解码:其他公共协议 / 私有 TCP 协议接入、工控行业等应用场景下,可以通过规则引擎的本地处理函数(可在 EMQX 上定制开发)做二进制 / 特殊格式消息体的编解码工作;亦可通过规则引擎的消息路由将相关消息流向外部计算资源如函数计算进行处理(可由用户自行开发处理逻辑),将消息转为业务易于处理的 JSON 格式,简化项目集成难度、提升应用快速开发交付能力。
在 Dashboard 中测试 SQL 语句
Dashboard 界面提供了 SQL 语句测试功能,通过给定的 SQL 语句和事件参数,展示 SQL 测试结果。
在创建规则界面,输入 规则SQL,并启用 SQL 测试 开关:
修改模拟事件的字段,或者使用默认的配置,点击 测试 按钮:
SQL 处理后的结果将在 测试输出 文本框里展示:
迁移指南
4.0 版本中规则引擎 SQL 语法更加易用,3.x 版本中所有事件 FROM 子句后面均需要指定事件名称,4.0 以后我们引入 事件主题 概念,默认情况下 消息发布 事件不再需要指定事件名称:
## 3.x 版本 ## 需要指定事件名称进行处理 SELECT * FROM "t/#" WHERE topic =~ 't/#' ## 4.0 及以后版本 ## 默认处理 message.publish 事件,FROM 后面直接填写 MQTT 主题 ## 上述 SQL 语句等价于: SELECT * FROM 't/#' ## 其他事件,FROM 后面填写事件主题 SELECT * FROM "$events/message_acked" where topic =~ 't/#' SELECT * FROM "$events/client_connected"
提示:Dashboard 中提供了旧版 SQL 语法转换功能可以完成 SQL 升级迁移。