diff --git a/drake_ros_core/test/test_pub_sub.cc b/drake_ros_core/test/test_pub_sub.cc index bd3d7634..aab19212 100644 --- a/drake_ros_core/test/test_pub_sub.cc +++ b/drake_ros_core/test/test_pub_sub.cc @@ -73,25 +73,39 @@ TEST(Integration, sub_to_pub) { // Create subscription listening to publisher system std::vector> rx_msgs; auto rx_callback = [&](std::unique_ptr msg) { - rx_msgs.push_back(std::move(msg)); + // Cope with lack of synchronization between subscriber + // and publisher systems by ignoring duplicate messages. + if (rx_msgs.empty() || rx_msgs.back()->uint64_value != msg->uint64_value) { + rx_msgs.push_back(std::move(msg)); + } }; auto subscription = node->create_subscription( "out", qos, rx_callback); - // Send messages into the drake system - for (size_t p = 0; p < num_msgs; ++p) { - publisher->publish(std::make_unique()); - } - + size_t num_msgs_sent = 0; const int timeout_sec = 5; const int spins_per_sec = 10; const float time_delta = 1.0f / spins_per_sec; - for (int i = 0; i < timeout_sec * spins_per_sec && rx_msgs.size() < num_msgs; - ++i) { + for (int i = 0; i < timeout_sec * spins_per_sec; ++i) { + if (rx_msgs.size() >= num_msgs) { + break; + } + // Cope with lack of synchronization between subscriber + // and publisher systems by sending one message at a time. + if (rx_msgs.size() == num_msgs_sent) { + // Send message into the drake system + auto message = std::make_unique(); + message->uint64_value = num_msgs_sent++; + publisher->publish(std::move(message)); + } rclcpp::spin_some(node); simulator->AdvanceTo(simulator_context.get_time() + time_delta); } // Make sure same number of messages got out ASSERT_EQ(num_msgs, rx_msgs.size()); + // Make sure all messages got out and in the right order + for (size_t p = 0; p < num_msgs; ++p) { + EXPECT_EQ(rx_msgs[p]->uint64_value, p); + } } diff --git a/drake_ros_core/test/test_pub_sub.py b/drake_ros_core/test/test_pub_sub.py index 0f0cca33..55097fec 100644 --- a/drake_ros_core/test/test_pub_sub.py +++ b/drake_ros_core/test/test_pub_sub.py @@ -60,23 +60,36 @@ def test_nominal_case(): publisher = node.create_publisher(BasicTypes, 'in_py', qos) # Create subscription listening to publisher system + num_msgs = 5 rx_msgs = [] - rx_callback = (lambda msg: rx_msgs.append(msg)) - node.create_subscription(BasicTypes, 'out_py', rx_callback, qos) - # Send messages into the drake system - num_msgs = 5 - for _ in range(num_msgs): - publisher.publish(BasicTypes()) + def rx_callback(msg): + # Cope with lack of synchronization between subscriber + # and publisher systems by ignoring duplicate messages. + if not rx_msgs or rx_msgs[-1].uint64_value != msg.uint64_value: + rx_msgs.append(msg) + node.create_subscription(BasicTypes, 'out_py', rx_callback, qos) + num_msgs_sent = 0 timeout_sec = 5 spins_per_sec = 10 time_delta = 1.0 / spins_per_sec for _ in range(timeout_sec * spins_per_sec): if len(rx_msgs) >= num_msgs: break + # Cope with lack of synchronization between subscriber + # and publisher systems by sending one message at a time. + if len(rx_msgs) == num_msgs_sent: + # Send messages into the drake system + message = BasicTypes() + message.uint64_value = num_msgs_sent + publisher.publish(message) + num_msgs_sent = num_msgs_sent + 1 rclpy.spin_once(node, timeout_sec=time_delta) simulator.AdvanceTo(simulator_context.get_time() + time_delta) # Make sure same number of messages got out assert num_msgs == len(rx_msgs) + # Make sure all messages got out and in the right order + for i in range(num_msgs): + assert rx_msgs[i].uint64_value == i