Mqtt 系列:集群通信

每一个 MQTT 节点只负载了部分客户端,要实现消息在集群中不同客户端间的传递,必须要实现集群间的消息共享。本文便是介绍其中的实现方式。

概述

要实现集群间消息的传递,一般有三种方式:

  1. 借助消息中间件,如 RocketMQ, 将 MQTT 中的 Topic 映射到 MQ 中间件的对等实体上,如 Queue, 消息的传递从而转化为 MQ 中间件消息的订阅;
  2. 借助数据库,如 Mysql, 将消息写入到数据库中,MQTT broker 定期获取这些消息,从而实现消息的传递;
  3. Broker 间互联通信,每一个 Broker 节点都与其它结点建立 Tcp 连接,消息通过 TCP 连接广播出去,也可借助第三方组件来实现该功能,如 JGroups.

这三种方式各有优缺点,第一种方式有较大的吞吐量,但需要引入第三方中间件,占用较多资源。第二种方式,系统的瓶颈在于数据库读写能力,一般而言,系统的吞吐量相对较小。第三种方式相对而言,不用额外占用资源,又可获得较好的性能表现,是一种较为适中的方案,缺点是要实现一个安全可靠的通信系统,有一定的技术难度。

本文的方案是采用第三种方式。

集群方案

该方案的本质是建立一个连接所有结点的通信网络,每一个结点既是服务器也是客户端,如下图所示: mqtt-cluster

该通信网络有如下特点:

  1. 每一结点与其它节点都建立一个 Tcp 连接,如果有 n 个结点,则其中一结点要与其它所有的结点建立 n-1 TCP 条连接;
  2. 相同结点之间共享一个连接,如 Server 1,Server 2 两个结点,Server 1-> Server 2, Server 2-> Server 1 使用同一个 TCP 连接;
  3. 整个网络总共有 n*(n-1)/2TCP 连接;
  4. 每一个结点都会赋予一个整数值的编号,如 1,2,3..., 连接总是由编号小的服务器发起;

集群的配置如下:

1
2
3
4
cluster.model=singleton|cluster
server.id=1
server.1=192.168.1.100:3883
server.2=192.168.1.101:3883

每一个结点都会开放出一个 3883 端口来进行集群的通信,本质上来说,每一个结点都是一个 RPC 服务器,所以集群通信组件使用的是本文作者之前编写的 alligator-rpc.

消息类型

在集群中需要通信的有四种消息类型:

消息类型 描述
PUBLISH 消息 客户端发送的 PUBLISH 消息
订阅消息 Broker 节点的订阅关系,用于消息的路由,决定是否向该节点广播消息
服务器登陆消息 服务器之间的认证消息
客户端登陆消息 客户端登陆消息,客户端重新登陆会将之前的客户端下线,该消息用于通知其它节点上的同一个客户端下线

消息路由

每一个 Broker 节点都只负载了部分客户端,一个 Broker 节点收到一条 PUBLISH 消息之后,由于它没有全局的订阅关系视图,它不能判断其它节点是否订阅了该消息的 Topic. 为了让其它节点的客户端也收到该消息,简单的做法是向集群中其它结点广播该消息。但该方案有比较大缺点,向集群广播所有消息,一方向会加大集群的压力,另外一方面也会浪费集群的网络带宽,造成了不必要的消息传输。

一种优化的方案是每一结点都维护一个全局的订阅关系 Ctrie 树,该Ctrie 树包括了集群中所有客户端(客户端有所在服务器的信息)的订阅关系,因此可以判断出那些服务器订阅了消息,从而进行消息的转。但方案也有一个问题,如果客户端数量很多且订阅关系较复杂时,生成的 Ctrie 树会比较大,最终降低了查询匹配的效率。

为了避免 Ctrie 树过大的问题,可以改变数据的粒度,Ctrie 树是以客户端为维度进行匹配,现在转换思路,以服务器为维度来维护订阅关系。

最终的方案以 Topic Filter 首个 token 为索引来维护服务器级别的订阅关系,假定集群有三个结点,分别是 1, 2, 3, 每一个结点都维护了各自的 Topic Filter, 如下表所示:

Topic Filter Sever 首个 token
order/biz,user/new,live/id 1 order,user,live
live/id,meeting/id 2 live,meeting
log/stat 3 log

说明: 首个 token:主要是指 Topic Filter 的第一个层级。

然后再以 首个 token 为索引,建立服务器的订阅关系,如下表所示:

首个 token Server List
order {1}
user {2}
live {1,2}
meeting {2}
log {3}

知道了服务器的订阅关系之后,收到一个 PUBLISH 消息之后,提取出 Topic 中的 首个 token, 索引出订阅的服务器列表便可进行消息的路由转发。

二级路由

假定一个消息在两个节点之间传递,如 Client 1--->Server 1--->Server 2--->Client 2, 节点之间 Server 1--->Server 2 的消息转发由服务器的订阅关系决定,节点到客户端 Server 2--->Client 2 的消息转发由 Ctrie 决定。可以看出,消息在多个节点之间传递时,便存在二级路由关系,即:1) 服务器订阅关系;2)客户端订阅关系 Ctrie.