diff --git a/docs/Doxyfile b/docs/Doxyfile index 8da316b..26c2621 100644 --- a/docs/Doxyfile +++ b/docs/Doxyfile @@ -165,7 +165,7 @@ ALWAYS_DETAILED_SEC = NO # operators of the base classes will not be shown. # The default value is: NO. -INLINE_INHERITED_MEMB = NO +INLINE_INHERITED_MEMB = YES # If the FULL_PATH_NAMES tag is set to YES, doxygen will prepend the full path # before files name in the file list and in the header files. If set to NO the diff --git a/docs/ext.rst b/docs/ext.rst index acd8d1c..abbcbaa 100644 --- a/docs/ext.rst +++ b/docs/ext.rst @@ -17,5 +17,9 @@ Extensions Extra functionality, which is not a part of core Zenoh API. .. doxygenclass:: zenoh::ext::PublicationCache + :members: + :membergroups: Constructors Operators Methods + +.. doxygenclass:: zenoh::ext::QueryingSubscriber :members: :membergroups: Constructors Operators Methods \ No newline at end of file diff --git a/docs/session.rst b/docs/session.rst index a729afa..cf0a9c3 100644 --- a/docs/session.rst +++ b/docs/session.rst @@ -24,4 +24,3 @@ Session .. doxygenclass:: zenoh::Session :members: :membergroups: Constructors Operators Methods Fields - diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index bbbda6e..b49539b 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -60,6 +60,9 @@ function(add_examples glob mode lib) if(${file} MATCHES ".*liveliness.*$") continue() endif() + if(${file} MATCHES ".*query_sub.*$") + continue() + endif() if(${file} MATCHES ".*pub_cache.*$") continue() endif() diff --git a/examples/zenohc/z_pub_cache.cxx b/examples/zenohc/z_pub_cache.cxx index f037348..497f939 100644 --- a/examples/zenohc/z_pub_cache.cxx +++ b/examples/zenohc/z_pub_cache.cxx @@ -1,5 +1,5 @@ // -// Copyright (c) 2022 ZettaScale Technology +// Copyright (c) 2024 ZettaScale Technology // // This program and the accompanying materials are made available under the // terms of the Eclipse Public License 2.0 which is available at @@ -46,6 +46,7 @@ int _main(int argc, char **argv) { std::cout << "Declaring Publication cache on '" << keyexpr << "'..." << std::endl; Session::PublicationCacheOptions opts; opts.history = std::atoi(history); + opts.queryable_complete = true; if (!std::string(prefix).empty()) { opts.queryable_prefix = KeyExpr(prefix); } diff --git a/examples/zenohc/z_query_sub.cxx b/examples/zenohc/z_query_sub.cxx new file mode 100644 index 0000000..c7419ed --- /dev/null +++ b/examples/zenohc/z_query_sub.cxx @@ -0,0 +1,80 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +#include +#include + +#include +#include +#include +#include +#include + +#include "../getargs.h" +#include "zenoh.hxx" + +using namespace zenoh; +using namespace std::chrono_literals; + +const char *default_keyexpr = "demo/example/**"; +const char *default_query = ""; + +const char *kind_to_str(SampleKind kind) { + switch (kind) { + case SampleKind::Z_SAMPLE_KIND_PUT: + return "PUT"; + case SampleKind::Z_SAMPLE_KIND_DELETE: + return "DELETE"; + default: + return "UNKNOWN"; + } +} + +int _main(int argc, char **argv) { + const char *keyexpr = default_keyexpr; + const char *query = default_query; + Config config = parse_args(argc, argv, {}, {{"key_expression", &keyexpr}}, {{"-q", {"query", &query}}}); + + std::cout << "Opening session..." << std::endl; + auto session = Session::open(std::move(config)); + + std::cout << "Declaring Querying Subscriber on '" << keyexpr << "' with initial query on '" << query << "'" + << std::endl; + Session::QueryingSubscriberOptions opts; + + if (!std::string(query).empty()) { + opts.query_keyexpr = KeyExpr(query); + opts.query_accept_replies = ReplyKeyExpr::ZC_REPLY_KEYEXPR_ANY; + } + auto querying_subscriber = session.declare_querying_subscriber(keyexpr, channels::FifoChannel(16), std::move(opts)); + + std::cout << "Press CTRL-C to quit..." << std::endl; + for (auto res = querying_subscriber.handler().recv(); std::holds_alternative(res); + res = querying_subscriber.handler().recv()) { + const auto &sample = std::get(res); + std::cout << ">> [Subscriber] Received " << kind_to_str(sample.get_kind()) << " ('" + << sample.get_keyexpr().as_string_view() << "': '" << sample.get_payload().as_string() << "')" + << std::endl; + } + + return 0; +} + +int main(int argc, char **argv) { + try { + init_log_from_env_or("error"); + _main(argc, argv); + } catch (ZException e) { + std::cout << "Received an error :" << e.what() << "\n"; + } +} diff --git a/include/zenoh/api.hxx b/include/zenoh/api.hxx index 81cefda..45757ee 100644 --- a/include/zenoh/api.hxx +++ b/include/zenoh/api.hxx @@ -40,4 +40,5 @@ #include "api/ext/serialization.hxx" #if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) #include "api/ext/publication_cache.hxx" +#include "api/ext/querying_subscriber.hxx" #endif diff --git a/include/zenoh/api/enums.hxx b/include/zenoh/api/enums.hxx index f631f44..a18cd78 100644 --- a/include/zenoh/api/enums.hxx +++ b/include/zenoh/api/enums.hxx @@ -110,6 +110,14 @@ inline std::string_view whatami_as_str(WhatAmI whatami) { /// - **ZC_LOCALITY_SESSION_LOCAL**: Only from local sessions. /// - **ZC_LOCALITY_SESSION_REMOTE**: Only from remote sessions. typedef ::zc_locality_t Locality; + +/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release. +/// @brief Key expressions types to which Queryable should reply to. +/// +/// Values: +/// - **ZC_REPLY_KEYEXPR_ANY**: Replies to any key expression queries. +/// - **ZC_REPLY_KEYEXPR_MATCHING_QUERY**: Replies only to queries with intersecting key expressions. +typedef ::zc_reply_keyexpr_t ReplyKeyExpr; #endif } // namespace zenoh \ No newline at end of file diff --git a/include/zenoh/api/ext/querying_subscriber.hxx b/include/zenoh/api/ext/querying_subscriber.hxx new file mode 100644 index 0000000..0c4feb7 --- /dev/null +++ b/include/zenoh/api/ext/querying_subscriber.hxx @@ -0,0 +1,280 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#pragma once + +#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) +#include "../base.hxx" +#include "../interop.hxx" +#include "../keyexpr.hxx" +#include "../session.hxx" + +namespace zenoh { +namespace ext { + +namespace detail { +class QueryingSubscriberBase : public Owned<::ze_owned_querying_subscriber_t> { + protected: + QueryingSubscriberBase(zenoh::detail::null_object_t) : Owned(nullptr){}; + QueryingSubscriberBase(::ze_owned_querying_subscriber_t* qs) : Owned(qs){}; + + public: + /// @name Methods + + /// @brief Make querying subscriber perform an additional query on a specified selector. + /// The queried samples will be merged with the received publications and made available in the subscriber callback. + /// @param key_expr the key expression matching resources to query. + /// @param options query options. + /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be + /// thrown in case of error. + void get(const KeyExpr& key_expr, + zenoh::Session::GetOptions&& options = zenoh::Session::GetOptions::create_default(), + ZResult* err = nullptr) const { + ::z_get_options_t opts; + z_get_options_default(&opts); + opts.target = options.target; + opts.consolidation = *interop::as_copyable_c_ptr(options.consolidation); + opts.payload = interop::as_moved_c_ptr(options.payload); + opts.encoding = interop::as_moved_c_ptr(options.encoding); +#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) + opts.source_info = interop::as_moved_c_ptr(options.source_info); + opts.accept_replies = options.accept_replies; + opts.allowed_destination = options.allowed_destination; +#endif + opts.attachment = interop::as_moved_c_ptr(options.attachment); + opts.timeout_ms = options.timeout_ms; + + ZResult res = + ::ze_querying_subscriber_get(interop::as_loaned_c_ptr(*this), interop::as_loaned_c_ptr(key_expr), &opts); + __ZENOH_RESULT_CHECK(res, err, "Failed to perform get operation"); + } + + friend class zenoh::Session; +}; + +} // namespace detail + +template +class QueryingSubscriber; + +/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release. +/// @brief A Zenoh querying subscriber. +/// +/// In addition to receiving the data it is subscribed to, +/// it also will fetch data from a Queryable at startup and peridodically (using `QueryingSubscriber::get`). +/// @note Zenoh-c only. +template <> +class QueryingSubscriber : public detail::QueryingSubscriberBase { + protected: + using QueryingSubscriberBase::QueryingSubscriberBase; + friend class Session; + friend struct interop::detail::Converter; + + public: + /// @name Methods + + /// @brief Undeclare publication cache. + /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be + /// thrown in case of error. + void undeclare(ZResult* err = nullptr) && { + __ZENOH_RESULT_CHECK(::ze_undeclare_querying_subscriber(interop::as_moved_c_ptr(*this)), err, + "Failed to undeclare Querying Subscriber"); + } +}; + +/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release. +/// @brief A Zenoh querying subscriber declared by ``zenoh::Session::declare_querying_subscriber``. +/// +/// In addition to receiving the data it is subscribed to, +/// it also will fetch data from a Queryable at startup and peridodically (using ``QueryingSubscriber::get``). +/// @note Zenoh-c only. +/// @tparam Handler streaming handler exposing data. If `void`, no handler access is provided and instead data is being +/// processed inside the callback. +template +class QueryingSubscriber : public detail::QueryingSubscriberBase { + Handler _handler; + + public: + /// @name Constructors + + /// @brief Construct stream querying subscriber from callback querying subscriber and handler. + /// + /// @param qs callback querying subscriber, that should expose data to the handler in its callback. + /// @param handler handler to access data exposed by `s`. Zenoh handlers implement + /// recv and try_recv methods, for blocking and non-blocking message reception. But user is free to define his own + /// interface. + QueryingSubscriber(QueryingSubscriber&& qs, Handler handler) + : QueryingSubscriberBase(interop::as_owned_c_ptr(qs)), _handler(std::move(handler)) {} + + /// @name Methods + + /// @brief Undeclare querying subscriber, and return its handler, which can still be used to process any messages + /// received prior to undeclaration. + /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be + /// thrown in case of error. + Handler undeclare(ZResult* err = nullptr) && { + __ZENOH_RESULT_CHECK(::ze_undeclare_querying_subscriber(interop::as_moved_c_ptr(*this)), err, + "Failed to undeclare Querying Subscriber"); + return std::move(this->_handler); + } + + /// @brief Return the handler to subscriber data stream. + const Handler& handler() const { return _handler; }; + friend class Session; +}; + +} // namespace ext + +namespace interop { +/// @brief Return a pair of pointers to owned zenoh-c representations of querying subscriber and its callback. +template >> +auto as_owned_c_ptr(zenoh::ext::QueryingSubscriber& s) { + return std::make_pair(as_owned_c_ptr(static_cast(s)), + as_owned_c_ptr(const_cast(s.handler()))); +} + +/// @brief Return a pair of pointers to owned zenoh-c representations of querying subscriber and its callback. +template >> +auto as_owned_c_ptr(const zenoh::ext::QueryingSubscriber& s) { + return std::make_pair(as_owned_c_ptr(static_cast(s)), + as_owned_c_ptr(s.handler())); +} + +/// @brief Return a pair of pointers to loaned zenoh-c representations of querying subscriber and its callback. +template >> +auto as_loaned_c_ptr(zenoh::ext::QueryingSubscriber& s) { + return std::make_pair(as_loaned_c_ptr(static_cast(s)), + as_loaned_c_ptr(const_cast(s.handler()))); +} + +/// @brief Return a pair of pointers to loaned zenoh-c representations of querying subscriber and its callback. +template >> +auto as_loaned_c_ptr(const zenoh::ext::QueryingSubscriber& s) { + return std::make_pair(as_loaned_c_ptr(static_cast(s)), + as_loaned_c_ptr(s.handler())); +} + +/// @brief Return a pair of pointers to moved zenoh-c representations of querying subscriber and its callback. +template >> +auto as_moved_c_ptr(zenoh::ext::QueryingSubscriber& s) { + return std::make_pair(as_moved_c_ptr(static_cast(s)), + as_moved_c_ptr(const_cast(s.handler()))); +} + +/// @brief Return a pair of pointers to moved zenoh-c representations of querying subscriber and its callback. +/// Will return a pair of null pointers if option is empty. +template >> +auto as_moved_c_ptr(std::optional>& s) -> decltype(as_moved_c_ptr(s.value())) { + if (!s.has_value()) { + return as_moved_c_ptr(s.value()); + } else { + return {}; + } +} + +/// @brief Move querying subscriber and its handler to a pair containing corresponding zenoh-c structs. +template >> +auto move_to_c_obj(zenoh::ext::QueryingSubscriber&& s) { + return std::make_pair(move_to_c_obj(std::move(static_cast(s))), + move_to_c_obj(std::move(const_cast(s)))); +} +} // namespace interop + +template +[[nodiscard]] ext::QueryingSubscriber Session::declare_querying_subscriber(const KeyExpr& key_expr, C&& on_sample, + D&& on_drop, + QueryingSubscriberOptions&& options, + ZResult* err) const { + static_assert(std::is_invocable_r::value, + "on_sample should be callable with the following signature: void on_sample(zenoh::Sample& sample)"); + static_assert(std::is_invocable_r::value, + "on_drop should be callable with the following signature: void on_drop()"); + ::z_owned_closure_sample_t c_closure; + using Cval = std::remove_reference_t; + using Dval = std::remove_reference_t; + using ClosureType = typename detail::closures::Closure; + auto closure = ClosureType::into_context(std::forward(on_sample), std::forward(on_drop)); + ::z_closure(&c_closure, detail::closures::_zenoh_on_sample_call, detail::closures::_zenoh_on_drop, closure); + ::ze_querying_subscriber_options_t opts; + ze_querying_subscriber_options_default(&opts); + opts.query_selector = interop::as_loaned_c_ptr(options.query_keyexpr); +#if defined(Z_FEATURE_UNSTABLE_API) + opts.allowed_origin = options.allowed_origin; + opts.query_accept_replies = options.query_accept_replies; +#endif + opts.query_target = options.query_target; + opts.query_consolidation = *interop::as_copyable_c_ptr(options.query_consolidation); + opts.query_timeout_ms = options.query_timeout_ms; + ext::QueryingSubscriber qs = interop::detail::null>(); + ZResult res = ::ze_declare_querying_subscriber(interop::as_loaned_c_ptr(*this), interop::as_owned_c_ptr(qs), + interop::as_loaned_c_ptr(key_expr), ::z_move(c_closure), &opts); + __ZENOH_RESULT_CHECK(res, err, "Failed to declare Background Querying Subscriber"); + return qs; +} + +template +void Session::declare_background_querying_subscriber(const KeyExpr& key_expr, C&& on_sample, D&& on_drop, + QueryingSubscriberOptions&& options, ZResult* err) const { + static_assert(std::is_invocable_r::value, + "on_sample should be callable with the following signature: void on_sample(zenoh::Sample& sample)"); + static_assert(std::is_invocable_r::value, + "on_drop should be callable with the following signature: void on_drop()"); + ::z_owned_closure_sample_t c_closure; + using Cval = std::remove_reference_t; + using Dval = std::remove_reference_t; + using ClosureType = typename detail::closures::Closure; + auto closure = ClosureType::into_context(std::forward(on_sample), std::forward(on_drop)); + ::z_closure(&c_closure, detail::closures::_zenoh_on_sample_call, detail::closures::_zenoh_on_drop, closure); + ::ze_querying_subscriber_options_t opts; + ze_querying_subscriber_options_default(&opts); + opts.query_selector = interop::as_loaned_c_ptr(options.query_keyexpr); +#if defined(Z_FEATURE_UNSTABLE_API) + opts.allowed_origin = options.allowed_origin; + opts.query_accept_replies = options.query_accept_replies; +#endif + opts.query_target = options.query_target; + opts.query_consolidation = *interop::as_copyable_c_ptr(options.query_consolidation); + ; + opts.query_timeout_ms = options.query_timeout_ms; + ZResult res = ::ze_declare_background_querying_subscriber( + interop::as_loaned_c_ptr(*this), interop::as_loaned_c_ptr(key_expr), ::z_move(c_closure), &opts); + __ZENOH_RESULT_CHECK(res, err, "Failed to declare Background Querying Subscriber"); +} + +template +[[nodiscard]] ext::QueryingSubscriber> +Session::declare_querying_subscriber(const KeyExpr& key_expr, Channel channel, QueryingSubscriberOptions&& options, + ZResult* err) const { + auto cb_handler_pair = channel.template into_cb_handler_pair(); + ::ze_querying_subscriber_options_t opts; + ze_querying_subscriber_options_default(&opts); + opts.query_selector = interop::as_loaned_c_ptr(options.query_keyexpr); +#if defined(Z_FEATURE_UNSTABLE_API) + opts.allowed_origin = options.allowed_origin; + opts.query_accept_replies = options.query_accept_replies; +#endif + opts.query_target = options.query_target; + opts.query_consolidation = *interop::as_copyable_c_ptr(options.query_consolidation); + opts.query_timeout_ms = options.query_timeout_ms; + ext::QueryingSubscriber qs = interop::detail::null>(); + ZResult res = + ::ze_declare_querying_subscriber(interop::as_loaned_c_ptr(*this), interop::as_owned_c_ptr(qs), + interop::as_loaned_c_ptr(key_expr), ::z_move(cb_handler_pair.first), &opts); + __ZENOH_RESULT_CHECK(res, err, "Failed to declare Querying Subscriber"); + if (res != Z_OK) ::z_drop(interop::as_moved_c_ptr(cb_handler_pair.second)); + return ext::QueryingSubscriber>(std::move(qs), + std::move(cb_handler_pair.second)); +} + +} // namespace zenoh +#endif diff --git a/include/zenoh/api/keyexpr.hxx b/include/zenoh/api/keyexpr.hxx index 88bbefe..75f6a47 100644 --- a/include/zenoh/api/keyexpr.hxx +++ b/include/zenoh/api/keyexpr.hxx @@ -22,7 +22,7 @@ namespace zenoh { /// @brief A Zenoh key expression . /// -/// Key expression can be registered in the ``zenoh::Session`` object with ``zenoh::Session::declare_keyexpr()`` method. +/// Key expression can be registered in the ``zenoh::Session`` object with ``zenoh::Session::declare_keyexpr`` method. /// The unique id is internally assinged to the key expression string in this case. This allows to reduce bandwith usage /// when transporting key expressions. diff --git a/include/zenoh/api/publisher.hxx b/include/zenoh/api/publisher.hxx index 05cd36f..ee7b98e 100644 --- a/include/zenoh/api/publisher.hxx +++ b/include/zenoh/api/publisher.hxx @@ -71,9 +71,13 @@ class Publisher : public Owned<::z_owned_publisher_t> { /// @brief Options to be passed to ``Publisher::delete_resource`` operation. struct DeleteOptions { + /// @name Fields + /// @brief The timestamp of this message. std::optional timestamp = {}; + /// @name Methods + /// @brief Create default option settings. static DeleteOptions create_default() { return {}; } }; diff --git a/include/zenoh/api/session.hxx b/include/zenoh/api/session.hxx index de601c6..1ec324d 100644 --- a/include/zenoh/api/session.hxx +++ b/include/zenoh/api/session.hxx @@ -23,6 +23,7 @@ #include "enums.hxx" #include "id.hxx" #include "interop.hxx" +#include "keyexpr.hxx" #include "liveliness.hxx" #include "publisher.hxx" #include "query_consolidation.hxx" @@ -35,9 +36,14 @@ #include "ext/publication_cache.hxx" #endif -#include - namespace zenoh { +#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) +namespace ext { +template +class QueryingSubscriber; +} +#endif + /// A Zenoh session. class Session : public Owned<::z_owned_session_t> { Session(zenoh::detail::null_object_t) : Owned(nullptr){}; @@ -162,12 +168,12 @@ class Session : public Owned<::z_owned_session_t> { err, "Failed to undeclare key expression"); } #if defined(ZENOHCXX_ZENOHC) || Z_FEATURE_QUERY == 1 - /// @brief Options passed to the ``get`` operation. + /// @brief Options passed to the ``Session::get`` operation. struct GetOptions { /// @name Fields /// @brief The Queryables that should be target of the query. - QueryTarget target = QueryTarget::Z_QUERY_TARGET_ALL; + QueryTarget target = QueryTarget::Z_QUERY_TARGET_BEST_MATCHING; /// @brief The replies consolidation strategy to apply on replies to the query. QueryConsolidation consolidation = QueryConsolidation(); /// @brief The priority of the get message. @@ -185,13 +191,28 @@ class Session : public Owned<::z_owned_session_t> { /// release. /// @brief The source info for the query. std::optional source_info = {}; + + /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future + /// release. + /// + /// @brief The accepted replies for the query. + /// @note Zenoh-c only. + ReplyKeyExpr accept_replies = ::zc_reply_keyexpr_default(); + + /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future + /// release. + /// @brief Allowed destination. + /// @note Zenoh-c only. + Locality allowed_destination = ::zc_locality_default(); #endif + /// @brief An optional attachment to the query. std::optional attachment = {}; /// @brief The timeout for the query in milliseconds. 0 means default query timeout from zenoh configuration. uint64_t timeout_ms = 0; /// @name Methods + /// @brief Create default option settings. static GetOptions create_default() { return {}; } }; @@ -228,6 +249,8 @@ class Session : public Owned<::z_owned_session_t> { opts.encoding = interop::as_moved_c_ptr(options.encoding); #if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) opts.source_info = interop::as_moved_c_ptr(options.source_info); + opts.accept_replies = options.accept_replies; + opts.allowed_destination = options.allowed_destination; #endif opts.attachment = interop::as_moved_c_ptr(options.attachment); opts.timeout_ms = options.timeout_ms; @@ -261,6 +284,8 @@ class Session : public Owned<::z_owned_session_t> { opts.encoding = interop::as_moved_c_ptr(options.encoding); #if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) opts.source_info = interop::as_moved_c_ptr(options.source_info); + opts.accept_replies = options.accept_replies; + opts.allowed_destination = options.allowed_destination; #endif opts.attachment = interop::as_moved_c_ptr(options.attachment); opts.timeout_ms = options.timeout_ms; @@ -281,6 +306,7 @@ class Session : public Owned<::z_owned_session_t> { bool complete = false; /// @name Methods + /// @brief Create default option settings. static QueryableOptions create_default() { return {}; } }; @@ -382,6 +408,7 @@ class Session : public Owned<::z_owned_session_t> { /// @name Fields /// @name Methods + /// @brief Create default option settings. static SubscriberOptions create_default() { return {}; } }; @@ -501,6 +528,7 @@ class Session : public Owned<::z_owned_session_t> { std::optional timestamp = {}; /// @name Methods + /// @brief Create default option settings. static DeleteOptions create_default() { return {}; } }; @@ -563,6 +591,7 @@ class Session : public Owned<::z_owned_session_t> { std::optional attachment = {}; /// @name Methods + /// @brief Create default option settings. static PutOptions create_default() { return {}; } }; @@ -622,6 +651,7 @@ class Session : public Owned<::z_owned_session_t> { std::optional encoding = {}; /// @name Methods + /// @brief Create default option settings. static PublisherOptions create_default() { return {}; } }; @@ -918,6 +948,7 @@ class Session : public Owned<::z_owned_session_t> { uint32_t timeout_ms = 10000; /// @name Methods + /// @brief Create default option settings. static LivelinessGetOptions create_default() { return {}; } }; @@ -1009,7 +1040,7 @@ class Session : public Owned<::z_owned_session_t> { #if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future /// release. - /// @brief Options passed to the `ze_declare_publication_cache()` function. + /// @brief Options passed to the ``Session::declare_publication_cache``. /// @note Zenoh-c only. struct PublicationCacheOptions { /// The prefix used for queryable. @@ -1026,6 +1057,7 @@ class Session : public Owned<::z_owned_session_t> { size_t resources_limit = 0; /// @name Methods + /// @brief Create default option settings. static PublicationCacheOptions create_default() { return {}; } }; @@ -1041,7 +1073,7 @@ class Session : public Owned<::z_owned_session_t> { /// @note Zenoh-c only. [[nodiscard]] ext::PublicationCache declare_publication_cache( const KeyExpr& key_expr, PublicationCacheOptions&& options = PublicationCacheOptions::create_default(), - ZResult* err = nullptr) { + ZResult* err = nullptr) const { ::ze_publication_cache_options_t opts; ze_publication_cache_options_default(&opts); opts.queryable_prefix = interop::as_loaned_c_ptr(options.queryable_prefix); @@ -1062,13 +1094,13 @@ class Session : public Owned<::z_owned_session_t> { /// release. /// @brief Declare a background publication cache. It will function in background until the corresponding session /// is closed or destoryed. - /// @param key_expr: The key expression to publish to. - /// @param options: Additional options for the publication cache. + /// @param key_expr the key expression to publish to. + /// @param options additional options for the publication cache. /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be /// thrown in case of error. void declare_background_publication_cache( const KeyExpr& key_expr, PublicationCacheOptions&& options = PublicationCacheOptions::create_default(), - ZResult* err = nullptr) { + ZResult* err = nullptr) const { ::ze_publication_cache_options_t opts; ze_publication_cache_options_default(&opts); opts.queryable_prefix = interop::as_loaned_c_ptr(options.queryable_prefix); @@ -1082,6 +1114,83 @@ class Session : public Owned<::z_owned_session_t> { interop::as_loaned_c_ptr(key_expr), &opts); __ZENOH_RESULT_CHECK(res, err, "Failed to declare Background Publication Cache"); } + + /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future + /// release. + /// @brief Options passed to the ``Session::declare_querying_subscriber``. + /// @note Zenoh-c only. + struct QueryingSubscriberOptions { + /// @name Fields + + /// The key expression to be used for queries. + std::optional query_keyexpr = {}; +#if defined(Z_FEATURE_UNSTABLE_API) + /// The restriction for the matching publications that will be received by this publication cache. + Locality allowed_origin = ::zc_locality_default(); + /// The accepted replies for queries. + ReplyKeyExpr query_accept_replies = ::zc_reply_keyexpr_default(); +#endif + /// @brief The target to be used for queries. + QueryTarget query_target = QueryTarget::Z_QUERY_TARGET_BEST_MATCHING; + /// @brief The consolidation mode to be used for queries. + QueryConsolidation query_consolidation = QueryConsolidation(ConsolidationMode::Z_CONSOLIDATION_MODE_NONE); + /// @brief The timeout to be used for queries. + uint64_t query_timeout_ms = 0; + + /// @name Methods + + /// @brief Create default option settings. + static QueryingSubscriberOptions create_default() { return {}; } + }; + + /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future + /// release. + /// @brief Construct and declare a querying subscriber. + /// @param key_expr the key expression to subscribe to. + /// @param on_sample the callback that will be called for each received sample. + /// @param on_drop the callback that will be called once subscriber is destroyed or undeclared. + /// @param options additional options for querying subscriber. + /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be + /// thrown in case of error. + /// @return declared ``zenoh::ext::QueryingSubscriber`` instance. + template + [[nodiscard]] ext::QueryingSubscriber declare_querying_subscriber( + const KeyExpr& key_expr, C&& on_sample, D&& on_drop, + QueryingSubscriberOptions&& options = QueryingSubscriberOptions::create_default(), + ZResult* err = nullptr) const; + + /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future + /// release. + /// @brief Declare a background querying subscriber for a given key expression. Subscriber callback will be called + /// to process the messages, until the corresponding session is closed or dropped. + /// @param key_expr the key expression to subscribe to. + /// @param on_sample the callback that will be called for each received sample. + /// @param on_drop the callback that will be called once subscriber is destroyed or undeclared. + /// @param options additional options for querying subscriber. + /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be + /// thrown in case of error. + template + void declare_background_querying_subscriber( + const KeyExpr& key_expr, C&& on_sample, D&& on_drop, + QueryingSubscriberOptions&& options = QueryingSubscriberOptions::create_default(), + ZResult* err = nullptr) const; + + /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future + /// release. + /// @brief Construct and declare a querying subscriber. + /// @tparam Channel the type of channel used to create stream of data (see ``zenoh::channels::FifoChannel`` or + /// ``zenoh::channels::RingChannel``). + /// @param key_expr the key expression to subscriber to. + /// @param channel an instance of channel. + /// @param options options to pass to querying subscriber declaration. + /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be + /// thrown in case of error. + /// @return a ``zenoh::ext::QueryingSubscriber`` object. + template + [[nodiscard]] ext::QueryingSubscriber> declare_querying_subscriber( + const KeyExpr& key_expr, Channel channel, + QueryingSubscriberOptions&& options = QueryingSubscriberOptions::create_default(), + ZResult* err = nullptr) const; #endif /// @brief Check if session is closed. diff --git a/include/zenoh/api/subscriber.hxx b/include/zenoh/api/subscriber.hxx index db51c83..b7ce438 100644 --- a/include/zenoh/api/subscriber.hxx +++ b/include/zenoh/api/subscriber.hxx @@ -50,7 +50,6 @@ class Subscriber : public detail::SubscriberBase { public: /// @name Methods - using SubscriberBase::get_keyexpr; /// @brief Undeclare subscriber. /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be @@ -83,7 +82,6 @@ class Subscriber : public detail::SubscriberBase { : SubscriberBase(interop::as_owned_c_ptr(s)), _handler(std::move(handler)) {} /// @name Methods - using SubscriberBase::get_keyexpr; /// @brief Undeclare subscriber, and return its handler, which can still be used to process any messages received /// prior to undeclaration.