Skip to content

Commit

Permalink
add test for advanced pubsub
Browse files Browse the repository at this point in the history
  • Loading branch information
DenisBiryukov91 committed Dec 12, 2024
1 parent 82ec89e commit f916d92
Show file tree
Hide file tree
Showing 7 changed files with 254 additions and 39 deletions.
80 changes: 44 additions & 36 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,50 @@ Functions
.. doxygenfunction:: z_timestamp_id
.. doxygenfunction:: z_timestamp_ntp64_time


Payload
-------
Types
^^^^^
.. doxygenstruct:: z_owned_bytes_t
.. doxygenstruct:: z_loaned_bytes_t
.. doxygenstruct:: z_bytes_reader_t
.. doxygenstruct:: z_owned_bytes_writer_t
.. doxygenstruct:: z_loaned_bytes_writer_t

Functions
^^^^^^^^^
.. doxygenfunction:: z_bytes_len
.. doxygenfunction:: z_bytes_copy_from_slice
.. doxygenfunction:: z_bytes_from_slice
.. doxygenfunction:: z_bytes_copy_from_buf
.. doxygenfunction:: z_bytes_from_buf
.. doxygenfunction:: z_bytes_from_static_buf
.. doxygenfunction:: z_bytes_copy_from_string
.. doxygenfunction:: z_bytes_from_string
.. doxygenfunction:: z_bytes_copy_from_str
.. doxygenfunction:: z_bytes_from_str
.. doxygenfunction:: z_bytes_from_static_str
.. doxygenfunction:: z_bytes_to_slice
.. doxygenfunction:: z_bytes_to_string

.. doxygenfunction:: z_bytes_empty
.. doxygenfunction:: z_bytes_clone
.. doxygenfunction:: z_bytes_loan
.. doxygenfunction:: z_bytes_loan_mut
.. doxygenfunction:: z_bytes_drop

.. doxygenfunction:: z_bytes_get_reader
.. doxygenfunction:: z_bytes_reader_read
.. doxygenfunction:: z_bytes_reader_seek
.. doxygenfunction:: z_bytes_reader_tell
.. doxygenfunction:: z_bytes_reader_remaining

.. doxygenfunction:: z_bytes_writer_empty
.. doxygenfunction:: z_bytes_writer_finish
.. doxygenfunction:: z_bytes_writer_write_all
.. doxygenfunction:: z_bytes_writer_append

System
======

Expand Down Expand Up @@ -799,31 +843,12 @@ Serialization / Deserialization
-------------------------------
Types
^^^^^
.. doxygenstruct:: z_owned_bytes_t
.. doxygenstruct:: z_loaned_bytes_t
.. doxygenstruct:: z_bytes_reader_t
.. doxygenstruct:: z_owned_bytes_writer_t
.. doxygenstruct:: z_loaned_bytes_writer_t
.. doxygenstruct:: ze_owned_serializer_t
.. doxygenstruct:: ze_loaned_serializer_t
.. doxygenstruct:: ze_deserializer_t

Functions
^^^^^^^^^
.. doxygenfunction:: z_bytes_len
.. doxygenfunction:: z_bytes_copy_from_slice
.. doxygenfunction:: z_bytes_from_slice
.. doxygenfunction:: z_bytes_copy_from_buf
.. doxygenfunction:: z_bytes_from_buf
.. doxygenfunction:: z_bytes_from_static_buf
.. doxygenfunction:: z_bytes_copy_from_string
.. doxygenfunction:: z_bytes_from_string
.. doxygenfunction:: z_bytes_copy_from_str
.. doxygenfunction:: z_bytes_from_str
.. doxygenfunction:: z_bytes_from_static_str
.. doxygenfunction:: z_bytes_to_slice
.. doxygenfunction:: z_bytes_to_string

.. doxygenfunction:: ze_serialize_slice
.. doxygenfunction:: ze_serialize_buf
.. doxygenfunction:: ze_serialize_string
Expand Down Expand Up @@ -855,23 +880,6 @@ Functions
.. doxygenfunction:: ze_deserialize_double
.. doxygenfunction:: ze_deserialize_bool

.. doxygenfunction:: z_bytes_empty
.. doxygenfunction:: z_bytes_clone
.. doxygenfunction:: z_bytes_loan
.. doxygenfunction:: z_bytes_loan_mut
.. doxygenfunction:: z_bytes_drop

.. doxygenfunction:: z_bytes_get_reader
.. doxygenfunction:: z_bytes_reader_read
.. doxygenfunction:: z_bytes_reader_seek
.. doxygenfunction:: z_bytes_reader_tell
.. doxygenfunction:: z_bytes_reader_remaining

