离线消息一对多

EMQX 企业版技术支持发表于:2022年05月26日 09:44:13

一对多消息存储

图片1.png

1. Publish 端发布一条消息;

2. Backend 将消息记录在数据库中;

3. Subscribe1 和 Subscribe2 订阅主题;

4. Backend 从数据库中获取该主题的消息;

5. 发送消息给 Subscribe1 和 Subscribe2;

6. Backend 记录 Subscribe1 和 Subscribe2 已读消息位置,下次获取消息从该位置开始。

演示

以emqx_backend_pgsql插件为例:

image.png

先在/opt/emqx/etc/plugins目录下编辑emqx_backend_pgsql.conf文件

配置 PostgreSQL 服务器

支持配置多台PostgreSQL服务器连接池:

## Pgsql 服务器地址
backend.pgsql.pool1.server = 127.0.0.1:5432

## Pgsql 连接池大小
backend.pgsql.pool1.pool_size = 8

## Pgsql 用户名
backend.pgsql.pool1.username = root

## Pgsql 密码
backend.pgsql.pool1.password = public

## Pgsql 数据库名称
backend.pgsql.pool1.database = mqtt

## Pgsql Ssl
backend.pgsql.pool1.ssl = false

配置 PostgreSQL 存储规则

## backend.pgsql.hook.client.connected.1    = {"action": {"function": "on_client_connected"}, "pool": "pool1"}
## backend.pgsql.hook.session.connected.2     = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}
## backend.pgsql.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}

backend.pgsql.hook.session.subscribed.1  = {"topic": "#", "action": {"function": "on_message_fetch"}, "pool": "pool1"}

## backend.pgsql.hook.session.subscribed.2  = {"topic": "#", "action": {"function": "on_retain_lookup"}, "pool": "pool1"}
## backend.pgsql.hook.session.unsubscribed.1= {"topic": "#", "action": {"sql": ["delete from mqtt_acked where clientid = ${clientid} and topic = ${topic}"]}, "pool": "pool1"}

backend.pgsql.hook.message.publish.1     = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

## backend.pgsql.hook.message.publish.2     = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}
## backend.pgsql.hook.message.publish.3     = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}

backend.pgsql.hook.message.acked.1       = {"topic": "#", "action": {"function": "on_message_acked"}, "pool": "pool1"}

## 获取离线消息### "offline_opts": 获取离线消息的配置
####   - max_returned_count: 单次拉去的最大离线消息数目
####   - time_range: 仅拉去在当前时间范围的消息
## backend.pgsql.hook.session.subscribed.1  = {"topic": "#", "action": {"function": "on_message_fetch"}, "offline_opts": {"max_returned_count": 500, "time_range": "2h"}, "pool": "pool1"}
## 如果需要存储 Qos0 消息, 可开启以下配置
## 警告: 当开启以下配置时, 需关闭 'on_message_fetch', 否则 qos1, qos2 消息会被存储俩次
## backend.pgsql.hook.message.publish.4     = {"topic": "#", "action": {"function": "on_message_store"}, "pool": "pool1"}

创建 PostgreSQL 数据库

使用用户名 postgres 创建名为 'mqtt' 的数据库

createdb -U postgres mqtt
 
psql -U postgres mqtt
 
mqtt=> \dn;
List of schemas
Name  | Owner
--------+-------
public | postgres
(1 row)

需要创建两张表:

PostgreSQL 消息存储表

mqtt_msg 存储MQTT消息:

CREATE TABLE mqtt_msg (
  id SERIAL8 primary key,
  msgid character varying(64),
  sender character varying(64),
  topic character varying(255),
  qos integer,
  retain integer,
  payload text,
  arrived timestamp without time zone);

PostgreSQL 消息确认表

mqtt_acked 存储客户端消息确认:

CREATE TABLE mqtt_acked (
  id SERIAL8 primary key,
  clientid character varying(64),
  topic character varying(64),
  mid integer,
  created timestamp without time zone,
  UNIQUE (clientid, topic));

这样可以实现离线消息一对多且不重复 但收到离线消息的前提是:在acked表项中对应的clientid-topic关系

 1.客户端上线 订阅主题 就会在acked表中存储对应的关系

图片2.png

2.客户端离线后,如果有离线消息存储到数据库中,离线的客户端再次上线,就会收到对应的离线消息

图片3.png

    您需要登录后才可以回复