diff --git a/third-party/realdds/include/realdds/topics/dds-topic-names.h b/third-party/realdds/include/realdds/topics/dds-topic-names.h index 97a1ebf079..e99376dd6c 100644 --- a/third-party/realdds/include/realdds/topics/dds-topic-names.h +++ b/third-party/realdds/include/realdds/topics/dds-topic-names.h @@ -17,6 +17,9 @@ constexpr char const * ROOT = "realsense/"; constexpr size_t ROOT_LEN = 10; // NOTE: actual streams will be ROS-compatible, meaning rt/ROOT constexpr char const * ROS2_ROOT = "rt/"; +constexpr char const * ROS2_SERVICE_REQUEST_ROOT = "rq/"; +constexpr char const * ROS2_SERVICE_RESPONSE_ROOT = "rr/"; + constexpr size_t ROS2_ROOT_LEN = 3; constexpr char const * DEVICE_INFO_TOPIC_NAME = "realsense/device-info"; @@ -33,7 +36,7 @@ constexpr char const * LIST_PARAMETERS_NAME = "/list_parameters"; constexpr char const * DESCRIBE_PARAMETERS_NAME = "/describe_parameters"; constexpr char const * REQUEST_SUFFIX = "Request"; -constexpr char const * RESPONSE_SUFFIX = "Response"; +constexpr char const * RESPONSE_SUFFIX = "Reply"; namespace notification { diff --git a/tools/dds/dds-adapter/lrs-device-controller.cpp b/tools/dds/dds-adapter/lrs-device-controller.cpp index 6a957b02e8..0e3c3d8d93 100644 --- a/tools/dds/dds-adapter/lrs-device-controller.cpp +++ b/tools/dds/dds-adapter/lrs-device-controller.cpp @@ -880,7 +880,7 @@ lrs_device_controller::lrs_device_controller( rs2::device dev, std::shared_ptr< auto publisher = _dds_device_server->publisher(); if( auto topic = topics::ros2::get_parameters_request_msg::create_topic( participant, - topics::ROS2_ROOT + topic_root + topics::GET_PARAMETERS_NAME + topics::REQUEST_SUFFIX ) ) + topics::ROS2_SERVICE_REQUEST_ROOT + topic_root + topics::GET_PARAMETERS_NAME + topics::REQUEST_SUFFIX ) ) { _get_params_reader = std::make_shared< dds_topic_reader >( topic, subscriber ); _get_params_reader->on_data_available( [&]() { on_get_params_request(); } ); @@ -890,10 +890,10 @@ lrs_device_controller::lrs_device_controller( rs2::device dev, std::shared_ptr< } if( auto topic = topics::ros2::get_parameters_response_msg::create_topic( participant, - topics::ROS2_ROOT + topic_root + topics::GET_PARAMETERS_NAME + topics::RESPONSE_SUFFIX ) ) + topics::ROS2_SERVICE_RESPONSE_ROOT + topic_root + topics::GET_PARAMETERS_NAME + topics::RESPONSE_SUFFIX ) ) { _get_params_writer = std::make_shared< dds_topic_writer >( topic, publisher ); - dds_topic_writer::qos wqos( eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS ); + dds_topic_writer::qos wqos( eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS ); wqos.history().depth = 10; // default is 1 _get_params_writer->override_qos_from_json( wqos, participant->settings().nested( "device", "get-parameters-response" ) ); @@ -901,7 +901,7 @@ lrs_device_controller::lrs_device_controller( rs2::device dev, std::shared_ptr< } if( auto topic = topics::ros2::set_parameters_request_msg::create_topic( participant, - topics::ROS2_ROOT + topic_root + topics::SET_PARAMETERS_NAME + topics::REQUEST_SUFFIX ) ) + topics::ROS2_SERVICE_REQUEST_ROOT + topic_root + topics::SET_PARAMETERS_NAME + topics::REQUEST_SUFFIX ) ) { _set_params_reader = std::make_shared< dds_topic_reader >( topic, subscriber ); _set_params_reader->on_data_available( [&]() { on_set_params_request(); } ); @@ -911,10 +911,10 @@ lrs_device_controller::lrs_device_controller( rs2::device dev, std::shared_ptr< } if( auto topic = topics::ros2::set_parameters_response_msg::create_topic( participant, - topics::ROS2_ROOT + topic_root + topics::SET_PARAMETERS_NAME + topics::RESPONSE_SUFFIX ) ) + topics::ROS2_SERVICE_RESPONSE_ROOT + topic_root + topics::SET_PARAMETERS_NAME + topics::RESPONSE_SUFFIX ) ) { _set_params_writer = std::make_shared< dds_topic_writer >( topic, publisher ); - dds_topic_writer::qos wqos( eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS ); + dds_topic_writer::qos wqos( eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS ); wqos.history().depth = 10; // default is 1 _set_params_writer->override_qos_from_json( wqos, participant->settings().nested( "device", "set-parameters-response" ) ); @@ -922,7 +922,7 @@ lrs_device_controller::lrs_device_controller( rs2::device dev, std::shared_ptr< } if( auto topic = topics::ros2::list_parameters_request_msg::create_topic( participant, - topics::ROS2_ROOT + topic_root + topics::GET_PARAMETERS_NAME + topics::REQUEST_SUFFIX ) ) + topics::ROS2_SERVICE_REQUEST_ROOT + topic_root + topics::LIST_PARAMETERS_NAME + topics::REQUEST_SUFFIX ) ) { _list_params_reader = std::make_shared< dds_topic_reader >( topic, subscriber ); _list_params_reader->on_data_available( [&]() { on_list_params_request(); } ); @@ -932,10 +932,10 @@ lrs_device_controller::lrs_device_controller( rs2::device dev, std::shared_ptr< } if( auto topic = topics::ros2::list_parameters_response_msg::create_topic( participant, - topics::ROS2_ROOT + topic_root + topics::LIST_PARAMETERS_NAME + topics::RESPONSE_SUFFIX ) ) + topics::ROS2_SERVICE_RESPONSE_ROOT + topic_root + topics::LIST_PARAMETERS_NAME + topics::RESPONSE_SUFFIX ) ) { _list_params_writer = std::make_shared< dds_topic_writer >( topic, publisher ); - dds_topic_writer::qos wqos( eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS ); + dds_topic_writer::qos wqos( eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS ); wqos.history().depth = 10; // default is 1 _list_params_writer->override_qos_from_json( wqos, participant->settings().nested( "device", "list-parameters-response" ) ); @@ -943,7 +943,7 @@ lrs_device_controller::lrs_device_controller( rs2::device dev, std::shared_ptr< } if( auto topic = topics::ros2::describe_parameters_request_msg::create_topic( participant, - topics::ROS2_ROOT + topic_root + topics::DESCRIBE_PARAMETERS_NAME + topics::REQUEST_SUFFIX ) ) + topics::ROS2_SERVICE_REQUEST_ROOT + topic_root + topics::DESCRIBE_PARAMETERS_NAME + topics::REQUEST_SUFFIX ) ) { _describe_params_reader = std::make_shared< dds_topic_reader >( topic, subscriber ); _describe_params_reader->on_data_available( [&]() { on_describe_params_request(); } ); @@ -953,10 +953,10 @@ lrs_device_controller::lrs_device_controller( rs2::device dev, std::shared_ptr< } if( auto topic = topics::ros2::describe_parameters_response_msg::create_topic( participant, - topics::ROS2_ROOT + topic_root + topics::DESCRIBE_PARAMETERS_NAME + topics::RESPONSE_SUFFIX ) ) + topics::ROS2_SERVICE_RESPONSE_ROOT + topic_root + topics::DESCRIBE_PARAMETERS_NAME + topics::RESPONSE_SUFFIX ) ) { _describe_params_writer = std::make_shared< dds_topic_writer >( topic, publisher ); - dds_topic_writer::qos wqos( eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS ); + dds_topic_writer::qos wqos( eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS ); wqos.history().depth = 10; // default is 1 _describe_params_writer->override_qos_from_json( wqos, participant->settings().nested( "device", "describe-parameters-response" ) ); @@ -1782,93 +1782,91 @@ void lrs_device_controller::on_get_params_request() { topics::ros2::get_parameters_request_msg control; dds_sample sample; - while( control.take_next( *_get_params_reader, &control, &sample ) ) + while (control.take_next(*_get_params_reader, &control, &sample)) { - if( ! control.is_valid() ) + if (!control.is_valid()) continue; - _control_dispatcher.invoke( - [control, sample, this]( dispatcher::cancellable_timer ) + auto sample_j = json::array({ + rsutils::string::from(realdds::print_raw_guid(sample.sample_identity.writer_guid())), + sample.sample_identity.sequence_number().to64long(), + }); + + LOG_DEBUG("[" << _dds_device_server->debug_name() << "] <----- get_parameters"); + topics::ros2::get_parameters_response_msg response; + for (auto &name : control.names()) + { + topics::ros2::get_parameters_response_msg::value_type value; // NOT_SET + try { - auto sample_j = json::array( { - rsutils::string::from( realdds::print_raw_guid( sample.sample_identity.writer_guid() ) ), - sample.sample_identity.sequence_number().to64long(), - } ); - LOG_DEBUG( "[" << _dds_device_server->debug_name() << "] <----- get_parameters" ); - topics::ros2::get_parameters_response_msg response; - for( auto & name : control.names() ) + auto sep = name.find('/'); + if (sep == std::string::npos) + throw std::runtime_error("invalid option name"); + auto const stream_name = name.substr(0, sep); + auto const option_name = name.substr(sep + 1); + if (option_name == "profile") { - topics::ros2::get_parameters_response_msg::value_type value; // NOT_SET - try + // return the current profile as a JSON array string + auto stream_it = _stream_name_to_server.find(stream_name); + if (stream_it == _stream_name_to_server.end()) + throw std::runtime_error("invalid stream name"); + auto profile = _bridge.get_profile(stream_it->second); + value.string_value(profile->to_json()); + value.type(rcl_interfaces::msg::ParameterType_Constants::PARAMETER_BOOL); + } + else + { + auto option = _dds_device_server->find_option(option_name, stream_name); + if (!option) + throw std::runtime_error("option not found"); + if (option->is_valid()) // Otherwise leave the value as unset { - auto sep = name.find( '/' ); - if( sep == std::string::npos ) - throw std::runtime_error( "invalid option name" ); - auto const stream_name = name.substr( 0, sep ); - auto const option_name = name.substr( sep + 1 ); - if( option_name == "profile" ) - { - // return the current profile as a JSON array string - auto stream_it = _stream_name_to_server.find( stream_name ); - if( stream_it == _stream_name_to_server.end() ) - throw std::runtime_error( "invalid stream name" ); - auto profile = _bridge.get_profile( stream_it->second ); - value.string_value( profile->to_json() ); - value.type( rcl_interfaces::msg::ParameterType_Constants::PARAMETER_BOOL ); - } - else + auto &j = option->get_value(); + switch (j.type()) { - auto option = _dds_device_server->find_option( option_name, stream_name ); - if( ! option ) - throw std::runtime_error( "option not found" ); - if( option->is_valid() ) // Otherwise leave the value as unset - { - auto & j = option->get_value(); - switch( j.type() ) - { - case json::value_t::string: - value.string_value( j ); - value.type( rcl_interfaces::msg::ParameterType_Constants::PARAMETER_STRING ); - break; - case json::value_t::number_float: - value.double_value( j ); - value.type( rcl_interfaces::msg::ParameterType_Constants::PARAMETER_DOUBLE ); - break; - case json::value_t::number_integer: - case json::value_t::number_unsigned: - value.integer_value( j ); - value.type( rcl_interfaces::msg::ParameterType_Constants::PARAMETER_INTEGER ); - break; - case json::value_t::boolean: - value.bool_value( j ); - value.type( rcl_interfaces::msg::ParameterType_Constants::PARAMETER_BOOL ); - break; - default: - // Everything else, we'll communicate but as a JSON string... - value.string_value( j ); - value.type( rcl_interfaces::msg::ParameterType_Constants::PARAMETER_STRING ); - break; - } - } + case json::value_t::string: + value.string_value(j); + value.type(rcl_interfaces::msg::ParameterType_Constants::PARAMETER_STRING); + break; + case json::value_t::number_float: + value.double_value(j); + value.type(rcl_interfaces::msg::ParameterType_Constants::PARAMETER_DOUBLE); + break; + case json::value_t::number_integer: + case json::value_t::number_unsigned: + value.integer_value(j); + value.type(rcl_interfaces::msg::ParameterType_Constants::PARAMETER_INTEGER); + break; + case json::value_t::boolean: + value.bool_value(j); + value.type(rcl_interfaces::msg::ParameterType_Constants::PARAMETER_BOOL); + break; + default: + // Everything else, we'll communicate but as a JSON string... + value.string_value(j); + value.type(rcl_interfaces::msg::ParameterType_Constants::PARAMETER_STRING); + break; } - response.add( value ); - } - catch( std::exception const & e ) - { - LOG_ERROR( "[" << _dds_device_server->debug_name() << "][" << name << "] " << e.what() ); - response.add( value ); } } - // Now send the response back - try - { - response.write_to( *_get_params_writer ); - } - catch( std::exception const & e ) - { - LOG_ERROR( "[" << _dds_device_server->debug_name() << "] failed to send response: " << e.what() ); - } - } ); + response.add(value); + } + catch (std::exception const &e) + { + LOG_ERROR("[" << _dds_device_server->debug_name() << "][" << name << "] " << e.what()); + response.add(value); + } + } + // Now send the response back + try + { + response.write_to(*_get_params_writer); + } + catch (std::exception const &e) + { + LOG_ERROR("[" << _dds_device_server->debug_name() << "] failed to send response: " << e.what()); + } + } } @@ -1877,103 +1875,97 @@ void lrs_device_controller::on_set_params_request() { topics::ros2::set_parameters_request_msg control; dds_sample sample; - while( control.take_next( *_set_params_reader, &control, &sample ) ) + while (control.take_next(*_set_params_reader, &control, &sample)) { - if( ! control.is_valid() ) + if (!control.is_valid()) continue; - _control_dispatcher.invoke( - [control, sample, this]( dispatcher::cancellable_timer ) + auto sample_j = json::array({ + rsutils::string::from(realdds::print_raw_guid(sample.sample_identity.writer_guid())), + sample.sample_identity.sequence_number().to64long(), + }); + LOG_DEBUG("[" << _dds_device_server->debug_name() << "] <----- set_parameters"); + topics::ros2::set_parameters_response_msg response; + for (auto ¶meter : control.parameters()) + { + auto &name = parameter.name(); + auto &value = parameter.value(); + topics::ros2::set_parameters_response_msg::result_type result; // failed; no reason + try { - auto sample_j = json::array( { - rsutils::string::from( realdds::print_raw_guid( sample.sample_identity.writer_guid() ) ), - sample.sample_identity.sequence_number().to64long(), - } ); - LOG_DEBUG( "[" << _dds_device_server->debug_name() << "] <----- set_parameters" ); - topics::ros2::set_parameters_response_msg response; - for( auto & parameter : control.parameters() ) + auto sep = name.find('/'); + if (sep == std::string::npos) + throw std::runtime_error("invalid option name"); + auto const stream_name = name.substr(0, sep); + auto const option_name = name.substr(sep + 1); + if (option_name == "profile" && value.type() == rcl_interfaces::msg::ParameterType_Constants::PARAMETER_STRING) { - auto & name = parameter.name(); - auto & value = parameter.value(); - topics::ros2::set_parameters_response_msg::result_type result; // failed; no reason - try + // return the current profile as a JSON array string + auto stream_it = _stream_name_to_server.find(stream_name); + if (stream_it == _stream_name_to_server.end()) + throw std::runtime_error("invalid stream name"); + auto server = stream_it->second; + + auto requested_profile = create_dds_stream_profile(server->type_string(), + json::parse(value.string_value())); + auto profile = find_profile(server, requested_profile); + if (!profile) + throw std::runtime_error("invalid profile " + requested_profile->to_string()); + + _bridge.open(profile); + } + else + { + auto option = _dds_device_server->find_option(option_name, stream_name); + if (!option) + throw std::runtime_error("option not found"); + json jvalue; + switch (value.type()) { - auto sep = name.find( '/' ); - if( sep == std::string::npos ) - throw std::runtime_error( "invalid option name" ); - auto const stream_name = name.substr( 0, sep ); - auto const option_name = name.substr( sep + 1 ); - if( option_name == "profile" - && value.type() == rcl_interfaces::msg::ParameterType_Constants::PARAMETER_STRING ) + case rcl_interfaces::msg::ParameterType_Constants::PARAMETER_STRING: + try { - // return the current profile as a JSON array string - auto stream_it = _stream_name_to_server.find( stream_name ); - if( stream_it == _stream_name_to_server.end() ) - throw std::runtime_error( "invalid stream name" ); - auto server = stream_it->second; - - auto requested_profile = create_dds_stream_profile( server->type_string(), - json::parse( value.string_value() ) ); - auto profile = find_profile( server, requested_profile ); - if( ! profile ) - throw std::runtime_error( "invalid profile " + requested_profile->to_string() ); - - _bridge.open( profile ); + jvalue = json::parse(value.string_value()); } - else + catch (...) { - auto option = _dds_device_server->find_option( option_name, stream_name ); - if( ! option ) - throw std::runtime_error( "option not found" ); - json jvalue; - switch( value.type() ) - { - case rcl_interfaces::msg::ParameterType_Constants::PARAMETER_STRING: - try - { - jvalue = json::parse( value.string_value() ); - } - catch( ... ) - { - jvalue = value.string_value(); - } - break; - case rcl_interfaces::msg::ParameterType_Constants::PARAMETER_BOOL: - jvalue = value.bool_value(); - break; - case rcl_interfaces::msg::ParameterType_Constants::PARAMETER_INTEGER: - jvalue = value.integer_value(); - break; - case rcl_interfaces::msg::ParameterType_Constants::PARAMETER_DOUBLE: - jvalue = value.double_value(); - break; - default: - // Everything else, we don't yet support - throw std::runtime_error( "unsupported value type " - + std::to_string( int( value.type() ) ) ); - } - option->set_value( jvalue ); + jvalue = value.string_value(); } - result.successful( true ); - response.add( result ); + break; + case rcl_interfaces::msg::ParameterType_Constants::PARAMETER_BOOL: + jvalue = value.bool_value(); + break; + case rcl_interfaces::msg::ParameterType_Constants::PARAMETER_INTEGER: + jvalue = value.integer_value(); + break; + case rcl_interfaces::msg::ParameterType_Constants::PARAMETER_DOUBLE: + jvalue = value.double_value(); + break; + default: + // Everything else, we don't yet support + throw std::runtime_error("unsupported value type " + std::to_string(int(value.type()))); } - catch( std::exception const & e ) - { - LOG_ERROR( "[" << _dds_device_server->debug_name() << "][" << name << "] " << e.what() ); - result.reason( e.what() ); - response.add( result ); - } - } - // Now send the response back - try - { - response.write_to( *_set_params_writer ); + option->set_value(jvalue); } - catch( std::exception const & e ) - { - LOG_ERROR( "[" << _dds_device_server->debug_name() << "] failed to send response: " << e.what() ); - } - } ); + result.successful(true); + response.add(result); + } + catch (std::exception const &e) + { + LOG_ERROR("[" << _dds_device_server->debug_name() << "][" << name << "] " << e.what()); + result.reason(e.what()); + response.add(result); + } + } + // Now send the response back + try + { + response.write_to(*_set_params_writer); + } + catch (std::exception const &e) + { + LOG_ERROR("[" << _dds_device_server->debug_name() << "] failed to send response: " << e.what()); + } } } @@ -1982,20 +1974,17 @@ void lrs_device_controller::on_list_params_request() { topics::ros2::list_parameters_request_msg control; dds_sample sample; - while( control.take_next( *_list_params_reader, &control, &sample ) ) + while (control.take_next(*_list_params_reader, &control, &sample)) { - if( ! control.is_valid() ) + if (!control.is_valid()) continue; - _control_dispatcher.invoke( - [control, sample, this]( dispatcher::cancellable_timer ) - { - auto sample_j = json::array( { - rsutils::string::from( realdds::print_raw_guid( sample.sample_identity.writer_guid() ) ), - sample.sample_identity.sequence_number().to64long(), - } ); - LOG_DEBUG( "[" << _dds_device_server->debug_name() << "] <----- list_parameters" ); - topics::ros2::list_parameters_response_msg response; + auto sample_j = json::array({ + rsutils::string::from(realdds::print_raw_guid(sample.sample_identity.writer_guid())), + sample.sample_identity.sequence_number().to64long(), + }); + LOG_DEBUG("[" << _dds_device_server->debug_name() << "] <----- list_parameters"); + topics::ros2::list_parameters_response_msg response; #if 0 for( auto & parameter : control.prefixes() ) { @@ -2070,16 +2059,15 @@ void lrs_device_controller::on_list_params_request() } } #endif - // Now send the response back - try - { - response.write_to( *_list_params_writer ); - } - catch( std::exception const & e ) - { - LOG_ERROR( "[" << _dds_device_server->debug_name() << "] failed to send response: " << e.what() ); - } - } ); + // Now send the response back + try + { + response.write_to(*_list_params_writer); + } + catch (std::exception const &e) + { + LOG_ERROR("[" << _dds_device_server->debug_name() << "] failed to send response: " << e.what()); + } } } @@ -2088,20 +2076,17 @@ void lrs_device_controller::on_describe_params_request() { topics::ros2::describe_parameters_request_msg control; dds_sample sample; - while( control.take_next( *_describe_params_reader, &control, &sample ) ) + while (control.take_next(*_describe_params_reader, &control, &sample)) { - if( ! control.is_valid() ) + if (!control.is_valid()) continue; - _control_dispatcher.invoke( - [control, sample, this]( dispatcher::cancellable_timer ) - { - auto sample_j = json::array( { - rsutils::string::from( realdds::print_raw_guid( sample.sample_identity.writer_guid() ) ), - sample.sample_identity.sequence_number().to64long(), - } ); - LOG_DEBUG( "[" << _dds_device_server->debug_name() << "] <----- describe_parameters" ); - topics::ros2::describe_parameters_response_msg response; + auto sample_j = json::array({ + rsutils::string::from(realdds::print_raw_guid(sample.sample_identity.writer_guid())), + sample.sample_identity.sequence_number().to64long(), + }); + LOG_DEBUG("[" << _dds_device_server->debug_name() << "] <----- describe_parameters"); + topics::ros2::describe_parameters_response_msg response; #if 0 for( auto & parameter : control.prefixes() ) { @@ -2176,16 +2161,15 @@ void lrs_device_controller::on_describe_params_request() } } #endif - // Now send the response back - try - { - response.write_to( *_describe_params_writer ); - } - catch( std::exception const & e ) - { - LOG_ERROR( "[" << _dds_device_server->debug_name() << "] failed to send response: " << e.what() ); - } - } ); + // Now send the response back + try + { + response.write_to(*_describe_params_writer); + } + catch (std::exception const &e) + { + LOG_ERROR("[" << _dds_device_server->debug_name() << "] failed to send response: " << e.what()); + } } }