这篇文章主要讲述在 MQTT Broker
中数据如何进行持久化操作,包括什么数据需要进行持久化 (WHO)
以及怎么进行持久化操作 (HOW)
。
概述
根据存活时间的长短,可以将数据分为两种类型:
- 会话数据:跟
Client
会话同周期,随着 Client
退出而销毁,如会话状态、发送/接收中的数据、Will
数据及订阅数据;
- 静态数据:需要长时间存在的数据,如用户及
QoS 1&2
级别的数据。
根据数据存储的特性,两种类型的数据分别使用不同的持久化工具,对于会话数据而言,根据需要实时读取及存活周期较短的特性,可以使用缓存来进行存储,如 Redis
; 而对于需要长期存储的静态数据,可以使用数据库来进行存储,如 Mysql
. 在本文中,便是使用了 Redis
和 Mysql
来存储数据;
会话数据
在 MQTT broker
中,会话数据包括以下类型:
- 会话信息及状态;
- 发送/接收中的
QoS 1&2
数据;
Topic
消息消费偏移量;
Will
数据;
- 订阅数据(Subscription);
Retain
数据(严格来说,它并不算会话数据,只是适合用缓存存储)。
会话信息及状态
会话代表了一次 Client
的登陆,Client
后续的操作都需要绑定在会话上。
1. 数据格式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| public class StoredSession {
private String clientId;
private String userName;
private boolean clean;
private int serverId;
private int status;
private long timestamp;
}
|
说明:
- clean: 表明会话是否保持,如果不保持,则每一次登陆都是一个新的会话;
- serverId: 表明登陆的
Broker id
, 客户端重新登陆之后,可能不在之前的 Broker
上。
2. 操作方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| public interface ClientSessionRepository {
StoredSession getSession(String clientId);
void addSession(String clientId, StoredSession session);
void updateSession(String clientId, StoredSession session);
void removeSession(String clientId);
boolean contain(String clientId);
}
|
3. 存储格式
1 2 3 4 5 6
|
private static final String SESSION_KEY_FORMAT = "s:ol:%s";
|
发送/接收中的 QoS 1&2
数据
发送中的数据包括:1)QoS 1&2
级别中已发送但未确认的 PUBLISH
消息;2)QoS 2
级别中已发送但未确认的 PUBREL
消息;接收中的数据包括 QoS 2
级别的 PUBLISH
消息。
1. 数据格式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| public class PublishInnerMessage{
private String topic;
private boolean retain;
private int qos;
private byte[] payload;
private int messageId;
private long timestamp;
}
|
2. 操作方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
| public interface InflightMessageRepository {
void addMessage(String clientId, PublishInnerMessage message, boolean isSending);
void removeMessage(String clientId, int packetId, boolean isSending);
PublishInnerMessage getMessage(String clientId, int packetId, boolean isSending);
List<PublishInnerMessage> getAllMessages(String clientId, boolean isSending);
boolean contain(String clientId, int packetId, boolean isSending);
void addPubRel(String clientId, int packetId);
void removePubRel(String clientId, int packetId);
Set<Integer> getAllPubRel(String clientId);
void clean(String clientId);
}
|
3. 存储格式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
|
private static final String SESSION_INFLIGHT_KEY_FORMAT = "s:f:p:%s";
private static final String SESSION_PUBREL_KEY_FORMAT = "s:f:r:%s";
private static final String SESSION_RECEIVE_KEY_FORMAT = "s:f:ri:%s";
|
Topic
消息消费偏移量
会话中需要记录 QoS 1&2
级别消息的消费偏移量,这样可以保证 QoS 1&2
消息的语义,消息不会丢,下次登陆时,可以继续消费之前数据。同时,根据偏移量可以实时判断出是否有数据丢失,再从数据库读取缺失的数据,从而保证数据的可靠性。
1. 数据格式
只要记录 Client
, Topic
及 Offset
三者之间的关系即可。
2. 操作方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| public interface OffsetRepository {
void addTopicOffset(String clientId, String topic, long offset);
void updateTopicOffset(String clientId, String topic, long offset);
int getTopicOffset(String clientId, String topic);
Map<String, Integer> getAllTopicOffsets(String clientId);
}
|
3. 存储格式
1 2 3 4 5 6
|
private static final String SESSION_TOPIC_OFFSET_FORMAT = "s:t:%s";
|
Will
数据
Will
数据主要是 Client
异常下线之后发送的消息。
1. 数据格式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public class Will {
private String topic;
private byte [] payload;
private int qos;
private boolean retained; }
|
2. 操作方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| public interface WillRepository {
Will getWill(String clientId);
void addWill(String clientId, Will will);
void updateWill(String clientId, Will will);
void removeWill(String clientId);
}
|
3. 存储格式
1 2 3 4 5 6
|
private static final String SESSION_WILL_FORMAT = "s:w:%s";
|
订阅数据(Subscription)
订阅数据(Subscription) 主要是存储 Client
与 Topic
之间的订阅关系。
1. 数据格式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public class StoredSubscription {
private int qos;
private String clientId;
private String topicFilter; }
|
2. 操作方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public interface SubscriptionsRepository {
List<StoredSubscription> getAllSubscriptions(String clientId);
void addSubscription(String clientId, StoredSubscription subscription);
void removeSubscription(String clientId, String topic); }
|
3. 存储格式
1 2 3 4 5 6
|
private static final String SESSION_SUBSCRIPTION_FORMAT = "s:s:%s";
|
Retain
数据
Retain
数据实际是设置在 Topic
维度上的,与单个 Client
没有关系。它是类似 Topic-->List
的结构,便于使用 Redis
中的 KV
结构进行存储。
1. 数据格式
1 2 3 4 5 6 7 8 9 10 11 12
| public class RetainedMessage {
private int qos;
private byte[] payload; }
|
2. 操作方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public interface RetainedRepository {
void clean(String topic);
void addRetainMessage(String topic, RetainedMessage msg);
List<RetainedMessage> getAllRetainMessage(String topic); }
|
3. 存储格式
1 2 3 4 5 6
|
private static final String TOPIC_RETAIN_FORMAT = "t:r:%s";
|
静态数据
静态数据的特点是需要长期存储且访问不是很频繁,适合存储到数据库中。
消息存储
经过确认的 QoS 1&2
级别的消息最终会存储到数据库中,主要有两个作用:1)可以读取离线数据;2)可以解决消息在集群中传递丢失的问题(消息经过确认落库之后再广播消息,可能存在消息丢失的问题)。这个两个功能都要结合 Client Offset
来实现;
1. 数据格式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public class StoredMessage {
private long id;
private int packageId;
private String topic;
private int qos;
private byte[] payload;
private long offset; }
|
说明:
- packageId: 消息 id, 在一次会话中是惟一的,同一个
Client
不同会话间, packageId 有可能会重复;
- offset:全局消息 id, 在同一个
Topic
中, offset
是惟一的,不会重复。
在现有的实现中。全局消息 id 通过 Redis
的自增 String Key
来实现,其定义如下:
1 2 3 4 5 6
|
private static final String TOPIC_OFFSET_FORMAT = "t:o:%s";
|
2. 操作方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public interface MessageRepository {
void addMessage(StoredMessage msg);
StoredMessage getMessage(String topic, long offset);
List<StoredMessage> getAllMessage(String topic, long startOffset); }
|
消息根据 offset
可以实现精准及范围查询。
用户信息
用户信息主要是存储用户基本信息,包括用户名及密码信息。
1. 数据格式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public class StoredUser {
private String username;
private String password;
private byte status; }
|
2. 操作方法
1 2 3 4 5 6 7 8 9 10 11
| public interface UserRepository {
StoredUser findUser(String username);
}
|
可以根据用户名查询用户信息。