Mqtt 系列:集群通信
每一个 MQTT
节点只负载了部分客户端,要实现消息在集群中不同客户端间的传递,必须要实现集群间的消息共享。本文便是介绍其中的实现方式。
概述
要实现集群间消息的传递,一般有三种方式:
- 借助消息中间件,如 RocketMQ, 将
MQTT
中的Topic
映射到MQ
中间件的对等实体上,如Queue
, 消息的传递从而转化为MQ
中间件消息的订阅; - 借助数据库,如 Mysql, 将消息写入到数据库中,
MQTT broker
定期获取这些消息,从而实现消息的传递; - Broker 间互联通信,每一个 Broker 节点都与其它结点建立 Tcp 连接,消息通过 TCP 连接广播出去,也可借助第三方组件来实现该功能,如 JGroups.
这三种方式各有优缺点,第一种方式有较大的吞吐量,但需要引入第三方中间件,占用较多资源。第二种方式,系统的瓶颈在于数据库读写能力,一般而言,系统的吞吐量相对较小。第三种方式相对而言,不用额外占用资源,又可获得较好的性能表现,是一种较为适中的方案,缺点是要实现一个安全可靠的通信系统,有一定的技术难度。
本文的方案是采用第三种方式。
集群方案
该方案的本质是建立一个连接所有结点的通信网络,每一个结点既是服务器也是客户端,如下图所示:
该通信网络有如下特点:
- 每一结点与其它节点都建立一个
Tcp
连接,如果有n
个结点,则其中一结点要与其它所有的结点建立n-1 TCP
条连接; - 相同结点之间共享一个连接,如
Server 1,Server 2
两个结点,Server 1-> Server 2
,Server 2-> Server 1
使用同一个TCP
连接; - 整个网络总共有
n*(n-1)/2
条TCP
连接; - 每一个结点都会赋予一个整数值的编号,如
1,2,3...
, 连接总是由编号小的服务器发起;
集群的配置如下:
1 | cluster.model=singleton|cluster |
每一个结点都会开放出一个 3883
端口来进行集群的通信,本质上来说,每一个结点都是一个 RPC
服务器,所以集群通信组件使用的是本文作者之前编写的 alligator-rpc
.
消息类型
在集群中需要通信的有四种消息类型:
消息类型 | 描述 |
---|---|
PUBLISH 消息 | 客户端发送的 PUBLISH 消息 |
订阅消息 | Broker 节点的订阅关系,用于消息的路由,决定是否向该节点广播消息 |
服务器登陆消息 | 服务器之间的认证消息 |
客户端登陆消息 | 客户端登陆消息,客户端重新登陆会将之前的客户端下线,该消息用于通知其它节点上的同一个客户端下线 |
消息路由
每一个 Broker 节点都只负载了部分客户端,一个 Broker 节点收到一条 PUBLISH
消息之后,由于它没有全局的订阅关系视图,它不能判断其它节点是否订阅了该消息的 Topic
. 为了让其它节点的客户端也收到该消息,简单的做法是向集群中其它结点广播该消息。但该方案有比较大缺点,向集群广播所有消息,一方向会加大集群的压力,另外一方面也会浪费集群的网络带宽,造成了不必要的消息传输。
一种优化的方案是每一结点都维护一个全局的订阅关系 Ctrie
树,该Ctrie
树包括了集群中所有客户端(客户端有所在服务器的信息)的订阅关系,因此可以判断出那些服务器订阅了消息,从而进行消息的转。但方案也有一个问题,如果客户端数量很多且订阅关系较复杂时,生成的 Ctrie
树会比较大,最终降低了查询匹配的效率。
为了避免 Ctrie
树过大的问题,可以改变数据的粒度,Ctrie
树是以客户端为维度进行匹配,现在转换思路,以服务器为维度来维护订阅关系。
最终的方案以 Topic Filter
首个 token
为索引来维护服务器级别的订阅关系,假定集群有三个结点,分别是 1, 2, 3
, 每一个结点都维护了各自的 Topic Filter
, 如下表所示:
Topic Filter | Sever | 首个 token |
---|---|---|
order/biz,user/new,live/id | 1 | order,user,live |
live/id,meeting/id | 2 | live,meeting |
log/stat | 3 | log |
说明:
首个 token:主要是指 Topic Filter
的第一个层级。
然后再以 首个 token
为索引,建立服务器的订阅关系,如下表所示:
首个 token | Server List |
---|---|
order | {1} |
user | {2} |
live | {1,2} |
meeting | {2} |
log | {3} |
知道了服务器的订阅关系之后,收到一个 PUBLISH
消息之后,提取出 Topic
中的 首个 token
, 索引出订阅的服务器列表便可进行消息的路由转发。
二级路由
假定一个消息在两个节点之间传递,如 Client 1--->Server 1--->Server 2--->Client 2
, 节点之间 Server 1--->Server 2
的消息转发由服务器的订阅关系决定,节点到客户端 Server 2--->Client 2
的消息转发由 Ctrie
决定。可以看出,消息在多个节点之间传递时,便存在二级路由关系,即:1) 服务器订阅关系;2)客户端订阅关系 Ctrie
.