这篇文章主要分析 ZLMediaKit Udp 服务器的线程模型。
UDP 负载均衡存在的问题 对于 TCP 服务器,由于 TCP 是有连接的协议,天然存在两种类型的 Socket(或 fd),Server Socket 用来监听端口接收连接请求,一个连接请求会生成一个 Clinet Socket,用于数据的传输。一个连接有五个元素(素称五元组)来惟一标识,即协议类型(TCP)、源地址、源端口、目标地址及目标端口,由这些元素可以惟一确定一个 Client Socket,也可以找到其对应的进程(线程)。Reactor 模型就是基于这样的前提设计的,一个或多个线程监听端口处理连接并生成 Client Socket,并将 Client Socket 平均分配给其它线程单独处理读写请求。
对于 UDP 服务器,UDP 是无连接的协议,一个 UDP Socket 没有服务器客户端之分,对于服务器来说,一个 Socket 就可以接受所有的客户端数据,这必然导致数据的处理变得非常复杂,比如,如何区分不同的客户端,如何高效且安全的在多线程下处理客户端数据等。
为了充分利用多核多线程的性能优势,UDP 可以模拟成有连接的 TCP 协议,使用 Reactor 模型,即多线程 + epoll(select) 的编程模型,为了达到这个目标,需要借助以下的技术:
端口重用SO_REUSEADDR、SO_REUSEPORT;
模拟连接,一个客户端请求建立一个 Client,确定五元组(包括协议类型)。
端口重用 SO_REUSEADDR、SO_REUSEPORT UDP 服务器使用一个端口来处理所有客户端请求,一个客户端生成一个 UDP Socket,多个客户端就会生成多个 UDP Socket,这些 UDP Socket 监听相同的服务器地址和端口。要使这些包含相同地址和 端口的 UDP Sokcet 创建成功,需要添加端口重用配置 SO_REUSEADDR、SO_REUSEPORT。
connect 连接 通过端口重用机制,创建了 UDP Socket,五元组已经明确了三个,即协议类型(UDP)、目标地址及目标端口。现在还没有明确客户端地址及客户端端口,这时可以通过 connect 方法传入客户端地址和端口,从而模拟出一个连接,此后,从该客户端发出的数据都会传递给该 UDP Socket。
1 2 3 4 5 6 7 8 9 10 11 12 13 sd = socket (AF_INET, SOCK_DGRAM, 0 ); srv_addr.sin_family = AF_INET; srv_addr.sin_port = htons (9600 ); srv_addr.sin_addr.s_addr = 0 ; bind (sd, (struct sockaddr* )&srv_addr, addrlen);cli_addr.sin_family = AF_INET; cli_addr.sin_port = htons (9601 ); cli_addr.sin_addr.s_addr = inet_addr ("192.168.1.2" ) connect (sd, (struct sockaddr* )&svr_addr, addrlen);
调用 connect 方法之后,这条连接就可以建立了。当一个 UDP Socket 去 connect 一个远端地址和端口时,并没有发送任何的数据包,其效果仅仅是在本地建立了一个五元组 + 映射关系,可以让内核找到对应的 UDP Socket,进而找到对应的进程(或线程)。
使用 connect 方式有一个缺陷,就是客户端的地址和端口必须固定,一旦地址和端口变了,UDP Socket 也就变了。
与 TCP 服务器一样,需要一个 Server UDP Socket 来模拟接收连接请求,一般是接收到第一个数据包的时候就创建一个新的 Clien UDP Socket 来接收后续的数据包。如果客户端数据传输较快,Server UDP Socket 连续收到了多个来自同一个客户端的数据包,此时,需要将后续的包转发给 Clien UDP Socket 所在的线程进行处理。
UDP 服务器的结构与 TCP 服务器基本一致,其整体结构如下所示:
网络模型 与 TcpServer 类似,使用多线程 + epoll (select),一个 Server fd + 多个 epoll 实例。
每一个线程都创建了一个 epoll 实例,并以 ET 边沿触发模式监听同一个 Server fd 的读事件,使用 EPOLLEXCLUSIVE 标志位防止惊群效应,线程阻塞在 epoll_wait上 等待客户端连接。
不同系统有不同的多路复用技术,Linux 系统为 epoll,Windows 为 select, 现以 Linux 为例。
当有客户端发送数据到 Server fd 时,针对该客户端创建一个新的会话(新的文件描述符,同样使用 ET 边沿触发),后续该客户端的数据将不会再发送到Server fd。
ZLMediaKit 使用多线程技术,每一个线程中都创建了自己私有的 epoll 实例,并以 ET 模式监听同一个 server fd 的读( UDP 还有写)事件,这种方式会有惊群效应,所以需要给每一个 fd 事件加上 EPOLLEXCLUSIV E标志位(内核 4.5+ 支持)来避免惊群。后续客户端的 fd 将均匀的分配到这多个线程中处理。
在 ZLMediaKit 中,UDP 服务器与 TCP 服务器结构与流程基本一致,下面说下它们的差异点。
UdpServer 对象 注册 Server Socket 事件 在 UdpServer 对象 setupEvent 方法中,定义了收到 UDP 数据包时的回调方法,在该方法中将会创建 Client UDP Socket,同时处理收到多个数据包的问题。
1 2 3 4 5 6 7 8 9 void UdpServer::setupEvent () { _socket = createSocket (_poller); std::weak_ptr<UdpServer> weak_self = std::static_pointer_cast <UdpServer>(shared_from_this ()); _socket->setOnRead ([weak_self](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) { if (auto strong_self = weak_self.lock ()) { strong_self->onRead (buf, addr, addr_len); } }); }
开启端口重用 在 UdpServer 启动过程中,会绑定服务器地址及端口,并设置端口重用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 int SockUtil::bindUdpSock (const uint16_t port, const char *local_ip, bool enable_reuse) { int fd = -1 ; int family = support_ipv6 () ? (is_ipv4 (local_ip) ? AF_INET : AF_INET6) : AF_INET; if ((fd = (int )socket (family, SOCK_DGRAM, IPPROTO_UDP)) == -1 ) { WarnL << "Create socket failed: " << get_uv_errmsg (true ); return -1 ; } if (enable_reuse) { setReuseable (fd); } setNoSigpipe (fd); setNoBlocked (fd); setSendBuf (fd); setRecvBuf (fd); setCloseWait (fd); setCloExec (fd); if (bind_sock (fd, local_ip, port, family) == -1 ) { close (fd); return -1 ; } return fd; } int SockUtil::setReuseable (int fd, bool on, bool reuse_port) { int opt = on ? 1 : 0 ; int ret = setsockopt (fd, SOL_SOCKET, SO_REUSEADDR, (char *) &opt, static_cast <socklen_t >(sizeof (opt))); if (ret == -1 ) { TraceL << "setsockopt SO_REUSEADDR failed" ; return ret; } #if defined(SO_REUSEPORT) if (reuse_port) { ret = setsockopt (fd, SOL_SOCKET, SO_REUSEPORT, (char *) &opt, static_cast <socklen_t >(sizeof (opt))); if (ret == -1 ) { TraceL << "setsockopt SO_REUSEPORT failed" ; } } #endif return ret; }
Server UDP Socket 在 TCP 服务器中,每一个 TcpServer 副本都会生成一个 Server Socket, 会存在多个 Server Socket,但所有的 Server Socket 指向同一个 Server fd,本质是同一个 Socket。而 UDP 服务器不一样,每一个 UdpServer 副本关联的 Server UDP Socket 都是独立的 Socket,它们指向独立的 Server fd,这些 Server fd 重用同一个服务器地址和端口。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 void UdpServer::start_l (uint16_t port, const std::string &host) { if (!_socket->bindUdpSock (port, host.c_str ())) { std::string err = (StrPrinter << "Bind udp socket on " << host << " " << port << " failed: " << get_uv_errmsg (true )); throw std::runtime_error (err); } for (auto &pr: _cloned_server) { #if 0 pr.second->_socket->cloneSocket (*_socket); #else pr.second->_socket->bindUdpSock (_socket->get_local_port (), _socket->get_local_ip ()); #endif } InfoL << "UDP server bind to [" << host << "]: " << port; }
创建 Client UDP Socket 当 Server Udp Socket 收到客户端的第一个数据包时,会触发 UdpServer 的 onRead 方法,会检查是否存在 UDP Session,如果不存在,则创建 Client UDP Socket 和 UDP Session,并将二者关联起来。
在下面的代码中,包含如下的内容:
分配负载最小的 EventPoller 线程对象,并与新创建的 Client UDP Socket 关联起来,这样便可将 Client UDP Socket 的读写事件注册到 EventPoller 的 epoll 实例上,由该 EventPoller 线程处理后续的请求;
判断分配的 EventPoller 线程是否是当前线程,如果是,则直接创建 UDP Session 并定义回调方法,如果不是,则向新分配的 EventPoller 线程添加一个异步任务,由该异步任务创建 UDP Session,并且需要复制数据包,回调给 Session 处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 SessionHelper::Ptr UdpServer::createSession (const PeerIdType &id, const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) { auto socket = createSocket (EventPollerPool::Instance ().getPoller (false ), buf, addr, addr_len); if (!socket) { return nullptr ; } auto addr_str = string ((char *) addr, addr_len); std::weak_ptr<UdpServer> weak_self = std::static_pointer_cast <UdpServer>(shared_from_this ()); auto helper_creator = [this , weak_self, socket, addr_str, id]() -> SessionHelper::Ptr { auto server = weak_self.lock (); if (!server) { return nullptr ; } lock_guard<std::recursive_mutex> lck (*_session_mutex) ; auto it = _session_map->find (id); if (it != _session_map->end ()) { return it->second; } assert (_socket); socket->bindUdpSock (_socket->get_local_port (), _socket->get_local_ip ()); socket->bindPeerAddr ((struct sockaddr *) addr_str.data (), addr_str.size ()); auto helper = _session_alloc(server, socket); helper->session ()->attachServer (*this ); std::weak_ptr<SessionHelper> weak_helper = helper; socket->setOnRead ([weak_self, weak_helper, id](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) { auto strong_self = weak_self.lock (); if (!strong_self) { return ; } if (id == makeSockId (addr, addr_len)) { if (auto strong_helper = weak_helper.lock ()) { emitSessionRecv (strong_helper, buf); } return ; } strong_self->onRead_l (false , id, buf, addr, addr_len); }); socket->setOnErr ([weak_self, weak_helper, id](const SockException &err) { onceToken token (nullptr , [&]() { auto strong_self = weak_self.lock (); if (!strong_self) { return ; } strong_self->_poller->doDelayTask (kUdpDelayCloseMS, [weak_self, id]() { if (auto strong_self = weak_self.lock ()) { lock_guard<std::recursive_mutex> lck (*strong_self->_session_mutex); strong_self->_session_map->erase (id); } return 0 ; }); }); if (auto strong_helper = weak_helper.lock ()) { TraceP (strong_helper->session ()) << strong_helper->className () << " on err: " << err; strong_helper->enable = false ; strong_helper->session ()->onError (err); } }); auto pr = _session_map->emplace (id, std::move (helper)); assert (pr.second); return pr.first->second; }; if (socket->getPoller ()->isCurrentThread ()) { return helper_creator (); } auto cacheable_buf = std::make_shared <BufferString>(buf->toString ()); socket->getPoller ()->async ([helper_creator, cacheable_buf]() { auto helper = helper_creator (); if (helper) { helper->session ()->getPoller ()->async ([helper, cacheable_buf]() { emitSessionRecv (helper, cacheable_buf); }); } }); return nullptr ; }
创建好 Client UDP Socket 之后,调用 bind 和 connect 方法确定一个连接。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 socket->bindUdpSock (_socket->get_local_port (), _socket->get_local_ip ()); socket->bindPeerAddr ((struct sockaddr *) addr_str.data (), addr_str.size ()); bool Socket::bindUdpSock (uint16_t port, const string &local_ip, bool enable_reuse) { closeSock (); int fd = SockUtil::bindUdpSock (port, local_ip.data (), enable_reuse); if (fd == -1 ) { return false ; } return fromSock_l (std::make_shared <SockNum>(fd, SockNum::Sock_UDP)); } bool Socket::bindPeerAddr (const struct sockaddr *dst_addr, socklen_t addr_len, bool soft_bind) { LOCK_GUARD (_mtx_sock_fd); if (!_sock_fd) { return false ; } if (_sock_fd->type () != SockNum::Sock_UDP) { return false ; } addr_len = addr_len ? addr_len : SockUtil::get_sock_len (dst_addr); if (soft_bind) { _udp_send_dst = std::make_shared <struct sockaddr_storage>(); memcpy (_udp_send_dst.get (), dst_addr, addr_len); } else { _udp_send_dst = nullptr ; if (-1 == ::connect (_sock_fd->rawFd (), dst_addr, addr_len)) { WarnL << "Connect socket to peer address failed: " << SockUtil::inet_ntoa (dst_addr); return false ; } memcpy (&_peer_addr, dst_addr, addr_len); } return true ; }
处理多数据包的问题 Server UDP Socket 可能会收到来自同一个客户端的多个数据包,第一个数据包会触发创建 Client UDP Socket 和 UDP Session。如果不作处理会存在并发问题,可能会导致同一个客户端创建多个 Client UDP Socket。在 ZLMediaKit 中,使用锁机制来控制并发或多个数据包的问题,只能有一个线程获取锁创建 Client UDP Socket 和 UDP Session,其它线程(或同一个线程)都会被阻塞,直到 Session 创建成功。后续的请求只需要获取 Session 即可。代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 SessionHelper::Ptr UdpServer::getOrCreateSession (const UdpServer::PeerIdType &id, const Buffer::Ptr &buf, sockaddr *addr, int addr_len, bool &is_new) { { std::lock_guard<std::recursive_mutex> lock (*_session_mutex) ; auto it = _session_map->find (id); if (it != _session_map->end ()) { return it->second; } } is_new = true ; return createSession (id, buf, addr, addr_len); }
获取 Session 之后,会将数据回调给 Session,由上层处理来处理。如果收到数据的是 Server UDP Socket,则需要将数据传递给 Client UDP Socket 所在的 EventPoller 线程,主要是通过向 EventPoller 线程添加一个异步任务来完成。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 void UdpServer::onRead_l (bool is_server_fd, const UdpServer::PeerIdType &id, const Buffer::Ptr &buf, sockaddr *addr, int addr_len) { bool is_new = false ; if (auto helper = getOrCreateSession (id, buf, addr, addr_len, is_new)) { if (helper->session ()->getPoller ()->isCurrentThread ()) { emitSessionRecv (helper, buf); } else { WarnL << "UDP packet incoming from other thread" ; std::weak_ptr<SessionHelper> weak_helper = helper; auto cacheable_buf = std::make_shared <BufferString>(buf->toString ()); helper->session ()->async ([weak_helper, cacheable_buf]() { if (auto strong_helper = weak_helper.lock ()) { emitSessionRecv (strong_helper, cacheable_buf); } }); } #if !defined(NDEBUG) if (!is_new) { TraceL << "UDP packet incoming from " << (is_server_fd ? "server fd" : "other peer fd" ); } #endif } }
参考:
1. 《ZLToolKit源码学习笔记》(22)网络模块之UdpServer 2. 告知你不为人知的 UDP:连接性和负载均衡