使用阿里云日志服务 SLS 实现消息全链路追踪

Cloud 服务支持发表于:2023年06月08日 11:01:18

准备工作

关于如何在阿里云日志服务 SLS 创建实例,可参考快速入门

步骤一:开通日志服务

步骤二:创建 Project 和 Logstore

对于 Project 的地域,请选择与 EMQX Cloud 部署相同的地域,便于通过内网做数据传输。

0016481410f789527f7783139adaf98

在对应的 Project 中,创建以下 4 个 Logstore(Kafka topic)

message_flow:用于记录消息从 pub--->emqx--->sub 的流转情况。

message_lost:用于记录消息丢失的情况。

session_sub_unsub:用于记录客户端订阅/取消订阅主题的情况。

client_online_offline:用于记录客户端上下线的情况。

001648141252a3dc6660f335aa89f0c

步骤三:采集日志

在 Logstore 实例处,点击数据接入,选择 Kafka 协议,在本示例中我们以这种方式采集数据。

001648141384248e04f403018fc4a9c


EMQX Cloud 数据集成配置

步骤一:创建资源

○ Kafka 服务器:SLS 内网地址

○ 用户名:SLS Project 名称

○ 密码:${access-key-id}#${access-key-secret}

○ 开启 SSL:true
0016481419754b5a733d3fed6830bc6

步骤二:新建规则和动作

将下面规则 SQL 填入到 SQL 输入框中。

消息流转

①pub--->emqx


SELECT
  id,
  clientid,
  topic,
  qos,
  event,
  timestamp,
  publish_received_at,
  payload
FROM "t/#"


然后将数据转发到 message_flow。

001648141f7db79c96c81e8ca77aa5d


消息丢失


SELECT
  id,
  clientid,
  topic,
  qos,
  event,
timestamp,
  publish_received_at,
  reason,
  payload
FROM
"$events/message_dropped", "$events/delivery_dropped"
WHERE topic =~ 't/#'


然后将数据转发到 message_lost。

00164814355e4d02dd40235f5aea96a


客户端上下线


SELECT
*
FROM
"$events/client_connected", "$events/client_disconnected"


然后将数据转发到 client_online_offline。

0016481438083587e5c04e7cc650e75


客户端订阅/取消订阅主题


SELECT
*
FROM
"$events/session_subscribed", "$events/session_unsubscribed"


然后将数据转发到 session_sub_unsub。

0016481439fca8fc36e1adc473d86e3

测试

步骤一:使用 MQTT X 模拟数据上报

需要将 broker.emqx.io 替换成已创建的部署连接地址,并添加客户端认证信息。

001648143b2786b1f61d11fe195588c


步骤二:查询与分析日志

以 message_flow 为例,记录了消息从 pub--->emqx--->sub 的流转记录。

✓ 对于 QoS 0:

消息到  EMQX (message.publish) -> EMQX 发出(message.delivered)

✓ 对于 QoS 1 和 2: 

消息到  EMQX (message.publish) -> EMQX 投递出去(message.delivered) -> 客户端收到消息返回 ack (message.acked)

在阿里云 SLS 页面,通过 sql 查询出从 EMQX 获取到的几个事件时间戳,然后求差值,即可获取到消息传递具体耗时。

001648143c9040de8e0b6e381931564

您也可以将以上数据添加到仪表盘中进行查看。

001648143db13384cfb71fcaf5dd606


    您需要登录后才可以回复