Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(WIP - do not merge) Refactor hard-to-read logic #17

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/mocap_pose/mocap_pose.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <cstdint>
#include <sys/types.h>
#include <rclcpp/rclcpp.hpp>
#include <RTProtocol.h>
//#include <std_msgs/msg/string.hpp>

const std::string kGpsSensorTopic = "/fmu/in/SensorGps";
Expand All @@ -21,6 +22,7 @@ class MocapPose : public rclcpp::Node

rclcpp::Time QualisysToRosTimestamp(unsigned long long ts);
void WorkerThread();
bool runWork(CRTProtocol* rtProtocol, bool* dataAvailable, bool* streamFrames, bool* very_first_message);
struct Impl;
std::unique_ptr<Impl> impl_;
int64_t minTimestampDiff;
Expand Down
291 changes: 157 additions & 134 deletions src/mocap_pose.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,153 +253,176 @@ void MocapPose::WorkerThread()
bool very_first_message = true;
while (impl_->worker_thread_running)
{
auto now = rclcpp::Clock().now();
if (now > latest_succesfull_receive + rclcpp::Duration(10,0))
{
RCLCPP_WARN(get_logger(),
"Have not succesfully received for more than 10 secs - restarting receiver");
if (!runWork(&rtProtocol, &dataAvailable, &streamFrames, &very_first_message)) { // return false if meant to stop
break;
}
}
rtProtocol.StopCapture();
rtProtocol.Disconnect();
}
}

