diff --git a/Test/Sample.AsyncSocket.h b/Test/Sample.AsyncSocket.h index d15cbb9..7006f01 100644 --- a/Test/Sample.AsyncSocket.h +++ b/Test/Sample.AsyncSocket.h @@ -21,12 +21,12 @@ void example_binding(const vu::Endpoint& endpoint) printf("client %d closed\n", client.get_remote_sai().sin_port); }); - server.on(vu::AsyncSocket::SEND, [](vu::Socket& client) -> void - { - std::string s = "hello from server"; - client.send(s.data(), int(s.size())); - printf("client %d send `%s`\n", client.get_remote_sai().sin_port, s.c_str()); - }); + //server.on(vu::AsyncSocket::SEND, [](vu::Socket& client) -> void + //{ + // std::string s = "hello from server"; + // client.send(s.data(), int(s.size())); + // printf("client %d send `%s`\n", client.get_remote_sai().sin_port, s.c_str()); + //}); server.on(vu::AsyncSocket::RECV, [](vu::Socket& client) -> void { @@ -57,12 +57,12 @@ void example_inheritance(const vu::Endpoint& endpoint) printf("client %d closed\n", client.get_remote_sai().sin_port); } - virtual void on_send(vu::Socket& client) - { - std::string s = "hello from server"; - client.send(s.data(), int(s.size())); - printf("client %d send `%s`\n", client.get_remote_sai().sin_port, s.c_str()); - } + //virtual void on_send(vu::Socket& client) + //{ + // std::string s = "hello from server"; + // client.send(s.data(), int(s.size())); + // printf("client %d send `%s`\n", client.get_remote_sai().sin_port, s.c_str()); + //} virtual void on_recv(vu::Socket& client) { diff --git a/include/Vutils.h b/include/Vutils.h index 7fd48ba..150e5d7 100644 --- a/include/Vutils.h +++ b/include/Vutils.h @@ -1324,7 +1324,7 @@ class Socket : public LastError VUResult vuapi accept(Handle& socket); VUResult vuapi connect(const Endpoint& endpoint); - VUResult vuapi disconnect(const shutdowns_t flags = SD_BOTH, const bool cleanup = true); + VUResult vuapi disconnect(const shutdowns_t flags = SD_BOTH, const bool cleanup = false); IResult vuapi send(const char* ptr_data, int size, const flags_t flags = MSG_NONE); IResult vuapi send(const Buffer& data, const flags_t flags = MSG_NONE); @@ -1376,11 +1376,11 @@ class AsyncSocket : public LastError enum function : uint { - CONNECT, - OPEN, - CLOSE, - RECV, - SEND, + CONNECT, // for only client side + OPEN, // for only server side + CLOSE, // for both server & client + RECV, // for both server & client + SEND, // for both server & client (should not use except required a special case) UNDEFINED, }; @@ -1404,12 +1404,16 @@ class AsyncSocket : public LastError bool vuapi running() const; /** + * for client side * https://docs.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-wsaeventselect?redirectedfrom=MSDN#return-value * Note: After connected (FD_CONNECT), client will be auto generated first event FD_WRITE. */ VUResult vuapi connect(const Endpoint& endpoint); VUResult vuapi connect(const std::string& address, const ushort port); + /** + * for server side + */ VUResult vuapi bind(const Endpoint& endpoint); VUResult vuapi bind(const std::string& address, const ushort port); @@ -1419,14 +1423,13 @@ class AsyncSocket : public LastError */ VUResult vuapi listen(const int maxcon = SOMAXCONN); - VUResult vuapi run(); - VUResult vuapi run_in_thread(); + VUResult vuapi run(const bool in_worker_thread = false); VUResult vuapi stop(); - IResult vuapi close(); + IResult vuapi close(const Socket::shutdowns_t flags = SD_BOTH, const bool cleanup = false); - std::set vuapi get_connections() const; - VUResult vuapi disconnect_connections(const Socket::shutdowns_t flags = SD_BOTH, const bool cleanup = true); + void vuapi get_connections(std::set& connections); + VUResult vuapi disconnect_connections(const Socket::shutdowns_t flags = SD_BOTH, const bool cleanup = false); IResult vuapi send(const SOCKET& connection, const char* ptr_data, int size, const Socket::flags_t flags = MSG_NONE); IResult vuapi send(const SOCKET& connection, const Buffer& data, const Socket::flags_t flags = MSG_NONE); @@ -1442,6 +1445,7 @@ class AsyncSocket : public LastError protected: void vuapi initialze(); VUResult vuapi loop(); + VUResult vuapi run_loop(); IResult vuapi do_connect(WSANETWORKEVENTS& events, SOCKET& connection); IResult vuapi do_open(WSANETWORKEVENTS& events, SOCKET& connection); @@ -1455,6 +1459,7 @@ class AsyncSocket : public LastError bool m_running; DWORD m_n_events; SOCKET m_connections[WSA_MAXIMUM_WAIT_EVENTS]; + std::recursive_mutex m_mutex_client_list; WSAEVENT m_events[WSA_MAXIMUM_WAIT_EVENTS]; fn_prototype_t m_functions[function::UNDEFINED]; std::mutex m_mutex; diff --git a/src/details/asyncsocket.cpp b/src/details/asyncsocket.cpp index a017d4b..8ae94e8 100644 --- a/src/details/asyncsocket.cpp +++ b/src/details/asyncsocket.cpp @@ -40,6 +40,8 @@ void vuapi AsyncSocket::initialze() { m_n_events = 0; + std::lock_guard lg(m_mutex_client_list); + memset(m_connections, int(INVALID_SOCKET), sizeof(m_connections)); memset(m_events, int(0), sizeof(m_events)); @@ -95,6 +97,8 @@ VUResult vuapi AsyncSocket::listen(const int maxcon) return 2; } + std::lock_guard lg(m_mutex_client_list); + m_connections[m_n_events] = m_socket.handle(); m_events[m_n_events] = event; m_n_events++; @@ -106,11 +110,11 @@ VUResult vuapi AsyncSocket::listen(const int maxcon) return result; } -IResult vuapi AsyncSocket::close() +IResult vuapi AsyncSocket::close(const Socket::shutdowns_t flags, const bool cleanup) { this->stop(); - this->disconnect_connections(); + this->disconnect_connections(flags, cleanup); if (m_thread != INVALID_HANDLE_VALUE) { @@ -148,6 +152,8 @@ VUResult vuapi AsyncSocket::connect(const Endpoint& endpoint) return 2; } + std::lock_guard lg(m_mutex_client_list); + auto result = m_socket.connect(endpoint); if (result == VU_OK) { @@ -169,34 +175,37 @@ VUResult vuapi AsyncSocket::connect(const std::string& address, const ushort por return this->connect(endpoint); } -std::set vuapi AsyncSocket::get_connections() const +void vuapi AsyncSocket::get_connections(std::set& connections) { - std::set result; + connections.clear(); if (m_socket.available()) { + std::lock_guard lg(m_mutex_client_list); + for (auto& socket : m_connections) { - if (socket == INVALID_SOCKET) + if (socket == INVALID_SOCKET) // ignore invalid socket handle { continue; } - if (m_side == side_type::SERVER && socket == m_socket.handle()) + if (m_side == side_type::SERVER && socket == m_socket.handle()) // ignore server socket handle { continue; } - result.insert(socket); + connections.insert(socket); } } - - return result; } VUResult vuapi AsyncSocket::disconnect_connections(const Socket::shutdowns_t flags, const bool cleanup) { - auto connections = this->get_connections(); + std::lock_guard lg(m_mutex_client_list); + + std::set connections; + this->get_connections(connections); for (const auto& connection : connections) { Socket socket(m_socket); @@ -217,14 +226,21 @@ static DWORD WINAPI AsyncSocket_Threading(LPVOID lpParam) return 0; } -VUResult vuapi AsyncSocket::run_in_thread() +VUResult vuapi AsyncSocket::run(const bool in_worker_thread) { - m_thread = CreateThread(nullptr, 0, LPTHREAD_START_ROUTINE(AsyncSocket_Threading), this, 0, nullptr); - m_last_error_code = GetLastError(); - return m_thread != INVALID_HANDLE_VALUE ? VU_OK : 1; + if (in_worker_thread) + { + m_thread = CreateThread(nullptr, 0, LPTHREAD_START_ROUTINE(AsyncSocket_Threading), this, 0, nullptr); + m_last_error_code = GetLastError(); + return m_thread != INVALID_HANDLE_VALUE ? VU_OK : 1; + } + else + { + return this->run_loop(); + } } -VUResult vuapi AsyncSocket::run() +VUResult vuapi AsyncSocket::run_loop() { if (!m_socket.available()) { @@ -349,6 +365,8 @@ IResult vuapi AsyncSocket::do_open(WSANETWORKEVENTS& events, SOCKET& connection) return events.iErrorCode[FD_ACCEPT_BIT]; } + std::lock_guard lg(m_mutex_client_list); + Socket::Handle obj = { 0 }; int n = static_cast(sizeof(obj.sai)); @@ -380,6 +398,8 @@ IResult vuapi AsyncSocket::do_recv(WSANETWORKEVENTS& events, SOCKET& connection) return events.iErrorCode[FD_READ_BIT]; } + std::lock_guard lg(m_mutex_client_list); + Socket socket(m_socket); socket.attach(connection); this->on_recv(socket); @@ -395,6 +415,8 @@ IResult vuapi AsyncSocket::do_send(WSANETWORKEVENTS& events, SOCKET& connection) return events.iErrorCode[FD_WRITE_BIT]; } + std::lock_guard lg(m_mutex_client_list); + Socket socket(m_socket); socket.attach(connection); this->on_send(socket); @@ -405,10 +427,15 @@ IResult vuapi AsyncSocket::do_send(WSANETWORKEVENTS& events, SOCKET& connection) IResult vuapi AsyncSocket::do_close(WSANETWORKEVENTS& events, SOCKET& connection) { - if (events.iErrorCode[FD_CLOSE_BIT] != 0) - { - return events.iErrorCode[FD_CLOSE_BIT]; - } + // TODO: In certain cases(e.g., user - mode drivers), it crashes. + // I'm not sure why, so temporarily comment out these codes. + // + // if (events.iErrorCode[FD_CLOSE_BIT] != 0) + // { + // return events.iErrorCode[FD_CLOSE_BIT]; + // } + + std::lock_guard lg(m_mutex_client_list); std::vector> in_used_connections; diff --git a/src/details/socket.cpp b/src/details/socket.cpp index a4eb1c8..2289e2f 100644 --- a/src/details/socket.cpp +++ b/src/details/socket.cpp @@ -637,7 +637,7 @@ VUResult vuapi Socket::disconnect(const shutdowns_t flags, const bool cleanup) return 1; } - if (cleanup) // clean-up all remaining data in the socket + if (cleanup) // clean-up all remaining data in the socket (does not need with the SD_BOTH flag) { vu::Buffer temp; this->recv_all(temp);