From f916d92265be21664b97500e34c1d2eadcabf7af Mon Sep 17 00:00:00 2001 From: Denis Biryukov Date: Thu, 12 Dec 2024 12:48:36 +0100 Subject: [PATCH] add test for advanced pubsub --- docs/api.rst | 80 ++++++------ examples/z_advanced_sub.c | 1 - src/advanced_publisher.rs | 3 + src/advanced_subscriber.rs | 6 +- tests/CMakeLists.txt | 7 ++ tests/z_int_advanced_pub_sub_test.c | 187 ++++++++++++++++++++++++++++ tests/z_int_helpers.h | 9 ++ 7 files changed, 254 insertions(+), 39 deletions(-) create mode 100644 tests/z_int_advanced_pub_sub_test.c diff --git a/docs/api.rst b/docs/api.rst index 14b35a5b9..dacf0f30b 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -270,6 +270,50 @@ Functions .. doxygenfunction:: z_timestamp_id .. doxygenfunction:: z_timestamp_ntp64_time + +Payload +------- +Types +^^^^^ +.. doxygenstruct:: z_owned_bytes_t +.. doxygenstruct:: z_loaned_bytes_t +.. doxygenstruct:: z_bytes_reader_t +.. doxygenstruct:: z_owned_bytes_writer_t +.. doxygenstruct:: z_loaned_bytes_writer_t + +Functions +^^^^^^^^^ +.. doxygenfunction:: z_bytes_len +.. doxygenfunction:: z_bytes_copy_from_slice +.. doxygenfunction:: z_bytes_from_slice +.. doxygenfunction:: z_bytes_copy_from_buf +.. doxygenfunction:: z_bytes_from_buf +.. doxygenfunction:: z_bytes_from_static_buf +.. doxygenfunction:: z_bytes_copy_from_string +.. doxygenfunction:: z_bytes_from_string +.. doxygenfunction:: z_bytes_copy_from_str +.. doxygenfunction:: z_bytes_from_str +.. doxygenfunction:: z_bytes_from_static_str +.. doxygenfunction:: z_bytes_to_slice +.. doxygenfunction:: z_bytes_to_string + +.. doxygenfunction:: z_bytes_empty +.. doxygenfunction:: z_bytes_clone +.. doxygenfunction:: z_bytes_loan +.. doxygenfunction:: z_bytes_loan_mut +.. doxygenfunction:: z_bytes_drop + +.. doxygenfunction:: z_bytes_get_reader +.. doxygenfunction:: z_bytes_reader_read +.. doxygenfunction:: z_bytes_reader_seek +.. doxygenfunction:: z_bytes_reader_tell +.. doxygenfunction:: z_bytes_reader_remaining + +.. doxygenfunction:: z_bytes_writer_empty +.. doxygenfunction:: z_bytes_writer_finish +.. doxygenfunction:: z_bytes_writer_write_all +.. doxygenfunction:: z_bytes_writer_append + System ====== @@ -799,31 +843,12 @@ Serialization / Deserialization ------------------------------- Types ^^^^^ -.. doxygenstruct:: z_owned_bytes_t -.. doxygenstruct:: z_loaned_bytes_t -.. doxygenstruct:: z_bytes_reader_t -.. doxygenstruct:: z_owned_bytes_writer_t -.. doxygenstruct:: z_loaned_bytes_writer_t .. doxygenstruct:: ze_owned_serializer_t .. doxygenstruct:: ze_loaned_serializer_t .. doxygenstruct:: ze_deserializer_t Functions ^^^^^^^^^ -.. doxygenfunction:: z_bytes_len -.. doxygenfunction:: z_bytes_copy_from_slice -.. doxygenfunction:: z_bytes_from_slice -.. doxygenfunction:: z_bytes_copy_from_buf -.. doxygenfunction:: z_bytes_from_buf -.. doxygenfunction:: z_bytes_from_static_buf -.. doxygenfunction:: z_bytes_copy_from_string -.. doxygenfunction:: z_bytes_from_string -.. doxygenfunction:: z_bytes_copy_from_str -.. doxygenfunction:: z_bytes_from_str -.. doxygenfunction:: z_bytes_from_static_str -.. doxygenfunction:: z_bytes_to_slice -.. doxygenfunction:: z_bytes_to_string - .. doxygenfunction:: ze_serialize_slice .. doxygenfunction:: ze_serialize_buf .. doxygenfunction:: ze_serialize_string @@ -855,23 +880,6 @@ Functions .. doxygenfunction:: ze_deserialize_double .. doxygenfunction:: ze_deserialize_bool -.. doxygenfunction:: z_bytes_empty -.. doxygenfunction:: z_bytes_clone -.. doxygenfunction:: z_bytes_loan -.. doxygenfunction:: z_bytes_loan_mut -.. doxygenfunction:: z_bytes_drop - -.. doxygenfunction:: z_bytes_get_reader -.. doxygenfunction:: z_bytes_reader_read -.. doxygenfunction:: z_bytes_reader_seek -.. doxygenfunction:: z_bytes_reader_tell -.. doxygenfunction:: z_bytes_reader_remaining - -.. doxygenfunction:: z_bytes_writer_empty -.. doxygenfunction:: z_bytes_writer_finish -.. doxygenfunction:: z_bytes_writer_write_all -.. doxygenfunction:: z_bytes_writer_append - .. doxygenfunction:: ze_serializer_empty .. doxygenfunction:: ze_serializer_finish .. doxygenfunction:: ze_serializer_serialize_slice diff --git a/examples/z_advanced_sub.c b/examples/z_advanced_sub.c index 1a3c40198..00d2c1bac 100644 --- a/examples/z_advanced_sub.c +++ b/examples/z_advanced_sub.c @@ -84,7 +84,6 @@ int main(int argc, char** argv) { printf("Unable to declare advanced subscriber.\n"); exit(-1); } - ze_owned_closure_miss_t miss_callback; z_closure(&miss_callback, miss_handler, NULL, NULL); ze_advanced_subscriber_declare_background_sample_miss_listener(z_loan(sub), z_move(miss_callback)); diff --git a/src/advanced_publisher.rs b/src/advanced_publisher.rs index 5d33abd0d..7309cc76d 100644 --- a/src/advanced_publisher.rs +++ b/src/advanced_publisher.rs @@ -152,6 +152,9 @@ pub extern "C" fn ze_declare_advanced_publisher( if options.publisher_detection { p = p.publisher_detection(); } + if options.sample_miss_detection { + p = p.sample_miss_detection(); + } if let Some(pub_detection_metadata) = &options.publisher_detection_metadata { p = p.publisher_detection_metadata(pub_detection_metadata.as_rust_type_ref()); } diff --git a/src/advanced_subscriber.rs b/src/advanced_subscriber.rs index 8ae603bc6..6312e229b 100644 --- a/src/advanced_subscriber.rs +++ b/src/advanced_subscriber.rs @@ -62,7 +62,7 @@ impl From<&ze_advanced_subscriber_history_options_t> for HistoryConfig { h = h.max_samples(val.max_samples) } if val.max_age_ms > 0 { - h = h.max_age(val.max_age_ms as f64 * 1000.0f64) + h = h.max_age(val.max_age_ms as f64 / 1000.0f64) } h } @@ -154,7 +154,9 @@ fn _declare_advanced_subscriber_inner( ); let mut sub = sub.advanced(); if let Some(options) = options { - sub = sub.query_timeout(Duration::from_millis(options.query_timeout_ms)); + if options.query_timeout_ms > 0 { + sub = sub.query_timeout(Duration::from_millis(options.query_timeout_ms)); + } if options.subscriber_detection { sub = sub.subscriber_detection() } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 834f35483..5fbe08f08 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -15,6 +15,13 @@ foreach(file ${files}) endif() endif() + # Exclude zenoh-ext tests if unstable api feature is disabled + if(NOT(ZENOHC_BUILD_WITH_UNSTABLE_API)) + if(${target} MATCHES "^.*_advanced_pub_sub.*$") + continue() + endif() + endif() + if (BUILD_SHARED_LIBS AND (${target} MATCHES "^.*_build_static.*$")) continue() endif() diff --git a/tests/z_int_advanced_pub_sub_test.c b/tests/z_int_advanced_pub_sub_test.c new file mode 100644 index 000000000..4b2787396 --- /dev/null +++ b/tests/z_int_advanced_pub_sub_test.c @@ -0,0 +1,187 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#include "z_int_helpers.h" + +#ifdef VALID_PLATFORM + +#include + +#include "zenoh.h" + +const char *const SEM_NAME_PUB = "/z_int_test_sync_sem_pub"; +sem_t *sem_pub; +const char *const SEM_NAME_SUB = "/z_int_test_sync_sem_sub"; +sem_t *sem_sub; + +const char *const keyexpr = "test/key"; +const char *const values[] = {"test_value_1", "test_value_2", "test_value_3", + "test_value_4", "test_value_5", "test_value_6"}; +const size_t values_count = sizeof(values) / sizeof(values[0]); + +int run_publisher() { + z_owned_config_t config; + z_config_default(&config); + if (zc_config_insert_json5(z_loan_mut(config), Z_CONFIG_ADD_TIMESTAMP_KEY, "true") < 0) { + perror("Unable to configure timestamps!"); + return -1; + } + + z_owned_session_t s; + if (z_open(&s, z_move(config), NULL), 0) { + perror("Unable to open session!"); + return -1; + } + + printf("Declaring AdvancedPublisher on '%s'...\n", keyexpr); + ze_owned_advanced_publisher_t pub; + z_view_keyexpr_t ke; + z_view_keyexpr_from_str(&ke, keyexpr); + + ze_advanced_publisher_options_t pub_opts; + ze_advanced_publisher_options_default(&pub_opts); + ze_advanced_publisher_cache_options_t cache_options; + ze_advanced_publisher_cache_options_default(&cache_options); + cache_options.max_samples = values_count; + pub_opts.cache = &cache_options; + pub_opts.publisher_detection = true; + pub_opts.sample_miss_detection = true; + + if (ze_declare_advanced_publisher(z_loan(s), &pub, z_loan(ke), &pub_opts) < 0) { + printf("Unable to declare AdvancedPublisher for key expression!\n"); + exit(-1); + } + + // values for cache + for (int i = 0; i < values_count / 2; ++i) { + z_owned_bytes_t payload; + z_bytes_from_static_str(&payload, values[i]); + ze_advanced_publisher_put(z_loan(pub), z_move(payload), NULL); + } + + SEM_POST(sem_pub); + printf("wait: sem_sub\n"); + SEM_WAIT(sem_sub); + + // values for subscribe + for (int i = 0; i < values_count / 2; ++i) { + z_owned_bytes_t payload; + z_bytes_from_static_str(&payload, values[values_count / 2 + i]); + ze_advanced_publisher_put(z_loan(pub), z_move(payload), NULL); + } + + printf("wait: sem_sub\n"); + SEM_WAIT(sem_sub); + + z_drop(z_move(pub)); + z_drop(z_move(s)); + + return 0; +} + +void data_handler(z_loaned_sample_t *sample, void *arg) { + static int val_num = 0; + z_view_string_t keystr; + z_keyexpr_as_view_string(z_sample_keyexpr(sample), &keystr); + if (strncmp(keyexpr, z_string_data(z_loan(keystr)), z_string_len(z_loan(keystr)))) { + perror("Unexpected key received"); + exit(-1); + } + z_owned_string_t payload_str; + z_bytes_to_string(z_sample_payload(sample), &payload_str); + + const z_loaned_source_info_t *info = z_sample_source_info(sample); + z_owned_string_t id_string; + z_entity_global_id_t global_id = z_source_info_id(info); + z_id_t z_id = z_entity_global_id_zid(&global_id); + z_id_to_string(&z_id, &id_string); + + printf("Received: '%.*s'; id=%.*s; sn=%u\n", (int)z_string_len(z_loan(payload_str)), + z_string_data(z_loan(payload_str)), (int)z_string_len(z_loan(id_string)), z_string_data(z_loan(id_string)), + z_source_info_sn(info)); + ASSERT_STR_STRING_EQUAL(values[val_num], z_loan(payload_str)); + z_drop(z_move(payload_str)); + z_drop(z_move(id_string)); + + printf("data_handler: %i\n", val_num); + if (++val_num == values_count) { + SEM_POST(sem_sub); + exit(0); + }; +} + +int run_subscriber() { + printf("wait: sem_pub\n"); + SEM_WAIT(sem_pub); + + z_owned_config_t config; + z_config_default(&config); + + z_owned_session_t s; + if (z_open(&s, z_move(config), NULL) < 0) { + perror("Unable to open session!"); + return -1; + } + + z_view_keyexpr_t ke; + z_view_keyexpr_from_str(&ke, keyexpr); + + ze_advanced_subscriber_options_t sub_opts; + ze_advanced_subscriber_options_default(&sub_opts); + + ze_advanced_subscriber_history_options_t sub_history_options; + ze_advanced_subscriber_history_options_default(&sub_history_options); + sub_history_options.detect_late_publishers = true; + + ze_advanced_subscriber_recovery_options_t sub_recovery_options; + ze_advanced_subscriber_recovery_options_default(&sub_recovery_options); + sub_recovery_options.periodic_queries_period_ms = 1000; + sub_opts.history = &sub_history_options; + sub_opts.recovery = &sub_recovery_options; + sub_opts.subscriber_detection = true; + + z_owned_closure_sample_t callback; + z_closure(&callback, data_handler, NULL, NULL); + printf("Declaring AdvancedSubscriber on '%s'...\n", keyexpr); + ze_owned_advanced_subscriber_t sub; + if (ze_declare_advanced_subscriber(z_loan(s), &sub, z_loan(ke), z_move(callback), &sub_opts) < 0) { + printf("Unable to declare advanced subscriber.\n"); + exit(-1); + } + + SEM_POST(sem_sub); + z_sleep_s(10); + + z_drop(z_move(sub)); + z_drop(z_move(s)); + + return -1; +} + +int main() { + SEM_INIT(sem_pub, SEM_NAME_PUB); + SEM_INIT(sem_sub, SEM_NAME_SUB); + + func_ptr_t funcs[] = {run_publisher, run_subscriber}; + assert(run_timeouted_test(funcs, 2, 10) == 0); + + SEM_DROP(sem_pub, SEM_NAME_PUB); + SEM_DROP(sem_sub, SEM_NAME_SUB); + + return 0; +} + +#else +int main() { return 0; } +#endif // VALID_PLATFORM diff --git a/tests/z_int_helpers.h b/tests/z_int_helpers.h index 51b117c3a..d01bd40db 100644 --- a/tests/z_int_helpers.h +++ b/tests/z_int_helpers.h @@ -163,3 +163,12 @@ int run_timeouted_test(func_ptr_t functions[], int num_functions, int timeout_se exit(-1); \ } \ } while (0) + +#define ASSERT_STR_STRING_EQUAL(str, string) \ + do { \ + if (strlen(str) != z_string_len(string) || \ + strncmp(str, (const char *)z_string_data(string), (int)z_string_len(string))) { \ + fprintf(stderr, "Check failed: '%s' != '%.*s'\n", str, (int)z_string_len(string), z_string_data(string)); \ + exit(-1); \ + } \ + } while (0)