if (!rtProtocol.Connected())
{
if (!rtProtocol.Connect(impl_->serverAddress.c_str(),
impl_->basePort,
&udpPort,
impl_->sdkMajorVersion,
impl_->sdkMinorVersion,
impl_->bigEndian))
{
RCLCPP_WARN(get_logger(),
"Trying to connect to %s:%d : %s",
impl_->serverAddress.c_str(),
// return false if meant to stop working
bool MocapPose::runWork(CRTProtocol* rtProtocol, bool* dataAvailable, bool* streamFrames, bool* very_first_message)
{
auto now = rclcpp::Clock().now();
if (now > latest_succesfull_receive + rclcpp::Duration(10,0))
{
RCLCPP_WARN(get_logger(),
"Have not succesfully received for more than 10 secs - restarting receiver");
return false;
}

if (!rtProtocol->Connected())
{
if (!rtProtocol->Connect(impl_->serverAddress.c_str(),
impl_->basePort,
rtProtocol.GetErrorString());
std::this_thread::sleep_for(std::chrono::milliseconds(100));
continue;
}
else
{
RCLCPP_INFO(
get_logger(), "Succesfully connected to %s:%d", impl_->serverAddress.c_str(), impl_->basePort);
}
}
nullptr,
impl_->sdkMajorVersion,
impl_->sdkMinorVersion,
impl_->bigEndian))
{
RCLCPP_WARN(get_logger(),
"Trying to connect to %s:%d : %s",
impl_->serverAddress.c_str(),
impl_->basePort,
rtProtocol->GetErrorString());
std::this_thread::sleep_for(std::chrono::milliseconds(100));
return true;
}
else
{
RCLCPP_INFO(
get_logger(), "Succesfully connected to %s:%d", impl_->serverAddress.c_str(), impl_->basePort);
}
}

if (!dataAvailable)
{
if (!rtProtocol.Read6DOFSettings(dataAvailable))
{
RCLCPP_WARN(get_logger(), "6DoF data is not available: %s", rtProtocol.GetErrorString());
std::this_thread::sleep_for(std::chrono::milliseconds(100));
continue;
}
else
{
RCLCPP_INFO(get_logger(), "6DoF data is available (server configured well)");
}
}
if (!dataAvailable)
{
if (!rtProtocol->Read6DOFSettings(*dataAvailable))
{
RCLCPP_WARN(get_logger(), "6DoF data is not available: %s", rtProtocol->GetErrorString());
std::this_thread::sleep_for(std::chrono::milliseconds(100));
return true;
}
else
{
RCLCPP_INFO(get_logger(), "6DoF data is available (server configured well)");
}
}

if (!streamFrames)
{
if (!rtProtocol.StreamFrames(
CRTProtocol::RateAllFrames, 0, udpPort, nullptr, CRTProtocol::cComponent6d))
{
RCLCPP_WARN(get_logger(), "Cannot start data streaming: %s", rtProtocol.GetErrorString());
std::this_thread::sleep_for(std::chrono::milliseconds(100));
continue;
}
else
{
RCLCPP_INFO(get_logger(), "Data streaming succesfully started");
}
streamFrames = true;
}
if (!streamFrames)
{
// use TCP only. previously we used UDP (which Mocap negotiates on top of the
// TCP-based control connection), but those UDP packets look from NAT gateway perspective
// like unsolicited packets to they aren't able to traverse NAT gateways without
// special port forwards, especially multiple ones (if we're on mesh networks). and
// also having multiple drones behind one NAT gateway would be a really hard problem.
if (!rtProtocol->StreamFrames(
CRTProtocol::RateAllFrames, 0, 0, nullptr, CRTProtocol::cComponent6d))
{
RCLCPP_WARN(get_logger(), "Cannot start data streaming: %s", rtProtocol->GetErrorString());
std::this_thread::sleep_for(std::chrono::milliseconds(100));
return true;
}
else
{
RCLCPP_INFO(get_logger(), "Data streaming succesfully started");
}
*streamFrames = true;
}

CRTPacket::EPacketType packetType;
CRTPacket::EPacketType packetType;

if (rtProtocol.Receive(packetType, true) == CNetwork::ResponseType::success)
{
if (packetType == CRTPacket::PacketData)
{
float fX, fY, fZ;
float rotation[9];

CRTPacket* rtPacket = rtProtocol.GetRTPacket();
bool body_found = false;

for (unsigned int i = 0; i < rtPacket->Get6DOFBodyCount(); i++)
{
if (rtPacket->Get6DOFBody(i, fX, fY, fZ, rotation))
{
auto name = std::string(rtProtocol.Get6DOFBodyName(i));

if (name == impl_->bodyName)
{
body_found = true;

// convert millimeters into meters
const Eigen::Vector3f Pos = Eigen::Vector3f(fX, fY, fZ) / 1000.F;
const Eigen::Map<Eigen::Matrix3f> R(rotation);
const Eigen::Quaternionf Q(R);
if (rtProtocol->Receive(packetType, true) != CNetwork::ResponseType::success)
{
return true;
}

if (packetType != CRTPacket::PacketData)
{
return true;
}

float fX, fY, fZ;
float rotation[9];

CRTPacket* rtPacket = rtProtocol->GetRTPacket();
bool body_found = false;

for (unsigned int i = 0; i < rtPacket->Get6DOFBodyCount(); i++)
{
if (!rtPacket->Get6DOFBody(i, fX, fY, fZ, rotation))
{
continue;
}

auto name = std::string(rtProtocol->Get6DOFBodyName(i));

if (name != impl_->bodyName)
{
continue;
}

body_found = true;

// convert millimeters into meters
const Eigen::Vector3f Pos = Eigen::Vector3f(fX, fY, fZ) / 1000.F;
const Eigen::Map<Eigen::Matrix3f> R(rotation);
const Eigen::Quaternionf Q(R);
#if 0
RCLCPP_INFO(this->get_logger(),
"Position: [%6.2f %6.2f %6.2f]\t Quaternion: [%6.3f %6.3f %6.3f %6.3f]",
Pos[0],
Pos[1],
Pos[2],
Q.w(),
Q.x(),
Q.y(),
Q.z());
RCLCPP_INFO(this->get_logger(),
"Position: [%6.2f %6.2f %6.2f]\t Quaternion: [%6.3f %6.3f %6.3f %6.3f]",
Pos[0],
Pos[1],
Pos[2],
Q.w(),
Q.x(),
Q.y(),
Q.z());
#endif
const auto timestamp = rclcpp::Clock().now();
const auto gps_timestamp = QualisysToRosTimestamp(rtPacket->GetTimeStamp());
const auto gps_msg = impl_->PrepareGpsMessage(Pos, Q, timestamp, timestamp);
const bool time_to_publish = (impl_->last_published_timestamp.seconds() +
impl_->publishing_timestep) <= timestamp.seconds();

const bool translation_is_valid =
!std::isnan(Pos[0]) && !std::isnan(Pos[1]) && !std::isnan(Pos[2]);
const bool rotation_is_valid = !std::isnan(Q.w()) && !std::isnan(Q.x()) &&
!std::isnan(Q.y()) && !std::isnan(Q.z());
const bool data_is_valid = translation_is_valid && rotation_is_valid;

latest_succesfull_receive = rclcpp::Clock().now();

if (data_is_valid)
{
if (very_first_message || time_to_publish)
{
RCLCPP_INFO(this->get_logger(),
"Publish GPS at time %lf, previous_time: %lf , gps ts %lf",
timestamp.seconds(),
impl_->last_published_timestamp.seconds(),
gps_timestamp.seconds());
impl_->publisher->publish(gps_msg);
impl_->last_published_timestamp = timestamp;
very_first_message = false;
}
}
else
{
RCLCPP_WARN(get_logger(), "Data is full of NaNs and thus NOT PUBLISHED!");
}
}
}
}

if (!body_found)
{
RCLCPP_WARN(get_logger(),
"Cannot find body named \"%s\" in Qualisys Data Stream",
impl_->bodyName.c_str());
}
}
const auto timestamp = rclcpp::Clock().now();
const auto gps_timestamp = QualisysToRosTimestamp(rtPacket->GetTimeStamp());
const auto gps_msg = impl_->PrepareGpsMessage(Pos, Q, timestamp, timestamp);
const bool time_to_publish = (impl_->last_published_timestamp.seconds() +
impl_->publishing_timestep) <= timestamp.seconds();

const bool translation_is_valid =
!std::isnan(Pos[0]) && !std::isnan(Pos[1]) && !std::isnan(Pos[2]);
const bool rotation_is_valid = !std::isnan(Q.w()) && !std::isnan(Q.x()) &&
!std::isnan(Q.y()) && !std::isnan(Q.z());
const bool data_is_valid = translation_is_valid && rotation_is_valid;

latest_succesfull_receive = rclcpp::Clock().now();

if (data_is_valid)
{
if (very_first_message || time_to_publish)
{
RCLCPP_INFO(this->get_logger(),
"Publish GPS at time %lf, previous_time: %lf , gps ts %lf",
timestamp.seconds(),
impl_->last_published_timestamp.seconds(),
gps_timestamp.seconds());
impl_->publisher->publish(gps_msg);
impl_->last_published_timestamp = timestamp;
*very_first_message = false;
}
}
rtProtocol.StopCapture();
rtProtocol.Disconnect();
else
{
RCLCPP_WARN(get_logger(), "Data is full of NaNs and thus NOT PUBLISHED!");
}
}

if (!body_found)
{
RCLCPP_WARN(get_logger(),
"Cannot find body named \"%s\" in Qualisys Data Stream",
impl_->bodyName.c_str());
}

return true;
}

MocapPose::~MocapPose()
Expand Down