From c57c58d056f60413cc8ab55ec39c3cba5a0d40ac Mon Sep 17 00:00:00 2001 From: ChenYing Kuo Date: Mon, 15 Apr 2024 17:16:50 +0800 Subject: [PATCH 1/2] Add RingBuffer support in zenoh-python. Signed-off-by: ChenYing Kuo --- Cargo.lock | 1 + Cargo.toml | 1 + docs/index.rst | 2 +- examples/README.md | 18 +++++++++ examples/z_pull.py | 92 ++++++++++++++++++++++++++++++++++++++++++++++ src/closures.rs | 27 ++++++++++++++ src/lib.rs | 1 + zenoh/__init__.py | 2 +- zenoh/closures.py | 49 +++++++++++++++++++++++- 9 files changed, 190 insertions(+), 3 deletions(-) create mode 100644 examples/z_pull.py diff --git a/Cargo.lock b/Cargo.lock index 4248b85b..a664f3b0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3334,6 +3334,7 @@ dependencies = [ "validated_struct", "zenoh", "zenoh-buffers", + "zenoh-collections", "zenoh-core", ] diff --git a/Cargo.toml b/Cargo.toml index c524fbcf..db39cfbc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,3 +49,4 @@ validated_struct = "2.1.0" zenoh = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "protocol_changes", features = ["unstable"], default-features = false } zenoh-buffers = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "protocol_changes" } zenoh-core = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "protocol_changes" } +zenoh-collections = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "protocol_changes" } diff --git a/docs/index.rst b/docs/index.rst index b115941c..df807cde 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -192,4 +192,4 @@ Timestamp :members: .. automodule:: zenoh - :members: Queue, ListCollector, Closure, Handler, IClosure, IHandler, IValue + :members: Queue, RingBuffer, ListCollector, Closure, Handler, IClosure, IHandler, IValue diff --git a/examples/README.md b/examples/README.md index 638c1c3e..cba85c2b 100644 --- a/examples/README.md +++ b/examples/README.md @@ -88,6 +88,24 @@ or python3 z_sub.py -k 'demo/**' ``` +### z_pull + +Creates a subscriber with a key expression and using RingBuffer. +The subscriber will receive the last N messages matching the key +expression and will print the received key/value. + +Typical usage: + + ```bash + python3 z_pull.py + ``` + +or + + ```bash + python3 z_pull.py -k 'demo/**' --size 3 --interval 5 + ``` + ### z_get Sends a query message for a selector. diff --git a/examples/z_pull.py b/examples/z_pull.py new file mode 100644 index 00000000..1845e6dc --- /dev/null +++ b/examples/z_pull.py @@ -0,0 +1,92 @@ +# +# Copyright (c) 2022 ZettaScale Technology +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +# which is available at https://www.apache.org/licenses/LICENSE-2.0. +# +# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +# +# Contributors: +# ZettaScale Zenoh Team, +# + +import time +import argparse +import json +import zenoh +from zenoh import Reliability + +# --- Command line argument parsing --- --- --- --- --- --- +parser = argparse.ArgumentParser( + prog='z_pull', + description='zenoh pull example') +parser.add_argument('--mode', '-m', dest='mode', + choices=['peer', 'client'], + type=str, + help='The zenoh session mode.') +parser.add_argument('--connect', '-e', dest='connect', + metavar='ENDPOINT', + action='append', + type=str, + help='Endpoints to connect to.') +parser.add_argument('--listen', '-l', dest='listen', + metavar='ENDPOINT', + action='append', + type=str, + help='Endpoints to listen on.') +parser.add_argument('--key', '-k', dest='key', + default='demo/example/**', + type=str, + help='The key expression matching resources to pull.') +parser.add_argument('--config', '-c', dest='config', + metavar='FILE', + type=str, + help='A configuration file.') +parser.add_argument('--size', dest='size', + default=3, + type=int, + help='The size of the ringbuffer') +parser.add_argument('--interval', dest='interval', + default=5.0, + type=float, + help='The interval for pulling the ringbuffer') + +args = parser.parse_args() +conf = zenoh.Config.from_file( + args.config) if args.config is not None else zenoh.Config() +if args.mode is not None: + conf.insert_json5(zenoh.config.MODE_KEY, json.dumps(args.mode)) +if args.connect is not None: + conf.insert_json5(zenoh.config.CONNECT_KEY, json.dumps(args.connect)) +if args.listen is not None: + conf.insert_json5(zenoh.config.LISTEN_KEY, json.dumps(args.listen)) +key = args.key + +# Zenoh code --- --- --- --- --- --- --- --- --- --- --- + + +def main(): + # initiate logging + zenoh.init_logger() + + print("Opening session...") + session = zenoh.open(conf) + + print("Declaring Subscriber on '{}'...".format(key)) + # Subscriber doesn't receive messages over the RingBuffer size. + # The oldest message is overwritten by the latest one. + sub = session.declare_subscriber(key, zenoh.RingBuffer(args.size), reliability=Reliability.RELIABLE()) + + print("Press CTRL-C to quit...") + while True: + time.sleep(args.interval) + for sample in sub.receiver: + if sample is not None: + print(f">> [Subscriber] Received {sample.kind} ('{sample.key_expr}': '{sample.payload.decode('utf-8')}')") + + sub.undeclare() + session.close() + +main() \ No newline at end of file diff --git a/src/closures.rs b/src/closures.rs index 3da7e6e6..7ab1d934 100644 --- a/src/closures.rs +++ b/src/closures.rs @@ -21,6 +21,7 @@ use pyo3::{ types::{PyList, PyTuple}, }; use zenoh::handlers::IntoHandler; +use zenoh_collections::RingBuffer as RingBufferInner; trait CallbackUnwrap { type Output; @@ -184,3 +185,29 @@ impl _Queue { self.send.lock().unwrap().is_none() } } + +#[pyclass(subclass)] +pub struct _RingBuffer { + ring: Arc>>, +} +#[pymethods] +impl _RingBuffer { + #[new] + pub fn pynew(capacity: usize) -> Self { + _RingBuffer { + ring: Arc::new(Mutex::new(RingBufferInner::new(capacity))), + } + } + pub fn push_force(&self, value: PyObject, py: Python<'_>) -> PyResult<()> { + Python::allow_threads(py, || { + self.ring.lock().unwrap().push_force(value); + Ok(()) + }) + } + pub fn pull(&self, py: Python<'_>) -> PyResult { + Python::allow_threads(py, || match self.ring.lock().unwrap().pull() { + Some(value) => Ok(value), + None => Err(pyo3::exceptions::PyStopIteration::new_err(())), + }) + } +} diff --git a/src/lib.rs b/src/lib.rs index ef42ad00..642a9c5b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -64,6 +64,7 @@ impl PyExtract for Bound<'_, PyDict> { fn zenoh(_py: Python, m: &Bound) -> PyResult<()> { m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/zenoh/__init__.py b/zenoh/__init__.py index a66ffcff..bb0e8c44 100644 --- a/zenoh/__init__.py +++ b/zenoh/__init__.py @@ -17,7 +17,7 @@ from .session import Session, Publisher, Subscriber, Info from .enums import CongestionControl, Encoding, Priority, QueryConsolidation, QueryTarget, Reliability, SampleKind from .value import into_payload, from_payload, Hello, Sample, ZenohId, Timestamp, Reply -from .closures import Closure, IClosure, IntoClosure, Handler, IHandler, IntoHandler, ListCollector, Queue +from .closures import Closure, IClosure, IntoClosure, Handler, IHandler, IntoHandler, ListCollector, Queue, RingBuffer from .queryable import Queryable, Query from typing import Any diff --git a/zenoh/closures.py b/zenoh/closures.py index dda93b73..aad9700f 100644 --- a/zenoh/closures.py +++ b/zenoh/closures.py @@ -17,7 +17,7 @@ from collections import deque import time -from .zenoh import _Queue +from .zenoh import _Queue, _RingBuffer In = TypeVar("In") Out = TypeVar("Out") @@ -249,6 +249,53 @@ def __iter__(self): def __next__(self): return self.get() + +class RingBuffer(IHandler[In, None, 'RingBuffer'], Generic[In]): + """ + A binding for Zenoh ringbuffer implementation + + When used as a handler, it provides itself as the receiver, and will provide a + callback that appends elements to the ringbuffer. + + Users can assign ``capacity`` to RingBuffer, which means the oldest message will + be overwritten if the number is over the capacity. + """ + def __init__(self, capacity: int = 10): + self.__inner__ = _RingBuffer(capacity) + + @property + def closure(self) -> IClosure[In, None]: + def call(x): self.push_force(x) + # We don't need drop while using ringbuffer + return Closure((call, None)) + + @property + def receiver(self) -> 'RingBuffer': + return self + + def push_force(self, value): + """ + Push one element into ringbuffer. + + If the capacity is full, the oldest one will be overwritten. + """ + self.__inner__.push_force(value) + + def pull(self): + """ + Gets one element from the ringbuffer. + + Raise a ``StopIteration`` exception if the ringbuffer is empty. + """ + return self.__inner__.pull() + + def __iter__(self): + return self + + def __next__(self): + return self.pull() + + if __name__ == "__main__": def get(collector): import time From ea97c33f2c37f8518cf0b6be3e502835f3f9e506 Mon Sep 17 00:00:00 2001 From: ChenYing Kuo Date: Tue, 16 Apr 2024 14:16:01 +0800 Subject: [PATCH 2/2] Add test code for z_pull. Signed-off-by: ChenYing Kuo --- tests/examples_check.py | 80 ++++++++++++++++++++++++++++++++--------- 1 file changed, 63 insertions(+), 17 deletions(-) diff --git a/tests/examples_check.py b/tests/examples_check.py index 8a43edbf..998fcc7a 100644 --- a/tests/examples_check.py +++ b/tests/examples_check.py @@ -70,6 +70,8 @@ def time(self): errors = [] +# Test z_info & z_scout +print("=> Test z_info & z_scout") info = Pyrun("z_info.py") if info.status(): info.dbg() @@ -79,19 +81,12 @@ def time(self): scout.dbg() errors.append(scout.status()) -storage = Pyrun("z_storage.py") -sub = Pyrun("z_sub.py") -time.sleep(1) -put = Pyrun("z_put.py") -if put.status(): - put.dbg() - errors.append(put.status()) -time.sleep(1) -pub = Pyrun("z_pub.py", ["--iter=2"]) -time.sleep(4) - +# Test z_get & z_queryable +print("=> Test z_get & z_queryable") +## Run z_queryable queryable = Pyrun("z_queryable.py", ["-k=demo/example/zenoh-python-queryable"]) time.sleep(1) +## z_get: Able to get reply from queryable get = Pyrun("z_get.py", ["-s=demo/example/zenoh-python-queryable"]) if get.status(): get.dbg() @@ -100,7 +95,7 @@ def time(self): get.dbg() queryable.dbg() errors.append("z_get didn't get a response from z_queryable") - +## Stop z_queryable queryable.interrupt() if queryable.status(KILL): queryable.dbg() @@ -111,7 +106,21 @@ def time(self): if any(("z_queryable" in error) for error in errors): queryable.dbg() +# Test z_storage & z_sub +print("=> Test z_storage & z_sub") +storage = Pyrun("z_storage.py") +sub = Pyrun("z_sub.py") +time.sleep(1) +## z_put: Put one message (to storage & sub) +put = Pyrun("z_put.py") +if put.status(): + put.dbg() + errors.append(put.status()) time.sleep(1) +## z_pub: Put two messages (to storage & sub) +pub = Pyrun("z_pub.py", ["--iter=2"]) +time.sleep(4) +## z_get: Able to get put from storage get = Pyrun("z_get.py", ["-s=demo/example/zenoh-python-put"]) if get.status(): get.dbg() @@ -121,14 +130,14 @@ def time(self): errors.append("z_get didn't get a response from z_storage about put") if any(("z_get" in error) for error in errors): get.dbg() - time.sleep(1) +## z_delete: Delete put in storage delete = Pyrun("z_delete.py") if delete.status(): delete.dbg() errors.append(delete.status()) - time.sleep(1) +## z_get: Unable to get put from storage get = Pyrun("z_get.py", ["-s=demo/example/zenoh-python-put"]) if get.status(): get.dbg() @@ -138,7 +147,8 @@ def time(self): errors.append("z_get did get a response from z_storage about put after delete") if any(("z_get" in error) for error in errors): get.dbg() - +time.sleep(1) +## z_sub: Should receive put, pub and delete sub.interrupt() if sub.status(KILL): sub.dbg() @@ -152,7 +162,7 @@ def time(self): errors.append("z_sub didn't catch delete") if any(("z_sub" in error) for error in errors): sub.dbg() - +## z_storage: Should receive put, pub, delete, and query storage.interrupt() if storage.status(KILL): storage.dbg() @@ -169,6 +179,40 @@ def time(self): if any(("z_storage" in error) for error in errors): storage.dbg() +# Test z_pull & s_sub_queued +print("=> Test z_pull & z_sub_queued") +## Run z_pull and z_sub_queued +sub_queued = Pyrun("z_sub_queued.py") +time.sleep(1) +pull = Pyrun("z_pull.py", ["--size=1", "--interval=5"]) +time.sleep(1) +## z_pub: Put two messages (to storage & sub) +pub = Pyrun("z_pub.py", ["--iter=2"]) +time.sleep(4) +## z_sub_queued: Should receive two messages +sub_queued.interrupt() +if sub_queued.status(KILL): + sub_queued.dbg() + errors.append(sub_queued.status(KILL)) +sub_queued_out = "".join(sub_queued.stdout) +if not ("Received PUT ('demo/example/zenoh-python-pub': '[ 0] Pub from Python!')" in sub_queued_out): + errors.append("z_sub_queued didn't catch the first z_pub") +if not ("Received PUT ('demo/example/zenoh-python-pub': '[ 1] Pub from Python!')" in sub_queued_out): + errors.append("z_sub_queued didn't catch the second z_pub") +if any(("z_sub_queued" in error) for error in errors): + sub_queued.dbg() +## z_pull: Should only receive the last messages +pull.interrupt() +pullout = "".join(pull.stdout) +if ("Received PUT ('demo/example/zenoh-python-pub': '[ 0] Pub from Python!')" in pullout): + errors.append("z_pull shouldn't catch the old z_pub") +if not ("Received PUT ('demo/example/zenoh-python-pub': '[ 1] Pub from Python!')" in pullout): + errors.append("z_pull didn't catch the last z_pub") +if any(("z_pull" in error) for error in errors): + pull.dbg() + +# Test z_sub_thr & z_pub_thr +print("=> Test z_sub_thr & z_pub_thr") sub_thr = Pyrun("z_sub_thr.py") pub_thr = Pyrun("z_pub_thr.py", ["128"]) time.sleep(5) @@ -184,4 +228,6 @@ def time(self): if len(errors): message = f"Found {len(errors)} errors: {(ret+tab) + (ret+tab).join(errors)}" - raise Exception(message) \ No newline at end of file + raise Exception(message) +else: + print("Pass examples_check") \ No newline at end of file