From cf901b008229a489752ed9507d9ad2d378a477c7 Mon Sep 17 00:00:00 2001 From: Ion Agorria Date: Tue, 2 Jan 2024 00:14:12 +0100 Subject: [PATCH] net: Rework code a bit --- Source/Network/CMakeLists.txt | 1 + Source/Network/NetComEventBuffer.cpp | 36 --- Source/Network/NetConnection.cpp | 233 +++----------------- Source/Network/NetConnection.h | 39 +++- Source/Network/NetConnectionHandler.cpp | 211 ++++++++++++++++++ Source/Network/P2P_interface.h | 10 +- Source/Network/P2P_interface2Th.cpp | 4 +- Source/Network/P2P_interface2Th_NetConn.cpp | 46 +--- 8 files changed, 292 insertions(+), 288 deletions(-) create mode 100644 Source/Network/NetConnectionHandler.cpp diff --git a/Source/Network/CMakeLists.txt b/Source/Network/CMakeLists.txt index df0af3560..061f413c6 100644 --- a/Source/Network/CMakeLists.txt +++ b/Source/Network/CMakeLists.txt @@ -9,6 +9,7 @@ add_library(Network STATIC P2P_interface2Th_Host.cpp P2P_interfaceAnyTh.cpp NetConnection.cpp + NetConnectionHandler.cpp ) target_include_directories(Network PRIVATE diff --git a/Source/Network/NetComEventBuffer.cpp b/Source/Network/NetComEventBuffer.cpp index e4580a1ff..228b9ddf3 100644 --- a/Source/Network/NetComEventBuffer.cpp +++ b/Source/Network/NetComEventBuffer.cpp @@ -47,32 +47,6 @@ void InOutNetComBuffer::reset() event_ID = NETCOM_ID_NONE; } - -//Out -int InOutNetComBuffer::send(PNetCenter& conn, NETID destination) -{ -/// Тест -/// unsigned int size=filled_size; -/// unsigned int msize=size; -/// while(msize>0){ -/// int sizeEvent=*((event_size_t*)(&buf[size-msize])) + sizeof(size_of_event); -/// xassert(sizeEvent<=msize); -/// msize-=sizeEvent; -/// } -/// xassert(msize==0); - - - unsigned int sent=0; - while(sent < filled_size){ // implicit == // подразумевается == - sent+=conn.Send(buf+sent, filled_size-sent, destination); - }; - xassert(filled_size==sent); - init(); - reset(); - byte_sending+=sent; - return sent; -} - //Out void InOutNetComBuffer::putNetCommand(const netCommandGeneral* event) { @@ -113,16 +87,6 @@ void InOutNetComBuffer::clearBufferOfTheProcessedCommands(void) bool InOutNetComBuffer::putBufferPacket(char* buf, unsigned int size) { -/// Test -/// unsigned int msize=size; -/// unsigned char* mbuf=buf; -/// while(msize>0){ -/// int sizeEvent=*((event_size_t*)(&buf[size-msize])) + sizeof(size_of_event); -/// xassert(sizeEvent<=msize); -/// msize-=sizeEvent; -/// } -/// xassert(msize==0); - clearBufferOfTheProcessedCommands(); if(length()-filled_size < size) { xassert(0 && "Net input buffer is small."); diff --git a/Source/Network/NetConnection.cpp b/Source/Network/NetConnection.cpp index 4e2f40302..cb71af80b 100644 --- a/Source/Network/NetConnection.cpp +++ b/Source/Network/NetConnection.cpp @@ -126,16 +126,37 @@ void NetConnection::close(bool error) { } } +int NetConnection::send_raw(const void* buffer, uint32_t len) { + if (!has_socket()) { + return -1; + } + if (buffer == nullptr) { + ErrH.Abort("Got null buffer in send_raw"); + } + + int sent = 0; + while (sent < len) { + int amount = SDLNet_TCP_Send(socket, static_cast(buffer) + sent, static_cast(len) - sent); + if (amount < 0) { + fprintf(stderr, "TCP send data failed result %d sent %d len %d %s\n", amount, sent, len, SDLNet_GetError()); + return -3; + } + sent += amount; + } + + return sent; +} + int NetConnection::send(const XBuffer& data) { - return send(data.buf, data.length()); + return send(reinterpret_cast(data.buf), data.length()); } -int NetConnection::send(const void* buffer, uint32_t len) { - if (!has_socket()) { - return -1; +int NetConnection::send(const uint8_t* buffer, uint32_t len) { + if (buffer == nullptr) { + ErrH.Abort("Got null buffer in send"); } uint16_t flags = 0; - XBuffer sending_buffer(const_cast(buffer), len); + XBuffer sending_buffer(const_cast(buffer), len); sending_buffer.set(len); //Compression, first thing to do to calculate actual length @@ -155,7 +176,7 @@ int NetConnection::send(const void* buffer, uint32_t len) { fprintf(stderr, "TCP send data too big len %d\n", msg_size); return -2; } - + //Assemble header and data XBuffer xbuf(msg_size); xbuf < NC_HEADER_MAGIC; @@ -166,23 +187,15 @@ int NetConnection::send(const void* buffer, uint32_t len) { xbuf.set(8); xbuf.write(sending_buffer, len); - //Send buffer - int sent = 0; - while (sent < msg_size) { - int amount = SDLNet_TCP_Send(socket, xbuf.buf + sent, msg_size - sent); - if (amount < 0) { - fprintf(stderr, "TCP send data failed result %d sent %d msg %d len %d %s\n", amount, sent, msg_size, len, SDLNet_GetError()); - return -3; - } - sent += amount; - } + int sent = send_raw(xbuf.buf, msg_size); + if (sent != msg_size) { fprintf(stderr, "TCP send length mismatch sent %d msg %d len %d %s\n", sent, msg_size, len, SDLNet_GetError()); close_error(); return -4; } - - return static_cast(len); + + return sent; } int NetConnection::receive_raw(void* buffer, uint32_t maxlen, int timeout) { @@ -190,7 +203,9 @@ int NetConnection::receive_raw(void* buffer, uint32_t maxlen, int timeout) { if (!has_socket()) { return -1; } - xassert(buffer != nullptr); + if (buffer == nullptr) { + ErrH.Abort("Got null buffer in receive_raw"); + } if (0 <= timeout) { int n = SDLNet_CheckSockets(socket_set, timeout); @@ -211,7 +226,7 @@ int NetConnection::receive_raw(void* buffer, uint32_t maxlen, int timeout) { int received = 0; while (received < maxlen) { - int amount = SDLNet_TCP_Recv(socket, buffer, static_cast(maxlen)); + int amount = SDLNet_TCP_Recv(socket, static_cast(buffer) + received, static_cast(maxlen) - received); if (amount <= 0) { fprintf(stderr, "TCP recv failed amount %d maxlen %d %s\n", amount, maxlen, SDLNet_GetError()); close_error(); @@ -300,181 +315,3 @@ int NetConnection::receive(XBuffer& buffer, int timeout) { buffer.set(amount); return amount; } - -///////// NetConnectionHandler ////////////// - -NetConnectionHandler::NetConnectionHandler(PNetCenter* center): net_center(center) { - stopConnections(); -} - -NetConnectionHandler::~NetConnectionHandler() { - reset(); - net_center = nullptr; -} - -void NetConnectionHandler::reset() { - stopListening(); - stopConnections(); -} - -NETID NetConnectionHandler::acceptConnection() { - NETID netid = NETID_NONE; - if (accept_socket) { - TCPsocket incoming_socket = SDLNet_TCP_Accept(accept_socket); - if(!incoming_socket) { - SDLNet_SetError(nullptr); - } else { - NetConnection* incoming = nullptr; - - //Find any closed connection in array - bool reused = false; - for (auto& entry : connections) { - if (entry.second->is_closed()) { - incoming = entry.second; - incoming->set_socket(incoming_socket); - reused = true; - break; - } - } - - //Check if we can add it - if (!reused) { - incoming = newConnectionFromSocket(incoming_socket, false); - } - - //incoming may be deallocated if index is not available, so get it before - netid = incoming->netid; - - net_center->handleIncomingClientConnection(incoming); - } - } - - return netid; -} - -void NetConnectionHandler::pollConnections() { - for (auto& entry : connections) { - NetConnection* connection = entry.second; - switch (connection->state) { - case NC_STATE_ACTIVE: { - size_t total_recv = 0; - while (total_recv < PERIMETER_MESSAGE_MAX_SIZE * 10) { - InputPacket* packet = new InputPacket(connection->netid); - int len = connection->receive(*packet); - if (0 < len) { - net_center->m_InputPacketList.push_back(packet); - total_recv += len; - } else { - delete packet; - break; - } - } - break; - } - case NC_STATE_ERROR_PENDING: - case NC_STATE_CLOSE_PENDING: { - net_center->DeleteClient(connection->netid, false); - //Mark it as closed, since we processed the client - connection->state = NC_STATE_CLOSED; - break; - } - default: - break; - } - } -} - -void NetConnectionHandler::stopConnections() { - for (auto& entry : connections) { - NetConnection* conn = entry.second; - conn->close(); - delete conn; - } - connections.clear(); -} - -const NetConnectionHandler::NetConnectionMap& NetConnectionHandler::getConnections() const { - return connections; -} - -NetConnection* NetConnectionHandler::getConnection(NETID netid) const { - NetConnection* conn = nullptr; - if (connections.count(netid)) { - NetConnection* candidate = connections.at(netid); - if (!candidate->is_closed()) { - conn = candidate; - } - } - return conn; -} - -bool NetConnectionHandler::startListening(uint16_t port) { - stopListening(); - stopConnections(); - - if (gb_RenderDevice->GetRenderSelection() == DEVICE_HEADLESS) { - max_connections = NETWORK_PLAYERS_MAX; - } else { - //Remove one since host is player too - max_connections = NETWORK_PLAYERS_MAX - 1; - } - - IPaddress addr; - addr.host = INADDR_ANY; - SDLNet_Write16(port, &addr.port); - - accept_socket = SDLNet_TCP_Open(&addr); - if (accept_socket == nullptr) { - fprintf(stderr, "TCP listen failed on port %d error %s\n", port, SDLNet_GetError()); - return false; - } else { - LogMsg("TCP listening on port %d\n", port); - } - return true; -} - -void NetConnectionHandler::stopListening() { - if (accept_socket) { - LogMsg("TCP listen socket closed\n"); - SDLNet_TCP_Close(accept_socket); - accept_socket = nullptr; - } -} - -NetConnection* NetConnectionHandler::startConnection(NetAddress* address) { - stopListening(); - stopConnections(); - - max_connections = 1; - - TCPsocket socket = SDLNet_TCP_Open(&address->addr); - if (!socket) { - fprintf(stderr, "TCP socket open failed address %s error %s\n", address->getString().c_str(), SDLNet_GetError()); - return nullptr; - } - - NetConnection* connection = newConnectionFromSocket(socket, true); - if (connection->netid != NETID_NONE) { - return connection; - } else { - fprintf(stderr, "Error allocating new connection\n"); - connection->close(); - delete connection; - return nullptr; - } -} - -NetConnection* NetConnectionHandler::newConnectionFromSocket(TCPsocket socket, bool host) { - NetConnection* connection = new NetConnection(socket); - if (connections.size() < max_connections) { - NETID netid; - if (host) { - netid = NETID_HOST; - } else { - netid = NETID_HOST + connections.size() + 1; - } - connection->netid = netid; - connections.insert_or_assign(netid, connection); - } - return connection; -} diff --git a/Source/Network/NetConnection.h b/Source/Network/NetConnection.h index 4556aebd7..305be4022 100644 --- a/Source/Network/NetConnection.h +++ b/Source/Network/NetConnection.h @@ -5,6 +5,7 @@ #define PERIMETER_IP_HOST_DEFAULT "127.0.0.1" const uint32_t PERIMETER_MESSAGE_MAX_SIZE = 32 * 1024 * 1024; const uint32_t PERIMETER_MESSAGE_COMPRESSION_SIZE = 128*1024; +///Specifies this message contains compressed payload const uint16_t PERIMETER_MESSAGE_FLAG_COMPRESSED = 1 << 0; ///How many milliseconds extra to wait for the data part once getting header const int RECV_DATA_AFTER_HEADER_TIMEOUT = 10000; @@ -14,9 +15,9 @@ const int CONNECTION_HANDSHAKE_TIMEOUT = 10000; #include //Special IDs -#define NETID_NONE 0 -#define NETID_HOST 1 -#define NETID_ALL 0xFF +const uint64_t NETID_NONE = 0; +const uint64_t NETID_HOST = 1; +const uint64_t NETID_ALL = -1; //Used to identify player connection typedef uint64_t NETID; @@ -68,12 +69,22 @@ class NetConnection { SDLNet_SocketSet socket_set = nullptr; TCPsocket socket = nullptr; + /** + * Sends TCP data into connection + * Closes connection upon error + * + * @param buffer pointer of data to send over wire + * @param len amount of data to read from pointer + * @return amount of bytes sent, 0 if none, <0 if error or closed + */ + int send_raw(const void* buffer, uint32_t len); + /** * Receives TCP data from connection if any * Closes connection upon error * * @param buffer pointer of data to write the received data - * @param maxlen max amount of data expected to read + * @param maxlen max amount of data expected to read into buffer * @param timeout 0 for no wait, amount of ms to wait until data is available * @return amount of bytes received, 0 if none, <0 if error or closed */ @@ -140,7 +151,7 @@ class NetConnection { * @param len amount of data to send * @return amount of bytes sent, <0 if error or closed */ - int send(const void* buffer, uint32_t len); + int send(const uint8_t* buffer, uint32_t len); /** * Receives data from connection if any @@ -165,11 +176,12 @@ class NetConnectionHandler { size_t max_connections = 0; PNetCenter* net_center = nullptr; TCPsocket accept_socket = nullptr; - typedef std::unordered_map NetConnectionMap; - NetConnectionMap connections; + std::unordered_map connections; + void stopListening(); + void stopConnections(); NetConnection* newConnectionFromSocket(TCPsocket socket, bool host); - + public: explicit NetConnectionHandler(PNetCenter* center); ~NetConnectionHandler(); @@ -186,14 +198,17 @@ class NetConnectionHandler { /** Polls the connections */ void pollConnections(); + /** + * Sends buffer data to connection by NETID + * @return amount of data sent in total, this can be several times if is NETID_ALL + */ + size_t sendToNETID(const uint8_t* buffer, size_t size, NETID destination); + //Connection stuff - void stopConnections(); - const NetConnectionMap& getConnections() const; NetConnection* getConnection(NETID netid) const; - //Listening functions + //Host related functions bool startListening(uint16_t port); - void stopListening(); //Single client connection NetConnection* startConnection(NetAddress* address); diff --git a/Source/Network/NetConnectionHandler.cpp b/Source/Network/NetConnectionHandler.cpp new file mode 100644 index 000000000..7cc465ae4 --- /dev/null +++ b/Source/Network/NetConnectionHandler.cpp @@ -0,0 +1,211 @@ +#include "NetIncludes.h" +#include "NetConnection.h" +#include "crc.h" +#include "P2P_interface.h" +#include "NetConnectionAux.h" + +NetConnectionHandler::NetConnectionHandler(PNetCenter* center): net_center(center) { + stopConnections(); +} + +NetConnectionHandler::~NetConnectionHandler() { + reset(); + net_center = nullptr; +} + +void NetConnectionHandler::reset() { + stopListening(); + stopConnections(); +} + +NETID NetConnectionHandler::acceptConnection() { + NETID netid = NETID_NONE; + if (accept_socket) { + TCPsocket incoming_socket = SDLNet_TCP_Accept(accept_socket); + if(!incoming_socket) { + SDLNet_SetError(nullptr); + } else { + NetConnection* incoming = nullptr; + + //Find any closed connection in array + bool reused = false; + for (auto& entry : connections) { + if (entry.second->is_closed()) { + incoming = entry.second; + incoming->set_socket(incoming_socket); + reused = true; + break; + } + } + + //Check if we can add it + if (!reused) { + incoming = newConnectionFromSocket(incoming_socket, false); + } + + //incoming may be deallocated if index is not available, so get it before + netid = incoming->netid; + + net_center->handleIncomingClientConnection(incoming); + } + } + + return netid; +} + +void NetConnectionHandler::pollConnections() { + for (auto& entry : connections) { + NetConnection* connection = entry.second; + switch (connection->state) { + case NC_STATE_ACTIVE: { + size_t total_recv = 0; + while (total_recv < PERIMETER_MESSAGE_MAX_SIZE * 10) { + InputPacket* packet = new InputPacket(connection->netid); + int len = connection->receive(*packet); + if (0 < len) { + net_center->m_InputPacketList.push_back(packet); + total_recv += len; + } else { + delete packet; + break; + } + } + break; + } + case NC_STATE_ERROR_PENDING: + case NC_STATE_CLOSE_PENDING: { + net_center->DeleteClient(connection->netid, false); + //Mark it as closed, since we processed the client + connection->state = NC_STATE_CLOSED; + break; + } + default: + break; + } + } +} + +void NetConnectionHandler::stopConnections() { + for (auto& entry : connections) { + NetConnection* conn = entry.second; + conn->close(); + delete conn; + } + connections.clear(); +} + +NetConnection* NetConnectionHandler::getConnection(NETID netid) const { + NetConnection* conn = nullptr; + if (connections.count(netid)) { + NetConnection* candidate = connections.at(netid); + if (!candidate->is_closed()) { + conn = candidate; + } + } + return conn; +} + +bool NetConnectionHandler::startListening(uint16_t port) { + reset(); + + if (gb_RenderDevice->GetRenderSelection() == DEVICE_HEADLESS) { + max_connections = NETWORK_PLAYERS_MAX; + } else { + //Remove one since host is player too + max_connections = NETWORK_PLAYERS_MAX - 1; + } + + IPaddress addr; + addr.host = INADDR_ANY; + SDLNet_Write16(port, &addr.port); + + accept_socket = SDLNet_TCP_Open(&addr); + if (accept_socket == nullptr) { + fprintf(stderr, "TCP listen failed on port %d error %s\n", port, SDLNet_GetError()); + return false; + } else { + LogMsg("TCP listening on port %d\n", port); + } + return true; +} + +void NetConnectionHandler::stopListening() { + if (accept_socket) { + LogMsg("TCP listen socket closed\n"); + SDLNet_TCP_Close(accept_socket); + accept_socket = nullptr; + } +} + +NetConnection* NetConnectionHandler::startConnection(NetAddress* address) { + reset(); + + max_connections = 1; + + TCPsocket socket = SDLNet_TCP_Open(&address->addr); + if (!socket) { + fprintf(stderr, "TCP socket open failed address %s error %s\n", address->getString().c_str(), SDLNet_GetError()); + return nullptr; + } + + NetConnection* connection = newConnectionFromSocket(socket, true); + if (connection->netid != NETID_NONE) { + return connection; + } else { + fprintf(stderr, "Error allocating new connection\n"); + connection->close(); + delete connection; + return nullptr; + } +} + +NetConnection* NetConnectionHandler::newConnectionFromSocket(TCPsocket socket, bool host) { + NetConnection* connection = new NetConnection(socket); + if (connections.size() < max_connections) { + NETID netid; + if (host) { + netid = NETID_HOST; + } else { + netid = NETID_HOST + connections.size() + 1; + } + connection->netid = netid; + connections.insert_or_assign(netid, connection); + } + return connection; +} + + +size_t SendBufferToConnection(const uint8_t* buffer, size_t size, NetConnection* connection) { + int retries = 5; + if (!connection || !connection->is_active()) { + return 0; + } + while (0 < retries) { + int sent = connection->send(buffer, size); + if (0 < sent) { + return size; + } else if (!connection->is_active()) { + fprintf(stderr, "SendBufferToConnection error sending %lu sent %d to %lu closed\n", size, sent, connection->netid); + break; + } else { + fprintf(stderr, "SendBufferToConnection error sending %lu sent %d to %lu retry %d\n", size, sent, connection->netid, retries); + retries--; + } + } + return 0; +} + +size_t NetConnectionHandler::sendToNETID(const uint8_t* buffer, size_t size, NETID destination) { + size_t sent = 0; + if (destination == NETID_NONE) { + fprintf(stderr, "Discarding sending to NETID_NONE\n"); + } else if (destination == NETID_ALL) { + for (auto& conn : connections) { + sent += SendBufferToConnection(buffer, size, conn.second); + } + } else { + sent = SendBufferToConnection(buffer, size, getConnection(destination)); + } + + return sent; +} diff --git a/Source/Network/P2P_interface.h b/Source/Network/P2P_interface.h index 17f2bd68a..bcc73118a 100644 --- a/Source/Network/P2P_interface.h +++ b/Source/Network/P2P_interface.h @@ -519,8 +519,8 @@ class PNetCenter { NETID m_localNETID; bool flag_connected; - void FinishGame(void); - void StartFindHost(void); + void FinishGame(); + void StartFindHost(); bool Init(); bool ServerStart(); @@ -531,11 +531,11 @@ class PNetCenter { bool Connect(); bool isConnected() const; - size_t Send(const char* buffer, size_t size, NETID destination); + size_t SendNetBuffer(InOutNetComBuffer* netbuffer, NETID destination); unsigned int flag_LockIputPacket; - void LockInputPacket(void); - void UnLockInputPacket(void); + void LockInputPacket(); + void UnLockInputPacket(); void ClearInputPacketList(); std::list m_InputPacketList; diff --git a/Source/Network/P2P_interface2Th.cpp b/Source/Network/P2P_interface2Th.cpp index 66a83a20b..d7c54e8de 100644 --- a/Source/Network/P2P_interface2Th.cpp +++ b/Source/Network/P2P_interface2Th.cpp @@ -306,7 +306,7 @@ void PNetCenter::SendEvent(netCommandGeneral& event, NETID destination) { if (destination != m_localNETID ) { out_HostBuf.putNetCommand(&event); - out_HostBuf.send(*this, destination); + SendNetBuffer(&out_HostBuf, destination); } if( (destination == m_localNETID) || (destination == NETID_ALL) ){ @@ -1125,7 +1125,7 @@ void PNetCenter::UnLockInputPacket() { void PNetCenter::ClientPredReceiveQuant() { - if(!out_ClientBuf.isEmpty()) out_ClientBuf.send(*this, m_hostNETID); + if(!out_ClientBuf.isEmpty()) SendNetBuffer(&out_ClientBuf, m_hostNETID); if(flag_LockIputPacket) return; //return 0; int cnt=0; diff --git a/Source/Network/P2P_interface2Th_NetConn.cpp b/Source/Network/P2P_interface2Th_NetConn.cpp index 6aa878689..e6812c218 100644 --- a/Source/Network/P2P_interface2Th_NetConn.cpp +++ b/Source/Network/P2P_interface2Th_NetConn.cpp @@ -177,43 +177,19 @@ bool PNetCenter::isConnected() const { return flag_connected; } -void sendToConnection(const char* buffer, size_t size, NetConnection* connection) { - int retries = 5; - if (!connection || !connection->is_active()) { - return; - } - while (0 < retries) { - int sent = connection->send(buffer, size); - if (0 < sent) { - return; - } else if (!connection->is_active()) { - fprintf(stderr, "sendToConnection error sending %lu sent %d to %lu closed\n", size, sent, connection->netid); - break; - } else { - fprintf(stderr, "sendToConnection error sending %lu sent %d to %lu retry %d\n", size, sent, connection->netid, retries); - retries--; - } - } -} - -size_t PNetCenter::Send(const char* buffer, size_t size, NETID destination) { - if (destination == NETID_ALL) { - for (auto& conn : connectionHandler.getConnections()) { - sendToConnection(buffer, size, conn.second); - } +//Out +size_t PNetCenter::SendNetBuffer(InOutNetComBuffer* netbuffer, NETID destination) { + size_t sent = 0; + if (destination == m_localNETID || (destination != m_hostNETID && !isHost())) { + fprintf(stderr, "Discarding sending %lu -> %lu\n", m_localNETID, destination); + //xassert(0); } else { - if (destination == m_localNETID - || (destination != m_hostNETID && !isHost()) - || destination == NETID_NONE - ) { - fprintf(stderr, "Discarding sending %lu bytes to %lu from %lu\n", size, destination, m_localNETID); - //xassert(0); - return size; - } - sendToConnection(buffer, size, connectionHandler.getConnection(destination)); + sent = connectionHandler.sendToNETID(reinterpret_cast(netbuffer->buf), netbuffer->filled_size, destination); } - - return size; + netbuffer->init(); + netbuffer->reset(); + netbuffer->byte_sending += sent; + return sent; } void PNetCenter::handleIncomingClientConnection(NetConnection* connection) {