From e41b980929adc68ae47e5b95d9c479bd01023653 Mon Sep 17 00:00:00 2001 From: Denis Biryukov Date: Mon, 2 Dec 2024 15:58:14 +0100 Subject: [PATCH 1/3] add querier --- README.md | 11 +- docs/api.rst | 1 + docs/matching.rst | 11 + docs/publish_subscribe.rst | 2 +- docs/query_reply.rst | 4 + examples/getargs.h | 27 ++ examples/universal/z_get.cxx | 27 +- examples/universal/z_get_attachment.cxx | 23 +- examples/universal/z_get_channel.cxx | 29 +- .../universal/z_get_channel_non_blocking.cxx | 29 +- examples/universal/z_pub.cxx | 14 +- examples/universal/z_queryable.cxx | 8 +- examples/universal/z_queryable_attachment.cxx | 9 +- examples/zenohc/z_pub_shm.cxx | 4 +- examples/zenohc/z_querier.cxx | 112 ++++++++ include/zenoh/api.hxx | 4 + include/zenoh/api/matching.hxx | 157 +++++++++++ include/zenoh/api/publisher.hxx | 81 +++--- include/zenoh/api/querier.hxx | 254 ++++++++++++++++++ include/zenoh/api/session.hxx | 77 ++++++ zenoh-c | 2 +- 21 files changed, 782 insertions(+), 104 deletions(-) create mode 100644 docs/matching.rst create mode 100644 examples/zenohc/z_querier.cxx create mode 100644 include/zenoh/api/matching.hxx create mode 100644 include/zenoh/api/querier.hxx diff --git a/README.md b/README.md index 93a2f4af..1f208fe2 100644 --- a/README.md +++ b/README.md @@ -120,7 +120,16 @@ The `z_pub` should receive message sent by `z_sub`. ./z_get ``` -The `z_get` should receive the data from `z_queryable`. +### Queryable and Querier Example +```bash +./z_queryable +``` + +```bash +./z_querier +``` + +The `z_querier` should continuously send queries and receive replies from `z_queryable`. ### Throughput Examples ```bash diff --git a/docs/api.rst b/docs/api.rst index e48d2a1a..197ef395 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -27,6 +27,7 @@ API Reference scouting publish_subscribe query_reply + matching serialization_deserialization channels interop diff --git a/docs/matching.rst b/docs/matching.rst new file mode 100644 index 00000000..554cae53 --- /dev/null +++ b/docs/matching.rst @@ -0,0 +1,11 @@ +Matching +================= +Classes related to getting information about matching Zenoh entities. + +.. doxygenstruct:: zenoh::MatchingStatus + :members: + :membergroups: Constructors Operators Methods Fields + +.. doxygenclass:: zenoh::MatchingListener + :members: + :membergroups: Constructors Operators Methods \ No newline at end of file diff --git a/docs/publish_subscribe.rst b/docs/publish_subscribe.rst index 69146ed2..68f414d3 100644 --- a/docs/publish_subscribe.rst +++ b/docs/publish_subscribe.rst @@ -18,7 +18,7 @@ Classes related to publish-subscribe pattern. .. doxygenclass:: zenoh::Publisher :members: - :membergroups: Constructors Operators Methods + :membergroups: Constructors Operators Methods Fields .. doxygenclass:: zenoh::Subscriber :members: diff --git a/docs/query_reply.rst b/docs/query_reply.rst index f39dca4f..02f0589e 100644 --- a/docs/query_reply.rst +++ b/docs/query_reply.rst @@ -16,6 +16,10 @@ Query-Reply =========== Classes related to query-reply pattern. +.. doxygenclass:: zenoh::Querier + :members: + :membergroups: Constructors Operators Methods Fields + .. doxygenclass:: zenoh::Queryable :members: :membergroups: Constructors Operators Methods diff --git a/examples/getargs.h b/examples/getargs.h index 047234b5..93cd551b 100644 --- a/examples/getargs.h +++ b/examples/getargs.h @@ -16,6 +16,7 @@ #include #include #include +#include #include #include @@ -155,4 +156,30 @@ inline zenoh::Config parse_args(int argc, char **argv, const std::vector } #endif return std::move(config); +} + +zenoh::QueryTarget parse_query_target(std::string_view v) { + if (v == "BEST_MATCHING") { + return zenoh::QueryTarget::Z_QUERY_TARGET_BEST_MATCHING; + } else if (v == "ALL") { + return zenoh::QueryTarget::Z_QUERY_TARGET_ALL; + } else if (v == "ALL_COMPLETE") { + return zenoh::QueryTarget::Z_QUERY_TARGET_ALL_COMPLETE; + } + + throw std::runtime_error(std::string("Unsupported QueryTarget: ") + std::string(v)); +} + +struct Selector { + std::string key_expr; + std::string parameters; +}; + +Selector parse_selector(const std::string &selector_string) { + size_t pos = selector_string.find('?'); + if (pos == std::string::npos) { + return Selector{selector_string, ""}; + } else { + return Selector{selector_string.substr(0, pos), selector_string.substr(pos + 1)}; + } } \ No newline at end of file diff --git a/examples/universal/z_get.cxx b/examples/universal/z_get.cxx index 57b4d371..8c754a67 100644 --- a/examples/universal/z_get.cxx +++ b/examples/universal/z_get.cxx @@ -25,10 +25,18 @@ using namespace zenoh; int _main(int argc, char **argv) { const char *expr = "demo/example/**"; - const char *value = "Get from C++"; - Config config = parse_args(argc, argv, {}, {{"key_expression", &expr}, {"payload_value", &value}}); + const char *value = nullptr; + const char *target = "BEST_MATCHING"; + const char *timeout = "10000"; - KeyExpr keyexpr(expr); + Config config = parse_args(argc, argv, {}, {}, + {{"-s", CmdArg{"Query selector (string)", &expr}}, + {"-p", CmdArg{"Query payload (string)", &value}}, + {"-t", CmdArg{"Query target (BEST_MATCHING | ALL | ALL_COMPLETE)", &target}}, + {"-o", CmdArg{"Timeout in ms (number)", &timeout}}}); + uint64_t timeout_ms = std::stoi(timeout); + QueryTarget query_target = parse_query_target(target); + Selector selector = parse_selector(expr); std::cout << "Opening session...\n"; auto session = Session::open(std::move(config)); @@ -55,14 +63,13 @@ int _main(int argc, char **argv) { done_signal.notify_all(); }; -#if __cpp_designated_initializers >= 201707L - session.get(keyexpr, "", on_reply, on_done, {.target = Z_QUERY_TARGET_ALL, .payload = Bytes::serialize(value)}); -#else Session::GetOptions options; - options.target = Z_QUERY_TARGET_ALL; - options.payload = value; - session.get(keyexpr, "", on_reply, on_done, std::move(options)); -#endif + options.target = query_target; + if (value != nullptr) { + options.payload = value; + } + options.timeout_ms = timeout_ms; + session.get(selector.key_expr, selector.parameters, on_reply, on_done, std::move(options)); std::unique_lock lock(m); done_signal.wait(lock, [&done] { return done; }); diff --git a/examples/universal/z_get_attachment.cxx b/examples/universal/z_get_attachment.cxx index 42f296e9..d0bb070f 100644 --- a/examples/universal/z_get_attachment.cxx +++ b/examples/universal/z_get_attachment.cxx @@ -25,10 +25,18 @@ using namespace zenoh; int _main(int argc, char **argv) { const char *expr = "demo/example/**"; - const char *value = "Get from C++"; - Config config = parse_args(argc, argv, {}, {{"key_expression", &expr}, {"payload value", &value}}); + const char *value = nullptr; + const char *target = "BEST_MATCHING"; + const char *timeout = "10000"; - KeyExpr keyexpr(expr); + Config config = parse_args(argc, argv, {}, {}, + {{"-s", CmdArg{"Query selector (string)", &expr}}, + {"-p", CmdArg{"Query payload (string)", &value}}, + {"-t", CmdArg{"Query target (BEST_MATCHING | ALL | ALL_COMPLETE)", &target}}, + {"-o", CmdArg{"Timeout in ms (number)", &timeout}}}); + uint64_t timeout_ms = std::stoi(timeout); + QueryTarget query_target = parse_query_target(target); + Selector selector = parse_selector(expr); printf("Opening session...\n"); auto session = Session::open(std::move(config)); @@ -66,10 +74,13 @@ int _main(int argc, char **argv) { std::unordered_map attachment = {{"Source", "C++"}}; Session::GetOptions options; - options.target = QueryTarget::Z_QUERY_TARGET_ALL; - options.payload = value; + options.target = query_target; + if (value != nullptr) { + options.payload = value; + } + options.timeout_ms = timeout_ms; options.attachment = ext::serialize(attachment); - session.get(keyexpr, "", on_reply, on_done, std::move(options)); + session.get(selector.key_expr, selector.parameters, on_reply, on_done, std::move(options)); std::unique_lock lock(m); done_signal.wait(lock, [&done] { return done; }); diff --git a/examples/universal/z_get_channel.cxx b/examples/universal/z_get_channel.cxx index d5b4ec53..4948f17b 100644 --- a/examples/universal/z_get_channel.cxx +++ b/examples/universal/z_get_channel.cxx @@ -23,24 +23,31 @@ using namespace zenoh; int _main(int argc, char **argv) { const char *expr = "demo/example/**"; - const char *value = "Get from C++"; - Config config = parse_args(argc, argv, {}, {{"key_expression", &expr}, {"payload value", &value}}); + const char *value = nullptr; + const char *target = "BEST_MATCHING"; + const char *timeout = "10000"; - KeyExpr keyexpr(expr); + Config config = parse_args(argc, argv, {}, {}, + {{"-s", CmdArg{"Query selector (string)", &expr}}, + {"-p", CmdArg{"Query payload (string)", &value}}, + {"-t", CmdArg{"Query target (BEST_MATCHING | ALL | ALL_COMPLETE)", &target}}, + {"-o", CmdArg{"Timeout in ms (number)", &timeout}}}); + uint64_t timeout_ms = std::stoi(timeout); + QueryTarget query_target = parse_query_target(target); + Selector selector = parse_selector(expr); std::cout << "Opening session...\n"; auto session = Session::open(std::move(config)); std::cout << "Sending Query '" << expr << "'...\n"; -#if __cpp_designated_initializers >= 201707L - auto replies = session.get(keyexpr, "", channels::FifoChannel(16), - {.target = QueryTarget::Z_QUERY_TARGET_ALL, .payload = value}); -#else + Session::GetOptions options; - options.target = QueryTarget::Z_QUERY_TARGET_ALL; - options.payload = value; - auto replies = session.get(keyexpr, "", channels::FifoChannel(16), std::move(options)); -#endif + options.target = query_target; + if (value != nullptr) { + options.payload = value; + } + options.timeout_ms = timeout_ms; + auto replies = session.get(selector.key_expr, selector.parameters, channels::FifoChannel(16), std::move(options)); for (auto res = replies.recv(); std::holds_alternative(res); res = replies.recv()) { const auto &sample = std::get(res).get_ok(); diff --git a/examples/universal/z_get_channel_non_blocking.cxx b/examples/universal/z_get_channel_non_blocking.cxx index ff57135f..3cea234d 100644 --- a/examples/universal/z_get_channel_non_blocking.cxx +++ b/examples/universal/z_get_channel_non_blocking.cxx @@ -25,24 +25,31 @@ using namespace std::chrono_literals; int _main(int argc, char **argv) { const char *expr = "demo/example/**"; - const char *value = "Get from C++"; - Config config = parse_args(argc, argv, {}, {{"key_expression", &expr}, {"payload value", &value}}); + const char *value = nullptr; + const char *target = "BEST_MATCHING"; + const char *timeout = "10000"; - KeyExpr keyexpr(expr); + Config config = parse_args(argc, argv, {}, {}, + {{"-s", CmdArg{"Query selector (string)", &expr}}, + {"-p", CmdArg{"Query payload (string)", &value}}, + {"-t", CmdArg{"Query target (BEST_MATCHING | ALL | ALL_COMPLETE)", &target}}, + {"-o", CmdArg{"Timeout in ms (number)", &timeout}}}); + uint64_t timeout_ms = std::stoi(timeout); + QueryTarget query_target = parse_query_target(target); + + Selector selector = parse_selector(expr); std::cout << "Opening session...\n"; auto session = Session::open(std::move(config)); std::cout << "Sending Query '" << expr << "'...\n"; -#if __cpp_designated_initializers >= 201707L - auto replies = session.get(keyexpr, "", channels::FifoChannel(16), - {.target = QueryTarget::Z_QUERY_TARGET_ALL, .payload = value}); -#else Session::GetOptions options; - options.target = QueryTarget::Z_QUERY_TARGET_ALL; - options.payload = value; - auto replies = session.get(keyexpr, "", channels::FifoChannel(16), std::move(options)); -#endif + options.target = query_target; + if (value != nullptr) { + options.payload = value; + } + options.timeout_ms = timeout_ms; + auto replies = session.get(selector.key_expr, selector.parameters, channels::FifoChannel(16), std::move(options)); while (true) { auto res = replies.try_recv(); diff --git a/examples/universal/z_pub.cxx b/examples/universal/z_pub.cxx index 13d80060..c8d23a9a 100644 --- a/examples/universal/z_pub.cxx +++ b/examples/universal/z_pub.cxx @@ -43,7 +43,7 @@ int _main(int argc, char **argv) { Config config = parse_args(argc, argv, {}, {{"key_expression", &keyexpr}, {"payload_value", &value}} #if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) , - {{"--add-matching-listener", {CmdArg{"", &add_matching_listener, true}}}} + {{"--add-matching-listener", CmdArg{"", &add_matching_listener, true}}} #endif ); @@ -53,15 +53,14 @@ int _main(int argc, char **argv) { std::cout << "Declaring Publisher on '" << keyexpr << "'..." << std::endl; auto pub = session.declare_publisher(KeyExpr(keyexpr)); - std::cout << "Publisher on '" << keyexpr << "' declared" << std::endl; #if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) if (std::string(add_matching_listener) == "true") { pub.declare_background_matching_listener( - [](const Publisher::MatchingStatus &s) { + [](const MatchingStatus &s) { if (s.matching) { - std::cout << "Subscriber matched" << std::endl; + std::cout << "Publisher has matching subscribers." << std::endl; } else { - std::cout << "No subscribers matched" << std::endl; + std::cout << "Publisher has NO MORE matching subscribers." << std::endl; } }, closures::none); @@ -75,13 +74,10 @@ int _main(int argc, char **argv) { ss << "[" << idx << "] " << value; auto s = ss.str(); std::cout << "Putting Data ('" << keyexpr << "': '" << s << "')...\n"; -#if __cpp_designated_initializers >= 201707L - pub.put(s, {.encoding = Encoding("text/plain")}); -#else + auto put_options = Publisher::PutOptions{}; put_options.encoding = Encoding("text/plain"); pub.put(s, std::move(put_options)); -#endif } return 0; } diff --git a/examples/universal/z_queryable.cxx b/examples/universal/z_queryable.cxx index a09c2e6d..6a6447eb 100644 --- a/examples/universal/z_queryable.cxx +++ b/examples/universal/z_queryable.cxx @@ -35,7 +35,9 @@ const char *value = "Queryable from C++ zenoh-pico!"; const char *locator = nullptr; int _main(int argc, char **argv) { - Config config = parse_args(argc, argv, {}, {{"key_expression", &expr}, {"payload_value", &value}}); + const char *complete = "false"; + Config config = parse_args(argc, argv, {}, {{"key_expression", &expr}, {"payload_value", &value}}, + {{"--complete", {CmdArg{"", &complete, true}}}}); printf("Opening session...\n"); auto session = Session::open(std::move(config)); @@ -64,7 +66,9 @@ int _main(int argc, char **argv) { auto on_drop_queryable = []() { std::cout << "Destroying queryable\n"; }; - auto queryable = session.declare_queryable(keyexpr, on_query, on_drop_queryable); + Session::QueryableOptions opts; + opts.complete = std::string(complete) == "true"; + auto queryable = session.declare_queryable(keyexpr, on_query, on_drop_queryable, std::move(opts)); std::cout << "Press CTRL-C to quit...\n"; while (true) { diff --git a/examples/universal/z_queryable_attachment.cxx b/examples/universal/z_queryable_attachment.cxx index a249e300..611baeb7 100644 --- a/examples/universal/z_queryable_attachment.cxx +++ b/examples/universal/z_queryable_attachment.cxx @@ -35,7 +35,9 @@ const char *value = "Queryable from C++ zenoh-pico!"; #endif int _main(int argc, char **argv) { - Config config = parse_args(argc, argv, {}, {{"key_expression", &expr}, {"payload_value", &value}}); + const char *complete = "false"; + Config config = parse_args(argc, argv, {}, {{"key_expression", &expr}, {"payload_value", &value}}, + {{"--complete", {CmdArg{"", &complete, true}}}}); std::cout << "Opening session...\n"; auto session = Session::open(std::move(config)); @@ -71,7 +73,10 @@ int _main(int argc, char **argv) { }; auto on_drop_queryable = []() { std::cout << "Destroying queryable\n"; }; - auto queryable = session.declare_queryable(keyexpr, on_query, on_drop_queryable); + + Session::QueryableOptions opts; + opts.complete = std::string(complete) == "true"; + auto queryable = session.declare_queryable(keyexpr, on_query, on_drop_queryable, std::move(opts)); printf("Press CTRL-C to quit...\n"); while (true) { diff --git a/examples/zenohc/z_pub_shm.cxx b/examples/zenohc/z_pub_shm.cxx index 7c058d12..9cbdd560 100644 --- a/examples/zenohc/z_pub_shm.cxx +++ b/examples/zenohc/z_pub_shm.cxx @@ -36,7 +36,7 @@ int _main(int argc, char **argv) { Config config = parse_args(argc, argv, {}, {{"key_expression", &keyexpr}, {"payload_value", &value}} #if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) , - {{"--add-matching-listener", {CmdArg{"", &add_matching_listener, true}}}} + {{"--add-matching-listener", CmdArg{"", &add_matching_listener, true}}} #endif ); @@ -49,7 +49,7 @@ int _main(int argc, char **argv) { #if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) if (std::string(add_matching_listener) == "true") { pub.declare_background_matching_listener( - [](const Publisher::MatchingStatus &s) { + [](const MatchingStatus &s) { if (s.matching) { std::cout << "Subscriber matched" << std::endl; } else { diff --git a/examples/zenohc/z_querier.cxx b/examples/zenohc/z_querier.cxx new file mode 100644 index 00000000..06d64014 --- /dev/null +++ b/examples/zenohc/z_querier.cxx @@ -0,0 +1,112 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +#include +#include + +#include +#include +#include +#include +#include + +#include "../getargs.h" +#include "zenoh.hxx" + +using namespace zenoh; +using namespace std::chrono_literals; + +int _main(int argc, char **argv) { + const char *expr = "demo/example/**"; + const char *value = nullptr; + const char *target = "BEST_MATCHING"; + const char *timeout = "10000"; + const char *add_matching_listener = "false"; + + Config config = parse_args(argc, argv, {}, {}, + {{"-s", CmdArg{"Query selector (string)", &expr}}, + {"-p", CmdArg{"Query payload (string)", &value}}, + {"-t", CmdArg{"Query target (BEST_MATCHING | ALL | ALL_COMPLETE)", &target}}, + {"-o", CmdArg{"Timeout in ms (number)", &timeout}} +#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) + , + {"--add-matching-listener", CmdArg{"", &add_matching_listener, true}} +#endif + } + + ); + uint64_t timeout_ms = std::stoi(timeout); + QueryTarget query_target = parse_query_target(target); + Selector selector = parse_selector(expr); + + std::cout << "Opening session..." << std::endl; + auto session = Session::open(std::move(config)); + + std::cout << "Declaring Querier on '" << selector.key_expr << "'..." << std::endl; + Session::QuerierOptions options; + options.target = query_target; + options.timeout_ms = timeout_ms; + auto querier = session.declare_querier(selector.key_expr, std::move(options)); + +#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) + if (std::string(add_matching_listener) == "true") { + querier.declare_background_matching_listener( + [](const MatchingStatus &s) { + if (s.matching) { + std::cout << "Querier has matching queryables." << std::endl; + } else { + std::cout << "Querier has NO MORE matching queryables." << std::endl; + } + }, + closures::none); + } +#endif + + std::cout << "Press CTRL-C to quit..." << std::endl; + for (int idx = 0; idx < std::numeric_limits::max(); ++idx) { + std::this_thread::sleep_for(1s); + std::ostringstream ss; + ss << "[" << idx << "] "; + if (value != nullptr) { + ss << value; + } + auto s = ss.str(); + std::cout << "Querying '" << expr << "' with payload '" << s << "'...\n"; + + Querier::GetOptions get_options; + get_options.payload = std::move(s); + auto replies = querier.get(selector.parameters, channels::FifoChannel(16), std::move(get_options)); + for (auto res = replies.recv(); std::holds_alternative(res); res = replies.recv()) { + const auto &reply = std::get(res); + if (reply.is_ok()) { + const auto &sample = std::get(res).get_ok(); + std::cout << "Received ('" << reply.get_ok().get_keyexpr().as_string_view() << "' : '" + << reply.get_ok().get_payload().as_string() << "')\n"; + } else { + std::cout << "Received an error\n"; + } + } + } + return 0; +} + +int main(int argc, char **argv) { + try { +#ifdef ZENOHCXX_ZENOHC + init_log_from_env_or("error"); +#endif + _main(argc, argv); + } catch (ZException e) { + std::cout << "Received an error :" << e.what() << "\n"; + } +} diff --git a/include/zenoh/api.hxx b/include/zenoh/api.hxx index 45757ee1..c1ef23dc 100644 --- a/include/zenoh/api.hxx +++ b/include/zenoh/api.hxx @@ -34,6 +34,9 @@ #include "api/session.hxx" #include "api/subscriber.hxx" #include "api/timestamp.hxx" +#if defined(ZENOHCXX_ZENOHC) +#include "api/querier.hxx" +#endif #if defined(Z_FEATURE_SHARED_MEMORY) && defined(Z_FEATURE_UNSTABLE_API) #include "api/shm/shm.hxx" #endif @@ -41,4 +44,5 @@ #if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) #include "api/ext/publication_cache.hxx" #include "api/ext/querying_subscriber.hxx" +#include "api/matching.hxx" #endif diff --git a/include/zenoh/api/matching.hxx b/include/zenoh/api/matching.hxx new file mode 100644 index 00000000..422d374e --- /dev/null +++ b/include/zenoh/api/matching.hxx @@ -0,0 +1,157 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#pragma once + +#include "../detail/closures.hxx" +#include "base.hxx" +#include "interop.hxx" + +namespace zenoh { +class Publisher; +class Querier; + +/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future +/// release. +/// @brief A struct that indicates if there exist Subscribers matching the Publisher's key expression or Queryables +/// matching Querier's key expression and target. +/// @note Zenoh-c only. +struct MatchingStatus { + /// @name Fields + + /// true if there exist entities matching the target (i.e either Subscribers matching Publisher's key expression or + /// Queryables matching Querier's key expression and target). + bool matching; +}; + +namespace detail::closures { +extern "C" { +inline void _zenoh_on_status_change_call(const ::zc_matching_status_t* status, void* context) { + IClosure::call_from_context(context, MatchingStatus{status->matching}); +} +} +} // namespace detail::closures + +namespace detail { +class MatchingListenerBase : public Owned<::zc_owned_matching_listener_t> { + protected: + MatchingListenerBase(zenoh::detail::null_object_t) : Owned(nullptr){}; + MatchingListenerBase(::zc_owned_matching_listener_t* m) : Owned(m){}; + friend struct interop::detail::Converter; +}; +} // namespace detail + +template +class MatchingListener; + +template <> +class MatchingListener : public detail::MatchingListenerBase { + protected: + using MatchingListenerBase::MatchingListenerBase; + friend class Publisher; + friend class Querier; + + public: + /// @name Methods + + /// @brief Undeclare matching listener. + /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be + /// thrown in case of error. + void undeclare(ZResult* err = nullptr) && { + __ZENOH_RESULT_CHECK(::zc_undeclare_matching_listener(interop::as_moved_c_ptr(*this)), err, + "Failed to undeclare matching listener"); + } +}; + +/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future +/// release. +/// @brief A Zenoh matching listener. +/// +/// A listener that sends notifications when the [`MatchingStatus`] of a corresponding Zenoh entity changes. +/// Dropping the corresponding Zenoh entity, disables the matching listener. +/// @note Zenoh-c only. +template +class MatchingListener : public detail::MatchingListenerBase { + Handler _handler; + + public: + /// @name Constructors + + /// @brief Construct stream matching listener from callback matching listener and handler. + /// + /// @param m callback matching listener, that should expose data to the handler in its callback. + /// @param handler handler to access data exposed by `m`. Zenoh handlers implement + /// recv and try_recv methods, for blocking and non-blocking message reception. But user is free to define his own + /// interface. + MatchingListener(MatchingListener&& m, Handler handler) + : MatchingListenerBase(interop::as_owned_c_ptr(m)), _handler(std::move(handler)) {} + + /// @name Methods + + /// @brief Undeclare matching listener, and return its handler, which can still be used to process any messages + /// received prior to undeclaration. + /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be + /// thrown in case of error. + Handler undeclare(ZResult* err = nullptr) && { + __ZENOH_RESULT_CHECK(::zc_undeclare_matching_listener(interop::as_moved_c_ptr(*this)), err, + "Failed to undeclare matching listener"); + return std::move(this->_handler); + } + + /// @brief Return the handler to matching listener data stream. + const Handler& handler() const { return _handler; }; + friend class Session; +}; + +namespace interop { +/// @brief Return a pair of pointers to owned zenoh-c representations of matching listener and its callback. +template >> +auto as_owned_c_ptr(MatchingListener& m) { + return std::make_pair(as_owned_c_ptr(static_cast(m)), + as_owned_c_ptr(const_cast(m.handler()))); +} + +/// @brief Return a pair of pointers to owned zenoh-c representations of matching listener and its callback. +template >> +auto as_owned_c_ptr(const MatchingListener& m) { + return std::make_pair(as_owned_c_ptr(static_cast(m)), + as_owned_c_ptr(m.handler())); +} + +/// @brief Return a pair of pointers to moved zenoh-c representations of matching listener and its callback. +template >> +auto as_moved_c_ptr(MatchingListener& m) { + return std::make_pair(as_moved_c_ptr(static_cast(m)), + as_moved_c_ptr(const_cast(m.handler()))); +} + +/// @brief Return a pair of pointers to moved zenoh-c representations of matching listener and its callback. +/// Will return a pair of null pointers if option is empty. +template >> +auto as_moved_c_ptr(std::optional>& m) -> decltype(as_moved_c_ptr(m.value())) { + if (!m.has_value()) { + return as_moved_c_ptr(m.value()); + } else { + return {}; + } +} + +/// @brief Move matching listener and its handler to a pair containing corresponding zenoh-c structs. +template >> +auto move_to_c_obj(MatchingListener&& m) { + return std::make_pair(move_to_c_obj(std::move(static_cast(m))), + move_to_c_obj(std::move(const_cast(m)))); +} +} // namespace interop + +} // namespace zenoh \ No newline at end of file diff --git a/include/zenoh/api/publisher.hxx b/include/zenoh/api/publisher.hxx index ee7b98e1..a7377f60 100644 --- a/include/zenoh/api/publisher.hxx +++ b/include/zenoh/api/publisher.hxx @@ -24,19 +24,12 @@ #include "keyexpr.hxx" #include "timestamp.hxx" #if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) +#include "matching.hxx" #include "source_info.hxx" #endif #include namespace zenoh { - -#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) -namespace detail::closures { -extern "C" { -void _zenoh_on_status_change_call(const ::zc_matching_status_t* status, void* context); -} -} // namespace detail::closures -#endif class Session; /// A Zenoh publisher. Constructed by ``Session::declare_publisher`` method. @@ -140,45 +133,24 @@ class Publisher : public Owned<::z_owned_publisher_t> { #endif #if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) - /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future - /// release. - /// @brief A struct that indicates if there exist Subscribers matching the Publisher's key expression. - /// @note Zenoh-c only. - struct MatchingStatus { - /// True if there exist Subscribers matching the Publisher's key expression, false otherwise. - bool matching; - }; - - /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future - /// release. - /// @brief A Zenoh matching listener. - /// - /// A listener that sends notifications when the [`MatchingStatus`] of a publisher changes. - /// Dropping the corresponding publisher, disables the matching listener. - class MatchingListener : public Owned<::zc_owned_matching_listener_t> { - MatchingListener(zenoh::detail::null_object_t) : Owned(nullptr){}; - friend struct interop::detail::Converter; - friend class Publisher; - }; - /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future /// release. /// @brief Construct matching listener, registering a callback for notifying subscribers matching with a given /// publisher. /// /// @param on_status_change: the callable that will be called every time the matching status of the publisher - /// changes (If last subscriber, disconnects or when the first subscriber connects). - /// @param on_drop the callable that will be called once publisher is destroyed or undeclared. + /// changes (i.e. if last subscriber disconnects or when the first subscriber connects). + /// @param on_drop the callable that will be called once matching listener is destroyed or undeclared. /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be /// thrown in case of error. /// @return a ``MatchingListener`` object. /// @note Zenoh-c only. template - [[nodiscard]] MatchingListener declare_matching_listener(C&& on_status_change, D&& on_drop, - ZResult* err = nullptr) const { + [[nodiscard]] MatchingListener declare_matching_listener(C&& on_status_change, D&& on_drop, + ZResult* err = nullptr) const { static_assert(std::is_invocable_r::value, "on_status_change should be callable with the following signature: void on_status_change(const " - "zenoh::Publisher::MatchingStatus& status)"); + "zenoh::MatchingStatus& status)"); static_assert(std::is_invocable_r::value, "on_drop should be callable with the following signature: void on_drop()"); ::zc_owned_closure_matching_status_t c_closure; @@ -188,20 +160,44 @@ class Publisher : public Owned<::z_owned_publisher_t> { auto closure = ClosureType::into_context(std::forward(on_status_change), std::forward(on_drop)); ::z_closure(&c_closure, detail::closures::_zenoh_on_status_change_call, detail::closures::_zenoh_on_drop, closure); - MatchingListener m(zenoh::detail::null_object); + MatchingListener m(zenoh::detail::null_object); ZResult res = ::zc_publisher_declare_matching_listener(interop::as_loaned_c_ptr(*this), interop::as_owned_c_ptr(m), ::z_move(c_closure)); __ZENOH_RESULT_CHECK(res, err, "Failed to declare Matching Listener"); return m; } + /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future + /// release. + /// @brief Construct matching listener, delivering notification on publisher status change through a streaming + /// handler. + /// @tparam Channel the type of channel used to create stream of data (see ``zenoh::channels::FifoChannel`` or + /// ``zenoh::channels::RingChannel``). + /// @param channel: an instance of channel. + /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be + /// thrown in case of error. + /// @return a ``MatchingListener`` object. + /// @note Zenoh-c only. + template + [[nodiscard]] MatchingListener> declare_matching_listener( + Channel channel, ZResult* err = nullptr) const { + auto cb_handler_pair = channel.template into_cb_handler_pair(); + MatchingListener m(zenoh::detail::null_object); + ZResult res = ::zc_publisher_declare_matching_listener( + interop::as_loaned_c_ptr(*this), interop::as_owned_c_ptr(m), ::z_move(cb_handler_pair.first)); + __ZENOH_RESULT_CHECK(res, err, "Failed to declare Matching Listener"); + if (res != Z_OK) ::z_drop(interop::as_moved_c_ptr(cb_handler_pair.second)); + return MatchingListener>( + std::move(m), std::move(cb_handler_pair.second)); + } + /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future /// release. /// @brief Declare matching listener, registering a callback for notifying subscribers matching with a given /// publisher. The callback will be run in the background until the corresponding publisher is destroyed. /// /// @param on_status_change: the callable that will be called every time the matching status of the publisher - /// changes (If last subscriber, disconnects or when the first subscriber connects). + /// changes (i.e. if last subscriber disconnects or when the first subscriber connects). /// @param on_drop the callable that will be called once publisher is destroyed or undeclared. /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be /// thrown in case of error. @@ -210,7 +206,7 @@ class Publisher : public Owned<::z_owned_publisher_t> { void declare_background_matching_listener(C&& on_status_change, D&& on_drop, ZResult* err = nullptr) const { static_assert(std::is_invocable_r::value, "on_status_change should be callable with the following signature: void on_status_change(const " - "zenoh::Publisher::MatchingStatus& status)"); + "zenoh::MatchingStatus& status)"); static_assert(std::is_invocable_r::value, "on_drop should be callable with the following signature: void on_drop()"); ::zc_owned_closure_matching_status_t c_closure; @@ -240,16 +236,5 @@ class Publisher : public Owned<::z_owned_publisher_t> { #endif }; -#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) -namespace detail::closures { -extern "C" { -inline void _zenoh_on_status_change_call(const ::zc_matching_status_t* status, void* context) { - IClosure::call_from_context(context, - Publisher::MatchingStatus{status->matching}); -} -} -} // namespace detail::closures -#endif - } // namespace zenoh #endif \ No newline at end of file diff --git a/include/zenoh/api/querier.hxx b/include/zenoh/api/querier.hxx new file mode 100644 index 00000000..cb232637 --- /dev/null +++ b/include/zenoh/api/querier.hxx @@ -0,0 +1,254 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#pragma once + +#if defined(ZENOHCXX_ZENOHC) + +#include "../detail/closures_concrete.hxx" +#include "base.hxx" +#include "bytes.hxx" +#include "encoding.hxx" +#include "enums.hxx" +#include "interop.hxx" +#include "keyexpr.hxx" +#include "reply.hxx" +#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) +#include "matching.hxx" +#include "source_info.hxx" +#endif +#include + +namespace zenoh { +class Session; + +/// A Zenoh Querier. Constructed by ``Session::declare_querier`` method. Queriers allow to send queries to a queryable. +class Querier : public Owned<::z_owned_querier_t> { + Querier(zenoh::detail::null_object_t) : Owned(nullptr){}; + friend struct interop::detail::Converter; + + public: + /// @brief Options passed to the ``Querier::get`` operation. + struct GetOptions { + /// @name Fields + + /// @brief An optional payload of the query. + std::optional payload = {}; + /// @brief An optional encoding of the query payload and/or attachment. + std::optional encoding = {}; +#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) + /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future + /// release. + /// @brief The source info for the query. + std::optional source_info = {}; +#endif + + /// @brief An optional attachment to the query. + std::optional attachment = {}; + + /// @name Methods + + /// @brief Create default option settings. + static GetOptions create_default() { return {}; } + }; + + /// @name Methods + + /// @brief Query data from the matching queryables in the system. Replies are provided through a callback function. + /// @param parameters the parameters string in URL format. + /// @param options Options to pass to get operation. + /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be + /// thrown in case of error. + template + void get(const std::string& parameters, C&& on_reply, D&& on_drop, + GetOptions&& options = GetOptions::create_default(), ZResult* err = nullptr) const { + static_assert(std::is_invocable_r::value, + "on_reply should be callable with the following signature: void on_reply(zenoh::Reply& reply)"); + static_assert(std::is_invocable_r::value, + "on_drop should be callable with the following signature: void on_drop()"); + ::z_owned_closure_reply_t c_closure; + using Cval = std::remove_reference_t; + using Dval = std::remove_reference_t; + using ClosureType = typename detail::closures::Closure; + auto closure = ClosureType::into_context(std::forward(on_reply), std::forward(on_drop)); + ::z_closure(&c_closure, detail::closures::_zenoh_on_reply_call, detail::closures::_zenoh_on_drop, closure); + ::z_querier_get_options_t opts; + z_querier_get_options_default(&opts); + opts.payload = interop::as_moved_c_ptr(options.payload); + opts.encoding = interop::as_moved_c_ptr(options.encoding); +#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) + opts.source_info = interop::as_moved_c_ptr(options.source_info); +#endif + opts.attachment = interop::as_moved_c_ptr(options.attachment); + + __ZENOH_RESULT_CHECK( + ::z_querier_get(interop::as_loaned_c_ptr(*this), parameters.c_str(), ::z_move(c_closure), &opts), err, + "Failed to perform Querier::get operation"); + } + + /// @brief Query data from the matching queryables in the system. Replies are provided through a channel. + /// @tparam Channel the type of channel used to create stream of data (see ``zenoh::channels::FifoChannel`` or + /// ``zenoh::channels::RingChannel``). + /// @param parameters the parameters string in URL format. + /// @param channel channel instance. + /// @param options Options to pass to get operation. + /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be + /// thrown in case of error. + /// @return reply handler. + template + typename Channel::template HandlerType get(const std::string& parameters, Channel channel, + GetOptions&& options = GetOptions::create_default(), + ZResult* err = nullptr) const { + auto cb_handler_pair = channel.template into_cb_handler_pair(); + ::z_querier_get_options_t opts; + z_querier_get_options_default(&opts); + opts.payload = interop::as_moved_c_ptr(options.payload); + opts.encoding = interop::as_moved_c_ptr(options.encoding); +#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) + opts.source_info = interop::as_moved_c_ptr(options.source_info); +#endif + opts.attachment = interop::as_moved_c_ptr(options.attachment); + + ZResult res = ::z_querier_get(interop::as_loaned_c_ptr(*this), parameters.c_str(), + ::z_move(cb_handler_pair.first), &opts); + __ZENOH_RESULT_CHECK(res, err, "Failed to perform Querier::get operation"); + if (res != Z_OK) ::z_drop(interop::as_moved_c_ptr(cb_handler_pair.second)); + return std::move(cb_handler_pair.second); + } + + /// @brief Get the key expression of the querier. + const KeyExpr& get_keyexpr() const { + return interop::as_owned_cpp_ref(::z_querier_keyexpr(interop::as_loaned_c_ptr(*this))); + } + + /// @brief Undeclares querier. + /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be + /// thrown in case of error. + void undeclare(ZResult* err = nullptr) && { + __ZENOH_RESULT_CHECK(::z_undeclare_querier(interop::as_moved_c_ptr(*this)), err, "Failed to undeclare querier"); + } + +#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) + /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future + /// release. + /// @brief Get the id of the querier. + /// @return id of this querier. + EntityGlobalId get_id() const { + return interop::into_copyable_cpp_obj(::z_querier_id(interop::as_loaned_c_ptr(*this))); + } + + /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future + /// release. + /// @brief Construct matching listener, registering a callback for notifying queryables matching with a given + /// querier's key expression and target. + /// + /// @param on_status_change: the callable that will be called every time the matching status of the querier + /// changes (i.e. if last querier disconnects or when the first querier connects). + /// @param on_drop the callable that will be called once matching listener is destroyed or undeclared. + /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be + /// thrown in case of error. + /// @return a ``MatchingListener`` object. + /// @note Zenoh-c only. + template + [[nodiscard]] MatchingListener declare_matching_listener(C&& on_status_change, D&& on_drop, + ZResult* err = nullptr) const { + static_assert(std::is_invocable_r::value, + "on_status_change should be callable with the following signature: void on_status_change(const " + "zenoh::MatchingStatus& status)"); + static_assert(std::is_invocable_r::value, + "on_drop should be callable with the following signature: void on_drop()"); + ::zc_owned_closure_matching_status_t c_closure; + using Cval = std::remove_reference_t; + using Dval = std::remove_reference_t; + using ClosureType = typename detail::closures::Closure; + auto closure = ClosureType::into_context(std::forward(on_status_change), std::forward(on_drop)); + ::z_closure(&c_closure, detail::closures::_zenoh_on_status_change_call, detail::closures::_zenoh_on_drop, + closure); + MatchingListener m(zenoh::detail::null_object); + ZResult res = ::zc_querier_declare_matching_listener(interop::as_loaned_c_ptr(*this), + interop::as_owned_c_ptr(m), ::z_move(c_closure)); + __ZENOH_RESULT_CHECK(res, err, "Failed to declare Matching Listener"); + return m; + } + + /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future + /// release. + /// @brief Construct matching listener, delivering notification on querier status change through a streaming + /// handler. + /// @tparam Channel the type of channel used to create stream of data (see ``zenoh::channels::FifoChannel`` or + /// ``zenoh::channels::RingChannel``). + /// @param channel: an instance of channel. + /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be + /// thrown in case of error. + /// @return a ``MatchingListener`` object. + /// @note Zenoh-c only. + template + [[nodiscard]] MatchingListener> declare_matching_listener( + Channel channel, ZResult* err = nullptr) const { + auto cb_handler_pair = channel.template into_cb_handler_pair(); + MatchingListener m(zenoh::detail::null_object); + ZResult res = ::zc_querier_declare_matching_listener( + interop::as_loaned_c_ptr(*this), interop::as_owned_c_ptr(m), ::z_move(cb_handler_pair.first)); + __ZENOH_RESULT_CHECK(res, err, "Failed to declare Matching Listener"); + if (res != Z_OK) ::z_drop(interop::as_moved_c_ptr(cb_handler_pair.second)); + return MatchingListener>( + std::move(m), std::move(cb_handler_pair.second)); + } + + /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future + /// release. + /// @brief Declare matching listener, registering a callback for notifying queryables matching with a given + /// querier. The callback will be run in the background until the corresponding querier is destroyed. + /// + /// @param on_status_change: the callable that will be called every time the matching status of the querier + /// changes (i.e. if last queryable disconnects or when the first queryable connects). + /// @param on_drop the callable that will be called once querier is destroyed or undeclared. + /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be + /// thrown in case of error. + /// @note Zenoh-c only. + template + void declare_background_matching_listener(C&& on_status_change, D&& on_drop, ZResult* err = nullptr) const { + static_assert(std::is_invocable_r::value, + "on_status_change should be callable with the following signature: void on_status_change(const " + "zenoh::MatchingStatus& status)"); + static_assert(std::is_invocable_r::value, + "on_drop should be callable with the following signature: void on_drop()"); + ::zc_owned_closure_matching_status_t c_closure; + using Cval = std::remove_reference_t; + using Dval = std::remove_reference_t; + using ClosureType = typename detail::closures::Closure; + auto closure = ClosureType::into_context(std::forward(on_status_change), std::forward(on_drop)); + ::z_closure(&c_closure, detail::closures::_zenoh_on_status_change_call, detail::closures::_zenoh_on_drop, + closure); + ZResult res = + ::zc_querier_declare_background_matching_listener(interop::as_loaned_c_ptr(*this), ::z_move(c_closure)); + __ZENOH_RESULT_CHECK(res, err, "Failed to declare background Matching Listener"); + } + + /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future + /// release. + /// @brief Gets querier matching status - i.e. if there are any queryables matching its key expression. + /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be + /// thrown in case of error. + /// @note Zenoh-c only. + MatchingStatus get_matching_status(ZResult* err = nullptr) const { + ::zc_matching_status_t m; + ZResult res = ::zc_querier_get_matching_status(interop::as_loaned_c_ptr(*this), &m); + __ZENOH_RESULT_CHECK(res, err, "Failed to get matching status"); + return {m.matching}; + } +#endif +}; + +} // namespace zenoh +#endif \ No newline at end of file diff --git a/include/zenoh/api/session.hxx b/include/zenoh/api/session.hxx index e30b0b7d..69ee7f19 100644 --- a/include/zenoh/api/session.hxx +++ b/include/zenoh/api/session.hxx @@ -29,6 +29,9 @@ #include "query_consolidation.hxx" #include "subscriber.hxx" #include "timestamp.hxx" +#if defined(ZENOHCXX_ZENOHC) +#include "querier.hxx" +#endif #if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_SHARED_MEMORY) && defined(Z_FEATURE_UNSTABLE_API) #include "shm/client_storage/client_storage.hxx" #endif @@ -280,6 +283,9 @@ class Session : public Owned<::z_owned_session_t> { z_get_options_default(&opts); opts.target = options.target; opts.consolidation = *interop::as_copyable_c_ptr(options.consolidation); + opts.congestion_control = options.congestion_control; + opts.priority = options.priority; + opts.is_express = options.is_express; opts.payload = interop::as_moved_c_ptr(options.payload); opts.encoding = interop::as_moved_c_ptr(options.encoding); #if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) @@ -685,6 +691,77 @@ class Session : public Owned<::z_owned_session_t> { return p; } #endif + +#if defined(ZENOHCXX_ZENOHC) + /// @brief Options to be passed when declaring a ``Querier``. + struct QuerierOptions { + /// @name Fields + + /// @brief The Queryables that should be target of the querier queries. + QueryTarget target = QueryTarget::Z_QUERY_TARGET_BEST_MATCHING; + /// @brief The replies consolidation strategy to apply on replies to the querier queries. + QueryConsolidation consolidation = QueryConsolidation(); + /// @brief The priority of the querier queries. + Priority priority = Z_PRIORITY_DEFAULT; + /// @brief The congestion control to apply when routing querier queries. + CongestionControl congestion_control = Z_CONGESTION_CONTROL_DEFAULT; + /// @brief Whether Zenoh will NOT wait to batch querier queries with other messages to reduce the bandwith. + bool is_express = false; +#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) + /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future + /// release. + /// + /// @brief The accepted replies for the querier queries. + /// @note Zenoh-c only. + ReplyKeyExpr accept_replies = ::zc_reply_keyexpr_default(); + + /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future + /// release. + /// @brief Allowed destination for querier queries. + /// @note Zenoh-c only. + Locality allowed_destination = ::zc_locality_default(); +#endif + + /// @brief The timeout for the querier queries in milliseconds. 0 means default query timeout from zenoh + /// configuration. + uint64_t timeout_ms = 0; + + /// @name Methods + + /// @brief Create default option settings. + static QuerierOptions create_default() { return {}; } + }; + + /// @brief Create a ``Querier`` object to send queries to matching ``Queryable`` objects. + /// @param key_expr the key expression to match the queryables. + /// @param options options passed to querier declaration. + /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be + /// thrown in case of error. + /// @return a ``Querier`` object. + Querier declare_querier(const KeyExpr& key_expr, QuerierOptions&& options = QuerierOptions::create_default(), + ZResult* err = nullptr) const { + ::z_querier_options_t opts; + z_querier_options_default(&opts); + opts.target = options.target; + opts.consolidation = *interop::as_copyable_c_ptr(options.consolidation); + opts.congestion_control = options.congestion_control; + opts.priority = options.priority; + opts.is_express = options.is_express; + ; +#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) + opts.accept_replies = options.accept_replies; + opts.allowed_destination = options.allowed_destination; +#endif + opts.timeout_ms = options.timeout_ms; + + Querier q = interop::detail::null(); + ZResult res = ::z_declare_querier(interop::as_loaned_c_ptr(*this), interop::as_owned_c_ptr(q), + interop::as_loaned_c_ptr(key_expr), &opts); + __ZENOH_RESULT_CHECK(res, err, "Failed to declare Querier"); + return q; + } +#endif + /// @brief Fetches the Zenoh IDs of all connected routers. /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be /// thrown in case of error. diff --git a/zenoh-c b/zenoh-c index 216f35e6..1a9583d4 160000 --- a/zenoh-c +++ b/zenoh-c @@ -1 +1 @@ -Subproject commit 216f35e64d389641eadc8152b238611e56a8c899 +Subproject commit 1a9583d44a0dc857eb6fc8323ebcaf829fa0bbd6 From 4d450d487d73fca6ed0fd43a59c8ff7953b4dc79 Mon Sep 17 00:00:00 2001 From: Denis Biryukov Date: Mon, 2 Dec 2024 18:02:37 +0100 Subject: [PATCH 2/3] move querier under unstable --- examples/CMakeLists.txt | 3 +++ include/zenoh/api.hxx | 2 +- include/zenoh/api/querier.hxx | 2 ++ include/zenoh/api/session.hxx | 8 ++++++-- zenoh-c | 2 +- 5 files changed, 13 insertions(+), 4 deletions(-) diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 8f846a28..449d03f6 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -66,6 +66,9 @@ function(add_examples glob mode lib) if(${file} MATCHES ".*liveliness.*$") continue() endif() + if(${file} MATCHES ".*querier.*$") + continue() + endif() if(${file} MATCHES ".*query_sub.*$") continue() endif() diff --git a/include/zenoh/api.hxx b/include/zenoh/api.hxx index c1ef23dc..a0833ce5 100644 --- a/include/zenoh/api.hxx +++ b/include/zenoh/api.hxx @@ -34,7 +34,7 @@ #include "api/session.hxx" #include "api/subscriber.hxx" #include "api/timestamp.hxx" -#if defined(ZENOHCXX_ZENOHC) +#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) #include "api/querier.hxx" #endif #if defined(Z_FEATURE_SHARED_MEMORY) && defined(Z_FEATURE_UNSTABLE_API) diff --git a/include/zenoh/api/querier.hxx b/include/zenoh/api/querier.hxx index cb232637..9e106db6 100644 --- a/include/zenoh/api/querier.hxx +++ b/include/zenoh/api/querier.hxx @@ -32,6 +32,8 @@ namespace zenoh { class Session; +/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future +/// release. /// A Zenoh Querier. Constructed by ``Session::declare_querier`` method. Queriers allow to send queries to a queryable. class Querier : public Owned<::z_owned_querier_t> { Querier(zenoh::detail::null_object_t) : Owned(nullptr){}; diff --git a/include/zenoh/api/session.hxx b/include/zenoh/api/session.hxx index 69ee7f19..a9111fd4 100644 --- a/include/zenoh/api/session.hxx +++ b/include/zenoh/api/session.hxx @@ -29,7 +29,7 @@ #include "query_consolidation.hxx" #include "subscriber.hxx" #include "timestamp.hxx" -#if defined(ZENOHCXX_ZENOHC) +#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) #include "querier.hxx" #endif #if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_SHARED_MEMORY) && defined(Z_FEATURE_UNSTABLE_API) @@ -692,7 +692,9 @@ class Session : public Owned<::z_owned_session_t> { } #endif -#if defined(ZENOHCXX_ZENOHC) +#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) + /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future + /// release. /// @brief Options to be passed when declaring a ``Querier``. struct QuerierOptions { /// @name Fields @@ -732,6 +734,8 @@ class Session : public Owned<::z_owned_session_t> { static QuerierOptions create_default() { return {}; } }; + /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future + /// release. /// @brief Create a ``Querier`` object to send queries to matching ``Queryable`` objects. /// @param key_expr the key expression to match the queryables. /// @param options options passed to querier declaration. diff --git a/zenoh-c b/zenoh-c index 1a9583d4..2a1511a7 160000 --- a/zenoh-c +++ b/zenoh-c @@ -1 +1 @@ -Subproject commit 1a9583d44a0dc857eb6fc8323ebcaf829fa0bbd6 +Subproject commit 2a1511a7f4992a3b242ec61b4435306b4f5b6ac1 From ccec8af5759c5e2a0758301c7438654d3df91252 Mon Sep 17 00:00:00 2001 From: Denis Biryukov Date: Tue, 3 Dec 2024 13:39:06 +0100 Subject: [PATCH 3/3] switch zenoh-c to main --- zenoh-c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zenoh-c b/zenoh-c index 2a1511a7..cb348a7c 160000 --- a/zenoh-c +++ b/zenoh-c @@ -1 +1 @@ -Subproject commit 2a1511a7f4992a3b242ec61b4435306b4f5b6ac1 +Subproject commit cb348a7cb906ac764606f455434005df2dd8a1ec