Muduo puts the "one loop per thread" idea into practice as follows: whenever a new TcpConnection is created, one EventLoop is picked from the EventLoop pool and handed to that TcpConnection.
The _loop owned by TcpServer is only responsible for accepting new connections; each accepted connection uses an EventLoop from the EventLoop thread pool as its IO loop, assigned round-robin, and that loop is the one that watches the events of the connection's Channel.
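From the user's point of view, switching to this multi-threaded mode only takes one extra call. A hedged sketch (the TcpServer constructor signature and the assumption that TcpServer forwards setThreadNum() to its EventLoopThreadPool are mine, not taken from this article):

EventLoop baseLoop;                       // baseLoop: runs the Acceptor only
InetAddress listenAddr(9981);
TcpServer server(&baseLoop, listenAddr);  // assumed constructor signature
server.setThreadNum(4);                   // assumed to forward to EventLoopThreadPool::setThreadNum()
server.start();                           // starts the pool, then starts listening
baseLoop.loop();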
1. EventLoopThreadPool
class EventLoopThreadPool : boost::noncopyable {
public:
    EventLoopThreadPool(EventLoop *baseLoop);
    ~EventLoopThreadPool();

    void setThreadNum(int numThreads) { _numThreads = numThreads; }
    void start();
    EventLoop *getNextLoop();

private:
    EventLoop *_baseLoop;                     // the TcpServer's own loop, used for accepting connections
    bool _started;
    int _numThreads;
    int _next;                                // round-robin index into _loops
    std::vector<EventLoopThread *> _threads;  // the worker threads
    std::vector<EventLoop *> _loops;          // one EventLoop per worker thread
};
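The class declares getNextLoop() but its body is not shown in this article. A minimal round-robin sketch that uses only the members declared above (in a single-threaded server it simply falls back to _baseLoop):

EventLoop *EventLoopThreadPool::getNextLoop() {
    _baseLoop->assertInLoopThread();
    EventLoop *loop = _baseLoop;          // no worker threads configured: reuse baseLoop
    if (!_loops.empty()) {
        loop = _loops[_next];             // round-robin over the worker loops
        _next = (_next + 1) % static_cast<int>(_loops.size());
    }
    return loop;
}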
To run the server multi-threaded, TcpServer::start() also has to start the pool by calling EventLoopThreadPool::start():
void EventLoopThreadPool::start() {
    assert(!_started);
    _baseLoop->assertInLoopThread();
    _started = true;
    for (int i = 0; i < _numThreads; ++i) {
        EventLoopThread *t = new EventLoopThread;
        _threads.push_back(t);
        _loops.push_back(t->startLoop());
    }
}
_threads holds the EventLoopThread objects, i.e. the worker threads themselves.
_loops holds the EventLoop produced by each EventLoopThread. Adding a Channel to one of these loops is enough to start watching events, because that thread is already running its event loop.
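For completeness, a hedged sketch of the corresponding TcpServer::start(): the thread pool is started first, then the Acceptor begins listening on the baseLoop. The _started flag and the _acceptor member are assumptions; this article does not show the function.

void TcpServer::start() {
    if (!_started) {                      // _started flag assumed, guards against double start
        _started = true;
        _threadPool->start();             // spin up the worker EventLoop threads first
    }
    if (!_acceptor->listening()) {        // _acceptor member assumed; listening begins on _loop
        _loop->runInLoop(std::bind(&Acceptor::listen, _acceptor.get()));
    }
}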
2. TcpServer
TcpServer::newConnection() (the lines marked + / ! below are the ones added or changed relative to the single-threaded version):
void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr) {
    _loop->assertInLoopThread();
    char buf[32];
    snprintf(buf, sizeof(buf), "#%d", _nextConnId);
    ++_nextConnId;
    std::string connName = _name + buf;
    LOG_INFO << "TcpServer::newConnection [" << _name << "] - new connection ["
             << connName << "] from " << peerAddr.toHostPort();
    InetAddress localAddr(sockets::getLocalAddr(sockfd));
+   EventLoop *ioLoop = _threadPool->getNextLoop();
    TcpConnectionPtr conn(
!       new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr));
    _connections[connName] = conn;
    conn->setConnectionCallback(_connectionCallback);
    conn->setMessageCallback(_messageCallback);
    conn->setCloseCallback(std::bind(&TcpServer::removeConnection, this, _1));
    conn->setWriteCompleteCallback(_writeCompleteCallback);
!   ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}
The loop that will handle the new connection's events is chosen from the EventLoop thread pool at the moment the connection is accepted; in a single-threaded program this loop is simply the baseLoop (i.e. the TcpServer member _loop).
The original removeConnection is split into removeConnection and removeConnectionInLoop. TcpConnection invokes removeConnection from its own ioLoop thread, so the call must first be transferred to the TcpServer's _loop thread; destroying the TcpConnection, however, has to happen in the ioLoop, which is why removeConnectionInLoop still ends by queueing connectDestroyed back onto the ioLoop with queueInLoop. In short, the ioLoop, i.e. the loop the TcpConnection obtained from the EventLoopThreadPool, owns connection establishment and teardown, while _loop owns the TcpServer-side bookkeeping of connections and the dispatch of the listening Channel.
Inside TcpConnection, the Channel is what watches the peer socket's read and write events.
void TcpServer::removeConnection(const TcpConnectionPtr &conn) {
+   _loop->runInLoop(std::bind(&TcpServer::removeConnectionInLoop, this, conn));
+}

void TcpServer::removeConnectionInLoop(const TcpConnectionPtr &conn) {
    _loop->assertInLoopThread();
    LOG_INFO << "TcpServer::removeConnectionInLoop [" << _name
             << "] - connection " << conn->name();
    size_t n = _connections.erase(conn->name());
    assert(n == 1);
    (void)n;
+   EventLoop *ioLoop = conn->getLoop();
!   ioLoop->queueInLoop(std::bind(&TcpConnection::connectDestroyed, conn));
}