ZLMediaKit源码分析(一)服务启动

ZLMediaKit源码分析(一)服务启动
ZLMediaKit源码分析(二)推流创建
ZLMediaKit源码分析(三)拉流创建

服务创建流程

  1. 创建TcpServer实例,其父类Server同时被实例化。
  2. EventPollerPool初始化线程池。批量创建EventPoller,每个EventPoller对应一个线程,对应一个CPU核。该线程池线程数量对应CPU核数或者由命令行参数-t指定。
  3. 注册TcpServer::_on_create_socket回调,该回调创建Socekt,并且绑定EventPoller。
  4. 启动TcpServer,此时该TcpServer尚未绑定Socket。
  5. 注册TcpServer::_session_alloc回调,该回调函数创建Session,SessionHelper。
    5.1. Session,SessionHelper一一对应,SessionHelper操作全局静态变量session_map,管理session。
    5.2. 注册SessionHelper::_on_create_socket回调,该回调也创建Socket,但是该回调创建的Socket没有Poller负载相关操作,其poller继承自父。
  6. 启动TcpServer,首先TcpServer::setupEvent()创建Socket。
    socket绑定的poller,是在TcpServer的父类Server初始化的时候返回的。
  7. TcpServer::setupEvent()为socket注册两个回调函数。
    7.1. Socket::_on_before_accept,在接受客户端链接请求之前,创建socket,此时绑定的poller有负载考虑。
    7.2. Socket::_on_accept,接受客户端链接请求。使用Socket::_on_before_accept创建的socket,调用步骤4注册的TcpServer::_session_alloc回调创建session。并为该socket绑定数据可读回调。
  8. 启动监听服务。
    8.1. 添加客户端请求事件。
    8.2. 注册客户端请求响应回调,主要调用Socket::onAccept()。
  9. 客户端响应处理。
    9.1. 接受客户端请求accept()。
    9.2. 调用Socket::_on_before_accept回调,创建socket,并考虑到poller负载。
    9.3. 如果8.2. 创建失败,则继承父_poller,创建socket。一般这里不执行。
    9.4. 调用Socket::_on_accept接受客户端请求,核心调用回调TcpServer::onAcceptConnection(),创建客户端session,注册数据可读事件和回调。
    9.5. 注册客户端可读事件,绑定数据可读回调。
  10. 根据poller数量,批量clone服务。

创建TcpServer

创建TcpServer并调用start().

server/main.cpp
        uint16_t rtspPort = mINI::Instance()[Rtsp::kPort];
        auto rtspSrv = std::make_shared<TcpServer>();;
        if (rtspPort) {
    
     rtspSrv->start<RtspSession>(rtspPort); }

TcpServer()初始化做了两件重要的事情:

  1. 创建EventPollerPool。
    EventPollerPool是TcpServer父类Server初始化的时候创建的。会根据cpu核数count或者命令行参数-t count创建count个poller实例,并对应启动线程。
  2. 注册Socket创建回调TcpServer::_on_create_socket。该回调有两大用途:
    Server启动时,优先创建socket。
    Server accept客户端链接请求时,创建客户端socket。
3rdpart/ZLToolKit/src/Network/TcpServer.cpp
// main() 函数中 auto rtspSrv = std::make_shared<TcpServer>();;
// poller默认为空
// 初始化父类 Server(poller)
TcpServer::TcpServer(const EventPoller::Ptr &poller) : Server(poller) {
    
    
    setOnCreateSocket(nullptr);
}

Server::Server() 初始化

3rdpart/ZLToolKit/src/Network/Server.cpp
Server::Server(EventPoller::Ptr poller) {
    
    
	// EventPoller::Ptr Server::_poller;
	// EventPollerPool::Instance() 会创建线程
    _poller = poller ? std::move(poller) : EventPollerPool::Instance().getPoller();
}

创建EventPoller和线程

3rdpart/ZLToolKit/src/Poller/EventPoller.cpp
EventPollerPool::EventPollerPool() {
    
    
	// 添加多个EventPoller并创建线程,
    auto size = addPoller("event poller", s_pool_size, ThreadPool::PRIORITY_HIGHEST, true, s_enable_cpu_affinity);
    NoticeCenter::Instance().emitEvent(kOnStarted, *this, size);
    InfoL << "EventPoller created size: " << size;
}

EventPollerPool继承自TaskExecutorGetterImp,addPoller()的实现在父类中。
addPoller会根据cpu核数count或者命令行参数-t count创建count个poller实例,并对应启动线程。

3rdpart/ZLToolKit/src/Thread/TaskExecutor.cpp
size_t TaskExecutorGetterImp::addPoller(const string &name, size_t size, int priority, bool register_thread, bool enable_cpu_affinity) {
    
    
    auto cpus = thread::hardware_concurrency();
    size = size > 0 ? size : cpus;
    for (size_t i = 0; i < size; ++i) {
    
    
        auto full_name = name + " " + to_string(i);
        EventPoller::Ptr poller(new EventPoller(full_name, (ThreadPool::Priority) priority));
        poller->runLoop(false, register_thread);
        poller->async([i, cpus, full_name, enable_cpu_affinity]() {
    
    
            setThreadName(full_name.data());
            if (enable_cpu_affinity) {
    
    
                setThreadAffinity(i % cpus);
            }
        });
        // TaskExecutorGetterImp::_threads 成员变量
        // std::vector<TaskExecutor::Ptr> _threads;
        _threads.emplace_back(std::move(poller));
    }
    return size;
}

创建epoll实例。

EventPoller::EventPoller(std::string name, ThreadPool::Priority priority) {
    
    
    _name = std::move(name);
    _priority = priority;
    SockUtil::setNoBlocked(_pipe.readFD());
    SockUtil::setNoBlocked(_pipe.writeFD());

#if defined(HAS_EPOLL)
    _epoll_fd = epoll_create(EPOLL_SIZE);
    if (_epoll_fd == -1) {
    
    
        throw runtime_error(StrPrinter << "Create epoll fd failed: " << get_uv_errmsg());
    }
    SockUtil::setCloExec(_epoll_fd);
#endif //HAS_EPOLL
    _logger = Logger::Instance().shared_from_this();
    _loop_thread_id = this_thread::get_id();

    //添加内部管道事件
    if (addEvent(_pipe.readFD(), Event_Read, [this](int event) {
    
     onPipeEvent(); }) == -1) {
    
    
        throw std::runtime_error("Add pipe fd to poller failed");
    }
}

创建线程。

3rdpart/ZLToolKit/src/Poller/EventPoller.cpp
// poller->runLoop(false, register_thread);
void EventPoller::runLoop(bool blocked, bool ref_self) {
    
    
    // 默认blocked为false
    if (blocked) {
    
    
        ThreadPool::setPriority(_priority);
        lock_guard<mutex> lck(_mtx_running);
        _loop_thread_id = this_thread::get_id();
        if (ref_self) {
    
    
            s_current_poller = shared_from_this();
        }
        _sem_run_started.post();
        _exit_flag = false;
        uint64_t minDelay;
#if defined(HAS_EPOLL)
        struct epoll_event events[EPOLL_SIZE];
        while (!_exit_flag) {
    
    
            minDelay = getMinDelay();
            startSleep();//用于统计当前线程负载情况
            int ret = epoll_wait(_epoll_fd, events, EPOLL_SIZE, minDelay ? minDelay : -1);
            sleepWakeUp();//用于统计当前线程负载情况
            if (ret <= 0) {
    
    
                //超时或被打断
                continue;
            }
            for (int i = 0; i < ret; ++i) {
    
    
                struct epoll_event &ev = events[i];
                int fd = ev.data.fd;
                auto it = _event_map.find(fd);
                if (it == _event_map.end()) {
    
    
                    epoll_ctl(_epoll_fd, EPOLL_CTL_DEL, fd, nullptr);
                    continue;
                }
                auto cb = it->second;
                try {
    
    
                    (*cb)(toPoller(ev.events));
                } catch (std::exception &ex) {
    
    
                    ErrorL << "Exception occurred when do event task: " << ex.what();
                }
            }
        }
#else
        // select
        ......
#endif //HAS_EPOLL
    } else {
    
    
        _loop_thread = new thread(&EventPoller::runLoop, this, true, ref_self);
        _sem_run_started.wait();
    }
}

根据负载获取EventPoller

声明:

3rdpart/ZLToolKit/src/Poller/EventPoller.h
class EventPollerPool : public std::enable_shared_from_this<EventPollerPool>, public TaskExecutorGetterImp {
    
    
public:
	......
    /**
     * 根据负载情况获取轻负载的实例
     * 如果优先返回当前线程,那么会返回当前线程
     * 返回当前线程的目的是为了提高线程安全性
     * @param prefer_current_thread 是否优先获取当前线程
     */
    EventPoller::Ptr getPoller(bool prefer_current_thread = true);
	......
}

实现:

3rdpart/ZLToolKit/src/Poller/EventPoller.cpp
EventPoller::Ptr EventPollerPool::getPoller(bool prefer_current_thread) {
    
    
auto poller = EventPoller::getCurrentPoller();
	// prefer_current_thread == true
	// _prefer_current_thread
    if (prefer_current_thread && _prefer_current_thread && poller) {
    
    
        return poller;
	}
	// 执行了下面
    return dynamic_pointer_cast<EventPoller>(getExecutor());
}
3rdpart/ZLToolKit/src/Poller/EventPoller.cpp
// static
EventPoller::Ptr EventPoller::getCurrentPoller() {
    
    
    return s_current_poller.lock();
}

根据负载返回TaskExecutor。

3rdpart/ZLToolKit/src/Thread/TaskExecutor.cpp
TaskExecutor::Ptr TaskExecutorGetterImp::getExecutor() {
    
    
    auto thread_pos = _thread_pos;
    if (thread_pos >= _threads.size()) {
    
    
        thread_pos = 0;
    }

    TaskExecutor::Ptr executor_min_load = _threads[thread_pos];
    auto min_load = executor_min_load->load();

    for (size_t i = 0; i < _threads.size(); ++i, ++thread_pos) {
    
    
        if (thread_pos >= _threads.size()) {
    
    
            thread_pos = 0;
        }

        auto th = _threads[thread_pos];
        auto load = th->load();

        if (load < min_load) {
    
    
            min_load = load;
            executor_min_load = th;
        }
        if (min_load == 0) {
    
    
            break;
        }
    }
    _thread_pos = thread_pos;
    return executor_min_load;
}

注册TcpServer::_on_create_socket回调

调用:

3rdpart/ZLToolKit/src/Network/TcpServer.cpp
// main() 函数中 auto rtspSrv = std::make_shared<TcpServer>();;
// 所以poller默认为空
// 初始化父类 Server(poller)
TcpServer::TcpServer(const EventPoller::Ptr &poller) : Server(poller) {
    
    
    setOnCreateSocket(nullptr);
}

实现:

3rdpart/ZLToolKit/src/Network/TcpServer.cpp
// cb 传入为nullptr
void TcpServer::setOnCreateSocket(Socket::onCreateSocket cb) {
    
    
    if (cb) {
    
    
        _on_create_socket = std::move(cb);
	} else {
    
    
        // 走这里
        // 注册回调函数, 只在这里注册
        // 调用该回调函数时,poller不为空
        // TcpServer::_on_create_socket
        _on_create_socket = [](const EventPoller::Ptr &poller) {
    
    
            return Socket::createSocket(poller, false);
        };
    }
	for (auto &pr : _cloned_server) {
    
    
        // cb 传入为nullptr
        pr.second->setOnCreateSocket(cb);
    }
}

创建Socket

../3rdpart/ZLToolKit/src/Network/Socket.cpp
Socket::Ptr Socket::createSocket(const EventPoller::Ptr &poller, bool enable_mutex){
    
    
    return std::make_shared<Socket>(poller, enable_mutex);
}

如果传入poller参数为空,则寻找最轻负载poller绑定socket。

../3rdpart/ZLToolKit/src/Network/Socket.cpp
Socket::Socket(const EventPoller::Ptr &poller, bool enable_mutex) :
        _mtx_sock_fd(enable_mutex), _mtx_event(enable_mutex),
        _mtx_send_buf_waiting(enable_mutex), _mtx_send_buf_sending(enable_mutex){
    
    

    _poller = poller;
    if (!_poller) {
    
    
        _poller = EventPollerPool::Instance().getPoller();
    }
    setOnRead(nullptr);
    setOnErr(nullptr);
    setOnAccept(nullptr);
    setOnFlush(nullptr);
    setOnBeforeAccept(nullptr);
    setOnSendResult(nullptr);
}

TcpServer::start()

该函数启动TcpServer服务。主要做两件事情:

  1. 注册TcpServer::_session_alloc回调。
    a) 为SessionType(对应为RtspSession)申请内存空间。绑定socket。
    b) 调用SocketHelper::setOnCreateSocket(),注册创建socket的回调函数,实际调用TcpServer::_on_create_socket。Session无setOnCreateSocket()实现,继承自SocketHelper。
    c) 创建SessionHelper,绑定server和session。Session和SessionHelper一一对应。SessionHelper通过全局变量SessionMap管理session。
  2. 调用start_l(port, host, backlog)启动监听任务。

注册TcpServer::_session_alloc

