Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add querier support #297

Merged
merged 3 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,16 @@ The `z_pub` should receive message sent by `z_sub`.
./z_get
```

The `z_get` should receive the data from `z_queryable`.
### Queryable and Querier Example
```bash
./z_queryable
```

```bash
./z_querier
```

The `z_querier` should continuously send queries and receive replies from `z_queryable`.

### Throughput Examples
```bash
Expand Down
1 change: 1 addition & 0 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ API Reference
scouting
publish_subscribe
query_reply
matching
serialization_deserialization
channels
interop
Expand Down
11 changes: 11 additions & 0 deletions docs/matching.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
Matching
=================
Classes related to getting information about matching Zenoh entities.

.. doxygenstruct:: zenoh::MatchingStatus
:members:
:membergroups: Constructors Operators Methods Fields

.. doxygenclass:: zenoh::MatchingListener
:members:
:membergroups: Constructors Operators Methods
2 changes: 1 addition & 1 deletion docs/publish_subscribe.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Classes related to publish-subscribe pattern.

.. doxygenclass:: zenoh::Publisher
:members:
:membergroups: Constructors Operators Methods
:membergroups: Constructors Operators Methods Fields

.. doxygenclass:: zenoh::Subscriber
:members:
Expand Down
4 changes: 4 additions & 0 deletions docs/query_reply.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ Query-Reply
===========
Classes related to query-reply pattern.

.. doxygenclass:: zenoh::Querier
:members:
:membergroups: Constructors Operators Methods Fields

.. doxygenclass:: zenoh::Queryable
:members:
:membergroups: Constructors Operators Methods
Expand Down
3 changes: 3 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ function(add_examples glob mode lib)
if(${file} MATCHES ".*liveliness.*$")
continue()
endif()
if(${file} MATCHES ".*querier.*$")
continue()
endif()
if(${file} MATCHES ".*query_sub.*$")
continue()
endif()
Expand Down
27 changes: 27 additions & 0 deletions examples/getargs.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <cstring>
#include <iostream>
#include <string>
#include <string_view>
#include <unordered_map>
#include <vector>

Expand Down Expand Up @@ -155,4 +156,30 @@ inline zenoh::Config parse_args(int argc, char **argv, const std::vector<CmdArg>
}
#endif
return std::move(config);
}

zenoh::QueryTarget parse_query_target(std::string_view v) {
if (v == "BEST_MATCHING") {
return zenoh::QueryTarget::Z_QUERY_TARGET_BEST_MATCHING;
} else if (v == "ALL") {
return zenoh::QueryTarget::Z_QUERY_TARGET_ALL;
} else if (v == "ALL_COMPLETE") {
return zenoh::QueryTarget::Z_QUERY_TARGET_ALL_COMPLETE;
}

throw std::runtime_error(std::string("Unsupported QueryTarget: ") + std::string(v));
}

struct Selector {
std::string key_expr;
std::string parameters;
};

Selector parse_selector(const std::string &selector_string) {
size_t pos = selector_string.find('?');
if (pos == std::string::npos) {
return Selector{selector_string, ""};
} else {
return Selector{selector_string.substr(0, pos), selector_string.substr(pos + 1)};
}
}
27 changes: 17 additions & 10 deletions examples/universal/z_get.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,18 @@ using namespace zenoh;

