diff --git a/.github/workflows/build-check.yaml b/.github/workflows/build-check.yaml index be3e52d7e..6339ef357 100644 --- a/.github/workflows/build-check.yaml +++ b/.github/workflows/build-check.yaml @@ -30,7 +30,7 @@ jobs: - name: Build & run tests run: | sudo apt install -y ninja-build - CMAKE_GENERATOR=Ninja make test + CMAKE_GENERATOR=Ninja ASAN=ON make test check_format: name: Check codebase format with clang-format @@ -272,7 +272,7 @@ jobs: - name: Build project and run test run: | sudo apt install -y ninja-build - CMAKE_GENERATOR=Ninja make + Z_FEATURE_UNSTABLE_API=1 Z_FEATURE_LIVELINESS=1 CMAKE_GENERATOR=Ninja make python3 ./build/tests/memory_leak.py timeout-minutes: 5 @@ -290,6 +290,6 @@ jobs: - name: Build & test pico run: | sudo apt install -y ninja-build - CMAKE_GENERATOR=Ninja make + CMAKE_GENERATOR=Ninja ASAN=ON make python3 ./build/tests/no_router.py timeout-minutes: 5 diff --git a/.github/workflows/cpp-check.yaml b/.github/workflows/cpp-check.yaml new file mode 100644 index 000000000..1e5a57e41 --- /dev/null +++ b/.github/workflows/cpp-check.yaml @@ -0,0 +1,78 @@ +# +# Copyright (c) 2024 ZettaScale Technology +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +# which is available at https://www.apache.org/licenses/LICENSE-2.0. +# +# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +# +# Contributors: +# ZettaScale zenoh Team, +# +name: cpp-check + +on: + push: + branches: + - main + pull_request: + branches: + - main + workflow_dispatch: + inputs: + zenoh_cpp_branch: + description: 'Branch of zenoh-cpp to use' + required: false + default: 'main' + +jobs: + build-and-test: + name: Build and test zenoh-cpp on ${{ matrix.os }} + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest, macOS-latest, windows-latest] + unstable: [0, 1] + + steps: + - name: checkout zenoh-pico + uses: actions/checkout@v3 + + - name: build zenoh-pico + run: | + mkdir build && cd build + cmake .. -DCMAKE_BUILD_TYPE=Release -DCMAKE_INSTALL_PREFIX=~/local -DZ_FEATURE_UNSTABLE_API=${{ matrix.unstable }} -DZ_FEATURE_LIVELINESS=1 -DASAN=ON + cmake --build . --target install --config Release + + - name: clone zenoh-cpp + run: | + git clone https://github.com/eclipse-zenoh/zenoh-cpp.git + cd zenoh-cpp + git fetch --all + git checkout ${{ github.event.inputs.zenoh_cpp_branch || 'main' }} + git submodule update --init --recursive + + - name: build zenoh-cpp + run: | + cd zenoh-cpp + mkdir build && cd build + cmake .. -DCMAKE_INSTALL_PREFIX=~/local -DCMAKE_BUILD_TYPE=Release -DZENOHCXX_ZENOHPICO=ON -DZENOHCXX_ZENOHC=OFF + cmake --build . --config Release + + - name: build examples + run: | + cd zenoh-cpp/build + cmake --build . --target examples --config Release + + - name: build tests + run: | + cd zenoh-cpp/build + cmake --build . --target tests --config Release + + - name: run tests + run: | + cd zenoh-cpp/build + ctest -C Release --output-on-failure diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index e72186dbd..1caf68fe4 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -174,8 +174,8 @@ jobs: version: ${{ needs.tag.outputs.version }} ssh-host: genie.zenoh@projects-storage.eclipse.org ssh-host-path: /home/data/httpd/download.eclipse.org/zenoh/zenoh-pico - ssh-private-key: ${{ secrets.ORG_GPG_PRIVATE_KEY }} - ssh-passphrase: ${{ secrets.ORG_GPG_PASSPHRASE }} + ssh-private-key: ${{ secrets.SSH_PRIVATE_KEY }} + ssh-passphrase: ${{ secrets.SSH_PASSPHRASE }} archive-patterns: '.*\.zip' github: diff --git a/CMakeLists.txt b/CMakeLists.txt index 669880d11..7a9fb6d10 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -97,7 +97,6 @@ else() add_compile_options(-Wpedantic) endif() # add_compile_options(-Wconversion) - # add_link_options(-fsanitize=address) elseif(MSVC) add_compile_options(/W4 /WX /Od /wd4127) elseif(CMAKE_SYSTEM_NAME MATCHES "Generic") @@ -216,7 +215,7 @@ set(Z_FEATURE_PUBLICATION 1 CACHE STRING "Toggle publication feature") set(Z_FEATURE_SUBSCRIPTION 1 CACHE STRING "Toggle subscription feature") set(Z_FEATURE_QUERY 1 CACHE STRING "Toggle query feature") set(Z_FEATURE_QUERYABLE 1 CACHE STRING "Toggle queryable feature") -set(Z_FEATURE_LIVELINESS 1 CACHE STRING "Toggle liveliness feature") +set(Z_FEATURE_LIVELINESS 0 CACHE STRING "Toggle liveliness feature") set(Z_FEATURE_INTEREST 1 CACHE STRING "Toggle interests") set(Z_FEATURE_FRAGMENTATION 1 CACHE STRING "Toggle fragmentation") set(Z_FEATURE_ENCODING_VALUES 1 CACHE STRING "Toggle encoding values") @@ -235,6 +234,12 @@ set(Z_FEATURE_UNICAST_TRANSPORT 1 CACHE STRING "Toggle unicast transport") set(Z_FEATURE_RAWETH_TRANSPORT 0 CACHE STRING "Toggle raw ethernet transport") set(Z_FEATURE_TCP_NODELAY 1 CACHE STRING "Toggle TCP_NODELAY") +# Add a warning message if someone tries to enable Z_FEATURE_LIVELINESS directly +if(Z_FEATURE_LIVELINESS AND NOT Z_FEATURE_UNSTABLE_API) + message(WARNING "Z_FEATURE_LIVELINESS can only be enabled when Z_FEATURE_UNSTABLE_API is also enabled. Disabling Z_FEATURE_LIVELINESS.") + set(Z_FEATURE_LIVELINESS 0 CACHE STRING "Toggle liveliness feature" FORCE) +endif() + add_compile_definitions("Z_BUILD_DEBUG=$") message(STATUS "Building with feature confing:\n\ * UNSTABLE_API: ${Z_FEATURE_UNSTABLE_API}\n\ @@ -373,12 +378,14 @@ option(BUILD_EXAMPLES "Use this to also build the examples." ON) option(BUILD_TOOLS "Use this to also build the tools." OFF) option(BUILD_TESTING "Use this to also build tests." ON) option(BUILD_INTEGRATION "Use this to also build integration tests." OFF) +option(ASAN "Enable AddressSanitizer." OFF) message(STATUS "Produce Debian and RPM packages: ${PACKAGING}") message(STATUS "Build examples: ${BUILD_EXAMPLES}") message(STATUS "Build tools: ${BUILD_TOOLS}") message(STATUS "Build tests: ${BUILD_TESTING}") message(STATUS "Build integration: ${BUILD_INTEGRATION}") +message(STATUS "AddressSanitizer: ${ASAN}") set(PICO_LIBS "") if(PICO_STATIC) @@ -447,6 +454,11 @@ if(BUILD_EXAMPLES) add_subdirectory(examples) endif() +if(ASAN) + add_compile_options(-fsanitize=address) + add_link_options(-fsanitize=address) +endif() + if(UNIX OR MSVC) if(BUILD_TOOLS) set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/tools) diff --git a/GNUmakefile b/GNUmakefile index 0a254575c..12a97bf06 100644 --- a/GNUmakefile +++ b/GNUmakefile @@ -41,6 +41,10 @@ BUILD_TOOLS?=OFF # Accepted values: ON, OFF FORCE_C99?=OFF +# Enable AddressSanitizer. +# Accepted values: ON, OFF +ASAN?=OFF + # Debug level. This sets the ZENOH_DEBUG variable. # Accepted values: # 0: NONE @@ -82,7 +86,7 @@ CMAKE_OPT=-DZENOH_DEBUG=$(ZENOH_DEBUG) -DBUILD_EXAMPLES=$(BUILD_EXAMPLES) -DCMAK -DZ_FEATURE_MULTI_THREAD=$(Z_FEATURE_MULTI_THREAD) -DZ_FEATURE_INTEREST=$(Z_FEATURE_INTEREST) -DZ_FEATURE_UNSTABLE_API=$(Z_FEATURE_UNSTABLE_API)\ -DZ_FEATURE_PUBLICATION=$(Z_FEATURE_PUBLICATION) -DZ_FEATURE_SUBSCRIPTION=$(Z_FEATURE_SUBSCRIPTION) -DZ_FEATURE_QUERY=$(Z_FEATURE_QUERY) -DZ_FEATURE_QUERYABLE=$(Z_FEATURE_QUERYABLE)\ -DZ_FEATURE_RAWETH_TRANSPORT=$(Z_FEATURE_RAWETH_TRANSPORT) -DFRAG_MAX_SIZE=$(FRAG_MAX_SIZE) -DBATCH_UNICAST_SIZE=$(BATCH_UNICAST_SIZE) -DBATCH_MULTICAST_SIZE=$(BATCH_MULTICAST_SIZE)\ - -DBUILD_INTEGRATION=$(BUILD_INTEGRATION) -DBUILD_TOOLS=$(BUILD_TOOLS) -DBUILD_SHARED_LIBS=$(BUILD_SHARED_LIBS) -H. + -DASAN=$(ASAN) -DBUILD_INTEGRATION=$(BUILD_INTEGRATION) -DBUILD_TOOLS=$(BUILD_TOOLS) -DBUILD_SHARED_LIBS=$(BUILD_SHARED_LIBS) -H. ifeq ($(FORCE_C99), ON) CMAKE_OPT += -DCMAKE_C_STANDARD=99 diff --git a/PackageConfig.cmake.in b/PackageConfig.cmake.in index 55339d35e..29d48905f 100644 --- a/PackageConfig.cmake.in +++ b/PackageConfig.cmake.in @@ -8,6 +8,7 @@ set(ZENOHPICO_FEATURE_QUERY @Z_FEATURE_QUERY@) set(ZENOHPICO_FEATURE_QUERYABLE @Z_FEATURE_QUERYABLE@) set(ZENOHPICO_FEATURE_RAWETH_TRANSPORT @Z_FEATURE_RAWETH_TRANSPORT@) set(ZENOHPICO_FEATURE_INTEREST @Z_FEATURE_INTEREST@) +set(ZENOHPICO_FEATURE_LIVELINESS @Z_FEATURE_LIVELINESS@) if(@CHECK_THREADS@) find_dependency(Threads REQUIRED) diff --git a/docs/api.rst b/docs/api.rst index 7271209eb..1a3893a0b 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -1289,7 +1289,7 @@ See details at :ref:`owned_types_concept` Functions --------- -.. autocfunction:: liveliness.h::z_liveliness_token_options_t_default +.. autocfunction:: liveliness.h::z_liveliness_token_options_default .. autocfunction:: liveliness.h::z_liveliness_declare_token .. autocfunction:: liveliness.h::z_liveliness_undeclare_token .. autocfunction:: liveliness.h::z_liveliness_subscriber_options_default diff --git a/examples/unix/c11/z_get_liveliness.c b/examples/unix/c11/z_get_liveliness.c index cefa20175..99bfc06d4 100644 --- a/examples/unix/c11/z_get_liveliness.c +++ b/examples/unix/c11/z_get_liveliness.c @@ -99,9 +99,10 @@ int main(int argc, char **argv) { } else { printf("Received an error\n"); } + z_drop(z_move(reply)); } - z_drop(z_move(reply)); + z_drop(z_move(closure)); z_drop(z_move(handler)); z_drop(z_move(s)); return 0; diff --git a/examples/unix/c11/z_sub_liveliness.c b/examples/unix/c11/z_sub_liveliness.c index a302d10a3..36fb02fda 100644 --- a/examples/unix/c11/z_sub_liveliness.c +++ b/examples/unix/c11/z_sub_liveliness.c @@ -21,6 +21,8 @@ #if Z_FEATURE_SUBSCRIPTION == 1 && Z_FEATURE_LIVELINESS == 1 +static volatile int msg_nb = 0; + void data_handler(z_loaned_sample_t *sample, void *ctx) { (void)(ctx); z_view_string_t key_string; @@ -35,6 +37,7 @@ void data_handler(z_loaned_sample_t *sample, void *ctx) { z_string_data(z_loan(key_string))); break; } + msg_nb++; } int main(int argc, char **argv) { @@ -43,6 +46,7 @@ int main(int argc, char **argv) { char *clocator = NULL; char *llocator = NULL; bool history = false; + int n = 0; int opt; while ((opt = getopt(argc, argv, "k:e:m:l:n:h")) != -1) { @@ -62,8 +66,11 @@ int main(int argc, char **argv) { case 'h': history = true; break; + case 'n': + n = atoi(optarg); + break; case '?': - if (optopt == 'k' || optopt == 'e' || optopt == 'm' || optopt == 'l') { + if (optopt == 'k' || optopt == 'e' || optopt == 'm' || optopt == 'l' || optopt == 'n') { fprintf(stderr, "Option -%c requires an argument.\n", optopt); } else { fprintf(stderr, "Unknown option `-%c'.\n", optopt); @@ -116,6 +123,9 @@ int main(int argc, char **argv) { printf("Press CTRL-C to quit...\n"); while (1) { + if (n != 0 && msg_nb >= n) { + break; + } z_sleep_s(1); } diff --git a/include/zenoh-pico/api/liveliness.h b/include/zenoh-pico/api/liveliness.h index 43e3012fc..ee2ac9d2f 100644 --- a/include/zenoh-pico/api/liveliness.h +++ b/include/zenoh-pico/api/liveliness.h @@ -31,6 +31,7 @@ typedef struct { _z_session_weak_t _zn; } _z_liveliness_token_t; +_z_liveliness_token_t _z_liveliness_token_null(void); _Z_OWNED_TYPE_VALUE(_z_liveliness_token_t, liveliness_token) _Z_OWNED_FUNCTIONS_DEF(liveliness_token) @@ -48,7 +49,7 @@ typedef struct z_liveliness_token_options_t { /** * Constructs default value for :c:type:`z_liveliness_token_options_t`. */ -z_result_t z_liveliness_token_options_t_default(z_liveliness_token_options_t *options); +z_result_t z_liveliness_token_options_default(z_liveliness_token_options_t *options); /** * Constructs and declares a liveliness token on the network. diff --git a/include/zenoh-pico/config.h b/include/zenoh-pico/config.h index 14c2d50cc..1578b1f50 100644 --- a/include/zenoh-pico/config.h +++ b/include/zenoh-pico/config.h @@ -27,7 +27,7 @@ #define Z_FEATURE_SUBSCRIPTION 1 #define Z_FEATURE_QUERY 1 #define Z_FEATURE_QUERYABLE 1 -#define Z_FEATURE_LIVELINESS 1 +#define Z_FEATURE_LIVELINESS 0 #define Z_FEATURE_RAWETH_TRANSPORT 0 #define Z_FEATURE_INTEREST 1 #define Z_FEATURE_DYNAMIC_MEMORY_ALLOCATION 0 diff --git a/include/zenoh-pico/protocol/definitions/transport.h b/include/zenoh-pico/protocol/definitions/transport.h index 261c61fc6..1320de7d3 100644 --- a/include/zenoh-pico/protocol/definitions/transport.h +++ b/include/zenoh-pico/protocol/definitions/transport.h @@ -91,6 +91,16 @@ extern "C" { // Z Extensions if Z==1 then Zenoh extensions are present #define _Z_FLAG_T_CLOSE_S 0x20 // 1 << 5 +/*=============================*/ +/* Patch */ +/*=============================*/ +/// Used to negotiate the patch version of the protocol +/// if not present (or 0), then protocol as released with 1.0.0 +/// if >= 1, then fragmentation start/stop marker +#define _Z_NO_PATCH 0x00 +#define _Z_CURRENT_PATCH 0x01 +#define _Z_PATCH_HAS_FRAGMENT_MARKERS(patch) (patch >= 1) + /*=============================*/ /* Transport Messages */ /*=============================*/ @@ -235,6 +245,9 @@ typedef struct { uint8_t _req_id_res; uint8_t _seq_num_res; uint8_t _version; +#if Z_FEATURE_FRAGMENTATION == 1 + uint8_t _patch; +#endif } _z_t_msg_join_t; void _z_t_msg_join_clear(_z_t_msg_join_t *msg); @@ -315,6 +328,9 @@ typedef struct { uint8_t _req_id_res; uint8_t _seq_num_res; uint8_t _version; +#if Z_FEATURE_FRAGMENTATION == 1 + uint8_t _patch; +#endif } _z_t_msg_init_t; void _z_t_msg_init_clear(_z_t_msg_init_t *msg); @@ -478,11 +494,11 @@ void _z_t_msg_frame_clear(_z_t_msg_frame_t *msg); typedef struct { _z_slice_t _payload; _z_zint_t _sn; + bool first; + bool drop; } _z_t_msg_fragment_t; void _z_t_msg_fragment_clear(_z_t_msg_fragment_t *msg); -#define _Z_FRAGMENT_HEADER_SIZE 12 - /*------------------ Transport Message ------------------*/ typedef union { _z_t_msg_join_t _join; @@ -514,9 +530,10 @@ _z_transport_message_t _z_t_msg_make_keep_alive(void); _z_transport_message_t _z_t_msg_make_frame(_z_zint_t sn, _z_network_message_vec_t messages, z_reliability_t reliability); _z_transport_message_t _z_t_msg_make_frame_header(_z_zint_t sn, z_reliability_t reliability); -_z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, z_reliability_t reliability, bool is_last); +_z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, z_reliability_t reliability, bool is_last, + bool first, bool drop); _z_transport_message_t _z_t_msg_make_fragment(_z_zint_t sn, _z_slice_t messages, z_reliability_t reliability, - bool is_last); + bool is_last, bool first, bool drop); /*------------------ Copy ------------------*/ void _z_t_msg_copy(_z_transport_message_t *clone, _z_transport_message_t *msg); diff --git a/include/zenoh-pico/protocol/ext.h b/include/zenoh-pico/protocol/ext.h index eb70ede42..a8afd0029 100644 --- a/include/zenoh-pico/protocol/ext.h +++ b/include/zenoh-pico/protocol/ext.h @@ -43,7 +43,11 @@ extern "C" { /*=============================*/ /* Extension IDs */ /*=============================*/ -// #define _Z_MSG_EXT_ID_FOO 0x00 // Hex(ENC|M|ID) +#define _Z_MSG_EXT_ID_JOIN_QOS (0x01 | _Z_MSG_EXT_FLAG_M | _Z_MSG_EXT_ENC_ZBUF) +#define _Z_MSG_EXT_ID_JOIN_PATCH (0x07 | _Z_MSG_EXT_ENC_ZINT) +#define _Z_MSG_EXT_ID_INIT_PATCH (0x07 | _Z_MSG_EXT_ENC_ZINT) +#define _Z_MSG_EXT_ID_FRAGMENT_FIRST (0x02 | _Z_MSG_EXT_ENC_UNIT) +#define _Z_MSG_EXT_ID_FRAGMENT_DROP (0x03 | _Z_MSG_EXT_ENC_UNIT) /*=============================*/ /* Extension Encodings */ @@ -58,6 +62,7 @@ extern "C" { #define _Z_MSG_EXT_FLAG_M 0x10 #define _Z_MSG_EXT_IS_MANDATORY(h) ((h & _Z_MSG_EXT_FLAG_M) != 0) #define _Z_MSG_EXT_FLAG_Z 0x80 +#define _Z_MSG_EXT_MORE(more) (more ? _Z_MSG_EXT_FLAG_Z : 0) typedef struct { uint8_t __dummy; // Just to avoid empty structures that might cause undefined behavior diff --git a/include/zenoh-pico/system/platform/mbed.h b/include/zenoh-pico/system/platform/mbed.h index 87fdb4626..9d9d91aeb 100644 --- a/include/zenoh-pico/system/platform/mbed.h +++ b/include/zenoh-pico/system/platform/mbed.h @@ -33,7 +33,7 @@ typedef void *_z_mutex_t; // Workaround as MBED is a C++ library typedef void *_z_condvar_t; // Workaround as MBED is a C++ library #endif // Z_FEATURE_MULTI_THREAD == 1 -typedef void *z_clock_t; // Not defined +typedef struct timespec z_clock_t; typedef struct timeval z_time_t; typedef struct BufferedSerial BufferedSerial; // Forward declaration to be used in _z_sys_net_socket_t diff --git a/include/zenoh-pico/system/platform/zephyr.h b/include/zenoh-pico/system/platform/zephyr.h index 1f25e839c..addf67f31 100644 --- a/include/zenoh-pico/system/platform/zephyr.h +++ b/include/zenoh-pico/system/platform/zephyr.h @@ -19,10 +19,10 @@ #if KERNEL_VERSION_MAJOR == 2 #include -#elif KERNEL_VERSION_MAJOR == 3 +#elif KERNEL_VERSION_MAJOR == 3 || KERNEL_VERSION_MAJOR == 4 #include #else -#pragma "This Zephyr version might not be supported." +#pragma GCC warning "This Zephyr version might not be supported." #include #endif diff --git a/include/zenoh-pico/transport/common/tx.h b/include/zenoh-pico/transport/common/tx.h index 05d22a89e..e70c8fc9e 100644 --- a/include/zenoh-pico/transport/common/tx.h +++ b/include/zenoh-pico/transport/common/tx.h @@ -27,7 +27,8 @@ void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability); void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability); /*This function is unsafe because it operates in potentially concurrent data.*Make sure that the following mutexes are locked before calling this function : *-ztu->mutex_tx */ -z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn); +z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn, + bool first); /*------------------ Transmission and Reception helpers ------------------*/ z_result_t _z_send_t_msg(_z_transport_t *zt, const _z_transport_message_t *t_msg); diff --git a/include/zenoh-pico/transport/transport.h b/include/zenoh-pico/transport/transport.h index 1671786df..02c304ea6 100644 --- a/include/zenoh-pico/transport/transport.h +++ b/include/zenoh-pico/transport/transport.h @@ -47,6 +47,11 @@ typedef struct { uint16_t _peer_id; volatile bool _received; + +#if Z_FEATURE_FRAGMENTATION == 1 + // Patch + uint8_t _patch; +#endif } _z_transport_peer_entry_t; size_t _z_transport_peer_entry_size(const _z_transport_peer_entry_t *src); @@ -108,6 +113,11 @@ typedef struct { volatile bool _received; volatile bool _transmitted; + +#if Z_FEATURE_FRAGMENTATION == 1 + // Patch + uint8_t _patch; +#endif } _z_transport_unicast_t; typedef struct _z_transport_multicast_t { @@ -175,6 +185,9 @@ typedef struct { uint8_t _req_id_res; uint8_t _seq_num_res; bool _is_qos; +#if Z_FEATURE_FRAGMENTATION == 1 + uint8_t _patch; +#endif } _z_transport_unicast_establish_param_t; typedef struct { diff --git a/include/zenoh-pico/transport/utils.h b/include/zenoh-pico/transport/utils.h index 62fa319b4..25862370b 100644 --- a/include/zenoh-pico/transport/utils.h +++ b/include/zenoh-pico/transport/utils.h @@ -29,6 +29,7 @@ _z_zint_t _z_sn_max(uint8_t bits); _z_zint_t _z_sn_half(_z_zint_t sn); _z_zint_t _z_sn_modulo_mask(uint8_t bits); bool _z_sn_precedes(const _z_zint_t sn_resolution, const _z_zint_t sn_left, const _z_zint_t sn_right); +bool _z_sn_consecutive(const _z_zint_t sn_resolution, const _z_zint_t sn_left, const _z_zint_t sn_right); _z_zint_t _z_sn_increment(const _z_zint_t sn_resolution, const _z_zint_t sn); _z_zint_t _z_sn_decrement(const _z_zint_t sn_resolution, const _z_zint_t sn); diff --git a/src/api/liveliness.c b/src/api/liveliness.c index 4f4139396..7cc7f74b3 100644 --- a/src/api/liveliness.c +++ b/src/api/liveliness.c @@ -36,19 +36,27 @@ _z_liveliness_token_t _z_liveliness_token_null(void) { return s; } -void _z_liveliness_token_clear(_z_liveliness_token_t *token) { +z_result_t _z_liveliness_token_clear(_z_liveliness_token_t *token) { + z_result_t ret = _Z_RES_OK; + if (_Z_RC_IS_NULL(&token->_zn)) { + return ret; + } _z_session_rc_t sess_rc = _z_session_weak_upgrade_if_open(&token->_zn); if (!_Z_RC_IS_NULL(&sess_rc)) { - _z_undeclare_liveliness_token(token); + ret = _z_undeclare_liveliness_token(token); _z_session_rc_drop(&sess_rc); } + _z_session_weak_drop(&token->_zn); _z_keyexpr_clear(&token->_key); + *token = _z_liveliness_token_null(); + + return ret; } _Z_OWNED_FUNCTIONS_VALUE_NO_COPY_IMPL(_z_liveliness_token_t, liveliness_token, _z_liveliness_token_check, _z_liveliness_token_null, _z_liveliness_token_clear) -z_result_t z_liveliness_token_options_t_default(z_liveliness_token_options_t *options) { +z_result_t z_liveliness_token_options_default(z_liveliness_token_options_t *options) { options->__dummy = 0; return _Z_RES_OK; } @@ -63,7 +71,7 @@ z_result_t z_liveliness_declare_token(const z_loaned_session_t *zs, z_owned_live } z_result_t z_liveliness_undeclare_token(z_moved_liveliness_token_t *token) { - return _z_undeclare_liveliness_token(&token->_this._val); + return _z_liveliness_token_clear(&token->_this._val); } /**************** Liveliness Subscriber ****************/ @@ -132,7 +140,8 @@ z_result_t z_liveliness_get(const z_loaned_session_t *zs, const z_loaned_keyexpr opt = *options; } - ret = _z_liveliness_query(_Z_RC_IN_VAL(zs), *keyexpr, callback->_this._val.call, callback->_this._val.drop, ctx, + _z_keyexpr_t ke = _z_keyexpr_duplicate(*keyexpr); + ret = _z_liveliness_query(_Z_RC_IN_VAL(zs), ke, callback->_this._val.call, callback->_this._val.drop, ctx, opt.timeout_ms); z_internal_closure_reply_null( diff --git a/src/collections/refcount.c b/src/collections/refcount.c index 608548fae..7a44ab4e6 100644 --- a/src/collections/refcount.c +++ b/src/collections/refcount.c @@ -198,7 +198,8 @@ bool _z_rc_decrease_strong(void** cnt) { if (_ZP_RC_OP_DECR_AND_CMP_STRONG(c, 1)) { return _z_rc_decrease_weak(cnt); } - return _z_rc_decrease_weak(cnt); + _z_rc_decrease_weak(cnt); + return true; } bool _z_rc_decrease_weak(void** cnt) { diff --git a/src/net/liveliness.c b/src/net/liveliness.c index 45966698f..872c4d9c7 100644 --- a/src/net/liveliness.c +++ b/src/net/liveliness.c @@ -163,6 +163,7 @@ z_result_t _z_liveliness_query(_z_session_t *zn, _z_keyexpr_t keyexpr, _z_closur } else { _z_liveliness_pending_query_clear(pq); + z_free(pq); } } diff --git a/src/protocol/codec/transport.c b/src/protocol/codec/transport.c index 7eff8715b..bdc3d17f6 100644 --- a/src/protocol/codec/transport.c +++ b/src/protocol/codec/transport.c @@ -64,9 +64,14 @@ z_result_t _z_join_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_join_t } _Z_RETURN_IF_ERR(_z_zsize_encode(wbf, msg->_next_sn._val._plain._reliable)); _Z_RETURN_IF_ERR(_z_zsize_encode(wbf, msg->_next_sn._val._plain._best_effort)); +#if Z_FEATURE_FRAGMENTATION == 1 + bool has_patch = msg->_patch != _Z_NO_PATCH; +#else + bool has_patch = false; +#endif if (msg->_next_sn._is_qos) { if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z)) { - _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ENC_ZBUF | _Z_MSG_EXT_FLAG_M | 1)); + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_JOIN_QOS | _Z_MSG_EXT_MORE(has_patch))); size_t len = 0; for (uint8_t i = 0; (i < Z_PRIORITIES_NUM) && (ret == _Z_RES_OK); i++) { len += _z_zint_len(msg->_next_sn._val._qos[i]._reliable) + @@ -82,6 +87,17 @@ z_result_t _z_join_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_join_t ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED; } } +#if Z_FEATURE_FRAGMENTATION == 1 + if (has_patch) { + if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z)) { + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_JOIN_PATCH)); + _Z_RETURN_IF_ERR(_z_zint64_encode(wbf, msg->_patch)); + } else { + _Z_DEBUG("Attempted to serialize Patch extension, but the header extension flag was unset"); + ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED; + } + } +#endif return ret; } @@ -89,14 +105,17 @@ z_result_t _z_join_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_join_t z_result_t _z_join_decode_ext(_z_msg_ext_t *extension, void *ctx) { z_result_t ret = _Z_RES_OK; _z_t_msg_join_t *msg = (_z_t_msg_join_t *)ctx; - if (_Z_EXT_FULL_ID(extension->_header) == - (_Z_MSG_EXT_ENC_ZBUF | _Z_MSG_EXT_FLAG_M | 1)) { // QOS: (enc=zbuf)(mandatory=true)(id=1) + if (_Z_EXT_FULL_ID(extension->_header) == _Z_MSG_EXT_ID_JOIN_QOS) { msg->_next_sn._is_qos = true; _z_zbuf_t zbf = _z_slice_as_zbuf(extension->_body._zbuf._val); for (int i = 0; (ret == _Z_RES_OK) && (i < Z_PRIORITIES_NUM); ++i) { ret |= _z_zsize_decode(&msg->_next_sn._val._qos[i]._reliable, &zbf); ret |= _z_zsize_decode(&msg->_next_sn._val._qos[i]._best_effort, &zbf); } +#if Z_FEATURE_FRAGMENTATION == 1 + } else if (_Z_EXT_FULL_ID(extension->_header) == _Z_MSG_EXT_ID_JOIN_PATCH) { + msg->_patch = (uint8_t)extension->_body._zint._val; +#endif } else if (_Z_MSG_EXT_IS_MANDATORY(extension->_header)) { ret = _Z_ERR_MESSAGE_EXTENSION_MANDATORY_AND_UNKNOWN; } @@ -147,6 +166,7 @@ z_result_t _z_join_decode(_z_t_msg_join_t *msg, _z_zbuf_t *zbf, uint8_t header) ret |= _z_zsize_decode(&msg->_next_sn._val._plain._reliable, zbf); ret |= _z_zsize_decode(&msg->_next_sn._val._plain._best_effort, zbf); } + msg->_patch = _Z_NO_PATCH; if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_T_Z)) { ret |= _z_msg_ext_decode_iter(zbf, _z_join_decode_ext, msg); } @@ -180,6 +200,32 @@ z_result_t _z_init_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_init_t _Z_RETURN_IF_ERR(_z_slice_encode(wbf, &msg->_cookie)) } +#if Z_FEATURE_FRAGMENTATION == 1 + if (msg->_patch != _Z_NO_PATCH) { + if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z)) { + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_JOIN_PATCH)); + _Z_RETURN_IF_ERR(_z_zint64_encode(wbf, msg->_patch)); + } else { + _Z_DEBUG("Attempted to serialize Patch extension, but the header extension flag was unset"); + ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED; + } + } +#endif + + return ret; +} + +z_result_t _z_init_decode_ext(_z_msg_ext_t *extension, void *ctx) { + z_result_t ret = _Z_RES_OK; + _z_t_msg_init_t *msg = (_z_t_msg_init_t *)ctx; + if (false) { +#if Z_FEATURE_FRAGMENTATION == 1 + } else if (_Z_EXT_FULL_ID(extension->_header) == _Z_MSG_EXT_ID_INIT_PATCH) { + msg->_patch = (uint8_t)extension->_body._zint._val; +#endif + } else if (_Z_MSG_EXT_IS_MANDATORY(extension->_header)) { + ret = _Z_ERR_MESSAGE_EXTENSION_MANDATORY_AND_UNKNOWN; + } return ret; } @@ -222,8 +268,9 @@ z_result_t _z_init_decode(_z_t_msg_init_t *msg, _z_zbuf_t *zbf, uint8_t header) msg->_cookie = _z_slice_empty(); } - if ((ret == _Z_RES_OK) && (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true)) { - ret |= _z_msg_ext_skip_non_mandatories(zbf, 0x01); + msg->_patch = _Z_NO_PATCH; + if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_T_Z)) { + ret |= _z_msg_ext_decode_iter(zbf, _z_init_decode_ext, msg); } return ret; @@ -388,16 +435,42 @@ z_result_t _z_fragment_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_fra z_result_t ret = _Z_RES_OK; _Z_DEBUG("Encoding _Z_TRANSPORT_FRAGMENT"); _Z_RETURN_IF_ERR(_z_zsize_encode(wbf, msg->_sn)) - if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z)) { - ret = _Z_ERR_MESSAGE_SERIALIZATION_FAILED; + if (msg->first) { + if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) { + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_FRAGMENT_FIRST | _Z_MSG_EXT_MORE(msg->drop))); + } else { + _Z_DEBUG("Attempted to serialize Start extension, but the header extension flag was unset"); + ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED; + } + } + if (msg->drop) { + if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) { + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_FRAGMENT_DROP)); + } else { + _Z_DEBUG("Attempted to serialize Stop extension, but the header extension flag was unset"); + ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED; + } } - if (ret == _Z_RES_OK && _z_slice_check(&msg->_payload)) { + if (_z_slice_check(&msg->_payload)) { _Z_RETURN_IF_ERR(_z_wbuf_write_bytes(wbf, msg->_payload.start, 0, msg->_payload.len)); } return ret; } +z_result_t _z_fragment_decode_ext(_z_msg_ext_t *extension, void *ctx) { + z_result_t ret = _Z_RES_OK; + _z_t_msg_fragment_t *msg = (_z_t_msg_fragment_t *)ctx; + if (_Z_EXT_FULL_ID(extension->_header) == _Z_MSG_EXT_ID_FRAGMENT_FIRST) { + msg->first = true; + } else if (_Z_EXT_FULL_ID(extension->_header) == _Z_MSG_EXT_ID_FRAGMENT_DROP) { + msg->drop = true; + } else if (_Z_MSG_EXT_IS_MANDATORY(extension->_header)) { + ret = _Z_ERR_MESSAGE_EXTENSION_MANDATORY_AND_UNKNOWN; + } + return ret; +} + z_result_t _z_fragment_decode(_z_t_msg_fragment_t *msg, _z_zbuf_t *zbf, uint8_t header) { z_result_t ret = _Z_RES_OK; *msg = (_z_t_msg_fragment_t){0}; @@ -405,8 +478,10 @@ z_result_t _z_fragment_decode(_z_t_msg_fragment_t *msg, _z_zbuf_t *zbf, uint8_t _Z_DEBUG("Decoding _Z_TRANSPORT_FRAGMENT"); ret |= _z_zsize_decode(&msg->_sn, zbf); + msg->first = false; + msg->drop = false; if ((ret == _Z_RES_OK) && (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true)) { - ret |= _z_msg_ext_skip_non_mandatories(zbf, 0x05); + ret |= _z_msg_ext_decode_iter(zbf, _z_fragment_decode_ext, msg); } _z_slice_t slice = _z_slice_alias_buf((uint8_t *)_z_zbuf_start(zbf), _z_zbuf_len(zbf)); diff --git a/src/protocol/definitions/transport.c b/src/protocol/definitions/transport.c index 2af77cb97..486aa43fd 100644 --- a/src/protocol/definitions/transport.c +++ b/src/protocol/definitions/transport.c @@ -102,6 +102,9 @@ _z_transport_message_t _z_t_msg_make_join(z_whatami_t whatami, _z_zint_t lease, msg._body._join._batch_size = Z_BATCH_MULTICAST_SIZE; msg._body._join._next_sn = next_sn; msg._body._join._zid = zid; +#if Z_FEATURE_FRAGMENTATION == 1 + msg._body._join._patch = _Z_CURRENT_PATCH; +#endif if ((lease % 1000) == 0) { _Z_SET_FLAG(msg._header, _Z_FLAG_T_JOIN_T); @@ -112,7 +115,12 @@ _z_transport_message_t _z_t_msg_make_join(z_whatami_t whatami, _z_zint_t lease, _Z_SET_FLAG(msg._header, _Z_FLAG_T_JOIN_S); } - if (next_sn._is_qos) { +#if Z_FEATURE_FRAGMENTATION == 1 + bool has_patch = msg._body._join._patch != _Z_NO_PATCH; +#else + bool has_patch = false; +#endif + if (next_sn._is_qos || has_patch) { _Z_SET_FLAG(msg._header, _Z_FLAG_T_Z); } @@ -131,6 +139,9 @@ _z_transport_message_t _z_t_msg_make_init_syn(z_whatami_t whatami, _z_id_t zid) msg._body._init._req_id_res = Z_REQ_RESOLUTION; msg._body._init._batch_size = Z_BATCH_UNICAST_SIZE; _z_slice_reset(&msg._body._init._cookie); +#if Z_FEATURE_FRAGMENTATION == 1 + msg._body._init._patch = _Z_CURRENT_PATCH; +#endif if ((msg._body._init._batch_size != _Z_DEFAULT_UNICAST_BATCH_SIZE) || (msg._body._init._seq_num_res != _Z_DEFAULT_RESOLUTION_SIZE) || @@ -138,6 +149,12 @@ _z_transport_message_t _z_t_msg_make_init_syn(z_whatami_t whatami, _z_id_t zid) _Z_SET_FLAG(msg._header, _Z_FLAG_T_INIT_S); } +#if Z_FEATURE_FRAGMENTATION == 1 + if (msg._body._init._patch != _Z_NO_PATCH) { + _Z_SET_FLAG(msg._header, _Z_FLAG_T_Z); + } +#endif + return msg; } @@ -153,6 +170,9 @@ _z_transport_message_t _z_t_msg_make_init_ack(z_whatami_t whatami, _z_id_t zid, msg._body._init._req_id_res = Z_REQ_RESOLUTION; msg._body._init._batch_size = Z_BATCH_UNICAST_SIZE; msg._body._init._cookie = cookie; +#if Z_FEATURE_FRAGMENTATION == 1 + msg._body._init._patch = _Z_CURRENT_PATCH; +#endif if ((msg._body._init._batch_size != _Z_DEFAULT_UNICAST_BATCH_SIZE) || (msg._body._init._seq_num_res != _Z_DEFAULT_RESOLUTION_SIZE) || @@ -160,6 +180,11 @@ _z_transport_message_t _z_t_msg_make_init_ack(z_whatami_t whatami, _z_id_t zid, _Z_SET_FLAG(msg._header, _Z_FLAG_T_INIT_S); } +#if Z_FEATURE_FRAGMENTATION == 1 + if (msg._body._init._patch != _Z_NO_PATCH) { + _Z_SET_FLAG(msg._header, _Z_FLAG_T_Z); + } +#endif return msg; } @@ -247,11 +272,12 @@ _z_transport_message_t _z_t_msg_make_frame_header(_z_zint_t sn, z_reliability_t } /*------------------ Fragment Message ------------------*/ -_z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, z_reliability_t reliability, bool is_last) { - return _z_t_msg_make_fragment(sn, _z_slice_empty(), reliability, is_last); +_z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, z_reliability_t reliability, bool is_last, + bool first, bool drop) { + return _z_t_msg_make_fragment(sn, _z_slice_empty(), reliability, is_last, first, drop); } _z_transport_message_t _z_t_msg_make_fragment(_z_zint_t sn, _z_slice_t payload, z_reliability_t reliability, - bool is_last) { + bool is_last, bool first, bool drop) { _z_transport_message_t msg; msg._header = _Z_MID_T_FRAGMENT; if (is_last == false) { @@ -263,12 +289,20 @@ _z_transport_message_t _z_t_msg_make_fragment(_z_zint_t sn, _z_slice_t payload, msg._body._fragment._sn = sn; msg._body._fragment._payload = payload; + if (first || drop) { + _Z_SET_FLAG(msg._header, _Z_FLAG_T_Z); + } + msg._body._fragment.first = first; + msg._body._fragment.drop = drop; return msg; } void _z_t_msg_copy_fragment(_z_t_msg_fragment_t *clone, _z_t_msg_fragment_t *msg) { + clone->_payload = msg->_payload; _z_slice_copy(&clone->_payload, &msg->_payload); + clone->first = msg->first; + clone->drop = msg->drop; } void _z_t_msg_copy_join(_z_t_msg_join_t *clone, _z_t_msg_join_t *msg) { @@ -279,6 +313,9 @@ void _z_t_msg_copy_join(_z_t_msg_join_t *clone, _z_t_msg_join_t *msg) { clone->_req_id_res = msg->_req_id_res; clone->_batch_size = msg->_batch_size; clone->_next_sn = msg->_next_sn; +#if Z_FEATURE_FRAGMENTATION == 1 + clone->_patch = msg->_patch; +#endif memcpy(clone->_zid.id, msg->_zid.id, 16); } @@ -290,6 +327,9 @@ void _z_t_msg_copy_init(_z_t_msg_init_t *clone, _z_t_msg_init_t *msg) { clone->_batch_size = msg->_batch_size; memcpy(clone->_zid.id, msg->_zid.id, 16); _z_slice_copy(&clone->_cookie, &msg->_cookie); +#if Z_FEATURE_FRAGMENTATION == 1 + clone->_patch = msg->_patch; +#endif } void _z_t_msg_copy_open(_z_t_msg_open_t *clone, _z_t_msg_open_t *msg) { diff --git a/src/protocol/keyexpr.c b/src/protocol/keyexpr.c index e27180f63..ce185f512 100644 --- a/src/protocol/keyexpr.c +++ b/src/protocol/keyexpr.c @@ -146,7 +146,7 @@ zp_keyexpr_canon_status_t __zp_canon_prefix(const char *start, size_t *len) { char const *next_slash; do { - next_slash = strchr(chunk_start, '/'); + next_slash = memchr(chunk_start, '/', _z_ptr_char_diff(end, chunk_start)); const char *chunk_end = next_slash ? next_slash : end; size_t chunk_len = _z_ptr_char_diff(chunk_end, chunk_start); switch (chunk_len) { @@ -291,7 +291,7 @@ void __zp_ke_write_chunk(char **writer, const char *chunk, size_t len, const cha writer[0] = _z_ptr_char_offset(writer[0], 1); } - (void)memcpy(writer[0], chunk, len); + (void)memmove(writer[0], chunk, len); writer[0] = _z_ptr_char_offset(writer[0], (ptrdiff_t)len); } @@ -766,10 +766,9 @@ zp_keyexpr_canon_status_t _z_keyexpr_canonize(char *start, size_t *len) { } else { assert(false); // anything before "$*" or "**" must be part of the canon prefix } - while (next_slash != NULL) { reader = _z_ptr_char_offset(next_slash, 1); - next_slash = strchr(reader, '/'); + next_slash = memchr(reader, '/', _z_ptr_char_diff(end, reader)); chunk_end = next_slash ? next_slash : end; switch (_z_ptr_char_diff(chunk_end, reader)) { case 0: { diff --git a/src/session/liveliness.c b/src/session/liveliness.c index a02a99cc3..11d883243 100644 --- a/src/session/liveliness.c +++ b/src/session/liveliness.c @@ -107,7 +107,7 @@ z_result_t _z_liveliness_subscription_undeclare(_z_session_t *zn, uint32_t id, c if (key != NULL) { ret = _z_trigger_liveliness_subscriptions_undeclare(zn, *key, timestamp); - _z_keyexpr_clear(key); + _z_keyexpr_free(&key); } return ret; @@ -182,8 +182,7 @@ z_result_t _z_liveliness_register_pending_query(_z_session_t *zn, uint32_t id, _ _Z_ERROR("Duplicate liveliness query id %i", (int)id); ret = _Z_ERR_ENTITY_DECLARATION_FAILED; } else { - _z_liveliness_pending_query_intmap_insert(&zn->_liveliness_pending_queries, id, - _z_liveliness_pending_query_clone(pen_qry)); + _z_liveliness_pending_query_intmap_insert(&zn->_liveliness_pending_queries, id, pen_qry); } _zp_session_unlock_mutex(zn); diff --git a/src/system/mbed/system.cpp b/src/system/mbed/system.cpp index e127d3518..9b8378ea0 100644 --- a/src/system/mbed/system.cpp +++ b/src/system/mbed/system.cpp @@ -136,23 +136,35 @@ z_result_t z_sleep_s(size_t time) { /*------------------ Instant ------------------*/ z_clock_t z_clock_now(void) { - // Not supported by default - return NULL; + auto now = Kernel::Clock::now(); + auto duration = now.time_since_epoch(); + auto secs = std::chrono::duration_cast(duration); + auto nanos = std::chrono::duration_cast(duration - secs); + + z_clock_t ts; + ts.tv_sec = secs.count(); + ts.tv_nsec = nanos.count(); + return ts; } unsigned long z_clock_elapsed_us(z_clock_t *instant) { - // Not supported by default - return -1; + z_clock_t now = z_clock_now(); + unsigned long elapsed = + (unsigned long)(1000000 * (now.tv_sec - instant->tv_sec) + (now.tv_nsec - instant->tv_nsec) / 1000); + return elapsed; } unsigned long z_clock_elapsed_ms(z_clock_t *instant) { - // Not supported by default - return -1; + z_clock_t now = z_clock_now(); + unsigned long elapsed = + (unsigned long)(1000 * (now.tv_sec - instant->tv_sec) + (now.tv_nsec - instant->tv_nsec) / 1000000); + return elapsed; } unsigned long z_clock_elapsed_s(z_clock_t *instant) { - // Not supported by default - return -1; + z_clock_t now = z_clock_now(); + unsigned long elapsed = (unsigned long)(now.tv_sec - instant->tv_sec); + return elapsed; } /*------------------ Time ------------------*/ diff --git a/src/system/zephyr/network.c b/src/system/zephyr/network.c index d8ac405b3..e01b89c82 100644 --- a/src/system/zephyr/network.c +++ b/src/system/zephyr/network.c @@ -65,8 +65,8 @@ z_result_t _z_open_tcp(_z_sys_net_socket_t *sock, const _z_sys_net_endpoint_t re tv.tv_sec = tout / (uint32_t)1000; tv.tv_usec = (tout % (uint32_t)1000) * (uint32_t)1000; if ((ret == _Z_RES_OK) && (setsockopt(sock->_fd, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof(tv)) < 0)) { - // FIXME: setting the setsockopt is consistently failing. Commenting it until further inspection. - // ret = _Z_ERR_GENERIC; + // FIXME: setting the setsockopt is consistently failing. Commenting it + // until further inspection. ret = _Z_ERR_GENERIC; } #if Z_FEATURE_TCP_NODELAY == 1 @@ -188,8 +188,8 @@ z_result_t _z_open_udp_unicast(_z_sys_net_socket_t *sock, const _z_sys_net_endpo tv.tv_sec = tout / (uint32_t)1000; tv.tv_usec = (tout % (uint32_t)1000) * (uint32_t)1000; if ((ret == _Z_RES_OK) && (setsockopt(sock->_fd, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof(tv)) < 0)) { - // FIXME: setting the setsockopt is consistently failing. Commenting it until further inspection. - // ret = _Z_ERR_GENERIC; + // FIXME: setting the setsockopt is consistently failing. Commenting it + // until further inspection. ret = _Z_ERR_GENERIC; } if (ret != _Z_RES_OK) { @@ -297,8 +297,8 @@ z_result_t _z_open_udp_multicast(_z_sys_net_socket_t *sock, const _z_sys_net_end tv.tv_sec = tout / (uint32_t)1000; tv.tv_usec = (tout % (uint32_t)1000) * (uint32_t)1000; if ((ret == _Z_RES_OK) && (setsockopt(sock->_fd, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof(tv)) < 0)) { - // FIXME: setting the setsockopt is consistently failing. Commenting it until further inspection. - // ret = _Z_ERR_GENERIC; + // FIXME: setting the setsockopt is consistently failing. Commenting it + // until further inspection. ret = _Z_ERR_GENERIC; } if ((ret == _Z_RES_OK) && (bind(sock->_fd, lsockaddr, addrlen) < 0)) { @@ -395,8 +395,8 @@ z_result_t _z_listen_udp_multicast(_z_sys_net_socket_t *sock, const _z_sys_net_e tv.tv_sec = tout / (uint32_t)1000; tv.tv_usec = (tout % (uint32_t)1000) * (uint32_t)1000; if ((ret == _Z_RES_OK) && (setsockopt(sock->_fd, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof(tv)) < 0)) { - // FIXME: setting the setsockopt is consistently failing. Commenting it until further inspection. - // ret = _Z_ERR_GENERIC; + // FIXME: setting the setsockopt is consistently failing. Commenting it + // until further inspection. ret = _Z_ERR_GENERIC; } if ((ret == _Z_RES_OK) && (bind(sock->_fd, lsockaddr, addrlen) < 0)) { @@ -416,7 +416,7 @@ z_result_t _z_listen_udp_multicast(_z_sys_net_socket_t *sock, const _z_sys_net_e if (!mcast) { ret = _Z_ERR_GENERIC; } -#if KERNEL_VERSION_MAJOR == 3 && KERNEL_VERSION_MINOR > 3 +#if KERNEL_VERSION_MAJOR == 3 && KERNEL_VERSION_MINOR > 3 || KERNEL_VERSION_MAJOR >= 4 net_if_ipv4_maddr_join(ifa, mcast); #else net_if_ipv4_maddr_join(mcast); @@ -427,7 +427,7 @@ z_result_t _z_listen_udp_multicast(_z_sys_net_socket_t *sock, const _z_sys_net_e if (!mcast) { ret = _Z_ERR_GENERIC; } -#if KERNEL_VERSION_MAJOR == 3 && KERNEL_VERSION_MINOR > 3 +#if KERNEL_VERSION_MAJOR == 3 && KERNEL_VERSION_MINOR > 3 || KERNEL_VERSION_MAJOR >= 4 net_if_ipv6_maddr_join(ifa, mcast); #else net_if_ipv6_maddr_join(mcast); @@ -460,7 +460,7 @@ void _z_close_udp_multicast(_z_sys_net_socket_t *sockrecv, _z_sys_net_socket_t * if (rep._iptcp->ai_family == AF_INET) { mcast = net_if_ipv4_maddr_add(ifa, &((struct sockaddr_in *)rep._iptcp->ai_addr)->sin_addr); if (mcast != NULL) { -#if KERNEL_VERSION_MAJOR == 3 && KERNEL_VERSION_MINOR > 3 +#if KERNEL_VERSION_MAJOR == 3 && KERNEL_VERSION_MINOR > 3 || KERNEL_VERSION_MAJOR >= 4 net_if_ipv4_maddr_leave(ifa, mcast); #else net_if_ipv4_maddr_leave(mcast); @@ -472,7 +472,7 @@ void _z_close_udp_multicast(_z_sys_net_socket_t *sockrecv, _z_sys_net_socket_t * } else if (rep._iptcp->ai_family == AF_INET6) { mcast = net_if_ipv6_maddr_add(ifa, &((struct sockaddr_in6 *)rep._iptcp->ai_addr)->sin6_addr); if (mcast != NULL) { -#if KERNEL_VERSION_MAJOR == 3 && KERNEL_VERSION_MINOR > 3 +#if KERNEL_VERSION_MAJOR == 3 && KERNEL_VERSION_MINOR > 3 || KERNEL_VERSION_MAJOR >= 4 net_if_ipv6_maddr_leave(ifa, mcast); #else net_if_ipv6_maddr_leave(mcast); @@ -508,7 +508,8 @@ size_t _z_read_udp_multicast(const _z_sys_net_socket_t sock, uint8_t *ptr, size_ struct sockaddr_in *a = ((struct sockaddr_in *)lep._iptcp->ai_addr); struct sockaddr_in *b = ((struct sockaddr_in *)&raddr); if (!((a->sin_port == b->sin_port) && (a->sin_addr.s_addr == b->sin_addr.s_addr))) { - // If addr is not NULL, it means that the raddr was requested by the upper-layers + // If addr is not NULL, it means that the raddr was requested by the + // upper-layers if (addr != NULL) { *addr = _z_slice_make(sizeof(uint32_t) + sizeof(uint16_t)); (void)memcpy((uint8_t *)addr->start, &b->sin_addr.s_addr, sizeof(uint32_t)); @@ -521,7 +522,8 @@ size_t _z_read_udp_multicast(const _z_sys_net_socket_t sock, uint8_t *ptr, size_ struct sockaddr_in6 *b = ((struct sockaddr_in6 *)&raddr); if (!((a->sin6_port == b->sin6_port) && (memcmp(a->sin6_addr.s6_addr, b->sin6_addr.s6_addr, sizeof(uint32_t) * 4UL) == 0))) { - // If addr is not NULL, it means that the raddr was requested by the upper-layers + // If addr is not NULL, it means that the raddr was requested by the + // upper-layers if (addr != NULL) { *addr = _z_slice_make((sizeof(uint32_t) * 4UL) + sizeof(uint16_t)); (void)memcpy((uint8_t *)addr->start, &b->sin6_addr.s6_addr, sizeof(uint32_t) * 4UL); @@ -530,7 +532,8 @@ size_t _z_read_udp_multicast(const _z_sys_net_socket_t sock, uint8_t *ptr, size_ break; } } else { - continue; // FIXME: support error report on invalid packet to the upper layer + continue; // FIXME: support error report on invalid packet to the upper + // layer } } while (1); diff --git a/src/transport/common/tx.c b/src/transport/common/tx.c index 75a837cbe..6ab0af73e 100644 --- a/src/transport/common/tx.c +++ b/src/transport/common/tx.c @@ -135,7 +135,8 @@ z_result_t _z_link_send_t_msg(const _z_link_t *zl, const _z_transport_message_t return ret; } -z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn) { +z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn, + bool first) { z_result_t ret = _Z_RES_OK; // Assume first that this is not the final fragment @@ -144,7 +145,7 @@ z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z size_t w_pos = _z_wbuf_get_wpos(dst); // Mark the buffer for the writing operation _z_transport_message_t f_hdr = - _z_t_msg_make_fragment_header(sn, reliability == Z_RELIABILITY_RELIABLE, is_final); + _z_t_msg_make_fragment_header(sn, reliability == Z_RELIABILITY_RELIABLE, is_final, first, false); ret = _z_transport_message_encode(dst, &f_hdr); // Encode the frame header if (ret == _Z_RES_OK) { size_t space_left = _z_wbuf_space_left(dst); diff --git a/src/transport/multicast/rx.c b/src/transport/multicast/rx.c index d531c6199..40e9f0b02 100644 --- a/src/transport/multicast/rx.c +++ b/src/transport/multicast/rx.c @@ -188,9 +188,54 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, } entry->_received = true; - _z_wbuf_t *dbuf = _Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAGMENT_R) - ? &entry->_dbuf_reliable - : &entry->_dbuf_best_effort; // Select the right defragmentation buffer + bool consecutive; + _z_wbuf_t *dbuf; + // Check if the SN is correct and select the right defragmentation buffer + if (_Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAME_R)) { + // @TODO: amend once reliability is in place. For the time being only + // monotonic SNs are ensured + if (_z_sn_precedes(entry->_sn_res, entry->_sn_rx_sns._val._plain._reliable, + t_msg->_body._fragment._sn) == true) { + consecutive = _z_sn_consecutive(entry->_sn_res, entry->_sn_rx_sns._val._plain._reliable, + t_msg->_body._fragment._sn); + entry->_sn_rx_sns._val._plain._reliable = t_msg->_body._fragment._sn; + dbuf = &entry->_dbuf_reliable; + } else { + _z_wbuf_clear(&entry->_dbuf_reliable); + _Z_INFO("Reliable message dropped because it is out of order"); + break; + } + } else { + if (_z_sn_precedes(entry->_sn_res, entry->_sn_rx_sns._val._plain._best_effort, + t_msg->_body._fragment._sn)) { + consecutive = _z_sn_consecutive(entry->_sn_res, entry->_sn_rx_sns._val._plain._best_effort, + t_msg->_body._fragment._sn); + entry->_sn_rx_sns._val._plain._best_effort = t_msg->_body._fragment._sn; + dbuf = &entry->_dbuf_best_effort; + } else { + _z_wbuf_clear(&entry->_dbuf_best_effort); + _Z_INFO("Best effort message dropped because it is out of order"); + break; + } + } + if (!consecutive && _z_wbuf_len(dbuf) > 0) { + _Z_DEBUG("Non-consecutive fragments received"); + _z_wbuf_reset(dbuf); + break; + } + // Handle fragment markers + if (_Z_PATCH_HAS_FRAGMENT_MARKERS(entry->_patch)) { + if (t_msg->_body._fragment.first) { + _z_wbuf_reset(dbuf); + } else if (_z_wbuf_len(dbuf) == 0) { + _Z_DEBUG("First fragment received without the first marker"); + break; + } + if (t_msg->_body._fragment.drop) { + _z_wbuf_reset(dbuf); + break; + } + } bool drop = false; if ((_z_wbuf_len(dbuf) + t_msg->_body._fragment._payload.len) > Z_FRAG_MAX_SIZE) { @@ -280,6 +325,8 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, _z_conduit_sn_list_decrement(entry->_sn_res, &entry->_sn_rx_sns); #if Z_FEATURE_FRAGMENTATION == 1 + entry->_patch = + t_msg->_body._join._patch < _Z_CURRENT_PATCH ? t_msg->_body._join._patch : _Z_CURRENT_PATCH; #if Z_FEATURE_DYNAMIC_MEMORY_ALLOCATION == 1 entry->_dbuf_reliable = _z_wbuf_make(0, true); entry->_dbuf_best_effort = _z_wbuf_make(0, true); diff --git a/src/transport/multicast/tx.c b/src/transport/multicast/tx.c index df5b3bcdc..bbb555edd 100644 --- a/src/transport/multicast/tx.c +++ b/src/transport/multicast/tx.c @@ -126,13 +126,12 @@ z_result_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t if (is_first == false) { // Get the fragment sequence number sn = __unsafe_z_multicast_get_sn(ztm, reliability); } - is_first = false; // Clear the buffer for serialization __unsafe_z_prepare_wbuf(&ztm->_wbuf, ztm->_link._cap._flow); // Serialize one fragment - ret = __unsafe_z_serialize_zenoh_fragment(&ztm->_wbuf, &fbf, reliability, sn); + ret = __unsafe_z_serialize_zenoh_fragment(&ztm->_wbuf, &fbf, reliability, sn, is_first); if (ret == _Z_RES_OK) { // Write the message length in the reserved space if needed __unsafe_z_finalize_wbuf(&ztm->_wbuf, ztm->_link._cap._flow); @@ -144,6 +143,7 @@ z_result_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t } else { _Z_ERROR("Fragment serialization failed with err %d", ret); } + is_first = false; } } // Clear the buffer as it's no longer required diff --git a/src/transport/peer_entry.c b/src/transport/peer_entry.c index 7d72604e3..b363b2eab 100644 --- a/src/transport/peer_entry.c +++ b/src/transport/peer_entry.c @@ -30,6 +30,8 @@ void _z_transport_peer_entry_copy(_z_transport_peer_entry_t *dst, const _z_trans #if Z_FEATURE_FRAGMENTATION == 1 _z_wbuf_copy(&dst->_dbuf_reliable, &src->_dbuf_reliable); _z_wbuf_copy(&dst->_dbuf_best_effort, &src->_dbuf_best_effort); + + dst->_patch = src->_patch; #endif dst->_sn_res = src->_sn_res; diff --git a/src/transport/raweth/tx.c b/src/transport/raweth/tx.c index b4ddc5def..538c52e1a 100644 --- a/src/transport/raweth/tx.c +++ b/src/transport/raweth/tx.c @@ -292,13 +292,12 @@ z_result_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_ // Get the fragment sequence number sn = __unsafe_z_raweth_get_sn(ztm, reliability); } - is_first = false; // Reset wbuf _z_wbuf_reset(&ztm->_wbuf); // Prepare buff __unsafe_z_raweth_prepare_header(&ztm->_link, &ztm->_wbuf); // Serialize one fragment - _Z_CLEAN_RETURN_IF_ERR(__unsafe_z_serialize_zenoh_fragment(&ztm->_wbuf, &fbf, reliability, sn), + _Z_CLEAN_RETURN_IF_ERR(__unsafe_z_serialize_zenoh_fragment(&ztm->_wbuf, &fbf, reliability, sn, is_first), _zp_raweth_unlock_tx_mutex(ztm)); // Write the eth header _Z_CLEAN_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf), @@ -307,6 +306,7 @@ z_result_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_ _Z_CLEAN_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf), _zp_raweth_unlock_tx_mutex(ztm)); // Mark the session that we have transmitted data ztm->_transmitted = true; + is_first = false; } // Clear the expandable buffer _z_wbuf_clear(&fbf); diff --git a/src/transport/unicast/rx.c b/src/transport/unicast/rx.c index 23c6859b9..e0b8e0c68 100644 --- a/src/transport/unicast/rx.c +++ b/src/transport/unicast/rx.c @@ -141,10 +141,50 @@ z_result_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_t case _Z_MID_T_FRAGMENT: { _Z_INFO("Received Z_FRAGMENT message"); #if Z_FEATURE_FRAGMENTATION == 1 - _z_wbuf_t *dbuf = _Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAGMENT_R) - ? &ztu->_dbuf_reliable - : &ztu->_dbuf_best_effort; // Select the right defragmentation buffer - + bool consecutive; + _z_wbuf_t *dbuf; + // Check if the SN is correct and select the right defragmentation buffer + if (_Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAME_R)) { + // @TODO: amend once reliability is in place. For the time being only + // monotonic SNs are ensured + if (_z_sn_precedes(ztu->_sn_res, ztu->_sn_rx_reliable, t_msg->_body._fragment._sn)) { + consecutive = _z_sn_consecutive(ztu->_sn_res, ztu->_sn_rx_reliable, t_msg->_body._fragment._sn); + ztu->_sn_rx_reliable = t_msg->_body._fragment._sn; + dbuf = &ztu->_dbuf_reliable; + } else { + _z_wbuf_clear(&ztu->_dbuf_reliable); + _Z_INFO("Reliable message dropped because it is out of order"); + break; + } + } else { + if (_z_sn_precedes(ztu->_sn_res, ztu->_sn_rx_best_effort, t_msg->_body._fragment._sn)) { + consecutive = _z_sn_consecutive(ztu->_sn_res, ztu->_sn_rx_best_effort, t_msg->_body._fragment._sn); + ztu->_sn_rx_best_effort = t_msg->_body._fragment._sn; + dbuf = &ztu->_dbuf_best_effort; + } else { + _z_wbuf_clear(&ztu->_dbuf_best_effort); + _Z_INFO("Best effort message dropped because it is out of order"); + break; + } + } + if (!consecutive && _z_wbuf_len(dbuf) > 0) { + _Z_DEBUG("Non-consecutive fragments received"); + _z_wbuf_reset(dbuf); + break; + } + // Handle fragment markers + if (_Z_PATCH_HAS_FRAGMENT_MARKERS(ztu->_patch)) { + if (t_msg->_body._fragment.first) { + _z_wbuf_reset(dbuf); + } else if (_z_wbuf_len(dbuf) == 0) { + _Z_DEBUG("First fragment received without the start marker"); + break; + } + if (t_msg->_body._fragment.drop) { + _z_wbuf_reset(dbuf); + break; + } + } bool drop = false; if ((_z_wbuf_len(dbuf) + t_msg->_body._fragment._payload.len) > Z_FRAG_MAX_SIZE) { // Filling the wbuf capacity as a way to signal the last fragment to reset the dbuf diff --git a/src/transport/unicast/transport.c b/src/transport/unicast/transport.c index 01c19ce91..c7ef12aca 100644 --- a/src/transport/unicast/transport.c +++ b/src/transport/unicast/transport.c @@ -36,6 +36,10 @@ z_result_t _z_unicast_transport_create(_z_transport_t *zt, _z_link_t *zl, z_result_t ret = _Z_RES_OK; zt->_type = _Z_TRANSPORT_UNICAST_TYPE; +#if Z_FEATURE_FRAGMENTATION == 1 + // Patch + zt->_transport._unicast._patch = param->_patch; +#endif #if Z_FEATURE_MULTI_THREAD == 1 // Initialize the mutexes @@ -199,6 +203,13 @@ z_result_t _z_unicast_open_client(_z_transport_unicast_establish_param_t *param, } else { ret = _Z_ERR_TRANSPORT_OPEN_SN_RESOLUTION; } +#if Z_FEATURE_FRAGMENTATION == 1 + if (iam._body._init._patch > ism._body._init._patch) { + // TODO: Use a better error code? + ret = _Z_ERR_GENERIC; + } + param->_patch = iam._body._init._patch; +#endif if (ret == _Z_RES_OK) { param->_key_id_res = 0x08 << param->_key_id_res; diff --git a/src/transport/unicast/tx.c b/src/transport/unicast/tx.c index 5ed555794..561c232c7 100644 --- a/src/transport/unicast/tx.c +++ b/src/transport/unicast/tx.c @@ -135,13 +135,12 @@ z_result_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n if (is_first == false) { // Get the fragment sequence number sn = __unsafe_z_unicast_get_sn(ztu, reliability); } - is_first = false; // Clear the buffer for serialization __unsafe_z_prepare_wbuf(&ztu->_wbuf, ztu->_link._cap._flow); // Serialize one fragment - ret = __unsafe_z_serialize_zenoh_fragment(&ztu->_wbuf, &fbf, reliability, sn); + ret = __unsafe_z_serialize_zenoh_fragment(&ztu->_wbuf, &fbf, reliability, sn, is_first); if (ret == _Z_RES_OK) { // Write the message length in the reserved space if needed __unsafe_z_finalize_wbuf(&ztu->_wbuf, ztu->_link._cap._flow); @@ -153,6 +152,7 @@ z_result_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n } else { _Z_ERROR("Fragment serialization failed with err %d", ret); } + is_first = false; } } diff --git a/src/transport/utils.c b/src/transport/utils.c index 510a0f19c..dc0697043 100644 --- a/src/transport/utils.c +++ b/src/transport/utils.c @@ -82,6 +82,11 @@ bool _z_sn_precedes(const _z_zint_t sn_resolution, const _z_zint_t sn_left, cons return ((distance <= _z_sn_half(sn_resolution)) && (distance != 0)); } +bool _z_sn_consecutive(const _z_zint_t sn_resolution, const _z_zint_t sn_left, const _z_zint_t sn_right) { + _z_zint_t distance = (sn_right - sn_left) & sn_resolution; + return distance == 1; +} + _z_zint_t _z_sn_increment(const _z_zint_t sn_resolution, const _z_zint_t sn) { _z_zint_t ret = sn + 1; return (ret &= sn_resolution); diff --git a/tests/memory_leak.py b/tests/memory_leak.py index 34c49c9e5..2191d2295 100644 --- a/tests/memory_leak.py +++ b/tests/memory_leak.py @@ -1,22 +1,22 @@ -import argparse import os -from signal import SIGINT +import signal import subprocess import sys import time -import re # Specify the directory for the binaries DIR_EXAMPLES = "build/examples" NO_LEAK_OUTPUT = "All heap blocks were freed -- no leaks are possible" +VALGRIND_CMD = f"stdbuf -oL -eL valgrind --leak-check=full ./{DIR_EXAMPLES}/" + def failure_mode(fail_cmd): test_status = 0 # Start binary print("Start binary") - z_pub_command = f"stdbuf -oL -eL valgrind ./{DIR_EXAMPLES}/" + fail_cmd + z_pub_command = VALGRIND_CMD + fail_cmd z_pub_process = subprocess.Popen( z_pub_command, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) @@ -38,9 +38,9 @@ def failure_mode(fail_cmd): def pub_and_sub(pub_cmd, sub_cmd): test_status = 0 - print("Start subscriber") + print(f"Start {sub_cmd}") # Start z_sub in the background - z_sub_command = f"stdbuf -oL -eL valgrind ./{DIR_EXAMPLES}/" + sub_cmd + z_sub_command = VALGRIND_CMD + sub_cmd z_sub_process = subprocess.Popen( z_sub_command, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True @@ -48,21 +48,21 @@ def pub_and_sub(pub_cmd, sub_cmd): # Introduce a delay to ensure z_sub starts time.sleep(2) - print("Start publisher") + print(f"Start {pub_cmd}") # Start z_pub - z_pub_command = f"stdbuf -oL -eL valgrind ./{DIR_EXAMPLES}/" + pub_cmd + z_pub_command = VALGRIND_CMD + pub_cmd z_pub_process = subprocess.Popen( z_pub_command, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) # Wait for z_pub to finish z_pub_process.wait() - print("Stop subscriber") + print(f"Stop {sub_cmd}") time.sleep(2) if z_sub_process.poll() is None: # send SIGINT to group z_sub_process_gid = os.getpgid(z_sub_process.pid) - os.killpg(z_sub_process_gid, SIGINT) + os.killpg(z_sub_process_gid, signal.SIGINT) # Wait for z_sub to finish z_sub_process.wait() @@ -71,18 +71,18 @@ def pub_and_sub(pub_cmd, sub_cmd): # Check output of z_pub z_pub_output = z_pub_process.stderr.read() if NO_LEAK_OUTPUT in z_pub_output: - print("z_pub output valid") + print(f"{pub_cmd} output valid") else: - print("z_pub output invalid:") + print(f"{pub_cmd} output invalid:") print(f"Received: \"{z_pub_output}\"") test_status = 1 # Check output of z_sub z_sub_output = z_sub_process.stderr.read() if NO_LEAK_OUTPUT in z_sub_output: - print("z_sub output valid") + print(f"{sub_cmd} output valid") else: - print("z_sub output invalid:") + print(f"{sub_cmd} output invalid:") print(f"Received: \"{z_sub_output}\"") test_status = 1 # Return value @@ -91,9 +91,9 @@ def pub_and_sub(pub_cmd, sub_cmd): def query_and_queryable(query_cmd, queryable_cmd): test_status = 0 - print("Start queryable") + print(f"Start {queryable_cmd}") # Start z_queryable in the background - z_queryable_command = f"stdbuf -oL -eL valgrind ./{DIR_EXAMPLES}/" + queryable_cmd + z_queryable_command = VALGRIND_CMD + queryable_cmd z_queryable_process = subprocess.Popen( z_queryable_command, shell=True, @@ -106,9 +106,9 @@ def query_and_queryable(query_cmd, queryable_cmd): # Introduce a delay to ensure z_queryable starts time.sleep(2) - print("Start query") + print(f"Start {query_cmd}") # Start z_query - z_query_command = f"stdbuf -oL -eL valgrind ./{DIR_EXAMPLES}/" + query_cmd + z_query_command = VALGRIND_CMD + query_cmd z_query_process = subprocess.Popen( z_query_command, shell=True, @@ -120,12 +120,12 @@ def query_and_queryable(query_cmd, queryable_cmd): # Wait for z_query to finish z_query_process.wait() - print("Stop queryable") + print(f"Stop {queryable_cmd}") time.sleep(2) if z_queryable_process.poll() is None: # send SIGINT to group z_quaryable_process_gid = os.getpgid(z_queryable_process.pid) - os.killpg(z_quaryable_process_gid, SIGINT) + os.killpg(z_quaryable_process_gid, signal.SIGINT) # Wait for z_queryable to finish z_queryable_process.wait() @@ -134,18 +134,18 @@ def query_and_queryable(query_cmd, queryable_cmd): # Check output of z_query z_query_output = z_query_process.stderr.read() if NO_LEAK_OUTPUT in z_query_output: - print("z_query output valid") + print(f"{query_cmd} output valid") else: - print("z_query output invalid:") + print(f"{query_cmd} output invalid:") print(f'Received: "{z_query_output}"') test_status = 1 # Check output of z_queryable z_queryable_output = z_queryable_process.stderr.read() if NO_LEAK_OUTPUT in z_queryable_output: - print("z_queryable output valid") + print(f"{queryable_cmd} output valid") else: - print("z_queryable output invalid:") + print(f"{queryable_cmd} output invalid:") print(f'Received: "{z_queryable_output}"') test_status = 1 # Return status @@ -153,25 +153,34 @@ def query_and_queryable(query_cmd, queryable_cmd): if __name__ == "__main__": + signal.signal(signal.SIGINT, signal.SIG_IGN) EXIT_STATUS = 0 # Test failure mode print("*** Failure mode ***") - if failure_mode(f'z_pub -m peer') == 1: + if failure_mode('z_pub -m peer') == 1: EXIT_STATUS = 1 # Test pub and sub examples print("*** Pub & sub test ***") - if pub_and_sub(f'z_pub -n 1', f'z_sub -n 1') == 1: + if pub_and_sub('z_pub -n 1', 'z_sub -n 1') == 1: EXIT_STATUS = 1 print("*** Pub & sub attachment test ***") - if pub_and_sub(f'z_pub_attachment -n 1', f'z_sub_attachment -n 1') == 1: + if pub_and_sub('z_pub_attachment -n 1', 'z_sub_attachment -n 1') == 1: EXIT_STATUS = 1 # Test query and queryable examples print("*** Query & queryable test ***") - if query_and_queryable(f'z_get', f'z_queryable -n 1') == 1: + if query_and_queryable('z_get', 'z_queryable -n 1') == 1: EXIT_STATUS = 1 print("*** Query & queryable attachment test ***") - if query_and_queryable(f'z_get_attachment -v Something', f'z_queryable_attachment -n 1') == 1: + if query_and_queryable('z_get_attachment -v Something', 'z_queryable_attachment -n 1') == 1: + EXIT_STATUS = 1 + # Test liveliness query + print("*** Get liveliness test ***") + if query_and_queryable('z_get_liveliness', 'z_liveliness') == 1: + EXIT_STATUS = 1 + # Test liveliness subscriber + print("*** Liveliness subscriber test ***") + if query_and_queryable('z_sub_liveliness -h -n 1', 'z_liveliness') == 1: EXIT_STATUS = 1 # Exit sys.exit(EXIT_STATUS) diff --git a/tests/z_api_encoding_test.c b/tests/z_api_encoding_test.c index 03a3b0df9..98895a003 100644 --- a/tests/z_api_encoding_test.c +++ b/tests/z_api_encoding_test.c @@ -85,6 +85,7 @@ void test_with_schema(void) { z_encoding_to_string(z_encoding_loan_mut(&e), &s); assert(strncmp("zenoh/bytes;my_schema", z_string_data(z_string_loan(&s)), z_string_len(z_string_loan(&s))) == 0); z_encoding_drop(z_encoding_move(&e)); + z_string_drop(z_string_move(&s)); z_encoding_from_str(&e, "zenoh/string;"); z_encoding_set_schema_from_substr(z_encoding_loan_mut(&e), "my_schema", 3); diff --git a/tests/z_api_liveliness_test.c b/tests/z_api_liveliness_test.c index 2cf4c350e..c37715a91 100644 --- a/tests/z_api_liveliness_test.c +++ b/tests/z_api_liveliness_test.c @@ -139,6 +139,9 @@ void test_liveliness_sub(bool multicast, bool history) { z_sleep_s(1); assert(context.token2_drop); + z_closure_sample_drop(z_closure_sample_move(&closure)); + z_subscriber_drop(z_subscriber_move(&sub)); + assert_ok(zp_stop_read_task(z_loan_mut(s1))); assert_ok(zp_stop_read_task(z_loan_mut(s2))); assert_ok(zp_stop_lease_task(z_loan_mut(s1))); @@ -199,6 +202,7 @@ void test_liveliness_get(void) { assert(z_fifo_handler_reply_recv(z_fifo_handler_reply_loan(&handler), &reply) == Z_CHANNEL_DISCONNECTED); z_fifo_handler_reply_drop(z_fifo_handler_reply_move(&handler)); + z_closure_reply_drop(z_closure_reply_move(&cb)); assert_ok(zp_stop_read_task(z_loan_mut(s1))); assert_ok(zp_stop_read_task(z_loan_mut(s2))); diff --git a/tests/z_channels_test.c b/tests/z_channels_test.c index 5bd241d53..80ea7e9f4 100644 --- a/tests/z_channels_test.c +++ b/tests/z_channels_test.c @@ -13,12 +13,10 @@ // #include #include -#include -#include #include "zenoh-pico/api/handlers.h" #include "zenoh-pico/api/macros.h" -#include "zenoh-pico/net/sample.h" +#include "zenoh-pico/collections/bytes.h" #undef NDEBUG #include @@ -37,6 +35,7 @@ .attachment = _z_bytes_null(), \ }; \ z_call(*z_loan(closure), &sample); \ + _z_bytes_drop(&payload); \ } while (0); #define _RECV(handler, method, buf) \ @@ -192,11 +191,13 @@ void zero_size_test(void) { assert(z_fifo_channel_sample_new(&closure, &fifo_handler, 0) != Z_OK); assert(z_fifo_channel_sample_new(&closure, &fifo_handler, 1) == Z_OK); z_drop(z_move(fifo_handler)); + z_drop(z_move(closure)); z_owned_ring_handler_sample_t ring_handler; assert(z_ring_channel_sample_new(&closure, &ring_handler, 0) != Z_OK); assert(z_ring_channel_sample_new(&closure, &ring_handler, 1) == Z_OK); z_drop(z_move(ring_handler)); + z_drop(z_move(closure)); } int main(void) { diff --git a/tests/z_collections_test.c b/tests/z_collections_test.c index 8bb54007f..0fe9bab17 100644 --- a/tests/z_collections_test.c +++ b/tests/z_collections_test.c @@ -18,7 +18,6 @@ #include "zenoh-pico/collections/fifo.h" #include "zenoh-pico/collections/lifo.h" -#include "zenoh-pico/collections/list.h" #include "zenoh-pico/collections/ring.h" #include "zenoh-pico/collections/string.h" @@ -313,10 +312,10 @@ void int_map_iterator_test(void) { _z_str_intmap_t map; map = _z_str_intmap_make(); - _z_str_intmap_insert(&map, 10, "A"); - _z_str_intmap_insert(&map, 20, "B"); - _z_str_intmap_insert(&map, 30, "C"); - _z_str_intmap_insert(&map, 40, "D"); + _z_str_intmap_insert(&map, 10, _z_str_clone("A")); + _z_str_intmap_insert(&map, 20, _z_str_clone("B")); + _z_str_intmap_insert(&map, 30, _z_str_clone("C")); + _z_str_intmap_insert(&map, 40, _z_str_clone("D")); #define TEST_MAP(map) \ { \ @@ -346,6 +345,9 @@ void int_map_iterator_test(void) { TEST_MAP(map2); + _z_str_intmap_clear(&map); + _z_str_intmap_clear(&map2); + #undef TEST_MAP } diff --git a/tests/z_data_struct_test.c b/tests/z_data_struct_test.c index e3dff0fa5..045f832e8 100644 --- a/tests/z_data_struct_test.c +++ b/tests/z_data_struct_test.c @@ -15,6 +15,7 @@ #include #include #include +#include #include "zenoh-pico/api/primitives.h" #include "zenoh-pico/api/types.h" @@ -29,6 +30,7 @@ void entry_list_test(void) { _z_transport_peer_entry_list_t *root = _z_transport_peer_entry_list_new(); for (int i = 0; i < 10; i++) { _z_transport_peer_entry_t *entry = (_z_transport_peer_entry_t *)z_malloc(sizeof(_z_transport_peer_entry_t)); + memset(entry, 0, sizeof(_z_transport_peer_entry_t)); root = _z_transport_peer_entry_list_insert(root, entry); } _z_transport_peer_entry_list_t *list = root; @@ -39,6 +41,7 @@ void entry_list_test(void) { for (int i = 0; i < 11; i++) { _z_transport_peer_entry_t *entry = (_z_transport_peer_entry_t *)z_malloc(sizeof(_z_transport_peer_entry_t)); + memset(entry, 0, sizeof(_z_transport_peer_entry_t)); root = _z_transport_peer_entry_list_insert(root, entry); } assert(_z_transport_peer_entry_list_head(root)->_peer_id == _Z_KEYEXPR_MAPPING_UNKNOWN_REMOTE - 1); @@ -140,6 +143,8 @@ void str_vec_list_intmap_test(void) { _z_str_intmap_clear(&map); assert(_z_str_intmap_is_empty(&map) == true); + + z_free(s); } void _z_slice_custom_deleter(void *data, void *context) { @@ -243,6 +248,7 @@ void z_id_to_string_test(void) { assert(z_string_len(z_string_loan(&id_str)) == 32); assert(strncmp("0f0e0d0c0b0a09080706050403020100", z_string_data(z_string_loan(&id_str)), z_string_len(z_string_loan(&id_str))) == 0); + z_string_drop(z_string_move(&id_str)); } int main(void) { diff --git a/tests/z_endpoint_test.c b/tests/z_endpoint_test.c index 4252b0665..c2f97d6ad 100644 --- a/tests/z_endpoint_test.c +++ b/tests/z_endpoint_test.c @@ -42,25 +42,32 @@ int main(void) { str = _z_string_alias_str(""); assert(_z_locator_from_string(&lc, &str) == _Z_ERR_CONFIG_LOCATOR_INVALID); + _z_locator_clear(&lc); str = _z_string_alias_str("/"); assert(_z_locator_from_string(&lc, &str) == _Z_ERR_CONFIG_LOCATOR_INVALID); + _z_locator_clear(&lc); str = _z_string_alias_str("tcp"); assert(_z_locator_from_string(&lc, &str) == _Z_ERR_CONFIG_LOCATOR_INVALID); + _z_locator_clear(&lc); str = _z_string_alias_str("tcp/"); assert(_z_locator_from_string(&lc, &str) == _Z_ERR_CONFIG_LOCATOR_INVALID); + _z_locator_clear(&lc); str = _z_string_alias_str("127.0.0.1:7447"); assert(_z_locator_from_string(&lc, &str) == _Z_ERR_CONFIG_LOCATOR_INVALID); + _z_locator_clear(&lc); str = _z_string_alias_str("tcp/127.0.0.1:7447?"); assert(_z_locator_from_string(&lc, &str) == _Z_RES_OK); + _z_locator_clear(&lc); // No metadata defined so far... but this is a valid syntax in principle str = _z_string_alias_str("tcp/127.0.0.1:7447?invalid=ctrl"); assert(_z_locator_from_string(&lc, &str) == _Z_RES_OK); + _z_locator_clear(&lc); // Endpoint printf(">>> Testing endpoints...\n"); @@ -80,25 +87,32 @@ int main(void) { str = _z_string_alias_str(""); assert(_z_endpoint_from_string(&ep, &str) == _Z_ERR_CONFIG_LOCATOR_INVALID); + _z_endpoint_clear(&ep); str = _z_string_alias_str("/"); assert(_z_endpoint_from_string(&ep, &str) == _Z_ERR_CONFIG_LOCATOR_INVALID); + _z_endpoint_clear(&ep); str = _z_string_alias_str("tcp"); assert(_z_endpoint_from_string(&ep, &str) == _Z_ERR_CONFIG_LOCATOR_INVALID); + _z_endpoint_clear(&ep); str = _z_string_alias_str("tcp/"); assert(_z_endpoint_from_string(&ep, &str) == _Z_ERR_CONFIG_LOCATOR_INVALID); + _z_endpoint_clear(&ep); str = _z_string_alias_str("127.0.0.1:7447"); assert(_z_endpoint_from_string(&ep, &str) == _Z_ERR_CONFIG_LOCATOR_INVALID); + _z_endpoint_clear(&ep); str = _z_string_alias_str("tcp/127.0.0.1:7447?"); assert(_z_endpoint_from_string(&ep, &str) == _Z_RES_OK); + _z_endpoint_clear(&ep); // No metadata defined so far... but this is a valid syntax in principle str = _z_string_alias_str("tcp/127.0.0.1:7447?invalid=ctrl"); assert(_z_endpoint_from_string(&ep, &str) == _Z_RES_OK); + _z_endpoint_clear(&ep); str = _z_string_alias_str("udp/127.0.0.1:7447#iface=eth0"); assert(_z_endpoint_from_string(&ep, &str) == _Z_RES_OK); @@ -116,12 +130,15 @@ int main(void) { str = _z_string_alias_str("udp/127.0.0.1:7447#invalid=eth0"); assert(_z_endpoint_from_string(&ep, &str) == _Z_RES_OK); + _z_endpoint_clear(&ep); str = _z_string_alias_str("udp/127.0.0.1:7447?invalid=ctrl#iface=eth0"); assert(_z_endpoint_from_string(&ep, &str) == _Z_RES_OK); + _z_endpoint_clear(&ep); str = _z_string_alias_str("udp/127.0.0.1:7447?invalid=ctrl#invalid=eth0"); assert(_z_endpoint_from_string(&ep, &str) == _Z_RES_OK); + _z_endpoint_clear(&ep); return 0; } diff --git a/tests/z_keyexpr_test.c b/tests/z_keyexpr_test.c index 96fc7df93..58b0bdd8f 100644 --- a/tests/z_keyexpr_test.c +++ b/tests/z_keyexpr_test.c @@ -314,6 +314,7 @@ void test_canonize(void) { printf(" Match: %.*s : %s\n", (int)canon_len, canon, canonized[i]); assert(strncmp(canonized[i], canon, canon_len) == 0); } + free(canon); } for (int i = 0; i < N; i++) { @@ -331,6 +332,7 @@ void test_canonize(void) { assert(strncmp(canonized[i], canon, canon_len) == 0); } printf("\n"); + free(canon); } } diff --git a/tests/z_msgcodec_test.c b/tests/z_msgcodec_test.c index 59237b3b2..1c227d194 100644 --- a/tests/z_msgcodec_test.c +++ b/tests/z_msgcodec_test.c @@ -221,18 +221,23 @@ char *gen_str(size_t size) { return str; } +_z_string_t gen_string(size_t len) { + char *str = gen_str(len); + _z_string_t ret = _z_string_copy_from_str(str); + z_free(str); + return ret; +} + _z_string_svec_t gen_str_array(size_t size) { _z_string_svec_t sa = _z_string_svec_make(size); for (size_t i = 0; i < size; i++) { - _z_string_t s = _z_string_copy_from_str(gen_str(16)); + _z_string_t s = gen_string(16); _z_string_svec_append(&sa, &s); } return sa; } -_z_string_t gen_string(size_t len) { return _z_string_alias_str(gen_str(len)); } - _z_locator_array_t gen_locator_array(size_t size) { _z_locator_array_t la = _z_locator_array_make(size); for (size_t i = 0; i < size; i++) { @@ -341,7 +346,7 @@ void assert_eq_locator_array(const _z_locator_array_t *left, const _z_locator_ar _z_string_t ls = _z_locator_to_string(l); _z_string_t rs = _z_locator_to_string(r); - printf("%s:%s", _z_string_data(&ls), _z_string_data(&rs)); + printf("%.*s:%.*s", (int)_z_string_len(&ls), _z_string_data(&ls), (int)_z_string_len(&rs), _z_string_data(&rs)); if (i < left->_len - 1) printf(" "); _z_string_clear(&ls); @@ -1820,7 +1825,7 @@ void frame_message(void) { } _z_transport_message_t gen_fragment(void) { - return _z_t_msg_make_fragment(gen_uint32(), gen_slice(gen_uint8()), gen_bool(), gen_bool()); + return _z_t_msg_make_fragment(gen_uint32(), gen_slice(gen_uint8()), gen_bool(), gen_bool(), gen_bool(), gen_bool()); } void assert_eq_fragment(const _z_t_msg_fragment_t *left, const _z_t_msg_fragment_t *right) { assert(left->_sn == right->_sn); diff --git a/tests/z_refcount_test.c b/tests/z_refcount_test.c index 27cb84d9a..fc8b322cc 100644 --- a/tests/z_refcount_test.c +++ b/tests/z_refcount_test.c @@ -141,7 +141,7 @@ void test_rc_clone_as_weak(void) { assert(_z_rc_weak_count(dwk1._cnt) == 2); assert(dwk1._val->foo == 42); - assert(!_dummy_rc_drop(&drc1)); + assert(_dummy_rc_drop(&drc1)); assert(_z_rc_strong_count(dwk1._cnt) == 0); assert(_z_rc_weak_count(dwk1._cnt) == 1); assert(_dummy_weak_drop(&dwk1)); @@ -156,7 +156,7 @@ void test_rc_clone_as_weak_ptr(void) { assert(_z_rc_strong_count(dwk1->_cnt) == 1); assert(_z_rc_weak_count(dwk1->_cnt) == 2); - assert(!_dummy_rc_drop(&drc1)); + assert(_dummy_rc_drop(&drc1)); assert(_z_rc_strong_count(dwk1->_cnt) == 0); assert(_z_rc_weak_count(dwk1->_cnt) == 1); assert(_dummy_weak_drop(dwk1)); @@ -180,7 +180,7 @@ void test_weak_clone(void) { assert(_z_rc_strong_count(dwk2._cnt) == 1); assert(_z_rc_weak_count(dwk2._cnt) == 3); - assert(!_dummy_rc_drop(&drc1)); + assert(_dummy_rc_drop(&drc1)); assert(_z_rc_strong_count(dwk2._cnt) == 0); assert(_z_rc_weak_count(dwk2._cnt) == 2); @@ -208,7 +208,7 @@ void test_weak_copy(void) { void test_weak_upgrade(void) { _dummy_t val = {.foo = 42}; - _dummy_rc_t drc1 = _dummy_rc_new(&val); + _dummy_rc_t drc1 = _dummy_rc_new_from_val(&val); _dummy_weak_t dwk1 = _dummy_rc_clone_as_weak(&drc1); // Valid upgrade @@ -217,7 +217,7 @@ void test_weak_upgrade(void) { assert(_z_rc_strong_count(drc2._cnt) == 2); assert(_z_rc_weak_count(drc2._cnt) == 3); assert(!_dummy_rc_drop(&drc1)); - assert(!_dummy_rc_drop(&drc2)); + assert(_dummy_rc_drop(&drc2)); // Failed upgrade _dummy_rc_t drc3 = _dummy_weak_upgrade(&dwk1); @@ -240,6 +240,10 @@ void test_overflow(void) { _dummy_weak_t dwk1 = _dummy_rc_clone_as_weak(&drc1); assert(_Z_RC_IS_NULL(&dwk1)); + + // Manual free to make asan happy, without long decresing + free(drc1._val); + free(drc1._cnt); } void test_decr(void) { @@ -248,6 +252,7 @@ void test_decr(void) { _dummy_rc_t drc2 = _dummy_rc_clone(&drc1); assert(!_dummy_rc_decr(&drc2)); assert(_dummy_rc_decr(&drc1)); + free(drc1._val); } int main(void) {