3rdpart/ZLToolKit/src/Network/TcpServer.h
class TcpServer : public Server {
    
    
public:
    using Ptr = std::shared_ptr<TcpServer>;
    ......
	// if (rtspPort) { rtspSrv->start<RtspSession>(rtspPort); }
	// RtspSession等同于SessionType, RtspSession由Session派生
	template<typename SessionType>
    void start(uint16_t port, const std::string &host = "::", uint32_t backlog = 1024) {
    
    
        // Session创建器,通过它创建不同类型的服务器
        // 注册创建session的回调
        // std::function<SessionHelper::Ptr(const TcpServer::Ptr &server, const Socket::Ptr &)> TcpServer::_session_alloc;
        _session_alloc = [](const TcpServer::Ptr &server, const Socket::Ptr &sock) {
    
    
        	// RtspSession等同于SessionType, RtspSession由Session派生
            auto session = std::make_shared<SessionType>(sock);
         	// 首先调用TcpServer::setOnCreateSocket() 
			// 然后再调用Socket::createSocket(),只注册回调函数
            session->setOnCreateSocket(server->_on_create_socket);
            return std::make_shared<SessionHelper>(server, session);
        };
        start_l(port, host, backlog);
	}
private:
    ......
    Socket::Ptr _socket;
    std::shared_ptr<Timer> _timer;
    Socket::onCreateSocket _on_create_socket;
    std::unordered_map<SessionHelper *, SessionHelper::Ptr> _session_map;
    std::function<SessionHelper::Ptr(const TcpServer::Ptr &server, const Socket::Ptr &)> _session_alloc;
};

RtspSession继承自Session继承自SocketHelper。

3rdpart/ZLToolKit/src/Network/Socket.cpp
void SocketHelper::setOnCreateSocket(Socket::onCreateSocket cb){
    
    
	if (cb) {
    
    
    	// 走这里,此时cb不为空,TcpServer::_on_create_socket
        _on_create_socket = std::move(cb);
    } else {
    
    
        _on_create_socket = [](const EventPoller::Ptr &poller) {
    
    
            return Socket::createSocket(poller, false);
        };
    }
}

SocketHelper::_on_create_socket 实际为TcpServer::_on_create_socket,在Socket::onAccept()->Socket::createSocket()中执行。
且当Socket::onAccept()->Socket::_on_before_accept创建失败的时候执行,继承父poller,没有poller负载考虑。

创建SessionHelper管理Session

3rdpart/ZLToolKit/src/Network/Server.cpp
SessionHelper::SessionHelper(const std::weak_ptr<Server> &server, Session::Ptr session) {
    
    
    _server = server;
    _session = std::move(session);
	//记录session至全局的map,方便后面管理
	// 这个是个静态变量, 只初始化一次。
    _session_map = SessionMap::Instance().shared_from_this();
    _identifier = _session->getIdentifier();
    _session_map->add(_identifier, _session);
}

TcpServer::start_l()

缩进太深了,专门分析一下。

TcpServer::start_l()

3rdpart/ZLToolKit/src/Network/TcpServer.cpp
void TcpServer::start_l(uint16_t port, const std::string &host, uint32_t backlog) {
    
    
	// 1. 创建socket,并绑定回调
	// 创建TCPServer::_socket
	// 注册TCPServer::_socket回调函数setOnBeforeAccept,setOnAccept
	setupEvent();

	// 2. 启动监听
    if (!_socket->listen(port, host.c_str(), backlog)) {
    
    
        //创建tcp监听失败,可能是由于端口占用或权限问题
        string err = (StrPrinter << "Listen on " << host << " " << port << " failed: " << get_uv_errmsg(true));
        throw std::runtime_error(err);
    }

    // 3. Timer
    //新建一个定时器定时管理这些tcp会话
    weak_ptr<TcpServer> weak_self = std::dynamic_pointer_cast<TcpServer>(shared_from_this());
    _timer = std::make_shared<Timer>(2.0f, [weak_self]() -> bool {
    
    
        auto strong_self = weak_self.lock();
        if (!strong_self) {
    
    
            return false;
        }
        strong_self->onManagerSession();
        return true;
    }, _poller);

    // 4. 循环创建多个TcpServer
    EventPollerPool::Instance().for_each([&](const TaskExecutor::Ptr &executor) {
    
    
        EventPoller::Ptr poller = dynamic_pointer_cast<EventPoller>(executor);
        if (poller == _poller || !poller) {
    
    
            return;
        }
        auto &serverRef = _cloned_server[poller.get()];
        if (!serverRef) {
    
    
            serverRef = onCreatServer(poller);
        }
        if (serverRef) {
    
    
            serverRef->cloneFrom(*this);
        }
    });

    InfoL << "TCP server listening on [" << host << "]: " << port;
}

TcpServer::setupEvent() 创建监听Socket

setupEvent()有两个主要作用:

  1. 根据poller创建对应的Socket。
  2. 为Socket注册两个回调,在响应客户端请求是调用。
    a) setOnBeforeAccept() 创建客户端socket。根据poller最轻负载创建Socket。
    b) setOnAccept() 接收客户端数据。

此次调用setupEvent(),TcpServer::_poller由TcpServer::TcpServer()->Server::Server()->EventPollerPool::Instance().getPoller(true)创建。

第二次调用setupEvent()在EventPollerPool::Instance().for_each()批量clone的时候,TcpServer::TcpServer()->Server::Server()->EventPollerPool::Instance()->TaskExecutorGetterImp::addPoller()已经批量创建了poller。

3rdpart/ZLToolKit/src/Network/TcpServer.cpp
void TcpServer::setupEvent() {
    
    
	// Socket::Ptr TCPServer::_socket;
	// TcpServer::_poller继承父类Server protect变量,此时不为空
    _socket = createSocket(_poller);
weak_ptr<TcpServer> weak_self = std::dynamic_pointer_cast<TcpServer>(shared_from_this());

	// 回调函数1
	// 实际是为Socket::_on_before_accept赋值,回调函数。
	// 该回调函数内调用TcpServer::onBeforeAcceptConnection(poller)
    _socket->setOnBeforeAccept([weak_self](const EventPoller::Ptr &poller) -> Socket::Ptr {
    
    
        if (auto strong_self = weak_self.lock()) {
    
    
            return strong_self->onBeforeAcceptConnection(poller);
        }
        return nullptr;
	});

	// 回调函数2 
	// 实际是为Socket::_on_accept赋值。
	// 该回调内调用TcpServer::onAcceptConnection()
    _socket->setOnAccept([weak_self](Socket::Ptr &sock, shared_ptr<void> &complete) {
    
    
        if (auto strong_self = weak_self.lock()) {
    
    
            auto ptr = sock->getPoller().get();
            auto server = strong_self->getServer(ptr);
            ptr->async([server, sock, complete]() {
    
    
                //该tcp客户端派发给对应线程的TcpServer服务器
                server->onAcceptConnection(sock);
            });
        }
    });
}

TcpServer创建Socket

  1. 调用TcpServer::createSocket(),
  2. 调用回调函数TcpServer::_on_create_socket(poller),TcpServer::setOnCreateSocekt()注册该回调为Socket::createSocket()。
  3. 调用Socket::createSocket(), 构造Socket::Socket(const EventPoller::Ptr &poller, bool enable_mutex),返回Socket::Ptr。

参考:TcpServer::TcpServer(){setOnCreateSocket(nullptr);}

