Skip to content

Commit

Permalink
add querier support (#297)
Browse files Browse the repository at this point in the history
* add querier

* move querier under unstable

* switch zenoh-c to main
  • Loading branch information
DenisBiryukov91 authored Dec 3, 2024
1 parent e8eca99 commit 725a14b
Show file tree
Hide file tree
Showing 22 changed files with 791 additions and 104 deletions.
11 changes: 10 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,16 @@ The `z_pub` should receive message sent by `z_sub`.
./z_get
```

The `z_get` should receive the data from `z_queryable`.
### Queryable and Querier Example
```bash
./z_queryable
```

```bash
./z_querier
```

The `z_querier` should continuously send queries and receive replies from `z_queryable`.

### Throughput Examples
```bash
Expand Down
1 change: 1 addition & 0 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ API Reference
scouting
publish_subscribe
query_reply
matching
serialization_deserialization
channels
interop
Expand Down
11 changes: 11 additions & 0 deletions docs/matching.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
Matching
=================
Classes related to getting information about matching Zenoh entities.

.. doxygenstruct:: zenoh::MatchingStatus
:members:
:membergroups: Constructors Operators Methods Fields

.. doxygenclass:: zenoh::MatchingListener
:members:
:membergroups: Constructors Operators Methods
2 changes: 1 addition & 1 deletion docs/publish_subscribe.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Classes related to publish-subscribe pattern.

.. doxygenclass:: zenoh::Publisher
:members:
:membergroups: Constructors Operators Methods
:membergroups: Constructors Operators Methods Fields

.. doxygenclass:: zenoh::Subscriber
:members:
Expand Down
4 changes: 4 additions & 0 deletions docs/query_reply.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ Query-Reply
===========
Classes related to query-reply pattern.

.. doxygenclass:: zenoh::Querier
:members:
:membergroups: Constructors Operators Methods Fields

.. doxygenclass:: zenoh::Queryable
:members:
:membergroups: Constructors Operators Methods
Expand Down
3 changes: 3 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ function(add_examples glob mode lib)
if(${file} MATCHES ".*liveliness.*$")
continue()
endif()
if(${file} MATCHES ".*querier.*$")
continue()
endif()
if(${file} MATCHES ".*query_sub.*$")
continue()
endif()
Expand Down
27 changes: 27 additions & 0 deletions examples/getargs.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <cstring>
#include <iostream>
#include <string>
#include <string_view>
#include <unordered_map>
#include <vector>

Expand Down Expand Up @@ -155,4 +156,30 @@ inline zenoh::Config parse_args(int argc, char **argv, const std::vector<CmdArg>
}
#endif
return std::move(config);
}

zenoh::QueryTarget parse_query_target(std::string_view v) {
if (v == "BEST_MATCHING") {
return zenoh::QueryTarget::Z_QUERY_TARGET_BEST_MATCHING;
} else if (v == "ALL") {
return zenoh::QueryTarget::Z_QUERY_TARGET_ALL;
} else if (v == "ALL_COMPLETE") {
return zenoh::QueryTarget::Z_QUERY_TARGET_ALL_COMPLETE;
}

throw std::runtime_error(std::string("Unsupported QueryTarget: ") + std::string(v));
}

struct Selector {
std::string key_expr;
std::string parameters;
};

Selector parse_selector(const std::string &selector_string) {
size_t pos = selector_string.find('?');
if (pos == std::string::npos) {
return Selector{selector_string, ""};
} else {
return Selector{selector_string.substr(0, pos), selector_string.substr(pos + 1)};
}
}
27 changes: 17 additions & 10 deletions examples/universal/z_get.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,18 @@ using namespace zenoh;

