From de0ded4b4890d5aeae3d26d0ce1a6cce8057d742 Mon Sep 17 00:00:00 2001 From: Max SCHMELLER Date: Tue, 19 Nov 2024 15:14:49 +0900 Subject: [PATCH] feat(hesai): switch to new UDP socket implementation Signed-off-by: Max SCHMELLER --- .../hesai_hw_interface.hpp | 11 ++- .../hesai_hw_interface.cpp | 69 +++++++------------ .../nebula_ros/hesai/hesai_ros_wrapper.hpp | 2 +- nebula_ros/src/hesai/hesai_ros_wrapper.cpp | 5 +- 4 files changed, 33 insertions(+), 54 deletions(-) diff --git a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_hesai/hesai_hw_interface.hpp b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_hesai/hesai_hw_interface.hpp index d37a33e0..d577fa03 100644 --- a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_hesai/hesai_hw_interface.hpp +++ b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_hesai/hesai_hw_interface.hpp @@ -18,6 +18,7 @@ // boost/property_tree/ in some versions of boost. // See: https://github.com/boostorg/property_tree/issues/51 #include "nebula_common/nebula_status.hpp" +#include "nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp" #include @@ -30,7 +31,6 @@ #endif #include "boost_tcp_driver/http_client_driver.hpp" #include "boost_tcp_driver/tcp_driver.hpp" -#include "boost_udp_driver/udp_driver.hpp" #include "nebula_common/hesai/hesai_common.hpp" #include "nebula_common/hesai/hesai_status.hpp" #include "nebula_common/util/expected.hpp" @@ -136,12 +136,11 @@ class HesaiHwInterface using ptc_cmd_result_t = nebula::util::expected, ptc_error_t>; - std::unique_ptr<::drivers::common::IoContext> cloud_io_context_; + connections::UdpSocket udp_socket_{}; std::shared_ptr m_owned_ctx; - std::unique_ptr<::drivers::udp_driver::UdpDriver> cloud_udp_driver_; std::shared_ptr<::drivers::tcp_driver::TcpDriver> tcp_driver_; std::shared_ptr sensor_configuration_; - std::function & buffer)> + std::function & buffer)> cloud_packet_callback_; /**This function pointer is called when the scan is complete*/ std::mutex mtx_inflight_tcp_request_; @@ -219,7 +218,7 @@ class HesaiHwInterface /// @brief Callback function to receive the Cloud Packet data from the UDP Driver /// @param buffer Buffer containing the data received from the UDP socket - void ReceiveSensorPacketCallback(std::vector & buffer); + void ReceiveSensorPacketCallback(const std::vector & buffer); /// @brief Starting the interface that handles UDP streams /// @return Resulting status Status SensorInterfaceStart(); @@ -242,7 +241,7 @@ class HesaiHwInterface /// @brief Registering callback for PandarScan /// @param scan_callback Callback function /// @return Resulting status - Status RegisterScanCallback(std::function &)> scan_callback); + Status RegisterScanCallback(std::function &)> scan_callback); /// @brief Getting data with PTC_COMMAND_GET_LIDAR_CALIBRATION /// @return Resulting status std::string GetLidarCalibrationString(); diff --git a/nebula_hw_interfaces/src/nebula_hesai_hw_interfaces/hesai_hw_interface.cpp b/nebula_hw_interfaces/src/nebula_hesai_hw_interfaces/hesai_hw_interface.cpp index 0a08dd93..112fbf57 100644 --- a/nebula_hw_interfaces/src/nebula_hesai_hw_interfaces/hesai_hw_interface.cpp +++ b/nebula_hw_interfaces/src/nebula_hesai_hw_interfaces/hesai_hw_interface.cpp @@ -6,6 +6,7 @@ #include "nebula_common/hesai/hesai_status.hpp" #include "nebula_common/nebula_common.hpp" #include "nebula_common/nebula_status.hpp" +#include "nebula_hw_interfaces/nebula_hw_interfaces_common/connections/udp.hpp" #include "nebula_hw_interfaces/nebula_hw_interfaces_hesai/hesai_cmd_response.hpp" #include @@ -14,6 +15,7 @@ #include #include +#include #include #include #include @@ -34,9 +36,7 @@ using std::string_literals::operator""s; using nlohmann::json; HesaiHwInterface::HesaiHwInterface() -: cloud_io_context_{new ::drivers::common::IoContext(1)}, - m_owned_ctx{new boost::asio::io_context(1)}, - cloud_udp_driver_{new ::drivers::udp_driver::UdpDriver(*cloud_io_context_)}, +: m_owned_ctx{new boost::asio::io_context(1)}, tcp_driver_{new ::drivers::tcp_driver::TcpDriver(m_owned_ctx)} { } @@ -159,63 +159,42 @@ Status HesaiHwInterface::SetSensorConfiguration( Status HesaiHwInterface::SensorInterfaceStart() { - try { - PrintInfo("Starting UDP receiver"); - if (sensor_configuration_->multicast_ip.empty()) { - cloud_udp_driver_->init_receiver( - sensor_configuration_->host_ip, sensor_configuration_->data_port); - } else { - cloud_udp_driver_->init_receiver( - sensor_configuration_->multicast_ip, sensor_configuration_->data_port, - sensor_configuration_->host_ip, sensor_configuration_->data_port); - cloud_udp_driver_->receiver()->setMulticast(true); - } -#ifdef WITH_DEBUG_STDOUT_HESAI_HW_INTERFACE - PrintError("init ok"); -#endif - cloud_udp_driver_->receiver()->open(); -#ifdef WITH_DEBUG_STDOUT_HESAI_HW_INTERFACE - PrintError("open ok"); -#endif - - bool success = cloud_udp_driver_->receiver()->setKernelBufferSize(UDP_SOCKET_BUFFER_SIZE); - if (!success) { - PrintError( - "Could not set receive buffer size. Try increasing net.core.rmem_max to " + - std::to_string(UDP_SOCKET_BUFFER_SIZE) + " B."); - return Status::ERROR_1; - } + udp_socket_.init(sensor_configuration_->host_ip, sensor_configuration_->data_port); + if (!sensor_configuration_->multicast_ip.empty()) { + udp_socket_.join_multicast_group(sensor_configuration_->multicast_ip); + } - cloud_udp_driver_->receiver()->bind(); -#ifdef WITH_DEBUG_STDOUT_HESAI_HW_INTERFACE - PrintError("bind ok"); -#endif + udp_socket_.set_mtu(MTU_SIZE); - cloud_udp_driver_->receiver()->asyncReceive( - std::bind(&HesaiHwInterface::ReceiveSensorPacketCallback, this, std::placeholders::_1)); -#ifdef WITH_DEBUG_STDOUT_HESAI_HW_INTERFACE - PrintError("async receive set"); -#endif - } catch (const std::exception & ex) { - Status status = Status::UDP_CONNECTION_ERROR; - std::cerr << status << " for " << sensor_configuration_->sensor_ip << ":" - << sensor_configuration_->data_port << " - " << ex.what() << std::endl; - return status; + try { + udp_socket_.set_socket_buffer_size(UDP_SOCKET_BUFFER_SIZE); + } catch (const connections::SocketError & e) { + throw std::runtime_error( + "Could not set socket receive buffer size to " + std::to_string(UDP_SOCKET_BUFFER_SIZE) + + ". Try increasing net.core.rmem_max."); } + + udp_socket_.bind().subscribe([&]( + const std::vector & packet, + const connections::UdpSocket::ReceiveMetadata & /* metadata */) { + ReceiveSensorPacketCallback(packet); + }); + return Status::OK; } Status HesaiHwInterface::RegisterScanCallback( - std::function &)> scan_callback) + std::function &)> scan_callback) { cloud_packet_callback_ = std::move(scan_callback); return Status::OK; } -void HesaiHwInterface::ReceiveSensorPacketCallback(std::vector & buffer) +void HesaiHwInterface::ReceiveSensorPacketCallback(const std::vector & buffer) { cloud_packet_callback_(buffer); } + Status HesaiHwInterface::SensorInterfaceStop() { return Status::ERROR_1; diff --git a/nebula_ros/include/nebula_ros/hesai/hesai_ros_wrapper.hpp b/nebula_ros/include/nebula_ros/hesai/hesai_ros_wrapper.hpp index f8d221c3..d08d0ba9 100644 --- a/nebula_ros/include/nebula_ros/hesai/hesai_ros_wrapper.hpp +++ b/nebula_ros/include/nebula_ros/hesai/hesai_ros_wrapper.hpp @@ -62,7 +62,7 @@ class HesaiRosWrapper final : public rclcpp::Node Status stream_start(); private: - void receive_cloud_packet_callback(std::vector & packet); + void receive_cloud_packet_callback(const std::vector & packet); void receive_scan_message_callback(std::unique_ptr scan_msg); diff --git a/nebula_ros/src/hesai/hesai_ros_wrapper.cpp b/nebula_ros/src/hesai/hesai_ros_wrapper.cpp index 73869d1f..c10bcdf3 100644 --- a/nebula_ros/src/hesai/hesai_ros_wrapper.cpp +++ b/nebula_ros/src/hesai/hesai_ros_wrapper.cpp @@ -8,6 +8,7 @@ #include #include +#include #include #include #include @@ -413,7 +414,7 @@ rcl_interfaces::msg::SetParametersResult HesaiRosWrapper::on_parameter_change( return rcl_interfaces::build().successful(true).reason(""); } -void HesaiRosWrapper::receive_cloud_packet_callback(std::vector & packet) +void HesaiRosWrapper::receive_cloud_packet_callback(const std::vector & packet) { if (!decoder_wrapper_ || decoder_wrapper_->status() != Status::OK) { return; @@ -426,7 +427,7 @@ void HesaiRosWrapper::receive_cloud_packet_callback(std::vector & packe auto msg_ptr = std::make_unique(); msg_ptr->stamp.sec = static_cast(timestamp_ns / 1'000'000'000); msg_ptr->stamp.nanosec = static_cast(timestamp_ns % 1'000'000'000); - msg_ptr->data.swap(packet); + msg_ptr->data = packet; if (!packet_queue_.try_push(std::move(msg_ptr))) { RCLCPP_ERROR_THROTTLE(get_logger(), *get_clock(), 500, "Packet(s) dropped");