3rdpart/ZLToolKit/src/Network/TcpServer.cpp
// 此时poller不为空,是TcpServer初始化的时候,继承自Server
Socket::Ptr TcpServer::createSocket(const EventPoller::Ptr &poller) {
    
    
	// TcpServer::setOnCreateSoket()注册TcpServer::_on_create_socket
	// 实际为Socket::createSocket(),
	return _on_create_socket(poller);
}

注册Socket::setOnBeforeAccept()回调

接收请求之前创建Socket,考虑负载均衡。
调用:

3rdpart/ZLToolKit/src/Network/Server.cpp
void TcpServer::setupEvent() {
    
    
	......
	_socket = createSocket(_poller);
	......
	// 回调函数1
	// 实际是为Socket::_on_before_accept赋值,回调函数。
	// 该回调函数内调用TcpServer::onBeforeAcceptConnection(poller)
    _socket->setOnBeforeAccept([weak_self](const EventPoller::Ptr &poller) -> Socket::Ptr {
    
    
        if (auto strong_self = weak_self.lock()) {
    
    
         	// TcpServer::onBeforeAcceptConnection()
            return strong_self->onBeforeAcceptConnection(poller);
        }
        return nullptr;
	});
	.....
}

为Socket::_on_before_accept赋值:

3rdpart/ZLToolKit/src/Network/Socket.cpp
void Socket::setOnBeforeAccept(onCreateSocket cb){
    
    
    LOCK_GUARD(_mtx_event);
	if (cb) {
    
    
    	// 走这里
        _on_before_accept = std::move(cb);
    } else {
    
    
        _on_before_accept = [](const EventPoller::Ptr &poller) {
    
    
            return nullptr;
        };
    }
}

该回调根据poller负载,创建Socket。
参考TcpServer::setupEvent()->TcpServer::createSocket()。
EventPollerPool::Instance().getPoller(false) 会根据poller负载,返回最空闲的poller。
参考:TcpServer::TcpServer()->Server::Server()->EventPollerPool::Instance().getPoller()->TaskExecutorGetterImp::getExecutor()

3rdpart/ZLToolKit/src/Network/TcpServer.cpp
Socket::Ptr TcpServer::onBeforeAcceptConnection(const EventPoller::Ptr &poller) {
    
    
    // _poller 只做判断使用
    assert(_poller->isCurrentThread());
    //此处改成自定义获取poller对象,防止负载不均衡
    return createSocket(EventPollerPool::Instance().getPoller(false));
}
3rdpart/ZLToolKit/src/Network/TcpServer.cpp
Socket::Ptr TcpServer::createSocket(const EventPoller::Ptr &poller) {
    
    
    return _on_create_socket(poller);
}

注册Socket::setOnAccept()回调

实际上就是为Socket::_on_accept赋值。该回调会在Socket::onAccept()中调用,接受客户端请求。
参数sock由Socket::_on_before_accept()回调创建,该回调根据负载分配一个负载最轻的poller。Socket::_on_before_accept由TcpServer::setupEvent()->Socket::setOnBeforeAccept()设置。
该回调内有重要的函数调用server->onAcceptConnection(sock),该函数有以下两个主要操作:

  1. 调用TcpServer::_session_alloc创建session,并添加该session至全局静态变量_session_map中。
  2. 为sock设置数据回调,该回调实际调用Session::onRecv()上抛数据。
    调用:
3rdpart/ZLToolKit/src/Network/Server.cpp
void TcpServer::setupEvent() {
    
    
	......
	_socket = createSocket(_poller);
	......
	// 回调函数2 
	// 实际是为Socket::_on_accept赋值。
	// 该回调内调用TcpServer::onAcceptConnection()
	// sock实际为Socket::_on_before_accept根据poller负载创建。
    _socket->setOnAccept([weak_self](Socket::Ptr &sock, shared_ptr<void> &complete) {
    
    
        if (auto strong_self = weak_self.lock()) {
    
    
            auto ptr = sock->getPoller().get();
            // 逆向寻找server
            auto server = strong_self->getServer(ptr);
            ptr->async([server, sock, complete]() {
    
    
                //该tcp客户端派发给对应线程的TcpServer服务器
                server->onAcceptConnection(sock);
            });
        }
    });
    ......
}

为Socket::_on_accept赋值:

3rdpart/ZLToolKit/src/Network/Socket.cpp
void Socket::setOnAccept(onAcceptCB cb) {
    
    
    LOCK_GUARD(_mtx_event);
	if (cb) {
    
    
        // 走这里
        _on_accept = std::move(cb);
    } else {
    
    
        _on_accept = [](Socket::Ptr &sock, shared_ptr<void> &complete) {
    
    
            WarnL << "Socket not set accept callback, peer fd: " << sock->rawFD();
        };
    }
}

创建并管理Session

3rdpart/ZLToolKit/src/Network/TcpServer.cpp
// Server接收到客户端连接请求,上抛peer Socket至此。
// 一个请求一个Session
Session::Ptr TcpServer::onAcceptConnection(const Socket::Ptr &sock) {
    
    
    assert(_poller->isCurrentThread());
    weak_ptr<TcpServer> weak_self = std::dynamic_pointer_cast<TcpServer>(shared_from_this());
	//创建一个Session;这里实现创建不同的服务会话实例
	// SessionHelper
    auto helper = _session_alloc(std::dynamic_pointer_cast<TcpServer>(shared_from_this()), sock);
    auto session = helper->session();
    //把本服务器的配置传递给Session
    session->attachServer(*this);

	//_session_map::emplace肯定能成功
	// 记录 裸指针和智能指针
    auto success = _session_map.emplace(helper.get(), helper).second;
    assert(success == true);

	weak_ptr<Session> weak_session = session;
	// 回调函数1
    //会话接收数据事件
    sock->setOnRead([weak_session](const Buffer::Ptr &buf, struct sockaddr *, int) {
    
    
        //获取会话强应用
        auto strong_session = weak_session.lock();
        if (!strong_session) {
    
    
            return;
        }
        try {
    
    
            strong_session->onRecv(buf);
        } catch (SockException &ex) {
    
    
            strong_session->shutdown(ex);
        } catch (exception &ex) {
    
    
            strong_session->shutdown(SockException(Err_shutdown, ex.what()));
        }
    });

	SessionHelper *ptr = helper.get();
	// 回调函数2
    //会话接收到错误事件
    sock->setOnErr([weak_self, weak_session, ptr](const SockException &err) {
    
    
        //在本函数作用域结束时移除会话对象
        //目的是确保移除会话前执行其onError函数
        //同时避免其onError函数抛异常时没有移除会话对象
        onceToken token(nullptr, [&]() {
    
    
            //移除掉会话
            auto strong_self = weak_self.lock();
            if (!strong_self) {
    
    
                return;
            }

            assert(strong_self->_poller->isCurrentThread());
            if (!strong_self->_is_on_manager) {
    
    
                //该事件不是onManager时触发的,直接操作map
                strong_self->_session_map.erase(ptr);
            } else {
    
    
                //遍历map时不能直接删除元素
                strong_self->_poller->async([weak_self, ptr]() {
    
    
                    auto strong_self = weak_self.lock();
                    if (strong_self) {
    
    
                        strong_self->_session_map.erase(ptr);
                    }
                }, false);
            }
        });

        //获取会话强应用
        auto strong_session = weak_session.lock();
        if (strong_session) {
    
    
            //触发onError事件回调
            strong_session->onError(err);
        }
    });
    return session;
}

