Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Advanced pub sub #339

Merged
merged 11 commits into from
Dec 19, 2024
18 changes: 17 additions & 1 deletion docs/ext.rst
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,20 @@ Extra Zenoh entities.

.. doxygenclass:: zenoh::ext::QueryingSubscriber
:members:
:membergroups: Constructors Operators Methods Fields
:membergroups: Constructors Operators Methods Fields

.. doxygenclass:: zenoh::ext::AdvancedPublisher
:members:
:membergroups: Constructors Operators Methods Fields

.. doxygenclass:: zenoh::ext::AdvancedSubscriber
:members:
:membergroups: Constructors Operators Methods Fields

.. doxygenstruct:: zenoh::ext::Miss
:members:
:membergroups: Constructors Operators Methods Fields

.. doxygenclass:: zenoh::ext::SampleMissListener
:members:
:membergroups: Constructors Operators Methods
2 changes: 1 addition & 1 deletion docs/matching.rst
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Matching
=================
========
Classes related to getting information about matching Zenoh entities.

.. doxygenstruct:: zenoh::MatchingStatus
Expand Down
5 changes: 1 addition & 4 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,7 @@ function(add_examples glob mode lib)
if(${file} MATCHES ".*querier.*$")
continue()
endif()
if(${file} MATCHES ".*query_sub.*$")
continue()
endif()
if(${file} MATCHES ".*pub_cache.*$")
if((${file} MATCHES ".*advanced_sub.*$") OR (${file} MATCHES ".*advanced_pub.*$"))
continue()
endif()
endif()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,8 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
#include <stdio.h>
#include <string.h>

#include <chrono>
#include <cstdlib>
#include <iostream>
#include <limits>
#include <sstream>
Expand All @@ -26,60 +24,57 @@
using namespace zenoh;
using namespace std::chrono_literals;

#ifdef ZENOHCXX_ZENOHC
const char *default_value = "Pub from C++ zenoh-c!";
const char *default_keyexpr = "demo/example/zenoh-cpp-zenoh-c-pub";
const char *default_prefix = "";
#elif ZENOHCXX_ZENOHPICO
const char *default_value = "Pub from C++ zenoh-pico!";
const char *default_keyexpr = "demo/example/zenoh-cpp-zenoh-pico-pub";
#else
#error "Unknown zenoh backend"
#endif

int _main(int argc, char **argv) {
auto &&[config, args] =
ConfigCliArgParser(argc, argv)
.named_value({"k", "key"}, "KEY_EXPRESSION", "Key expression to write to (string)", default_keyexpr)
.named_value({"v", "value"}, "VALUE", "Value to publish (string)", default_value)
.named_value({"i", "history"}, "HISTORY", "Number of publications to keep in cache (number)", "1")
.named_flag({"o", "complete"},
"Set `complete` option to true. This means that this queryable is ultimate data source, no "
"need to scan other queryables")
.named_value({"x", "prefix"}, "PREFIX", "Queryable prefix", "")
.named_value({"k", "key"}, "KEY_EXPRESSION", "Key expression to publish to (string)", default_keyexpr)
.named_value({"p", "payload"}, "PAYLOAD", "Payload to publish (string)", default_value)
.named_value({"i", "history"}, "HISTORY_SIZE", "The number of publications to keep in cache (number)", "1")
.run();

auto keyexpr = args.value("key");
auto value = args.value("value");
auto payload = args.value("payload");
auto history = std::atoi(args.value("history").data());
auto complete = args.flag("complete");
auto prefix = args.value("prefix");

config.insert_json5(Z_CONFIG_ADD_TIMESTAMP_KEY, "true");

std::cout << "Opening session..." << std::endl;
auto session = Session::open(std::move(config));

std::cout << "Declaring Publication cache on '" << keyexpr << "'..." << std::endl;
ext::SessionExt::PublicationCacheOptions opts;
opts.history = history;
opts.queryable_complete = complete;
if (!prefix.empty()) {
opts.queryable_prefix = KeyExpr(prefix);
}
if (!std::string(prefix).empty()) {
opts.queryable_prefix = KeyExpr(prefix);
}
auto pub_cache = session.ext().declare_publication_cache(keyexpr, std::move(opts));
ext::SessionExt::AdvancedPublisherOptions opts;
opts.cache = ext::SessionExt::AdvancedPublisherOptions::CacheOptions{};
opts.cache->max_samples = history;
opts.publisher_detection = true;
opts.sample_miss_detection = true;

std::cout << "Declaring AdvancedPublisher on '" << keyexpr << "'..." << std::endl;
auto pub = session.ext().declare_advanced_publisher(KeyExpr(keyexpr), std::move(opts));

std::cout << "Press CTRL-C to quit..." << std::endl;
for (int idx = 0; idx < std::numeric_limits<int>::max(); ++idx) {
std::this_thread::sleep_for(1s);
std::ostringstream ss;
ss << "[" << idx << "] " << value;
ss << "[" << idx << "] " << payload;
auto s = ss.str();
std::cout << "Putting Data ('" << keyexpr << "': '" << s << "')...\n";
session.put(keyexpr, std::move(s));
std::cout << "Put Data ('" << keyexpr << "': '" << s << "')...\n";
pub.put(s);
}
return 0;
}

