数据源(Nanomq)-->ekuiper-->EMQX

EMQX 客服发表于:2022年11月28日 15:27:31更新于:2022年11月28日 15:27:43

概览

从 eKuiper 0.9.1 版本开始,每发布一个 eKuiper 新版本,会随之发布对应版本的管理控制台。本文以一个实际例子来说明如何使用管理控制台对 eKuiper 节点进行操作与管理。文中将订阅来自于 MQTT 服务器的数据,通过 eKuiper 写好的规则,经过处理后发送到业务指定平台,演示说明如下:

  • 通过管理控制台创建一个 eKuiper 节点

  • 创建一个流,用于订阅 MQTT 服务器中的数据,本例演示订阅 MQTT 服务器,相关信息如下所示。

    • 地址为:tcp://broker.emqx.io:1883

    • 主题为:emqx/up

    • 数据为:{"temperature": 20, "humidity" : 20}{"temperature": 30, "humidity" : 20}、{"temperature": 40, "humidity" : 20}

  • 创建一个规则,用于计算订阅到的数据,并将数据写入目标 (sink) 端「本例演示将订阅到的消息写入到文件中」。

  • eKuiper 目前已经支持多种源和目标。用户只需安装相对应的插件,便能实现对应的功能「本例的源为 MQTT源是内置支持,无需安装;目标为文件 (file),非内置支持,需要另安装」

数据框架

image.png

部署

本文以docker-compose安装方式为例:

version: '3.4'

services:
  nanomq:
    image: emqx/nanomq:0.13-slim
    container_name: nanomq
    hostname: nanomq
    ports:
      - "1883:1883"
  manager:
    image: emqx/ekuiper-manager:1.7
    container_name: kuiper-manager
    hostname: manager
    ports:
      - "9082:9082"
  kuiper:
    image: lfedge/ekuiper:1.7-slim-python
    container_name: kuiper
    hostname: kuiper
    ports:
      - "9081:9081"

登录 kuiper-manager

登录时需要提供 kuiper-manager 的地址,用户名、密码。如下图所示:

  • 地址:http://$yourhost:9082

  • 用户名:admin

  • 密码:public

image.png

创建 eKuiper 服务

创建 eKuiper 服务时需要填写「服务类型」,「服务名称」和「端点 URL 」。

  • 服务类型 : 选择 直接连接服务 (华为 IEF 服务 专用于华为用户)。

  • 服务名称 : 自拟,本例为 example

  • 端点URL:http://$IP:9081,IP 获取命令如下:

docker inspect kuiper |  grep IPAddress

创建 eKuiper 服务样例如下图所示,如果把端口暴露到了主机,那么也可以直接使用主机上的 9081 端口地址。

image.png

创建流

如下图,创建一个名为 nanomq 的流,

  • 用于订阅地址为 tcp://broker.emqx.io:1883 的 MQTT 服务器消息

  • 消息主题为 nanomq/up

  • 流结构体定义包含了以下三个字段。

    用户也可以去掉「是否为带结构的流」来定义一个 schemaless 的数据源。

    • temperature: bigint

    • humidity: bigint

    • id:bigint

  • 「流类型」可以不选择,不选的话为缺省的「mqtt」,或者如下图所示直接选择「mqtt」

  • 「配置组」,与「流类型」类似,用户不选的话,使用缺省的「default」

  • 「流格式」,与「流类型」类似,用户不选的话,使用缺省的「json」

image.png

如上所示用的是缺省的「default」配置组,用户也可以根据需求编写自己的配置。在ekuiper 1.7版本上可以在直接在创建流的时候去添加或修改配置组,如图:

image.pngimage.png

  • 名称:自定义

  • 服务器地址:MQTT 消息代理的服务器

  • 用户名:MQTT 连接用户名

  • 密码:MQTT 连接密码

  • MQTT协议版本:MQTT 协议版本。3.1 (也被称为 MQTT 3) 或者 3.1.1 (也被称为 MQTT 4)。 如果未指定,缺省值为 3.1

  • 客户端ID:MQTT 连接的客户端 ID。 如果未指定,将使用一个 uuid

  • QoS级别:默认订阅 QoS 级别

  • 证书路径:证书路径。可以为绝对路径,也可以为相对路径。如果指定的是相对路径,那么父目录为执行 server 命令的路径

  • 私钥路径:私钥路径。可以为绝对路径,也可以为相对路径

  • 跟证书路径:根证书路径,用以验证服务器证书。可以为绝对路径,也可以为相对路径

  • 跳过证书验证:控制是否跳过证书认证。如果被设置为 true,那么跳过证书认证;否则进行证书验证

  • Kubeedge版本号:Kubeedge 版本号,不同的版本号对应的文件内容不同

  • KubeEdge模型文件:KubeEdge 模版文件名,文件指定放在 etc/sources 文件夹中

创建规则

如下图,创建一条名为 rule_001 的规则(这里简单使用了一个时间窗口函数做了下数据过滤处理,更多函数使用详情可参考:https://ekuiper.org/docs/zh/latest/),将30s为一个时间窗口发送过来的数据中 temperature > 30 的数据过滤出来。SQL 编辑器在用户写 SQL 的过程中可以给出提示,方便用户完成 SQL 的编写。

image.png

单击「添加」按钮,弹出对话框如下所示。

image.png

  • 是否忽略输出:如果选择结果为空,则忽略输出。

  • 将结果数据按条发送:输出消息以数组形式接收,该属性意味着是否将结果一一发送。 如果为 false,则输出消息将为{"result":"${the string of received message}"}。 例如,{"result":"[{"count":30},""count":20}]"}。否则,结果消息将与实际字段名称一一对应发送。 对于与上述相同的示例,它将发送 {"count":30},然后发送{"count":20} 到 RESTful 端点。默认为 false。

  • 流格式:默认是json格式

  • 数据模版:Golang 模板格式字符串,用于指定输出数据格式。 模板的输入是目标消息,该消息始终是 map 数组。 如果未指定数据模板,则将数据作为原始输入。

创建规则后,如果一切正常,那么规则处于运行状态。

image.png

验证

通过nanomq_cli发布3条数据模拟传感器数据发送到 MQTT 服务器 tcp://broker.emqx.io:1883的主题 nanomq/up 中

docker exec -it nanomq bash

nanomq_cli pub --url 127.0.0.1:1883 -t nanomq/up -m '{"id":1,temperature": 30, "humidity" : 20}'

image.png

订阅

通过MQTTX客户端工具连接EMQX broker去订阅ekuiper上传消息topicekuiper/up,可以看到根据规则筛选得到30s内temperature大于30的数据消息,如图:

image.png

    您需要登录后才可以回复