TcpServer::_session_alloc()在TcpSerer::start()中注册。

3rdpart/ZLToolKit/src/Network/TcpServer.h
class TcpServer : public Server {
    
    
public:
    ......
	template<typename SessionType>
    void start(uint16_t port, const std::string &host = "::", uint32_t backlog = 1024) {
    
    
        // Session创建器,通过它创建不同类型的服务器
        // 注册创建session的回调
        // std::function<SessionHelper::Ptr(const TcpServer::Ptr &server, const Socket::Ptr &)> TcpServer::_session_alloc;
        _session_alloc = [](const TcpServer::Ptr &server, const Socket::Ptr &sock) {
    
    
        	// RtspSession等同于SessionType, RtspSession由Session派生
            auto session = std::make_shared<SessionType>(sock);
         	// 首先调用TcpServer::setOnCreateSocket() 
			// 然后再调用Socket::createSocket(),只注册回调函数
            session->setOnCreateSocket(server->_on_create_socket);
            return std::make_shared<SessionHelper>(server, session);
        };
        start_l(port, host, backlog);
	}
private:
    ......
    std::function<SessionHelper::Ptr(const TcpServer::Ptr &server, const Socket::Ptr &)> _session_alloc;
};

调用TcpServer::_session_alloc()创建Session(包含Socket::_on_before_accept()创建爱你的Socket,其poller负载最轻),并返回SessionHelper。

3rdpart/ZLToolKit/src/Network/TcpServer.cpp
	//创建一个Session;这里实现创建不同的服务会话实例
	// SessionHelper
    auto helper = _session_alloc(std::dynamic_pointer_cast<TcpServer>(shared_from_this()), sock);
    auto session = helper->session();
    //把本服务器的配置传递给Session
    session->attachServer(*this);
3rdpart/ZLToolKit/src/Network/Server.cpp
const Session::Ptr &SessionHelper::session() const {
    
    
    return _session;
}

(map<SessionHelper*,SessionHelper::Ptr>) TcpServer::_session_map,记录裸指针和智能指针的对应关系。

3rdpart/ZLToolKit/src/Network/TcpServer.cpp
	//_session_map::emplace肯定能成功
	// 记录 裸指针和智能指针
    auto success = _session_map.emplace(helper.get(), helper).second;
    assert(success == true);

注册Socket::_on_read回调

3rdpart/ZLToolKit/src/Network/TcpServer.cpp
Session::Ptr TcpServer::onAcceptConnection(const Socket::Ptr &sock) {
    
    
	省略session创建部分
	......
    //会话接收数据事件
    sock->setOnRead([weak_session](const Buffer::Ptr &buf, struct sockaddr *, int) {
    
    
        //获取会话强应用
        auto strong_session = weak_session.lock();
        if (!strong_session) {
    
    
            return;
        }
        try {
    
    
            strong_session->onRecv(buf);
        } catch (SockException &ex) {
    
    
            strong_session->shutdown(ex);
        } catch (exception &ex) {
    
    
            strong_session->shutdown(SockException(Err_shutdown, ex.what()));
        }
    });
    ......
}

实现:

3rdpart/ZLToolKit/src/Network/Socket.cpp
void Socket::setOnRead(onReadCB cb) {
    
    
    LOCK_GUARD(_mtx_event);
	if (cb) {
    
    
        // 走这里
        _on_read = std::move(cb);
    } else {
    
    
        _on_read = [](const Buffer::Ptr &buf, struct sockaddr *, int) {
    
    
            WarnL << "Socket not set read callback, data ignored: " << buf->size();
        };
    }
}

RtspSession继承自Session。
该回调实际调用纯虚函数Session::onRevc(),实际调用RtspSession::onRecv()。

3rdpart/ZLToolKit/src/Network/Server.h
class Session : public std::enable_shared_from_this<Session>, public SocketHelper {
    
    
public:
    using Ptr = std::shared_ptr<Session>;

    Session(const Socket::Ptr &sock);
    ~Session() override = default;

    /**
     * 接收数据入口
     * @param buf 数据,可以重复使用内存区,不可被缓存使用
     */
    virtual void onRecv(const Buffer::Ptr &buf) = 0;
   	......
};

Socket::listen(ip, port)

3rdpart/ZLToolKit/src/Network/Socket.cpp
bool Socket::listen(uint16_t port, const string &local_ip, int backlog) {
    
    
    int sock = SockUtil::listen(port, local_ip.data(), backlog);
    if (sock == -1) {
    
    
        return false;
    }
    return listen(makeSock(sock, SockNum::Sock_TCP));
}

调用系统接口SockUtil::listen()。

3rdpart/ZLToolKit/src/Network/sckutil.cpp
int SockUtil::listen(const uint16_t port, const char *local_ip, int back_log) {
    
    
    int fd = -1;
    int family = support_ipv6() ? (is_ipv4(local_ip) ? AF_INET : AF_INET6) : AF_INET;
    if ((fd = (int)socket(family, SOCK_STREAM, IPPROTO_TCP)) == -1) {
    
    
        WarnL << "Create socket failed: " << get_uv_errmsg(true);
        return -1;
    }
    // 设置多路复用
    setReuseable(fd, true, false);
    setNoBlocked(fd);
    setCloExec(fd);

    if (bind_sock(fd, local_ip, port, family) == -1) {
    
    
        close(fd);
        return -1;
    }

    //开始监听
    if (::listen(fd, back_log) == -1) {
    
    
        WarnL << "Listen socket failed: " << get_uv_errmsg(true);
        close(fd);
        return -1;
    }

    return fd;
}

调用Socket::listen():

3rdpart/ZLToolKit/src/Network/Socket.cpp
// sock为系统接口socket() 返回套接字ID
bool Socket::listen(const SockFD::Ptr &sock){
    
    
    closeSock();
    weak_ptr<SockFD> weak_sock = sock;
    weak_ptr<Socket> weak_self = shared_from_this();
	_enable_recv = true;
	// EventPoller::Ptr Socket::_poller; 
	// epoll实例addEvent绑定套接字
	int result = _poller->addEvent(sock->rawFd(), EventPoller::Event_Read | EventPoller::Event_Error, 
	[weak_self, weak_sock](int event) {
    
     // 注册回调函数
        auto strong_self = weak_self.lock();
        auto strong_sock = weak_sock.lock();
        if (!strong_self || !strong_sock) {
    
    
            return;
        }
    	// 事件回调创建新链接,这是回调中的具体执行函数。
        strong_self->onAccept(strong_sock, event);
	}
	);

    if (result == -1) {
    
    
        return false;
    }

    LOCK_GUARD(_mtx_sock_fd);
    _sock_fd = sock;
    return true;
}

