ZLMediaKit UDP 线程模型
这篇文章主要分析 ZLMediaKit Udp 服务器的线程模型。
UDP 负载均衡存在的问题
对于 TCP 服务器,由于 TCP 是有连接的协议,天然存在两种类型的 Socket(或 fd),Server Socket 用来监听端口接收连接请求,一个连接请求会生成一个 Clinet Socket,用于数据的传输。一个连接有五个元素(素称五元组)来惟一标识,即协议类型(TCP)、源地址、源端口、目标地址及目标端口,由这些元素可以惟一确定一个 Client Socket,也可以找到其对应的进程(线程)。Reactor 模型就是基于这样的前提设计的,一个或多个线程监听端口处理连接并生成 Client Socket,并将 Client Socket 平均分配给其它线程单独处理读写请求。
对于 UDP 服务器,UDP 是无连接的协议,一个 UDP Socket 没有服务器客户端之分,对于服务器来说,一个 Socket 就可以接受所有的客户端数据,这必然导致数据的处理变得非常复杂,比如,如何区分不同的客户端,如何高效且安全的在多线程下处理客户端数据等。
为了充分利用多核多线程的性能优势,UDP 可以模拟成有连接的 TCP 协议,使用 Reactor 模型,即多线程 + epoll(select) 的编程模型,为了达到这个目标,需要借助以下的技术:
- 端口重用SO_REUSEADDR、SO_REUSEPORT;
- 模拟连接,一个客户端请求建立一个 Client,确定五元组(包括协议类型)。
端口重用 SO_REUSEADDR、SO_REUSEPORT
UDP 服务器使用一个端口来处理所有客户端请求,一个客户端生成一个 UDP Socket,多个客户端就会生成多个 UDP Socket,这些 UDP Socket 监听相同的服务器地址和端口。要使这些包含相同地址和 端口的 UDP Sokcet 创建成功,需要添加端口重用配置 SO_REUSEADDR、SO_REUSEPORT。
connect 连接
通过端口重用机制,创建了 UDP Socket,五元组已经明确了三个,即协议类型(UDP)、目标地址及目标端口。现在还没有明确客户端地址及客户端端口,这时可以通过 connect 方法传入客户端地址和端口,从而模拟出一个连接,此后,从该客户端发出的数据都会传递给该 UDP Socket。
1 | sd = socket(AF_INET, SOCK_DGRAM, 0); |
调用 connect 方法之后,这条连接就可以建立了。当一个 UDP Socket 去 connect 一个远端地址和端口时,并没有发送任何的数据包,其效果仅仅是在本地建立了一个五元组 + 映射关系,可以让内核找到对应的 UDP Socket,进而找到对应的进程(或线程)。
使用 connect 方式有一个缺陷,就是客户端的地址和端口必须固定,一旦地址和端口变了,UDP Socket 也就变了。
与 TCP 服务器一样,需要一个 Server UDP Socket 来模拟接收连接请求,一般是接收到第一个数据包的时候就创建一个新的 Clien UDP Socket 来接收后续的数据包。如果客户端数据传输较快,Server UDP Socket 连续收到了多个来自同一个客户端的数据包,此时,需要将后续的包转发给 Clien UDP Socket 所在的线程进行处理。
UDP 服务器的结构与 TCP 服务器基本一致,其整体结构如下所示:
网络模型
与 TcpServer 类似,使用多线程 + epoll (select),一个 Server fd + 多个 epoll 实例。
每一个线程都创建了一个 epoll 实例,并以 ET 边沿触发模式监听同一个 Server fd 的读事件,使用 EPOLLEXCLUSIVE 标志位防止惊群效应,线程阻塞在 epoll_wait上 等待客户端连接。
不同系统有不同的多路复用技术,Linux 系统为 epoll,Windows 为 select, 现以 Linux 为例。
当有客户端发送数据到 Server fd 时,针对该客户端创建一个新的会话(新的文件描述符,同样使用 ET 边沿触发),后续该客户端的数据将不会再发送到Server fd。
ZLMediaKit 使用多线程技术,每一个线程中都创建了自己私有的 epoll 实例,并以 ET 模式监听同一个 server fd 的读( UDP 还有写)事件,这种方式会有惊群效应,所以需要给每一个 fd 事件加上 EPOLLEXCLUSIV E标志位(内核 4.5+ 支持)来避免惊群。后续客户端的 fd 将均匀的分配到这多个线程中处理。
在 ZLMediaKit 中,UDP 服务器与 TCP 服务器结构与流程基本一致,下面说下它们的差异点。
UdpServer 对象
注册 Server Socket 事件
在 UdpServer 对象 setupEvent 方法中,定义了收到 UDP 数据包时的回调方法,在该方法中将会创建 Client UDP Socket,同时处理收到多个数据包的问题。
1 | void UdpServer::setupEvent() { |
开启端口重用
在 UdpServer 启动过程中,会绑定服务器地址及端口,并设置端口重用。
1 |
|
Server UDP Socket
在 TCP 服务器中,每一个 TcpServer 副本都会生成一个 Server Socket, 会存在多个 Server Socket,但所有的 Server Socket 指向同一个 Server fd,本质是同一个 Socket。而 UDP 服务器不一样,每一个 UdpServer 副本关联的 Server UDP Socket 都是独立的 Socket,它们指向独立的 Server fd,这些 Server fd 重用同一个服务器地址和端口。
1 | void UdpServer::start_l(uint16_t port, const std::string &host) { |
创建 Client UDP Socket
当 Server Udp Socket 收到客户端的第一个数据包时,会触发 UdpServer 的 onRead 方法,会检查是否存在 UDP Session,如果不存在,则创建 Client UDP Socket 和 UDP Session,并将二者关联起来。
在下面的代码中,包含如下的内容:
- 分配负载最小的 EventPoller 线程对象,并与新创建的 Client UDP Socket 关联起来,这样便可将 Client UDP Socket 的读写事件注册到 EventPoller 的 epoll 实例上,由该 EventPoller 线程处理后续的请求;
- 判断分配的 EventPoller 线程是否是当前线程,如果是,则直接创建 UDP Session 并定义回调方法,如果不是,则向新分配的 EventPoller 线程添加一个异步任务,由该异步任务创建 UDP Session,并且需要复制数据包,回调给 Session 处理。
1 | SessionHelper::Ptr UdpServer::createSession(const PeerIdType &id, const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) { |
创建好 Client UDP Socket 之后,调用 bind 和 connect
方法确定一个连接。 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42// 绑定服务端地址和端口
socket->bindUdpSock(_socket->get_local_port(), _socket->get_local_ip());
// 连接客户端地址和端口
socket->bindPeerAddr((struct sockaddr *) addr_str.data(), addr_str.size());
// bindUdpSock 方法
bool Socket::bindUdpSock(uint16_t port, const string &local_ip, bool enable_reuse) {
closeSock();
int fd = SockUtil::bindUdpSock(port, local_ip.data(), enable_reuse);
if (fd == -1) {
return false;
}
return fromSock_l(std::make_shared<SockNum>(fd, SockNum::Sock_UDP));
}
// bindPeerAddr 方法,调用 connect 方法
bool Socket::bindPeerAddr(const struct sockaddr *dst_addr, socklen_t addr_len, bool soft_bind) {
LOCK_GUARD(_mtx_sock_fd);
if (!_sock_fd) {
return false;
}
if (_sock_fd->type() != SockNum::Sock_UDP) {
return false;
}
addr_len = addr_len ? addr_len : SockUtil::get_sock_len(dst_addr);
if (soft_bind) {
// 软绑定,只保存地址
_udp_send_dst = std::make_shared<struct sockaddr_storage>();
memcpy(_udp_send_dst.get(), dst_addr, addr_len);
} else {
// 硬绑定后,取消软绑定,防止memcpy目标地址的性能损失
_udp_send_dst = nullptr;
if (-1 == ::connect(_sock_fd->rawFd(), dst_addr, addr_len)) {
WarnL << "Connect socket to peer address failed: " << SockUtil::inet_ntoa(dst_addr);
return false;
}
memcpy(&_peer_addr, dst_addr, addr_len);
}
return true;
}
处理多数据包的问题
Server UDP Socket 可能会收到来自同一个客户端的多个数据包,第一个数据包会触发创建 Client UDP Socket 和 UDP Session。如果不作处理会存在并发问题,可能会导致同一个客户端创建多个 Client UDP Socket。在 ZLMediaKit 中,使用锁机制来控制并发或多个数据包的问题,只能有一个线程获取锁创建 Client UDP Socket 和 UDP Session,其它线程(或同一个线程)都会被阻塞,直到 Session 创建成功。后续的请求只需要获取 Session 即可。代码如下所示:
1 | SessionHelper::Ptr UdpServer::getOrCreateSession(const UdpServer::PeerIdType &id, const Buffer::Ptr &buf, sockaddr *addr, int addr_len, bool &is_new) { |
获取 Session 之后,会将数据回调给 Session,由上层处理来处理。如果收到数据的是 Server UDP Socket,则需要将数据传递给 Client UDP Socket 所在的 EventPoller 线程,主要是通过向 EventPoller 线程添加一个异步任务来完成。
1 | void UdpServer::onRead_l(bool is_server_fd, const UdpServer::PeerIdType &id, const Buffer::Ptr &buf, sockaddr *addr, int addr_len) { |
参考:
1. 《ZLToolKit源码学习笔记》(22)网络模块之UdpServer 2. 告知你不为人知的 UDP:连接性和负载均衡