diff --git a/include/ocpp/common/websocket/websocket_libwebsockets.hpp b/include/ocpp/common/websocket/websocket_libwebsockets.hpp index 42306ef16..c9c680263 100644 --- a/include/ocpp/common/websocket/websocket_libwebsockets.hpp +++ b/include/ocpp/common/websocket/websocket_libwebsockets.hpp @@ -70,13 +70,13 @@ class WebsocketLibwebsockets final : public WebsocketBase { void thread_deferred_callback_queue(); /// \brief Called when a TLS websocket connection is established, calls the connected callback - void on_conn_connected(); + void on_conn_connected(ConnectionData* conn_data); /// \brief Called when a TLS websocket connection is closed - void on_conn_close(); + void on_conn_close(ConnectionData* conn_data); /// \brief Called when a TLS websocket connection fails to be established - void on_conn_fail(); + void on_conn_fail(ConnectionData* conn_data); /// \brief When the connection can send data void on_conn_writable(); diff --git a/lib/ocpp/common/websocket/websocket_libwebsockets.cpp b/lib/ocpp/common/websocket/websocket_libwebsockets.cpp index 00dc5977c..9fb8e88d4 100644 --- a/lib/ocpp/common/websocket/websocket_libwebsockets.cpp +++ b/lib/ocpp/common/websocket/websocket_libwebsockets.cpp @@ -77,7 +77,7 @@ static constexpr int MESSAGE_SEND_TIMEOUT = 20; /// \brief Current connection data, sets the internal state of the struct ConnectionData { explicit ConnectionData(WebsocketLibwebsockets* owner) : - wsi(nullptr), owner(owner), is_running(true), state(EConnectionState::INITIALIZE) { + wsi(nullptr), owner(owner), is_running(true), is_close_run(false), state(EConnectionState::INITIALIZE) { } ~ConnectionData() { @@ -157,6 +157,16 @@ struct ConnectionData { return (is_running == false); } + void mark_close_executed() { + std::lock_guard lock(this->mutex); + this->is_close_run = true; + } + + bool is_close_executed() { + std::lock_guard lock(this->mutex); + return this->is_close_run; + } + public: /// \brief This should be used for a cleanup before calling the /// init functions because releasing the unique ptrs has @@ -188,6 +198,9 @@ struct ConnectionData { EVLOG_AND_THROW(std::runtime_error("Cleanup must be called before re-initing a connection!")); } + // Reset the close status + is_close_run = false; + // Causes a deadlock in callback_minimal if not reset this->lws_ctx = std::unique_ptr(lws_ctx); @@ -230,6 +243,7 @@ struct ConnectionData { std::mutex mutex; bool is_running; + bool is_close_run; EConnectionState state; private: @@ -702,7 +716,7 @@ void WebsocketLibwebsockets::thread_websocket_client_loop(std::shared_ptrupdate_state(EConnectionState::ERROR); - on_conn_fail(); + on_conn_fail(local_data.get()); } else { lws_client_connect_info i; memset(&i, 0, sizeof(lws_client_connect_info)); @@ -757,14 +771,15 @@ void WebsocketLibwebsockets::thread_websocket_client_loop(std::shared_ptrconnection_options.security_profile << "]"; if (lws_client_connect_via_info(&i) == nullptr) { EVLOG_error << "LWS connect failed!"; // This condition can occur when connecting fails to an IP address // retries need to be attempted local_data->update_state(EConnectionState::ERROR); - on_conn_fail(); + on_conn_fail(local_data.get()); } else { local_data->init_connection(local_lws); @@ -818,7 +833,25 @@ void WebsocketLibwebsockets::thread_websocket_client_loop(std::shared_ptrupdate_state(EConnectionState::FINALIZED); try_reconnect = false; - EVLOG_info << "Connection reconnect attempts exhausted, exiting websocket loop"; + EVLOG_info << "Connection reconnect attempts exhausted, exiting websocket loop, passing back control " + "to the application logic"; + + // Give back control to the application + if (false == local_data->is_close_executed()) { + this->push_deferred_callback([this]() { + if (this->closed_callback) { + this->closed_callback(WebsocketCloseReason::Normal); + } else { + EVLOG_error << "Closed callback not registered!"; + } + + if (this->disconnected_callback) { + this->disconnected_callback(); + } else { + EVLOG_error << "Disconnected callback not registered!"; + } + }); + } } } @@ -993,6 +1026,26 @@ void WebsocketLibwebsockets::close_internal(const WebsocketCloseReason code, con // Close any ongoing thread safe_close_threads(); + // Only call this if it was not called already, that can happen in the + // case where we did not had the opportunity to init/connect at least once + // and therefore the minimal_callback is not called to have the time to call + // the 'close_callback' + if (conn_data && (false == conn_data->is_close_executed())) { + this->push_deferred_callback([this, code]() { + if (this->closed_callback) { + this->closed_callback(code); + } else { + EVLOG_error << "Closed callback not registered!"; + } + + if (this->disconnected_callback) { + this->disconnected_callback(); + } else { + EVLOG_error << "Disconnected callback not registered!"; + } + }); + } + // Release the connection data and state conn_data.reset(); this->m_is_connected = false; @@ -1001,20 +1054,6 @@ void WebsocketLibwebsockets::close_internal(const WebsocketCloseReason code, con std::lock_guard lk(this->reconnect_mutex); this->reconnect_timer_tpm.stop(); } - - this->push_deferred_callback([this, code]() { - if (this->closed_callback) { - this->closed_callback(code); - } else { - EVLOG_error << "Closed callback not registered!"; - } - - if (this->disconnected_callback) { - this->disconnected_callback(); - } else { - EVLOG_error << "Disconnected callback not registered!"; - } - }); } void WebsocketLibwebsockets::reconnect(long delay) { @@ -1275,7 +1314,7 @@ int WebsocketLibwebsockets::process_callback(void* wsi_ptr, int callback_reason, } data->update_state(EConnectionState::ERROR); - on_conn_fail(); + on_conn_fail(data); return 0; } @@ -1297,7 +1336,7 @@ int WebsocketLibwebsockets::process_callback(void* wsi_ptr, int callback_reason, case LWS_CALLBACK_CLIENT_ESTABLISHED: data->update_state(EConnectionState::CONNECTED); - on_conn_connected(); + on_conn_connected(data); // Attempt first write after connection lws_callback_on_writable(wsi); @@ -1313,7 +1352,7 @@ int WebsocketLibwebsockets::process_callback(void* wsi_ptr, int callback_reason, EVLOG_info << "Websocket peer initiated close with reason: [" << close_reason << "] close code: [" << close_code << "]. Reconnecting"; data->update_state(EConnectionState::ERROR); - on_conn_fail(); + on_conn_fail(data); // Return 0 to print peer close reason return 0; @@ -1325,12 +1364,12 @@ int WebsocketLibwebsockets::process_callback(void* wsi_ptr, int callback_reason, if (data->is_interupted()) { EVLOG_info << "Client closed, was requested internally, finalizing connection, not reconnecting"; data->update_state(EConnectionState::FINALIZED); - on_conn_close(); + on_conn_close(data); } else { EVLOG_info << "Client closed, was not requested internally, attempting reconnection"; // It means the server went away, attempt to reconnect data->update_state(EConnectionState::ERROR); - on_conn_fail(); + on_conn_fail(data); } break; @@ -1391,7 +1430,7 @@ int WebsocketLibwebsockets::process_callback(void* wsi_ptr, int callback_reason, return 0; } -void WebsocketLibwebsockets::on_conn_connected() { +void WebsocketLibwebsockets::on_conn_connected(ConnectionData* conn_data) { // Called on the websocket client thread EVLOG_info << "OCPP client successfully connected to server with version: " << this->connected_ocpp_version; @@ -1417,7 +1456,7 @@ void WebsocketLibwebsockets::on_conn_connected() { }); } -void WebsocketLibwebsockets::on_conn_close() { +void WebsocketLibwebsockets::on_conn_close(ConnectionData* conn_data) { // Called on the websocket client thread EVLOG_info << "OCPP client closed connection to server"; @@ -1447,6 +1486,35 @@ void WebsocketLibwebsockets::on_conn_close() { EVLOG_error << "Disconnected callback not registered!"; } }); + + // We have polled the close + conn_data->mark_close_executed(); +} + +void WebsocketLibwebsockets::on_conn_fail(ConnectionData* conn_data) { + // Called on the websocket client thread + EVLOG_error << "OCPP client connection to server failed"; + + if (this->m_is_connected) { + this->push_deferred_callback([this]() { + if (this->disconnected_callback) { + this->disconnected_callback(); + } else { + EVLOG_error << "Disconnected callback not registered!"; + } + }); + } + + this->m_is_connected = false; + + // Clear any irrelevant data after a DC + clear_all_queues(); + + // Notify any message senders that are waiting, since we can't send messages any more + message_queue.notify_waiting_thread(); + + // TODO: See if this is required for a faster fail + // lws_set_timeout(conn_data->get_conn(), (enum pending_timeout)1, LWS_TO_KILL_ASYNC); } void WebsocketLibwebsockets::on_conn_message(std::string&& message) { @@ -1538,32 +1606,6 @@ void WebsocketLibwebsockets::on_conn_writable() { } } -void WebsocketLibwebsockets::on_conn_fail() { - // Called on the websocket client thread - EVLOG_error << "OCPP client connection to server failed"; - - if (this->m_is_connected) { - this->push_deferred_callback([this]() { - if (this->disconnected_callback) { - this->disconnected_callback(); - } else { - EVLOG_error << "Disconnected callback not registered!"; - } - }); - } - - this->m_is_connected = false; - - // Clear any irrelevant data after a DC - clear_all_queues(); - - // Notify any message senders that are waiting, since we can't send messages any more - message_queue.notify_waiting_thread(); - - // TODO: See if this is required for a faster fail - // lws_set_timeout(conn_data->get_conn(), (enum pending_timeout)1, LWS_TO_KILL_ASYNC); -} - void WebsocketLibwebsockets::push_deferred_callback(const std::function& callback) { if (!callback) { EVLOG_error << "Attempting to push stale callback in deferred queue!";