Socket::closeSock()

3rdpart/ZLToolKit/src/Network/Socket.cpp
void Socket::closeSock() {
    
    
    _con_timer = nullptr;
    _async_con_cb = nullptr;

    LOCK_GUARD(_mtx_sock_fd);
    _sock_fd  = nullptr;
}

epoll添加请求链接事件

epoll绑定套接字。
PollEventCB注册在Socket::listen(const SockFD::Ptr &sock)。

3rdpart/ZLToolKit/src/Poller/EventPoller.cpp
int EventPoller::addEvent(int fd, int event, PollEventCB cb) {
    
    
    TimeTicker();
    if (!cb) {
    
    
        WarnL << "PollEventCB is empty";
        return -1;
    }

    if (isCurrentThread()) {
    
    
#if defined(HAS_EPOLL)
        struct epoll_event ev = {
    
    0};
        ev.events = (toEpoll(event)) | EPOLLEXCLUSIVE;
        ev.data.fd = fd;
        int ret = epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, fd, &ev);
        if (ret == 0) {
    
    
            _event_map.emplace(fd, std::make_shared<PollEventCB>(std::move(cb)));
        }
        return ret;
#else
#ifndef _WIN32
        //win32平台,socket套接字不等于文件描述符,所以可能不适用这个限制
        if (fd >= FD_SETSIZE || _event_map.size() >= FD_SETSIZE) {
    
    
            WarnL << "select() can not watch fd bigger than " << FD_SETSIZE;
            return -1;
        }
#endif
        auto record = std::make_shared<Poll_Record>();
        record->event = event;
        record->call_back = std::move(cb);
        _event_map.emplace(fd, record);
        return 0;
#endif //HAS_EPOLL
    }

	async([this, fd, event, cb]() {
    
    
        // 注册的回调,在Socket::listen(const SockFD::Ptr &sock)中。
        addEvent(fd, event, std::move(const_cast<PollEventCB &>(cb)));
    });
    return 0;
}

同步事件。

3rdpart/ZLToolKit/src/Poller/EventPoller.cpp
// 如果调用该函数的线程就是本对象的轮询线程,那么may_sync为true时就是同步执行任务
// 默认为true
Task::Ptr EventPoller::async(TaskIn task, bool may_sync) {
    
    
    return async_l(std::move(task), may_sync, false);
}
3rdpart/ZLToolKit/src/Poller/EventPoller.cpp
Task::Ptr EventPoller::async_l(TaskIn task, bool may_sync, bool first) {
    
    
    TimeTicker();
    if (may_sync && isCurrentThread()) {
    
    
        task();
        return nullptr;
    }

    auto ret = std::make_shared<Task>(std::move(task));
    {
    
    
        lock_guard<mutex> lck(_mtx_task);
        if (first) {
    
    
            _list_task.emplace_front(ret);
        } else {
    
    
            _list_task.emplace_back(ret);
        }
    }
    //写数据到管道,唤醒主线程
    _pipe.write("", 1);
    return ret;
}

请求响应Socket::onAccept()

接受客户端线程。

  1. accept() 接受客户端请求socket client connect()。
  2. 创建客户端socket,这个地方有两步操作:
    a) 优先执行Socket::_on_before_accept,根据poller负载创建socket。
    b) 保底执行Socket::createSocket(_poller, false),根据父poller创建socket。
  3. setPeerSock()设置peer socket ip port
  4. 调用Socket::_on_accept创建session,并注册数据上抛回调Socket::_on_read。
    Socket::_on_accept注册在TcpServer::setupEvent()->Socket::setOnAccept()。
  5. Socket::attachEvent() 主要执行以下操作:
    a) 获取共享缓冲区;
    b) 注册数据可读事件;
    c) 数据上抛回调:i.recvFrom()接收数据;ii.执行Socket::onRead()->Socket::_on_read上抛数据。
3rdpart/ZLToolKit/src/Network/Socket.cpp
int Socket::onAccept(const SockFD::Ptr &sock, int event) noexcept {
    
    
    int fd;
	while (true) {
    
    
        // 读取回调
        if (event & EventPoller::Event_Read) {
    
    
            do {
    
    
                // 此时只获取客户端fd,没有获取客户端IP
                // connsockfd = accept(listenfd, (struct sockaddr *)&clientaddr, &len);
                fd = (int)accept(sock->rawFd(), nullptr, nullptr);
            } while (-1 == fd && UV_EINTR == get_uv_error(true));

            if (fd == -1) {
    
    
                int err = get_uv_error(true);
                if (err == UV_EAGAIN) {
    
    
                    //没有新连接
                    return 0;
                }
                auto ex = toSockException(err);
                emitErr(ex);
                ErrorL << "Accept socket failed: " << ex.what();
                return -1;
            }

            SockUtil::setNoSigpipe(fd);
            SockUtil::setNoBlocked(fd);
            SockUtil::setNoDelay(fd);
            SockUtil::setSendBuf(fd);
            SockUtil::setRecvBuf(fd);
            SockUtil::setCloseWait(fd);
            SockUtil::setCloExec(fd);

            // 优先执行这里创建 Socket,考虑负载均衡
            // 只创建Socket使用。
            Socket::Ptr peer_sock;
            try {
    
    
                //此处捕获异常,目的是防止socket未accept尽,epoll边沿触发失效的问题
                LOCK_GUARD(_mtx_event);
                //拦截Socket对象的构造
                // 该_poller只用于判断,创建Socket用最轻负载的poller
                peer_sock = _on_before_accept(_poller);
            } catch (std::exception &ex) {
    
    
                ErrorL << "Exception occurred when emit on_before_accept: " << ex.what();
                close(fd);
                continue;
            }

            // _on_before_accept(_poller)失败才会走这里
            // 这里继承父poller,没有负载均衡。
            if (!peer_sock) {
    
    
                //此处是默认构造行为,也就是子Socket共用父Socket的poll线程并且关闭互斥锁
                peer_sock = Socket::createSocket(_poller, false);
            }

            //设置好fd,以备在onAccept事件中可以正常访问该fd
            // 此时获取客户端IP和port
            // int fd 转换为SockFD::Ptr &peer_sock_fd
            auto peer_sock_fd = peer_sock->setPeerSock(fd);

             // 实际先执行_on_accept(peer_sock, completed);
             // 后执行attachEvent
            shared_ptr<void> completed(nullptr, [peer_sock, peer_sock_fd](void *) {
    
    
                try {
    
    
                    //然后把该fd加入poll监听(确保先触发onAccept事件然后再触发onRead等事件)
                    if (!peer_sock->attachEvent(peer_sock_fd)) {
    
    
                        //加入poll监听失败,触发onErr事件,通知该Socket无效
                        peer_sock->emitErr(SockException(Err_eof, "add event to poller failed when accept a socket"));
                    }
                } catch (std::exception &ex) {
    
    
                    ErrorL << "Exception occurred: "<< ex.what();
                }
            });

            try {
    
    
                //此处捕获异常,目的是防止socket未accept尽,epoll边沿触发失效的问题
                LOCK_GUARD(_mtx_event);
                //先触发onAccept事件,此时应该监听该Socket的onRead等事件
                // Socket::setOnAccept() 设定的回调函数
                _on_accept(peer_sock, completed);
            } catch (std::exception &ex) {
    
    
                ErrorL << "Exception occurred when emit on_accept: " << ex.what();
                continue;
            }
        }  // end if

        if (event & EventPoller::Event_Error) {
    
    
            auto ex = getSockErr(sock);
            emitErr(ex);
            ErrorL << "TCP listener occurred a err: " << ex.what();
            return -1;
        }
    }
}

