Skip to content

Commit

Permalink
Fix C++ and Python pubsub tests
Browse files Browse the repository at this point in the history
Signed-off-by: Michel Hidalgo <[email protected]>
  • Loading branch information
hidmic committed Nov 8, 2021
1 parent f4853a5 commit c20e2b5
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 14 deletions.
30 changes: 22 additions & 8 deletions drake_ros_core/test/test_pub_sub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,25 +73,39 @@ TEST(Integration, sub_to_pub) {
// Create subscription listening to publisher system
std::vector<std::unique_ptr<test_msgs::msg::BasicTypes>> rx_msgs;
auto rx_callback = [&](std::unique_ptr<test_msgs::msg::BasicTypes> msg) {
rx_msgs.push_back(std::move(msg));
// Cope with lack of synchronization between subscriber
// and publisher systems by ignoring duplicate messages.
if (rx_msgs.empty() || rx_msgs.back()->uint64_value != msg->uint64_value) {
rx_msgs.push_back(std::move(msg));
}
};
auto subscription = node->create_subscription<test_msgs::msg::BasicTypes>(
"out", qos, rx_callback);

// Send messages into the drake system
for (size_t p = 0; p < num_msgs; ++p) {
publisher->publish(std::make_unique<test_msgs::msg::BasicTypes>());
}

size_t num_msgs_sent = 0;
const int timeout_sec = 5;
const int spins_per_sec = 10;
const float time_delta = 1.0f / spins_per_sec;
for (int i = 0; i < timeout_sec * spins_per_sec && rx_msgs.size() < num_msgs;
++i) {
for (int i = 0; i < timeout_sec * spins_per_sec; ++i) {
if (rx_msgs.size() >= num_msgs) {
break;
}
// Cope with lack of synchronization between subscriber
// and publisher systems by sending one message at a time.
if (rx_msgs.size() == num_msgs_sent) {
// Send message into the drake system
auto message = std::make_unique<test_msgs::msg::BasicTypes>();
message->uint64_value = num_msgs_sent++;
publisher->publish(std::move(message));
}
rclcpp::spin_some(node);
simulator->AdvanceTo(simulator_context.get_time() + time_delta);
}

// Make sure same number of messages got out
ASSERT_EQ(num_msgs, rx_msgs.size());
// Make sure all messages got out and in the right order
for (size_t p = 0; p < num_msgs; ++p) {
EXPECT_EQ(rx_msgs[p]->uint64_value, p);
}
}
25 changes: 19 additions & 6 deletions drake_ros_core/test/test_pub_sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,23 +60,36 @@ def test_nominal_case():
publisher = node.create_publisher(BasicTypes, 'in_py', qos)

# Create subscription listening to publisher system
num_msgs = 5
rx_msgs = []
rx_callback = (lambda msg: rx_msgs.append(msg))
node.create_subscription(BasicTypes, 'out_py', rx_callback, qos)

# Send messages into the drake system
num_msgs = 5
for _ in range(num_msgs):
publisher.publish(BasicTypes())
def rx_callback(msg):
# Cope with lack of synchronization between subscriber
# and publisher systems by ignoring duplicate messages.
if not rx_msgs or rx_msgs[-1].uint64_value != msg.uint64_value:
rx_msgs.append(msg)
node.create_subscription(BasicTypes, 'out_py', rx_callback, qos)

num_msgs_sent = 0
timeout_sec = 5
spins_per_sec = 10
time_delta = 1.0 / spins_per_sec
for _ in range(timeout_sec * spins_per_sec):
if len(rx_msgs) >= num_msgs:
break
# Cope with lack of synchronization between subscriber
# and publisher systems by sending one message at a time.
if len(rx_msgs) == num_msgs_sent:
# Send messages into the drake system
message = BasicTypes()
message.uint64_value = num_msgs_sent
publisher.publish(message)
num_msgs_sent = num_msgs_sent + 1
rclpy.spin_once(node, timeout_sec=time_delta)
simulator.AdvanceTo(simulator_context.get_time() + time_delta)

# Make sure same number of messages got out
assert num_msgs == len(rx_msgs)
# Make sure all messages got out and in the right order
for i in range(num_msgs):
assert rx_msgs[i].uint64_value == i

0 comments on commit c20e2b5

Please sign in to comment.