准备工作
关于如何在阿里云日志服务 SLS 创建实例,可参考快速入门。
步骤一:开通日志服务
步骤二:创建 Project 和 Logstore
对于 Project 的地域,请选择与 EMQX Cloud 部署相同的地域,便于通过内网做数据传输。
在对应的 Project 中,创建以下 4 个 Logstore(Kafka topic)
message_flow:用于记录消息从 pub--->emqx--->sub 的流转情况。
message_lost:用于记录消息丢失的情况。
session_sub_unsub:用于记录客户端订阅/取消订阅主题的情况。
client_online_offline:用于记录客户端上下线的情况。
步骤三:采集日志
在 Logstore 实例处,点击数据接入,选择 Kafka 协议,在本示例中我们以这种方式采集数据。
EMQX Cloud 数据集成配置
步骤一:创建资源
○ Kafka 服务器:SLS 内网地址
○ 用户名:SLS Project 名称
○ 密码:${access-key-id}#${access-key-secret}
○ 开启 SSL:true
步骤二:新建规则和动作
将下面规则 SQL 填入到 SQL 输入框中。
消息流转
①pub--->emqx
SELECT
id,
clientid,
topic,
qos,
event,
timestamp,
publish_received_at,
payload
FROM "t/#"
然后将数据转发到 message_flow。
消息丢失
SELECT
id,
clientid,
topic,
qos,
event,
timestamp,
publish_received_at,
reason,
payload
FROM
"$events/message_dropped", "$events/delivery_dropped"
WHERE topic =~ 't/#'
然后将数据转发到 message_lost。
客户端上下线
SELECT
*
FROM
"$events/client_connected", "$events/client_disconnected"
然后将数据转发到 client_online_offline。
客户端订阅/取消订阅主题
SELECT
*
FROM
"$events/session_subscribed", "$events/session_unsubscribed"
然后将数据转发到 session_sub_unsub。
测试
步骤一:使用 MQTT X 模拟数据上报
需要将 broker.emqx.io 替换成已创建的部署连接地址,并添加客户端认证信息。
步骤二:查询与分析日志
以 message_flow 为例,记录了消息从 pub--->emqx--->sub 的流转记录。
✓ 对于 QoS 0:
消息到 EMQX (message.publish) -> EMQX 发出(message.delivered)
✓ 对于 QoS 1 和 2:
消息到 EMQX (message.publish) -> EMQX 投递出去(message.delivered) -> 客户端收到消息返回 ack (message.acked)
在阿里云 SLS 页面,通过 sql 查询出从 EMQX 获取到的几个事件时间戳,然后求差值,即可获取到消息传递具体耗时。
您也可以将以上数据添加到仪表盘中进行查看。
下一篇 Using EMQX Cloud Data Integrations to save data to Firebase Realtime Database