accept()接受客户端请求

系统接口。

创建socket

创建客户端socket,这个地方有两步操作:
a) 优先执行Socket::_on_before_accept,根据poller负载创建socket。
b) 保底执行Socket::createSocket(_poller, false),根据父poller创建socket。

Socket::_on_before_accept

根据poller负载创建socket。

Socket::createSocket()

Socket::createSocket() 继承父poller创建socket,逻辑上不执行该函数,该函数没有poller负载的考虑。除非Socket::_on_before_accept创建socket失败。
SocketHelper::_on_create_socket 实际调用TcpServer::_on_create_socket。
Socket::_on_before_accept实际也调用的是TcpServer::_on_create_socket。

3rdpart/ZLToolKit/src/Network/Socket.cpp
Socket::Ptr SocketHelper::createSocket(){
    
    
    // 使用父_poller,没有负载的考虑
    return _on_create_socket(_poller);
}

设置socket ip port

3rdpart/ZLToolKit/src/Network/Socket.cpp
SockFD::Ptr Socket::setPeerSock(int fd) {
    
    
closeSock();
	// 该函数初始化的客户端ip和port
    auto sock = makeSock(fd, SockNum::Sock_TCP);
    LOCK_GUARD(_mtx_sock_fd);
    _sock_fd = sock;
    return sock;
}

Socket::_on_accept接受请求

Socket::_on_accept核心调用TcpServer::onAcceptConnection():

  1. 创建session;
  2. 注册数据上抛回调Socket::_on_read,实际调用Session::onRecv()->RtspSession::onRecv()。
    赋值:
3rdpart/ZLToolKit/src/Network/Socket.cpp
void Socket::setOnAccept(onAcceptCB cb) {
    
    
    LOCK_GUARD(_mtx_event);
	if (cb) {
    
    
        // 走了这里
        _on_accept = std::move(cb);
    } else {
    
    
        _on_accept = [](Socket::Ptr &sock, shared_ptr<void> &complete) {
    
    
            WarnL << "Socket not set accept callback, peer fd: " << sock->rawFD();
        };
    }
}

参考:TcpServer::start_l()->TcpServer::setupEvent()->Socket::setOnAccept()。

3rdpart/ZLToolKit/src/Network/Server.cpp
void TcpServer::setupEvent() {
    
    
	......
	_socket = createSocket(_poller);
	......
	// 回调函数2 
	// 实际是为Socket::_on_accept赋值。
	// 该回调内调用TcpServer::onAcceptConnection()
	// sock实际为Socket::_on_before_accept根据poller负载创建。
    _socket->setOnAccept([weak_self](Socket::Ptr &sock, shared_ptr<void> &complete) {
    
    
        if (auto strong_self = weak_self.lock()) {
    
    
            auto ptr = sock->getPoller().get();
            // 逆向寻找server
            auto server = strong_self->getServer(ptr);
            ptr->async([server, sock, complete]() {
    
    
                //该tcp客户端派发给对应线程的TcpServer服务器
                server->onAcceptConnection(sock);
            });
        }
    });
    ......
}

Socket::attachEvent() 绑定数据可读回调

Socket::attachEvent() 主要有以下三个功能:

  1. 获取共享缓冲区;
  2. 注册数据可读事件;
  3. 数据上抛回调;
    i.recvFrom()接收数据;
    ii.执行Socket::onRead()->Socket::_on_read上抛数据。
3rdpart/ZLToolKit/src/Network/Socket.cpp
bool Socket::attachEvent(const SockFD::Ptr &sock) {
    
    
    weak_ptr<Socket> weak_self = shared_from_this();
    weak_ptr<SockFD> weak_sock = sock;
    _enable_recv = true;
    _read_buffer = _poller->getSharedBuffer();
    auto is_udp = sock->type() == SockNum::Sock_UDP;
    int result = _poller->addEvent(sock->rawFd(), EventPoller::Event_Read | EventPoller::Event_Error | EventPoller::Event_Write, [weak_self, weak_sock, is_udp](int event) {
    
    
        auto strong_self = weak_self.lock();
        auto strong_sock = weak_sock.lock();
        if (!strong_self || !strong_sock) {
    
    
            return;
        }

        if (event & EventPoller::Event_Read) {
    
    
            strong_self->onRead(strong_sock, is_udp);
        }
        if (event & EventPoller::Event_Write) {
    
    
            strong_self->onWriteAble(strong_sock);
        }
        if (event & EventPoller::Event_Error) {
    
    
            strong_self->emitErr(getSockErr(strong_sock));
        }
	});

    return -1 != result;
}
申请共享缓冲区。
3rdpart/ZLToolKit/src/Poller/EventPoller.cpp
BufferRaw::Ptr EventPoller::getSharedBuffer() {
    
    
    auto ret = _shared_buffer.lock();
    if (!ret) {
    
    
        //预留一个字节存放\0结尾符
        ret = BufferRaw::create();
        ret->setCapacity(1 + SOCKET_DEFAULT_BUF_SIZE);
        _shared_buffer = ret;
    }
    return ret;
}
注册数据可读事件

addEvent()数据可读事件。

数据上抛回调

recvFrom()接收数据;执行Socket::onRead()->Socket::_on_read上抛数据。
Socket::onRead() 使用数据上抛回调Socket::_on_read 接收数据并且上抛。

