Skip to content

Commit

Permalink
feat(hesai): switch to new UDP socket implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Max SCHMELLER <[email protected]>
  • Loading branch information
mojomex committed Nov 21, 2024
1 parent 84e5493 commit de0ded4
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
// boost/property_tree/ in some versions of boost.
// See: https://github.com/boostorg/property_tree/issues/51
#include "nebula_common/nebula_status.hpp"
#include "nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp"

#include <boost/version.hpp>

Expand All @@ -30,7 +31,6 @@
#endif
#include "boost_tcp_driver/http_client_driver.hpp"
#include "boost_tcp_driver/tcp_driver.hpp"
#include "boost_udp_driver/udp_driver.hpp"
#include "nebula_common/hesai/hesai_common.hpp"
#include "nebula_common/hesai/hesai_status.hpp"
#include "nebula_common/util/expected.hpp"
Expand Down Expand Up @@ -136,12 +136,11 @@ class HesaiHwInterface

using ptc_cmd_result_t = nebula::util::expected<std::vector<uint8_t>, ptc_error_t>;

std::unique_ptr<::drivers::common::IoContext> cloud_io_context_;
connections::UdpSocket udp_socket_{};
std::shared_ptr<boost::asio::io_context> m_owned_ctx;
std::unique_ptr<::drivers::udp_driver::UdpDriver> cloud_udp_driver_;
std::shared_ptr<::drivers::tcp_driver::TcpDriver> tcp_driver_;
std::shared_ptr<const HesaiSensorConfiguration> sensor_configuration_;
std::function<void(std::vector<uint8_t> & buffer)>
std::function<void(const std::vector<uint8_t> & buffer)>
cloud_packet_callback_; /**This function pointer is called when the scan is complete*/

std::mutex mtx_inflight_tcp_request_;
Expand Down Expand Up @@ -219,7 +218,7 @@ class HesaiHwInterface

/// @brief Callback function to receive the Cloud Packet data from the UDP Driver
/// @param buffer Buffer containing the data received from the UDP socket
void ReceiveSensorPacketCallback(std::vector<uint8_t> & buffer);
void ReceiveSensorPacketCallback(const std::vector<uint8_t> & buffer);
/// @brief Starting the interface that handles UDP streams
/// @return Resulting status
Status SensorInterfaceStart();
Expand All @@ -242,7 +241,7 @@ class HesaiHwInterface
/// @brief Registering callback for PandarScan
/// @param scan_callback Callback function
/// @return Resulting status
Status RegisterScanCallback(std::function<void(std::vector<uint8_t> &)> scan_callback);
Status RegisterScanCallback(std::function<void(const std::vector<uint8_t> &)> scan_callback);
/// @brief Getting data with PTC_COMMAND_GET_LIDAR_CALIBRATION
/// @return Resulting status
std::string GetLidarCalibrationString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "nebula_common/hesai/hesai_status.hpp"
#include "nebula_common/nebula_common.hpp"
#include "nebula_common/nebula_status.hpp"
#include "nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp"
#include "nebula_hw_interfaces/nebula_hw_interfaces_hesai/hesai_cmd_response.hpp"

#include <nlohmann/json.hpp>
Expand All @@ -14,6 +15,7 @@
#include <boost/asio/socket_base.hpp>

#include <cassert>
#include <cstddef>
#include <sstream>
#include <stdexcept>
#include <string>
Expand All @@ -34,9 +36,7 @@ using std::string_literals::operator""s;
using nlohmann::json;