int _main(int argc, char **argv) {
const char *expr = "demo/example/**";
const char *value = "Get from C++";
Config config = parse_args(argc, argv, {}, {{"key_expression", &expr}, {"payload_value", &value}});
const char *value = nullptr;
const char *target = "BEST_MATCHING";
const char *timeout = "10000";

KeyExpr keyexpr(expr);
Config config = parse_args(argc, argv, {}, {},
{{"-s", CmdArg{"Query selector (string)", &expr}},
{"-p", CmdArg{"Query payload (string)", &value}},
{"-t", CmdArg{"Query target (BEST_MATCHING | ALL | ALL_COMPLETE)", &target}},
{"-o", CmdArg{"Timeout in ms (number)", &timeout}}});
uint64_t timeout_ms = std::stoi(timeout);
QueryTarget query_target = parse_query_target(target);
Selector selector = parse_selector(expr);

std::cout << "Opening session...\n";
auto session = Session::open(std::move(config));
Expand All @@ -55,14 +63,13 @@ int _main(int argc, char **argv) {
done_signal.notify_all();
};

#if __cpp_designated_initializers >= 201707L
session.get(keyexpr, "", on_reply, on_done, {.target = Z_QUERY_TARGET_ALL, .payload = Bytes::serialize(value)});
#else
Session::GetOptions options;
options.target = Z_QUERY_TARGET_ALL;
options.payload = value;
session.get(keyexpr, "", on_reply, on_done, std::move(options));
#endif
options.target = query_target;
if (value != nullptr) {
options.payload = value;
}
options.timeout_ms = timeout_ms;
session.get(selector.key_expr, selector.parameters, on_reply, on_done, std::move(options));

std::unique_lock lock(m);
done_signal.wait(lock, [&done] { return done; });
Expand Down
23 changes: 17 additions & 6 deletions examples/universal/z_get_attachment.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,18 @@ using namespace zenoh;

int _main(int argc, char **argv) {
const char *expr = "demo/example/**";
const char *value = "Get from C++";
Config config = parse_args(argc, argv, {}, {{"key_expression", &expr}, {"payload value", &value}});
const char *value = nullptr;
const char *target = "BEST_MATCHING";
const char *timeout = "10000";

KeyExpr keyexpr(expr);
Config config = parse_args(argc, argv, {}, {},
{{"-s", CmdArg{"Query selector (string)", &expr}},
{"-p", CmdArg{"Query payload (string)", &value}},
{"-t", CmdArg{"Query target (BEST_MATCHING | ALL | ALL_COMPLETE)", &target}},
{"-o", CmdArg{"Timeout in ms (number)", &timeout}}});
uint64_t timeout_ms = std::stoi(timeout);
QueryTarget query_target = parse_query_target(target);
Selector selector = parse_selector(expr);

printf("Opening session...\n");
auto session = Session::open(std::move(config));
Expand Down Expand Up @@ -66,10 +74,13 @@ int _main(int argc, char **argv) {
std::unordered_map<std::string, std::string> attachment = {{"Source", "C++"}};

Session::GetOptions options;
options.target = QueryTarget::Z_QUERY_TARGET_ALL;
options.payload = value;
options.target = query_target;
if (value != nullptr) {
options.payload = value;
}
options.timeout_ms = timeout_ms;
options.attachment = ext::serialize(attachment);
session.get(keyexpr, "", on_reply, on_done, std::move(options));
session.get(selector.key_expr, selector.parameters, on_reply, on_done, std::move(options));

std::unique_lock lock(m);
done_signal.wait(lock, [&done] { return done; });
Expand Down
29 changes: 18 additions & 11 deletions examples/universal/z_get_channel.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,31 @@ using namespace zenoh;

int _main(int argc, char **argv) {
const char *expr = "demo/example/**";
const char *value = "Get from C++";
Config config = parse_args(argc, argv, {}, {{"key_expression", &expr}, {"payload value", &value}});
const char *value = nullptr;
const char *target = "BEST_MATCHING";
const char *timeout = "10000";

KeyExpr keyexpr(expr);
Config config = parse_args(argc, argv, {}, {},
{{"-s", CmdArg{"Query selector (string)", &expr}},
{"-p", CmdArg{"Query payload (string)", &value}},
{"-t", CmdArg{"Query target (BEST_MATCHING | ALL | ALL_COMPLETE)", &target}},
{"-o", CmdArg{"Timeout in ms (number)", &timeout}}});
uint64_t timeout_ms = std::stoi(timeout);
QueryTarget query_target = parse_query_target(target);
Selector selector = parse_selector(expr);

std::cout << "Opening session...\n";
auto session = Session::open(std::move(config));

std::cout << "Sending Query '" << expr << "'...\n";
#if __cpp_designated_initializers >= 201707L
auto replies = session.get(keyexpr, "", channels::FifoChannel(16),
{.target = QueryTarget::Z_QUERY_TARGET_ALL, .payload = value});
#else

Session::GetOptions options;
options.target = QueryTarget::Z_QUERY_TARGET_ALL;
options.payload = value;
auto replies = session.get(keyexpr, "", channels::FifoChannel(16), std::move(options));
#endif
options.target = query_target;
if (value != nullptr) {
options.payload = value;
}
options.timeout_ms = timeout_ms;
auto replies = session.get(selector.key_expr, selector.parameters, channels::FifoChannel(16), std::move(options));

for (auto res = replies.recv(); std::holds_alternative<Reply>(res); res = replies.recv()) {
const auto &sample = std::get<Reply>(res).get_ok();
Expand Down
29 changes: 18 additions & 11 deletions examples/universal/z_get_channel_non_blocking.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,31 @@ using namespace std::chrono_literals;

int _main(int argc, char **argv) {
const char *expr = "demo/example/**";
const char *value = "Get from C++";
Config config = parse_args(argc, argv, {}, {{"key_expression", &expr}, {"payload value", &value}});
const char *value = nullptr;
const char *target = "BEST_MATCHING";
const char *timeout = "10000";

KeyExpr keyexpr(expr);
Config config = parse_args(argc, argv, {}, {},
{{"-s", CmdArg{"Query selector (string)", &expr}},
{"-p", CmdArg{"Query payload (string)", &value}},
{"-t", CmdArg{"Query target (BEST_MATCHING | ALL | ALL_COMPLETE)", &target}},
{"-o", CmdArg{"Timeout in ms (number)", &timeout}}});
uint64_t timeout_ms = std::stoi(timeout);
QueryTarget query_target = parse_query_target(target);

Selector selector = parse_selector(expr);
std::cout << "Opening session...\n";
auto session = Session::open(std::move(config));

std::cout << "Sending Query '" << expr << "'...\n";

#if __cpp_designated_initializers >= 201707L
auto replies = session.get(keyexpr, "", channels::FifoChannel(16),
{.target = QueryTarget::Z_QUERY_TARGET_ALL, .payload = value});
#else
Session::GetOptions options;
options.target = QueryTarget::Z_QUERY_TARGET_ALL;
options.payload = value;
auto replies = session.get(keyexpr, "", channels::FifoChannel(16), std::move(options));
#endif
options.target = query_target;
if (value != nullptr) {
options.payload = value;
}
options.timeout_ms = timeout_ms;
auto replies = session.get(selector.key_expr, selector.parameters, channels::FifoChannel(16), std::move(options));

while (true) {
auto res = replies.try_recv();
Expand Down
14 changes: 5 additions & 9 deletions examples/universal/z_pub.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ int _main(int argc, char **argv) {
Config config = parse_args(argc, argv, {}, {{"key_expression", &keyexpr}, {"payload_value", &value}}
#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API)
,
{{"--add-matching-listener", {CmdArg{"", &add_matching_listener, true}}}}
{{"--add-matching-listener", CmdArg{"", &add_matching_listener, true}}}
#endif
);

Expand All @@ -53,15 +53,14 @@ int _main(int argc, char **argv) {
std::cout << "Declaring Publisher on '" << keyexpr << "'..." << std::endl;
auto pub = session.declare_publisher(KeyExpr(keyexpr));

std::cout << "Publisher on '" << keyexpr << "' declared" << std::endl;
#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API)
if (std::string(add_matching_listener) == "true") {
pub.declare_background_matching_listener(
[](const Publisher::MatchingStatus &s) {
[](const MatchingStatus &s) {
if (s.matching) {
std::cout << "Subscriber matched" << std::endl;
std::cout << "Publisher has matching subscribers." << std::endl;
} else {
std::cout << "No subscribers matched" << std::endl;
std::cout << "Publisher has NO MORE matching subscribers." << std::endl;
}
},
closures::none);
Expand All @@ -75,13 +74,10 @@ int _main(int argc, char **argv) {
ss << "[" << idx << "] " << value;
auto s = ss.str();
std::cout << "Putting Data ('" << keyexpr << "': '" << s << "')...\n";
#if __cpp_designated_initializers >= 201707L
pub.put(s, {.encoding = Encoding("text/plain")});
#else

auto put_options = Publisher::PutOptions{};
put_options.encoding = Encoding("text/plain");
pub.put(s, std::move(put_options));
#endif
}
return 0;
}
Expand Down
8 changes: 6 additions & 2 deletions examples/universal/z_queryable.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ const char *value = "Queryable from C++ zenoh-pico!";
const char *locator = nullptr;

int _main(int argc, char **argv) {
Config config = parse_args(argc, argv, {}, {{"key_expression", &expr}, {"payload_value", &value}});
const char *complete = "false";
Config config = parse_args(argc, argv, {}, {{"key_expression", &expr}, {"payload_value", &value}},
{{"--complete", {CmdArg{"", &complete, true}}}});

printf("Opening session...\n");
auto session = Session::open(std::move(config));
Expand Down Expand Up @@ -64,7 +66,9 @@ int _main(int argc, char **argv) {

auto on_drop_queryable = []() { std::cout << "Destroying queryable\n"; };

auto queryable = session.declare_queryable(keyexpr, on_query, on_drop_queryable);
Session::QueryableOptions opts;
opts.complete = std::string(complete) == "true";
auto queryable = session.declare_queryable(keyexpr, on_query, on_drop_queryable, std::move(opts));

std::cout << "Press CTRL-C to quit...\n";
while (true) {
Expand Down
9 changes: 7 additions & 2 deletions examples/universal/z_queryable_attachment.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ const char *value = "Queryable from C++ zenoh-pico!";
#endif

int _main(int argc, char **argv) {
Config config = parse_args(argc, argv, {}, {{"key_expression", &expr}, {"payload_value", &value}});
const char *complete = "false";
Config config = parse_args(argc, argv, {}, {{"key_expression", &expr}, {"payload_value", &value}},
{{"--complete", {CmdArg{"", &complete, true}}}});

std::cout << "Opening session...\n";
auto session = Session::open(std::move(config));
Expand Down Expand Up @@ -71,7 +73,10 @@ int _main(int argc, char **argv) {
};

auto on_drop_queryable = []() { std::cout << "Destroying queryable\n"; };
auto queryable = session.declare_queryable(keyexpr, on_query, on_drop_queryable);

Session::QueryableOptions opts;
opts.complete = std::string(complete) == "true";
auto queryable = session.declare_queryable(keyexpr, on_query, on_drop_queryable, std::move(opts));

printf("Press CTRL-C to quit...\n");
while (true) {
Expand Down
4 changes: 2 additions & 2 deletions examples/zenohc/z_pub_shm.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ int _main(int argc, char **argv) {
Config config = parse_args(argc, argv, {}, {{"key_expression", &keyexpr}, {"payload_value", &value}}
#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API)
,
{{"--add-matching-listener", {CmdArg{"", &add_matching_listener, true}}}}
{{"--add-matching-listener", CmdArg{"", &add_matching_listener, true}}}
#endif
);

Expand All @@ -49,7 +49,7 @@ int _main(int argc, char **argv) {
#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API)
if (std::string(add_matching_listener) == "true") {
pub.declare_background_matching_listener(
[](const Publisher::MatchingStatus &s) {
[](const MatchingStatus &s) {
if (s.matching) {
std::cout << "Subscriber matched" << std::endl;
} else {
Expand Down
Loading

0 comments on commit 725a14b

Please sign in to comment.