int _main(int argc, char **argv) {
const char *expr = "demo/example/**";
const char *value = "Get from C++";
Config config = parse_args(argc, argv, {}, {{"key_expression", &expr}, {"payload_value", &value}});
const char *value = nullptr;
const char *target = "BEST_MATCHING";
const char *timeout = "10000";

KeyExpr keyexpr(expr);
Config config = parse_args(argc, argv, {}, {},
{{"-s", CmdArg{"Query selector (string)", &expr}},
{"-p", CmdArg{"Query payload (string)", &value}},
{"-t", CmdArg{"Query target (BEST_MATCHING | ALL | ALL_COMPLETE)", &target}},
{"-o", CmdArg{"Timeout in ms (number)", &timeout}}});
uint64_t timeout_ms = std::stoi(timeout);
QueryTarget query_target = parse_query_target(target);
Selector selector = parse_selector(expr);

std::cout << "Opening session...\n";
auto session = Session::open(std::move(config));
Expand All @@ -55,14 +63,13 @@ int _main(int argc, char **argv) {
done_signal.notify_all();
};

#if __cpp_designated_initializers >= 201707L
session.get(keyexpr, "", on_reply, on_done, {.target = Z_QUERY_TARGET_ALL, .payload = Bytes::serialize(value)});
#else
Session::GetOptions options;
options.target = Z_QUERY_TARGET_ALL;
options.payload = value;
session.get(keyexpr, "", on_reply, on_done, std::move(options));
#endif
options.target = query_target;
if (value != nullptr) {
options.payload = value;
}
options.timeout_ms = timeout_ms;
session.get(selector.key_expr, selector.parameters, on_reply, on_done, std::move(options));

std::unique_lock lock(m);
done_signal.wait(lock, [&done] { return done; });
Expand Down
23 changes: 17 additions & 6 deletions examples/universal/z_get_attachment.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,18 @@ using namespace zenoh;

int _main(int argc, char **argv) {
const char *expr = "demo/example/**";
const char *value = "Get from C++";
Config config = parse_args(argc, argv, {}, {{"key_expression", &expr}, {"payload value", &value}});
const char *value = nullptr;
const char *target = "BEST_MATCHING";
const char *timeout = "10000";

KeyExpr keyexpr(expr);
Config config = parse_args(argc, argv, {}, {},
{{"-s", CmdArg{"Query selector (string)", &expr}},
{"-p", CmdArg{"Query payload (string)", &value}},
{"-t", CmdArg{"Query target (BEST_MATCHING | ALL | ALL_COMPLETE)", &target}},
{"-o", CmdArg{"Timeout in ms (number)", &timeout}}});
uint64_t timeout_ms = std::stoi(timeout);
QueryTarget query_target = parse_query_target(target);
Selector selector = parse_selector(expr);

printf("Opening session...\n");
auto session = Session::open(std::move(config));
Expand Down Expand Up @@ -66,10 +74,13 @@ int _main(int argc, char **argv) {
std::unordered_map<std::string, std::string> attachment = {{"Source", "C++"}};

Session::GetOptions options;
options.target = QueryTarget::Z_QUERY_TARGET_ALL;
options.payload = value;
options.target = query_target;
if (value != nullptr) {
options.payload = value;
}
options.timeout_ms = timeout_ms;
options.attachment = ext::serialize(attachment);
session.get(keyexpr, "", on_reply, on_done, std::move(options));
session.get(selector.key_expr, selector.parameters, on_reply, on_done, std::move(options));

std::unique_lock lock(m);
done_signal.wait(lock, [&done] { return done; });
Expand Down
29 changes: 18 additions & 11 deletions examples/universal/z_get_channel.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,31 @@ using namespace zenoh;

int _main(int argc, char **argv) {
const char *expr = "demo/example/**";
const char *value = "Get from C++";
Config config = parse_args(argc, argv, {}, {{"key_expression", &expr}, {"payload value", &value}});
const char *value = nullptr;
const char *target = "BEST_MATCHING";
const char *timeout = "10000";

KeyExpr keyexpr(expr);
Config config = parse_args(argc, argv, {}, {},
{{"-s", CmdArg{"Query selector (string)", &expr}},
{"-p", CmdArg{"Query payload (string)", &value}},
{"-t", CmdArg{"Query target (BEST_MATCHING | ALL | ALL_COMPLETE)", &target}},
{"-o", CmdArg{"Timeout in ms (number)", &timeout}}});
uint64_t timeout_ms = std::stoi(timeout);
QueryTarget query_target = parse_query_target(target);
Selector selector = parse_selector(expr);

std::cout << "Opening session...\n";
auto session = Session::open(std::move(config));

std::cout << "Sending Query '" << expr << "'...\n";
#if __cpp_designated_initializers >= 201707L
auto replies = session.get(keyexpr, "", channels::FifoChannel(16),
{.target = QueryTarget::Z_QUERY_TARGET_ALL, .payload = value});
#else

Session::GetOptions options;
options.target = QueryTarget::Z_QUERY_TARGET_ALL;
options.payload = value;
auto replies = session.get(keyexpr, "", channels::FifoChannel(16), std::move(options));
#endif
options.target = query_target;
if (value != nullptr) {
options.payload = value;
}
options.timeout_ms = timeout_ms;
auto replies = session.get(selector.key_expr, selector.parameters, channels::FifoChannel(16), std::move(options));

for (auto res = replies.recv(); std::holds_alternative<Reply>(res); res = replies.recv()) {
const auto &sample = std::get<Reply>(res).get_ok();
Expand Down
29 changes: 18 additions & 11 deletions examples/universal/z_get_channel_non_blocking.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,31 @@ using namespace std::chrono_literals;

int _main(int argc, char **argv) {
const char *expr = "demo/example/**";
const char *value = "Get from C++";
Config config = parse_args(argc, argv, {}, {{"key_expression", &expr}, {"payload value", &value}});
const char *value = nullptr;
const char *target = "BEST_MATCHING";
const char *timeout = "10000";

KeyExpr keyexpr(expr);
Config config = parse_args(argc, argv, {}, {},
{{"-s", CmdArg{"Query selector (string)", &expr}},
{"-p", CmdArg{"Query payload (string)", &value}},
{"-t", CmdArg{"Query target (BEST_MATCHING | ALL | ALL_COMPLETE)", &target}},
{"-o", CmdArg{"Timeout in ms (number)", &timeout}}});
uint64_t timeout_ms = std::stoi(timeout);
QueryTarget query_target = parse_query_target(target);

Selector selector = parse_selector(expr);
std::cout << "Opening session...\n";
auto session = Session::open(std::move(config));

std::cout << "Sending Query '" << expr << "'...\n";

#if __cpp_designated_initializers >= 201707L
auto replies = session.get(keyexpr, "", channels::FifoChannel(16),
{.target = QueryTarget::Z_QUERY_TARGET_ALL, .payload = value});
#else
Session::GetOptions options;
options.target = QueryTarget::Z_QUERY_TARGET_ALL;
options.payload = value;
auto replies = session.get(keyexpr, "", channels::FifoChannel(16), std::move(options));
#endif
options.target = query_target;
if (value != nullptr) {
options.payload = value;
}
options.timeout_ms = timeout_ms;
auto replies = session.get(selector.key_expr, selector.parameters, channels::FifoChannel(16), std::move(options));

while (true) {
auto res = replies.try_recv();
Expand Down
14 changes: 5 additions & 9 deletions examples/universal/z_pub.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ int _main(int argc, char **argv) {
Config config = parse_args(argc, argv, {}, {{"key_expression", &keyexpr}, {"payload_value", &value}}
#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API)
,
{{"--add-matching-listener", {CmdArg{"", &add_matching_listener, true}}}}
{{"--add-matching-listener", CmdArg{"", &add_matching_listener, true}}}
#endif
);

Expand All @@ -53,15 +53,14 @@ int _main(int argc, char **argv) {
std::cout << "Declaring Publisher on '" << keyexpr << "'..." << std::endl;
auto pub = session.declare_publisher(KeyExpr(keyexpr));

std::cout << "Publisher on '" << keyexpr << "' declared" << std::endl;
#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API)
if (std::string(add_matching_listener) == "true") {
pub.declare_background_matching_listener(
[](const Publisher::MatchingStatus &s) {
[](const MatchingStatus &s) {
if (s.matching) {
std::cout << "Subscriber matched" << std::endl;
std::cout << "Publisher has matching subscribers." << std::endl;
} else {
std::cout << "No subscribers matched" << std::endl;
std::cout << "Publisher has NO MORE matching subscribers." << std::endl;
}
},
closures::none);
Expand All @@ -75,13 +74,10 @@ int _main(int argc, char **argv) {
ss << "[" << idx << "] " << value;
auto s = ss.str();
std::cout << "Putting Data ('" << keyexpr << "': '" << s << "')...\n";
#if __cpp_designated_initializers >= 201707L
pub.put(s, {.encoding = Encoding("text/plain")});
#else

auto put_options = Publisher::PutOptions{};
put_options.encoding = Encoding("text/plain");
pub.put(s, std::move(put_options));
#endif
}
return 0;
}
Expand Down
8 changes: 6 additions & 2 deletions examples/universal/z_queryable.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ const char *value = "Queryable from C++ zenoh-pico!";
const char *locator = nullptr;

int _main(int argc, char **argv) {
Config config = parse_args(argc, argv, {}, {{"key_expression", &expr}, {"payload_value", &value}});
const char *complete = "false";
Config config = parse_args(argc, argv, {}, {{"key_expression", &expr}, {"payload_value", &value}},
{{"--complete", {CmdArg{"", &complete, true}}}});

printf("Opening session...\n");
auto session = Session::open(std::move(config));
Expand Down Expand Up @@ -64,7 +66,9 @@ int _main(int argc, char **argv) {

auto on_drop_queryable = []() { std::cout << "Destroying queryable\n"; };

auto queryable = session.declare_queryable(keyexpr, on_query, on_drop_queryable);
Session::QueryableOptions opts;
opts.complete = std::string(complete) == "true";
auto queryable = session.declare_queryable(keyexpr, on_query, on_drop_queryable, std::move(opts));

std::cout << "Press CTRL-C to quit...\n";
while (true) {
Expand Down
9 changes: 7 additions & 2 deletions examples/universal/z_queryable_attachment.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ const char *value = "Queryable from C++ zenoh-pico!";
#endif

int _main(int argc, char **argv) {
Config config = parse_args(argc, argv, {}, {{"key_expression", &expr}, {"payload_value", &value}});
const char *complete = "false";
Config config = parse_args(argc, argv, {}, {{"key_expression", &expr}, {"payload_value", &value}},
{{"--complete", {CmdArg{"", &complete, true}}}});

std::cout << "Opening session...\n";
auto session = Session::open(std::move(config));
Expand Down Expand Up @@ -71,7 +73,10 @@ int _main(int argc, char **argv) {
};

auto on_drop_queryable = []() { std::cout << "Destroying queryable\n"; };
auto queryable = session.declare_queryable(keyexpr, on_query, on_drop_queryable);

Session::QueryableOptions opts;
opts.complete = std::string(complete) == "true";
auto queryable = session.declare_queryable(keyexpr, on_query, on_drop_queryable, std::move(opts));

printf("Press CTRL-C to quit...\n");
while (true) {
Expand Down
4 changes: 2 additions & 2 deletions examples/zenohc/z_pub_shm.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ int _main(int argc, char **argv) {
Config config = parse_args(argc, argv, {}, {{"key_expression", &keyexpr}, {"payload_value", &value}}
#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API)
,
{{"--add-matching-listener", {CmdArg{"", &add_matching_listener, true}}}}
{{"--add-matching-listener", CmdArg{"", &add_matching_listener, true}}}
#endif
);

Expand All @@ -49,7 +49,7 @@ int _main(int argc, char **argv) {
#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API)
if (std::string(add_matching_listener) == "true") {
pub.declare_background_matching_listener(
[](const Publisher::MatchingStatus &s) {
[](const MatchingStatus &s) {
if (s.matching) {
std::cout << "Subscriber matched" << std::endl;
} else {
Expand Down
Loading
Loading