3rdpart/ZLToolKit/src/Network/Socket.cpp
ssize_t Socket::onRead(const SockFD::Ptr &sock, bool is_udp) noexcept{
    
    
    ssize_t ret = 0, nread = 0;
    auto sock_fd = sock->rawFd();

    auto data = _read_buffer->data();
    //最后一个字节设置为'\0'
    auto capacity = _read_buffer->getCapacity() - 1;

    struct sockaddr_storage addr;
    socklen_t len = sizeof(addr);

    while (_enable_recv) {
    
    
        do {
    
    
            nread = recvfrom(sock_fd, data, capacity, 0, (struct sockaddr *)&addr, &len);
        } while (-1 == nread && UV_EINTR == get_uv_error(true));

        if (nread == 0) {
    
    
            if (!is_udp) {
    
    
                emitErr(SockException(Err_eof, "end of file"));
            } else {
    
    
                WarnL << "Recv eof on udp socket[" << sock_fd << "]";
            }
            return ret;
        }

        if (nread == -1) {
    
    
            auto err = get_uv_error(true);
            if (err != UV_EAGAIN) {
    
    
                if (!is_udp) {
    
    
                    emitErr(toSockException(err));
                } else {
    
    
                    WarnL << "Recv err on udp socket[" << sock_fd << "]: " << uv_strerror(err);
                }
            }
            return ret;
        }

        if (_enable_speed) {
    
    
            // 更新接收速率
            _recv_speed += nread;
        }

        ret += nread;
        data[nread] = '\0';
        //设置buffer有效数据大小
        _read_buffer->setSize(nread);

        //触发回调
        LOCK_GUARD(_mtx_event);
        try {
    
    
            //此处捕获异常,目的是防止数据未读尽,epoll边沿触发失效的问题
            // 该回调在Socket::setOnAccept()->TcpServer::onAcceptConnection()->Socket::setOnRead()设置为RtspSession::onRecv()。
            _on_read(_read_buffer, (struct sockaddr *)&addr, len);
        } catch (std::exception &ex) {
    
    
            ErrorL << "Exception occurred when emit on_read: " << ex.what();
        }
    }
    return 0;
}

Socket::setOnAccept()->TcpServer::onAcceptConnection()->Socket::setOnRead()注册Socket::_on_read。

3rdpart/ZLToolKit/src/Network/Socket.cpp
void Socket::setOnRead(onReadCB cb) {
    
    
    LOCK_GUARD(_mtx_event);
	if (cb) {
    
    
        // 走这里
        _on_read = std::move(cb);
    } else {
    
    
        _on_read = [](const Buffer::Ptr &buf, struct sockaddr *, int) {
    
    
            WarnL << "Socket not set read callback, data ignored: " << buf->size();
        };
    }
}

RtspSession继承自Session。
该回调实际调用纯虚函数Session::onRevc(),实际调用RtspSession::onRecv()。

3rdpart/ZLToolKit/src/Network/Server.h
class Session : public std::enable_shared_from_this<Session>, public SocketHelper {
    
    
public:
    using Ptr = std::shared_ptr<Session>;

    Session(const Socket::Ptr &sock);
    ~Session() override = default;

    /**
     * 接收数据入口
     * @param buf 数据,可以重复使用内存区,不可被缓存使用
     */
    virtual void onRecv(const Buffer::Ptr &buf) = 0;
   	......
};

EventPollerPool::Instance().for_each()

循环创建服务器实例

3rdpart/ZLToolKit/src/Network/TcpServer.cpp
void TcpServer::start_l(uint16_t port, const std::string &host, uint32_t backlog) {
    
    
    ……

	// for_each()继承自父类
	//一个线程有一个TaskExecutor,现在是用这个函数处理每个 TaskExecutor
	EventPollerPool::Instance().for_each([&](const TaskExecutor::Ptr &executor) {
    
    
    	// 如果poller 与当前poller相同,则返回
        EventPoller::Ptr poller = dynamic_pointer_cast<EventPoller>(executor);
        if (poller == _poller || !poller) {
    
    
            return;
        }
    	// std::unordered_map<const EventPoller *, Ptr> _cloned_server;
    	// clone (TcpServer)serverRef,相当于有多个server
   	 	// 这里返回serverRef=null
        auto &serverRef = _cloned_server[poller.get()];
     	// 如果serverRef查找失败
		// 则调用onCreatServer()创建TcpServer,并绑定poller。
        if (!serverRef) {
    
    
         // 此时poller已创建完毕,poller不为空
            serverRef = onCreatServer(poller);
        }
        if (serverRef) {
    
    
            serverRef->cloneFrom(*this);
        }
    });

    InfoL << "TCP server listening on [" << host << "]: " << port;
}

for_each()实现,继承自父类。

3rdpart/ZLToolKit/src/Thread/TaskExecutor.cpp
void TaskExecutorGetterImp::for_each(const function<void(const TaskExecutor::Ptr &)> &cb) {
    
    
	for (auto &th : _threads) {
    
    
    	// 该回调即为EventPollerPool::Instance().for_each()注册的lambda
        cb(th);
    }
}
3rdpart/ZLToolKit/src/Network/TcpServer.cpp
TcpServer::Ptr TcpServer::onCreatServer(const EventPoller::Ptr &poller) {
    
    
    return std::make_shared<TcpServer>(poller);
}
3rdpart/ZLToolKit/src/Network/TcpServer.cpp
void TcpServer::cloneFrom(const TcpServer &that) {
    
    
    if (!that._socket) {
    
    
        throw std::invalid_argument("TcpServer::cloneFrom other with null socket");
	}
	// 创建Socket::Ptr TcpServer::_socket
    setupEvent();
    _on_create_socket = that._on_create_socket;
	_session_alloc = that._session_alloc;
	// 克隆一个相同fd的Socket对象
	// 拷贝socket 包含SockFD,并listen。
    _socket->cloneFromListenSocket(*(that._socket));
    weak_ptr<TcpServer> weak_self = std::dynamic_pointer_cast<TcpServer>(shared_from_this());
    _timer = std::make_shared<Timer>(2.0f, [weak_self]() -> bool {
    
    
        auto strong_self = weak_self.lock();
        if (!strong_self) {
    
    
            return false;
        }
        strong_self->onManagerSession();
        return true;
    }, _poller);
    this->mINI::operator=(that);
    _parent = &that;
}

setupEvent() 参考:TcpServer::start()->TcpServer::start_l()->TcpServer::setupEvent()。

3rdpart/ZLToolKit/src/Network/Socket.cpp
bool Socket::cloneFromListenSocket(const Socket &other) {
    
    
    auto sock = cloneSockFD(other);
    if (!sock) {
    
    
        return false;
    }
    return listen(sock);
}
3rdpart/ZLToolKit/src/Network/Socket.cpp
SockFD::Ptr Socket::cloneSockFD(const Socket &other) {
    
    
    SockFD::Ptr sock;
    {
    
    
        LOCK_GUARD(other._mtx_sock_fd);
        if (!other._sock_fd) {
    
    
            WarnL << "sockfd of src socket is null";
            return nullptr;
        }
        sock = std::make_shared<SockFD>(*(other._sock_fd), _poller);
    }
    return sock;
}
3rdpart/ZLToolKit/src/Network/TcpServer.cpp
void TcpServer::onManagerSession() {
    
    
    assert(_poller->isCurrentThread());

    onceToken token([&]() {
    
    
        _is_on_manager = true;
    }, [&]() {
    
    
        _is_on_manager = false;
    });

    // std::unordered_map<SessionHelper *, SessionHelper::Ptr> _session_map;
    for (auto &pr : _session_map) {
    
    
        //遍历时,可能触发onErr事件(也会操作_session_map)
        try {
    
    
            pr.second->session()->onManager();
        } catch (exception &ex) {
    
    
            WarnL << ex.what();
        }
    }
}

–END–
参考:
https://blog.csdn.net/yudaichenydc/article/details/127997301

猜你喜欢

转载自blog.csdn.net/cliffordl/article/details/131105855