Skip to content

Commit

Permalink
Fixed a closed callback being called twice
Browse files Browse the repository at this point in the history
Signed-off-by: AssemblyJohn <[email protected]>
  • Loading branch information
AssemblyJohn committed Dec 6, 2024
1 parent d258bb6 commit dd24ab1
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 55 deletions.
6 changes: 3 additions & 3 deletions include/ocpp/common/websocket/websocket_libwebsockets.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,13 @@ class WebsocketLibwebsockets final : public WebsocketBase {
void thread_deferred_callback_queue();

/// \brief Called when a TLS websocket connection is established, calls the connected callback
void on_conn_connected();
void on_conn_connected(ConnectionData* conn_data);

/// \brief Called when a TLS websocket connection is closed
void on_conn_close();
void on_conn_close(ConnectionData* conn_data);

/// \brief Called when a TLS websocket connection fails to be established
void on_conn_fail();
void on_conn_fail(ConnectionData* conn_data);

/// \brief When the connection can send data
void on_conn_writable();
Expand Down
146 changes: 94 additions & 52 deletions lib/ocpp/common/websocket/websocket_libwebsockets.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ static constexpr int MESSAGE_SEND_TIMEOUT = 20;
/// \brief Current connection data, sets the internal state of the
struct ConnectionData {
explicit ConnectionData(WebsocketLibwebsockets* owner) :
wsi(nullptr), owner(owner), is_running(true), state(EConnectionState::INITIALIZE) {
wsi(nullptr), owner(owner), is_running(true), is_close_run(false), state(EConnectionState::INITIALIZE) {
}

~ConnectionData() {
Expand Down Expand Up @@ -157,6 +157,16 @@ struct ConnectionData {
return (is_running == false);
}

void mark_close_executed() {
std::lock_guard lock(this->mutex);
this->is_close_run = true;
}

bool is_close_executed() {
std::lock_guard lock(this->mutex);
return this->is_close_run;
}

public:
/// \brief This should be used for a cleanup before calling the
/// init functions because releasing the unique ptrs has
Expand Down Expand Up @@ -188,6 +198,9 @@ struct ConnectionData {
EVLOG_AND_THROW(std::runtime_error("Cleanup must be called before re-initing a connection!"));
}

// Reset the close status
is_close_run = false;

// Causes a deadlock in callback_minimal if not reset
this->lws_ctx = std::unique_ptr<lws_context>(lws_ctx);

Expand Down Expand Up @@ -230,6 +243,7 @@ struct ConnectionData {
std::mutex mutex;

bool is_running;
bool is_close_run;
EConnectionState state;

private:
Expand Down Expand Up @@ -702,7 +716,7 @@ void WebsocketLibwebsockets::thread_websocket_client_loop(std::shared_ptr<Connec
EVLOG_error << "Could not initialize connection options.";

local_data->update_state(EConnectionState::ERROR);
on_conn_fail();
on_conn_fail(local_data.get());
} else {
lws_client_connect_info i;
memset(&i, 0, sizeof(lws_client_connect_info));
Expand Down Expand Up @@ -757,14 +771,15 @@ void WebsocketLibwebsockets::thread_websocket_client_loop(std::shared_ptr<Connec
// Print data for debug
EVLOG_info << "LWS connect with info "
<< "port: [" << i.port << "] address: [" << i.address << "] path: [" << i.path << "] protocol: ["
<< i.protocol << "]";
<< i.protocol << "]"
<< " security profile: [" << this->connection_options.security_profile << "]";

if (lws_client_connect_via_info(&i) == nullptr) {
EVLOG_error << "LWS connect failed!";
// This condition can occur when connecting fails to an IP address
// retries need to be attempted
local_data->update_state(EConnectionState::ERROR);
on_conn_fail();
on_conn_fail(local_data.get());
} else {
local_data->init_connection(local_lws);

Expand Down Expand Up @@ -818,7 +833,25 @@ void WebsocketLibwebsockets::thread_websocket_client_loop(std::shared_ptr<Connec
local_data->update_state(EConnectionState::FINALIZED);
try_reconnect = false;

EVLOG_info << "Connection reconnect attempts exhausted, exiting websocket loop";
EVLOG_info << "Connection reconnect attempts exhausted, exiting websocket loop, passing back control "
"to the application logic";

// Give back control to the application
if (false == local_data->is_close_executed()) {
this->push_deferred_callback([this]() {
if (this->closed_callback) {
this->closed_callback(WebsocketCloseReason::Normal);
} else {
EVLOG_error << "Closed callback not registered!";
}

if (this->disconnected_callback) {
this->disconnected_callback();
} else {
EVLOG_error << "Disconnected callback not registered!";
}
});
}
}
}

Expand Down Expand Up @@ -993,6 +1026,26 @@ void WebsocketLibwebsockets::close_internal(const WebsocketCloseReason code, con
// Close any ongoing thread
safe_close_threads();

// Only call this if it was not called already, that can happen in the
// case where we did not had the opportunity to init/connect at least once
// and therefore the minimal_callback is not called to have the time to call
// the 'close_callback'
if (conn_data && (false == conn_data->is_close_executed())) {
this->push_deferred_callback([this, code]() {
if (this->closed_callback) {
this->closed_callback(code);
} else {
EVLOG_error << "Closed callback not registered!";
}

if (this->disconnected_callback) {
this->disconnected_callback();
} else {
EVLOG_error << "Disconnected callback not registered!";
}
});
}

// Release the connection data and state
conn_data.reset();
this->m_is_connected = false;
Expand All @@ -1001,20 +1054,6 @@ void WebsocketLibwebsockets::close_internal(const WebsocketCloseReason code, con
std::lock_guard<std::mutex> lk(this->reconnect_mutex);
this->reconnect_timer_tpm.stop();
}

this->push_deferred_callback([this, code]() {
if (this->closed_callback) {
this->closed_callback(code);
} else {
EVLOG_error << "Closed callback not registered!";
}

if (this->disconnected_callback) {
this->disconnected_callback();
} else {
EVLOG_error << "Disconnected callback not registered!";
}
});
}

void WebsocketLibwebsockets::reconnect(long delay) {
Expand Down Expand Up @@ -1275,7 +1314,7 @@ int WebsocketLibwebsockets::process_callback(void* wsi_ptr, int callback_reason,
}

data->update_state(EConnectionState::ERROR);
on_conn_fail();
on_conn_fail(data);

return 0;
}
Expand All @@ -1297,7 +1336,7 @@ int WebsocketLibwebsockets::process_callback(void* wsi_ptr, int callback_reason,

case LWS_CALLBACK_CLIENT_ESTABLISHED:
data->update_state(EConnectionState::CONNECTED);
on_conn_connected();
on_conn_connected(data);

// Attempt first write after connection
lws_callback_on_writable(wsi);
Expand All @@ -1313,7 +1352,7 @@ int WebsocketLibwebsockets::process_callback(void* wsi_ptr, int callback_reason,
EVLOG_info << "Websocket peer initiated close with reason: [" << close_reason << "] close code: [" << close_code
<< "]. Reconnecting";
data->update_state(EConnectionState::ERROR);
on_conn_fail();
on_conn_fail(data);

// Return 0 to print peer close reason
return 0;
Expand All @@ -1325,12 +1364,12 @@ int WebsocketLibwebsockets::process_callback(void* wsi_ptr, int callback_reason,
if (data->is_interupted()) {
EVLOG_info << "Client closed, was requested internally, finalizing connection, not reconnecting";
data->update_state(EConnectionState::FINALIZED);
on_conn_close();
on_conn_close(data);
} else {
EVLOG_info << "Client closed, was not requested internally, attempting reconnection";
// It means the server went away, attempt to reconnect
data->update_state(EConnectionState::ERROR);
on_conn_fail();
on_conn_fail(data);
}

break;
Expand Down Expand Up @@ -1391,7 +1430,7 @@ int WebsocketLibwebsockets::process_callback(void* wsi_ptr, int callback_reason,
return 0;
}

void WebsocketLibwebsockets::on_conn_connected() {
void WebsocketLibwebsockets::on_conn_connected(ConnectionData* conn_data) {
// Called on the websocket client thread
EVLOG_info << "OCPP client successfully connected to server with version: " << this->connected_ocpp_version;

Expand All @@ -1417,7 +1456,7 @@ void WebsocketLibwebsockets::on_conn_connected() {
});
}

void WebsocketLibwebsockets::on_conn_close() {
void WebsocketLibwebsockets::on_conn_close(ConnectionData* conn_data) {
// Called on the websocket client thread
EVLOG_info << "OCPP client closed connection to server";

Expand Down Expand Up @@ -1447,6 +1486,35 @@ void WebsocketLibwebsockets::on_conn_close() {
EVLOG_error << "Disconnected callback not registered!";
}
});

// We have polled the close
conn_data->mark_close_executed();
}

void WebsocketLibwebsockets::on_conn_fail(ConnectionData* conn_data) {
// Called on the websocket client thread
EVLOG_error << "OCPP client connection to server failed";

if (this->m_is_connected) {
this->push_deferred_callback([this]() {
if (this->disconnected_callback) {
this->disconnected_callback();
} else {
EVLOG_error << "Disconnected callback not registered!";
}
});
}

this->m_is_connected = false;

// Clear any irrelevant data after a DC
clear_all_queues();

// Notify any message senders that are waiting, since we can't send messages any more
message_queue.notify_waiting_thread();

// TODO: See if this is required for a faster fail
// lws_set_timeout(conn_data->get_conn(), (enum pending_timeout)1, LWS_TO_KILL_ASYNC);
}

void WebsocketLibwebsockets::on_conn_message(std::string&& message) {
Expand Down Expand Up @@ -1538,32 +1606,6 @@ void WebsocketLibwebsockets::on_conn_writable() {
}
}

void WebsocketLibwebsockets::on_conn_fail() {
// Called on the websocket client thread
EVLOG_error << "OCPP client connection to server failed";

if (this->m_is_connected) {
this->push_deferred_callback([this]() {
if (this->disconnected_callback) {
this->disconnected_callback();
} else {
EVLOG_error << "Disconnected callback not registered!";
}
});
}

this->m_is_connected = false;

// Clear any irrelevant data after a DC
clear_all_queues();

// Notify any message senders that are waiting, since we can't send messages any more
message_queue.notify_waiting_thread();

// TODO: See if this is required for a faster fail
// lws_set_timeout(conn_data->get_conn(), (enum pending_timeout)1, LWS_TO_KILL_ASYNC);
}

void WebsocketLibwebsockets::push_deferred_callback(const std::function<void()>& callback) {
if (!callback) {
EVLOG_error << "Attempting to push stale callback in deferred queue!";
Expand Down

0 comments on commit dd24ab1

Please sign in to comment.