Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Querying subscriber #283

Merged
merged 5 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/Doxyfile
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ ALWAYS_DETAILED_SEC = NO
# operators of the base classes will not be shown.
# The default value is: NO.

INLINE_INHERITED_MEMB = NO
INLINE_INHERITED_MEMB = YES

# If the FULL_PATH_NAMES tag is set to YES, doxygen will prepend the full path
# before files name in the file list and in the header files. If set to NO the
Expand Down
4 changes: 4 additions & 0 deletions docs/ext.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,9 @@ Extensions
Extra functionality, which is not a part of core Zenoh API.

.. doxygenclass:: zenoh::ext::PublicationCache
:members:
:membergroups: Constructors Operators Methods

.. doxygenclass:: zenoh::ext::QueryingSubscriber
:members:
:membergroups: Constructors Operators Methods
1 change: 0 additions & 1 deletion docs/session.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,3 @@ Session
.. doxygenclass:: zenoh::Session
:members:
:membergroups: Constructors Operators Methods Fields

3 changes: 3 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ function(add_examples glob mode lib)
if(${file} MATCHES ".*liveliness.*$")
continue()
endif()
if(${file} MATCHES ".*query_sub.*$")
continue()
endif()
if(${file} MATCHES ".*pub_cache.*$")
continue()
endif()
Expand Down
3 changes: 2 additions & 1 deletion examples/zenohc/z_pub_cache.cxx
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright (c) 2022 ZettaScale Technology
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
Expand Down Expand Up @@ -46,6 +46,7 @@ int _main(int argc, char **argv) {
std::cout << "Declaring Publication cache on '" << keyexpr << "'..." << std::endl;
Session::PublicationCacheOptions opts;
opts.history = std::atoi(history);
opts.queryable_complete = true;
if (!std::string(prefix).empty()) {
opts.queryable_prefix = KeyExpr(prefix);
}
Expand Down
80 changes: 80 additions & 0 deletions examples/zenohc/z_query_sub.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
#include <stdio.h>
#include <string.h>

#include <chrono>
#include <iostream>
#include <limits>
#include <sstream>
#include <thread>

#include "../getargs.h"
#include "zenoh.hxx"

using namespace zenoh;
using namespace std::chrono_literals;

const char *default_keyexpr = "demo/example/**";
const char *default_query = "";

const char *kind_to_str(SampleKind kind) {
switch (kind) {
case SampleKind::Z_SAMPLE_KIND_PUT:
return "PUT";
case SampleKind::Z_SAMPLE_KIND_DELETE:
return "DELETE";
default:
return "UNKNOWN";
}
}

int _main(int argc, char **argv) {
const char *keyexpr = default_keyexpr;
const char *query = default_query;
Config config = parse_args(argc, argv, {}, {{"key_expression", &keyexpr}}, {{"-q", {"query", &query}}});

std::cout << "Opening session..." << std::endl;
auto session = Session::open(std::move(config));

std::cout << "Declaring Querying Subscriber on '" << keyexpr << "' with initial query on '" << query << "'"
<< std::endl;
Session::QueryingSubscriberOptions opts;

if (!std::string(query).empty()) {
opts.query_keyexpr = KeyExpr(query);
opts.query_accept_replies = ReplyKeyExpr::ZC_REPLY_KEYEXPR_ANY;
}
auto querying_subscriber = session.declare_querying_subscriber(keyexpr, channels::FifoChannel(16), std::move(opts));

std::cout << "Press CTRL-C to quit..." << std::endl;
for (auto res = querying_subscriber.handler().recv(); std::holds_alternative<Sample>(res);
res = querying_subscriber.handler().recv()) {
const auto &sample = std::get<Sample>(res);
std::cout << ">> [Subscriber] Received " << kind_to_str(sample.get_kind()) << " ('"
<< sample.get_keyexpr().as_string_view() << "': '" << sample.get_payload().as_string() << "')"
<< std::endl;
}

return 0;
}

int main(int argc, char **argv) {
try {
init_log_from_env_or("error");
_main(argc, argv);
} catch (ZException e) {
std::cout << "Received an error :" << e.what() << "\n";
}
}
1 change: 1 addition & 0 deletions include/zenoh/api.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,5 @@
#include "api/ext/serialization.hxx"
#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API)
#include "api/ext/publication_cache.hxx"
#include "api/ext/querying_subscriber.hxx"
#endif
8 changes: 8 additions & 0 deletions include/zenoh/api/enums.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,14 @@ inline std::string_view whatami_as_str(WhatAmI whatami) {
/// - **ZC_LOCALITY_SESSION_LOCAL**: Only from local sessions.
/// - **ZC_LOCALITY_SESSION_REMOTE**: Only from remote sessions.
typedef ::zc_locality_t Locality;

/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
/// @brief Key expressions types to which Queryable should reply to.
///
/// Values:
/// - **ZC_REPLY_KEYEXPR_ANY**: Replies to any key expression queries.
/// - **ZC_REPLY_KEYEXPR_MATCHING_QUERY**: Replies only to queries with intersecting key expressions.
typedef ::zc_reply_keyexpr_t ReplyKeyExpr;
#endif

} // namespace zenoh
Loading
Loading