Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Querying subscriber #283

Merged
merged 5 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/Doxyfile
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ ALWAYS_DETAILED_SEC = NO
# operators of the base classes will not be shown.
# The default value is: NO.

INLINE_INHERITED_MEMB = NO
INLINE_INHERITED_MEMB = YES

# If the FULL_PATH_NAMES tag is set to YES, doxygen will prepend the full path
# before files name in the file list and in the header files. If set to NO the
Expand Down
4 changes: 4 additions & 0 deletions docs/ext.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,9 @@ Extensions
Extra functionality, which is not a part of core Zenoh API.

.. doxygenclass:: zenoh::ext::PublicationCache
:members:
:membergroups: Constructors Operators Methods

.. doxygenclass:: zenoh::ext::QueryingSubscriber
:members:
:membergroups: Constructors Operators Methods
5 changes: 5 additions & 0 deletions docs/session.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,8 @@ Session
:members:
:membergroups: Constructors Operators Methods Fields


.. doxygenstruct:: zenoh::GetOptions
:members:
:membergroups: Constructors Operators Methods Fields

3 changes: 3 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ function(add_examples glob mode lib)
if(${file} MATCHES ".*liveliness.*$")
continue()
endif()
if(${file} MATCHES ".*query_sub.*$")
continue()
endif()
if(${file} MATCHES ".*pub_cache.*$")
continue()
endif()
Expand Down
3 changes: 2 additions & 1 deletion examples/zenohc/z_pub_cache.cxx
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright (c) 2022 ZettaScale Technology
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
Expand Down Expand Up @@ -46,6 +46,7 @@ int _main(int argc, char **argv) {
std::cout << "Declaring Publication cache on '" << keyexpr << "'..." << std::endl;
Session::PublicationCacheOptions opts;
opts.history = std::atoi(history);
opts.queryable_complete = true;
if (!std::string(prefix).empty()) {
opts.queryable_prefix = KeyExpr(prefix);
}
Expand Down
80 changes: 80 additions & 0 deletions examples/zenohc/z_query_sub.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
#include <stdio.h>
#include <string.h>

#include <chrono>
#include <iostream>
#include <limits>
#include <sstream>
#include <thread>

#include "../getargs.h"
#include "zenoh.hxx"

using namespace zenoh;
using namespace std::chrono_literals;

const char *default_keyexpr = "demo/example/**";
const char *default_query = "";

const char *kind_to_str(SampleKind kind) {
switch (kind) {
case SampleKind::Z_SAMPLE_KIND_PUT:
return "PUT";
case SampleKind::Z_SAMPLE_KIND_DELETE:
return "DELETE";
default:
return "UNKNOWN";
}
}

int _main(int argc, char **argv) {
const char *keyexpr = default_keyexpr;
const char *query = default_query;
Config config = parse_args(argc, argv, {}, {{"key_expression", &keyexpr}}, {{"-q", {"query", &query}}});

std::cout << "Opening session..." << std::endl;
auto session = Session::open(std::move(config));

std::cout << "Declaring Querying Subscriber on '" << keyexpr << "' with initial query on '" << query << "'"
<< std::endl;
Session::QueryingSubscriberOptions opts;

if (!std::string(query).empty()) {
opts.query_keyexpr = KeyExpr(query);
opts.query_accept_replies = ReplyKeyExpr::ZC_REPLY_KEYEXPR_ANY;
}
auto querying_subscriber = session.declare_querying_subscriber(keyexpr, channels::FifoChannel(16), std::move(opts));

std::cout << "Press CTRL-C to quit..." << std::endl;
for (auto res = querying_subscriber.handler().recv(); std::holds_alternative<Sample>(res);
res = querying_subscriber.handler().recv()) {
const auto &sample = std::get<Sample>(res);
std::cout << ">> [Subscriber] Received " << kind_to_str(sample.get_kind()) << " ('"
<< sample.get_keyexpr().as_string_view() << "': '" << sample.get_payload().as_string() << "')"
<< std::endl;
}

return 0;
}

int main(int argc, char **argv) {
try {
init_log_from_env_or("error");
_main(argc, argv);
} catch (ZException e) {
std::cout << "Received an error :" << e.what() << "\n";
}
}
8 changes: 8 additions & 0 deletions include/zenoh/api/enums.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,14 @@ inline std::string_view whatami_as_str(WhatAmI whatami) {
/// - **ZC_LOCALITY_SESSION_LOCAL**: Only from local sessions.
/// - **ZC_LOCALITY_SESSION_REMOTE**: Only from remote sessions.
typedef ::zc_locality_t Locality;

/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
/// @brief Key expressions types to which Queryable should reply to.
///
/// Values:
/// - **ZC_REPLY_KEYEXPR_ANY**: Replies to any key expression queries.
/// - **ZC_REPLY_KEYEXPR_MATCHING_QUERY**: Replies only to queries with intersecting key expressions.
typedef ::zc_reply_keyexpr_t ReplyKeyExpr;
#endif

} // namespace zenoh
196 changes: 196 additions & 0 deletions include/zenoh/api/ext/querying_subscriber.hxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>

#pragma once

#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API)
#include "../base.hxx"
#include "../get.hxx"
#include "../interop.hxx"
#include "../keyexpr.hxx"

namespace zenoh {
namespace ext {

namespace detail {
class QueryingSubscriberBase : public Owned<::ze_owned_querying_subscriber_t> {
protected:
QueryingSubscriberBase(zenoh::detail::null_object_t) : Owned(nullptr){};
QueryingSubscriberBase(::ze_owned_querying_subscriber_t* qs) : Owned(qs){};

public:
/// @name Methods

///@copydoc zenoh::GetOptions
using GetOptions = zenoh::GetOptions;

/// @brief Make querying subscriber perform an additional query on a specified selector.
/// The queried samples will be merged with the received publications and made available in the subscriber callback.
/// @param key_expr the key expression matching resources to query.
/// @param options query options.
/// @param err if not null, the result code will be written to this location, otherwise ZException exception will be
/// thrown in case of error.
void get(const KeyExpr& key_expr, zenoh::GetOptions&& options = zenoh::GetOptions::create_default(),
ZResult* err = nullptr) const {
::z_get_options_t opts;
z_get_options_default(&opts);
opts.target = options.target;
opts.consolidation = *interop::as_copyable_c_ptr(options.consolidation);
opts.payload = interop::as_moved_c_ptr(options.payload);
opts.encoding = interop::as_moved_c_ptr(options.encoding);
#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API)
opts.source_info = interop::as_moved_c_ptr(options.source_info);
opts.accept_replies = options.accept_replies;
opts.allowed_destination = options.allowed_destination;
#endif
opts.attachment = interop::as_moved_c_ptr(options.attachment);
opts.timeout_ms = options.timeout_ms;

ZResult res =
::ze_querying_subscriber_get(interop::as_loaned_c_ptr(*this), interop::as_loaned_c_ptr(key_expr), &opts);
__ZENOH_RESULT_CHECK(res, err, "Failed to perform get operation");
}

friend class zenoh::Session;
};

} // namespace detail

template <class Handler>
class QueryingSubscriber;

/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
/// @brief A Zenoh querying subscriber.
///
/// In addition to receiving the data it is subscribed to,
/// it also will fetch data from a Queryable at startup and peridodically (using `QueryingSubscriber::get`).
/// @note Zenoh-c only.
template <>
class QueryingSubscriber<void> : public detail::QueryingSubscriberBase {
protected:
using QueryingSubscriberBase::QueryingSubscriberBase;
friend class Session;
friend struct interop::detail::Converter;

public:
/// @name Methods

/// @brief Undeclare publication cache.
/// @param err if not null, the result code will be written to this location, otherwise ZException exception will be
/// thrown in case of error.
void undeclare(ZResult* err = nullptr) && {
__ZENOH_RESULT_CHECK(::ze_undeclare_querying_subscriber(interop::as_moved_c_ptr(*this)), err,
"Failed to undeclare Querying Subscriber");
}
};

/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
/// @brief A Zenoh querying subscriber declared by ``zenoh::Session::declare_querying_subscriber``.
///
/// In addition to receiving the data it is subscribed to,
/// it also will fetch data from a Queryable at startup and peridodically (using ``QueryingSubscriber::get``).
/// @note Zenoh-c only.
/// @tparam Handler streaming handler exposing data. If `void`, no handler access is provided and instead data is being
/// processed inside the callback.
template <class Handler>
class QueryingSubscriber : public detail::QueryingSubscriberBase {
Handler _handler;

public:
/// @name Constructors

/// @brief Construct stream querying subscriber from callback querying subscriber and handler.
///
/// @param qs callback querying subscriber, that should expose data to the handler in its callback.
/// @param handler handler to access data exposed by `s`. Zenoh handlers implement
/// recv and try_recv methods, for blocking and non-blocking message reception. But user is free to define his own
/// interface.
QueryingSubscriber(QueryingSubscriber<void>&& qs, Handler handler)
: QueryingSubscriberBase(interop::as_owned_c_ptr(qs)), _handler(std::move(handler)) {}

/// @name Methods

/// @brief Undeclare querying subscriber, and return its handler, which can still be used to process any messages
/// received prior to undeclaration.
/// @param err if not null, the result code will be written to this location, otherwise ZException exception will be
/// thrown in case of error.
Handler undeclare(ZResult* err = nullptr) && {
__ZENOH_RESULT_CHECK(::ze_undeclare_querying_subscriber(interop::as_moved_c_ptr(*this)), err,
"Failed to undeclare Querying Subscriber");
return std::move(this->_handler);
}

/// @brief Return the handler to subscriber data stream.
const Handler& handler() const { return _handler; };
friend class Session;
};

} // namespace ext

namespace interop {
/// @brief Return a pair of pointers to owned zenoh-c representations of querying subscriber and its callback.
template <class Handler, typename = std::enable_if_t<!std::is_same_v<Handler, void>>>
auto as_owned_c_ptr(zenoh::ext::QueryingSubscriber<Handler>& s) {
return std::make_pair(as_owned_c_ptr(static_cast<zenoh::ext::detail::QueryingSubscriberBase&>(s)),
as_owned_c_ptr(const_cast<Handler&>(s.handler())));
}

/// @brief Return a pair of pointers to owned zenoh-c representations of querying subscriber and its callback.
template <class Handler, typename = std::enable_if_t<!std::is_same_v<Handler, void>>>
auto as_owned_c_ptr(const zenoh::ext::QueryingSubscriber<Handler>& s) {
return std::make_pair(as_owned_c_ptr(static_cast<const zenoh::ext::detail::QueryingSubscriberBase&>(s)),
as_owned_c_ptr(s.handler()));
}

/// @brief Return a pair of pointers to loaned zenoh-c representations of querying subscriber and its callback.
template <class Handler, typename = std::enable_if_t<!std::is_same_v<Handler, void>>>
auto as_loaned_c_ptr(zenoh::ext::QueryingSubscriber<Handler>& s) {
return std::make_pair(as_loaned_c_ptr(static_cast<zenoh::ext::detail::QueryingSubscriberBase&>(s)),
as_loaned_c_ptr(const_cast<Handler&>(s.handler())));
}

/// @brief Return a pair of pointers to loaned zenoh-c representations of querying subscriber and its callback.
template <class Handler, typename = std::enable_if_t<!std::is_same_v<Handler, void>>>
auto as_loaned_c_ptr(const zenoh::ext::QueryingSubscriber<Handler>& s) {
return std::make_pair(as_loaned_c_ptr(static_cast<const zenoh::ext::detail::QueryingSubscriberBase&>(s)),
as_loaned_c_ptr(s.handler()));
}

/// @brief Return a pair of pointers to moved zenoh-c representations of querying subscriber and its callback.
template <class Handler, typename = std::enable_if_t<!std::is_same_v<Handler, void>>>
auto as_moved_c_ptr(zenoh::ext::QueryingSubscriber<Handler>& s) {
return std::make_pair(as_moved_c_ptr(static_cast<zenoh::ext::detail::QueryingSubscriberBase&>(s)),
as_moved_c_ptr(const_cast<Handler&>(s.handler())));
}

/// @brief Return a pair of pointers to moved zenoh-c representations of querying subscriber and its callback.
/// Will return a pair of null pointers if option is empty.
template <class Handler, typename = std::enable_if_t<!std::is_same_v<Handler, void>>>
auto as_moved_c_ptr(std::optional<zenoh::ext::QueryingSubscriber<Handler>>& s) -> decltype(as_moved_c_ptr(s.value())) {
if (!s.has_value()) {
return as_moved_c_ptr(s.value());
} else {
return {};
}
}

/// @brief Move querying subscriber and its handler to a pair containing corresponding zenoh-c structs.
template <class Handler, typename = std::enable_if_t<!std::is_same_v<Handler, void>>>
auto move_to_c_obj(zenoh::ext::QueryingSubscriber<Handler>&& s) {
return std::make_pair(move_to_c_obj(std::move(static_cast<zenoh::ext::detail::QueryingSubscriberBase&>(s))),
move_to_c_obj(std::move(const_cast<Handler&>(s))));
}
} // namespace interop

} // namespace zenoh
#endif
Loading
Loading