Skip to content

Commit

Permalink
Remove pull subscriber. Todo: add ring handler.
Browse files Browse the repository at this point in the history
  • Loading branch information
Mallets committed Mar 28, 2024
1 parent db9733e commit 415dd62
Show file tree
Hide file tree
Showing 16 changed files with 239 additions and 562 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ serde_yaml = "0.9.19"

[lib]
path="src/lib.rs"
name = "zenohc"
name = "zenohcd"
crate-type = ["cdylib", "staticlib"]
doctest = false

Expand Down
7 changes: 0 additions & 7 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,6 @@ Types

.. autocstruct:: zenoh_concrete.h::z_owned_subscriber_t

.. autocstruct:: zenoh_concrete.h::z_owned_pull_subscriber_t

.. autocstruct:: zenoh_commons.h::z_owned_closure_sample_t

.. autocenum:: zenoh_commons.h::z_reliability_t
Expand All @@ -219,11 +217,6 @@ Functions
.. autocfunction:: zenoh_commons.h::z_subscriber_check
.. autocfunction:: zenoh_commons.h::z_undeclare_subscriber

.. autocfunction:: zenoh_commons.h::z_declare_pull_subscriber
.. autocfunction:: zenoh_commons.h::z_subscriber_pull
.. autocfunction:: zenoh_commons.h::z_pull_subscriber_check
.. autocfunction:: zenoh_commons.h::z_undeclare_pull_subscriber

.. autocfunction:: zenoh_commons.h::z_closure_sample_call
.. autocfunction:: zenoh_commons.h::z_closure_sample_drop

Expand Down
114 changes: 63 additions & 51 deletions examples/z_pull.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,73 +11,85 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
#include <ctype.h>
#include <stddef.h>
#include <stdio.h>
#include "zenoh.h"

const char *kind_to_str(z_sample_kind_t kind);

void data_handler(const z_sample_t *sample, void *arg) {
z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
printf(">> [Subscriber] Received %s ('%s': '%.*s')\n", kind_to_str(sample->kind), z_loan(keystr),
(int)sample->payload.len, sample->payload.start);
z_drop(z_move(keystr));
}
#include <stdlib.h>
#include <unistd.h>
#include <zenoh.h>

