From 2a767bcbed775cc28d6c393739231119dc807f30 Mon Sep 17 00:00:00 2001 From: Jean-Roland Gosse Date: Mon, 29 Apr 2024 11:50:41 +0200 Subject: [PATCH] Add query/reply attachment (#403) * fix: add and call function to drop encoded attachments * feat: add n arg for z_queryable example * feat: add attachment to queries * feat: add query attachment to examples * feat: add attachment to query replies * feat: add attachment to query reply examples * fix: compile error when attachment deactivated * test: add attachment support in modularity test * build: add a ci stage for no attachment case * feat: drop sample attachment with z_sample_drop * fix: init dst in z_bytes_copy * fix: allocate attachment z_bytes to avoid going out of scope * chore: auto format python script * fix: remove unused function * feat: add query attachment accessor * fix: free attachment when query is dropped * fix: drop sample attachment automatically * fix: set attachment_drop as a private function * fix: replace private function _z_bytes_wrap * fix: compilation error when attachment is off --- .github/workflows/build-check.yaml | 37 +++- examples/unix/c11/z_get.c | 23 +++ examples/unix/c11/z_put.c | 2 +- examples/unix/c11/z_queryable.c | 44 ++++- include/zenoh-pico/api/primitives.h | 18 +- include/zenoh-pico/api/types.h | 6 +- include/zenoh-pico/net/primitives.h | 4 +- include/zenoh-pico/net/query.h | 5 +- include/zenoh-pico/protocol/core.h | 23 ++- .../zenoh-pico/protocol/definitions/message.h | 2 - include/zenoh-pico/session/queryable.h | 3 +- src/api/api.c | 14 +- src/collections/bytes.c | 2 +- src/net/memory.c | 3 + src/net/primitives.c | 8 +- src/net/query.c | 11 +- src/protocol/core.c | 33 +++- src/session/query.c | 3 + src/session/queryable.c | 5 +- src/session/rx.c | 3 +- src/session/subscription.c | 4 +- tests/modularity.py | 182 +++++++++++------- 22 files changed, 318 insertions(+), 117 deletions(-) diff --git a/.github/workflows/build-check.yaml b/.github/workflows/build-check.yaml index 7785bd8b4..e67242d4b 100644 --- a/.github/workflows/build-check.yaml +++ b/.github/workflows/build-check.yaml @@ -103,7 +103,7 @@ jobs: run: | sudo apt install -y ninja-build CMAKE_GENERATOR=Ninja make - python3 ./build/tests/modularity.py --pub $Z_FEATURE_PUBLICATION --sub $Z_FEATURE_SUBSCRIPTION --queryable $Z_FEATURE_QUERYABLE --query $Z_FEATURE_QUERY + python3 ./build/tests/modularity.py --pub $Z_FEATURE_PUBLICATION --sub $Z_FEATURE_SUBSCRIPTION --queryable $Z_FEATURE_QUERYABLE --query $Z_FEATURE_QUERY --attachment 1 timeout-minutes: 5 env: Z_FEATURE_PUBLICATION: ${{ matrix.feature_publication }} @@ -202,3 +202,38 @@ jobs: - name: Kill Zenoh router if: always() run: kill ${{ steps.run-zenoh.outputs.zenohd-pid }} + + no_attachment_test: + needs: zenoh_build + name: Build and test without attachment on ubuntu-latest + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Download Zenoh artifacts + uses: actions/download-artifact@v4 + with: + name: ${{ needs.zenoh_build.outputs.artifact-name }} + + - name: Unzip Zenoh artifacts + run: unzip ${{ needs.zenoh_build.outputs.artifact-name }} -d zenoh-standalone + + - id: run-zenoh + name: Run Zenoh router + run: | + RUST_LOG=debug ./zenoh-standalone/zenohd & + echo "zenohd-pid=$!" >> $GITHUB_OUTPUT + + - name: Build project and run test + run: | + sudo apt install -y ninja-build + CMAKE_GENERATOR=Ninja make + python3 ./build/tests/modularity.py --pub 1 --sub 1 --queryable 1 --query 1 --attachment 0 + timeout-minutes: 5 + env: + Z_FEATURE_ATTACHMENT: 0 + + - name: Kill Zenoh router + if: always() + run: kill ${{ steps.run-zenoh.outputs.zenohd-pid }} diff --git a/examples/unix/c11/z_get.c b/examples/unix/c11/z_get.c index f98426a15..d6c4f8ead 100644 --- a/examples/unix/c11/z_get.c +++ b/examples/unix/c11/z_get.c @@ -29,12 +29,26 @@ void reply_dropper(void *ctx) { z_condvar_free(&cond); } +#if Z_FEATURE_ATTACHMENT == 1 +int8_t attachment_handler(z_bytes_t key, z_bytes_t att_value, void *ctx) { + (void)ctx; + printf(">>> %.*s: %.*s\n", (int)key.len, key.start, (int)att_value.len, att_value.start); + return 0; +} +#endif + void reply_handler(z_owned_reply_t *reply, void *ctx) { (void)(ctx); if (z_reply_is_ok(reply)) { z_sample_t sample = z_reply_ok(reply); z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); printf(">> Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, sample.payload.start); +#if Z_FEATURE_ATTACHMENT == 1 + if (z_attachment_check(&sample.attachment)) { + printf("Attachement found\n"); + z_attachment_iterate(sample.attachment, attachment_handler, NULL); + } +#endif z_drop(z_move(keystr)); } else { printf(">> Received an error\n"); @@ -116,6 +130,11 @@ int main(int argc, char **argv) { if (value != NULL) { opts.value.payload = _z_bytes_wrap((const uint8_t *)value, strlen(value)); } +#if Z_FEATURE_ATTACHMENT == 1 + z_owned_bytes_map_t map = z_bytes_map_new(); + z_bytes_map_insert_by_alias(&map, z_bytes_from_str("hi"), z_bytes_from_str("there")); + opts.attachment = z_bytes_map_as_attachment(&map); +#endif z_owned_closure_reply_t callback = z_closure(reply_handler, reply_dropper); if (z_get(z_loan(s), ke, "", z_move(callback), &opts) < 0) { printf("Unable to send query.\n"); @@ -130,6 +149,10 @@ int main(int argc, char **argv) { z_close(z_move(s)); +#if Z_FEATURE_ATTACHMENT == 1 + z_bytes_map_drop(&map); +#endif + return 0; } #else diff --git a/examples/unix/c11/z_put.c b/examples/unix/c11/z_put.c index 19c6b051a..ecf715104 100644 --- a/examples/unix/c11/z_put.c +++ b/examples/unix/c11/z_put.c @@ -99,7 +99,7 @@ int main(int argc, char **argv) { options.encoding = z_encoding(Z_ENCODING_PREFIX_TEXT_PLAIN, NULL); #if Z_FEATURE_ATTACHMENT == 1 z_owned_bytes_map_t map = z_bytes_map_new(); - z_bytes_map_insert_by_alias(&map, _z_bytes_wrap((uint8_t *)"hi", 2), _z_bytes_wrap((uint8_t *)"there", 5)); + z_bytes_map_insert_by_alias(&map, z_bytes_from_str("hi"), z_bytes_from_str("there")); options.attachment = z_bytes_map_as_attachment(&map); #endif if (z_put(z_loan(s), z_keyexpr(keyexpr), (const uint8_t *)value, strlen(value), &options) < 0) { diff --git a/examples/unix/c11/z_queryable.c b/examples/unix/c11/z_queryable.c index 47001ec1f..d9a794438 100644 --- a/examples/unix/c11/z_queryable.c +++ b/examples/unix/c11/z_queryable.c @@ -21,29 +21,60 @@ #if Z_FEATURE_QUERYABLE == 1 const char *keyexpr = "demo/example/zenoh-pico-queryable"; const char *value = "Queryable from Pico!"; +static int msg_nb = 0; + +#if Z_FEATURE_ATTACHMENT == 1 +int8_t attachment_handler(z_bytes_t key, z_bytes_t att_value, void *ctx) { + (void)ctx; + printf(">>> %.*s: %.*s\n", (int)key.len, key.start, (int)att_value.len, att_value.start); + return 0; +} +#endif void query_handler(const z_query_t *query, void *ctx) { (void)(ctx); z_owned_str_t keystr = z_keyexpr_to_string(z_query_keyexpr(query)); z_bytes_t pred = z_query_parameters(query); z_value_t payload_value = z_query_value(query); - printf(" >> [Queryable handler] Received Query '%s?%.*s'\n", z_loan(keystr), (int)pred.len, pred.start); + printf(">> [Queryable handler] Received Query '%s?%.*s'\n", z_loan(keystr), (int)pred.len, pred.start); if (payload_value.payload.len > 0) { printf(" with value '%.*s'\n", (int)payload_value.payload.len, payload_value.payload.start); } +#if Z_FEATURE_ATTACHMENT == 1 + z_attachment_t attachment = z_query_attachment(query); + if (z_attachment_check(&attachment)) { + printf("Attachement found\n"); + z_attachment_iterate(attachment, attachment_handler, NULL); + } +#endif + z_query_reply_options_t options = z_query_reply_options_default(); options.encoding = z_encoding(Z_ENCODING_PREFIX_TEXT_PLAIN, NULL); + +#if Z_FEATURE_ATTACHMENT == 1 + // Add attachment + z_owned_bytes_map_t map = z_bytes_map_new(); + z_bytes_map_insert_by_alias(&map, z_bytes_from_str("hello"), z_bytes_from_str("world")); + options.attachment = z_bytes_map_as_attachment(&map); +#endif + z_query_reply(query, z_keyexpr(keyexpr), (const unsigned char *)value, strlen(value), &options); z_drop(z_move(keystr)); + msg_nb++; + +#if Z_FEATURE_ATTACHMENT == 1 + z_bytes_map_drop(&map); +#endif } int main(int argc, char **argv) { const char *mode = "client"; char *clocator = NULL; char *llocator = NULL; + int n = 0; int opt; - while ((opt = getopt(argc, argv, "k:e:m:v:l:")) != -1) { + while ((opt = getopt(argc, argv, "k:e:m:v:l:n:")) != -1) { switch (opt) { case 'k': keyexpr = optarg; @@ -60,8 +91,12 @@ int main(int argc, char **argv) { case 'v': value = optarg; break; + case 'n': + n = atoi(optarg); + break; case '?': - if (optopt == 'k' || optopt == 'e' || optopt == 'm' || optopt == 'v' || optopt == 'l') { + if (optopt == 'k' || optopt == 'e' || optopt == 'm' || optopt == 'v' || optopt == 'l' || + optopt == 'n') { fprintf(stderr, "Option -%c requires an argument.\n", optopt); } else { fprintf(stderr, "Unknown option `-%c'.\n", optopt); @@ -111,6 +146,9 @@ int main(int argc, char **argv) { printf("Press CTRL-C to quit...\n"); while (1) { + if ((n != 0) && (msg_nb >= n)) { + break; + } sleep(1); } diff --git a/include/zenoh-pico/api/primitives.h b/include/zenoh-pico/api/primitives.h index b9ebafdcd..51ee18c2d 100644 --- a/include/zenoh-pico/api/primitives.h +++ b/include/zenoh-pico/api/primitives.h @@ -519,13 +519,27 @@ z_bytes_t z_query_parameters(const z_query_t *query); * Note: This API has been marked as unstable: it works as advertised, but we may change it in a future release. * * Parameters: - * query: Pointer to the query to get the value selector from. + * query: Pointer to the query to get the payload from. * * Returns: - * Returns the payload value wrapped as a :c:type:`z_value_t`, since payload value is a user-defined representation. + * Returns the payload wrapped as a :c:type:`z_value_t`, since payload value is a user-defined representation. */ z_value_t z_query_value(const z_query_t *query); +#if Z_FEATURE_ATTACHMENT == 1 +/** + * Get a query's attachment value by aliasing it. + * Note: This API has been marked as unstable: it works as advertised, but we may change it in a future release. + * + * Parameters: + * query: Pointer to the query to get the attachment from. + * + * Returns: + * Returns the attachment wrapped as a :c:type:`z_attachment_t`, since attachment is a user-defined representation. + */ +z_attachment_t z_query_attachment(const z_query_t *query); +#endif + /** * Get a query's key by aliasing it. * diff --git a/include/zenoh-pico/api/types.h b/include/zenoh-pico/api/types.h index d0d72f7f6..dd58ea809 100644 --- a/include/zenoh-pico/api/types.h +++ b/include/zenoh-pico/api/types.h @@ -264,9 +264,7 @@ typedef struct { */ typedef struct { z_encoding_t encoding; -#if Z_FEATURE_ATTACHMENT == 1 - // TODO:ATT z_attachment_t attachment; -#endif + z_attachment_t attachment; } z_query_reply_options_t; /** @@ -337,7 +335,7 @@ typedef struct { z_query_target_t target; uint32_t timeout_ms; #if Z_FEATURE_ATTACHMENT == 1 -// TODO:ATT z_attachment_t attachment; + z_attachment_t attachment; #endif } z_get_options_t; diff --git a/include/zenoh-pico/net/primitives.h b/include/zenoh-pico/net/primitives.h index 7000044e1..b0f74a1f4 100644 --- a/include/zenoh-pico/net/primitives.h +++ b/include/zenoh-pico/net/primitives.h @@ -199,9 +199,11 @@ int8_t _z_undeclare_queryable(_z_queryable_t *qle); * query: The query to reply to. The caller keeps its ownership. * key: The resource key of this reply. The caller keeps the ownership. * payload: The value of this reply, the caller keeps ownership. + * kind: The type of operation. + * att: The optional attachment to the sample. */ int8_t _z_send_reply(const _z_query_t *query, const _z_keyexpr_t keyexpr, const _z_value_t payload, - const z_sample_kind_t kind); + const z_sample_kind_t kind, z_attachment_t att); #endif #if Z_FEATURE_QUERY == 1 diff --git a/include/zenoh-pico/net/query.h b/include/zenoh-pico/net/query.h index 0dafb9f4d..8a6da3fa2 100644 --- a/include/zenoh-pico/net/query.h +++ b/include/zenoh-pico/net/query.h @@ -28,6 +28,9 @@ typedef struct _z_query_t { _z_keyexpr_t _key; uint32_t _request_id; _z_session_t *_zn; +#if Z_FEATURE_ATTACHMENT == 1 + z_attachment_t attachment; +#endif char *_parameters; _Bool _anyke; } _z_query_t; @@ -52,7 +55,7 @@ typedef struct { #if Z_FEATURE_QUERYABLE == 1 _z_query_t _z_query_create(const _z_value_t *value, const _z_keyexpr_t *key, const _z_bytes_t *parameters, - _z_session_t *zn, uint32_t request_id); + _z_session_t *zn, uint32_t request_id, z_attachment_t att); void _z_queryable_clear(_z_queryable_t *qbl); void _z_queryable_free(_z_queryable_t **qbl); #endif diff --git a/include/zenoh-pico/protocol/core.h b/include/zenoh-pico/protocol/core.h index 2d8e8ebdb..133c30acc 100644 --- a/include/zenoh-pico/protocol/core.h +++ b/include/zenoh-pico/protocol/core.h @@ -71,7 +71,6 @@ typedef struct { uint64_t time; } _z_timestamp_t; -#if Z_FEATURE_ATTACHMENT == 1 /** * The body of a loop over an attachment's key-value pairs. * @@ -110,11 +109,6 @@ typedef struct z_attachment_t { z_attachment_iter_driver_t iteration_driver; } z_attachment_t; -z_attachment_t z_attachment_null(void); -_Bool z_attachment_check(const z_attachment_t *attachment); -int8_t z_attachment_iterate(z_attachment_t this_, z_attachment_iter_body_t body, void *ctx); -_z_bytes_t z_attachment_get(z_attachment_t this_, _z_bytes_t key); - typedef struct { union { z_attachment_t decoded; @@ -122,12 +116,25 @@ typedef struct { } body; _Bool is_encoded; } _z_owned_encoded_attachment_t; + +z_attachment_t z_attachment_null(void); +z_attachment_t _z_encoded_as_attachment(const _z_owned_encoded_attachment_t *att); + +#if Z_FEATURE_ATTACHMENT == 1 + +_Bool z_attachment_check(const z_attachment_t *attachment); +int8_t z_attachment_iterate(z_attachment_t this_, z_attachment_iter_body_t body, void *ctx); +_z_bytes_t z_attachment_get(z_attachment_t this_, _z_bytes_t key); + /** * Estimate the length of an attachment once encoded. */ size_t _z_attachment_estimate_length(z_attachment_t att); -z_attachment_t _z_encoded_as_attachment(const _z_owned_encoded_attachment_t *att); -void _z_encoded_attachment_drop(_z_owned_encoded_attachment_t *att); + +/** + * Drop an attachment that was decoded from a received message + */ +void _z_attachment_drop(z_attachment_t *att); #endif _z_timestamp_t _z_timestamp_duplicate(const _z_timestamp_t *tstamp); diff --git a/include/zenoh-pico/protocol/definitions/message.h b/include/zenoh-pico/protocol/definitions/message.h index f1bfed6b5..4ff9faaca 100644 --- a/include/zenoh-pico/protocol/definitions/message.h +++ b/include/zenoh-pico/protocol/definitions/message.h @@ -101,9 +101,7 @@ typedef struct { _z_source_info_t _ext_info; _z_value_t _ext_value; z_consolidation_mode_t _consolidation; -#if Z_FEATURE_ATTACHMENT == 1 _z_owned_encoded_attachment_t _ext_attachment; -#endif } _z_msg_query_t; typedef struct { _Bool info; diff --git a/include/zenoh-pico/session/queryable.h b/include/zenoh-pico/session/queryable.h index b51a0c90d..dbce05646 100644 --- a/include/zenoh-pico/session/queryable.h +++ b/include/zenoh-pico/session/queryable.h @@ -28,7 +28,8 @@ _z_session_queryable_rc_t *_z_get_session_queryable_by_id(_z_session_t *zn, cons _z_session_queryable_rc_list_t *_z_get_session_queryable_by_key(_z_session_t *zn, const _z_keyexpr_t key); _z_session_queryable_rc_t *_z_register_session_queryable(_z_session_t *zn, _z_session_queryable_t *q); -int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *query, const _z_keyexpr_t q_key, uint32_t qid); +int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *query, const _z_keyexpr_t q_key, uint32_t qid, + z_attachment_t att); void _z_unregister_session_queryable(_z_session_t *zn, _z_session_queryable_rc_t *q); void _z_flush_session_queryable(_z_session_t *zn); #endif diff --git a/src/api/api.c b/src/api/api.c index 6101a94d1..fa80589ec 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -298,6 +298,10 @@ z_bytes_t z_query_parameters(const z_query_t *query) { z_value_t z_query_value(const z_query_t *query) { return query->_val._rc.in->val._value; } +#if Z_FEATURE_ATTACHMENT == 1 +z_attachment_t z_query_attachment(const z_query_t *query) { return query->_val._rc.in->val.attachment; } +#endif + z_keyexpr_t z_query_keyexpr(const z_query_t *query) { return query->_val._rc.in->val._key; } _Bool z_value_is_initialized(z_value_t *value) { @@ -834,7 +838,7 @@ z_get_options_t z_get_options_default(void) { .target = z_query_target_default(), .consolidation = z_query_consolidation_default(), .value = {.encoding = z_encoding_default(), .payload = _z_bytes_empty()}, #if Z_FEATURE_ATTACHMENT == 1 - // TODO:ATT.attachment = z_attachment_null() + .attachment = z_attachment_null(), #endif .timeout_ms = Z_GET_TIMEOUT_DEFAULT }; @@ -864,6 +868,9 @@ int8_t z_get(z_session_t zs, z_keyexpr_t keyexpr, const char *parameters, z_owne opt.consolidation = options->consolidation; opt.target = options->target; opt.value = options->value; +#if Z_FEATURE_ATTACHMENT == 1 + opt.attachment = options->attachment; +#endif } if (opt.consolidation.mode == Z_CONSOLIDATION_MODE_AUTO) { @@ -888,8 +895,7 @@ int8_t z_get(z_session_t zs, z_keyexpr_t keyexpr, const char *parameters, z_owne __z_reply_handler, wrapped_ctx, callback->drop, ctx, opt.timeout_ms #if Z_FEATURE_ATTACHMENT == 1 , - z_attachment_null() - // TODO:ATT opt.attachment + opt.attachment #endif ); return ret; @@ -969,7 +975,7 @@ int8_t z_query_reply(const z_query_t *query, const z_keyexpr_t keyexpr, const ui .len = payload_len, }, .encoding = {.id = opts.encoding.id, .schema = opts.encoding.schema}}; - return _z_send_reply(&query->_val._rc.in->val, keyexpr, value, Z_SAMPLE_KIND_PUT); + return _z_send_reply(&query->_val._rc.in->val, keyexpr, value, Z_SAMPLE_KIND_PUT, opts.attachment); return _Z_ERR_GENERIC; } #endif diff --git a/src/collections/bytes.c b/src/collections/bytes.c index 40e67bd36..82ec1903e 100644 --- a/src/collections/bytes.c +++ b/src/collections/bytes.c @@ -94,7 +94,7 @@ void _z_bytes_move(_z_bytes_t *dst, _z_bytes_t *src) { } _z_bytes_t _z_bytes_duplicate(const _z_bytes_t *src) { - _z_bytes_t dst; + _z_bytes_t dst = _z_bytes_empty(); _z_bytes_copy(&dst, src); return dst; } diff --git a/src/net/memory.c b/src/net/memory.c index 6db3ef506..553d47999 100644 --- a/src/net/memory.c +++ b/src/net/memory.c @@ -36,6 +36,9 @@ void _z_sample_clear(_z_sample_t *sample) { _z_bytes_clear(&sample->payload); _z_bytes_clear(&sample->encoding.schema); // FIXME: call the z_encoding_clear _z_timestamp_clear(&sample->timestamp); +#if Z_FEATURE_ATTACHMENT == 1 + _z_attachment_drop(&sample->attachment); +#endif } void _z_sample_free(_z_sample_t **sample) { diff --git a/src/net/primitives.c b/src/net/primitives.c index 4fa322980..dd66050e3 100644 --- a/src/net/primitives.c +++ b/src/net/primitives.c @@ -323,7 +323,7 @@ int8_t _z_undeclare_queryable(_z_queryable_t *qle) { } int8_t _z_send_reply(const _z_query_t *query, _z_keyexpr_t keyexpr, const _z_value_t payload, - const z_sample_kind_t kind) { + const z_sample_kind_t kind, z_attachment_t att) { int8_t ret = _Z_RES_OK; _z_keyexpr_t q_ke; @@ -360,10 +360,12 @@ int8_t _z_send_reply(const _z_query_t *query, _z_keyexpr_t keyexpr, const _z_val z_msg._body._response._body._reply._body._is_put = true; z_msg._body._response._body._reply._body._body._put._payload = payload.payload; z_msg._body._response._body._reply._body._body._put._encoding = payload.encoding; - z_msg._body._response._body._reply._body._body._put._attachment.body.decoded = z_attachment_null(); - z_msg._body._response._body._reply._body._body._put._attachment.is_encoded = false; z_msg._body._response._body._reply._body._body._put._commons._timestamp = _z_timestamp_null(); z_msg._body._response._body._reply._body._body._put._commons._source_info = _z_source_info_null(); +#if Z_FEATURE_ATTACHMENT == 1 + z_msg._body._response._body._reply._body._body._put._attachment.body.decoded = att; + z_msg._body._response._body._reply._body._body._put._attachment.is_encoded = false; +#endif break; } if (_z_send_n_msg(query->_zn, &z_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { diff --git a/src/net/query.c b/src/net/query.c index 74b1ad88c..e2e73c7b4 100644 --- a/src/net/query.c +++ b/src/net/query.c @@ -27,12 +27,14 @@ void _z_query_clear(_z_query_t *q) { z_free(q->_parameters); _z_keyexpr_clear(&q->_key); _z_value_clear(&q->_value); - // Ideally free session rc if you have one +#if Z_FEATURE_ATTACHMENT == 1 + _z_attachment_drop(&q->attachment); +#endif } #if Z_FEATURE_QUERYABLE == 1 _z_query_t _z_query_create(const _z_value_t *value, const _z_keyexpr_t *key, const _z_bytes_t *parameters, - _z_session_t *zn, uint32_t request_id) { + _z_session_t *zn, uint32_t request_id, z_attachment_t att) { _z_query_t q; q._request_id = request_id; q._zn = zn; // Ideally would have been an rc @@ -40,6 +42,11 @@ _z_query_t _z_query_create(const _z_value_t *value, const _z_keyexpr_t *key, con memcpy(q._parameters, parameters->start, parameters->len); q._parameters[parameters->len] = 0; q._anyke = (strstr(q._parameters, Z_SELECTOR_QUERY_MATCH) == NULL) ? false : true; +#if Z_FEATURE_ATTACHMENT == 1 + q.attachment = att; +#else + _ZP_UNUSED(att); +#endif _z_keyexpr_copy(&q._key, key); _z_value_copy(&q._value, value); return q; diff --git a/src/protocol/core.c b/src/protocol/core.c index d8f3729f1..ada2c0c38 100644 --- a/src/protocol/core.c +++ b/src/protocol/core.c @@ -78,6 +78,8 @@ void _z_value_copy(_z_value_t *dst, const _z_value_t *src) { _z_bytes_copy(&dst->payload, &src->payload); } +z_attachment_t z_attachment_null(void) { return (z_attachment_t){.data = NULL, .iteration_driver = NULL}; } + #if Z_FEATURE_ATTACHMENT == 1 struct _z_seeker_t { _z_bytes_t key; @@ -125,19 +127,34 @@ int8_t _z_encoded_attachment_iteration_driver(const void *this_, z_attachment_it z_attachment_t _z_encoded_as_attachment(const _z_owned_encoded_attachment_t *att) { if (att->is_encoded) { - return (z_attachment_t){.data = &att->body.encoded, .iteration_driver = _z_encoded_attachment_iteration_driver}; + // Recopy z_bytes data in allocated memory to avoid it going out of scope + z_bytes_t *att_data = (_z_bytes_t *)z_malloc(sizeof(_z_bytes_t)); + if (att_data == NULL) { + return att->body.decoded; + } + *att_data = att->body.encoded; + return (z_attachment_t){.data = att_data, .iteration_driver = _z_encoded_attachment_iteration_driver}; } else { return att->body.decoded; } } -void _z_encoded_attachment_drop(_z_owned_encoded_attachment_t *att) { - if (att->is_encoded) { - _z_bytes_clear(&att->body.encoded); - } -} + _Bool z_attachment_check(const z_attachment_t *attachment) { return attachment->iteration_driver != NULL; } + int8_t z_attachment_iterate(z_attachment_t this_, z_attachment_iter_body_t body, void *ctx) { return this_.iteration_driver(this_.data, body, ctx); } -z_attachment_t z_attachment_null(void) { return (z_attachment_t){.data = NULL, .iteration_driver = NULL}; } -#endif \ No newline at end of file + +void _z_attachment_drop(z_attachment_t *att) { + if (att->iteration_driver == _z_encoded_attachment_iteration_driver) { + _z_bytes_clear((z_bytes_t *)att->data); + z_free((z_bytes_t *)att->data); + } +} + +#else // Z_FEATURE_ATTACHMENT == 1 +z_attachment_t _z_encoded_as_attachment(const _z_owned_encoded_attachment_t *att) { + _ZP_UNUSED(att); + return z_attachment_null(); +} +#endif diff --git a/src/session/query.c b/src/session/query.c index d4cef44b4..fcfa7cc9f 100644 --- a/src/session/query.c +++ b/src/session/query.c @@ -165,6 +165,9 @@ int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, cons _z_bytes_copy(&reply.data.sample.encoding.schema, &msg->_encoding.schema); reply.data.sample.kind = Z_SAMPLE_KIND_PUT; reply.data.sample.timestamp = _z_timestamp_duplicate(&msg->_commons._timestamp); +#if Z_FEATURE_ATTACHMENT == 1 + reply.data.sample.attachment = _z_encoded_as_attachment(&msg->_attachment); +#endif // Verify if this is a newer reply, free the old one in case it is if ((ret == _Z_RES_OK) && ((pen_qry->_consolidation == Z_CONSOLIDATION_MODE_LATEST) || diff --git a/src/session/queryable.c b/src/session/queryable.c index fc587cccf..771dcfa3c 100644 --- a/src/session/queryable.c +++ b/src/session/queryable.c @@ -132,7 +132,8 @@ _z_session_queryable_rc_t *_z_register_session_queryable(_z_session_t *zn, _z_se return ret; } -int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *msgq, const _z_keyexpr_t q_key, uint32_t qid) { +int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *msgq, const _z_keyexpr_t q_key, uint32_t qid, + z_attachment_t att) { int8_t ret = _Z_RES_OK; _zp_session_lock_mutex(zn); @@ -145,7 +146,7 @@ int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *msgq, const // Build the z_query z_query_t query = {._val = {._rc = _z_query_rc_new()}}; - query._val._rc.in->val = _z_query_create(&msgq->_ext_value, &key, &msgq->_parameters, zn, qid); + query._val._rc.in->val = _z_query_create(&msgq->_ext_value, &key, &msgq->_parameters, zn, qid, att); // Parse session_queryable list _z_session_queryable_rc_list_t *xs = qles; while (xs != NULL) { diff --git a/src/session/rx.c b/src/session/rx.c index 629437e44..1ce925afa 100644 --- a/src/session/rx.c +++ b/src/session/rx.c @@ -95,7 +95,8 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint case _Z_REQUEST_QUERY: { #if Z_FEATURE_QUERYABLE == 1 _z_msg_query_t *query = &req._body._query; - ret = _z_trigger_queryables(zn, query, req._key, (uint32_t)req._rid); + z_attachment_t att = _z_encoded_as_attachment(&req._body._query._ext_attachment); + ret = _z_trigger_queryables(zn, query, req._key, (uint32_t)req._rid, att); #else _Z_DEBUG("_Z_REQUEST_QUERY dropped, queryables not supported"); #endif diff --git a/src/session/subscription.c b/src/session/subscription.c index b9faf89f3..28670ba29 100644 --- a/src/session/subscription.c +++ b/src/session/subscription.c @@ -192,7 +192,9 @@ int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, co sub->in->val._callback(&s, sub->in->val._arg); xs = _z_subscription_rc_list_tail(xs); } - +#if Z_FEEATURE_ATTACHMENT == 1 + _z_attachment_drop(&s.attachment); +#endif _z_keyexpr_clear(&key); _z_subscription_rc_list_free(&subs); } else { diff --git a/tests/modularity.py b/tests/modularity.py index 000b6b90a..0567ee5c6 100644 --- a/tests/modularity.py +++ b/tests/modularity.py @@ -4,10 +4,23 @@ import subprocess import sys import time +import difflib # Specify the directory for the binaries DIR_EXAMPLES = "build/examples" + +def print_string_diff(str_a, str_b): + for i, s in enumerate(difflib.ndiff(str_a, str_b)): + if s[0] == " ": + continue + elif s[0] == "-": + print('Delete "{}" from position {}'.format(s[-1], i)) + elif s[0] == "+": + print('Add "{}" to position {}'.format(s[-1], i)) + print() + + def pub_and_sub(args): print("*** Pub & sub test ***") test_status = 0 @@ -15,7 +28,7 @@ def pub_and_sub(args): # Expected z_pub output & status if args.pub == 1: z_pub_expected_status = 0 - z_pub_expected_output = '''Opening session... + z_pub_expected_output = """Opening session... Declaring publisher for 'demo/example/zenoh-pico-pub'... Press CTRL-C to quit... Putting Data ('demo/example/zenoh-pico-pub': '[ 0] Pub from Pico!')... @@ -27,17 +40,16 @@ def pub_and_sub(args): Putting Data ('demo/example/zenoh-pico-pub': '[ 6] Pub from Pico!')... Putting Data ('demo/example/zenoh-pico-pub': '[ 7] Pub from Pico!')... Putting Data ('demo/example/zenoh-pico-pub': '[ 8] Pub from Pico!')... -Putting Data ('demo/example/zenoh-pico-pub': '[ 9] Pub from Pico!')...''' - else : +Putting Data ('demo/example/zenoh-pico-pub': '[ 9] Pub from Pico!')...""" + else: z_pub_expected_status = 254 - z_pub_expected_output = ("ERROR: Zenoh pico was compiled without " - "Z_FEATURE_PUBLICATION but this example requires it.") + z_pub_expected_output = "ERROR: Zenoh pico was compiled without " "Z_FEATURE_PUBLICATION but this example requires it." # Expected z_sub output & status if args.sub == 1: z_sub_expected_status = -2 if args.pub == 1: - z_sub_expected_output = '''Opening session... + z_sub_expected_output = """Opening session... Declaring Subscriber on 'demo/example/**'... Press CTRL-C to quit... >> [Subscriber] Received ('demo/example/zenoh-pico-pub': '[ 0] Pub from Pico!') @@ -49,26 +61,27 @@ def pub_and_sub(args): >> [Subscriber] Received ('demo/example/zenoh-pico-pub': '[ 6] Pub from Pico!') >> [Subscriber] Received ('demo/example/zenoh-pico-pub': '[ 7] Pub from Pico!') >> [Subscriber] Received ('demo/example/zenoh-pico-pub': '[ 8] Pub from Pico!') ->> [Subscriber] Received ('demo/example/zenoh-pico-pub': '[ 9] Pub from Pico!')''' +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': '[ 9] Pub from Pico!')""" else: - z_sub_expected_output = '''Opening session... + z_sub_expected_output = """Opening session... Declaring Subscriber on 'demo/example/**'... -Press CTRL-C to quit...''' - else : +Press CTRL-C to quit...""" + else: z_sub_expected_status = 254 - z_sub_expected_output = ("ERROR: Zenoh pico was compiled without " - "Z_FEATURE_SUBSCRIPTION but this example requires it.") + z_sub_expected_output = "ERROR: Zenoh pico was compiled without " "Z_FEATURE_SUBSCRIPTION but this example requires it." print("Start subscriber") # Start z_sub in the background z_sub_command = f"stdbuf -oL -eL ./{DIR_EXAMPLES}/z_sub" - z_sub_process = subprocess.Popen(z_sub_command, - shell=True, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - start_new_session=True, - text=True) + z_sub_process = subprocess.Popen( + z_sub_command, + shell=True, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + start_new_session=True, + text=True, + ) # Introduce a delay to ensure z_sub starts time.sleep(2) @@ -76,12 +89,14 @@ def pub_and_sub(args): print("Start publisher") # Start z_pub z_pub_command = f"stdbuf -oL -eL ./{DIR_EXAMPLES}/z_pub -n 10" - z_pub_process = subprocess.Popen(z_pub_command, - shell=True, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True) + z_pub_process = subprocess.Popen( + z_pub_command, + shell=True, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) # Wait for z_pub to finish z_pub_process.wait() @@ -110,8 +125,8 @@ def pub_and_sub(args): print("z_pub output valid") else: print("z_pub output invalid:") - print(f"Expected: \"{z_pub_expected_output}\"") - print(f"Received: \"{z_pub_output}\"") + print(f'Expected: "{z_pub_expected_output}"') + print(f'Received: "{z_pub_output}"') test_status = 1 print("Check subscriber status & output") @@ -129,12 +144,13 @@ def pub_and_sub(args): print("z_sub output valid") else: print("z_sub output invalid:") - print(f"Expected: \"{z_sub_expected_output}\"") - print(f"Received: \"{z_sub_output}\"") + print(f'Expected: "{z_sub_expected_output}"') + print(f'Received: "{z_sub_output}"') test_status = 1 # Return value return test_status + def query_and_queryable(args): print("*** Query & queryable test ***") test_status = 0 @@ -143,46 +159,66 @@ def query_and_queryable(args): if args.query == 1: z_query_expected_status = 0 if args.queryable == 1: - z_query_expected_output = '''Opening session... + if args.attachment == 1: + z_query_expected_output = """Opening session... +Sending Query 'demo/example/**'... +>> Received ('demo/example/zenoh-pico-queryable': 'Queryable from Pico!') +Attachement found +>>> hello: world +>> Received query final notification""" + else: + z_query_expected_output = """Opening session... Sending Query 'demo/example/**'... >> Received ('demo/example/zenoh-pico-queryable': 'Queryable from Pico!') ->> Received query final notification''' +>> Received query final notification""" else: - z_query_expected_output = '''Opening session... + z_query_expected_output = """Opening session... Sending Query 'demo/example/**'... ->> Received query final notification''' - else : +>> Received query final notification""" + else: z_query_expected_status = 254 - z_query_expected_output = ("ERROR: Zenoh pico was compiled without " - "Z_FEATURE_QUERY or Z_FEATURE_MULTI_THREAD but this example requires them.") + z_query_expected_output = ( + "ERROR: Zenoh pico was compiled without " "Z_FEATURE_QUERY or Z_FEATURE_MULTI_THREAD but this example requires them." + ) # Expected z_queryable output & status if args.queryable == 1: z_queryable_expected_status = -2 if args.query == 1: - z_queryable_expected_output = '''Opening session... + if args.attachment == 1: + z_queryable_expected_output = """Opening session... +Creating Queryable on 'demo/example/zenoh-pico-queryable'... +Press CTRL-C to quit... +>> [Queryable handler] Received Query 'demo/example/**?' +Attachement found +>>> hi: there +""" + else: + z_queryable_expected_output = """Opening session... Creating Queryable on 'demo/example/zenoh-pico-queryable'... Press CTRL-C to quit... - >> [Queryable handler] Received Query 'demo/example/**?''' +>> [Queryable handler] Received Query 'demo/example/**?' +""" else: - z_queryable_expected_output = '''Opening session... + z_queryable_expected_output = """Opening session... Creating Queryable on 'demo/example/zenoh-pico-queryable'... -Press CTRL-C to quit...''' - else : +Press CTRL-C to quit...""" + else: z_queryable_expected_status = 254 - z_queryable_expected_output = ("ERROR: Zenoh pico was compiled without " - "Z_FEATURE_QUERYABLE but this example requires it.") + z_queryable_expected_output = "ERROR: Zenoh pico was compiled without " "Z_FEATURE_QUERYABLE but this example requires it." print("Start queryable") # Start z_queryable in the background z_queryable_command = f"stdbuf -oL -eL ./{DIR_EXAMPLES}/z_queryable" - z_queryable_process = subprocess.Popen(z_queryable_command, - shell=True, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - start_new_session=True, - text=True) + z_queryable_process = subprocess.Popen( + z_queryable_command, + shell=True, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + start_new_session=True, + text=True, + ) # Introduce a delay to ensure z_queryable starts time.sleep(2) @@ -190,12 +226,14 @@ def query_and_queryable(args): print("Start query") # Start z_query z_query_command = f"stdbuf -oL -eL ./{DIR_EXAMPLES}/z_get" - z_query_process = subprocess.Popen(z_query_command, - shell=True, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True) + z_query_process = subprocess.Popen( + z_query_command, + shell=True, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) # Wait for z_query to finish z_query_process.wait() @@ -215,8 +253,7 @@ def query_and_queryable(args): if z_query_status == z_query_expected_status: print("z_query status valid") else: - print(f"z_query status invalid, expected: {z_query_expected_status}," - f" received: {z_query_status}") + print(f"z_query status invalid, expected: {z_query_expected_status}," f" received: {z_query_status}") test_status = 1 # Check output of z_query @@ -225,8 +262,8 @@ def query_and_queryable(args): print("z_query output valid") else: print("z_query output invalid:") - print(f"Expected: \"{z_query_expected_output}\"") - print(f"Received: \"{z_query_output}\"") + print(f'Expected: "{z_query_expected_output}"') + print(f'Received: "{z_query_output}"') test_status = 1 print("Check queryable status & output") @@ -235,8 +272,7 @@ def query_and_queryable(args): if z_queryable_status == z_queryable_expected_status: print("z_queryable status valid") else: - print(f"z_queryable status invalid, expected: {z_queryable_expected_status}," - f" received: {z_queryable_status}") + print(f"z_queryable status invalid, expected: {z_queryable_expected_status}," f" received: {z_queryable_status}") test_status = 1 # Check output of z_queryable @@ -245,25 +281,29 @@ def query_and_queryable(args): print("z_queryable output valid") else: print("z_queryable output invalid:") - print(f"Expected: \"{z_queryable_expected_output}\"") - print(f"Received: \"{z_queryable_output}\"") + print(f'Expected: "{z_queryable_expected_output}"') + print(f'Received: "{z_queryable_output}"') + print_string_diff(z_queryable_expected_output, z_queryable_output) test_status = 1 # Return status return test_status + if __name__ == "__main__": - parser = argparse.ArgumentParser(description="This script runs zenoh-pico examples" - " and checks them according to the given configuration") + parser = argparse.ArgumentParser( + description="This script runs zenoh-pico examples" " and checks them according to the given configuration" + ) parser.add_argument("--pub", type=int, choices=[0, 1], help="Z_FEATURE_PUBLICATION (0 or 1)") parser.add_argument("--sub", type=int, choices=[0, 1], help="Z_FEATURE_SUBSCRIPTION (0 or 1)") - parser.add_argument("--queryable", type=int, choices=[0, 1], - help="Z_FEATURE_QUERYABLE (0 or 1)") + parser.add_argument("--queryable", type=int, choices=[0, 1], help="Z_FEATURE_QUERYABLE (0 or 1)") parser.add_argument("--query", type=int, choices=[0, 1], help="Z_FEATURE_QUERY (0 or 1)") - + parser.add_argument("--attachment", type=int, choices=[0, 1], help="Z_FEATURE_ATTACHMENT (0 or 1)") EXIT_STATUS = 0 prog_args = parser.parse_args() - print(f"Args value, pub:{prog_args.pub}, sub:{prog_args.sub}, " - f"queryable:{prog_args.queryable}, query:{prog_args.query}") + print( + f"Args value, pub:{prog_args.pub}, sub:{prog_args.sub}, " + f"queryable:{prog_args.queryable}, query:{prog_args.query}, attachment:{prog_args.attachment}" + ) # Test pub and sub examples if pub_and_sub(prog_args) == 1: