Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Align examples and remove reading from stdin #359

Merged
merged 9 commits into from
Mar 21, 2024
3 changes: 1 addition & 2 deletions examples/arduino/z_pub.ino
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ void setup() {
}

void loop() {
delay(1000);
char buf[256];
sprintf(buf, "[%4d] %s", idx++, VALUE);
Serial.print("Writing Data ('");
Expand All @@ -105,8 +106,6 @@ void loop() {
if (z_publisher_put(z_publisher_loan(&pub), (const uint8_t *)buf, strlen(buf), NULL) < 0) {
Serial.println("Error while publishing data");
}

delay(1000);
}
#else
void setup() {
Expand Down
6 changes: 5 additions & 1 deletion examples/arduino/z_pull.ino
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#define KEYEXPR "demo/example/**"

z_owned_pull_subscriber_t sub;
int idx = 0;

void data_handler(const z_sample_t *sample, void *arg) {
z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
Expand Down Expand Up @@ -106,7 +107,10 @@ void setup() {
}

void loop() {
delay(5000);
delay(1000);
char buf[256];
sprintf(buf, "[%4d] Pulling...", idx++);
Serial.println(buf);
z_subscriber_pull(z_pull_subscriber_loan(&sub));
}
#else
Expand Down
2 changes: 1 addition & 1 deletion examples/arduino/z_queryable.ino
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ void setup() {
delay(300);
}

void loop() { delay(5000); }
void loop() { delay(1000); }

#else
void setup() {
Expand Down
2 changes: 1 addition & 1 deletion examples/arduino/z_sub.ino
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ void setup() {
delay(300);
}

void loop() { delay(5000); }
void loop() { delay(1000); }

#else
void setup() {
Expand Down
5 changes: 3 additions & 2 deletions examples/espidf/z_pull.c
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,10 @@ void app_main() {
}
printf("OK!\n");

int idx = 0;
while (1) {
sleep(5);
printf("Pulling data from '%s'...\n", KEYEXPR);
sleep(1);
printf("[%4d] Pulling...\n", idx++);
z_subscriber_pull(z_loan(sub));
}

Expand Down
2 changes: 1 addition & 1 deletion examples/espidf/z_queryable.c
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ void app_main() {
printf("Zenoh setup finished!\n");

while (1) {
sleep(5);
sleep(1);
}

printf("Closing Zenoh Session...");
Expand Down
2 changes: 1 addition & 1 deletion examples/espidf/z_sub.c
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ void app_main() {
printf("OK!\n");

while (1) {
sleep(5);
sleep(1);
}

printf("Closing Zenoh Session...");
Expand Down
3 changes: 2 additions & 1 deletion examples/freertos_plus_tcp/z_pub_st.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

#define KEYEXPR "demo/example/zenoh-pico-pub"
#define VALUE "[FreeRTOS-Plus-TCP] Pub from Zenoh-Pico!"
#define N 10

void app_main(void) {
z_owned_config_t config = z_config_default();
Expand All @@ -54,7 +55,7 @@ void app_main(void) {

char *buf = (char *)pvPortMalloc(256);
z_clock_t now = z_clock_now();
for (int idx = 0; 1;) {
for (int idx = 0; idx < N;) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case after 10 publications the example will exit, however all other examples kepp publishing indefinitely.

if (z_clock_elapsed_ms(&now) > 1000) {
snprintf(buf, 256, "[%4d] %s", idx, VALUE);
printf("Putting Data ('%s': '%s')...\n", KEYEXPR, buf);
Expand Down
5 changes: 3 additions & 2 deletions examples/freertos_plus_tcp/z_pull.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,10 @@ void app_main(void) {
return;
}

int idx = 0;
while (1) {
z_sleep_s(5);
printf("Pulling data from '%s'...\n", KEYEXPR);
z_sleep_s(1);
printf("[%4d] Pulling...\n", idx++);
z_subscriber_pull(z_loan(sub));
}

Expand Down
2 changes: 1 addition & 1 deletion examples/freertos_plus_tcp/z_queryable.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ void app_main(void) {
}

while (1) {
z_sleep_s(5);
z_sleep_s(1);
}

z_undeclare_queryable(z_move(qable));
Expand Down
2 changes: 1 addition & 1 deletion examples/freertos_plus_tcp/z_sub.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ void app_main(void) {
}

while (1) {
z_sleep_s(5);
z_sleep_s(1);
}

z_undeclare_subscriber(z_move(sub));
Expand Down
7 changes: 6 additions & 1 deletion examples/freertos_plus_tcp/z_sub_st.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,17 @@
#endif

#define KEYEXPR "demo/example/**"
#define N 10
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should align the behaviour with all other bindings, I believe we don't wait for just 10 samples by default.

Copy link
Contributor Author

@oteffahi oteffahi Mar 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other bindings do not implement single-threaded pub/sub examples, therefore we only need to align the configuration of these examples across their different plateform-specific implementations. In this PR, the configuration is as follows:

  • On unix (c11, c99): N must be provided as a CLI argument to run the example. This is also used in our CI to run the test with a known number of messages. N is initialized to int32:max and can be overwritten through CLI arguments.
  • On windows: since getopt is unavailable, N is initialized to int32::max.
  • On freertos: I understand that CLI argument-passing may not be possible on certain embedded targets that run on freertos, therefore N is initialized to 10 to avoid running the example indefinitely. Should we change it to int32::max ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As per the other comment on this file, I'll update all implementations to start with a default number of samples of int32:max, and allow unix implementations to overwrite it through CLI argument.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The behaviour of a z_pub in all languages is to publish indefinitely, so uint32::MAX is close enough :)


int msg_nb = 0;

void data_handler(const z_sample_t *sample, void *ctx) {
(void)(ctx);
z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len,
sample->payload.start);
z_drop(z_move(keystr));
msg_nb++;
}

void app_main(void) {
Expand All @@ -58,7 +62,8 @@ void app_main(void) {
return;
}

while (1) {
printf("Running until %d messages are received...\n", N);
while (msg_nb < N) {
zp_read(z_loan(s), NULL);
zp_send_keep_alive(z_loan(s), NULL);
zp_send_join(z_loan(s), NULL);
Expand Down
5 changes: 3 additions & 2 deletions examples/mbed/z_pull.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,10 @@ int main(int argc, char **argv) {
}
printf("OK!\n");

int idx = 0;
while (1) {
z_sleep_s(5);
printf("Pulling data from '%s'...\n", KEYEXPR);
z_sleep_s(1);
printf("[%4d] Pulling...\n", idx++);
z_subscriber_pull(z_pull_subscriber_loan(&sub));
}

Expand Down
2 changes: 1 addition & 1 deletion examples/mbed/z_queryable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ int main(int argc, char **argv) {
printf("Zenoh setup finished!\n");

while (1) {
z_sleep_s(5);
z_sleep_s(1);
}

printf("Closing Zenoh Session...");
Expand Down
2 changes: 1 addition & 1 deletion examples/mbed/z_sub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ int main(int argc, char **argv) {
printf("OK!\n");

while (1) {
z_sleep_s(5);
z_sleep_s(1);
}

printf("Closing Zenoh Session...");
Expand Down
55 changes: 35 additions & 20 deletions examples/unix/c11/z_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,30 @@
// ZettaScale Zenoh Team, <[email protected]>

#include <ctype.h>
#include <errno.h>
#include <pthread.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <zenoh-pico.h>

#if Z_FEATURE_QUERY == 1
#define QUERY_TIMEOUT 10 // query timeout in seconds
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: we'll have to revise this example when timeout is exposed by zenoh pico

#define handle_error_en(en, msg) \
do { \
errno = en; \
perror(msg); \
exit(EXIT_FAILURE); \
} while (0)
pthread_cond_t cond;
pthread_mutex_t mutex;

void reply_dropper(void *ctx) {
(void)(ctx);
printf(">> Received query final notification\n");
pthread_cond_signal(&cond);
pthread_cond_destroy(&cond);
Copy link
Contributor

@jean-roland jean-roland Mar 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should use the public sync mechanisms found in include/zenoh-pico/system/platform.h, this impacts all the following pthread_cond and pthread_mutex calls, see z_ping for an example of usage.

}

void reply_handler(z_owned_reply_t *reply, void *ctx) {
Expand Down Expand Up @@ -73,6 +87,9 @@ int main(int argc, char **argv) {
}
}

pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&cond, NULL);

z_owned_config_t config = z_config_default();
zp_config_insert(z_loan(config), Z_CONFIG_MODE_KEY, z_string_make(mode));
if (clocator != NULL) {
Expand Down Expand Up @@ -102,27 +119,25 @@ int main(int argc, char **argv) {
return -1;
}

printf("Enter any key to pull data or 'q' to quit...\n");
char c = '\0';
while (1) {
fflush(stdin);
int ret = scanf("%c", &c);
(void)ret; // Remove unused result warning
if (c == 'q') {
break;
}

printf("Sending Query '%s'...\n", keyexpr);
z_get_options_t opts = z_get_options_default();
if (value != NULL) {
opts.value.payload = _z_bytes_wrap((const uint8_t *)value, strlen(value));
}
z_owned_closure_reply_t callback = z_closure(reply_handler, reply_dropper);
if (z_get(z_loan(s), ke, "", z_move(callback), &opts) < 0) {
printf("Unable to send query.\n");
return -1;
}
pthread_mutex_lock(&mutex);
printf("Sending Query '%s'...\n", keyexpr);
z_get_options_t opts = z_get_options_default();
if (value != NULL) {
opts.value.payload = _z_bytes_wrap((const uint8_t *)value, strlen(value));
}
z_owned_closure_reply_t callback = z_closure(reply_handler, reply_dropper);
if (z_get(z_loan(s), ke, "", z_move(callback), &opts) < 0) {
printf("Unable to send query.\n");
return -1;
}
struct timespec query_timeout;
clock_gettime(CLOCK_REALTIME, &query_timeout);
Copy link
Contributor

@jean-roland jean-roland Mar 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should use the public timer mechanisms found in include/zenoh-pico/system/platform.h, this impacts all the following clock calls, see z_ping for an example of usage.

query_timeout.tv_sec += QUERY_TIMEOUT;
int err = pthread_cond_timedwait(&cond, &mutex, &query_timeout);
if (err != 0) {
handle_error_en(err, "pthread_cond_timedwait");
}
pthread_mutex_unlock(&mutex);

// Stop read and lease tasks for zenoh-pico
zp_stop_read_task(z_loan(s));
Expand Down
10 changes: 6 additions & 4 deletions examples/unix/c11/z_pub.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ int main(int argc, char **argv) {
const char *mode = "client";
char *clocator = NULL;
char *llocator = NULL;
int n = 10;
int n = 2147483647; // max int value by default

int opt;
while ((opt = getopt(argc, argv, "k:v:e:m:l:n:")) != -1) {
Expand Down Expand Up @@ -96,14 +96,16 @@ int main(int argc, char **argv) {
return -1;
}

printf("Press CTRL-C to quit...\n");
char buf[256];
for (int idx = 0; idx < n; ++idx) {
sleep(1);
(void)idx;
printf("Putting Data ('%s': '%s')...\n", keyexpr, value);
sprintf(buf, "[%4d] %s", idx, value);
printf("Putting Data ('%s': '%s')...\n", keyexpr, buf);

z_publisher_put_options_t options = z_publisher_put_options_default();
options.encoding = z_encoding(Z_ENCODING_PREFIX_TEXT_PLAIN, NULL);
z_publisher_put(z_loan(pub), (const uint8_t *)value, strlen(value), &options);
z_publisher_put(z_loan(pub), (const uint8_t *)buf, strlen(buf), &options);
}

z_undeclare_publisher(z_move(pub));
Expand Down
10 changes: 6 additions & 4 deletions examples/unix/c11/z_pub_st.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ int main(int argc, char **argv) {
const char *mode = "client";
char *clocator = NULL;
char *llocator = NULL;
int n = 10;
int n = 2147483647; // max int value by default

int opt;
while ((opt = getopt(argc, argv, "k:v:e:m:l:n:")) != -1) {
Expand Down Expand Up @@ -86,11 +86,13 @@ int main(int argc, char **argv) {
return -1;
}
// Main loop
printf("Press CTRL-C to quit...\n");
char buf[256];
for (int idx = 0; idx < n; idx++) {
sleep(1);
(void)idx;
printf("Putting Data ('%s': '%s')...\n", keyexpr, value);
z_publisher_put(z_loan(pub), (const uint8_t *)value, strlen(value), NULL);
sprintf(buf, "[%4d] %s", idx, value);
printf("Putting Data ('%s': '%s')...\n", keyexpr, buf);
z_publisher_put(z_loan(pub), (const uint8_t *)buf, strlen(buf), NULL);

zp_read(z_loan(s), NULL);
zp_send_keep_alive(z_loan(s), NULL);
Expand Down
12 changes: 4 additions & 8 deletions examples/unix/c11/z_pull.c
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,11 @@ int main(int argc, char **argv) {
return -1;
}

printf("Enter any key to pull data or 'q' to quit...\n");
char c = '\0';
printf("Press CTRL-C to quit...\n");
int idx = 0;
while (1) {
fflush(stdin);
int ret = scanf("%c", &c);
(void)ret; // Remove unused result warning
if (c == 'q') {
break;
}
sleep(1);
printf("[%4d] Pulling...\n", idx++);
z_subscriber_pull(z_loan(sub));
}

Expand Down
9 changes: 3 additions & 6 deletions examples/unix/c11/z_queryable.c
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,9 @@ int main(int argc, char **argv) {
return -1;
}

printf("Enter 'q' to quit...\n");
char c = '\0';
while (c != 'q') {
fflush(stdin);
int ret = scanf("%c", &c);
(void)ret; // Remove unused result warning
printf("Press CTRL-C to quit...\n");
while (1) {
sleep(1);
}

z_undeclare_queryable(z_move(qable));
Expand Down
9 changes: 3 additions & 6 deletions examples/unix/c11/z_sub.c
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,9 @@ int main(int argc, char **argv) {
return -1;
}

printf("Enter 'q' to quit...\n");
char c = '\0';
while (c != 'q') {
fflush(stdin);
int ret = scanf("%c", &c);
(void)ret; // Remove unused result warning
printf("Press CTRL-C to quit...\n");
while (1) {
sleep(1);
}

z_undeclare_subscriber(z_move(sub));
Expand Down
Loading