From 1f99ad7906a1add9eb4a11f221709bc7e174192d Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Tue, 28 May 2024 11:37:38 +0300 Subject: [PATCH] - shm to session API integration - shm to payload API integration - examples - some improvements --- build-resources/opaque-types/src/lib.rs | 2 + examples/z_get_shm.c | 126 +++++++++++++++++ examples/z_ping_shm.c | 180 ++++++++++++++++++++++++ examples/z_pub_shm.c | 118 ++++++++++++++++ examples/z_pub_shm_thr.c | 92 ++++++++++++ include/zenoh_commons.h | 98 +++++++++++-- include/zenoh_macros.h | 4 + src/lib.rs | 2 +- src/payload.rs | 118 +++++++++++++++- src/session.rs | 46 +++++- src/shm/buffer/zshm.rs | 13 ++ src/shm/buffer/zshmmut.rs | 20 ++- src/shm/client_storage/mod.rs | 20 ++- 13 files changed, 812 insertions(+), 27 deletions(-) create mode 100644 examples/z_get_shm.c create mode 100644 examples/z_ping_shm.c create mode 100644 examples/z_pub_shm.c create mode 100644 examples/z_pub_shm_thr.c diff --git a/build-resources/opaque-types/src/lib.rs b/build-resources/opaque-types/src/lib.rs index 30a3b9c20..8b4235cc0 100644 --- a/build-resources/opaque-types/src/lib.rs +++ b/build-resources/opaque-types/src/lib.rs @@ -254,6 +254,8 @@ get_opaque_type_data!(Vec<(ProtocolID, Arc)>, zc_loaned_ /// An owned SHM Client Storage get_opaque_type_data!(Option>, z_owned_shared_memory_client_storage_t); +/// A loaned SHM Client Storage +get_opaque_type_data!(Arc, z_loaned_shared_memory_client_storage_t); /// An owned MemoryLayout get_opaque_type_data!(Option, z_owned_memory_layout_t); diff --git a/examples/z_get_shm.c b/examples/z_get_shm.c new file mode 100644 index 000000000..8b6488162 --- /dev/null +++ b/examples/z_get_shm.c @@ -0,0 +1,126 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#include +#include + +#include "zenoh.h" + +int main(int argc, char** argv) { + char* expr = "demo/example/**"; + char* value = NULL; + switch (argc) { + default: + case 3: + value = argv[2]; + case 2: + expr = argv[1]; + break; + case 1: + value = "Test Value"; + break; + } + size_t value_len = value ? strlen(value) : 0; + + z_view_keyexpr_t keyexpr; + if (z_view_keyexpr_from_string(&keyexpr, expr) < 0) { + printf("%s is not a valid key expression", expr); + exit(-1); + } + z_owned_config_t config; + z_config_default(&config); + if (argc > 3) { + if (zc_config_insert_json(z_loan_mut(config), Z_CONFIG_CONNECT_KEY, argv[3]) < 0) { + printf( + "Couldn't insert value `%s` in configuration at `%s`. This is likely because `%s` expects a " + "JSON-serialized list of strings\n", + argv[3], Z_CONFIG_CONNECT_KEY, Z_CONFIG_CONNECT_KEY); + exit(-1); + } + } + + printf("Opening session...\n"); + z_owned_session_t s; + if (z_open(&s, z_move(config))) { + printf("Unable to open session!\n"); + exit(-1); + } + + // Create SHM Provider + z_alloc_alignment_t alignment = {0}; + z_owned_memory_layout_t layout; + z_memory_layout_new(&layout, value_len, alignment); + z_owned_shared_memory_provider_t provider; + z_posix_shared_memory_provider_new(&provider, z_loan(layout)); + z_owned_buf_alloc_result_t alloc; + z_shared_memory_provider_alloc(&alloc, z_loan(provider), value_len, alignment); + + // Allocate SHM Buffer + z_owned_shm_mut_t shm_mut; + z_alloc_error_t shm_error; + z_buf_alloc_result_unwrap(z_move(alloc), &shm_mut, &shm_error); + if (!z_check(shm_mut)) { + printf("Unexpected failure during SHM buffer allocation..."); + return -1; + } + // Fill SHM Buffer with data + uint8_t* data = z_shm_mut_data_mut(z_loan_mut(shm_mut)); + memcpy(data, value, value_len); + // Convert mutable SHM Buffer into immutable one (to be able to make it's ref copies) + z_owned_shm_t shm; + z_shm_from_mut(&shm, z_move(shm_mut)); + const z_loaned_shm_t* loaned_shm = z_loan(shm); + + printf("Sending Query '%s'...\n", expr); + z_owned_reply_channel_t channel; + zc_reply_fifo_new(&channel, 16); + + z_get_options_t opts; + z_get_options_default(&opts); + + z_owned_bytes_t payload; + if (value != NULL) { + z_bytes_encode_from_shm_copy(&payload, z_loan(shm)); + opts.payload = &payload; + } + z_get(z_loan(s), z_loan(keyexpr), "", z_move(channel.send), + z_move(opts)); // here, the send is moved and will be dropped by zenoh when adequate + z_owned_reply_t reply; + + for (z_call(z_loan(channel.recv), &reply); z_check(reply); z_call(z_loan(channel.recv), &reply)) { + if (z_reply_is_ok(z_loan(reply))) { + const z_loaned_sample_t* sample = z_reply_ok(z_loan(reply)); + + z_view_str_t key_str; + z_keyexpr_to_string(z_sample_keyexpr(sample), &key_str); + + z_owned_str_t reply_str; + z_bytes_decode_into_string(z_sample_payload(sample), &reply_str); + + printf(">> Received ('%.*s': '%.*s')\n", (int)z_str_len(z_loan(key_str)), z_str_data(z_loan(key_str)), + (int)z_str_len(z_loan(reply_str)), z_str_data(z_loan(reply_str))); + z_drop(z_move(reply_str)); + } else { + printf("Received an error\n"); + } + z_drop(z_move(reply)); + } + + z_drop(z_move(channel)); + z_close(z_move(s)); + + z_drop(z_move(shm)); + z_drop(z_move(provider)); + z_drop(z_move(layout)); + return 0; +} \ No newline at end of file diff --git a/examples/z_ping_shm.c b/examples/z_ping_shm.c new file mode 100644 index 000000000..35f12647c --- /dev/null +++ b/examples/z_ping_shm.c @@ -0,0 +1,180 @@ +#include +#include +#include +#include + +#include "zenoh.h" + +#define DEFAULT_PKT_SIZE 8 +#define DEFAULT_PING_NB 100 +#define DEFAULT_WARMUP_MS 1000 +#define PING_TIMEOUT_SEC 1 + +#define handle_error_en(en, msg) \ + do { \ + errno = en; \ + perror(msg); \ + exit(EXIT_FAILURE); \ + } while (0) + +z_owned_condvar_t cond; +z_owned_mutex_t mutex; + +void callback(const z_loaned_sample_t* sample, void* context) { z_condvar_signal(z_loan(cond)); } +void drop(void* context) { z_drop(z_move(cond)); } + +struct args_t { + unsigned int size; // -s + unsigned int number_of_pings; // -n + unsigned int warmup_ms; // -w + char* config_path; // -c + uint8_t help_requested; // -h +}; +struct args_t parse_args(int argc, char** argv); + +int main(int argc, char** argv) { + struct args_t args = parse_args(argc, argv); + if (args.help_requested) { + printf( + "\ + -n (optional, int, default=%d): the number of pings to be attempted\n\ + -s (optional, int, default=%d): the size of the payload embedded in the ping and repeated by the pong\n\ + -w (optional, int, default=%d): the warmup time in ms during which pings will be emitted but not measured\n\ + -c (optional, string): the path to a configuration file for the session. If this option isn't passed, the default configuration will be used.\n\ + ", + DEFAULT_PKT_SIZE, DEFAULT_PING_NB, DEFAULT_WARMUP_MS); + return 1; + } + z_mutex_init(&mutex); + z_condvar_init(&cond); + z_owned_config_t config; + if (args.config_path) { + zc_config_from_file(&config, args.config_path); + } else { + z_config_default(&config); + } + z_owned_session_t session; + z_open(&session, z_move(config)); + z_view_keyexpr_t ping; + z_view_keyexpr_from_string_unchecked(&ping, "test/ping"); + z_view_keyexpr_t pong; + z_view_keyexpr_from_string_unchecked(&pong, "test/pong"); + z_owned_publisher_t pub; + z_declare_publisher(&pub, z_loan(session), z_loan(ping), NULL); + z_owned_closure_sample_t respond; + z_closure(&respond, callback, drop, (void*)(&pub)); + z_owned_subscriber_t sub; + z_declare_subscriber(&sub, z_loan(session), z_loan(pong), z_move(respond), NULL); + + // Create SHM Provider + z_alloc_alignment_t alignment = {0}; + z_owned_memory_layout_t layout; + z_memory_layout_new(&layout, args.size, alignment); + z_owned_shared_memory_provider_t provider; + z_posix_shared_memory_provider_new(&provider, z_loan(layout)); + z_owned_buf_alloc_result_t alloc; + z_shared_memory_provider_alloc(&alloc, z_loan(provider), args.size, alignment); + + // Allocate SHM Buffer + z_owned_shm_mut_t shm_mut; + z_alloc_error_t shm_error; + z_buf_alloc_result_unwrap(z_move(alloc), &shm_mut, &shm_error); + if (!z_check(shm_mut)) { + printf("Unexpected failure during SHM buffer allocation..."); + return -1; + } + // Fill SHM Buffer with data + uint8_t* data = z_shm_mut_data_mut(z_loan_mut(shm_mut)); + for (int i = 0; i < args.size; i++) { + data[i] = i % 10; + } + // Convert mutable SHM Buffer into immutable one (to be able to make it's ref copies) + z_owned_shm_t shm; + z_shm_from_mut(&shm, z_move(shm_mut)); + const z_loaned_shm_t* loaned_shm = z_loan(shm); + + z_owned_bytes_t payload; + + z_mutex_lock(z_loan_mut(mutex)); + if (args.warmup_ms) { + printf("Warming up for %dms...\n", args.warmup_ms); + z_clock_t warmup_start = z_clock_now(); + + unsigned long elapsed_us = 0; + while (elapsed_us < args.warmup_ms * 1000) { + z_bytes_encode_from_shm_copy(&payload, loaned_shm); + z_publisher_put(z_loan(pub), z_move(payload), NULL); + int s = z_condvar_wait(z_loan(cond), z_loan_mut(mutex)); + if (s != 0) { + handle_error_en(s, "z_condvar_wait"); + } + elapsed_us = z_clock_elapsed_us(&warmup_start); + } + } + unsigned long* results = z_malloc(sizeof(unsigned long) * args.number_of_pings); + for (int i = 0; i < args.number_of_pings; i++) { + z_clock_t measure_start = z_clock_now(); + z_publisher_put(z_loan(pub), z_move(payload), NULL); + int s = z_condvar_wait(z_loan(cond), z_loan_mut(mutex)); + if (s != 0) { + handle_error_en(s, "z_condvar_wait"); + } + results[i] = z_clock_elapsed_us(&measure_start); + } + for (int i = 0; i < args.number_of_pings; i++) { + printf("%d bytes: seq=%d rtt=%luµs, lat=%luµs\n", args.size, i, results[i], results[i] / 2); + } + z_mutex_unlock(z_loan_mut(mutex)); + z_free(results); + z_free(data); + z_undeclare_subscriber(z_move(sub)); + z_undeclare_publisher(z_move(pub)); + z_drop(z_move(mutex)); + z_close(z_move(session)); + + z_drop(z_move(shm)); + z_drop(z_move(provider)); + z_drop(z_move(layout)); +} + +char* getopt(int argc, char** argv, char option) { + for (int i = 0; i < argc; i++) { + size_t len = strlen(argv[i]); + if (len >= 2 && argv[i][0] == '-' && argv[i][1] == option) { + if (len > 2 && argv[i][2] == '=') { + return argv[i] + 3; + } else if (i + 1 < argc) { + return argv[i + 1]; + } + } + } + return NULL; +} + +struct args_t parse_args(int argc, char** argv) { + for (int i = 0; i < argc; i++) { + if (strcmp(argv[i], "-h") == 0) { + return (struct args_t){.help_requested = 1}; + } + } + char* arg = getopt(argc, argv, 's'); + unsigned int size = DEFAULT_PKT_SIZE; + if (arg) { + size = atoi(arg); + } + arg = getopt(argc, argv, 'n'); + unsigned int number_of_pings = DEFAULT_PING_NB; + if (arg) { + number_of_pings = atoi(arg); + } + arg = getopt(argc, argv, 'w'); + unsigned int warmup_ms = DEFAULT_WARMUP_MS; + if (arg) { + warmup_ms = atoi(arg); + } + return (struct args_t){.help_requested = 0, + .size = size, + .number_of_pings = number_of_pings, + .warmup_ms = warmup_ms, + .config_path = getopt(argc, argv, 'c')}; +} \ No newline at end of file diff --git a/examples/z_pub_shm.c b/examples/z_pub_shm.c new file mode 100644 index 000000000..e8962fb35 --- /dev/null +++ b/examples/z_pub_shm.c @@ -0,0 +1,118 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +#include +#include + +#include "zenoh.h" + +void matching_status_handler(const zcu_matching_status_t *matching_status, void *arg) { + if (matching_status->matching) { + printf("Subscriber matched\n"); + } else { + printf("No Subscribers matched\n"); + } +} + +int main(int argc, char **argv) { + char *keyexpr = "demo/example/zenoh-c-pub"; + char *value = "Pub from C!"; + bool add_matching_listener = false; + + if (argc > 1) keyexpr = argv[1]; + if (argc > 2) value = argv[2]; + if (argc > 3) add_matching_listener = atoi(argv[3]); + + z_owned_config_t config; + z_config_default(&config); + if (argc > 4) { + if (zc_config_insert_json(z_loan_mut(config), Z_CONFIG_CONNECT_KEY, argv[4]) < 0) { + printf( + "Couldn't insert value `%s` in configuration at `%s`. This is likely because `%s` expects a " + "JSON-serialized list of strings\n", + argv[4], Z_CONFIG_CONNECT_KEY, Z_CONFIG_CONNECT_KEY); + exit(-1); + } + } + + printf("Opening session...\n"); + z_owned_session_t s; + if (z_open(&s, z_move(config)) < 0) { + printf("Unable to open session!\n"); + exit(-1); + } + + printf("Declaring Publisher on '%s'...\n", keyexpr); + z_owned_publisher_t pub; + z_view_keyexpr_t ke; + z_view_keyexpr_from_string(&ke, keyexpr); + if (z_declare_publisher(&pub, z_loan(s), z_loan(ke), NULL) < 0) { + printf("Unable to declare Publisher for key expression!\n"); + exit(-1); + } + + zcu_owned_matching_listener_t listener; + if (add_matching_listener) { + zcu_owned_closure_matching_status_t callback; + z_closure(&callback, matching_status_handler, NULL, NULL); + zcu_publisher_matching_listener_callback(&listener, z_loan(pub), z_move(callback)); + } + + printf("Creating POSIX SHM Provider...\n"); + const size_t total_size = 4096; + const size_t buf_ok_size = total_size / 4; + + z_alloc_alignment_t alignment = {0}; + z_owned_memory_layout_t layout; + z_memory_layout_new(&layout, total_size, alignment); + + z_owned_shared_memory_provider_t provider; + z_posix_shared_memory_provider_new(&provider, z_loan(layout)); + + for (int idx = 0; 1; ++idx) { + z_sleep_s(1); + + z_owned_buf_alloc_result_t alloc; + z_shared_memory_provider_alloc_gc_defrag_blocking(&alloc, z_loan(provider), buf_ok_size, alignment); + + z_owned_shm_mut_t shm_buf; + z_alloc_error_t shm_error; + z_buf_alloc_result_unwrap(z_move(alloc), &shm_buf, &shm_error); + if (z_check(shm_buf)) { + { + uint8_t *buf = z_shm_mut_data_mut(z_loan_mut(shm_buf)); + sprintf(buf, "SHM [%4d] %s", idx, value); + printf("Putting Data ('%s': '%s')...\n", keyexpr, buf); + } + + z_publisher_put_options_t options; + z_publisher_put_options_default(&options); + + z_owned_bytes_t payload; + z_bytes_encode_from_shm_mut(&payload, &shm_buf); + + z_publisher_put(z_loan(pub), z_move(payload), &options); + } else { + printf("Unexpected failure during SHM buffer allocation..."); + break; + } + } + + z_undeclare_publisher(z_move(pub)); + z_close(z_move(s)); + + z_drop(z_move(provider)); + z_drop(z_move(layout)); + + return 0; +} diff --git a/examples/z_pub_shm_thr.c b/examples/z_pub_shm_thr.c new file mode 100644 index 000000000..e6c5ccb42 --- /dev/null +++ b/examples/z_pub_shm_thr.c @@ -0,0 +1,92 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +#include +#include + +#include "zenoh.h" + +int main(int argc, char **argv) { + if (argc < 2) { + printf("USAGE:\n\tz_pub_thr []\n\n"); + exit(-1); + } + + char *keyexpr = "test/thr"; + size_t len = atoi(argv[1]); + + z_owned_config_t config; + z_config_default(&config); + if (argc > 2) { + if (zc_config_insert_json(z_loan_mut(config), Z_CONFIG_CONNECT_KEY, argv[2]) < 0) { + printf( + "Couldn't insert value `%s` in configuration at `%s`. This is likely because `%s` expects a " + "JSON-serialized list of strings\n", + argv[2], Z_CONFIG_CONNECT_KEY, Z_CONFIG_CONNECT_KEY); + exit(-1); + } + } + + z_owned_session_t s; + if (z_open(&s, z_move(config)) < 0) { + printf("Unable to open session!\n"); + exit(-1); + } + + z_publisher_options_t options; + z_publisher_options_default(&options); + options.congestion_control = Z_CONGESTION_CONTROL_BLOCK; + + z_owned_publisher_t pub; + z_view_keyexpr_t ke; + z_view_keyexpr_from_string(&ke, keyexpr); + if (z_declare_publisher(&pub, z_loan(s), z_loan(ke), &options)) { + printf("Unable to declare publisher for key expression!\n"); + exit(-1); + } + + printf("Creating POSIX SHM Provider...\n"); + z_alloc_alignment_t alignment = {0}; + z_owned_memory_layout_t layout; + z_memory_layout_new(&layout, len, alignment); + z_owned_shared_memory_provider_t provider; + z_posix_shared_memory_provider_new(&provider, z_loan(layout)); + z_owned_buf_alloc_result_t alloc; + z_shared_memory_provider_alloc(&alloc, z_loan(provider), len, alignment); + + printf("Allocating single SHM buffer\n"); + z_owned_shm_mut_t shm_mut; + z_alloc_error_t shm_error; + z_buf_alloc_result_unwrap(z_move(alloc), &shm_mut, &shm_error); + if (!z_check(shm_mut)) { + printf("Unexpected failure during SHM buffer allocation..."); + return -1; + } + memset(z_shm_mut_data_mut(z_loan_mut(shm_mut)), 1, len); + z_owned_shm_t shm; + z_shm_from_mut(&shm, z_move(shm_mut)); + const z_loaned_shm_t *loaned_shm = z_loan(shm); + + z_owned_bytes_t payload; + while (1) { + z_bytes_encode_from_shm_copy(&payload, loaned_shm); + z_publisher_put(z_loan(pub), z_move(payload), NULL); + } + + z_undeclare_publisher(z_move(pub)); + z_close(z_move(s)); + + z_drop(z_move(shm)); + z_drop(z_move(provider)); + z_drop(z_move(layout)); +} diff --git a/include/zenoh_commons.h b/include/zenoh_commons.h index 80d0c1ca3..4bcf593a0 100644 --- a/include/zenoh_commons.h +++ b/include/zenoh_commons.h @@ -287,6 +287,18 @@ typedef struct ALIGN(8) z_owned_bytes_t { typedef struct ALIGN(8) z_loaned_bytes_t { uint8_t _0[40]; } z_loaned_bytes_t; +/** + * A loaned ZShm slice + */ +typedef struct ALIGN(8) z_loaned_shm_t { + uint8_t _0[80]; +} z_loaned_shm_t; +/** + * An owned ZShm slice + */ +typedef struct ALIGN(8) z_owned_shm_t { + uint8_t _0[80]; +} z_owned_shm_t; /** * A contiguous owned sequence of bytes allocated by Zenoh. */ @@ -788,6 +800,12 @@ typedef struct ALIGN(8) z_loaned_memory_layout_t { typedef struct ALIGN(8) z_owned_mutex_t { uint8_t _0[24]; } z_owned_mutex_t; +/** + * A loaned SHM Client Storage + */ +typedef struct ALIGN(8) z_loaned_shared_memory_client_storage_t { + uint8_t _0[8]; +} z_loaned_shared_memory_client_storage_t; /** * An owned SHM Client */ @@ -1122,18 +1140,6 @@ typedef struct zc_shared_memory_provider_backend_callbacks_t { size_t (*available_fn)(void*); void (*layout_for_fn)(void*, struct z_owned_memory_layout_t*); } zc_shared_memory_provider_backend_callbacks_t; -/** - * An owned ZShm slice - */ -typedef struct ALIGN(8) z_owned_shm_t { - uint8_t _0[80]; -} z_owned_shm_t; -/** - * A loaned ZShm slice - */ -typedef struct ALIGN(8) z_loaned_shm_t { - uint8_t _0[80]; -} z_loaned_shm_t; /** * A loaned ZShmMut slice */ @@ -1429,6 +1435,24 @@ z_error_t z_bytes_decode_into_iter(const struct z_loaned_bytes_t *this_, z_error_t (*iterator_body)(const struct z_loaned_bytes_t *data, void *context), void *context); +/** + * Decodes data into a loaned SHM buffer + * + * @param this_: Data to decode. + * @param dst: An unitialized memory location where to construct a decoded string. + */ +ZENOHC_API +z_error_t z_bytes_decode_into_loaned_shm(const struct z_loaned_bytes_t *this_, + const struct z_loaned_shm_t **dst); +/** + * Decodes data into an owned SHM buffer by copying it's shared reference + * + * @param this_: Data to decode. + * @param dst: An unitialized memory location where to construct a decoded string. + */ +ZENOHC_API +z_error_t z_bytes_decode_into_owned_shm(const struct z_loaned_bytes_t *this_, + struct z_owned_shm_t *dst); /** * Decodes into a pair of `z_owned_bytes` objects. * @return 0 in case of success, negative error code otherwise. @@ -1488,6 +1512,24 @@ ZENOHC_API z_error_t z_bytes_encode_from_pair(struct z_owned_bytes_t *this_, struct z_owned_bytes_t *first, struct z_owned_bytes_t *second); +/** + * Encodes from an immutable SHM buffer consuming it + */ +ZENOHC_API +z_error_t z_bytes_encode_from_shm(struct z_owned_bytes_t *this_, + struct z_owned_shm_t *shm); +/** + * Encodes from an immutable SHM buffer copying it + */ +ZENOHC_API +void z_bytes_encode_from_shm_copy(struct z_owned_bytes_t *this_, + const struct z_loaned_shm_t *shm); +/** + * Encodes from a mutable SHM buffer consuming it + */ +ZENOHC_API +z_error_t z_bytes_encode_from_shm_mut(struct z_owned_bytes_t *this_, + struct z_owned_shm_mut_t *shm); /** * Encodes a slice by aliasing. */ @@ -2282,6 +2324,15 @@ z_error_t z_mutex_unlock(struct z_loaned_mutex_t *this_); ZENOHC_API z_error_t z_open(struct z_owned_session_t *this_, struct z_owned_config_t *config); +/** + * Constructs and opens a new Zenoh session with specified client storage. + * + * @return 0 in case of success, negative error code otherwise (in this case the session will be in its gravestone state). + */ +ZENOHC_API +z_error_t z_open_with_custom_shm_clients(struct z_owned_session_t *this_, + struct z_owned_config_t *config, + const struct z_loaned_shared_memory_client_storage_t *shm_clients); /** * Creates a new POSIX SHM Client */ @@ -2776,6 +2827,11 @@ bool z_shared_memory_client_storage_check(const struct z_owned_shared_memory_cli */ ZENOHC_API void z_shared_memory_client_storage_drop(struct z_owned_shared_memory_client_storage_t *this_); +/** + * Borrows SHM Client Storage + */ +ZENOHC_API +const struct z_loaned_shared_memory_client_storage_t *z_shared_memory_client_storage_loan(const struct z_owned_shared_memory_client_storage_t *this_); ZENOHC_API z_error_t z_shared_memory_client_storage_new(struct z_owned_shared_memory_client_storage_t *this_, const struct zc_loaned_shared_memory_client_list_t *clients, @@ -2874,6 +2930,10 @@ ZENOHC_API bool z_shm_check(const struct z_owned_shm_t *this_); * Converts borrowed ZShm slice as owned ZShm slice by performing shared memory handle copy */ ZENOHC_API void z_shm_copy(struct z_owned_shm_t *this_, const struct z_loaned_shm_t *loaned); +/** + * @return the pointer of the ZShm slice + */ +ZENOHC_API const unsigned char *z_shm_data(const struct z_loaned_shm_t *this_); /** * Deletes ZShm slice */ @@ -2882,6 +2942,10 @@ ZENOHC_API void z_shm_drop(struct z_owned_shm_t *this_); * Constructs ZShm slice from ZShmMut slice */ ZENOHC_API void z_shm_from_mut(struct z_owned_shm_t *this_, struct z_owned_shm_mut_t *that); +/** + * @return the length of the ZShm slice + */ +ZENOHC_API size_t z_shm_len(const struct z_loaned_shm_t *this_); /** * Borrows ZShm slice */ @@ -2895,9 +2959,17 @@ ZENOHC_API struct z_loaned_shm_t *z_shm_loan_mut(struct z_owned_shm_t *this_); */ ZENOHC_API bool z_shm_mut_check(const struct z_owned_shm_mut_t *this_); /** - * Deletes ZShm slice + * @return the mutable pointer of the ZShmMut slice + */ +ZENOHC_API unsigned char *z_shm_mut_data_mut(struct z_loaned_shm_mut_t *this_); +/** + * Deletes ZShmMut slice */ ZENOHC_API void z_shm_mut_drop(struct z_owned_shm_mut_t *this_); +/** + * @return the length of the ZShmMut slice + */ +ZENOHC_API size_t z_shm_mut_len(const struct z_loaned_shm_mut_t *this_); /** * Borrows ZShmMut slice */ diff --git a/include/zenoh_macros.h b/include/zenoh_macros.h index e048d6cf0..128d9a6cd 100644 --- a/include/zenoh_macros.h +++ b/include/zenoh_macros.h @@ -31,6 +31,7 @@ z_owned_reply_t : z_reply_loan, \ z_owned_sample_t : z_sample_loan, \ z_owned_session_t : z_session_loan, \ + z_owned_shared_memory_client_storage_t : z_shared_memory_client_storage_loan, \ z_owned_shared_memory_provider_t : z_shared_memory_provider_loan, \ z_owned_shm_t : z_shm_loan, \ z_owned_slice_t : z_slice_loan, \ @@ -255,6 +256,7 @@ inline const z_loaned_reply_channel_closure_t* z_loan(const z_owned_reply_channe inline const z_loaned_reply_t* z_loan(const z_owned_reply_t& this_) { return z_reply_loan(&this_); }; inline const z_loaned_sample_t* z_loan(const z_owned_sample_t& this_) { return z_sample_loan(&this_); }; inline const z_loaned_session_t* z_loan(const z_owned_session_t& this_) { return z_session_loan(&this_); }; +inline const z_loaned_shared_memory_client_storage_t* z_loan(const z_owned_shared_memory_client_storage_t& this_) { return z_shared_memory_client_storage_loan(&this_); }; inline const z_loaned_shared_memory_provider_t* z_loan(const z_owned_shared_memory_provider_t& this_) { return z_shared_memory_provider_loan(&this_); }; inline const z_loaned_shm_t* z_loan(const z_owned_shm_t& this_) { return z_shm_loan(&this_); }; inline const z_loaned_slice_t* z_loan(const z_owned_slice_t& this_) { return z_slice_loan(&this_); }; @@ -634,6 +636,8 @@ template<> struct z_loaned_to_owned_type_t { typedef z_owned_ template<> struct z_owned_to_loaned_type_t { typedef z_loaned_sample_t type; }; template<> struct z_loaned_to_owned_type_t { typedef z_owned_session_t type; }; template<> struct z_owned_to_loaned_type_t { typedef z_loaned_session_t type; }; +template<> struct z_loaned_to_owned_type_t { typedef z_owned_shared_memory_client_storage_t type; }; +template<> struct z_owned_to_loaned_type_t { typedef z_loaned_shared_memory_client_storage_t type; }; template<> struct z_loaned_to_owned_type_t { typedef z_owned_shared_memory_provider_t type; }; template<> struct z_owned_to_loaned_type_t { typedef z_loaned_shared_memory_provider_t type; }; template<> struct z_loaned_to_owned_type_t { typedef z_owned_shm_t type; }; diff --git a/src/lib.rs b/src/lib.rs index d08bec426..ca5930ea7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -62,7 +62,7 @@ mod querying_subscriber; pub use platform::*; pub use querying_subscriber::*; pub mod platform; -//#[cfg(all(feature = "shared-memory", feature = "unstable"))] +#[cfg(all(feature = "shared-memory", feature = "unstable"))] pub mod context; #[cfg(all(feature = "shared-memory", feature = "unstable"))] pub mod shm; diff --git a/src/payload.rs b/src/payload.rs index 8f427a007..bde211776 100644 --- a/src/payload.rs +++ b/src/payload.rs @@ -15,6 +15,11 @@ use std::slice::from_raw_parts_mut; use zenoh::buffers::{ZBuf, ZSlice, ZSliceBuffer}; use zenoh::bytes::{ZBytes, ZBytesReader}; +#[cfg(all(feature = "shared-memory", feature = "unstable"))] +use crate::errors::Z_ENULL; +#[cfg(all(feature = "shared-memory", feature = "unstable"))] +use crate::{z_loaned_shm_t, z_owned_shm_mut_t, z_owned_shm_t}; + pub use crate::opaque_types::z_owned_bytes_t; decl_transmute_owned!(Option, z_owned_bytes_t); @@ -136,6 +141,59 @@ pub unsafe extern "C" fn z_bytes_decode_into_slice( } } +#[cfg(all(feature = "shared-memory", feature = "unstable"))] +/// Decodes data into an owned SHM buffer by copying it's shared reference +/// +/// @param this_: Data to decode. +/// @param dst: An unitialized memory location where to construct a decoded string. +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub unsafe extern "C" fn z_bytes_decode_into_owned_shm( + this: &z_loaned_bytes_t, + dst: *mut MaybeUninit, +) -> z_error_t { + use zenoh::shm::zshm; + + let payload = this.transmute_ref(); + match payload.deserialize::<&zshm>() { + Ok(s) => { + Inplace::init(dst.transmute_uninit_ptr(), Some(s.clone().to_owned())); + errors::Z_OK + } + Err(e) => { + log::error!("Failed to decode the payload: {}", e); + Inplace::empty(dst.transmute_uninit_ptr()); + errors::Z_EIO + } + } +} + +#[cfg(all(feature = "shared-memory", feature = "unstable"))] +/// Decodes data into a loaned SHM buffer +/// +/// @param this_: Data to decode. +/// @param dst: An unitialized memory location where to construct a decoded string. +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub unsafe extern "C" fn z_bytes_decode_into_loaned_shm( + this: &z_loaned_bytes_t, + dst: *mut MaybeUninit<&'static z_loaned_shm_t>, +) -> z_error_t { + use zenoh::shm::zshm; + + let payload = this.transmute_ref(); + match payload.deserialize::<&zshm>() { + Ok(s) => { + (*dst).write(s.transmute_handle()); + errors::Z_OK + } + Err(e) => { + log::error!("Failed to decode the payload: {}", e); + errors::Z_EIO + } + } +} + unsafe impl Send for CSlice {} unsafe impl Sync for CSlice {} @@ -266,7 +324,7 @@ pub extern "C" fn z_bytes_encode_from_pair( }; let b = ZBytes::serialize((first, second)); Inplace::init(this.transmute_uninit_ptr(), Some(b)); - return Z_OK; + Z_OK } /// Decodes into a pair of `z_owned_bytes` objects. @@ -281,13 +339,13 @@ pub extern "C" fn z_bytes_decode_into_pair( Ok((a, b)) => { Inplace::init(first.transmute_uninit_ptr(), Some(a)); Inplace::init(second.transmute_uninit_ptr(), Some(b)); - return Z_OK; + Z_OK }, Err(e) => { log::error!("Failed to decode the payload: {}", e); - return Z_EPARSE; + Z_EPARSE } - }; + } } struct ZBytesInIterator { @@ -332,7 +390,7 @@ pub extern "C" fn z_bytes_encode_from_iter( let b = ZBytes::from_iter(it); Inplace::init(this.transmute_uninit_ptr(), Some(b)); - return Z_OK; + Z_OK } /// Decodes payload into an iterator to `z_loaned_bytes_t`. @@ -358,7 +416,55 @@ pub extern "C" fn z_bytes_decode_into_iter( } } - return res; + res +} + +#[cfg(all(feature = "shared-memory", feature = "unstable"))] +/// Encodes from an immutable SHM buffer consuming it +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub unsafe extern "C" fn z_bytes_encode_from_shm( + this: *mut MaybeUninit, + shm: &mut z_owned_shm_t, +) -> z_error_t { + match shm.transmute_mut().take() { + Some(shm) => { + let this = this.transmute_uninit_ptr(); + Inplace::init(this, Some(shm.into())); + Z_OK + } + None => Z_ENULL, + } +} + +#[cfg(all(feature = "shared-memory", feature = "unstable"))] +/// Encodes from an immutable SHM buffer copying it +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub unsafe extern "C" fn z_bytes_encode_from_shm_copy( + this: *mut MaybeUninit, + shm: &z_loaned_shm_t, +) { + let this = this.transmute_uninit_ptr(); + Inplace::init(this, Some(shm.transmute_ref().to_owned().into())); +} + +#[cfg(all(feature = "shared-memory", feature = "unstable"))] +/// Encodes from a mutable SHM buffer consuming it +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub unsafe extern "C" fn z_bytes_encode_from_shm_mut( + this: *mut MaybeUninit, + shm: &mut z_owned_shm_mut_t, +) -> z_error_t { + match shm.transmute_mut().take() { + Some(shm) => { + let this = this.transmute_uninit_ptr(); + Inplace::init(this, Some(shm.into())); + Z_OK + } + None => Z_ENULL, + } } pub use crate::opaque_types::z_owned_bytes_reader_t; diff --git a/src/session.rs b/src/session.rs index 154b9b5b3..8a5d51dfb 100644 --- a/src/session.rs +++ b/src/session.rs @@ -19,8 +19,11 @@ use crate::transmute::{ use crate::{errors, z_owned_config_t, zc_init_logger}; use std::mem::MaybeUninit; use std::sync::Arc; -use zenoh::session::Session; use zenoh::core::Wait; +use zenoh::session::Session; + +#[cfg(all(feature = "shared-memory", feature = "unstable"))] +use crate::z_loaned_shared_memory_client_storage_t; use crate::opaque_types::z_owned_session_t; decl_transmute_owned!(Option>, z_owned_session_t); @@ -78,6 +81,45 @@ pub extern "C" fn z_open( } } +#[cfg(all(feature = "shared-memory", feature = "unstable"))] +/// Constructs and opens a new Zenoh session with specified client storage. +/// +/// @return 0 in case of success, negative error code otherwise (in this case the session will be in its gravestone state). +#[allow(clippy::missing_safety_doc)] +#[no_mangle] +pub extern "C" fn z_open_with_custom_shm_clients( + this: *mut MaybeUninit, + config: &mut z_owned_config_t, + shm_clients: &z_loaned_shared_memory_client_storage_t, +) -> errors::z_error_t { + let this = this.transmute_uninit_ptr(); + if cfg!(feature = "logger-autoinit") { + zc_init_logger(); + } + let config = match config.transmute_mut().extract() { + Some(c) => c, + None => { + log::error!("Config not provided"); + Inplace::empty(this); + return errors::Z_EINVAL; + } + }; + match zenoh::open(config) + .with_shm_clients(shm_clients.transmute_ref().clone()) + .wait() + { + Ok(s) => { + Inplace::init(this, Some(Arc::new(s))); + errors::Z_OK + } + Err(e) => { + log::error!("Error opening session: {}", e); + Inplace::empty(this); + errors::Z_ENETWORK + } + } +} + /// Returns ``true`` if `session` is valid, ``false`` otherwise. #[allow(clippy::missing_safety_doc)] #[no_mangle] @@ -105,7 +147,7 @@ pub extern "C" fn z_close(this: &mut z_owned_session_t) -> errors::z_error_t { Err(e) => { log::error!("Error closing session: {}", e); errors::Z_EGENERIC - }, + } Ok(_) => errors::Z_OK, } } diff --git a/src/shm/buffer/zshm.rs b/src/shm/buffer/zshm.rs index 38d321b23..8e67dbc7c 100644 --- a/src/shm/buffer/zshm.rs +++ b/src/shm/buffer/zshm.rs @@ -112,3 +112,16 @@ pub extern "C" fn z_shm_try_reloan_mut(this: &mut z_loaned_shm_t) -> *mut z_loan Err(_) => std::ptr::null_mut(), } } + +/// @return the length of the ZShm slice +#[no_mangle] +pub extern "C" fn z_shm_len(this: &z_loaned_shm_t) -> usize { + this.transmute_ref().len() +} + +/// @return the pointer of the ZShm slice +#[no_mangle] +pub extern "C" fn z_shm_data(this: &z_loaned_shm_t) -> *const libc::c_uchar { + let s = this.transmute_ref(); + s.as_ref().as_ptr() +} diff --git a/src/shm/buffer/zshmmut.rs b/src/shm/buffer/zshmmut.rs index a9f6c0aaf..6768b0aa1 100644 --- a/src/shm/buffer/zshmmut.rs +++ b/src/shm/buffer/zshmmut.rs @@ -18,7 +18,8 @@ use zenoh::shm::{zshmmut, ZShmMut}; use crate::{ transmute::{ - unwrap_ref_unchecked_mut, Inplace, TransmuteIntoHandle, TransmuteRef, TransmuteUninitPtr, + unwrap_ref_unchecked_mut, Inplace, TransmuteFromHandle, TransmuteIntoHandle, TransmuteRef, + TransmuteUninitPtr, }, z_loaned_shm_mut_t, z_owned_shm_mut_t, z_owned_shm_t, }; @@ -61,8 +62,21 @@ pub extern "C" fn z_shm_mut_loan_mut(this: &mut z_owned_shm_mut_t) -> &mut z_loa shmmut.transmute_handle_mut() } -/// Deletes ZShm slice +/// Deletes ZShmMut slice #[no_mangle] pub extern "C" fn z_shm_mut_drop(this: &mut z_owned_shm_mut_t) { - let _ = this.transmute_mut().take(); + let _ = this.transmute_mut().take(); +} + +/// @return the length of the ZShmMut slice +#[no_mangle] +pub extern "C" fn z_shm_mut_len(this: &z_loaned_shm_mut_t) -> usize { + this.transmute_ref().len() +} + +/// @return the mutable pointer of the ZShmMut slice +#[no_mangle] +pub extern "C" fn z_shm_mut_data_mut(this: &mut z_loaned_shm_mut_t) -> *mut libc::c_uchar { + let s = this.transmute_mut(); + s.as_mut().as_mut_ptr() } diff --git a/src/shm/client_storage/mod.rs b/src/shm/client_storage/mod.rs index 4b5271e7f..2f8be8399 100644 --- a/src/shm/client_storage/mod.rs +++ b/src/shm/client_storage/mod.rs @@ -24,8 +24,9 @@ use crate::{ unwrap_ref_unchecked, unwrap_ref_unchecked_mut, Inplace, TransmuteFromHandle, TransmuteIntoHandle, TransmuteRef, TransmuteUninitPtr, }, - z_owned_shared_memory_client_storage_t, z_owned_shared_memory_client_t, - zc_loaned_shared_memory_client_list_t, zc_owned_shared_memory_client_list_t, + z_loaned_shared_memory_client_storage_t, z_owned_shared_memory_client_storage_t, + z_owned_shared_memory_client_t, zc_loaned_shared_memory_client_list_t, + zc_owned_shared_memory_client_list_t, }; use super::common::types::z_protocol_id_t; @@ -114,6 +115,11 @@ decl_transmute_owned!( z_owned_shared_memory_client_storage_t ); +decl_transmute_handle!( + Arc, + z_loaned_shared_memory_client_storage_t +); + #[no_mangle] pub extern "C" fn z_ref_shared_memory_client_storage_global( this: *mut MaybeUninit, @@ -184,3 +190,13 @@ pub extern "C" fn z_shared_memory_client_storage_drop( ) { let _ = this.transmute_mut().take(); } + +/// Borrows SHM Client Storage +#[no_mangle] +pub extern "C" fn z_shared_memory_client_storage_loan( + this: &z_owned_shared_memory_client_storage_t, +) -> &z_loaned_shared_memory_client_storage_t { + let this = this.transmute_ref(); + let this = unwrap_ref_unchecked(this); + this.transmute_handle() +}