数据框架
部署
Neuron 和 eKuiper 都支持二进制安装包以及 Docker 容器化部署方案。本文以 Docker 方案为例,采用 docker compose (opens new window)方式,一键完成边缘端两个组件的快速部署以及此次试验所需其他软件。本文以将处理后的数据结果写入TDengine为例。
1.复制 docker-compose.yml 文件到部署的机器上。其内容如下,包含了 Neuron,eKuiper,TDengine,Grafana 以及 eKuiper 的管理界面 eKuiper manager(可选)。其中,eKuiper 和 neuron 共享了名为 nng-ipc
的 volume ,用于二者通信。如果要使用 eKuiper alpine 版本,需要在 compose 文件的 eKuiper 部分添加 user: root:root
赋予写入 ipc 文件的权限,否则连接将无法建立。
version: '3.4' services: manager: image: emqx/ekuiper-manager:1.7 container_name: kuiper-manager hostname: manager ports: - "9082:9082" kuiper: image: lfedge/ekuiper:1.7.0-slim container_name: kuiper hostname: kuiper volumes: - nng-ipc:/tmp ports: - "9081:9081" - "20498:20498" neuron: image: neugates/neuron:2.2.7 container_name: neuron hostname: neuron volumes: - nng-ipc:/tmp ports: - "7000:7000" - "7001:7001" TDengine: image: tdengine/tdengine:2.6.0.8 container_name: tdengine hostname: taos ports: - "6030:6030" - "6035:6035" - "6041:6041" - "6030:6030/udp" - "6040:6040/udp" grafana: image: grafana/grafana:9.2.3 container_name: grafana hostname: grafana ports: - "3000:3000" volumes: nng-ipc:
注:ekuiper和ekuiper-manager版本应保持对应,如果ekuiper-manager版本低于ekuiper可能会造成ekuiper新增功能在低版本的ekuiper-manager上无法使用
2.在该文件所在目录,运行:
docker-compose up -d
3.所有的容器启动完毕之后,请使用 docker-compose ps
命令确定所有的容器已经正常启动。
数据流配置
LF Edge eKuiper 是物联网数据分析和流式计算引擎,所以第一步就是要将数据流接入ekuiper,这里以neuron采集modbus-tcp协议点位值作为流的来源为例
Neuron北向接入ekuiper
在配置菜单中选择北向应用管理
,进入到北向应用管理界面,此时是已经有一个默认创建好的北向应用,不需要填写任何配置,只需在ekuiper创建好相应neuron,这个北向应用会自动连接ekuiper
订阅 Group
点击第一步应用卡片 data-stream-processing 中任意空白处,进入到订阅Group界面,如下图所示。
点击右上角的
添加订阅
按键添加订阅;下拉框选择南向设备,这里我们选择上面建好的 modbus-plus-tcp-1 的设备;
下拉框选择所要订阅的 Group,这里我们选择上面建好的 group-1;
点击提交,完成订阅。
点击
北向应用管理
,点开应用卡片中的工作状态开关,使应用进入运行中的状态。
至此,Neuron 已配置好数据采集,并将采集到的数据发送到北向的 eKuiper 通道中。
eKuiper manager 配置
eKuiper manager 是一个 Web 管理界面,可管理多个 eKuiper 实例。因此,我们需要设置 manager 管理的 eKuiper 实例。详细设置请参考eKuiper 管理控制台的使用。
eKuiper 管理可使用 REST API,命令行以及管理控制台。以下教程中,我们主要使用 REST API 进行管理,包括流和规则的创建。
注:下面所有操作均在ekuiper-manager上进行配置,命令行创建可参考ekuiper官网文档(https://ekuiper.org/docs/zh/latest/)
创建ekuiper服务
创建 eKuiper 服务时需要填写「服务类型」,「服务名称」和「端点 URL 」。
服务类型 : 选择
直接连接服务
(华为 IEF 服务
专用于华为用户)。服务名称 : 自拟,本例为
ekuiper
。端点URL:
http://$IP:9081
,IP 获取命令如下:
docker inspect kuiper | grep IPAddress
创建 eKuiper 服务样例如下图所示,如果把端口暴露到了主机,那么也可以直接使用主机上的 9081 端口地址。
源管理
配置
创建流
如图,创建一个名为
neuron
的流,消息主题为
neuron/#
流结构体暂无定义
用户也可以选择「带结构的流」来定义一个 schemaless 的数据源。
「流类型」,选择「neuron」
「配置组」,与「流类型」类似,这里选择配置新添加的neuron
「流格式」,与「流类型」类似,用户不选的话,使用缺省的「json」
添加插件
注意:插件安装、并且通过规则使用后,插件已经被加载到内存中,由于 Golang 语言的限制,在插件删除的时候,无法将其真正卸载,所以想重新进行插件的安装,eKuiper 必须重启才可生效;目前只支持在 debian 的 Docker 环境里的插件安装,其余环境暂不支持。
创建规则
如下图,创建一条名为 rule_001 的规则,将数据中 tem > 30 的数据过滤出来。SQL 编辑器在用户写 SQL 的过程中可以给出提示,方便用户完成 SQL 的编写。
添加动作
数据库地址必须为域名,端口6030,用户名密码默认为root/taosdata,数据库和表名称填写TDengine里面实际创建的
是否忽略输出:如果选择结果为空,则忽略输出。
将结果数据按条发送:输出消息以数组形式接收,该属性意味着是否将结果一一发送。 如果为 false,则输出消息将为{"result":"${the string of received message}"}。 例如,{"result":"[{"count":30},""count":20}]"}。否则,结果消息将与实际字段名称一一对应发送。 对于与上述相同的示例,它将发送 {"count":30},然后发送{"count":20} 到 RESTful 端点。默认为 false。
流格式:默认是json格式
数据模版:Golang 模板格式字符串,用于指定输出数据格式。 模板的输入是目标消息,该消息始终是 map 数组。 如果未指定数据模板,则将数据作为原始输入。
查看规则统计
通过下图可以看到规则是正常运行并且成功写入TDengine