From 31aaa7a8562e2d054ac7e814289fb5b02702f500 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Thu, 19 Dec 2024 17:14:42 +0100 Subject: [PATCH] Implement connection restoring (#799) --- .github/workflows/ci.yml | 25 +- CMakeLists.txt | 3 + include/zenoh-pico/collections/list.h | 41 +-- include/zenoh-pico/config.h | 3 +- include/zenoh-pico/config.h.in | 1 + include/zenoh-pico/link/endpoint.h | 4 +- include/zenoh-pico/link/link.h | 4 +- include/zenoh-pico/net/session.h | 42 ++- .../zenoh-pico/protocol/definitions/network.h | 2 + include/zenoh-pico/session/utils.h | 2 +- .../zenoh-pico/transport/common/transport.h | 30 ++ include/zenoh-pico/transport/manager.h | 3 +- .../transport/multicast/transport.h | 2 +- include/zenoh-pico/transport/transport.h | 1 + .../zenoh-pico/transport/unicast/transport.h | 2 +- src/api/api.c | 64 +++- src/collections/list.c | 4 +- src/link/endpoint.c | 12 +- src/link/link.c | 4 +- src/net/primitives.c | 43 ++- src/net/session.c | 275 +++++++++++++----- src/protocol/definitions/network.c | 6 + src/protocol/definitions/transport.c | 1 - src/session/rx.c | 1 - src/session/utils.c | 48 +-- src/system/rpi_pico/system.c | 1 - src/transport/common/transport.c | 60 ++++ src/transport/manager.c | 8 +- src/transport/multicast/lease.c | 9 + src/transport/multicast/transport.c | 32 +- src/transport/transport.c | 67 ++--- src/transport/unicast/lease.c | 25 +- src/transport/unicast/transport.c | 35 +-- tests/connection_restore.py | 203 +++++++++++++ 34 files changed, 791 insertions(+), 272 deletions(-) create mode 100644 include/zenoh-pico/transport/common/transport.h create mode 100644 src/transport/common/transport.c create mode 100644 tests/connection_restore.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index fbd6ca374..783336bc6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -297,6 +297,29 @@ jobs: CMAKE_GENERATOR=Ninja ASAN=ON make python3 ./build/tests/no_router.py timeout-minutes: 5 + + 
connection_restore_test: + needs: zenoh_build + name: Connection restore test + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Download Zenoh artifacts + uses: actions/download-artifact@v4 + with: + name: ${{ needs.zenoh_build.outputs.artifact-name }} + + - name: Unzip Zenoh artifacts + run: unzip ${{ needs.zenoh_build.outputs.artifact-name }} -d zenoh-standalone + + - name: Build project and run test + run: | + sudo apt install -y ninja-build + CMAKE_GENERATOR=Ninja ASAN=ON CMAKE_BUILD_TYPE=Debug ZENOH_DEBUG=3 make + RUST_LOG=debug sudo python3 ./build/tests/connection_restore.py ./zenoh-standalone/zenohd + timeout-minutes: 15 markdown_lint: runs-on: ubuntu-latest @@ -331,7 +354,7 @@ jobs: ci: name: CI status checks runs-on: ubuntu-latest - needs: [run_tests, check_format, c99_build, raweth_build, zenoh_build, modular_build, unstable_build, st_build, fragment_test, attachment_test, memory_leak_test, no_router, markdown_lint, build_shared, build_static, integration, multicast] + needs: [run_tests, check_format, c99_build, raweth_build, zenoh_build, modular_build, unstable_build, st_build, fragment_test, attachment_test, memory_leak_test, no_router, connection_restore_test, markdown_lint, build_shared, build_static, integration, multicast] if: always() steps: - name: Check whether all jobs pass diff --git a/CMakeLists.txt b/CMakeLists.txt index 1295f5a5d..2a8233b12 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -244,6 +244,7 @@ set(Z_FEATURE_LOCAL_SUBSCRIBER 0 CACHE STRING "Toggle local subscriptions") set(Z_FEATURE_PUBLISHER_SESSION_CHECK 1 CACHE STRING "Toggle publisher session check") set(Z_FEATURE_BATCHING 1 CACHE STRING "Toggle batching") set(Z_FEATURE_RX_CACHE 0 CACHE STRING "Toggle RX_CACHE") +set(Z_FEATURE_AUTO_RECONNECT 1 CACHE STRING "Toggle automatic reconnection") # Add a warning message if someone tries to enable Z_FEATURE_LINK_SERIAL_USB directly if(Z_FEATURE_LINK_SERIAL_USB AND NOT 
Z_FEATURE_UNSTABLE_API) @@ -261,6 +262,7 @@ message(STATUS "Building with feature confing:\n\ * QUERYABLE: ${Z_FEATURE_QUERYABLE}\n\ * LIVELINESS: ${Z_FEATURE_LIVELINESS}\n\ * INTEREST: ${Z_FEATURE_INTEREST}\n\ +* AUTO_RECONNECT: ${Z_FEATURE_AUTO_RECONNECT}\n\ * RAWETH: ${Z_FEATURE_RAWETH_TRANSPORT}") configure_file( @@ -527,6 +529,7 @@ if(UNIX OR MSVC) configure_file(${PROJECT_SOURCE_DIR}/tests/attachment.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/attachment.py COPYONLY) configure_file(${PROJECT_SOURCE_DIR}/tests/no_router.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/no_router.py COPYONLY) configure_file(${PROJECT_SOURCE_DIR}/tests/memory_leak.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/memory_leak.py COPYONLY) + configure_file(${PROJECT_SOURCE_DIR}/tests/connection_restore.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/connection_restore.py COPYONLY) enable_testing() add_test(z_data_struct_test ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/z_data_struct_test) diff --git a/include/zenoh-pico/collections/list.h b/include/zenoh-pico/collections/list.h index 590d0f32d..a6741fc52 100644 --- a/include/zenoh-pico/collections/list.h +++ b/include/zenoh-pico/collections/list.h @@ -49,31 +49,32 @@ _z_list_t *_z_list_push(_z_list_t *xs, void *x); _z_list_t *_z_list_push_back(_z_list_t *xs, void *x); _z_list_t *_z_list_pop(_z_list_t *xs, z_element_free_f f_f, void **x); -_z_list_t *_z_list_find(const _z_list_t *xs, z_element_eq_f f_f, void *e); +_z_list_t *_z_list_find(const _z_list_t *xs, z_element_eq_f f_f, const void *e); -_z_list_t *_z_list_drop_filter(_z_list_t *xs, z_element_free_f f_f, z_element_eq_f c_f, void *left); +_z_list_t *_z_list_drop_filter(_z_list_t *xs, z_element_free_f f_f, z_element_eq_f c_f, const void *left); _z_list_t *_z_list_clone(const _z_list_t *xs, z_element_clone_f d_f); void _z_list_free(_z_list_t **xs, z_element_free_f f_f); -#define _Z_LIST_DEFINE(name, type) \ - typedef _z_list_t name##_list_t; \ - static inline name##_list_t *name##_list_new(void) { return NULL; } \ - static inline 
size_t name##_list_len(const name##_list_t *l) { return _z_list_len(l); } \ - static inline bool name##_list_is_empty(const name##_list_t *l) { return _z_list_is_empty(l); } \ - static inline type *name##_list_head(const name##_list_t *l) { return (type *)_z_list_head(l); } \ - static inline name##_list_t *name##_list_tail(const name##_list_t *l) { return _z_list_tail(l); } \ - static inline name##_list_t *name##_list_push(name##_list_t *l, type *e) { return _z_list_push(l, e); } \ - static inline name##_list_t *name##_list_pop(name##_list_t *l, type **x) { \ - return _z_list_pop(l, name##_elem_free, (void **)x); \ - } \ - static inline name##_list_t *name##_list_find(const name##_list_t *l, name##_eq_f c_f, type *e) { \ - return _z_list_find(l, (z_element_eq_f)c_f, e); \ - } \ - static inline name##_list_t *name##_list_drop_filter(name##_list_t *l, name##_eq_f c_f, type *e) { \ - return _z_list_drop_filter(l, name##_elem_free, (z_element_eq_f)c_f, e); \ - } \ - static inline name##_list_t *name##_list_clone(name##_list_t *l) { return _z_list_clone(l, name##_elem_clone); } \ +#define _Z_LIST_DEFINE(name, type) \ + typedef _z_list_t name##_list_t; \ + static inline name##_list_t *name##_list_new(void) { return NULL; } \ + static inline size_t name##_list_len(const name##_list_t *l) { return _z_list_len(l); } \ + static inline bool name##_list_is_empty(const name##_list_t *l) { return _z_list_is_empty(l); } \ + static inline type *name##_list_head(const name##_list_t *l) { return (type *)_z_list_head(l); } \ + static inline name##_list_t *name##_list_tail(const name##_list_t *l) { return _z_list_tail(l); } \ + static inline name##_list_t *name##_list_push(name##_list_t *l, type *e) { return _z_list_push(l, e); } \ + static inline name##_list_t *name##_list_push_back(name##_list_t *l, type *e) { return _z_list_push_back(l, e); } \ + static inline name##_list_t *name##_list_pop(name##_list_t *l, type **x) { \ + return _z_list_pop(l, name##_elem_free, (void **)x); \ + } 
\ + static inline name##_list_t *name##_list_find(const name##_list_t *l, name##_eq_f c_f, const type *e) { \ + return _z_list_find(l, (z_element_eq_f)c_f, e); \ + } \ + static inline name##_list_t *name##_list_drop_filter(name##_list_t *l, name##_eq_f c_f, const type *e) { \ + return _z_list_drop_filter(l, name##_elem_free, (z_element_eq_f)c_f, e); \ + } \ + static inline name##_list_t *name##_list_clone(name##_list_t *l) { return _z_list_clone(l, name##_elem_clone); } \ static inline void name##_list_free(name##_list_t **l) { _z_list_free(l, name##_elem_free); } #ifdef __cplusplus diff --git a/include/zenoh-pico/config.h b/include/zenoh-pico/config.h index 1d22d110b..7ec06acdf 100644 --- a/include/zenoh-pico/config.h +++ b/include/zenoh-pico/config.h @@ -27,7 +27,7 @@ #define Z_FEATURE_SUBSCRIPTION 1 #define Z_FEATURE_QUERY 1 #define Z_FEATURE_QUERYABLE 1 -#define Z_FEATURE_LIVELINESS 0 +#define Z_FEATURE_LIVELINESS 1 #define Z_FEATURE_RAWETH_TRANSPORT 0 #define Z_FEATURE_INTEREST 1 #define Z_FEATURE_DYNAMIC_MEMORY_ALLOCATION 0 @@ -48,6 +48,7 @@ #define Z_FEATURE_PUBLISHER_SESSION_CHECK 1 #define Z_FEATURE_BATCHING 1 #define Z_FEATURE_RX_CACHE 0 +#define Z_FEATURE_AUTO_RECONNECT 1 // End of CMake generation /*------------------ Runtime configuration properties ------------------*/ diff --git a/include/zenoh-pico/config.h.in b/include/zenoh-pico/config.h.in index a01c6f668..92b7006ea 100644 --- a/include/zenoh-pico/config.h.in +++ b/include/zenoh-pico/config.h.in @@ -48,6 +48,7 @@ #define Z_FEATURE_PUBLISHER_SESSION_CHECK @Z_FEATURE_PUBLISHER_SESSION_CHECK@ #define Z_FEATURE_BATCHING @Z_FEATURE_BATCHING@ #define Z_FEATURE_RX_CACHE @Z_FEATURE_RX_CACHE@ +#define Z_FEATURE_AUTO_RECONNECT @Z_FEATURE_AUTO_RECONNECT@ // End of CMake generation /*------------------ Runtime configuration properties ------------------*/ diff --git a/include/zenoh-pico/link/endpoint.h b/include/zenoh-pico/link/endpoint.h index 8aa03390e..621d2840f 100644 --- 
a/include/zenoh-pico/link/endpoint.h +++ b/include/zenoh-pico/link/endpoint.h @@ -54,7 +54,7 @@ bool _z_locator_eq(const _z_locator_t *left, const _z_locator_t *right); void _z_locator_init(_z_locator_t *locator); _z_string_t _z_locator_to_string(const _z_locator_t *loc); -z_result_t _z_locator_from_string(_z_locator_t *lc, _z_string_t *s); +z_result_t _z_locator_from_string(_z_locator_t *lc, const _z_string_t *s); size_t _z_locator_size(_z_locator_t *lc); void _z_locator_clear(_z_locator_t *lc); @@ -72,7 +72,7 @@ typedef struct { } _z_endpoint_t; _z_string_t _z_endpoint_to_string(const _z_endpoint_t *e); -z_result_t _z_endpoint_from_string(_z_endpoint_t *ep, _z_string_t *s); +z_result_t _z_endpoint_from_string(_z_endpoint_t *ep, const _z_string_t *s); void _z_endpoint_clear(_z_endpoint_t *ep); void _z_endpoint_free(_z_endpoint_t **ep); diff --git a/include/zenoh-pico/link/link.h b/include/zenoh-pico/link/link.h index 522721fe8..8b2da1ea7 100644 --- a/include/zenoh-pico/link/link.h +++ b/include/zenoh-pico/link/link.h @@ -141,8 +141,8 @@ typedef struct _z_link_t { void _z_link_clear(_z_link_t *zl); void _z_link_free(_z_link_t **zl); -z_result_t _z_open_link(_z_link_t *zl, _z_string_t *locator); -z_result_t _z_listen_link(_z_link_t *zl, _z_string_t *locator); +z_result_t _z_open_link(_z_link_t *zl, const _z_string_t *locator); +z_result_t _z_listen_link(_z_link_t *zl, const _z_string_t *locator); z_result_t _z_link_send_wbuf(const _z_link_t *zl, const _z_wbuf_t *wbf); size_t _z_link_recv_zbuf(const _z_link_t *zl, _z_zbuf_t *zbf, _z_slice_t *addr); diff --git a/include/zenoh-pico/net/session.h b/include/zenoh-pico/net/session.h index 5ee1556dc..4babe6771 100644 --- a/include/zenoh-pico/net/session.h +++ b/include/zenoh-pico/net/session.h @@ -21,6 +21,7 @@ #include "zenoh-pico/collections/list.h" #include "zenoh-pico/config.h" #include "zenoh-pico/protocol/core.h" +#include "zenoh-pico/protocol/definitions/network.h" #include "zenoh-pico/session/liveliness.h" #include 
"zenoh-pico/session/queryable.h" #include "zenoh-pico/session/session.h" @@ -55,6 +56,14 @@ typedef struct _z_session_t { _z_resource_list_t *_local_resources; _z_resource_list_t *_remote_resources; +#if Z_FEATURE_AUTO_RECONNECT == 1 + // Information for session restoring + _z_config_t _config; + _z_network_message_list_t *_decalaration_cache; + z_task_attr_t *_lease_task_attr; + z_task_attr_t *_read_task_attr; +#endif + // Session subscriptions #if Z_FEATURE_SUBSCRIPTION == 1 _z_subscription_rc_list_t *_subscriptions; @@ -99,14 +108,43 @@ _Z_REFCOUNT_DEFINE(_z_session, _z_session) * Open a zenoh-net session * * Parameters: + * zn: A pointer of A :c:type:`_z_session_rc_t` used as a return value. * config: A set of properties. The caller keeps its ownership. - * zn: A pointer of A :c:type:`_z_session_t` used as a return value. + * zid: A pointer to Zenoh ID. * * Returns: * ``0`` in case of success, or a ``negative value`` in case of failure. + */ +z_result_t _z_open(_z_session_rc_t *zn, _z_config_t *config, const _z_id_t *zid); + +/** + * Reopen a disconnected zenoh-net session * + * Parameters: + * zn: Existing zenoh-net session. + * + * Returns: + * ``0`` in case of success, or a ``negative value`` in case of failure. + */ +z_result_t _z_reopen(_z_session_rc_t *zn); + +/** + * Store declaration network message to cache for resend it after session restore + * + * Parameters: + * zs: A zenoh-net session. + * z_msg: Network message with declaration + */ +void _z_cache_declaration(_z_session_t *zs, const _z_network_message_t *n_msg); + +/** + * Remove corresponding declaration from the cache + * + * Parameters: + * zs: A zenoh-net session. + * z_msg: Network message with undeclaration */ -z_result_t _z_open(_z_session_rc_t *zn, _z_config_t *config); +void _z_prune_declaration(_z_session_t *zs, const _z_network_message_t *n_msg); /** * Close a zenoh-net session. 
diff --git a/include/zenoh-pico/protocol/definitions/network.h b/include/zenoh-pico/protocol/definitions/network.h index 8895b77ed..2af9d97ef 100644 --- a/include/zenoh-pico/protocol/definitions/network.h +++ b/include/zenoh-pico/protocol/definitions/network.h @@ -303,6 +303,7 @@ inline static void _z_msg_clear(_z_zenoh_message_t *msg) { _z_n_msg_clear(msg); inline static void _z_msg_free(_z_zenoh_message_t **msg) { _z_n_msg_free(msg); } _Z_ELEM_DEFINE(_z_network_message, _z_network_message_t, _z_noop_size, _z_n_msg_clear, _z_noop_copy, _z_noop_move) _Z_SVEC_DEFINE(_z_network_message, _z_network_message_t) +_Z_LIST_DEFINE(_z_network_message, _z_network_message_t) void _z_msg_fix_mapping(_z_zenoh_message_t *msg, uint16_t mapping); _z_network_message_t _z_msg_make_query(_Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_slice_t) parameters, _z_zint_t qid, @@ -315,6 +316,7 @@ _z_network_message_t _z_n_msg_make_declare(_z_declaration_t declaration, bool ha _z_network_message_t _z_n_msg_make_push(_Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_push_body_t) body); _z_network_message_t _z_n_msg_make_interest(_z_interest_t interest); z_result_t _z_n_msg_copy(_z_network_message_t *dst, const _z_network_message_t *src); +_z_network_message_t *_z_n_msg_clone(const _z_network_message_t *src); #ifdef __cplusplus } diff --git a/include/zenoh-pico/session/utils.h b/include/zenoh-pico/session/utils.h index 44c48ff91..238390e6e 100644 --- a/include/zenoh-pico/session/utils.h +++ b/include/zenoh-pico/session/utils.h @@ -30,7 +30,7 @@ extern "C" { _z_hello_list_t *_z_scout_inner(const z_what_t what, _z_id_t id, _z_string_t *locator, const uint32_t timeout, const bool exit_on_first); -z_result_t _z_session_init(_z_session_rc_t *zsrc, _z_id_t *zid); +z_result_t _z_session_init(_z_session_t *zn, const _z_id_t *zid); void _z_session_clear(_z_session_t *zn); z_result_t _z_session_close(_z_session_t *zn, uint8_t reason); diff --git a/include/zenoh-pico/transport/common/transport.h 
b/include/zenoh-pico/transport/common/transport.h new file mode 100644 index 000000000..36410df24 --- /dev/null +++ b/include/zenoh-pico/transport/common/transport.h @@ -0,0 +1,30 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#ifndef ZENOH_PICO_COMMON_TRANSPORT_H +#define ZENOH_PICO_COMMON_TRANSPORT_H + +#include "zenoh-pico/transport/transport.h" + +#ifdef __cplusplus +extern "C" { +#endif + +void _z_common_transport_clear(_z_transport_common_t *ztc, bool detach_tasks); + +#ifdef __cplusplus +} +#endif + +#endif /* ZENOH_PICO_COMMON_TRANSPORT_H*/ diff --git a/include/zenoh-pico/transport/manager.h b/include/zenoh-pico/transport/manager.h index b22eff975..c4c8bbe63 100644 --- a/include/zenoh-pico/transport/manager.h +++ b/include/zenoh-pico/transport/manager.h @@ -28,7 +28,8 @@ enum _z_peer_op_e { _Z_PEER_OP_LISTEN = 1, }; -z_result_t _z_new_transport(_z_transport_t *zt, _z_id_t *bs, _z_string_t *locator, z_whatami_t mode, int peer_op); +z_result_t _z_new_transport(_z_transport_t *zt, const _z_id_t *bs, const _z_string_t *locator, z_whatami_t mode, + int peer_op); void _z_free_transport(_z_transport_t **zt); #ifdef __cplusplus diff --git a/include/zenoh-pico/transport/multicast/transport.h b/include/zenoh-pico/transport/multicast/transport.h index 3e61f4bc1..d324dc540 100644 --- a/include/zenoh-pico/transport/multicast/transport.h +++ b/include/zenoh-pico/transport/multicast/transport.h @@ -29,7 +29,7 @@ z_result_t _z_multicast_open_client(_z_transport_multicast_establish_param_t *pa const _z_id_t *local_zid); z_result_t 
_z_multicast_send_close(_z_transport_multicast_t *ztm, uint8_t reason, bool link_only); z_result_t _z_multicast_transport_close(_z_transport_multicast_t *ztm, uint8_t reason); -void _z_multicast_transport_clear(_z_transport_t *zt); +void _z_multicast_transport_clear(_z_transport_multicast_t *ztm, bool detach_tasks); #if (Z_FEATURE_MULTICAST_TRANSPORT == 1 || Z_FEATURE_RAWETH_TRANSPORT == 1) && Z_FEATURE_MULTI_THREAD == 1 static inline void _z_multicast_peer_mutex_lock(_z_transport_multicast_t *ztm) { _z_mutex_lock(&ztm->_mutex_peer); } diff --git a/include/zenoh-pico/transport/transport.h b/include/zenoh-pico/transport/transport.h index c2f87b6ee..d8f5aa2dd 100644 --- a/include/zenoh-pico/transport/transport.h +++ b/include/zenoh-pico/transport/transport.h @@ -175,6 +175,7 @@ typedef struct { uint8_t _seq_num_res; } _z_transport_multicast_establish_param_t; +_z_transport_common_t *_z_transport_get_common(_z_transport_t *zt); z_result_t _z_transport_close(_z_transport_t *zt, uint8_t reason); void _z_transport_clear(_z_transport_t *zt); void _z_transport_free(_z_transport_t **zt); diff --git a/include/zenoh-pico/transport/unicast/transport.h b/include/zenoh-pico/transport/unicast/transport.h index 3ca322c42..84e439200 100644 --- a/include/zenoh-pico/transport/unicast/transport.h +++ b/include/zenoh-pico/transport/unicast/transport.h @@ -29,7 +29,7 @@ z_result_t _z_unicast_open_peer(_z_transport_unicast_establish_param_t *param, c const _z_id_t *local_zid, int peer_op); z_result_t _z_unicast_send_close(_z_transport_unicast_t *ztu, uint8_t reason, bool link_only); z_result_t _z_unicast_transport_close(_z_transport_unicast_t *ztu, uint8_t reason); -void _z_unicast_transport_clear(_z_transport_t *zt); +void _z_unicast_transport_clear(_z_transport_unicast_t *ztu, bool detach_tasks); #ifdef __cplusplus } diff --git a/src/api/api.c b/src/api/api.c index dc604dedc..207f4b467 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -42,6 +42,7 @@ #include 
"zenoh-pico/transport/common/tx.h" #include "zenoh-pico/transport/multicast.h" #include "zenoh-pico/transport/unicast.h" +#include "zenoh-pico/utils/config.h" #include "zenoh-pico/utils/endianness.h" #include "zenoh-pico/utils/logging.h" #include "zenoh-pico/utils/pointers.h" @@ -642,35 +643,74 @@ z_result_t z_scout(z_moved_config_t *config, z_moved_closure_hello_t *callback, void z_open_options_default(z_open_options_t *options) { options->__dummy = 0; } -z_result_t z_open(z_owned_session_t *zs, z_moved_config_t *config, const z_open_options_t *options) { - _ZP_UNUSED(options); +static _z_id_t _z_session_get_zid(const _z_config_t *config) { + _z_id_t zid = _z_id_empty(); + char *opt_as_str = _z_config_get(config, Z_CONFIG_SESSION_ZID_KEY); + if (opt_as_str != NULL) { + _z_uuid_to_bytes(zid.id, opt_as_str); + } else { + _z_session_generate_zid(&zid, Z_ZID_LENGTH); + } + return zid; +} + +static z_result_t _z_session_rc_init(z_owned_session_t *zs, _z_id_t *zid) { z_internal_session_null(zs); _z_session_t *s = z_malloc(sizeof(_z_session_t)); if (s == NULL) { - z_config_drop(config); return _Z_ERR_SYSTEM_OUT_OF_MEMORY; } - memset(s, 0, sizeof(_z_session_t)); - // Create rc + + z_result_t ret = _z_session_init(s, zid); + if (ret != _Z_RES_OK) { + _Z_ERROR("_z_open failed: %i", ret); + z_free(s); + return ret; + } + _z_session_rc_t zsrc = _z_session_rc_new(s); if (zsrc._cnt == NULL) { + _z_session_clear(s); z_free(s); - z_config_drop(config); return _Z_ERR_SYSTEM_OUT_OF_MEMORY; } zs->_rc = zsrc; - // Open session - z_result_t ret = _z_open(&zs->_rc, &config->_this._val); + + return _Z_RES_OK; +} + +z_result_t z_open(z_owned_session_t *zs, z_moved_config_t *config, const z_open_options_t *options) { + _ZP_UNUSED(options); + + _z_config_t *cfg = &config->_this._val; + if (config == NULL) { + _Z_ERROR("A valid config is missing."); + return _Z_ERR_GENERIC; + } + + _z_id_t zid = _z_session_get_zid(cfg); + + z_result_t ret = _z_session_rc_init(zs, &zid); if (ret != 
_Z_RES_OK) { - _Z_ERROR("_z_open failed: %i", ret); - _z_session_rc_decr(&zs->_rc); - z_internal_session_null(zs); z_config_drop(config); - z_free(s); return ret; } + + ret = _z_open(&zs->_rc, cfg, &zid); + if (ret != _Z_RES_OK) { + z_session_drop(z_session_move(zs)); + z_config_drop(config); + return ret; + } + // Clean up +#if Z_FEATURE_AUTO_RECONNECT == 1 + _Z_OWNED_RC_IN_VAL(zs)->_config = config->_this._val; + z_internal_config_null(&config->_this); +#else z_config_drop(config); +#endif + return _Z_RES_OK; } diff --git a/src/collections/list.c b/src/collections/list.c index 1df8bcba1..5b6857fc7 100644 --- a/src/collections/list.c +++ b/src/collections/list.c @@ -76,7 +76,7 @@ _z_list_t *_z_list_pop(_z_list_t *xs, z_element_free_f f_f, void **x) { return l; } -_z_list_t *_z_list_find(const _z_list_t *xs, z_element_eq_f c_f, void *e) { +_z_list_t *_z_list_find(const _z_list_t *xs, z_element_eq_f c_f, const void *e) { _z_list_t *l = (_z_list_t *)xs; _z_list_t *ret = NULL; while (l != NULL) { @@ -90,7 +90,7 @@ _z_list_t *_z_list_find(const _z_list_t *xs, z_element_eq_f c_f, void *e) { return ret; } -_z_list_t *_z_list_drop_filter(_z_list_t *xs, z_element_free_f f_f, z_element_eq_f c_f, void *left) { +_z_list_t *_z_list_drop_filter(_z_list_t *xs, z_element_free_f f_f, z_element_eq_f c_f, const void *left) { _z_list_t *l = (_z_list_t *)xs; _z_list_t *previous = xs; _z_list_t *current = xs; diff --git a/src/link/endpoint.c b/src/link/endpoint.c index 1b15f5ec3..5cd621a37 100644 --- a/src/link/endpoint.c +++ b/src/link/endpoint.c @@ -85,7 +85,7 @@ bool _z_locator_eq(const _z_locator_t *left, const _z_locator_t *right) { return res; } -static z_result_t _z_locator_protocol_from_string(_z_string_t *protocol, _z_string_t *str) { +static z_result_t _z_locator_protocol_from_string(_z_string_t *protocol, const _z_string_t *str) { *protocol = _z_string_null(); const char *p_start = _z_string_data(str); @@ -97,7 +97,7 @@ static z_result_t 
_z_locator_protocol_from_string(_z_string_t *protocol, _z_stri return _z_string_copy_substring(protocol, str, 0, p_len); } -static z_result_t _z_locator_address_from_string(_z_string_t *address, _z_string_t *str) { +static z_result_t _z_locator_address_from_string(_z_string_t *address, const _z_string_t *str) { *address = _z_string_null(); // Find protocol separator @@ -130,7 +130,7 @@ static z_result_t _z_locator_address_from_string(_z_string_t *address, _z_string return _z_string_copy_substring(address, str, start_offset, addr_len); } -z_result_t _z_locator_metadata_from_string(_z_str_intmap_t *strint, _z_string_t *str) { +z_result_t _z_locator_metadata_from_string(_z_str_intmap_t *strint, const _z_string_t *str) { *strint = _z_str_intmap_make(); // Find metadata separator @@ -169,7 +169,7 @@ void _z_locator_metadata_onto_str(char *dst, size_t dst_len, const _z_str_intmap _z_str_intmap_onto_str(dst, dst_len, s, 0, NULL); } -z_result_t _z_locator_from_string(_z_locator_t *lc, _z_string_t *str) { +z_result_t _z_locator_from_string(_z_locator_t *lc, const _z_string_t *str) { if (str == NULL || !_z_string_check(str)) { return _Z_ERR_CONFIG_LOCATOR_INVALID; } @@ -284,7 +284,7 @@ void _z_endpoint_free(_z_endpoint_t **ep) { } } -z_result_t _z_endpoint_config_from_string(_z_str_intmap_t *strint, _z_string_t *str, _z_string_t *proto) { +z_result_t _z_endpoint_config_from_string(_z_str_intmap_t *strint, const _z_string_t *str, _z_string_t *proto) { char *p_start = (char *)memchr(_z_string_data(str), ENDPOINT_CONFIG_SEPARATOR, _z_string_len(str)); if (p_start != NULL) { p_start = _z_ptr_char_offset(p_start, 1); @@ -411,7 +411,7 @@ char *_z_endpoint_config_to_string(const _z_str_intmap_t *s, const _z_string_t * return NULL; } -z_result_t _z_endpoint_from_string(_z_endpoint_t *ep, _z_string_t *str) { +z_result_t _z_endpoint_from_string(_z_endpoint_t *ep, const _z_string_t *str) { _z_endpoint_init(ep); _Z_CLEAN_RETURN_IF_ERR(_z_locator_from_string(&ep->_locator, str), 
_z_endpoint_clear(ep)); _Z_CLEAN_RETURN_IF_ERR(_z_endpoint_config_from_string(&ep->_config, str, &ep->_locator._protocol), diff --git a/src/link/link.c b/src/link/link.c index 8c5ffb312..76f454b38 100644 --- a/src/link/link.c +++ b/src/link/link.c @@ -21,7 +21,7 @@ #include "zenoh-pico/link/manager.h" #include "zenoh-pico/utils/logging.h" -z_result_t _z_open_link(_z_link_t *zl, _z_string_t *locator) { +z_result_t _z_open_link(_z_link_t *zl, const _z_string_t *locator) { z_result_t ret = _Z_RES_OK; _z_endpoint_t ep; @@ -71,7 +71,7 @@ z_result_t _z_open_link(_z_link_t *zl, _z_string_t *locator) { return ret; } -z_result_t _z_listen_link(_z_link_t *zl, _z_string_t *locator) { +z_result_t _z_listen_link(_z_link_t *zl, const _z_string_t *locator) { z_result_t ret = _Z_RES_OK; _z_endpoint_t ep; diff --git a/src/net/primitives.c b/src/net/primitives.c index d1909188f..b7651bfb2 100644 --- a/src/net/primitives.c +++ b/src/net/primitives.c @@ -21,6 +21,9 @@ #include "zenoh-pico/collections/slice.h" #include "zenoh-pico/config.h" #include "zenoh-pico/net/filtering.h" +#include "zenoh-pico/net/logger.h" +#include "zenoh-pico/net/sample.h" +#include "zenoh-pico/net/session.h" #include "zenoh-pico/protocol/core.h" #include "zenoh-pico/protocol/definitions/declarations.h" #include "zenoh-pico/protocol/definitions/interest.h" @@ -38,6 +41,32 @@ #include "zenoh-pico/utils/logging.h" #include "zenoh-pico/utils/result.h" +/*------------------ Declaration Helpers ------------------*/ +static z_result_t _z_send_decalre(_z_session_t *zn, const _z_network_message_t *n_msg) { + z_result_t ret = _Z_RES_OK; + ret = _z_send_n_msg(zn, n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK); + +#if Z_FEATURE_AUTO_RECONNECT == 1 + if (ret == _Z_RES_OK) { + _z_cache_declaration(zn, n_msg); + } +#endif + + return ret; +} + +static z_result_t _z_send_undecalre(_z_session_t *zn, const _z_network_message_t *n_msg) { + z_result_t ret = _Z_RES_OK; + ret = _z_send_n_msg(zn, n_msg, 
Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK); + +#if Z_FEATURE_AUTO_RECONNECT == 1 + if (ret == _Z_RES_OK) { + _z_prune_declaration(zn, n_msg); + } +#endif + + return ret; +} /*------------------ Scouting ------------------*/ void _z_scout(const z_what_t what, const _z_id_t zid, _z_string_t *locator, const uint32_t timeout, _z_closure_hello_callback_t callback, void *arg_call, _z_drop_handler_t dropper, void *arg_drop) { @@ -68,7 +97,7 @@ uint16_t _z_declare_resource(_z_session_t *zn, _z_keyexpr_t keyexpr) { _z_keyexpr_t alias = _z_keyexpr_alias(keyexpr); _z_declaration_t declaration = _z_make_decl_keyexpr(id, &alias); _z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0); - if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) == _Z_RES_OK) { + if (_z_send_decalre(zn, &n_msg) == _Z_RES_OK) { ret = id; } else { _z_unregister_resource(zn, id, _Z_KEYEXPR_MAPPING_LOCAL); @@ -88,7 +117,7 @@ z_result_t _z_undeclare_resource(_z_session_t *zn, uint16_t rid) { // Build the declare message to send on the wire _z_declaration_t declaration = _z_make_undecl_keyexpr(rid); _z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0); - if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) == _Z_RES_OK) { + if (_z_send_undecalre(zn, &n_msg) == _Z_RES_OK) { _z_unregister_resource(zn, rid, _Z_KEYEXPR_MAPPING_LOCAL); // Only if message is send, local resource is removed } else { @@ -235,7 +264,7 @@ _z_subscriber_t _z_declare_subscriber(const _z_session_rc_t *zn, _z_keyexpr_t ke // Build the declare message to send on the wire _z_declaration_t declaration = _z_make_decl_subscriber(&keyexpr, s._id); _z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0); - if (_z_send_n_msg(_Z_RC_IN_VAL(zn), &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { + if (_z_send_decalre(_Z_RC_IN_VAL(zn), &n_msg) != _Z_RES_OK) { 
_z_unregister_subscription(_Z_RC_IN_VAL(zn), _Z_SUBSCRIBER_KIND_SUBSCRIBER, sp_s); _z_subscriber_clear(&ret); return ret; @@ -265,8 +294,7 @@ z_result_t _z_undeclare_subscriber(_z_subscriber_t *sub) { declaration = _z_make_undecl_subscriber(sub->_entity_id, &_Z_RC_IN_VAL(s)->_key); } _z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0); - if (_z_send_n_msg(_Z_RC_IN_VAL(&sub->_zn), &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != - _Z_RES_OK) { + if (_z_send_undecalre(_Z_RC_IN_VAL(&sub->_zn), &n_msg) != _Z_RES_OK) { return _Z_ERR_TRANSPORT_TX_FAILED; } _z_n_msg_clear(&n_msg); @@ -299,7 +327,7 @@ _z_queryable_t _z_declare_queryable(const _z_session_rc_t *zn, _z_keyexpr_t keye // Build the declare message to send on the wire _z_declaration_t declaration = _z_make_decl_queryable(&keyexpr, q._id, q._complete, _Z_QUERYABLE_DISTANCE_DEFAULT); _z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0); - if (_z_send_n_msg(_Z_RC_IN_VAL(zn), &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { + if (_z_send_decalre(_Z_RC_IN_VAL(zn), &n_msg) != _Z_RES_OK) { _z_unregister_session_queryable(_Z_RC_IN_VAL(zn), sp_q); _z_queryable_clear(&ret); return ret; @@ -328,8 +356,7 @@ z_result_t _z_undeclare_queryable(_z_queryable_t *qle) { declaration = _z_make_undecl_queryable(qle->_entity_id, &_Z_RC_IN_VAL(q)->_key); } _z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0); - if (_z_send_n_msg(_Z_RC_IN_VAL(&qle->_zn), &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != - _Z_RES_OK) { + if (_z_send_undecalre(_Z_RC_IN_VAL(&qle->_zn), &n_msg) != _Z_RES_OK) { return _Z_ERR_TRANSPORT_TX_FAILED; } _z_n_msg_clear(&n_msg); diff --git a/src/net/session.c b/src/net/session.c index c70fdbabe..9374e9131 100644 --- a/src/net/session.c +++ b/src/net/session.c @@ -17,15 +17,16 @@ #include #include -#include "zenoh-pico/api/primitives.h" -#include "zenoh-pico/collections/slice.h" +#include 
"zenoh-pico/api/constants.h" #include "zenoh-pico/collections/string.h" #include "zenoh-pico/config.h" -#include "zenoh-pico/net/sample.h" #include "zenoh-pico/protocol/core.h" +#include "zenoh-pico/protocol/definitions/declarations.h" +#include "zenoh-pico/protocol/definitions/network.h" #include "zenoh-pico/session/utils.h" #include "zenoh-pico/transport/common/lease.h" #include "zenoh-pico/transport/common/read.h" +#include "zenoh-pico/transport/common/tx.h" #include "zenoh-pico/transport/multicast.h" #include "zenoh-pico/transport/multicast/lease.h" #include "zenoh-pico/transport/multicast/read.h" @@ -34,70 +35,50 @@ #include "zenoh-pico/transport/unicast.h" #include "zenoh-pico/transport/unicast/lease.h" #include "zenoh-pico/transport/unicast/read.h" +#include "zenoh-pico/utils/config.h" #include "zenoh-pico/utils/logging.h" +#include "zenoh-pico/utils/result.h" #include "zenoh-pico/utils/uuid.h" -static z_result_t __z_open_inner(_z_session_rc_t *zn, _z_string_t *locator, z_whatami_t mode, int peer_op) { +static z_result_t _z_locators_by_scout(const _z_config_t *config, const _z_id_t *zid, _z_string_svec_t *locators) { z_result_t ret = _Z_RES_OK; - _z_id_t local_zid = _z_id_empty(); - ret = _z_session_generate_zid(&local_zid, Z_ZID_LENGTH); - if (ret != _Z_RES_OK) { - local_zid = _z_id_empty(); - return ret; + char *opt_as_str = _z_config_get(config, Z_CONFIG_SCOUTING_WHAT_KEY); + if (opt_as_str == NULL) { + opt_as_str = (char *)Z_CONFIG_SCOUTING_WHAT_DEFAULT; } - ret = _z_new_transport(&_Z_RC_IN_VAL(zn)->_tp, &local_zid, locator, mode, peer_op); - if (ret != _Z_RES_OK) { - local_zid = _z_id_empty(); - return ret; + z_what_t what = strtol(opt_as_str, NULL, 10); + + opt_as_str = _z_config_get(config, Z_CONFIG_MULTICAST_LOCATOR_KEY); + if (opt_as_str == NULL) { + opt_as_str = (char *)Z_CONFIG_MULTICAST_LOCATOR_DEFAULT; + } + _z_string_t mcast_locator = _z_string_alias_str(opt_as_str); + + opt_as_str = _z_config_get(config, Z_CONFIG_SCOUTING_TIMEOUT_KEY); + if 
(opt_as_str == NULL) { + opt_as_str = (char *)Z_CONFIG_SCOUTING_TIMEOUT_DEFAULT; + } + uint32_t timeout = (uint32_t)strtoul(opt_as_str, NULL, 10); + + // Scout and return upon the first result + _z_hello_list_t *hellos = _z_scout_inner(what, *zid, &mcast_locator, timeout, true); + if (hellos != NULL) { + _z_hello_t *hello = _z_hello_list_head(hellos); + _z_string_svec_copy(locators, &hello->_locators, true); } - ret = _z_session_init(zn, &local_zid); + _z_hello_list_free(&hellos); return ret; } -z_result_t _z_open(_z_session_rc_t *zn, _z_config_t *config) { +static z_result_t _z_locators_by_config(_z_config_t *config, const _z_id_t *zid, _z_string_svec_t *locators, + int *peer_op) { z_result_t ret = _Z_RES_OK; - _Z_RC_IN_VAL(zn)->_tp._type = _Z_TRANSPORT_NONE; - - _z_id_t zid = _z_id_empty(); - char *opt_as_str = _z_config_get(config, Z_CONFIG_SESSION_ZID_KEY); - if (opt_as_str != NULL) { - _z_uuid_to_bytes(zid.id, opt_as_str); - } - if (config == NULL) { - _Z_ERROR("A valid config is missing."); - return _Z_ERR_GENERIC; - } - int peer_op = _Z_PEER_OP_LISTEN; - _z_string_svec_t locators = _z_string_svec_make(0); char *connect = _z_config_get(config, Z_CONFIG_CONNECT_KEY); char *listen = _z_config_get(config, Z_CONFIG_LISTEN_KEY); - if (connect == NULL && listen == NULL) { // Scout if peer is not configured - opt_as_str = _z_config_get(config, Z_CONFIG_SCOUTING_WHAT_KEY); - if (opt_as_str == NULL) { - opt_as_str = (char *)Z_CONFIG_SCOUTING_WHAT_DEFAULT; - } - z_what_t what = strtol(opt_as_str, NULL, 10); - - opt_as_str = _z_config_get(config, Z_CONFIG_MULTICAST_LOCATOR_KEY); - if (opt_as_str == NULL) { - opt_as_str = (char *)Z_CONFIG_MULTICAST_LOCATOR_DEFAULT; - } - _z_string_t mcast_locator = _z_string_alias_str(opt_as_str); - - opt_as_str = _z_config_get(config, Z_CONFIG_SCOUTING_TIMEOUT_KEY); - if (opt_as_str == NULL) { - opt_as_str = (char *)Z_CONFIG_SCOUTING_TIMEOUT_DEFAULT; - } - uint32_t timeout = (uint32_t)strtoul(opt_as_str, NULL, 10); - - // Scout and 
return upon the first result - _z_hello_list_t *hellos = _z_scout_inner(what, zid, &mcast_locator, timeout, true); - if (hellos != NULL) { - _z_hello_t *hello = _z_hello_list_head(hellos); - _z_string_svec_copy(&locators, &hello->_locators, true); - } - _z_hello_list_free(&hellos); + if (connect == NULL && listen == NULL) { + // Scout if peer is not configured + ret = _z_locators_by_scout(config, zid, locators); } else { uint_fast8_t key = Z_CONFIG_CONNECT_KEY; if (listen != NULL) { @@ -108,11 +89,62 @@ z_result_t _z_open(_z_session_rc_t *zn, _z_config_t *config) { return _Z_ERR_GENERIC; } } else { - peer_op = _Z_PEER_OP_OPEN; + *peer_op = _Z_PEER_OP_OPEN; } - locators = _z_string_svec_make(1); + *locators = _z_string_svec_make(1); _z_string_t s = _z_string_copy_from_str(_z_config_get(config, key)); - _z_string_svec_append(&locators, &s, true); + _z_string_svec_append(locators, &s, true); + } + return ret; +} + +static z_result_t _z_config_get_mode(const _z_config_t *config, z_whatami_t *mode) { + z_result_t ret = _Z_RES_OK; + char *s_mode = _z_config_get(config, Z_CONFIG_MODE_KEY); + *mode = Z_WHATAMI_CLIENT; // By default, zenoh-pico will operate as a client + if (s_mode != NULL) { + if (_z_str_eq(s_mode, Z_CONFIG_MODE_CLIENT) == true) { + *mode = Z_WHATAMI_CLIENT; + } else if (_z_str_eq(s_mode, Z_CONFIG_MODE_PEER) == true) { + *mode = Z_WHATAMI_PEER; + } else { + _Z_ERROR("Trying to configure an invalid mode: %s", s_mode); + ret = _Z_ERR_CONFIG_INVALID_MODE; + } + } + return ret; +} + +static z_result_t _z_open_inner(_z_session_rc_t *zn, _z_string_t *locator, const _z_id_t *zid, z_whatami_t mode, + int peer_op) { + z_result_t ret = _Z_RES_OK; + + _z_transport_t zt; + ret = _z_new_transport(&zt, zid, locator, mode, peer_op); + if (ret != _Z_RES_OK) { + return ret; + } + + _z_transport_get_common(&zt)->_session = zn; + _Z_RC_IN_VAL(zn)->_tp = zt; + return ret; +} + +z_result_t _z_open(_z_session_rc_t *zn, _z_config_t *config, const _z_id_t *zid) { + z_result_t ret 
= _Z_RES_OK; + _Z_RC_IN_VAL(zn)->_tp._type = _Z_TRANSPORT_NONE; + + int peer_op = _Z_PEER_OP_LISTEN; + _z_string_svec_t locators = _z_string_svec_make(0); + ret = _z_locators_by_config(config, zid, &locators, &peer_op); + if (ret != _Z_RES_OK) { + return ret; + } + + z_whatami_t mode; + ret = _z_config_get_mode(config, &mode); + if (ret != _Z_RES_OK) { + return ret; } ret = _Z_ERR_SCOUT_NO_RESULTS; @@ -124,32 +156,119 @@ z_result_t _z_open(_z_session_rc_t *zn, _z_config_t *config) { // @TODO: check invalid configurations // For example, client mode in multicast links - // Check operation mode - char *s_mode = _z_config_get(config, Z_CONFIG_MODE_KEY); - z_whatami_t mode = Z_WHATAMI_CLIENT; // By default, zenoh-pico will operate as a client - if (s_mode != NULL) { - if (_z_str_eq(s_mode, Z_CONFIG_MODE_CLIENT) == true) { - mode = Z_WHATAMI_CLIENT; - } else if (_z_str_eq(s_mode, Z_CONFIG_MODE_PEER) == true) { - mode = Z_WHATAMI_PEER; + ret = _z_open_inner(zn, locator, zid, mode, peer_op); + if (ret == _Z_RES_OK) { + break; + } + } + _z_string_svec_clear(&locators); + return ret; +} + +#if Z_FEATURE_AUTO_RECONNECT == 1 + +z_result_t _z_reopen(_z_session_rc_t *zn) { + z_result_t ret = _Z_RES_OK; + _z_session_t *zs = _Z_RC_IN_VAL(zn); + if (_z_config_is_empty(&zs->_config)) { + return ret; + } + + do { + ret = _z_open(zn, &zs->_config, &zs->_local_zid); + if (ret != _Z_RES_OK) { + if (ret == _Z_ERR_TRANSPORT_OPEN_FAILED || ret == _Z_ERR_SCOUT_NO_RESULTS || + ret == _Z_ERR_TRANSPORT_TX_FAILED || ret == _Z_ERR_TRANSPORT_RX_FAILED) { + _Z_DEBUG("Reopen failed, next try in 1s"); + z_sleep_s(1); + continue; } else { - ret = _Z_ERR_CONFIG_INVALID_MODE; + return ret; } } - if (ret == _Z_RES_OK) { - ret = __z_open_inner(zn, locator, mode, peer_op); - if (ret == _Z_RES_OK) { - break; +#if Z_FEATURE_MULTI_THREAD == 1 + ret = _zp_start_lease_task(zs, zs->_lease_task_attr); + if (ret != _Z_RES_OK) { + return ret; + } + ret = _zp_start_read_task(zs, zs->_read_task_attr); + if (ret != 
_Z_RES_OK) { + return ret; + } +#endif // Z_FEATURE_MULTI_THREAD == 1 + + if (ret == _Z_RES_OK && !_z_network_message_list_is_empty(zs->_decalaration_cache)) { + _z_network_message_list_t *iter = zs->_decalaration_cache; + while (iter != NULL) { + _z_network_message_t *n_msg = _z_network_message_list_head(iter); + ret = _z_send_n_msg(zs, n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK); + if (ret != _Z_RES_OK) { + _Z_DEBUG("Send message during reopen failed: %i", ret); + continue; + } + + iter = _z_network_message_list_tail(iter); } - } else { - _Z_ERROR("Trying to configure an invalid mode."); } - } - _z_string_svec_clear(&locators); + } while (ret != _Z_RES_OK); + return ret; } +void _z_cache_declaration(_z_session_t *zs, const _z_network_message_t *n_msg) { + if (_z_config_is_empty(&zs->_config)) { + return; + } + zs->_decalaration_cache = _z_network_message_list_push_back(zs->_decalaration_cache, _z_n_msg_clone(n_msg)); +} + +#define _Z_CACHE_DECLARATION_UNDECLARE_FILTER(tp) \ + static bool _z_cache_declaration_undeclare_filter_##tp(const _z_network_message_t *left, \ + const _z_network_message_t *right) { \ + return left->_body._declare._decl._body._undecl_##tp._id == right->_body._declare._decl._body._decl_##tp._id; \ + } +_Z_CACHE_DECLARATION_UNDECLARE_FILTER(kexpr) +_Z_CACHE_DECLARATION_UNDECLARE_FILTER(subscriber) +_Z_CACHE_DECLARATION_UNDECLARE_FILTER(queryable) +_Z_CACHE_DECLARATION_UNDECLARE_FILTER(token) + +void _z_prune_declaration(_z_session_t *zs, const _z_network_message_t *n_msg) { + if (n_msg->_tag != _Z_N_DECLARE) { + _Z_ERROR("Invalid net message for _z_prune_declaration: %i", n_msg->_tag); + return; + } +#ifdef Z_BUILD_DEBUG + size_t cnt_before = _z_network_message_list_len(zs->_decalaration_cache); +#endif + const _z_declaration_t *decl = &n_msg->_body._declare._decl; + switch (decl->_tag) { + case _Z_UNDECL_KEXPR: + zs->_decalaration_cache = _z_network_message_list_drop_filter( + zs->_decalaration_cache, 
_z_cache_declaration_undeclare_filter_kexpr, n_msg); + break; + case _Z_UNDECL_SUBSCRIBER: + zs->_decalaration_cache = _z_network_message_list_drop_filter( + zs->_decalaration_cache, _z_cache_declaration_undeclare_filter_subscriber, n_msg); + break; + case _Z_UNDECL_QUERYABLE: + zs->_decalaration_cache = _z_network_message_list_drop_filter( + zs->_decalaration_cache, _z_cache_declaration_undeclare_filter_queryable, n_msg); + break; + case _Z_UNDECL_TOKEN: + zs->_decalaration_cache = _z_network_message_list_drop_filter( + zs->_decalaration_cache, _z_cache_declaration_undeclare_filter_token, n_msg); + break; + default: + _Z_ERROR("Invalid decl for _z_prune_declaration: %i", decl->_tag); + }; +#ifdef Z_BUILD_DEBUG + size_t cnt_after = _z_network_message_list_len(zs->_decalaration_cache); + assert(cnt_before == cnt_after + 1); +#endif +} +#endif // Z_FEATURE_AUTO_RECONNECT == 1 + void _z_close(_z_session_t *zn) { _z_session_close(zn, _Z_CLOSE_GENERIC); } bool _z_session_is_closed(const _z_session_t *session) { return session->_tp._type == _Z_TRANSPORT_NONE; } @@ -218,6 +337,10 @@ z_result_t _zp_start_read_task(_z_session_t *zn, z_task_attr_t *attr) { // Free task if operation failed if (ret != _Z_RES_OK) { z_free(task); +#if Z_FEATURE_AUTO_RECONNECT == 1 + } else { + zn->_read_task_attr = attr; +#endif } return ret; } @@ -247,6 +370,10 @@ z_result_t _zp_start_lease_task(_z_session_t *zn, z_task_attr_t *attr) { // Free task if operation failed if (ret != _Z_RES_OK) { z_free(task); +#if Z_FEATURE_AUTO_RECONNECT == 1 + } else { + zn->_lease_task_attr = attr; +#endif } return ret; } diff --git a/src/protocol/definitions/network.c b/src/protocol/definitions/network.c index 1a84ea1cd..bab75f7d9 100644 --- a/src/protocol/definitions/network.c +++ b/src/protocol/definitions/network.c @@ -354,6 +354,12 @@ z_result_t _z_n_msg_copy(_z_network_message_t *dst, const _z_network_message_t * } } +_z_network_message_t *_z_n_msg_clone(const _z_network_message_t *src) { + 
_z_network_message_t *dst = z_malloc(sizeof(_z_network_message_t)); + _z_n_msg_copy(dst, src); + return dst; +} + void _z_msg_fix_mapping(_z_zenoh_message_t *msg, uint16_t mapping) { switch (msg->_tag) { case _Z_N_DECLARE: { diff --git a/src/protocol/definitions/transport.c b/src/protocol/definitions/transport.c index 69b1cc502..7bdfd9d85 100644 --- a/src/protocol/definitions/transport.c +++ b/src/protocol/definitions/transport.c @@ -41,7 +41,6 @@ void _z_t_msg_close_clear(_z_t_msg_close_t *msg) { (void)(msg); } void _z_t_msg_keep_alive_clear(_z_t_msg_keep_alive_t *msg) { (void)(msg); } void _z_t_msg_frame_clear(_z_t_msg_frame_t *msg) { - // TODO (sashacmc): make in more correct way if (!msg->_messages._aliased) { _z_network_message_svec_clear(&msg->_messages); } diff --git a/src/session/rx.c b/src/session/rx.c index eb5ddf6e4..9d44fcc45 100644 --- a/src/session/rx.c +++ b/src/session/rx.c @@ -164,7 +164,6 @@ z_result_t _z_handle_network_message(_z_session_rc_t *zsrc, _z_zenoh_message_t * } } } - // TODO (sashacmc): why it was removed??? 
_z_msg_clear(msg); return ret; } diff --git a/src/session/utils.c b/src/session/utils.c index 89d92cdd4..7bb9da967 100644 --- a/src/session/utils.c +++ b/src/session/utils.c @@ -18,12 +18,16 @@ #include "zenoh-pico/config.h" #include "zenoh-pico/protocol/core.h" +#include "zenoh-pico/protocol/definitions/network.h" #include "zenoh-pico/session/interest.h" #include "zenoh-pico/session/liveliness.h" #include "zenoh-pico/session/query.h" #include "zenoh-pico/session/queryable.h" #include "zenoh-pico/session/resource.h" #include "zenoh-pico/session/subscription.h" +#include "zenoh-pico/transport/transport.h" +#include "zenoh-pico/utils/config.h" +#include "zenoh-pico/utils/result.h" /*------------------ clone helpers ------------------*/ void _z_timestamp_copy(_z_timestamp_t *dst, const _z_timestamp_t *src) { *dst = *src; } @@ -47,15 +51,26 @@ z_result_t _z_session_generate_zid(_z_id_t *bs, uint8_t size) { } /*------------------ Init/Free/Close session ------------------*/ -z_result_t _z_session_init(_z_session_rc_t *zsrc, _z_id_t *zid) { +z_result_t _z_session_init(_z_session_t *zn, const _z_id_t *zid) { z_result_t ret = _Z_RES_OK; - _z_session_t *zn = _Z_RC_IN_VAL(zsrc); + +#if Z_FEATURE_MULTI_THREAD == 1 + ret = _z_mutex_init(&zn->_mutex_inner); + if (ret != _Z_RES_OK) { + return ret; + } +#endif // Initialize the counters to 1 zn->_entity_id = 1; zn->_resource_id = 1; zn->_query_id = 1; +#if Z_FEATURE_AUTO_RECONNECT == 1 + _z_config_init(&zn->_config); + zn->_decalaration_cache = NULL; +#endif + // Initialize the data structs zn->_local_resources = NULL; zn->_remote_resources = NULL; @@ -76,14 +91,6 @@ z_result_t _z_session_init(_z_session_rc_t *zsrc, _z_id_t *zid) { zn->_pending_queries = NULL; #endif -#if Z_FEATURE_MULTI_THREAD == 1 - ret = _z_mutex_init(&zn->_mutex_inner); - if (ret != _Z_RES_OK) { - _z_transport_clear(&zn->_tp); - return ret; - } -#endif // Z_FEATURE_MULTI_THREAD == 1 - #if Z_FEATURE_LIVELINESS == 1 _z_liveliness_init(zn); #endif @@ -91,20 
+98,7 @@ z_result_t _z_session_init(_z_session_rc_t *zsrc, _z_id_t *zid) { _z_interest_init(zn); zn->_local_zid = *zid; - // Note session in transport - switch (zn->_tp._type) { - case _Z_TRANSPORT_UNICAST_TYPE: - zn->_tp._transport._unicast._common._session = zsrc; - break; - case _Z_TRANSPORT_MULTICAST_TYPE: - zn->_tp._transport._multicast._common._session = zsrc; - break; - case _Z_TRANSPORT_RAWETH_TYPE: - zn->_tp._transport._raweth._common._session = zsrc; - break; - default: - break; - } + return ret; } @@ -116,6 +110,12 @@ void _z_session_clear(_z_session_t *zn) { _zp_stop_read_task(zn); _zp_stop_lease_task(zn); #endif + +#if Z_FEATURE_AUTO_RECONNECT == 1 + _z_config_clear(&zn->_config); + _z_network_message_list_free(&zn->_decalaration_cache); +#endif + _z_close(zn); // Clear Zenoh PID // Clean up transports diff --git a/src/system/rpi_pico/system.c b/src/system/rpi_pico/system.c index c3b323483..7969b65a3 100644 --- a/src/system/rpi_pico/system.c +++ b/src/system/rpi_pico/system.c @@ -104,7 +104,6 @@ z_result_t _z_task_join(_z_task_t *task) { z_result_t _z_task_detach(_z_task_t *task) { _ZP_UNUSED(task); - assert(false); return _Z_ERR_GENERIC; } diff --git a/src/transport/common/transport.c b/src/transport/common/transport.c new file mode 100644 index 000000000..433a6a1b1 --- /dev/null +++ b/src/transport/common/transport.c @@ -0,0 +1,60 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. 
+// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#include "zenoh-pico/transport/transport.h" + +#include + +#include "zenoh-pico/link/link.h" +#include "zenoh-pico/system/common/platform.h" +#include "zenoh-pico/utils/result.h" + +void _z_common_transport_clear(_z_transport_common_t *ztc, bool detach_tasks) { +#if Z_FEATURE_MULTI_THREAD == 1 + // Clean up tasks + if (ztc->_read_task != NULL) { + ztc->_read_task_running = false; + if (detach_tasks) { + _z_task_detach(ztc->_read_task); + } else { + _z_task_join(ztc->_read_task); + } + z_free(ztc->_read_task); + ztc->_read_task = NULL; + } + if (ztc->_lease_task != NULL) { + ztc->_lease_task_running = false; + if (detach_tasks) { + _z_task_detach(ztc->_lease_task); + } else { + _z_task_join(ztc->_lease_task); + } + z_free(ztc->_lease_task); + ztc->_lease_task = NULL; + } + + // Clean up the mutexes + _z_mutex_drop(&ztc->_mutex_tx); + _z_mutex_drop(&ztc->_mutex_rx); +#else + _ZP_UNUSED(detach_tasks); +#endif // Z_FEATURE_MULTI_THREAD == 1 + + // Clean up the buffers + _z_wbuf_clear(&ztc->_wbuf); + _z_zbuf_clear(&ztc->_zbuf); + _z_arc_slice_svec_release(&ztc->_arc_pool); + _z_network_message_svec_release(&ztc->_msg_pool); + + _z_link_clear(&ztc->_link); +} diff --git a/src/transport/manager.c b/src/transport/manager.c index 6b2447eea..77fb21c65 100644 --- a/src/transport/manager.c +++ b/src/transport/manager.c @@ -20,7 +20,7 @@ #include "zenoh-pico/transport/multicast/transport.h" #include "zenoh-pico/transport/unicast/transport.h" -static z_result_t _z_new_transport_client(_z_transport_t *zt, _z_string_t *locator, _z_id_t *local_zid) { +static z_result_t _z_new_transport_client(_z_transport_t *zt, const _z_string_t *locator, const _z_id_t *local_zid) { z_result_t ret = _Z_RES_OK; // Init link _z_link_t zl; @@ -62,7 +62,8 @@ static z_result_t _z_new_transport_client(_z_transport_t *zt, _z_string_t *locat return ret; } -static z_result_t 
_z_new_transport_peer(_z_transport_t *zt, _z_string_t *locator, _z_id_t *local_zid, int peer_op) { +static z_result_t _z_new_transport_peer(_z_transport_t *zt, const _z_string_t *locator, const _z_id_t *local_zid, + int peer_op) { z_result_t ret = _Z_RES_OK; // Init link _z_link_t zl; @@ -105,7 +106,8 @@ static z_result_t _z_new_transport_peer(_z_transport_t *zt, _z_string_t *locator return ret; } -z_result_t _z_new_transport(_z_transport_t *zt, _z_id_t *bs, _z_string_t *locator, z_whatami_t mode, int peer_op) { +z_result_t _z_new_transport(_z_transport_t *zt, const _z_id_t *bs, const _z_string_t *locator, z_whatami_t mode, + int peer_op) { z_result_t ret; if (mode == Z_WHATAMI_CLIENT) { diff --git a/src/transport/multicast/lease.c b/src/transport/multicast/lease.c index b8f98cd19..b29603169 100644 --- a/src/transport/multicast/lease.c +++ b/src/transport/multicast/lease.c @@ -21,6 +21,7 @@ #include "zenoh-pico/session/utils.h" #include "zenoh-pico/transport/multicast/lease.h" #include "zenoh-pico/utils/logging.h" +#include "zenoh-pico/utils/result.h" #if Z_FEATURE_MULTICAST_TRANSPORT == 1 || Z_FEATURE_RAWETH_TRANSPORT == 1 @@ -55,6 +56,13 @@ z_result_t _zp_multicast_send_keep_alive(_z_transport_multicast_t *ztm) { #if Z_FEATURE_MULTI_THREAD == 1 && (Z_FEATURE_MULTICAST_TRANSPORT == 1 || Z_FEATURE_RAWETH_TRANSPORT == 1) +static void _zp_multicast_failed(_z_transport_multicast_t *ztm) { + _ZP_UNUSED(ztm); +#if Z_FEATURE_AUTO_RECONNECT == 1 + _z_reopen(ztm->_common._session); +#endif +} + static _z_zint_t _z_get_minimum_lease(_z_transport_peer_entry_list_t *peers, _z_zint_t local_lease) { _z_zint_t ret = local_lease; @@ -133,6 +141,7 @@ void *_zp_multicast_lease_task(void *ztm_arg) { if (ztm->_common._transmitted == false) { if (_zp_multicast_send_keep_alive(ztm) < 0) { _Z_INFO("Send keep alive failed."); + _zp_multicast_failed(ztm); } } // Reset the keep alive parameters diff --git a/src/transport/multicast/transport.c b/src/transport/multicast/transport.c index 
bbe13bf19..2bdcabf45 100644 --- a/src/transport/multicast/transport.c +++ b/src/transport/multicast/transport.c @@ -11,7 +11,7 @@ // Contributors: // ZettaScale Zenoh Team, -#include "zenoh-pico/transport/multicast/transport.h" +#include "zenoh-pico/transport/common/transport.h" #include #include @@ -20,13 +20,9 @@ #include #include "zenoh-pico/link/link.h" -#include "zenoh-pico/transport/common/lease.h" -#include "zenoh-pico/transport/common/read.h" #include "zenoh-pico/transport/common/tx.h" -#include "zenoh-pico/transport/multicast.h" -#include "zenoh-pico/transport/multicast/rx.h" +#include "zenoh-pico/transport/multicast/transport.h" #include "zenoh-pico/transport/raweth/tx.h" -#include "zenoh-pico/transport/unicast/rx.h" #include "zenoh-pico/transport/utils.h" #include "zenoh-pico/utils/logging.h" @@ -193,33 +189,13 @@ z_result_t _z_multicast_transport_close(_z_transport_multicast_t *ztm, uint8_t r return _z_multicast_send_close(ztm, reason, false); } -void _z_multicast_transport_clear(_z_transport_t *zt) { - _z_transport_multicast_t *ztm = &zt->_transport._multicast; +void _z_multicast_transport_clear(_z_transport_multicast_t *ztm, bool detach_tasks) { + _z_common_transport_clear(&ztm->_common, detach_tasks); #if Z_FEATURE_MULTI_THREAD == 1 - // Clean up tasks - if (ztm->_common._read_task != NULL) { - _z_task_join(ztm->_common._read_task); - z_free(ztm->_common._read_task); - } - if (ztm->_common._lease_task != NULL) { - _z_task_join(ztm->_common._lease_task); - z_free(ztm->_common._lease_task); - } - // Clean up the mutexes - _z_mutex_drop(&ztm->_common._mutex_tx); - _z_mutex_drop(&ztm->_common._mutex_rx); _z_mutex_drop(&ztm->_mutex_peer); #endif // Z_FEATURE_MULTI_THREAD == 1 - // Clean up the buffers - _z_wbuf_clear(&ztm->_common._wbuf); - _z_zbuf_clear(&ztm->_common._zbuf); - _z_arc_slice_svec_release(&ztm->_common._arc_pool); - _z_network_message_svec_release(&ztm->_common._msg_pool); - - // Clean up peer list 
_z_transport_peer_entry_list_free(&ztm->_peers); - _z_link_clear(&ztm->_common._link); } #else diff --git a/src/transport/transport.c b/src/transport/transport.c index 2540ec5ac..89213a647 100644 --- a/src/transport/transport.c +++ b/src/transport/transport.c @@ -19,17 +19,25 @@ #include #include "zenoh-pico/config.h" -#include "zenoh-pico/link/link.h" #include "zenoh-pico/protocol/core.h" -#include "zenoh-pico/transport/multicast/rx.h" -#include "zenoh-pico/transport/raweth/rx.h" -#include "zenoh-pico/transport/raweth/tx.h" #include "zenoh-pico/transport/transport.h" -#include "zenoh-pico/transport/unicast/rx.h" #include "zenoh-pico/transport/unicast/transport.h" -#include "zenoh-pico/transport/utils.h" #include "zenoh-pico/utils/logging.h" +_z_transport_common_t *_z_transport_get_common(_z_transport_t *zt) { + switch (zt->_type) { + case _Z_TRANSPORT_UNICAST_TYPE: + return &zt->_transport._unicast._common; + case _Z_TRANSPORT_MULTICAST_TYPE: + return &zt->_transport._multicast._common; + case _Z_TRANSPORT_RAWETH_TYPE: + return &zt->_transport._raweth._common; + default: + _Z_DEBUG("None transport, it should never happens"); + return NULL; + } +} + z_result_t _z_send_close(_z_transport_t *zt, uint8_t reason, bool link_only) { z_result_t ret = _Z_RES_OK; // Call transport function @@ -53,11 +61,11 @@ z_result_t _z_transport_close(_z_transport_t *zt, uint8_t reason) { return _z_se void _z_transport_clear(_z_transport_t *zt) { switch (zt->_type) { case _Z_TRANSPORT_UNICAST_TYPE: - _z_unicast_transport_clear(zt); + _z_unicast_transport_clear(&zt->_transport._unicast, false); break; case _Z_TRANSPORT_MULTICAST_TYPE: case _Z_TRANSPORT_RAWETH_TYPE: - _z_multicast_transport_clear(zt); + _z_multicast_transport_clear(&zt->_transport._multicast, false); break; default: break; @@ -78,49 +86,16 @@ void _z_transport_free(_z_transport_t **zt) { #if Z_FEATURE_BATCHING == 1 bool _z_transport_start_batching(_z_transport_t *zt) { - uint8_t *batch_state = NULL; - size_t *batch_count 
= NULL; - switch (zt->_type) { - case _Z_TRANSPORT_UNICAST_TYPE: - batch_state = &zt->_transport._unicast._common._batch_state; - batch_count = &zt->_transport._unicast._common._batch_count; - break; - case _Z_TRANSPORT_MULTICAST_TYPE: - batch_state = &zt->_transport._multicast._common._batch_state; - batch_count = &zt->_transport._multicast._common._batch_count; - break; - case _Z_TRANSPORT_RAWETH_TYPE: - batch_state = &zt->_transport._raweth._common._batch_state; - batch_count = &zt->_transport._raweth._common._batch_count; - break; - default: - break; - } - if (*batch_state == _Z_BATCHING_ACTIVE) { + _z_transport_common_t *ztc = _z_transport_get_common(zt); + if (ztc->_batch_state == _Z_BATCHING_ACTIVE) { return false; } - *batch_count = 0; - *batch_state = _Z_BATCHING_ACTIVE; + ztc->_batch_count = 0; + ztc->_batch_state = _Z_BATCHING_ACTIVE; return true; } -void _z_transport_stop_batching(_z_transport_t *zt) { - uint8_t *batch_state = NULL; - switch (zt->_type) { - case _Z_TRANSPORT_UNICAST_TYPE: - batch_state = &zt->_transport._unicast._common._batch_state; - break; - case _Z_TRANSPORT_MULTICAST_TYPE: - batch_state = &zt->_transport._multicast._common._batch_state; - break; - case _Z_TRANSPORT_RAWETH_TYPE: - batch_state = &zt->_transport._raweth._common._batch_state; - break; - default: - break; - } - *batch_state = _Z_BATCHING_IDLE; -} +void _z_transport_stop_batching(_z_transport_t *zt) { _z_transport_get_common(zt)->_batch_state = _Z_BATCHING_IDLE; } #endif /** diff --git a/src/transport/unicast/lease.c b/src/transport/unicast/lease.c index c4cd362f1..233ab0053 100644 --- a/src/transport/unicast/lease.c +++ b/src/transport/unicast/lease.c @@ -15,8 +15,9 @@ #include "zenoh-pico/transport/unicast/lease.h" #include "zenoh-pico/session/query.h" -#include "zenoh-pico/session/utils.h" +#include "zenoh-pico/system/common/platform.h" #include "zenoh-pico/transport/common/tx.h" +#include "zenoh-pico/transport/transport.h" #include 
"zenoh-pico/transport/unicast/transport.h" #include "zenoh-pico/utils/logging.h" @@ -40,6 +41,19 @@ z_result_t _zp_unicast_send_keep_alive(_z_transport_unicast_t *ztu) { #if Z_FEATURE_MULTI_THREAD == 1 && Z_FEATURE_UNICAST_TRANSPORT == 1 +static void _zp_unicast_failed(_z_transport_unicast_t *ztu) { + _z_unicast_transport_close(ztu, _Z_CLOSE_EXPIRED); + _z_unicast_transport_clear(ztu, true); + +#if Z_FEATURE_AUTO_RECONNECT == 1 + _z_session_rc_ref_t *zs = ztu->_common._session; + z_result_t ret = _z_reopen(zs); + if (ret != _Z_RES_OK) { + _Z_ERROR("Reopen failed: %i", ret); + } +#endif +} + void *_zp_unicast_lease_task(void *ztu_arg) { _z_transport_unicast_t *ztu = (_z_transport_unicast_t *)ztu_arg; @@ -56,10 +70,10 @@ void *_zp_unicast_lease_task(void *ztu_arg) { // Reset the lease parameters ztu->_received = false; } else { + // THIS LOG STRING USED IN TEST, change with caution _Z_INFO("Closing session because it has expired after %zums", ztu->_common._lease); - ztu->_common._lease_task_running = false; - _z_unicast_transport_close(ztu, _Z_CLOSE_EXPIRED); - break; + _zp_unicast_failed(ztu); + return 0; } next_lease = (int)ztu->_common._lease; } @@ -68,7 +82,10 @@ void *_zp_unicast_lease_task(void *ztu_arg) { // Check if need to send a keep alive if (ztu->_common._transmitted == false) { if (_zp_unicast_send_keep_alive(ztu) < 0) { + // THIS LOG STRING USED IN TEST, change with caution _Z_INFO("Send keep alive failed."); + _zp_unicast_failed(ztu); + return 0; } } // Reset the keep alive parameters diff --git a/src/transport/unicast/transport.c b/src/transport/unicast/transport.c index 7ab1578fa..53e316df9 100644 --- a/src/transport/unicast/transport.c +++ b/src/transport/unicast/transport.c @@ -11,6 +11,8 @@ // Contributors: // ZettaScale Zenoh Team, +#include "zenoh-pico/transport/unicast/transport.h" + #include #include #include @@ -18,13 +20,10 @@ #include #include "zenoh-pico/link/link.h" +#include "zenoh-pico/system/common/platform.h" #include 
"zenoh-pico/transport/common/rx.h" +#include "zenoh-pico/transport/common/transport.h" #include "zenoh-pico/transport/common/tx.h" -#include "zenoh-pico/transport/multicast/rx.h" -#include "zenoh-pico/transport/unicast.h" -#include "zenoh-pico/transport/unicast/lease.h" -#include "zenoh-pico/transport/unicast/read.h" -#include "zenoh-pico/transport/unicast/rx.h" #include "zenoh-pico/transport/utils.h" #include "zenoh-pico/utils/logging.h" @@ -220,6 +219,7 @@ static z_result_t _z_unicast_handshake_client(_z_transport_unicast_establish_par _z_t_msg_clear(&oam); ret = _Z_ERR_MESSAGE_UNEXPECTED; } + // THIS LOG STRING USED IN TEST, change with caution _Z_DEBUG("Received Z_OPEN(Ack)"); param->_lease = oam._body._open._lease; // The session lease // The initial SN at RX side. Initialize the session as we had already received @@ -352,29 +352,9 @@ z_result_t _z_unicast_transport_close(_z_transport_unicast_t *ztu, uint8_t reaso return _z_unicast_send_close(ztu, reason, false); } -void _z_unicast_transport_clear(_z_transport_t *zt) { - _z_transport_unicast_t *ztu = &zt->_transport._unicast; -#if Z_FEATURE_MULTI_THREAD == 1 - // Clean up tasks - if (ztu->_common._read_task != NULL) { - _z_task_join(ztu->_common._read_task); - z_free(ztu->_common._read_task); - } - if (ztu->_common._lease_task != NULL) { - _z_task_join(ztu->_common._lease_task); - z_free(ztu->_common._lease_task); - } - - // Clean up the mutexes - _z_mutex_drop(&ztu->_common._mutex_tx); - _z_mutex_drop(&ztu->_common._mutex_rx); -#endif // Z_FEATURE_MULTI_THREAD == 1 +void _z_unicast_transport_clear(_z_transport_unicast_t *ztu, bool detach_tasks) { + _z_common_transport_clear(&ztu->_common, detach_tasks); - // Clean up the buffers - _z_wbuf_clear(&ztu->_common._wbuf); - _z_zbuf_clear(&ztu->_common._zbuf); - _z_arc_slice_svec_release(&ztu->_common._arc_pool); - _z_network_message_svec_release(&ztu->_common._msg_pool); #if Z_FEATURE_FRAGMENTATION == 1 _z_wbuf_clear(&ztu->_dbuf_reliable); 
# Patch scaffolding that preceded this script on the collapsed source lines
# (tail of the previous C hunk and the diff header of this new file). It is
# kept verbatim for reference and is not part of the test script:
#   _z_wbuf_clear(&ztu->_dbuf_best_effort); @@ -382,7 +362,6 @@ void _z_unicast_transport_clear(_z_transport_t *zt) { // Clean up PIDs ztu->_remote_zid = _z_id_empty(); - _z_link_clear(&ztu->_common._link); } #else
#   diff --git a/tests/connection_restore.py b/tests/connection_restore.py new file mode 100644 index 000000000..c2685778e --- /dev/null +++ b/tests/connection_restore.py @@ -0,0 +1,203 @@
"""Connection-restore integration test.

Starts a router and an example client (``z_pub`` or ``z_sub``), interrupts
the connection either with iptables DROP rules or by stopping the router,
and checks through the client's log output that the loss is detected and
that the connection is established again afterwards.

Usage: sudo python3 ./connection_restore.py /path/to/zenohd
"""
import os
import subprocess
import sys
import threading
import time

ROUTER_INIT_TIMEOUT_S = 3
WAIT_MESSAGE_TIMEOUT_S = 15
DISCONNECT_MESSAGES = ["Closing session because it has expired", "Send keep alive failed"]
CONNECT_MESSAGES = ["Z_OPEN(Ack)"]
ROUTER_ERROR_MESSAGE = "ERROR"
ZENOH_PORT = "7447"

ROUTER_ARGS = ['-l', f'tcp/0.0.0.0:{ZENOH_PORT}', '--no-multicast-scouting']
STDBUF_CMD = ["stdbuf", "-o0"]

DIR_EXAMPLES = "build/examples"
ACTIVE_CLIENT_COMMAND = STDBUF_CMD + [f'{DIR_EXAMPLES}/z_pub', '-e', f'tcp/127.0.0.1:{ZENOH_PORT}']
PASSIVE_CLIENT_COMMAND = STDBUF_CMD + [f'{DIR_EXAMPLES}/z_sub', '-e', f'tcp/127.0.0.1:{ZENOH_PORT}']

# (chain, port-match flag) pairs of the DROP rules used to cut the connection.
_DROP_RULES = (("INPUT", "--dport"), ("OUTPUT", "--sport"))


def _find_libasan():
    """Return what gcc reports for ``libasan.so``, or "" if gcc cannot be run.

    The preload is optional (``run_process`` only sets ``LD_PRELOAD`` when the
    path is non-empty), so a missing or failing gcc must not abort the script
    at import time.
    """
    try:
        result = subprocess.run(
            ["gcc", "-print-file-name=libasan.so"],
            stdout=subprocess.PIPE,
            text=True,
            check=True,
        )
    except (OSError, subprocess.CalledProcessError) as err:
        print(f"Cannot query gcc for libasan.so ({err}); running without LD_PRELOAD")
        return ""
    return result.stdout.strip()


LIBASAN_PATH = _find_libasan()


def run_process(command, output_collector, process_list):
    """Run *command* and collect its stdout line by line until it exits.

    Args:
        command: argv list passed to ``subprocess.Popen``.
        output_collector: list that receives every stripped stdout line.
        process_list: list that receives the ``Popen`` handle so the caller
            can terminate the process later.

    Blocks until the child closes stdout; meant to run in a worker thread
    (see ``run_background``).
    """
    env = os.environ.copy()
    if LIBASAN_PATH:
        env["LD_PRELOAD"] = LIBASAN_PATH

    print(f"Run {command}")
    # stderr is deliberately NOT piped: nothing here reads it, and an unread
    # pipe blocks the child as soon as the OS pipe buffer fills up. The child
    # inherits this script's stderr instead. errors="replace" keeps the reader
    # alive if the child prints bytes that are not valid in the locale encoding.
    process = subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        text=True,
        errors="replace",
        env=env,
    )
    process_list.append(process)
    try:
        for line in iter(process.stdout.readline, ''):
            stripped = line.strip()
            print("--", stripped)
            output_collector.append(stripped)
    finally:
        # Always release the pipe and reap the child, even if reading failed.
        process.stdout.close()
        process.wait()


def run_background(command, output_collector, process_list):
    """Start ``run_process`` in a new thread and return that thread."""
    thread = threading.Thread(target=run_process, args=(command, output_collector, process_list))
    thread.start()
    return thread


def terminate_processes(process_list):
    """Terminate every process in *process_list*, then empty the list.

    Each process gets ``terminate()`` and up to 5 seconds to exit; after that
    it is killed and reaped so no zombie is left behind.
    """
    for process in process_list:
        process.terminate()
        try:
            process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            process.kill()
            process.wait()
    process_list.clear()


def _iptables(action, check):
    """Apply *action* (``-A`` or ``-D``) to both DROP rules for ZENOH_PORT."""
    for chain, port_flag in _DROP_RULES:
        subprocess.run(
            ["iptables", action, chain, "-p", "tcp", port_flag, ZENOH_PORT, "-j", "DROP"],
            check=check,
        )


def block_connection():
    """Add the iptables DROP rules for ZENOH_PORT; raises if iptables fails."""
    _iptables("-A", check=True)


def unblock_connection():
    """Delete the iptables DROP rules for ZENOH_PORT, ignoring failures."""
    _iptables("-D", check=False)


def wait_messages(client_output, messages, timeout=None, poll_interval_s=1):
    """Wait until any of *messages* occurs as a substring of a collected line.

    Args:
        client_output: list of output lines, appended to by a reader thread.
        messages: substrings to look for.
        timeout: seconds to wait; defaults to ``WAIT_MESSAGE_TIMEOUT_S``.
        poll_interval_s: pause between two scans of *client_output*.

    Returns:
        True as soon as a match is found, False if the timeout elapses.
    """
    if timeout is None:
        timeout = WAIT_MESSAGE_TIMEOUT_S
    # Monotonic clock: the deadline must not move if the wall clock is adjusted.
    deadline = time.monotonic() + timeout
    while True:
        # Scan a snapshot because the reader thread appends concurrently.
        snapshot = list(client_output)
        if any(message in line for line in snapshot for message in messages):
            return True
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        time.sleep(min(poll_interval_s, remaining))


def wait_connect(client_output):
    """Raise unless a connect message shows up in *client_output* in time."""
    if wait_messages(client_output, CONNECT_MESSAGES):
        print("Initial connection successful.")
    else:
        raise Exception("Connection failed.")


def wait_reconnect(client_output):
    """Raise unless a connect message shows up again in *client_output*."""
    if wait_messages(client_output, CONNECT_MESSAGES):
        print("Connection restored successfully.")
    else:
        raise Exception("Failed to restore connection.")


def wait_disconnect(client_output):
    """Raise unless a disconnect message shows up in *client_output* in time."""
    if wait_messages(client_output, DISCONNECT_MESSAGES):
        print("Connection lost successfully.")
    else:
        raise Exception("Failed to block connection.")


def check_router_errors(router_output):
    """Raise if any collected router line contains ROUTER_ERROR_MESSAGE."""
    for line in router_output:
        if ROUTER_ERROR_MESSAGE in line:
            print(line)
            raise Exception("Router have an error.")


def test_connection_drop(router_command, client_command, timeout):
    """Check that the client reconnects after its traffic was dropped.

    Args:
        router_command: argv list that starts the router.
        client_command: argv list that starts the client under test.
        timeout: seconds to keep the connection blocked before looking for
            the disconnect message.

    Raises:
        Exception: if an expected message does not appear in time or the
            router output contains an error line.
    """
    print(f"Drop test {client_command} for timeout {timeout}")
    router_output = []
    client_output = []
    process_list = []
    blocked = False
    try:
        run_background(router_command, router_output, process_list)
        time.sleep(ROUTER_INIT_TIMEOUT_S)

        run_background(client_command, client_output, process_list)

        # Two iterations because there was an error on the second reconnection
        for _ in range(2):
            wait_connect(client_output)
            client_output.clear()

            print("Blocking connection...")
            # Set the flag first: if only one of the two rules gets installed
            # before iptables fails, the finally block must still remove it.
            blocked = True
            block_connection()

            time.sleep(timeout)

            wait_disconnect(client_output)
            client_output.clear()

            print("Unblocking connection...")
            unblock_connection()
            blocked = False

            wait_reconnect(client_output)

        check_router_errors(router_output)

        print(f"Drop test {client_command} for timeout {timeout} passed")
    finally:
        if blocked:
            unblock_connection()
        terminate_processes(process_list)


def test_router_restart(router_command, client_command, timeout):
    """Check that the client reconnects after the router was restarted.

    Args:
        router_command: argv list that starts the router.
        client_command: argv list that starts the client under test.
        timeout: seconds to keep the router stopped before looking for the
            disconnect message.

    Raises:
        Exception: if an expected message does not appear in time or the
            router output contains an error line.
    """
    print(f"Restart test {client_command} for timeout {timeout}")
    router_output = []
    client_output = []
    router_process_list = []
    client_process_list = []
    try:
        run_background(router_command, router_output, router_process_list)
        time.sleep(ROUTER_INIT_TIMEOUT_S)

        run_background(client_command, client_output, client_process_list)

        wait_connect(client_output)
        client_output.clear()

        print("Stop router...")
        terminate_processes(router_process_list)

        time.sleep(timeout)

        wait_disconnect(client_output)
        client_output.clear()

        print("Start router...")
        run_background(router_command, router_output, router_process_list)
        time.sleep(ROUTER_INIT_TIMEOUT_S)

        wait_reconnect(client_output)

        check_router_errors(router_output)

        print(f"Restart test {client_command} for timeout {timeout} passed")
    finally:
        terminate_processes(client_process_list + router_process_list)


# These are scenario drivers called from main() with plain arguments, not
# pytest tests. Without the flag pytest would collect them by name from any
# module that imports them and fail on the missing fixtures.
test_connection_drop.__test__ = False
test_router_restart.__test__ = False


def main():
    """Run every drop and restart scenario against the router given in argv."""
    if len(sys.argv) != 2:
        print("Usage: sudo python3 ./connection_restore.py /path/to/zenohd")
        sys.exit(1)

    router_command = STDBUF_CMD + [sys.argv[1]] + ROUTER_ARGS

    # timeout less than session timeout
    # NOTE(review): this group and the next one both use 15 s, so the
    # "less than" and "more than" drop cases are currently identical --
    # confirm the intended values.
    test_connection_drop(router_command, ACTIVE_CLIENT_COMMAND, 15)
    test_connection_drop(router_command, PASSIVE_CLIENT_COMMAND, 15)

    # timeout more than session timeout
    test_connection_drop(router_command, ACTIVE_CLIENT_COMMAND, 15)
    test_connection_drop(router_command, PASSIVE_CLIENT_COMMAND, 15)

    # timeout less than session timeout
    test_router_restart(router_command, ACTIVE_CLIENT_COMMAND, 8)
    test_router_restart(router_command, PASSIVE_CLIENT_COMMAND, 8)

    # timeout more than session timeout
    test_router_restart(router_command, ACTIVE_CLIENT_COMMAND, 15)
    test_router_restart(router_command, PASSIVE_CLIENT_COMMAND, 15)


if __name__ == "__main__":
    main()