.. doxygenfunction:: z_bytes_writer_empty
.. doxygenfunction:: z_bytes_writer_finish
.. doxygenfunction:: z_bytes_writer_write_all
.. doxygenfunction:: z_bytes_writer_append

.. doxygenfunction:: ze_serializer_empty
.. doxygenfunction:: ze_serializer_finish
.. doxygenfunction:: ze_serializer_serialize_slice
Expand Down
1 change: 0 additions & 1 deletion examples/z_advanced_sub.c
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ int main(int argc, char** argv) {
printf("Unable to declare advanced subscriber.\n");
exit(-1);
}

ze_owned_closure_miss_t miss_callback;
z_closure(&miss_callback, miss_handler, NULL, NULL);
ze_advanced_subscriber_declare_background_sample_miss_listener(z_loan(sub), z_move(miss_callback));
Expand Down
3 changes: 3 additions & 0 deletions src/advanced_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ pub extern "C" fn ze_declare_advanced_publisher(
if options.publisher_detection {
p = p.publisher_detection();
}
if options.sample_miss_detection {
p = p.sample_miss_detection();
}
if let Some(pub_detection_metadata) = &options.publisher_detection_metadata {
p = p.publisher_detection_metadata(pub_detection_metadata.as_rust_type_ref());
}
Expand Down
6 changes: 4 additions & 2 deletions src/advanced_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl From<&ze_advanced_subscriber_history_options_t> for HistoryConfig {
h = h.max_samples(val.max_samples)
}
if val.max_age_ms > 0 {
h = h.max_age(val.max_age_ms as f64 * 1000.0f64)
h = h.max_age(val.max_age_ms as f64 / 1000.0f64)
}
h
}
Expand Down Expand Up @@ -154,7 +154,9 @@ fn _declare_advanced_subscriber_inner(
);
let mut sub = sub.advanced();
if let Some(options) = options {
sub = sub.query_timeout(Duration::from_millis(options.query_timeout_ms));
if options.query_timeout_ms > 0 {
sub = sub.query_timeout(Duration::from_millis(options.query_timeout_ms));
}
if options.subscriber_detection {
sub = sub.subscriber_detection()
}
Expand Down
7 changes: 7 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ foreach(file ${files})
endif()
endif()

# Exclude zenoh-ext tests if unstable api feature is disabled
if(NOT(ZENOHC_BUILD_WITH_UNSTABLE_API))
if(${target} MATCHES "^.*_advanced_pub_sub.*$")
continue()
endif()
endif()

if (BUILD_SHARED_LIBS AND (${target} MATCHES "^.*_build_static.*$"))
continue()
endif()
Expand Down
187 changes: 187 additions & 0 deletions tests/z_int_advanced_pub_sub_test.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

#include "z_int_helpers.h"

#ifdef VALID_PLATFORM

#include <string.h>

#include "zenoh.h"

const char *const SEM_NAME_PUB = "/z_int_test_sync_sem_pub";
sem_t *sem_pub;
const char *const SEM_NAME_SUB = "/z_int_test_sync_sem_sub";
sem_t *sem_sub;

const char *const keyexpr = "test/key";
const char *const values[] = {"test_value_1", "test_value_2", "test_value_3",
"test_value_4", "test_value_5", "test_value_6"};
const size_t values_count = sizeof(values) / sizeof(values[0]);

int run_publisher() {
z_owned_config_t config;
z_config_default(&config);
if (zc_config_insert_json5(z_loan_mut(config), Z_CONFIG_ADD_TIMESTAMP_KEY, "true") < 0) {
perror("Unable to configure timestamps!");
return -1;
}

z_owned_session_t s;
if (z_open(&s, z_move(config), NULL), 0) {
perror("Unable to open session!");
return -1;
}

printf("Declaring AdvancedPublisher on '%s'...\n", keyexpr);
ze_owned_advanced_publisher_t pub;
z_view_keyexpr_t ke;
z_view_keyexpr_from_str(&ke, keyexpr);

ze_advanced_publisher_options_t pub_opts;
ze_advanced_publisher_options_default(&pub_opts);
ze_advanced_publisher_cache_options_t cache_options;
ze_advanced_publisher_cache_options_default(&cache_options);
cache_options.max_samples = values_count;
pub_opts.cache = &cache_options;
pub_opts.publisher_detection = true;
pub_opts.sample_miss_detection = true;

if (ze_declare_advanced_publisher(z_loan(s), &pub, z_loan(ke), &pub_opts) < 0) {
printf("Unable to declare AdvancedPublisher for key expression!\n");
exit(-1);
}

// values for cache
for (int i = 0; i < values_count / 2; ++i) {
z_owned_bytes_t payload;
z_bytes_from_static_str(&payload, values[i]);
ze_advanced_publisher_put(z_loan(pub), z_move(payload), NULL);
}

SEM_POST(sem_pub);
printf("wait: sem_sub\n");
SEM_WAIT(sem_sub);

// values for subscribe
for (int i = 0; i < values_count / 2; ++i) {
z_owned_bytes_t payload;
z_bytes_from_static_str(&payload, values[values_count / 2 + i]);
ze_advanced_publisher_put(z_loan(pub), z_move(payload), NULL);
}

printf("wait: sem_sub\n");
SEM_WAIT(sem_sub);

z_drop(z_move(pub));
z_drop(z_move(s));

return 0;
}

void data_handler(z_loaned_sample_t *sample, void *arg) {
static int val_num = 0;
z_view_string_t keystr;
z_keyexpr_as_view_string(z_sample_keyexpr(sample), &keystr);
if (strncmp(keyexpr, z_string_data(z_loan(keystr)), z_string_len(z_loan(keystr)))) {
perror("Unexpected key received");
exit(-1);
}
z_owned_string_t payload_str;
z_bytes_to_string(z_sample_payload(sample), &payload_str);

const z_loaned_source_info_t *info = z_sample_source_info(sample);
z_owned_string_t id_string;
z_entity_global_id_t global_id = z_source_info_id(info);
z_id_t z_id = z_entity_global_id_zid(&global_id);
z_id_to_string(&z_id, &id_string);

printf("Received: '%.*s'; id=%.*s; sn=%u\n", (int)z_string_len(z_loan(payload_str)),
z_string_data(z_loan(payload_str)), (int)z_string_len(z_loan(id_string)), z_string_data(z_loan(id_string)),
z_source_info_sn(info));
ASSERT_STR_STRING_EQUAL(values[val_num], z_loan(payload_str));
z_drop(z_move(payload_str));
z_drop(z_move(id_string));

printf("data_handler: %i\n", val_num);
if (++val_num == values_count) {
SEM_POST(sem_sub);
exit(0);
};
}

int run_subscriber() {
printf("wait: sem_pub\n");
SEM_WAIT(sem_pub);

z_owned_config_t config;
z_config_default(&config);

z_owned_session_t s;
if (z_open(&s, z_move(config), NULL) < 0) {
perror("Unable to open session!");
return -1;
}

z_view_keyexpr_t ke;
z_view_keyexpr_from_str(&ke, keyexpr);

ze_advanced_subscriber_options_t sub_opts;
ze_advanced_subscriber_options_default(&sub_opts);

ze_advanced_subscriber_history_options_t sub_history_options;
ze_advanced_subscriber_history_options_default(&sub_history_options);
sub_history_options.detect_late_publishers = true;

ze_advanced_subscriber_recovery_options_t sub_recovery_options;
ze_advanced_subscriber_recovery_options_default(&sub_recovery_options);
sub_recovery_options.periodic_queries_period_ms = 1000;
sub_opts.history = &sub_history_options;
sub_opts.recovery = &sub_recovery_options;
sub_opts.subscriber_detection = true;

z_owned_closure_sample_t callback;
z_closure(&callback, data_handler, NULL, NULL);
printf("Declaring AdvancedSubscriber on '%s'...\n", keyexpr);
ze_owned_advanced_subscriber_t sub;
if (ze_declare_advanced_subscriber(z_loan(s), &sub, z_loan(ke), z_move(callback), &sub_opts) < 0) {
printf("Unable to declare advanced subscriber.\n");
exit(-1);
}

SEM_POST(sem_sub);
z_sleep_s(10);

z_drop(z_move(sub));
z_drop(z_move(s));

return -1;
}

int main() {
SEM_INIT(sem_pub, SEM_NAME_PUB);
SEM_INIT(sem_sub, SEM_NAME_SUB);

func_ptr_t funcs[] = {run_publisher, run_subscriber};
assert(run_timeouted_test(funcs, 2, 10) == 0);

SEM_DROP(sem_pub, SEM_NAME_PUB);
SEM_DROP(sem_sub, SEM_NAME_SUB);

return 0;
}

#else
int main() { return 0; }
#endif // VALID_PLATFORM
9 changes: 9 additions & 0 deletions tests/z_int_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,12 @@ int run_timeouted_test(func_ptr_t functions[], int num_functions, int timeout_se
exit(-1); \
} \
} while (0)

#define ASSERT_STR_STRING_EQUAL(str, string) \
do { \
if (strlen(str) != z_string_len(string) || \
strncmp(str, (const char *)z_string_data(string), (int)z_string_len(string))) { \
fprintf(stderr, "Check failed: '%s' != '%.*s'\n", str, (int)z_string_len(string), z_string_data(string)); \
exit(-1); \
} \
} while (0)

0 comments on commit f916d92

Please sign in to comment.