int main(int argc, char **argv) {
char *expr = "demo/example/**";
if (argc > 1) {
expr = argv[1];
const char *keyexpr = "demo/example/**";
char *locator = NULL;
size_t interval = 5000;
size_t size = 3;

int opt;
while ((opt = getopt(argc, argv, "k:e:i:s:")) != -1) {
switch (opt) {
case 'k':
keyexpr = optarg;
break;
case 'e':
locator = optarg;
break;
case 'i':
interval = (size_t)atoi(optarg);
break;
case 's':
size = (size_t)atoi(optarg);
break;
case '?':
if (optopt == 'k' || optopt == 'e' || optopt == 'i' || optopt == 's') {
fprintf(stderr, "Option -%c requires an argument.\n", optopt);
} else {
fprintf(stderr, "Unknown option `-%c'.\n", optopt);
}
return 1;
default:
return -1;
}
}

z_owned_config_t config = z_config_default();
if (argc > 2) {
if (zc_config_insert_json(z_loan(config), Z_CONFIG_LISTEN_KEY, argv[2]) < 0) {
printf(
"Couldn't insert value `%s` in configuration at `%s`. This is likely because `%s` expects a "
"JSON-serialized list of strings\n",
argv[2], Z_CONFIG_LISTEN_KEY, Z_CONFIG_LISTEN_KEY);
exit(-1);
}
if (locator != NULL) {
zp_config_insert(z_loan(config), Z_CONFIG_CONNECT_KEY, z_string_make(locator));
}

printf("Opening session...\n");
z_owned_session_t s = z_open(z_move(config));
if (!z_check(s)) {
printf("Unable to open session!\n");
exit(-1);
return -1;
}

z_owned_closure_sample_t callback = z_closure(data_handler);
printf("Declaring Subscriber on '%s'...\n", expr);
z_owned_pull_subscriber_t sub = z_declare_pull_subscriber(z_loan(s), z_keyexpr(expr), z_move(callback), NULL);
if (!z_check(sub)) {
printf("Unable to declare subscriber.\n");
exit(-1);
}
printf("Pull functionality not implemented!\n");
// @TODO: implement z_owned_sample_channel_t and z_sample_channel_ring_new
// printf("Declaring Subscriber on '%s'...\n", keyexpr);
// z_owned_sample_channel_t channel = z_sample_channel_ring_new(size);
// z_owned_subscriber_t sub = z_declare_subscriber(z_loan(s), z_keyexpr(keyexpr), z_move(channel.send), NULL);
// if (!z_check(sub)) {
// printf("Unable to declare subscriber.\n");
// return -1;
// }

printf("Press <enter> to pull data...\n");
char c = 0;
while (c != 'q') {
c = getchar();
if (c == -1) {
z_sleep_s(1);
} else {
z_subscriber_pull(z_loan(sub));
}
}
// printf("Pulling data every %zu ms... Ring size: %zd\n", interval, size);
// z_owned_sample_t sample = z_sample_null();
// while (true) {
// for (z_call(channel.recv, &sample); z_check(sample); z_call(channel.recv, &sample)) {
// z_owned_str_t keystr = z_keyexpr_to_string(z_loan(sample.keyexpr));
// printf(">> [Subscriber] Pulled ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len,
// sample.payload.start);
// z_drop(z_move(keystr));
// z_drop(z_move(sample));
// }
// printf(">> [Subscriber] Nothing to pull... sleep for %zu ms\n", interval);
// zp_sleep_ms(interval);
// }

// z_undeclare_subscriber(z_move(sub));

z_undeclare_pull_subscriber(z_move(sub));
z_close(z_move(s));

return 0;
}

const char *kind_to_str(z_sample_kind_t kind) {
switch (kind) {
case Z_SAMPLE_KIND_PUT:
return "PUT";
case Z_SAMPLE_KIND_DELETE:
return "DELETE";
default:
return "UNKNOWN";
}
}
83 changes: 1 addition & 82 deletions include/zenoh_commons.h
Original file line number Diff line number Diff line change
Expand Up @@ -591,16 +591,6 @@ typedef struct z_publisher_options_t {
enum z_congestion_control_t congestion_control;
enum z_priority_t priority;
} z_publisher_options_t;
/**
* Represents the set of options that can be applied to a pull subscriber,
* upon its declaration via :c:func:`z_declare_pull_subscriber`.
*
* Members:
* z_reliability_t reliability: The subscription reliability.
*/
typedef struct z_pull_subscriber_options_t {
enum z_reliability_t reliability;
} z_pull_subscriber_options_t;
/**
* Options passed to the :c:func:`z_declare_queryable` function.
*
Expand All @@ -611,7 +601,7 @@ typedef struct z_queryable_options_t {
bool complete;
} z_queryable_options_t;
/**
* Options passed to the :c:func:`z_declare_subscriber` or :c:func:`z_declare_pull_subscriber` function.
* Options passed to the :c:func:`z_declare_subscriber` function.
*
* Members:
* z_reliability_t reliability: The subscription reliability.
Expand Down Expand Up @@ -736,9 +726,6 @@ typedef struct z_publisher_put_options_t {
struct z_encoding_t encoding;
struct z_attachment_t attachment;
} z_publisher_put_options_t;
typedef struct z_pull_subscriber_t {
const struct z_owned_pull_subscriber_t *_0;
} z_pull_subscriber_t;
/**
* Options passed to the :c:func:`z_put` function.
*
Expand Down Expand Up @@ -1363,44 +1350,6 @@ ZENOHC_API
struct z_owned_publisher_t z_declare_publisher(struct z_session_t session,
struct z_keyexpr_t keyexpr,
const struct z_publisher_options_t *options);
/**
* Declares a pull subscriber for a given key expression.
*
* Parameters:
* session: The zenoh session.
* keyexpr: The key expression to subscribe.
* callback: The callback function that will be called each time a data matching the subscribed expression is received.
* opts: additional options for the pull subscriber.
*
* Returns:
* A :c:type:`z_owned_subscriber_t`.
*
* To check if the subscription succeeded and if the pull subscriber is still valid,
* you may use `z_pull_subscriber_check(&val)` or `z_check(val)` if your compiler supports `_Generic`, which will return `true` if `val` is valid.
*
* Like all `z_owned_X_t`, an instance will be destroyed by any function which takes a mutable pointer to said instance, as this implies the instance's inners were moved.
* To make this fact more obvious when reading your code, consider using `z_move(val)` instead of `&val` as the argument.
* After a move, `val` will still exist, but will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your `val` is valid.
*
* Example:
* Declaring a subscriber passing ``NULL`` for the options:
*
* .. code-block:: C
*
* z_owned_subscriber_t sub = z_declare_pull_subscriber(z_loan(s), z_keyexpr(expr), callback, NULL);
*
* is equivalent to initializing and passing the default subscriber options:
*
* .. code-block:: C
*
* z_subscriber_options_t opts = z_subscriber_options_default();
* z_owned_subscriber_t sub = z_declare_pull_subscriber(z_loan(s), z_keyexpr(expr), callback, &opts);
*/
ZENOHC_API
struct z_owned_pull_subscriber_t z_declare_pull_subscriber(struct z_session_t session,
struct z_keyexpr_t keyexpr,
struct z_owned_closure_sample_t *callback,
const struct z_pull_subscriber_options_t *opts);
/**
* Creates a Queryable for the given key expression.
*
Expand Down Expand Up @@ -1778,23 +1727,6 @@ int8_t z_publisher_put(struct z_publisher_t publisher,
* Constructs the default value for :c:type:`z_publisher_put_options_t`.
*/
ZENOHC_API struct z_publisher_put_options_t z_publisher_put_options_default(void);
/**
* Returns ``true`` if `sub` is valid.
*/
ZENOHC_API bool z_pull_subscriber_check(const struct z_owned_pull_subscriber_t *sub);
/**
* Returns ``true`` if `sub` is valid.
*/
ZENOHC_API
struct z_pull_subscriber_t z_pull_subscriber_loan(const struct z_owned_pull_subscriber_t *sub);
/**
* Constructs a null safe-to-drop value of 'z_owned_pull_subscriber_t' type
*/
ZENOHC_API struct z_owned_pull_subscriber_t z_pull_subscriber_null(void);
/**
* Constructs the default value for :c:type:`z_pull_subscriber_options_t`.
*/
ZENOHC_API struct z_pull_subscriber_options_t z_pull_subscriber_options_default(void);
/**
* Put data.
*
Expand Down Expand Up @@ -2128,14 +2060,6 @@ ZENOHC_API struct z_owned_subscriber_t z_subscriber_null(void);
* Constructs the default value for :c:type:`z_subscriber_options_t`.
*/
ZENOHC_API struct z_subscriber_options_t z_subscriber_options_default(void);
/**
* Pull data for :c:type:`z_owned_pull_subscriber_t`. The pulled data will be provided
* by calling the **callback** function provided to the :c:func:`z_declare_subscriber` function.
*
* Parameters:
* sub: The :c:type:`z_owned_pull_subscriber_t` to pull from.
*/
ZENOHC_API int8_t z_subscriber_pull(struct z_pull_subscriber_t sub);
ZENOHC_API
int8_t z_task_init(struct z_task_t *task,
const struct z_task_attr_t *_attr,
Expand All @@ -2160,11 +2084,6 @@ ZENOHC_API int8_t z_undeclare_keyexpr(struct z_session_t session, struct z_owned
*/
ZENOHC_API
int8_t z_undeclare_publisher(struct z_owned_publisher_t *publisher);
/**
* Undeclares the given :c:type:`z_owned_pull_subscriber_t`, droping it and invalidating it for double-drop safety.
*/
ZENOHC_API
int8_t z_undeclare_pull_subscriber(struct z_owned_pull_subscriber_t *sub);
/**
* Undeclares a `z_owned_queryable_t`, droping it and invalidating it for doube-drop safety.
*
Expand Down
22 changes: 0 additions & 22 deletions include/zenoh_concrete.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,28 +51,6 @@ typedef struct z_query_t {
typedef struct z_session_t {
size_t _0;
} z_session_t;
/**
* An owned zenoh pull subscriber. Destroying the subscriber cancels the subscription.
*
* Like most `z_owned_X_t` types, you may obtain an instance of `z_X_t` by loaning it using `z_X_loan(&val)`.
* The `z_loan(val)` macro, available if your compiler supports C11's `_Generic`, is equivalent to writing `z_X_loan(&val)`.
*
* Like all `z_owned_X_t`, an instance will be destroyed by any function which takes a mutable pointer to said instance, as this implies the instance's inners were moved.
* To make this fact more obvious when reading your code, consider using `z_move(val)` instead of `&val` as the argument.
* After a move, `val` will still exist, but will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your `val` is valid.
*
* To check if `val` is still valid, you may use `z_X_check(&val)` or `z_check(val)` if your compiler supports `_Generic`, which will return `true` if `val` is valid.
*/
#if !defined(TARGET_ARCH_ARM)
typedef struct ALIGN(8) z_owned_pull_subscriber_t {
uint64_t _0[1];
} z_owned_pull_subscriber_t;
#endif
#if defined(TARGET_ARCH_ARM)
typedef struct ALIGN(4) z_owned_pull_subscriber_t {
uint32_t _0[1];
} z_owned_pull_subscriber_t;
#endif
/**
* An owned zenoh queryable.
*
Expand Down
Loading

0 comments on commit 415dd62

Please sign in to comment.