Skip to content

Commit

Permalink
- shm to session API integration
Browse files Browse the repository at this point in the history
- shm to payload API integration
- examples
- some improvements
  • Loading branch information
yellowhatter committed May 28, 2024
1 parent e3d38d3 commit 1f99ad7
Show file tree
Hide file tree
Showing 13 changed files with 812 additions and 27 deletions.
2 changes: 2 additions & 0 deletions build-resources/opaque-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,8 @@ get_opaque_type_data!(Vec<(ProtocolID, Arc<dyn SharedMemoryClient>)>, zc_loaned_

/// An owned SHM Client Storage
get_opaque_type_data!(Option<Arc<SharedMemoryClientStorage>>, z_owned_shared_memory_client_storage_t);
/// A loaned SHM Client Storage
get_opaque_type_data!(Arc<SharedMemoryClientStorage>, z_loaned_shared_memory_client_storage_t);

/// An owned MemoryLayout
get_opaque_type_data!(Option<MemoryLayout>, z_owned_memory_layout_t);
Expand Down
126 changes: 126 additions & 0 deletions examples/z_get_shm.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
//
// Copyright (c) 2022 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>

#include <stdio.h>
#include <string.h>

#include "zenoh.h"

int main(int argc, char** argv) {
char* expr = "demo/example/**";
char* value = NULL;
switch (argc) {
default:
case 3:
value = argv[2];
case 2:
expr = argv[1];
break;
case 1:
value = "Test Value";
break;
}
size_t value_len = value ? strlen(value) : 0;

z_view_keyexpr_t keyexpr;
if (z_view_keyexpr_from_string(&keyexpr, expr) < 0) {
printf("%s is not a valid key expression", expr);
exit(-1);
}
z_owned_config_t config;
z_config_default(&config);
if (argc > 3) {
if (zc_config_insert_json(z_loan_mut(config), Z_CONFIG_CONNECT_KEY, argv[3]) < 0) {
printf(
"Couldn't insert value `%s` in configuration at `%s`. This is likely because `%s` expects a "
"JSON-serialized list of strings\n",
argv[3], Z_CONFIG_CONNECT_KEY, Z_CONFIG_CONNECT_KEY);
exit(-1);
}
}

printf("Opening session...\n");
z_owned_session_t s;
if (z_open(&s, z_move(config))) {
printf("Unable to open session!\n");
exit(-1);
}

// Create SHM Provider
z_alloc_alignment_t alignment = {0};
z_owned_memory_layout_t layout;
z_memory_layout_new(&layout, value_len, alignment);
z_owned_shared_memory_provider_t provider;
z_posix_shared_memory_provider_new(&provider, z_loan(layout));
z_owned_buf_alloc_result_t alloc;
z_shared_memory_provider_alloc(&alloc, z_loan(provider), value_len, alignment);

// Allocate SHM Buffer
z_owned_shm_mut_t shm_mut;
z_alloc_error_t shm_error;
z_buf_alloc_result_unwrap(z_move(alloc), &shm_mut, &shm_error);
if (!z_check(shm_mut)) {
printf("Unexpected failure during SHM buffer allocation...");
return -1;
}
// Fill SHM Buffer with data
uint8_t* data = z_shm_mut_data_mut(z_loan_mut(shm_mut));
memcpy(data, value, value_len);
// Convert mutable SHM Buffer into immutable one (to be able to make it's ref copies)
z_owned_shm_t shm;
z_shm_from_mut(&shm, z_move(shm_mut));
const z_loaned_shm_t* loaned_shm = z_loan(shm);

printf("Sending Query '%s'...\n", expr);
z_owned_reply_channel_t channel;
zc_reply_fifo_new(&channel, 16);

z_get_options_t opts;
z_get_options_default(&opts);

z_owned_bytes_t payload;
if (value != NULL) {
z_bytes_encode_from_shm_copy(&payload, z_loan(shm));
opts.payload = &payload;
}
z_get(z_loan(s), z_loan(keyexpr), "", z_move(channel.send),
z_move(opts)); // here, the send is moved and will be dropped by zenoh when adequate
z_owned_reply_t reply;

for (z_call(z_loan(channel.recv), &reply); z_check(reply); z_call(z_loan(channel.recv), &reply)) {
if (z_reply_is_ok(z_loan(reply))) {
const z_loaned_sample_t* sample = z_reply_ok(z_loan(reply));

z_view_str_t key_str;
z_keyexpr_to_string(z_sample_keyexpr(sample), &key_str);

z_owned_str_t reply_str;
z_bytes_decode_into_string(z_sample_payload(sample), &reply_str);

printf(">> Received ('%.*s': '%.*s')\n", (int)z_str_len(z_loan(key_str)), z_str_data(z_loan(key_str)),
(int)z_str_len(z_loan(reply_str)), z_str_data(z_loan(reply_str)));
z_drop(z_move(reply_str));
} else {
printf("Received an error\n");
}
z_drop(z_move(reply));
}

z_drop(z_move(channel));
z_close(z_move(s));

z_drop(z_move(shm));
z_drop(z_move(provider));
z_drop(z_move(layout));
return 0;
}
180 changes: 180 additions & 0 deletions examples/z_ping_shm.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
#include <errno.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

#include "zenoh.h"

#define DEFAULT_PKT_SIZE 8
#define DEFAULT_PING_NB 100
#define DEFAULT_WARMUP_MS 1000
#define PING_TIMEOUT_SEC 1

#define handle_error_en(en, msg) \
do { \
errno = en; \
perror(msg); \
exit(EXIT_FAILURE); \
} while (0)

z_owned_condvar_t cond;
z_owned_mutex_t mutex;

void callback(const z_loaned_sample_t* sample, void* context) { z_condvar_signal(z_loan(cond)); }
void drop(void* context) { z_drop(z_move(cond)); }

struct args_t {
unsigned int size; // -s
unsigned int number_of_pings; // -n
unsigned int warmup_ms; // -w
char* config_path; // -c
uint8_t help_requested; // -h
};
struct args_t parse_args(int argc, char** argv);

int main(int argc, char** argv) {
struct args_t args = parse_args(argc, argv);
if (args.help_requested) {
printf(
"\
-n (optional, int, default=%d): the number of pings to be attempted\n\
-s (optional, int, default=%d): the size of the payload embedded in the ping and repeated by the pong\n\
-w (optional, int, default=%d): the warmup time in ms during which pings will be emitted but not measured\n\
-c (optional, string): the path to a configuration file for the session. If this option isn't passed, the default configuration will be used.\n\
",
DEFAULT_PKT_SIZE, DEFAULT_PING_NB, DEFAULT_WARMUP_MS);
return 1;
}
z_mutex_init(&mutex);
z_condvar_init(&cond);
z_owned_config_t config;
if (args.config_path) {
zc_config_from_file(&config, args.config_path);
} else {
z_config_default(&config);
}
z_owned_session_t session;
z_open(&session, z_move(config));
z_view_keyexpr_t ping;
z_view_keyexpr_from_string_unchecked(&ping, "test/ping");
z_view_keyexpr_t pong;
z_view_keyexpr_from_string_unchecked(&pong, "test/pong");
z_owned_publisher_t pub;
z_declare_publisher(&pub, z_loan(session), z_loan(ping), NULL);
z_owned_closure_sample_t respond;
z_closure(&respond, callback, drop, (void*)(&pub));
z_owned_subscriber_t sub;
z_declare_subscriber(&sub, z_loan(session), z_loan(pong), z_move(respond), NULL);

// Create SHM Provider
z_alloc_alignment_t alignment = {0};
z_owned_memory_layout_t layout;
z_memory_layout_new(&layout, args.size, alignment);
z_owned_shared_memory_provider_t provider;
z_posix_shared_memory_provider_new(&provider, z_loan(layout));
z_owned_buf_alloc_result_t alloc;
z_shared_memory_provider_alloc(&alloc, z_loan(provider), args.size, alignment);

// Allocate SHM Buffer
z_owned_shm_mut_t shm_mut;
z_alloc_error_t shm_error;
z_buf_alloc_result_unwrap(z_move(alloc), &shm_mut, &shm_error);
if (!z_check(shm_mut)) {
printf("Unexpected failure during SHM buffer allocation...");
return -1;
}
// Fill SHM Buffer with data
uint8_t* data = z_shm_mut_data_mut(z_loan_mut(shm_mut));
for (int i = 0; i < args.size; i++) {
data[i] = i % 10;
}
// Convert mutable SHM Buffer into immutable one (to be able to make it's ref copies)
z_owned_shm_t shm;
z_shm_from_mut(&shm, z_move(shm_mut));
const z_loaned_shm_t* loaned_shm = z_loan(shm);

z_owned_bytes_t payload;

z_mutex_lock(z_loan_mut(mutex));
if (args.warmup_ms) {
printf("Warming up for %dms...\n", args.warmup_ms);
z_clock_t warmup_start = z_clock_now();

unsigned long elapsed_us = 0;
while (elapsed_us < args.warmup_ms * 1000) {
z_bytes_encode_from_shm_copy(&payload, loaned_shm);
z_publisher_put(z_loan(pub), z_move(payload), NULL);
int s = z_condvar_wait(z_loan(cond), z_loan_mut(mutex));
if (s != 0) {
handle_error_en(s, "z_condvar_wait");
}
elapsed_us = z_clock_elapsed_us(&warmup_start);
}
}
unsigned long* results = z_malloc(sizeof(unsigned long) * args.number_of_pings);
for (int i = 0; i < args.number_of_pings; i++) {
z_clock_t measure_start = z_clock_now();
z_publisher_put(z_loan(pub), z_move(payload), NULL);
int s = z_condvar_wait(z_loan(cond), z_loan_mut(mutex));
if (s != 0) {
handle_error_en(s, "z_condvar_wait");
}
results[i] = z_clock_elapsed_us(&measure_start);
}
for (int i = 0; i < args.number_of_pings; i++) {
printf("%d bytes: seq=%d rtt=%luµs, lat=%luµs\n", args.size, i, results[i], results[i] / 2);
}
z_mutex_unlock(z_loan_mut(mutex));
z_free(results);
z_free(data);
z_undeclare_subscriber(z_move(sub));
z_undeclare_publisher(z_move(pub));
z_drop(z_move(mutex));
z_close(z_move(session));

z_drop(z_move(shm));
z_drop(z_move(provider));
z_drop(z_move(layout));
}

char* getopt(int argc, char** argv, char option) {
for (int i = 0; i < argc; i++) {
size_t len = strlen(argv[i]);
if (len >= 2 && argv[i][0] == '-' && argv[i][1] == option) {
if (len > 2 && argv[i][2] == '=') {
return argv[i] + 3;
} else if (i + 1 < argc) {
return argv[i + 1];
}
}
}
return NULL;
}

struct args_t parse_args(int argc, char** argv) {
for (int i = 0; i < argc; i++) {
if (strcmp(argv[i], "-h") == 0) {
return (struct args_t){.help_requested = 1};
}
}
char* arg = getopt(argc, argv, 's');
unsigned int size = DEFAULT_PKT_SIZE;
if (arg) {
size = atoi(arg);
}
arg = getopt(argc, argv, 'n');
unsigned int number_of_pings = DEFAULT_PING_NB;
if (arg) {
number_of_pings = atoi(arg);
}
arg = getopt(argc, argv, 'w');
unsigned int warmup_ms = DEFAULT_WARMUP_MS;
if (arg) {
warmup_ms = atoi(arg);
}
return (struct args_t){.help_requested = 0,
.size = size,
.number_of_pings = number_of_pings,
.warmup_ms = warmup_ms,
.config_path = getopt(argc, argv, 'c')};
}
Loading

0 comments on commit 1f99ad7

Please sign in to comment.