HesaiHwInterface::HesaiHwInterface()
: cloud_io_context_{new ::drivers::common::IoContext(1)},
m_owned_ctx{new boost::asio::io_context(1)},
cloud_udp_driver_{new ::drivers::udp_driver::UdpDriver(*cloud_io_context_)},
: m_owned_ctx{new boost::asio::io_context(1)},
tcp_driver_{new ::drivers::tcp_driver::TcpDriver(m_owned_ctx)}
{
}
Expand Down Expand Up @@ -159,63 +159,42 @@ Status HesaiHwInterface::SetSensorConfiguration(

Status HesaiHwInterface::SensorInterfaceStart()
{
try {
PrintInfo("Starting UDP receiver");
if (sensor_configuration_->multicast_ip.empty()) {
cloud_udp_driver_->init_receiver(
sensor_configuration_->host_ip, sensor_configuration_->data_port);
} else {
cloud_udp_driver_->init_receiver(
sensor_configuration_->multicast_ip, sensor_configuration_->data_port,
sensor_configuration_->host_ip, sensor_configuration_->data_port);
cloud_udp_driver_->receiver()->setMulticast(true);
}
#ifdef WITH_DEBUG_STDOUT_HESAI_HW_INTERFACE
PrintError("init ok");
#endif
cloud_udp_driver_->receiver()->open();
#ifdef WITH_DEBUG_STDOUT_HESAI_HW_INTERFACE
PrintError("open ok");
#endif

bool success = cloud_udp_driver_->receiver()->setKernelBufferSize(UDP_SOCKET_BUFFER_SIZE);
if (!success) {
PrintError(
"Could not set receive buffer size. Try increasing net.core.rmem_max to " +
std::to_string(UDP_SOCKET_BUFFER_SIZE) + " B.");
return Status::ERROR_1;
}
udp_socket_.init(sensor_configuration_->host_ip, sensor_configuration_->data_port);
if (!sensor_configuration_->multicast_ip.empty()) {
udp_socket_.join_multicast_group(sensor_configuration_->multicast_ip);
}

cloud_udp_driver_->receiver()->bind();
#ifdef WITH_DEBUG_STDOUT_HESAI_HW_INTERFACE
PrintError("bind ok");
#endif
udp_socket_.set_mtu(MTU_SIZE);

cloud_udp_driver_->receiver()->asyncReceive(
std::bind(&HesaiHwInterface::ReceiveSensorPacketCallback, this, std::placeholders::_1));
#ifdef WITH_DEBUG_STDOUT_HESAI_HW_INTERFACE
PrintError("async receive set");
#endif
} catch (const std::exception & ex) {
Status status = Status::UDP_CONNECTION_ERROR;
std::cerr << status << " for " << sensor_configuration_->sensor_ip << ":"
<< sensor_configuration_->data_port << " - " << ex.what() << std::endl;
return status;
try {
udp_socket_.set_socket_buffer_size(UDP_SOCKET_BUFFER_SIZE);
} catch (const connections::SocketError & e) {
throw std::runtime_error(
"Could not set socket receive buffer size to " + std::to_string(UDP_SOCKET_BUFFER_SIZE) +
". Try increasing net.core.rmem_max.");
}

udp_socket_.bind().subscribe([&](
const std::vector<uint8_t> & packet,
const connections::UdpSocket::ReceiveMetadata & /* metadata */) {
ReceiveSensorPacketCallback(packet);
});

return Status::OK;
}

Status HesaiHwInterface::RegisterScanCallback(
std::function<void(std::vector<uint8_t> &)> scan_callback)
std::function<void(const std::vector<uint8_t> &)> scan_callback)
{
cloud_packet_callback_ = std::move(scan_callback);
return Status::OK;
}

void HesaiHwInterface::ReceiveSensorPacketCallback(std::vector<uint8_t> & buffer)
void HesaiHwInterface::ReceiveSensorPacketCallback(const std::vector<uint8_t> & buffer)
{
cloud_packet_callback_(buffer);
}

Status HesaiHwInterface::SensorInterfaceStop()
{
return Status::ERROR_1;
Expand Down
2 changes: 1 addition & 1 deletion nebula_ros/include/nebula_ros/hesai/hesai_ros_wrapper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class HesaiRosWrapper final : public rclcpp::Node
Status stream_start();

private:
void receive_cloud_packet_callback(std::vector<uint8_t> & packet);
void receive_cloud_packet_callback(const std::vector<uint8_t> & packet);

void receive_scan_message_callback(std::unique_ptr<pandar_msgs::msg::PandarScan> scan_msg);

Expand Down
5 changes: 3 additions & 2 deletions nebula_ros/src/hesai/hesai_ros_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <nebula_common/nebula_common.hpp>
#include <nebula_decoders/nebula_decoders_common/angles.hpp>

#include <algorithm>
#include <cstdint>
#include <filesystem>
#include <memory>
Expand Down Expand Up @@ -413,7 +414,7 @@ rcl_interfaces::msg::SetParametersResult HesaiRosWrapper::on_parameter_change(
return rcl_interfaces::build<SetParametersResult>().successful(true).reason("");
}

void HesaiRosWrapper::receive_cloud_packet_callback(std::vector<uint8_t> & packet)
void HesaiRosWrapper::receive_cloud_packet_callback(const std::vector<uint8_t> & packet)
{
if (!decoder_wrapper_ || decoder_wrapper_->status() != Status::OK) {
return;
Expand All @@ -426,7 +427,7 @@ void HesaiRosWrapper::receive_cloud_packet_callback(std::vector<uint8_t> & packe
auto msg_ptr = std::make_unique<nebula_msgs::msg::NebulaPacket>();
msg_ptr->stamp.sec = static_cast<int>(timestamp_ns / 1'000'000'000);
msg_ptr->stamp.nanosec = static_cast<int>(timestamp_ns % 1'000'000'000);
msg_ptr->data.swap(packet);
msg_ptr->data = packet;

if (!packet_queue_.try_push(std::move(msg_ptr))) {
RCLCPP_ERROR_THROTTLE(get_logger(), *get_clock(), 500, "Packet(s) dropped");
Expand Down

0 comments on commit de0ded4

Please sign in to comment.