From 43dd763c64025739e6e4f4f0d2c69133e217b731 Mon Sep 17 00:00:00 2001 From: Denis Biryukov Date: Wed, 20 Nov 2024 21:50:37 +0100 Subject: [PATCH 1/4] add querying subscriber support; add missing get options; --- docs/ext.rst | 4 + examples/zenohc/z_pub_cache.cxx | 3 +- examples/zenohc/z_query_sub.cxx | 80 +++++++ include/zenoh/api/enums.hxx | 8 + include/zenoh/api/ext/querying_subscriber.hxx | 191 +++++++++++++++++ include/zenoh/api/get.hxx | 72 +++++++ include/zenoh/api/session.hxx | 198 ++++++++++++++---- 7 files changed, 517 insertions(+), 39 deletions(-) create mode 100644 examples/zenohc/z_query_sub.cxx create mode 100644 include/zenoh/api/ext/querying_subscriber.hxx create mode 100644 include/zenoh/api/get.hxx diff --git a/docs/ext.rst b/docs/ext.rst index acd8d1c8..abbcbaac 100644 --- a/docs/ext.rst +++ b/docs/ext.rst @@ -17,5 +17,9 @@ Extensions Extra functionality, which is not a part of core Zenoh API. .. doxygenclass:: zenoh::ext::PublicationCache + :members: + :membergroups: Constructors Operators Methods + +.. doxygenclass:: zenoh::ext::QueryingSubscriber :members: :membergroups: Constructors Operators Methods \ No newline at end of file diff --git a/examples/zenohc/z_pub_cache.cxx b/examples/zenohc/z_pub_cache.cxx index f037348e..497f9390 100644 --- a/examples/zenohc/z_pub_cache.cxx +++ b/examples/zenohc/z_pub_cache.cxx @@ -1,5 +1,5 @@ // -// Copyright (c) 2022 ZettaScale Technology +// Copyright (c) 2024 ZettaScale Technology // // This program and the accompanying materials are made available under the // terms of the Eclipse Public License 2.0 which is available at @@ -46,6 +46,7 @@ int _main(int argc, char **argv) { std::cout << "Declaring Publication cache on '" << keyexpr << "'..." << std::endl; Session::PublicationCacheOptions opts; opts.history = std::atoi(history); + opts.queryable_complete = true; if (!std::string(prefix).empty()) { opts.queryable_prefix = KeyExpr(prefix); } diff --git a/examples/zenohc/z_query_sub.cxx b/examples/zenohc/z_query_sub.cxx new file mode 100644 index 00000000..c7419ed1 --- /dev/null +++ b/examples/zenohc/z_query_sub.cxx @@ -0,0 +1,80 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +#include +#include + +#include +#include +#include +#include +#include + +#include "../getargs.h" +#include "zenoh.hxx" + +using namespace zenoh; +using namespace std::chrono_literals; + +const char *default_keyexpr = "demo/example/**"; +const char *default_query = ""; + +const char *kind_to_str(SampleKind kind) { + switch (kind) { + case SampleKind::Z_SAMPLE_KIND_PUT: + return "PUT"; + case SampleKind::Z_SAMPLE_KIND_DELETE: + return "DELETE"; + default: + return "UNKNOWN"; + } +} + +int _main(int argc, char **argv) { + const char *keyexpr = default_keyexpr; + const char *query = default_query; + Config config = parse_args(argc, argv, {}, {{"key_expression", &keyexpr}}, {{"-q", {"query", &query}}}); + + std::cout << "Opening session..." << std::endl; + auto session = Session::open(std::move(config)); + + std::cout << "Declaring Querying Subscriber on '" << keyexpr << "' with initial query on '" << query << "'" + << std::endl; + Session::QueryingSubscriberOptions opts; + + if (!std::string(query).empty()) { + opts.query_keyexpr = KeyExpr(query); + opts.query_accept_replies = ReplyKeyExpr::ZC_REPLY_KEYEXPR_ANY; + } + auto querying_subscriber = session.declare_querying_subscriber(keyexpr, channels::FifoChannel(16), std::move(opts)); + + std::cout << "Press CTRL-C to quit..." << std::endl; + for (auto res = querying_subscriber.handler().recv(); std::holds_alternative(res); + res = querying_subscriber.handler().recv()) { + const auto &sample = std::get(res); + std::cout << ">> [Subscriber] Received " << kind_to_str(sample.get_kind()) << " ('" + << sample.get_keyexpr().as_string_view() << "': '" << sample.get_payload().as_string() << "')" + << std::endl; + } + + return 0; +} + +int main(int argc, char **argv) { + try { + init_log_from_env_or("error"); + _main(argc, argv); + } catch (ZException e) { + std::cout << "Received an error :" << e.what() << "\n"; + } +} diff --git a/include/zenoh/api/enums.hxx b/include/zenoh/api/enums.hxx index f631f44c..a18cd787 100644 --- a/include/zenoh/api/enums.hxx +++ b/include/zenoh/api/enums.hxx @@ -110,6 +110,14 @@ inline std::string_view whatami_as_str(WhatAmI whatami) { /// - **ZC_LOCALITY_SESSION_LOCAL**: Only from local sessions. /// - **ZC_LOCALITY_SESSION_REMOTE**: Only from remote sessions. typedef ::zc_locality_t Locality; + +/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release. +/// @brief Key expressions types to which Queryable should reply to. +/// +/// Values: +/// - **ZC_REPLY_KEYEXPR_ANY**: Replies to any key expression queries. +/// - **ZC_REPLY_KEYEXPR_MATCHING_QUERY**: Replies only to queries with intersecting key expressions. +typedef ::zc_reply_keyexpr_t ReplyKeyExpr; #endif } // namespace zenoh \ No newline at end of file diff --git a/include/zenoh/api/ext/querying_subscriber.hxx b/include/zenoh/api/ext/querying_subscriber.hxx new file mode 100644 index 00000000..688acb33 --- /dev/null +++ b/include/zenoh/api/ext/querying_subscriber.hxx @@ -0,0 +1,191 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#pragma once + +#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) +#include "../base.hxx" +#include "../get.hxx" +#include "../interop.hxx" +#include "../keyexpr.hxx" + +namespace zenoh { +namespace ext { + +namespace detail { +class QueryingSubscriberBase : public Owned<::ze_owned_querying_subscriber_t> { + protected: + QueryingSubscriberBase(zenoh::detail::null_object_t) : Owned(nullptr){}; + QueryingSubscriberBase(::ze_owned_querying_subscriber_t* qs) : Owned(qs){}; + + public: + /// @name Methods + + using GetOptions = zenoh::GetOptions; + /// @brief Make querying subscriber perform an additional query on a specified selector. + /// The queried samples will be merged with the received publications and made available in the subscriber callback. + /// @param key_expr the key expression matching resources to query. + /// @param options query options. + /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be + /// thrown in case of error. + void get(const KeyExpr& key_expr, GetOptions&& options = GetOptions::create_default(), + ZResult* err = nullptr) const { + ::z_get_options_t opts; + z_get_options_default(&opts); + opts.target = options.target; + opts.consolidation = *interop::as_copyable_c_ptr(options.consolidation); + opts.payload = interop::as_moved_c_ptr(options.payload); + opts.encoding = interop::as_moved_c_ptr(options.encoding); +#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) + opts.source_info = interop::as_moved_c_ptr(options.source_info); + opts.accept_replies = options.accept_replies; + opts.allowed_destination = options.allowed_destination; +#endif + opts.attachment = interop::as_moved_c_ptr(options.attachment); + opts.timeout_ms = options.timeout_ms; + + ZResult res = + ::ze_querying_subscriber_get(interop::as_loaned_c_ptr(*this), interop::as_loaned_c_ptr(key_expr), &opts); + __ZENOH_RESULT_CHECK(res, err, "Failed to perform get operation"); + } + + friend class zenoh::Session; +}; + +} // namespace detail + +template +class QueryingSubscriber; + +/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release. +/// @brief A Zenoh querying subscriber. +/// +/// In addition to receiving the data it is subscribed to, +/// it also will fetch data from a Queryable at startup and peridodically (using `QueryingSubscriber::get`). +/// @note Zenoh-c only +template <> +class QueryingSubscriber : public detail::QueryingSubscriberBase { + protected: + using QueryingSubscriberBase::QueryingSubscriberBase; + friend class Session; + friend struct interop::detail::Converter; + + public: + /// @name Methods + using QueryingSubscriberBase::get; + using QueryingSubscriberBase::GetOptions; + + /// @brief Undeclare publication cache. + /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be + /// thrown in case of error. + void undeclare(ZResult* err = nullptr) && { + __ZENOH_RESULT_CHECK(::ze_undeclare_querying_subscriber(interop::as_moved_c_ptr(*this)), err, + "Failed to undeclare Querying Subscriber"); + } +}; + +/// @brief A Zenoh subscriber. Destroying or undeclaring subscriber cancels the subscription. + +/// Constructed by ``Session::declare_subscriber`` method. +/// @tparam Handler streaming handler exposing data. If `void`, no handler access is provided and instead data is being +/// processed inside the callback. +template +class QueryingSubscriber : public detail::QueryingSubscriberBase { + Handler _handler; + + public: + /// @name Constructors + + /// @brief Construct stream querying subscriber from callback querying subscriber and handler. + /// + /// @param qs callback querying subscriber, that should expose data to the handler in its callback. + /// @param handler handler to access data exposed by `s`. Zenoh handlers implement + /// recv and try_recv methods, for blocking and non-blocking message reception. But user is free to define his own + /// interface. + QueryingSubscriber(QueryingSubscriber&& qs, Handler handler) + : QueryingSubscriberBase(interop::as_owned_c_ptr(qs)), _handler(std::move(handler)) {} + + /// @brief Undeclare querying subscriber, and return its handler, which can still be used to process any messages + /// received prior to undeclaration. + /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be + /// thrown in case of error. + Handler undeclare(ZResult* err = nullptr) && { + __ZENOH_RESULT_CHECK(::ze_undeclare_querying_subscriber(interop::as_moved_c_ptr(*this)), err, + "Failed to undeclare Querying Subscriber"); + return std::move(this->_handler); + } + + /// @brief Return the handler to subscriber data stream. + const Handler& handler() const { return _handler; }; + friend class Session; +}; + +} // namespace ext + +namespace interop { +/// @brief Return a pair of pointers to owned zenoh-c representations of querying subscriber and its callback. +template >> +auto as_owned_c_ptr(zenoh::ext::QueryingSubscriber& s) { + return std::make_pair(as_owned_c_ptr(static_cast(s)), + as_owned_c_ptr(const_cast(s.handler()))); +} + +/// @brief Return a pair of pointers to owned zenoh-c representations of querying subscriber and its callback. +template >> +auto as_owned_c_ptr(const zenoh::ext::QueryingSubscriber& s) { + return std::make_pair(as_owned_c_ptr(static_cast(s)), + as_owned_c_ptr(s.handler())); +} + +/// @brief Return a pair of pointers to loaned zenoh-c representations of querying subscriber and its callback. +template >> +auto as_loaned_c_ptr(zenoh::ext::QueryingSubscriber& s) { + return std::make_pair(as_loaned_c_ptr(static_cast(s)), + as_loaned_c_ptr(const_cast(s.handler()))); +} + +/// @brief Return a pair of pointers to loaned zenoh-c representations of querying subscriber and its callback. +template >> +auto as_loaned_c_ptr(const zenoh::ext::QueryingSubscriber& s) { + return std::make_pair(as_loaned_c_ptr(static_cast(s)), + as_loaned_c_ptr(s.handler())); +} + +/// @brief Return a pair of pointers to moved zenoh-c representations of querying subscriber and its callback. +template >> +auto as_moved_c_ptr(zenoh::ext::QueryingSubscriber& s) { + return std::make_pair(as_moved_c_ptr(static_cast(s)), + as_moved_c_ptr(const_cast(s.handler()))); +} + +/// @brief Return a pair of pointers to moved zenoh-c representations of querying subscriber and its callback. +/// Will return a pair of null pointers if option is empty. +template >> +auto as_moved_c_ptr(std::optional>& s) -> decltype(as_moved_c_ptr(s.value())) { + if (!s.has_value()) { + return as_moved_c_ptr(s.value()); + } else { + return {}; + } +} + +/// @brief Move querying subscriber and its handler to a pair containing corresponding zenoh-c structs. +template >> +auto move_to_c_obj(zenoh::ext::QueryingSubscriber&& s) { + return std::make_pair(move_to_c_obj(std::move(static_cast(s))), + move_to_c_obj(std::move(const_cast(s)))); +} +} // namespace interop + +} // namespace zenoh +#endif diff --git a/include/zenoh/api/get.hxx b/include/zenoh/api/get.hxx new file mode 100644 index 00000000..711b71a1 --- /dev/null +++ b/include/zenoh/api/get.hxx @@ -0,0 +1,72 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +#pragma once + +#include "bytes.hxx" +#include "enums.hxx" +#include "query_consolidation.hxx" +#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) +#include "source_info.hxx" +#endif + +namespace zenoh { +#if defined(ZENOHCXX_ZENOHC) || Z_FEATURE_QUERY == 1 +/// @brief Options passed to the ``Session::get`` operation. +struct GetOptions { + /// @name Fields + + /// @brief The Queryables that should be target of the query. + QueryTarget target = QueryTarget::Z_QUERY_TARGET_BEST_MATCHING; + /// @brief The replies consolidation strategy to apply on replies to the query. + QueryConsolidation consolidation = QueryConsolidation(); + /// @brief The priority of the get message. + Priority priority = Z_PRIORITY_DEFAULT; + /// @brief The congestion control to apply when routing get message. + CongestionControl congestion_control = Z_CONGESTION_CONTROL_DEFAULT; + /// @brief Whether Zenoh will NOT wait to batch get message with others to reduce the bandwith. + bool is_express = false; + /// @brief An optional payload of the query. + std::optional payload = {}; + /// @brief An optional encoding of the query payload and/or attachment. + std::optional encoding = {}; +#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) + /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future + /// release. + /// @brief The source info for the query. + std::optional source_info = {}; + + /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future + /// release. + /// + /// @brief The accepted replies for the query. + /// @note Zenoh-c only. + ReplyKeyExpr accept_replies = ::zc_reply_keyexpr_default(); + + /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future + /// release. + /// @brief Allowed destination. + /// @note Zenoh-c only. + Locality allowed_destination = ::zc_locality_default(); +#endif + + /// @brief An optional attachment to the query. + std::optional attachment = {}; + /// @brief The timeout for the query in milliseconds. 0 means default query timeout from zenoh configuration. + uint64_t timeout_ms = 0; + + /// @name Methods + /// @brief Create default option settings. + static GetOptions create_default() { return {}; } +}; +#endif +} // namespace zenoh \ No newline at end of file diff --git a/include/zenoh/api/session.hxx b/include/zenoh/api/session.hxx index 4555e0da..1d194a3f 100644 --- a/include/zenoh/api/session.hxx +++ b/include/zenoh/api/session.hxx @@ -21,6 +21,7 @@ #include "closures.hxx" #include "config.hxx" #include "enums.hxx" +#include "get.hxx" #include "id.hxx" #include "interop.hxx" #include "liveliness.hxx" @@ -33,6 +34,7 @@ #endif #if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) #include "ext/publication_cache.hxx" +#include "ext/querying_subscriber.hxx" #endif #include @@ -162,39 +164,7 @@ class Session : public Owned<::z_owned_session_t> { err, "Failed to undeclare key expression"); } #if defined(ZENOHCXX_ZENOHC) || Z_FEATURE_QUERY == 1 - /// @brief Options passed to the ``get`` operation. - struct GetOptions { - /// @name Fields - - /// @brief The Queryables that should be target of the query. - QueryTarget target = QueryTarget::Z_QUERY_TARGET_ALL; - /// @brief The replies consolidation strategy to apply on replies to the query. - QueryConsolidation consolidation = QueryConsolidation(); - /// @brief The priority of the get message. - Priority priority = Z_PRIORITY_DEFAULT; - /// @brief The congestion control to apply when routing get message. - CongestionControl congestion_control = Z_CONGESTION_CONTROL_DEFAULT; - /// @brief Whether Zenoh will NOT wait to batch get message with others to reduce the bandwith. - bool is_express = false; - /// @brief An optional payload of the query. - std::optional payload = {}; - /// @brief An optional encoding of the query payload and/or attachment. - std::optional encoding = {}; -#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) - /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future - /// release. - /// @brief The source info for the query. - std::optional source_info = {}; -#endif - /// @brief An optional attachment to the query. - std::optional attachment = {}; - /// @brief The timeout for the query in milliseconds. 0 means default query timeout from zenoh configuration. - uint64_t timeout_ms = 0; - - /// @name Methods - /// @brief Create default option settings. - static GetOptions create_default() { return {}; } - }; + using GetOptions = zenoh::GetOptions; /// @brief Query data from the matching queryables in the system. Replies are provided through a callback function. /// @param key_expr ``KeyExpr`` the key expression matching resources to query. @@ -228,6 +198,8 @@ class Session : public Owned<::z_owned_session_t> { opts.encoding = interop::as_moved_c_ptr(options.encoding); #if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) opts.source_info = interop::as_moved_c_ptr(options.source_info); + opts.accept_replies = options.accept_replies; + opts.allowed_destination = options.allowed_destination; #endif opts.attachment = interop::as_moved_c_ptr(options.attachment); opts.timeout_ms = options.timeout_ms; @@ -261,6 +233,8 @@ class Session : public Owned<::z_owned_session_t> { opts.encoding = interop::as_moved_c_ptr(options.encoding); #if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) opts.source_info = interop::as_moved_c_ptr(options.source_info); + opts.accept_replies = options.accept_replies; + opts.allowed_destination = options.allowed_destination; #endif opts.attachment = interop::as_moved_c_ptr(options.attachment); opts.timeout_ms = options.timeout_ms; @@ -1009,7 +983,7 @@ class Session : public Owned<::z_owned_session_t> { #if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future /// release. - /// @brief Options passed to the `ze_declare_publication_cache()` function. + /// @brief Options passed to the ``Session::declare_publication_cache``. /// @note Zenoh-c only. struct PublicationCacheOptions { /// The prefix used for queryable. @@ -1040,7 +1014,7 @@ class Session : public Owned<::z_owned_session_t> { /// @return declared ``zenoh::ext::PublicationCache`` instance. [[nodiscard]] ext::PublicationCache declare_publication_cache( const KeyExpr& key_expr, PublicationCacheOptions&& options = PublicationCacheOptions::create_default(), - ZResult* err = nullptr) { + ZResult* err = nullptr) const { ::ze_publication_cache_options_t opts; ze_publication_cache_options_default(&opts); opts.queryable_prefix = interop::as_loaned_c_ptr(options.queryable_prefix); @@ -1061,13 +1035,13 @@ class Session : public Owned<::z_owned_session_t> { /// release. /// @brief Declare a background publication cache. It will function in background until the corresponding session /// is closed or destoryed. - /// @param key_expr: The key expression to publish to. - /// @param options: Additional options for the publication cache. + /// @param key_expr the key expression to publish to. + /// @param options additional options for the publication cache. /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be /// thrown in case of error. void declare_background_publication_cache( const KeyExpr& key_expr, PublicationCacheOptions&& options = PublicationCacheOptions::create_default(), - ZResult* err = nullptr) { + ZResult* err = nullptr) const { ::ze_publication_cache_options_t opts; ze_publication_cache_options_default(&opts); opts.queryable_prefix = interop::as_loaned_c_ptr(options.queryable_prefix); @@ -1082,6 +1056,154 @@ class Session : public Owned<::z_owned_session_t> { __ZENOH_RESULT_CHECK(res, err, "Failed to declare Background Publication Cache"); } + /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future + /// release. + /// @brief Options passed to the ``Session::declare_querying_subscriber``. + /// @note Zenoh-c only. + struct QueryingSubscriberOptions { + /// @name Fields + + /// The key expression to be used for queries. + std::optional query_keyexpr = {}; +#if defined(Z_FEATURE_UNSTABLE_API) + /// The restriction for the matching publications that will be received by this publication cache. + Locality allowed_origin = ::zc_locality_default(); + /// The accepted replies for queries. + ReplyKeyExpr query_accept_replies = ::zc_reply_keyexpr_default(); +#endif + /// @brief The target to be used for queries. + QueryTarget query_target = QueryTarget::Z_QUERY_TARGET_BEST_MATCHING; + /// @brief The consolidation mode to be used for queries. + QueryConsolidation query_consolidation = QueryConsolidation(ConsolidationMode::Z_CONSOLIDATION_MODE_NONE); + /// @brief The timeout to be used for queries. + uint64_t query_timeout_ms = 0; + + /// @name Methods + /// @brief Create default option settings. + static QueryingSubscriberOptions create_default() { return {}; } + }; + + /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future + /// release. + /// @brief Construct and declare a querying subscriber. + /// @param key_expr the key expression to subscribe to. + /// @param on_sample the callback that will be called for each received sample. + /// @param on_drop the callback that will be called once subscriber is destroyed or undeclared. + /// @param options additional options for querying subscriber. + /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be + /// thrown in case of error. + /// @return declared ``zenoh::ext::QueryingSubscriber`` instance. + template + [[nodiscard]] ext::QueryingSubscriber declare_querying_subscriber( + const KeyExpr& key_expr, C&& on_sample, D&& on_drop, + QueryingSubscriberOptions&& options = QueryingSubscriberOptions::create_default(), + ZResult* err = nullptr) const { + static_assert( + std::is_invocable_r::value, + "on_sample should be callable with the following signature: void on_sample(zenoh::Sample& sample)"); + static_assert(std::is_invocable_r::value, + "on_drop should be callable with the following signature: void on_drop()"); + ::z_owned_closure_sample_t c_closure; + using Cval = std::remove_reference_t; + using Dval = std::remove_reference_t; + using ClosureType = typename detail::closures::Closure; + auto closure = ClosureType::into_context(std::forward(on_sample), std::forward(on_drop)); + ::z_closure(&c_closure, detail::closures::_zenoh_on_sample_call, detail::closures::_zenoh_on_drop, closure); + ::ze_querying_subscriber_options_t opts; + ze_querying_subscriber_options_default(&opts); + opts.query_selector = interop::as_loaned_c_ptr(options.query_keyexpr); +#if defined(Z_FEATURE_UNSTABLE_API) + opts.allowed_origin = options.allowed_origin; + opts.query_accept_replies = options.query_accept_replies; +#endif + opts.query_target = options.query_target; + opts.query_consolidation = *interop::as_copyable_c_ptr(options.query_consolidation); + opts.query_timeout_ms = options.query_timeout_ms; + ext::QueryingSubscriber qs = interop::detail::null>(); + ZResult res = ::ze_declare_querying_subscriber(interop::as_loaned_c_ptr(*this), interop::as_owned_c_ptr(qs), + interop::as_loaned_c_ptr(key_expr), ::z_move(c_closure), &opts); + __ZENOH_RESULT_CHECK(res, err, "Failed to declare Background Querying Subscriber"); + return qs; + } + + /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future + /// release. + /// @brief Declare a background querying subscriber for a given key expression. Subscriber callback will be called + /// to process the messages, until the corresponding session is closed or dropped. + /// @param key_expr the key expression to subscribe to. + /// @param on_sample the callback that will be called for each received sample. + /// @param on_drop the callback that will be called once subscriber is destroyed or undeclared. + /// @param options additional options for querying subscriber. + /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be + /// thrown in case of error. + template + void declare_background_querying_subscriber( + const KeyExpr& key_expr, C&& on_sample, D&& on_drop, + QueryingSubscriberOptions&& options = QueryingSubscriberOptions::create_default(), + ZResult* err = nullptr) const { + static_assert( + std::is_invocable_r::value, + "on_sample should be callable with the following signature: void on_sample(zenoh::Sample& sample)"); + static_assert(std::is_invocable_r::value, + "on_drop should be callable with the following signature: void on_drop()"); + ::z_owned_closure_sample_t c_closure; + using Cval = std::remove_reference_t; + using Dval = std::remove_reference_t; + using ClosureType = typename detail::closures::Closure; + auto closure = ClosureType::into_context(std::forward(on_sample), std::forward(on_drop)); + ::z_closure(&c_closure, detail::closures::_zenoh_on_sample_call, detail::closures::_zenoh_on_drop, closure); + ::ze_querying_subscriber_options_t opts; + ze_querying_subscriber_options_default(&opts); + opts.query_selector = interop::as_loaned_c_ptr(options.query_keyexpr); +#if defined(Z_FEATURE_UNSTABLE_API) + opts.allowed_origin = options.allowed_origin; + opts.query_accept_replies = options.query_accept_replies; +#endif + opts.query_target = options.query_target; + opts.query_consolidation = *interop::as_copyable_c_ptr(options.query_consolidation); + ; + opts.query_timeout_ms = options.query_timeout_ms; + ZResult res = ::ze_declare_background_querying_subscriber( + interop::as_loaned_c_ptr(*this), interop::as_loaned_c_ptr(key_expr), ::z_move(c_closure), &opts); + __ZENOH_RESULT_CHECK(res, err, "Failed to declare Background Querying Subscriber"); + } + + /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future + /// release. + /// @brief Construct and declare a querying subscriber. + /// @tparam Channel the type of channel used to create stream of data (see ``zenoh::channels::FifoChannel`` or + /// ``zenoh::channels::RingChannel``). + /// @param key_expr the key expression to subscriber to. + /// @param channel an instance of channel. + /// @param options options to pass to querying subscriber declaration. + /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be + /// thrown in case of error. + /// @return a ``zenoh::ext::QueryingSubscriber`` object. + template + [[nodiscard]] ext::QueryingSubscriber> declare_querying_subscriber( + const KeyExpr& key_expr, Channel channel, + QueryingSubscriberOptions&& options = QueryingSubscriberOptions::create_default(), + ZResult* err = nullptr) const { + auto cb_handler_pair = channel.template into_cb_handler_pair(); + ::ze_querying_subscriber_options_t opts; + ze_querying_subscriber_options_default(&opts); + opts.query_selector = interop::as_loaned_c_ptr(options.query_keyexpr); +#if defined(Z_FEATURE_UNSTABLE_API) + opts.allowed_origin = options.allowed_origin; + opts.query_accept_replies = options.query_accept_replies; +#endif + opts.query_target = options.query_target; + opts.query_consolidation = *interop::as_copyable_c_ptr(options.query_consolidation); + opts.query_timeout_ms = options.query_timeout_ms; + ext::QueryingSubscriber qs = interop::detail::null>(); + ZResult res = ::ze_declare_querying_subscriber(interop::as_loaned_c_ptr(*this), interop::as_owned_c_ptr(qs), + interop::as_loaned_c_ptr(key_expr), + ::z_move(cb_handler_pair.first), &opts); + __ZENOH_RESULT_CHECK(res, err, "Failed to declare Querying Subscriber"); + if (res != Z_OK) ::z_drop(interop::as_moved_c_ptr(cb_handler_pair.second)); + return ext::QueryingSubscriber>( + std::move(qs), std::move(cb_handler_pair.second)); + } #endif }; } // namespace zenoh From c3c3fa859143f33e6760bf489bdad7ba2da334cc Mon Sep 17 00:00:00 2001 From: Denis Biryukov Date: Wed, 20 Nov 2024 23:19:51 +0100 Subject: [PATCH 2/4] docs fix --- docs/Doxyfile | 2 +- docs/session.rst | 5 +++++ include/zenoh/api/ext/querying_subscriber.hxx | 19 ++++++++++++------- include/zenoh/api/get.hxx | 3 ++- include/zenoh/api/keyexpr.hxx | 2 +- include/zenoh/api/publisher.hxx | 4 ++++ include/zenoh/api/session.hxx | 13 +++++++++++-- include/zenoh/api/subscriber.hxx | 2 -- 8 files changed, 36 insertions(+), 14 deletions(-) diff --git a/docs/Doxyfile b/docs/Doxyfile index 8da316b0..26c2621a 100644 --- a/docs/Doxyfile +++ b/docs/Doxyfile @@ -165,7 +165,7 @@ ALWAYS_DETAILED_SEC = NO # operators of the base classes will not be shown. # The default value is: NO. -INLINE_INHERITED_MEMB = NO +INLINE_INHERITED_MEMB = YES # If the FULL_PATH_NAMES tag is set to YES, doxygen will prepend the full path # before files name in the file list and in the header files. If set to NO the diff --git a/docs/session.rst b/docs/session.rst index a729afa5..dc093998 100644 --- a/docs/session.rst +++ b/docs/session.rst @@ -25,3 +25,8 @@ Session :members: :membergroups: Constructors Operators Methods Fields + +.. doxygenstruct:: zenoh::GetOptions + :members: + :membergroups: Constructors Operators Methods Fields + diff --git a/include/zenoh/api/ext/querying_subscriber.hxx b/include/zenoh/api/ext/querying_subscriber.hxx index 688acb33..d388382a 100644 --- a/include/zenoh/api/ext/querying_subscriber.hxx +++ b/include/zenoh/api/ext/querying_subscriber.hxx @@ -31,14 +31,16 @@ class QueryingSubscriberBase : public Owned<::ze_owned_querying_subscriber_t> { public: /// @name Methods + ///@copydoc zenoh::GetOptions using GetOptions = zenoh::GetOptions; + /// @brief Make querying subscriber perform an additional query on a specified selector. /// The queried samples will be merged with the received publications and made available in the subscriber callback. /// @param key_expr the key expression matching resources to query. /// @param options query options. /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be /// thrown in case of error. - void get(const KeyExpr& key_expr, GetOptions&& options = GetOptions::create_default(), + void get(const KeyExpr& key_expr, zenoh::GetOptions&& options = zenoh::GetOptions::create_default(), ZResult* err = nullptr) const { ::z_get_options_t opts; z_get_options_default(&opts); @@ -72,7 +74,7 @@ class QueryingSubscriber; /// /// In addition to receiving the data it is subscribed to, /// it also will fetch data from a Queryable at startup and peridodically (using `QueryingSubscriber::get`). -/// @note Zenoh-c only +/// @note Zenoh-c only. template <> class QueryingSubscriber : public detail::QueryingSubscriberBase { protected: @@ -82,8 +84,6 @@ class QueryingSubscriber : public detail::QueryingSubscriberBase { public: /// @name Methods - using QueryingSubscriberBase::get; - using QueryingSubscriberBase::GetOptions; /// @brief Undeclare publication cache. /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be @@ -94,9 +94,12 @@ class QueryingSubscriber : public detail::QueryingSubscriberBase { } }; -/// @brief A Zenoh subscriber. Destroying or undeclaring subscriber cancels the subscription. - -/// Constructed by ``Session::declare_subscriber`` method. +/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release. +/// @brief A Zenoh querying subscriber declared by ``zenoh::Session::declare_querying_subscriber``. +/// +/// In addition to receiving the data it is subscribed to, +/// it also will fetch data from a Queryable at startup and peridodically (using ``QueryingSubscriber::get``). +/// @note Zenoh-c only. /// @tparam Handler streaming handler exposing data. If `void`, no handler access is provided and instead data is being /// processed inside the callback. template @@ -115,6 +118,8 @@ class QueryingSubscriber : public detail::QueryingSubscriberBase { QueryingSubscriber(QueryingSubscriber&& qs, Handler handler) : QueryingSubscriberBase(interop::as_owned_c_ptr(qs)), _handler(std::move(handler)) {} + /// @name Methods + /// @brief Undeclare querying subscriber, and return its handler, which can still be used to process any messages /// received prior to undeclaration. /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be diff --git a/include/zenoh/api/get.hxx b/include/zenoh/api/get.hxx index 711b71a1..c6346b91 100644 --- a/include/zenoh/api/get.hxx +++ b/include/zenoh/api/get.hxx @@ -65,8 +65,9 @@ struct GetOptions { uint64_t timeout_ms = 0; /// @name Methods + /// @brief Create default option settings. static GetOptions create_default() { return {}; } }; #endif -} // namespace zenoh \ No newline at end of file +} // namespace zenoh diff --git a/include/zenoh/api/keyexpr.hxx b/include/zenoh/api/keyexpr.hxx index 88bbefe9..75f6a47e 100644 --- a/include/zenoh/api/keyexpr.hxx +++ b/include/zenoh/api/keyexpr.hxx @@ -22,7 +22,7 @@ namespace zenoh { /// @brief A Zenoh key expression . /// -/// Key expression can be registered in the ``zenoh::Session`` object with ``zenoh::Session::declare_keyexpr()`` method. +/// Key expression can be registered in the ``zenoh::Session`` object with ``zenoh::Session::declare_keyexpr`` method. /// The unique id is internally assinged to the key expression string in this case. This allows to reduce bandwith usage /// when transporting key expressions. diff --git a/include/zenoh/api/publisher.hxx b/include/zenoh/api/publisher.hxx index 05cd36fa..ee7b98e1 100644 --- a/include/zenoh/api/publisher.hxx +++ b/include/zenoh/api/publisher.hxx @@ -71,9 +71,13 @@ class Publisher : public Owned<::z_owned_publisher_t> { /// @brief Options to be passed to ``Publisher::delete_resource`` operation. struct DeleteOptions { + /// @name Fields + /// @brief The timestamp of this message. std::optional timestamp = {}; + /// @name Methods + /// @brief Create default option settings. static DeleteOptions create_default() { return {}; } }; diff --git a/include/zenoh/api/session.hxx b/include/zenoh/api/session.hxx index ff9b5ca2..bf44c3a3 100644 --- a/include/zenoh/api/session.hxx +++ b/include/zenoh/api/session.hxx @@ -164,6 +164,7 @@ class Session : public Owned<::z_owned_session_t> { err, "Failed to undeclare key expression"); } #if defined(ZENOHCXX_ZENOHC) || Z_FEATURE_QUERY == 1 + /// @copydoc zenoh::GetOptions using GetOptions = zenoh::GetOptions; /// @brief Query data from the matching queryables in the system. Replies are provided through a callback function. @@ -176,7 +177,7 @@ class Session : public Owned<::z_owned_session_t> { /// thrown in case of error. template void get(const KeyExpr& key_expr, const std::string& parameters, C&& on_reply, D&& on_drop, - GetOptions&& options = GetOptions::create_default(), ZResult* err = nullptr) const { + zenoh::GetOptions&& options = zenoh::GetOptions::create_default(), ZResult* err = nullptr) const { static_assert(std::is_invocable_r::value, "on_reply should be callable with the following signature: void on_reply(zenoh::Reply& reply)"); static_assert(std::is_invocable_r::value, @@ -222,7 +223,7 @@ class Session : public Owned<::z_owned_session_t> { template typename Channel::template HandlerType get(const KeyExpr& key_expr, const std::string& parameters, Channel channel, - GetOptions&& options = GetOptions::create_default(), + zenoh::GetOptions&& options = zenoh::GetOptions::create_default(), ZResult* err = nullptr) const { auto cb_handler_pair = channel.template into_cb_handler_pair(); ::z_get_options_t opts; @@ -255,6 +256,7 @@ class Session : public Owned<::z_owned_session_t> { bool complete = false; /// @name Methods + /// @brief Create default option settings. static QueryableOptions create_default() { return {}; } }; @@ -356,6 +358,7 @@ class Session : public Owned<::z_owned_session_t> { /// @name Fields /// @name Methods + /// @brief Create default option settings. static SubscriberOptions create_default() { return {}; } }; @@ -475,6 +478,7 @@ class Session : public Owned<::z_owned_session_t> { std::optional timestamp = {}; /// @name Methods + /// @brief Create default option settings. static DeleteOptions create_default() { return {}; } }; @@ -537,6 +541,7 @@ class Session : public Owned<::z_owned_session_t> { std::optional attachment = {}; /// @name Methods + /// @brief Create default option settings. static PutOptions create_default() { return {}; } }; @@ -596,6 +601,7 @@ class Session : public Owned<::z_owned_session_t> { std::optional encoding = {}; /// @name Methods + /// @brief Create default option settings. static PublisherOptions create_default() { return {}; } }; @@ -892,6 +898,7 @@ class Session : public Owned<::z_owned_session_t> { uint32_t timeout_ms = 10000; /// @name Methods + /// @brief Create default option settings. static LivelinessGetOptions create_default() { return {}; } }; @@ -1000,6 +1007,7 @@ class Session : public Owned<::z_owned_session_t> { size_t resources_limit = 0; /// @name Methods + /// @brief Create default option settings. static PublicationCacheOptions create_default() { return {}; } }; @@ -1080,6 +1088,7 @@ class Session : public Owned<::z_owned_session_t> { uint64_t query_timeout_ms = 0; /// @name Methods + /// @brief Create default option settings. static QueryingSubscriberOptions create_default() { return {}; } }; diff --git a/include/zenoh/api/subscriber.hxx b/include/zenoh/api/subscriber.hxx index db51c83b..b7ce438f 100644 --- a/include/zenoh/api/subscriber.hxx +++ b/include/zenoh/api/subscriber.hxx @@ -50,7 +50,6 @@ class Subscriber : public detail::SubscriberBase { public: /// @name Methods - using SubscriberBase::get_keyexpr; /// @brief Undeclare subscriber. /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be @@ -83,7 +82,6 @@ class Subscriber : public detail::SubscriberBase { : SubscriberBase(interop::as_owned_c_ptr(s)), _handler(std::move(handler)) {} /// @name Methods - using SubscriberBase::get_keyexpr; /// @brief Undeclare subscriber, and return its handler, which can still be used to process any messages received /// prior to undeclaration. From 982e4695edfdd506cd6f734a874a0f956fa4b80f Mon Sep 17 00:00:00 2001 From: Denis Biryukov Date: Wed, 20 Nov 2024 23:22:45 +0100 Subject: [PATCH 3/4] do not build querying subscriber example when unstable is disabled --- examples/CMakeLists.txt | 3 +++ 1 file changed, 3 insertions(+) diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index bbbda6e6..b49539b5 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -60,6 +60,9 @@ function(add_examples glob mode lib) if(${file} MATCHES ".*liveliness.*$") continue() endif() + if(${file} MATCHES ".*query_sub.*$") + continue() + endif() if(${file} MATCHES ".*pub_cache.*$") continue() endif() From c45d01213a915519664de9c699630ff5b5d9964e Mon Sep 17 00:00:00 2001 From: Denis Biryukov Date: Thu, 21 Nov 2024 13:54:23 +0100 Subject: [PATCH 4/4] move GetOptions back under Session:: scope --- docs/session.rst | 6 - include/zenoh/api.hxx | 1 + include/zenoh/api/ext/querying_subscriber.hxx | 94 +++++++++++- include/zenoh/api/get.hxx | 73 --------- include/zenoh/api/session.hxx | 145 ++++++++---------- 5 files changed, 151 insertions(+), 168 deletions(-) delete mode 100644 include/zenoh/api/get.hxx diff --git a/docs/session.rst b/docs/session.rst index dc093998..cf0a9c34 100644 --- a/docs/session.rst +++ b/docs/session.rst @@ -24,9 +24,3 @@ Session .. doxygenclass:: zenoh::Session :members: :membergroups: Constructors Operators Methods Fields - - -.. doxygenstruct:: zenoh::GetOptions - :members: - :membergroups: Constructors Operators Methods Fields - diff --git a/include/zenoh/api.hxx b/include/zenoh/api.hxx index 81cefda3..45757ee1 100644 --- a/include/zenoh/api.hxx +++ b/include/zenoh/api.hxx @@ -40,4 +40,5 @@ #include "api/ext/serialization.hxx" #if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) #include "api/ext/publication_cache.hxx" +#include "api/ext/querying_subscriber.hxx" #endif diff --git a/include/zenoh/api/ext/querying_subscriber.hxx b/include/zenoh/api/ext/querying_subscriber.hxx index d388382a..0c4feb7b 100644 --- a/include/zenoh/api/ext/querying_subscriber.hxx +++ b/include/zenoh/api/ext/querying_subscriber.hxx @@ -15,9 +15,9 @@ #if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) #include "../base.hxx" -#include "../get.hxx" #include "../interop.hxx" #include "../keyexpr.hxx" +#include "../session.hxx" namespace zenoh { namespace ext { @@ -31,16 +31,14 @@ class QueryingSubscriberBase : public Owned<::ze_owned_querying_subscriber_t> { public: /// @name Methods - ///@copydoc zenoh::GetOptions - using GetOptions = zenoh::GetOptions; - /// @brief Make querying subscriber perform an additional query on a specified selector. /// The queried samples will be merged with the received publications and made available in the subscriber callback. /// @param key_expr the key expression matching resources to query. /// @param options query options. /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be /// thrown in case of error. - void get(const KeyExpr& key_expr, zenoh::GetOptions&& options = zenoh::GetOptions::create_default(), + void get(const KeyExpr& key_expr, + zenoh::Session::GetOptions&& options = zenoh::Session::GetOptions::create_default(), ZResult* err = nullptr) const { ::z_get_options_t opts; z_get_options_default(&opts); @@ -192,5 +190,91 @@ auto move_to_c_obj(zenoh::ext::QueryingSubscriber&& s) { } } // namespace interop +template +[[nodiscard]] ext::QueryingSubscriber Session::declare_querying_subscriber(const KeyExpr& key_expr, C&& on_sample, + D&& on_drop, + QueryingSubscriberOptions&& options, + ZResult* err) const { + static_assert(std::is_invocable_r::value, + "on_sample should be callable with the following signature: void on_sample(zenoh::Sample& sample)"); + static_assert(std::is_invocable_r::value, + "on_drop should be callable with the following signature: void on_drop()"); + ::z_owned_closure_sample_t c_closure; + using Cval = std::remove_reference_t; + using Dval = std::remove_reference_t; + using ClosureType = typename detail::closures::Closure; + auto closure = ClosureType::into_context(std::forward(on_sample), std::forward(on_drop)); + ::z_closure(&c_closure, detail::closures::_zenoh_on_sample_call, detail::closures::_zenoh_on_drop, closure); + ::ze_querying_subscriber_options_t opts; + ze_querying_subscriber_options_default(&opts); + opts.query_selector = interop::as_loaned_c_ptr(options.query_keyexpr); +#if defined(Z_FEATURE_UNSTABLE_API) + opts.allowed_origin = options.allowed_origin; + opts.query_accept_replies = options.query_accept_replies; +#endif + opts.query_target = options.query_target; + opts.query_consolidation = *interop::as_copyable_c_ptr(options.query_consolidation); + opts.query_timeout_ms = options.query_timeout_ms; + ext::QueryingSubscriber qs = interop::detail::null>(); + ZResult res = ::ze_declare_querying_subscriber(interop::as_loaned_c_ptr(*this), interop::as_owned_c_ptr(qs), + interop::as_loaned_c_ptr(key_expr), ::z_move(c_closure), &opts); + __ZENOH_RESULT_CHECK(res, err, "Failed to declare Background Querying Subscriber"); + return qs; +} + +template +void Session::declare_background_querying_subscriber(const KeyExpr& key_expr, C&& on_sample, D&& on_drop, + QueryingSubscriberOptions&& options, ZResult* err) const { + static_assert(std::is_invocable_r::value, + "on_sample should be callable with the following signature: void on_sample(zenoh::Sample& sample)"); + static_assert(std::is_invocable_r::value, + "on_drop should be callable with the following signature: void on_drop()"); + ::z_owned_closure_sample_t c_closure; + using Cval = std::remove_reference_t; + using Dval = std::remove_reference_t; + using ClosureType = typename detail::closures::Closure; + auto closure = ClosureType::into_context(std::forward(on_sample), std::forward(on_drop)); + ::z_closure(&c_closure, detail::closures::_zenoh_on_sample_call, detail::closures::_zenoh_on_drop, closure); + ::ze_querying_subscriber_options_t opts; + ze_querying_subscriber_options_default(&opts); + opts.query_selector = interop::as_loaned_c_ptr(options.query_keyexpr); +#if defined(Z_FEATURE_UNSTABLE_API) + opts.allowed_origin = options.allowed_origin; + opts.query_accept_replies = options.query_accept_replies; +#endif + opts.query_target = options.query_target; + opts.query_consolidation = *interop::as_copyable_c_ptr(options.query_consolidation); + ; + opts.query_timeout_ms = options.query_timeout_ms; + ZResult res = ::ze_declare_background_querying_subscriber( + interop::as_loaned_c_ptr(*this), interop::as_loaned_c_ptr(key_expr), ::z_move(c_closure), &opts); + __ZENOH_RESULT_CHECK(res, err, "Failed to declare Background Querying Subscriber"); +} + +template +[[nodiscard]] ext::QueryingSubscriber> +Session::declare_querying_subscriber(const KeyExpr& key_expr, Channel channel, QueryingSubscriberOptions&& options, + ZResult* err) const { + auto cb_handler_pair = channel.template into_cb_handler_pair(); + ::ze_querying_subscriber_options_t opts; + ze_querying_subscriber_options_default(&opts); + opts.query_selector = interop::as_loaned_c_ptr(options.query_keyexpr); +#if defined(Z_FEATURE_UNSTABLE_API) + opts.allowed_origin = options.allowed_origin; + opts.query_accept_replies = options.query_accept_replies; +#endif + opts.query_target = options.query_target; + opts.query_consolidation = *interop::as_copyable_c_ptr(options.query_consolidation); + opts.query_timeout_ms = options.query_timeout_ms; + ext::QueryingSubscriber qs = interop::detail::null>(); + ZResult res = + ::ze_declare_querying_subscriber(interop::as_loaned_c_ptr(*this), interop::as_owned_c_ptr(qs), + interop::as_loaned_c_ptr(key_expr), ::z_move(cb_handler_pair.first), &opts); + __ZENOH_RESULT_CHECK(res, err, "Failed to declare Querying Subscriber"); + if (res != Z_OK) ::z_drop(interop::as_moved_c_ptr(cb_handler_pair.second)); + return ext::QueryingSubscriber>(std::move(qs), + std::move(cb_handler_pair.second)); +} + } // namespace zenoh #endif diff --git a/include/zenoh/api/get.hxx b/include/zenoh/api/get.hxx deleted file mode 100644 index c6346b91..00000000 --- a/include/zenoh/api/get.hxx +++ /dev/null @@ -1,73 +0,0 @@ -// -// Copyright (c) 2024 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -#pragma once - -#include "bytes.hxx" -#include "enums.hxx" -#include "query_consolidation.hxx" -#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) -#include "source_info.hxx" -#endif - -namespace zenoh { -#if defined(ZENOHCXX_ZENOHC) || Z_FEATURE_QUERY == 1 -/// @brief Options passed to the ``Session::get`` operation. -struct GetOptions { - /// @name Fields - - /// @brief The Queryables that should be target of the query. - QueryTarget target = QueryTarget::Z_QUERY_TARGET_BEST_MATCHING; - /// @brief The replies consolidation strategy to apply on replies to the query. - QueryConsolidation consolidation = QueryConsolidation(); - /// @brief The priority of the get message. - Priority priority = Z_PRIORITY_DEFAULT; - /// @brief The congestion control to apply when routing get message. - CongestionControl congestion_control = Z_CONGESTION_CONTROL_DEFAULT; - /// @brief Whether Zenoh will NOT wait to batch get message with others to reduce the bandwith. - bool is_express = false; - /// @brief An optional payload of the query. - std::optional payload = {}; - /// @brief An optional encoding of the query payload and/or attachment. - std::optional encoding = {}; -#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) - /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future - /// release. - /// @brief The source info for the query. - std::optional source_info = {}; - - /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future - /// release. - /// - /// @brief The accepted replies for the query. - /// @note Zenoh-c only. - ReplyKeyExpr accept_replies = ::zc_reply_keyexpr_default(); - - /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future - /// release. - /// @brief Allowed destination. - /// @note Zenoh-c only. - Locality allowed_destination = ::zc_locality_default(); -#endif - - /// @brief An optional attachment to the query. - std::optional attachment = {}; - /// @brief The timeout for the query in milliseconds. 0 means default query timeout from zenoh configuration. - uint64_t timeout_ms = 0; - - /// @name Methods - - /// @brief Create default option settings. - static GetOptions create_default() { return {}; } -}; -#endif -} // namespace zenoh diff --git a/include/zenoh/api/session.hxx b/include/zenoh/api/session.hxx index bf44c3a3..1ec324d9 100644 --- a/include/zenoh/api/session.hxx +++ b/include/zenoh/api/session.hxx @@ -21,9 +21,9 @@ #include "closures.hxx" #include "config.hxx" #include "enums.hxx" -#include "get.hxx" #include "id.hxx" #include "interop.hxx" +#include "keyexpr.hxx" #include "liveliness.hxx" #include "publisher.hxx" #include "query_consolidation.hxx" @@ -34,12 +34,16 @@ #endif #if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) #include "ext/publication_cache.hxx" -#include "ext/querying_subscriber.hxx" #endif -#include - namespace zenoh { +#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) +namespace ext { +template +class QueryingSubscriber; +} +#endif + /// A Zenoh session. class Session : public Owned<::z_owned_session_t> { Session(zenoh::detail::null_object_t) : Owned(nullptr){}; @@ -164,8 +168,54 @@ class Session : public Owned<::z_owned_session_t> { err, "Failed to undeclare key expression"); } #if defined(ZENOHCXX_ZENOHC) || Z_FEATURE_QUERY == 1 - /// @copydoc zenoh::GetOptions - using GetOptions = zenoh::GetOptions; + /// @brief Options passed to the ``Session::get`` operation. + struct GetOptions { + /// @name Fields + + /// @brief The Queryables that should be target of the query. + QueryTarget target = QueryTarget::Z_QUERY_TARGET_BEST_MATCHING; + /// @brief The replies consolidation strategy to apply on replies to the query. + QueryConsolidation consolidation = QueryConsolidation(); + /// @brief The priority of the get message. + Priority priority = Z_PRIORITY_DEFAULT; + /// @brief The congestion control to apply when routing get message. + CongestionControl congestion_control = Z_CONGESTION_CONTROL_DEFAULT; + /// @brief Whether Zenoh will NOT wait to batch get message with others to reduce the bandwith. + bool is_express = false; + /// @brief An optional payload of the query. + std::optional payload = {}; + /// @brief An optional encoding of the query payload and/or attachment. + std::optional encoding = {}; +#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) + /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future + /// release. + /// @brief The source info for the query. + std::optional source_info = {}; + + /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future + /// release. + /// + /// @brief The accepted replies for the query. + /// @note Zenoh-c only. + ReplyKeyExpr accept_replies = ::zc_reply_keyexpr_default(); + + /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future + /// release. + /// @brief Allowed destination. + /// @note Zenoh-c only. + Locality allowed_destination = ::zc_locality_default(); +#endif + + /// @brief An optional attachment to the query. + std::optional attachment = {}; + /// @brief The timeout for the query in milliseconds. 0 means default query timeout from zenoh configuration. + uint64_t timeout_ms = 0; + + /// @name Methods + + /// @brief Create default option settings. + static GetOptions create_default() { return {}; } + }; /// @brief Query data from the matching queryables in the system. Replies are provided through a callback function. /// @param key_expr ``KeyExpr`` the key expression matching resources to query. @@ -177,7 +227,7 @@ class Session : public Owned<::z_owned_session_t> { /// thrown in case of error. template void get(const KeyExpr& key_expr, const std::string& parameters, C&& on_reply, D&& on_drop, - zenoh::GetOptions&& options = zenoh::GetOptions::create_default(), ZResult* err = nullptr) const { + GetOptions&& options = GetOptions::create_default(), ZResult* err = nullptr) const { static_assert(std::is_invocable_r::value, "on_reply should be callable with the following signature: void on_reply(zenoh::Reply& reply)"); static_assert(std::is_invocable_r::value, @@ -223,7 +273,7 @@ class Session : public Owned<::z_owned_session_t> { template typename Channel::template HandlerType get(const KeyExpr& key_expr, const std::string& parameters, Channel channel, - zenoh::GetOptions&& options = zenoh::GetOptions::create_default(), + GetOptions&& options = GetOptions::create_default(), ZResult* err = nullptr) const { auto cb_handler_pair = channel.template into_cb_handler_pair(); ::z_get_options_t opts; @@ -1107,34 +1157,7 @@ class Session : public Owned<::z_owned_session_t> { [[nodiscard]] ext::QueryingSubscriber declare_querying_subscriber( const KeyExpr& key_expr, C&& on_sample, D&& on_drop, QueryingSubscriberOptions&& options = QueryingSubscriberOptions::create_default(), - ZResult* err = nullptr) const { - static_assert( - std::is_invocable_r::value, - "on_sample should be callable with the following signature: void on_sample(zenoh::Sample& sample)"); - static_assert(std::is_invocable_r::value, - "on_drop should be callable with the following signature: void on_drop()"); - ::z_owned_closure_sample_t c_closure; - using Cval = std::remove_reference_t; - using Dval = std::remove_reference_t; - using ClosureType = typename detail::closures::Closure; - auto closure = ClosureType::into_context(std::forward(on_sample), std::forward(on_drop)); - ::z_closure(&c_closure, detail::closures::_zenoh_on_sample_call, detail::closures::_zenoh_on_drop, closure); - ::ze_querying_subscriber_options_t opts; - ze_querying_subscriber_options_default(&opts); - opts.query_selector = interop::as_loaned_c_ptr(options.query_keyexpr); -#if defined(Z_FEATURE_UNSTABLE_API) - opts.allowed_origin = options.allowed_origin; - opts.query_accept_replies = options.query_accept_replies; -#endif - opts.query_target = options.query_target; - opts.query_consolidation = *interop::as_copyable_c_ptr(options.query_consolidation); - opts.query_timeout_ms = options.query_timeout_ms; - ext::QueryingSubscriber qs = interop::detail::null>(); - ZResult res = ::ze_declare_querying_subscriber(interop::as_loaned_c_ptr(*this), interop::as_owned_c_ptr(qs), - interop::as_loaned_c_ptr(key_expr), ::z_move(c_closure), &opts); - __ZENOH_RESULT_CHECK(res, err, "Failed to declare Background Querying Subscriber"); - return qs; - } + ZResult* err = nullptr) const; /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future /// release. @@ -1150,33 +1173,7 @@ class Session : public Owned<::z_owned_session_t> { void declare_background_querying_subscriber( const KeyExpr& key_expr, C&& on_sample, D&& on_drop, QueryingSubscriberOptions&& options = QueryingSubscriberOptions::create_default(), - ZResult* err = nullptr) const { - static_assert( - std::is_invocable_r::value, - "on_sample should be callable with the following signature: void on_sample(zenoh::Sample& sample)"); - static_assert(std::is_invocable_r::value, - "on_drop should be callable with the following signature: void on_drop()"); - ::z_owned_closure_sample_t c_closure; - using Cval = std::remove_reference_t; - using Dval = std::remove_reference_t; - using ClosureType = typename detail::closures::Closure; - auto closure = ClosureType::into_context(std::forward(on_sample), std::forward(on_drop)); - ::z_closure(&c_closure, detail::closures::_zenoh_on_sample_call, detail::closures::_zenoh_on_drop, closure); - ::ze_querying_subscriber_options_t opts; - ze_querying_subscriber_options_default(&opts); - opts.query_selector = interop::as_loaned_c_ptr(options.query_keyexpr); -#if defined(Z_FEATURE_UNSTABLE_API) - opts.allowed_origin = options.allowed_origin; - opts.query_accept_replies = options.query_accept_replies; -#endif - opts.query_target = options.query_target; - opts.query_consolidation = *interop::as_copyable_c_ptr(options.query_consolidation); - ; - opts.query_timeout_ms = options.query_timeout_ms; - ZResult res = ::ze_declare_background_querying_subscriber( - interop::as_loaned_c_ptr(*this), interop::as_loaned_c_ptr(key_expr), ::z_move(c_closure), &opts); - __ZENOH_RESULT_CHECK(res, err, "Failed to declare Background Querying Subscriber"); - } + ZResult* err = nullptr) const; /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future /// release. @@ -1193,27 +1190,7 @@ class Session : public Owned<::z_owned_session_t> { [[nodiscard]] ext::QueryingSubscriber> declare_querying_subscriber( const KeyExpr& key_expr, Channel channel, QueryingSubscriberOptions&& options = QueryingSubscriberOptions::create_default(), - ZResult* err = nullptr) const { - auto cb_handler_pair = channel.template into_cb_handler_pair(); - ::ze_querying_subscriber_options_t opts; - ze_querying_subscriber_options_default(&opts); - opts.query_selector = interop::as_loaned_c_ptr(options.query_keyexpr); -#if defined(Z_FEATURE_UNSTABLE_API) - opts.allowed_origin = options.allowed_origin; - opts.query_accept_replies = options.query_accept_replies; -#endif - opts.query_target = options.query_target; - opts.query_consolidation = *interop::as_copyable_c_ptr(options.query_consolidation); - opts.query_timeout_ms = options.query_timeout_ms; - ext::QueryingSubscriber qs = interop::detail::null>(); - ZResult res = ::ze_declare_querying_subscriber(interop::as_loaned_c_ptr(*this), interop::as_owned_c_ptr(qs), - interop::as_loaned_c_ptr(key_expr), - ::z_move(cb_handler_pair.first), &opts); - __ZENOH_RESULT_CHECK(res, err, "Failed to declare Querying Subscriber"); - if (res != Z_OK) ::z_drop(interop::as_moved_c_ptr(cb_handler_pair.second)); - return ext::QueryingSubscriber>( - std::move(qs), std::move(cb_handler_pair.second)); - } + ZResult* err = nullptr) const; #endif /// @brief Check if session is closed.