Skip to content

Commit

Permalink
Tcp/Udp服务器支持单线程模式
Browse files Browse the repository at this point in the history
  • Loading branch information
xia-chu committed Jun 21, 2024
1 parent 5144e2a commit 79c10fe
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 45 deletions.
31 changes: 17 additions & 14 deletions src/Network/TcpServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ INSTANCE_IMP(SessionMap)
StatisticImp(TcpServer)

TcpServer::TcpServer(const EventPoller::Ptr &poller) : Server(poller) {
_multi_poller = !poller;
setOnCreateSocket(nullptr);
}

Expand Down Expand Up @@ -82,7 +83,7 @@ TcpServer::Ptr TcpServer::onCreatServer(const EventPoller::Ptr &poller) {
Socket::Ptr TcpServer::onBeforeAcceptConnection(const EventPoller::Ptr &poller) {
assert(_poller->isCurrentThread());
//此处改成自定义获取poller对象,防止负载不均衡
return createSocket(EventPollerPool::Instance().getPoller(false));
return createSocket(_multi_poller ? EventPollerPool::Instance().getPoller(false) : _poller);
}

void TcpServer::cloneFrom(const TcpServer &that) {
Expand Down Expand Up @@ -191,19 +192,21 @@ void TcpServer::start_l(uint16_t port, const std::string &host, uint32_t backlog
return true;
}, _poller);

EventPollerPool::Instance().for_each([&](const TaskExecutor::Ptr &executor) {
EventPoller::Ptr poller = static_pointer_cast<EventPoller>(executor);
if (poller == _poller) {
return;
}
auto &serverRef = _cloned_server[poller.get()];
if (!serverRef) {
serverRef = onCreatServer(poller);
}
if (serverRef) {
serverRef->cloneFrom(*this);
}
});
if (_multi_poller) {
EventPollerPool::Instance().for_each([&](const TaskExecutor::Ptr &executor) {
EventPoller::Ptr poller = static_pointer_cast<EventPoller>(executor);
if (poller == _poller) {
return;
}
auto &serverRef = _cloned_server[poller.get()];
if (!serverRef) {
serverRef = onCreatServer(poller);
}
if (serverRef) {
serverRef->cloneFrom(*this);
}
});
}

if (!_socket->listen(port, host.c_str(), backlog)) {
// 创建tcp监听失败,可能是由于端口占用或权限问题
Expand Down
1 change: 1 addition & 0 deletions src/Network/TcpServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ class TcpServer : public Server {
void setupEvent();

private:
bool _multi_poller;
bool _is_on_manager = false;
bool _main_server = true;
std::weak_ptr<TcpServer> _parent;
Expand Down
69 changes: 38 additions & 31 deletions src/Network/UdpServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ static UdpServer::PeerIdType makeSockId(sockaddr *addr, int) {
}

UdpServer::UdpServer(const EventPoller::Ptr &poller) : Server(poller) {
_multi_poller = !poller;
setOnCreateSocket(nullptr);
}

Expand Down Expand Up @@ -87,20 +88,22 @@ void UdpServer::start_l(uint16_t port, const std::string &host) {
return false;
}, _poller);

//clone server至不同线程,让udp server支持多线程
EventPollerPool::Instance().for_each([&](const TaskExecutor::Ptr &executor) {
auto poller = std::static_pointer_cast<EventPoller>(executor);
if (poller == _poller) {
return;
}
auto &serverRef = _cloned_server[poller.get()];
if (!serverRef) {
serverRef = onCreatServer(poller);
}
if (serverRef) {
serverRef->cloneFrom(*this);
}
});
if (_multi_poller) {
// clone server至不同线程,让udp server支持多线程
EventPollerPool::Instance().for_each([&](const TaskExecutor::Ptr &executor) {
auto poller = std::static_pointer_cast<EventPoller>(executor);
if (poller == _poller) {
return;
}
auto &serverRef = _cloned_server[poller.get()];
if (!serverRef) {
serverRef = onCreatServer(poller);
}
if (serverRef) {
serverRef->cloneFrom(*this);
}
});
}

if (!_socket->bindUdpSock(port, host.c_str())) {
// udp 绑定端口失败, 可能是由于端口占用或权限问题
Expand Down Expand Up @@ -193,24 +196,28 @@ void UdpServer::onManagerSession() {
//拷贝map,防止遍历时移除对象
copy_map = std::make_shared<std::unordered_map<PeerIdType, SessionHelper::Ptr> >(*_session_map);
}
EventPollerPool::Instance().for_each([copy_map](const TaskExecutor::Ptr &executor) {
auto poller = std::static_pointer_cast<EventPoller>(executor);
poller->async([copy_map]() {
for (auto &pr : *copy_map) {
auto &session = pr.second->session();
if (!session->getPoller()->isCurrentThread()) {
//该session不归属该poller管理
continue;
}
try {
// UDP 会话需要处理超时
session->onManager();
} catch (exception &ex) {
WarnL << "Exception occurred when emit onManager: " << ex.what();
}
auto lam = [copy_map]() {
for (auto &pr : *copy_map) {
auto &session = pr.second->session();
if (!session->getPoller()->isCurrentThread()) {
// 该session不归属该poller管理
continue;
}
try {
// UDP 会话需要处理超时
session->onManager();
} catch (exception &ex) {
WarnL << "Exception occurred when emit onManager: " << ex.what();
}
}
};
if (_multi_poller){
EventPollerPool::Instance().for_each([lam](const TaskExecutor::Ptr &executor) {
std::static_pointer_cast<EventPoller>(executor)->async(lam);
});
});
} else {
lam();
}
}

SessionHelper::Ptr UdpServer::getOrCreateSession(const UdpServer::PeerIdType &id, const Buffer::Ptr &buf, sockaddr *addr, int addr_len, bool &is_new) {
Expand All @@ -228,7 +235,7 @@ SessionHelper::Ptr UdpServer::getOrCreateSession(const UdpServer::PeerIdType &id

SessionHelper::Ptr UdpServer::createSession(const PeerIdType &id, const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) {
// 此处改成自定义获取poller对象,防止负载不均衡
auto socket = createSocket(EventPollerPool::Instance().getPoller(false), buf, addr, addr_len);
auto socket = createSocket(_multi_poller ? EventPollerPool::Instance().getPoller(false) : _poller, buf, addr, addr_len);
if (!socket) {
//创建socket失败,本次onRead事件收到的数据直接丢弃
return nullptr;
Expand Down
1 change: 1 addition & 0 deletions src/Network/UdpServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ class UdpServer : public Server {

private:
bool _cloned = false;
bool _multi_poller;
Socket::Ptr _socket;
std::shared_ptr<Timer> _timer;
onCreateSocket _on_create_socket;
Expand Down

0 comments on commit 79c10fe

Please sign in to comment.