这篇文章主要分析 ZLMediaKit TCP 服务器的线程模型。
概述 在 Java 体系的 Netty 框架中使用了 Reactor 网络开发模型,即一个线程负责监听接收 Accept 请求,多个工作线程(或 IO 线程)负责具体连接 Socket 的读写请求,结合 epoll 或 select 等技术,可以有效提高网络的读写效率,相关知识可以参见之前的文章 Netty 系列:Reactor 。
使用 Reactor 模型有一个缺点:Accept 处理瓶颈。Accept 请求只由一个线程来处理,在短时间内大量客户端同时连接的场景下,可能会有性能问题。这个问题的本质是单线程处理能力有限,可以使用多线程来解决这个问题。
在 ZLMediaKit 最新的版本中,便使用多线程来处理 Accept 请求,并使用负载均衡算法将客户端连接平均分配到某个工作线程中,整体的结构如下图所示:
对象介绍:
TcpServer:TCP 服务器对象,负责服务的启动、端口的监听、客户端连接的创建及 Session 会话的创建等功能;
Server Socket:ZLMediaKit 抽象的 Socket 对象,代表服务器 Socket 对象,负责端口的监听、接收 Accept 事件回调等功能;
Client Socket:ZLMediaKit 抽象的 Socket 对象,代表客户端 Socket 对象,表示与客户端的一个连接,负责1)读取数据并传递给 Session 对象,2)将数据写入到网络上;
SockNum:持有操作系统 Socket 文件描述符,并有一个类型字段,表明 Socket 的类型,有四种类型,分别是:Sock_Invalid,Sock_TCP,Sock_UDP 和 Sock_TCP_Server;
fd 文件描述符:操作系统 Socket 文件描述符;
Session:会话对象,Socket 的读写操作最终由 Session 对象来处理;
EventPoller:事件轮询对象,它持有 epoll 或 select 文件描述符, Server Socket 和 Client Socket 将添加到 epoll 或 select 结构中,EventPoller 负责事件轮询,将读写事件回调给 Socket 对象,另外,在 EventPoller 中,持有一个 PipeWrap 对象,用于触发内部事件;
thread:操作系统线程,一个 EventPoller 对象持有一个 thread 对象;
EventPollerPool:EventPoller 容器,由多个 EventPoller 对象组成,对象的数量一般就是 cpu 的数量;
网络模型 使用多线程 + epoll (select),一个 Server fd + 多个 epoll 实例。每一个线程都创建了一个 epoll 实例,并以 ET 边沿触发模式监听同一个 Server fd 的读事件,使用 EPOLLEXCLUSIVE 标志位防止惊群效应,线程阻塞在 epoll_wait上 等待客户端连接。
不同系统有不同的多路复用技术,Linux 系统为 epoll,Windows 为 select, 现以 Linux 为例。
当有客户端发送数据到 Server fd 时,针对该客户端创建一个新的会话(新的文件描述符,同样使用 ET 边沿触发),后续该客户端的数据将不会再发送到Server fd。
ZLMediaKit 使用多线程技术,每一个线程中都创建了自己私有的 epoll 实例,并以 ET 模式监听同一个 server fd 的读事件,这种方式会有惊群效应,所以需要给每一个 fd 事件加上 EPOLLEXCLUSIV E标志位(内核4.5+支持)来避免惊群。后续客户端的 fd 将均匀的分配到这多个线程中处理。
TcpServer 对象 TcpServer 是 TCP 模块的关键对象,它负责其它对象的创建工作,如: EventPoller,Socket 及 Session 等等。一个 TcpServer 对象包含一个 EventPoller 对象和一个 Server Socket 对象,而 Server Socket 会向 EventPoller 对象注册读事件,所有的 Server Socket 对象包含相同的文件描述符。
EventPoller 的数量一般是跟 cpu 的核数(同时能够处理的任务数)相关,同时一个 EventPoller 对象会关联到一个 TcpServer 对象中,这就导致生成多个 TcpServer 对象的副本,最终 TcpServer 对象的数量跟 EventPoller 数量是一样的。作者为什么这么设计呢?据我分析,主要的目的为了实现 Server Socket 对象的负载均衡。Server Socket 存放在 TcpServer 对象中,而 Server Socket 对象的数量是与 EventPoller 数量一致的, 它要向所有 EventPoller 对象注册 Accept 事件,作为 Server Socket 的容器对象,必然也要有多个。这纯粹是个人分析,若有错误,还请指正。
创建 EventPoller TcpServer 对象创建时便会创建多个 EventPoller 对象,数量一般根据 cpu 数量来确定,代码路径:TcpServer -> Server -> EventPollerPool -> TaskExecutorGetterImp.addPoller
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 size_t TaskExecutorGetterImp::addPoller (const string &name, size_t size, int priority, bool register_thread, bool enable_cpu_affinity) { auto cpus = thread::hardware_concurrency (); size = size > 0 ? size : cpus; for (size_t i = 0 ; i < size; ++i) { auto full_name = name + " " + to_string (i); auto cpu_index = i % cpus; EventPoller::Ptr poller (new EventPoller(full_name)) ; poller->runLoop (false , register_thread); poller->async ([cpu_index, full_name, priority, enable_cpu_affinity]() { ThreadPool::setPriority ((ThreadPool::Priority)priority); setThreadName (full_name.data ()); if (enable_cpu_affinity) { setThreadAffinity (cpu_index); } }); _threads.emplace_back (std::move (poller)); } return size; }
在 TcpServer 的启动操作中,会完成 TcpServer 副本对象的创建,数量与 EventPoller 对象一致,并与 EventPoller 完成绑定,最后每一个 TcpServer 对象都会持有一个 EventPoller 对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 void TcpServer::start_l (uint16_t port, const std::string &host, uint32_t backlog) { setupEvent (); weak_ptr<TcpServer> weak_self = std::static_pointer_cast <TcpServer>(shared_from_this ()); _timer = std::make_shared <Timer>(2.0f , [weak_self]() -> bool { auto strong_self = weak_self.lock (); if (!strong_self) { return false ; } strong_self->onManagerSession (); return true ; }, _poller); EventPollerPool::Instance ().for_each([&](const TaskExecutor::Ptr &executor) { EventPoller::Ptr poller = static_pointer_cast <EventPoller>(executor); if (poller == _poller) { return ; } auto &serverRef = _cloned_server[poller.get ()]; if (!serverRef) { serverRef = onCreatServer (poller); } if (serverRef) { serverRef->cloneFrom (*this ); } }); if (!_socket->listen (port, host.c_str (), backlog)) { string err = (StrPrinter << "Listen on " << host << " " << port << " failed: " << get_uv_errmsg (true )); throw std::runtime_error (err); } for (auto &pr: _cloned_server) { pr.second->_socket->cloneSocket (*_socket); } InfoL << "TCP server listening on [" << host << "]: " << port; }
如上代码所示,TcpServer 副本会复制主 TcpServer 对象的数据,包括 Socket 对象。
创建 Server Socket 在 TcpServer 启动过程中,也会创建 Server Socket 对象,并设置 OnBeforeAccept 和 OnAccept 函数,这两个函数在接收 Accept 事件时触发。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 void TcpServer::setupEvent () { _socket = createSocket (_poller); weak_ptr<TcpServer> weak_self = std::static_pointer_cast <TcpServer>(shared_from_this ()); _socket->setOnBeforeAccept ([weak_self](const EventPoller::Ptr &poller) -> Socket::Ptr { if (auto strong_self = weak_self.lock ()) { return strong_self->onBeforeAcceptConnection (poller); } return nullptr ; }); _socket->setOnAccept ([weak_self](Socket::Ptr &sock, shared_ptr<void > &complete) { if (auto strong_self = weak_self.lock ()) { auto ptr = sock->getPoller ().get (); auto server = strong_self->getServer (ptr); ptr->async ([server, sock, complete]() { server->onAcceptConnection (sock); }); } }); }
负载均衡 负载均衡主要包括两个方面:1)Accept 负责均衡;2)客户端连接 Clien Socket 负载均衡。
Accept 负载均衡 为了解决 Accept 请求瓶颈的问题,使用多线程同时处理 Accept 请求,一个请求只能由一个线程处理,具体分配那个线程取决于操作系统负责均衡的算法。
每一个 EventPoller 线程都创建了一个 epoll 实例,并以 ET 边沿触发模式监听同一个 Server fd 的读事件,使用 EPOLLEXCLUSIVE 标志位防止惊群效应,线程阻塞在 epoll_wait 上等待客户端连接。
Server fd 包含在 Server Socket 对象中,且所有的 Server Socket 对象都指向同一个 Server fd。该 Server fd 注册到所有 EventPoller 对象的 epoll 实例上。有连接请求时,由操作系统负责分配一个 EventPoller 线程处理,从而实现了 Accept 的负载均衡。
在上面的内容中讲到,TcpServer 与 EventPoller 一一对应,Server Socket 对象会向 EventPoller 注册事件回调,注册有两个时机,一是调用 Socket listen 方法时,二是复制 Socket 对象时。具体内容在后续的章节中进行描述。
Client Socket 负载均衡 EventPoller 收到 Accept 事件时会生成一个 Client Socket 客户端连接对象,该客户端连接对象最终分配给哪一个 EventPoller 线程处理?一般有两个策略:
使用当前 EventPoller 线程,即处理 Accept 请求的 EventPoller 线程;
根据业务负载均衡算法,分配一个负载较小的 EventPoller 线程。
第 1 种方法相对比较简单,负载是否均衡取决于操作系统底层的负载均衡算法,根据 ZLMediaKit 作者的测试,在 cpu 核数较多的情况下,负载均衡算法不是很理想。第 2 种方法则是根据业务上的统计指标,分配一个负载较小的的线程。实际上,在 ZLMediaKit 中都使用了这两种方法,优先使用第 2 中方法,第 2 种方法失败了,则使用第 1 种的方法,代码所下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 int Socket::onAccept (const SockNum::Ptr &sock, int event) noexcept { int fd; struct sockaddr_storage peer_addr; socklen_t addr_len = sizeof (peer_addr); while (true ) { if (event & EventPoller::Event_Read) { do { fd = (int )accept (sock->rawFd (), (struct sockaddr *)&peer_addr, &addr_len); } while (-1 == fd && UV_EINTR == get_uv_error (true )); ... Socket::Ptr peer_sock; try { LOCK_GUARD (_mtx_event); peer_sock = _on_before_accept(_poller); } catch (std::exception &ex) { ErrorL << "Exception occurred when emit on_before_accept: " << ex.what (); close (fd); continue ; } if (!peer_sock) { peer_sock = Socket::createSocket (_poller, false ); } ... } }
优先调用 _on_before_accept 回调方法创建客户端 Socket, 失败了则使用当前 EventPoller 线程创建。
_on_before_accept 回调方法在创建 Server Socket 时指定,具体内容包括:
1 2 3 4 5 6 7 8 9 10 11 12 _socket->setOnBeforeAccept ([weak_self](const EventPoller::Ptr &poller) -> Socket::Ptr { if (auto strong_self = weak_self.lock ()) { return strong_self->onBeforeAcceptConnection (poller); } return nullptr ; }); Socket::Ptr TcpServer::onBeforeAcceptConnection (const EventPoller::Ptr &poller) { assert (_poller->isCurrentThread ()); return createSocket (EventPollerPool::Instance ().getPoller (false )); }
在 _on_before_accept 方法中最终会调用 TcpServer 的 onBeforeAcceptConnection 方法,在该方法中调用 EventPollerPool 的 getPoller 方法分配一个 EventPoller 一个线程。
分配的算法在 TaskExecutor 中实现,具体就是取最小负载的线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 TaskExecutor::Ptr TaskExecutorGetterImp::getExecutor () { auto thread_pos = _thread_pos; if (thread_pos >= _threads.size ()) { thread_pos = 0 ; } TaskExecutor::Ptr executor_min_load = _threads[thread_pos]; auto min_load = executor_min_load->load (); for (size_t i = 0 ; i < _threads.size (); ++i) { ++thread_pos; if (thread_pos >= _threads.size ()) { thread_pos = 0 ; } auto th = _threads[thread_pos]; auto load = th->load (); if (load < min_load) { min_load = load; executor_min_load = th; } if (min_load == 0 ) { break ; } } _thread_pos = thread_pos; return executor_min_load; }
事件回调 在上面的内容中,我们提到,Socket 将读写事件注册到 EventPoller 对象上。实际上是注册到 EventPoller 对象中的多路复用对象上,如 epoll,kqueue 和 select。ZLMediaKit 会根据系统类型自行选择。
事件注册 Socket 事件注册通过 attachEvent 方法完成,其中 Tcp 服务器注册读和错误事件,回调方法为 onAccept,而客户端会注册读写和错误事件。这些读写事件最终会添加到 EventPoller 对象的 epoll,kqueue 和 select 结构中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 bool Socket::attachEvent (const SockNum::Ptr &sock) { weak_ptr<Socket> weak_self = shared_from_this (); if (sock->type () == SockNum::Sock_TCP_Server) { auto result = _poller->addEvent (sock->rawFd (), EventPoller::Event_Read | EventPoller::Event_Error, [weak_self, sock](int event) { if (auto strong_self = weak_self.lock ()) { strong_self->onAccept (sock, event); } }); return -1 != result; } auto read_buffer = _poller->getSharedBuffer (); auto result = _poller->addEvent (sock->rawFd (), EventPoller::Event_Read | EventPoller::Event_Error | EventPoller::Event_Write, [weak_self, sock, read_buffer](int event) { auto strong_self = weak_self.lock (); if (!strong_self) { return ; } if (event & EventPoller::Event_Read) { strong_self->onRead (sock, read_buffer); } if (event & EventPoller::Event_Write) { strong_self->onWriteAble (sock); } if (event & EventPoller::Event_Error) { strong_self->emitErr (getSockErr (sock->rawFd ())); } }); return -1 != result; }
Socket 回调方法说明:
onAccept: 处理 Servet socket Accept 事件;
onRead:处理 Client socket Reead 事件;
onWriteAble:处理 Client socket 可写事件;
emitErr:处理 Client socket Error 事件
以 epoll 为例,Socket 事件添加到 epoll 结构中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 int EventPoller::addEvent (int fd, int event, PollEventCB cb) { TimeTicker (); if (!cb) { WarnL << "PollEventCB is empty" ; return -1 ; } if (isCurrentThread ()) { #if defined(HAS_EPOLL) struct epoll_event ev = {0 }; ev.events = (toEpoll (event)) | EPOLLEXCLUSIVE; ev.data.fd = fd; int ret = epoll_ctl (_event_fd, EPOLL_CTL_ADD, fd, &ev); if (ret != -1 ) { _event_map.emplace (fd, std::make_shared <PollEventCB>(std::move (cb))); } return ret; ... }
Socket 的文件描述符通过系统调用 epoll_ctl 添加到 epoll 中,后续产生的相关事件都会触发回调方法的调用。
事件触发 在 EventPoller 中,通过 runLoop 方法进行事件轮询,检查 Socket 文件描述符是否有事件产生,有的话则调用注册的回调方法。这是采用事件驱动的编程模式,有事件就触发事件的回调。代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 void EventPoller::runLoop (bool blocked, bool ref_self) { if (blocked) { if (ref_self) { s_current_poller = shared_from_this (); } _sem_run_started.post (); _exit_flag = false ; uint64_t minDelay; #if defined(HAS_EPOLL) struct epoll_event events[EPOLL_SIZE]; while (!_exit_flag) { minDelay = getMinDelay (); startSleep (); int ret = epoll_wait (_event_fd, events, EPOLL_SIZE, minDelay ? minDelay : -1 ); sleepWakeUp (); if (ret <= 0 ) { continue ; } _event_cache_expired.clear (); for (int i = 0 ; i < ret; ++i) { struct epoll_event &ev = events[i]; int fd = ev.data.fd; if (_event_cache_expired.count (fd)) { continue ; } auto it = _event_map.find (fd); if (it == _event_map.end ()) { epoll_ctl (_event_fd, EPOLL_CTL_DEL, fd, nullptr ); continue ; } auto cb = it->second; try { (*cb)(toPoller (ev.events)); } catch (std::exception &ex) { ErrorL << "Exception occurred when do event task: " << ex.what (); } } } ... } else { _loop_thread = new thread (&EventPoller::runLoop, this , true , ref_self); _sem_run_started.wait (); } }
Accept 回调 在 Socket Accept 回调中,主要是创建 Client Socket,并根据负载均衡方法,分配一个负载最小的 EventPoller 线程。并最终调用到 TcpServer 对象中的 onAcceptConnection 方法,在方法中完成如下功能:
创建 Session 对象;
注册 Client Socket 事件回调,将读写事件路由到 Session 对象的方法中。
根据业务的不同,可以有多种的 Session 子类,如 RtspSession,RtmpSession 和 HttpSession。Socket 的数据最终将由这些 Session 对象来处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 Session::Ptr TcpServer::onAcceptConnection (const Socket::Ptr &sock) { assert (_poller->isCurrentThread ()); weak_ptr<TcpServer> weak_self = std::static_pointer_cast <TcpServer>(shared_from_this ()); auto helper = _session_alloc(std::static_pointer_cast <TcpServer>(shared_from_this ()), sock); auto session = helper->session (); session->attachServer (*this ); auto success = _session_map.emplace (helper.get (), helper).second; assert (success == true ); weak_ptr<Session> weak_session = session; sock->setOnRead ([weak_session](const Buffer::Ptr &buf, struct sockaddr *, int ) { auto strong_session = weak_session.lock (); if (!strong_session) { return ; } try { strong_session->onRecv (buf); } catch (SockException &ex) { strong_session->shutdown (ex); } catch (exception &ex) { strong_session->shutdown (SockException (Err_shutdown, ex.what ())); } }); SessionHelper *ptr = helper.get (); auto cls = ptr->className (); sock->setOnErr ([weak_self, weak_session, ptr, cls](const SockException &err) { onceToken token (nullptr , [&]() { auto strong_self = weak_self.lock (); if (!strong_self) { return ; } assert (strong_self->_poller->isCurrentThread ()); if (!strong_self->_is_on_manager) { strong_self->_session_map.erase (ptr); } else { strong_self->_poller->async ([weak_self, ptr]() { auto strong_self = weak_self.lock (); if (strong_self) { strong_self->_session_map.erase (ptr); } }, false ); } }); auto strong_session = weak_session.lock (); if (strong_session) { TraceP (strong_session) << cls << " on err: " << err; strong_session->onError (err); } }); return session; }
TcpServer 使用实例 TcpServer 使用如下所示,启动时需要指定一个 Session 实例对象,该对象承担了上层的业务数据处理。
1 2 TcpServer::Ptr server(new TcpServer()); server->start<EchoSession>(9000);//监听9000端口
参考:
1. 《ZLToolKit源码学习笔记》(20)网络模块之TcpServer