Skip to content

Commit

Permalink
Fix behavior on connection death
Browse files Browse the repository at this point in the history
  • Loading branch information
abhaybd committed Aug 12, 2023
1 parent 03e0786 commit 1251fab
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 8 deletions.
11 changes: 11 additions & 0 deletions src/network/MissionControlProtocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ using websocket::msghandler_t;
using websocket::validator_t;

const std::chrono::milliseconds TELEM_REPORT_PERIOD = 100ms;
const std::chrono::milliseconds HEARTBEAT_TIMEOUT_PERIOD = 1000ms;

// TODO: possibly use frozen::string for this so we don't have to use raw char ptrs
// request keys
Expand Down Expand Up @@ -264,6 +265,14 @@ void MissionControlProtocol::sendCameraStreamReport(
this->_server.sendJSON(Constants::MC_PROTOCOL_NAME, msg);
}

void MissionControlProtocol::handleHeartbeatTimeout() {
this->stopAndShutdownPowerRepeat();
robot::emergencyStop();
log(LOG_ERROR, "Heartbeat timed out! Emergency stopping.\n");
Globals::E_STOP = true;
Globals::armIKEnabled = false;
}

void MissionControlProtocol::handleConnection() {
// Turn off inverse kinematics on connection
Globals::armIKEnabled = false;
Expand Down Expand Up @@ -376,6 +385,8 @@ MissionControlProtocol::MissionControlProtocol(SingleClientWSServer& server)
this->addConnectionHandler(std::bind(&MissionControlProtocol::handleConnection, this));
this->addDisconnectionHandler(
std::bind(&MissionControlProtocol::stopAndShutdownPowerRepeat, this));

this->setPongTimeoutHandler(HEARTBEAT_TIMEOUT_PERIOD, std::bind(&MissionControlProtocol::handleHeartbeatTimeout, this));

this->_streaming_running = true;
this->_streaming_thread = std::thread(&MissionControlProtocol::videoStreamTask, this);
Expand Down
1 change: 1 addition & 0 deletions src/network/MissionControlProtocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class MissionControlProtocol : public WebSocketProtocol { // TODO: add documenta
void sendJointPositionReport(const std::string& jointName, int32_t position);
void sendRoverPos();
void handleConnection();
void handleHeartbeatTimeout();
void startPowerRepeat();
void stopAndShutdownPowerRepeat();
void setRequestedJointPower(jointid_t joint, double power);
Expand Down
4 changes: 4 additions & 0 deletions src/network/websocket/WebSocketProtocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ void WebSocketProtocol::addDisconnectionHandler(const connhandler_t& handler) {
disconnectionHandlers.push_back(handler);
}

void WebSocketProtocol::setPongTimeoutHandler(std::chrono::milliseconds timeout, const pongtimeouthandler_t& handler) {
pongInfo = {timeout, handler};
}

void WebSocketProtocol::clientConnected() {
for (const auto& f : connectionHandlers) {
f();
Expand Down
23 changes: 15 additions & 8 deletions src/network/websocket/WebSocketProtocol.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
#pragma once

#include <chrono>
#include <functional>
#include <map>
#include <optional>
#include <string>

#include <nlohmann/json.hpp>
Expand All @@ -14,6 +16,7 @@ using nlohmann::json;
typedef std::function<void(const json&)> msghandler_t;
typedef std::function<bool(const json&)> validator_t;
typedef std::function<void()> connhandler_t;
typedef std::function<void()> pongtimeouthandler_t;

/**
* @brief Defines a protocol which will be served at an endpoint of a server.
Expand Down Expand Up @@ -85,14 +88,7 @@ class WebSocketProtocol {

void addDisconnectionHandler(const connhandler_t& handler);

/**
* @brief Process the given JSON object that was sent to this protocol's endpoint.
* Generally, this shouldn't be used by client code.
*
* @param obj The JSON object to be processed by this protocol. It is expected to have a
* "type" key.
*/
void processMessage(const json& obj) const;
void setPongTimeoutHandler(std::chrono::milliseconds timeout, const pongtimeouthandler_t& handler);

/**
* @brief Invoke all connection handlers for this protocol.
Expand All @@ -114,11 +110,22 @@ class WebSocketProtocol {
std::string getProtocolPath() const;

private:
friend class SingleClientWSServer;
std::string protocolPath;
std::map<std::string, msghandler_t> handlerMap;
std::map<std::string, validator_t> validatorMap;
std::vector<connhandler_t> connectionHandlers;
std::vector<connhandler_t> disconnectionHandlers;
std::optional<std::pair<std::chrono::milliseconds, pongtimeouthandler_t>> pongInfo;

/**
* @brief Process the given JSON object that was sent to this protocol's endpoint.
* Generally, this shouldn't be used by client code.
*
* @param obj The JSON object to be processed by this protocol. It is expected to have a
* "type" key.
*/
void processMessage(const json& obj) const;
};

} // namespace websocket
Expand Down
28 changes: 28 additions & 0 deletions src/network/websocket/WebSocketServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ SingleClientWSServer::SingleClientWSServer(const std::string& serverName, uint16
server.set_validate_handler([&](connection_hdl hdl) { return this->validate(hdl); });
server.set_message_handler(
[&](connection_hdl hdl, message_t msg) { this->onMessage(hdl, msg); });
server.set_pong_timeout_handler(
[&](connection_hdl hdl, std::string payload) { this->onPongTimeout(hdl, payload); });
}

SingleClientWSServer::~SingleClientWSServer() {
Expand Down Expand Up @@ -80,6 +82,16 @@ bool SingleClientWSServer::addProtocol(std::unique_ptr<WebSocketProtocol> protoc
std::string path = protocol->getProtocolPath();
if (protocolMap.find(path) == protocolMap.end()) {
protocolMap.emplace(path, std::move(protocol));
const auto& pongInfo = protocolMap.at(path).protocol->pongInfo;
if (pongInfo.has_value()) {
auto eventID = pingScheduler.scheduleEvent(pongInfo->first / 2, [this, path]() {
const auto& pd = this->protocolMap.at(path);
if (pd.client.has_value()) {
server.ping(pd.client.value(), path);
}
});
protocolMap.at(path).pingEventID = eventID;
}
return true;
} else {
return false;
Expand Down Expand Up @@ -148,6 +160,10 @@ void SingleClientWSServer::onClose(connection_hdl hdl) {

auto& protocolData = protocolMap.at(path);
protocolData.client.reset();
if (protocolData.pingEventID.has_value()) {
pingScheduler.removeEvent(protocolData.pingEventID.value());
protocolData.pingEventID.reset();
}
protocolData.protocol->clientDisconnected();
}

Expand All @@ -162,5 +178,17 @@ void SingleClientWSServer::onMessage(connection_hdl hdl, message_t message) {
json obj = json::parse(jsonStr);
protocolMap.at(path).protocol->processMessage(obj);
}

void SingleClientWSServer::onPongTimeout(connection_hdl hdl, const std::string& payload) {
auto conn = server.get_con_from_hdl(hdl);

assert(protocolMap.find(payload) != protocolMap.end());

log(LOG_ERROR, "Pong timeout on %s\n", payload.c_str());
auto& pongInfo = protocolMap.at(payload).protocol->pongInfo;
if (pongInfo.has_value()) {
pongInfo->second();
}
}
} // namespace websocket
} // namespace net
4 changes: 4 additions & 0 deletions src/network/websocket/WebSocketServer.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include "WebSocketProtocol.h"
#include "../../utils/scheduler.h"

#include <functional>
#include <map>
Expand Down Expand Up @@ -98,6 +99,7 @@ class SingleClientWSServer {
ProtocolData(std::unique_ptr<WebSocketProtocol> protocol);
std::unique_ptr<WebSocketProtocol> protocol;
std::optional<connection_hdl> client;
std::optional<util::PeriodicScheduler<>::eventid_t> pingEventID;
};

std::string serverName;
Expand All @@ -106,11 +108,13 @@ class SingleClientWSServer {
bool isRunning;
std::map<std::string, ProtocolData> protocolMap;
std::thread serverThread;
util::PeriodicScheduler<> pingScheduler;

bool validate(connection_hdl hdl);
void onOpen(connection_hdl hdl);
void onClose(connection_hdl hdl);
void onMessage(connection_hdl hdl, message_t message);
void onPongTimeout(connection_hdl hdl, const std::string& payload);
void serverTask();
};
} // namespace websocket
Expand Down

0 comments on commit 1251fab

Please sign in to comment.