int main(int argc, char **argv) {
try {
#ifdef ZENOHCXX_ZENOHC
init_log_from_env_or("error");
#endif
_main(argc, argv);
} catch (ZException e) {
std::cout << "Received an error :" << e.what() << "\n";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
#include <stdio.h>
#include <string.h>

#include <chrono>
#include <iostream>
#include <limits>
Expand Down Expand Up @@ -41,34 +38,40 @@ int _main(int argc, char **argv) {
auto &&[config, args] =
ConfigCliArgParser(argc, argv)
.named_value({"k", "key"}, "KEY_EXPRESSION", "Key expression to subscriber to (string)", "demo/example/**")
.named_value({"q", "query"}, "Query",
"Selector to use for queries (by default it's same as 'KEY_EXPRESSION') (string)", "")
.run();

auto keyexpr = args.value("key");
auto query = args.value("query");

std::cout << "Opening session..." << std::endl;
auto session = Session::open(std::move(config));

std::cout << "Declaring Querying Subscriber on '" << keyexpr << "' with initial query on '" << query << "'"
<< std::endl;
ext::SessionExt::QueryingSubscriberOptions opts;

if (!query.empty()) {
opts.query_keyexpr = KeyExpr(query);
opts.query_accept_replies = ReplyKeyExpr::ZC_REPLY_KEYEXPR_ANY;
}
auto querying_subscriber =
session.ext().declare_querying_subscriber(keyexpr, channels::FifoChannel(16), std::move(opts));
ext::SessionExt::AdvancedSubscriberOptions opts;
opts.history = ext::SessionExt::AdvancedSubscriberOptions::HistoryOptions{};
opts.history->detect_late_publishers = true;
opts.recovery = ext::SessionExt::AdvancedSubscriberOptions::RecoveryOptions{};
opts.recovery->periodic_queries_period_ms = 1000;
opts.subscriber_detection = true;

std::cout << "Press CTRL-C to quit..." << std::endl;
for (auto res = querying_subscriber.handler().recv(); std::holds_alternative<Sample>(res);
res = querying_subscriber.handler().recv()) {
const auto &sample = std::get<Sample>(res);
auto data_handler = [](const Sample &sample) {
std::cout << ">> [Subscriber] Received " << kind_to_str(sample.get_kind()) << " ('"
<< sample.get_keyexpr().as_string_view() << "': '" << sample.get_payload().as_string() << "')"
<< sample.get_keyexpr().as_string_view() << "' : '" << sample.get_payload().as_string() << "')";
std::cout << std::endl;
};

auto missed_sample_handler = [](const ext::Miss &miss) {
std::cout << ">> [Subscriber] Missed " << miss.nb << " samples from '" << miss.source.id() << "' !!!"
<< std::endl;
};

std::cout << "Declaring AdvancedSubscriber on '" << keyexpr << "'" << std::endl;
auto advanced_subscriber =
session.ext().declare_advanced_subscriber(keyexpr, data_handler, closures::none, std::move(opts));

advanced_subscriber.declare_background_sample_miss_listener(missed_sample_handler, closures::none);

std::cout << "Press CTRL-C to quit..." << std::endl;
while (true) {
std::this_thread::sleep_for(1s);
}

return 0;
Expand Down
3 changes: 1 addition & 2 deletions include/zenoh/api.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
#endif
#include "api/ext/serialization.hxx"
#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API)
#include "api/ext/publication_cache.hxx"
#include "api/ext/querying_subscriber.hxx"
#include "api/ext/session_ext.hxx"
#include "api/matching.hxx"
#endif
Loading
Loading