Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RingBuffer support in zenoh-python #1

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,4 @@ validated_struct = "2.1.0"
zenoh = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "protocol_changes", features = ["unstable"], default-features = false }
zenoh-buffers = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "protocol_changes" }
zenoh-core = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "protocol_changes" }
zenoh-collections = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "protocol_changes" }
2 changes: 1 addition & 1 deletion docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -192,4 +192,4 @@ Timestamp
:members:

.. automodule:: zenoh
:members: Queue, ListCollector, Closure, Handler, IClosure, IHandler, IValue
:members: Queue, RingBuffer, ListCollector, Closure, Handler, IClosure, IHandler, IValue
18 changes: 18 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,24 @@ or
python3 z_sub.py -k 'demo/**'
```

### z_pull

Creates a subscriber with a key expression and using RingBuffer.
The subscriber will receive the last N messages matching the key
expression and will print the received key/value.

Typical usage:

```bash
python3 z_pull.py
```

or

```bash
python3 z_pull.py -k 'demo/**' --size 3 --interval 5
```

### z_get

Sends a query message for a selector.
Expand Down
92 changes: 92 additions & 0 deletions examples/z_pull.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
#
# Copyright (c) 2022 ZettaScale Technology
#
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
# which is available at https://www.apache.org/licenses/LICENSE-2.0.
#
# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
#
# Contributors:
# ZettaScale Zenoh Team, <[email protected]>
#

import time
import argparse
import json
import zenoh
from zenoh import Reliability

# --- Command line argument parsing --- --- --- --- --- ---
parser = argparse.ArgumentParser(
prog='z_pull',
description='zenoh pull example')
parser.add_argument('--mode', '-m', dest='mode',
choices=['peer', 'client'],
type=str,
help='The zenoh session mode.')
parser.add_argument('--connect', '-e', dest='connect',
metavar='ENDPOINT',
action='append',
type=str,
help='Endpoints to connect to.')
parser.add_argument('--listen', '-l', dest='listen',
metavar='ENDPOINT',
action='append',
type=str,
help='Endpoints to listen on.')
parser.add_argument('--key', '-k', dest='key',
default='demo/example/**',
type=str,
help='The key expression matching resources to pull.')
parser.add_argument('--config', '-c', dest='config',
metavar='FILE',
type=str,
help='A configuration file.')
parser.add_argument('--size', dest='size',
default=3,
type=int,
help='The size of the ringbuffer')
parser.add_argument('--interval', dest='interval',
default=5.0,
type=float,
help='The interval for pulling the ringbuffer')

args = parser.parse_args()
conf = zenoh.Config.from_file(
args.config) if args.config is not None else zenoh.Config()
if args.mode is not None:
conf.insert_json5(zenoh.config.MODE_KEY, json.dumps(args.mode))
if args.connect is not None:
conf.insert_json5(zenoh.config.CONNECT_KEY, json.dumps(args.connect))
if args.listen is not None:
conf.insert_json5(zenoh.config.LISTEN_KEY, json.dumps(args.listen))
key = args.key

# Zenoh code --- --- --- --- --- --- --- --- --- --- ---


def main():
# initiate logging
zenoh.init_logger()

print("Opening session...")
session = zenoh.open(conf)

print("Declaring Subscriber on '{}'...".format(key))
# Subscriber doesn't receive messages over the RingBuffer size.
# The oldest message is overwritten by the latest one.
sub = session.declare_subscriber(key, zenoh.RingBuffer(args.size), reliability=Reliability.RELIABLE())

print("Press CTRL-C to quit...")
while True:
time.sleep(args.interval)
for sample in sub.receiver:
if sample is not None:
print(f">> [Subscriber] Received {sample.kind} ('{sample.key_expr}': '{sample.payload.decode('utf-8')}')")

sub.undeclare()
session.close()

main()
27 changes: 27 additions & 0 deletions src/closures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use pyo3::{
types::{PyList, PyTuple},
};
use zenoh::handlers::IntoHandler;
use zenoh_collections::RingBuffer as RingBufferInner;

trait CallbackUnwrap {
type Output;
Expand Down Expand Up @@ -184,3 +185,29 @@ impl _Queue {
self.send.lock().unwrap().is_none()
}
}

#[pyclass(subclass)]
pub struct _RingBuffer {
ring: Arc<Mutex<RingBufferInner<PyObject>>>,
}
#[pymethods]
impl _RingBuffer {
#[new]
pub fn pynew(capacity: usize) -> Self {
_RingBuffer {
ring: Arc::new(Mutex::new(RingBufferInner::new(capacity))),
}
}
pub fn push_force(&self, value: PyObject, py: Python<'_>) -> PyResult<()> {
Python::allow_threads(py, || {
self.ring.lock().unwrap().push_force(value);
Ok(())
})
}
pub fn pull(&self, py: Python<'_>) -> PyResult<PyObject> {
Python::allow_threads(py, || match self.ring.lock().unwrap().pull() {
Some(value) => Ok(value),
None => Err(pyo3::exceptions::PyStopIteration::new_err(())),
})
}
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ impl<K: ToPyObject> PyExtract<K> for Bound<'_, PyDict> {
fn zenoh(_py: Python, m: &Bound<PyModule>) -> PyResult<()> {
m.add_class::<config::_Config>()?;
m.add_class::<closures::_Queue>()?;
m.add_class::<closures::_RingBuffer>()?;
m.add_class::<keyexpr::_KeyExpr>()?;
m.add_class::<keyexpr::_Selector>()?;
m.add_class::<session::_Session>()?;
Expand Down
80 changes: 63 additions & 17 deletions tests/examples_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ def time(self):

errors = []

# Test z_info & z_scout
print("=> Test z_info & z_scout")
info = Pyrun("z_info.py")
if info.status():
info.dbg()
Expand All @@ -79,19 +81,12 @@ def time(self):
scout.dbg()
errors.append(scout.status())

storage = Pyrun("z_storage.py")
sub = Pyrun("z_sub.py")
time.sleep(1)
put = Pyrun("z_put.py")
if put.status():
put.dbg()
errors.append(put.status())
time.sleep(1)
pub = Pyrun("z_pub.py", ["--iter=2"])
time.sleep(4)

# Test z_get & z_queryable
print("=> Test z_get & z_queryable")
## Run z_queryable
queryable = Pyrun("z_queryable.py", ["-k=demo/example/zenoh-python-queryable"])
time.sleep(1)
## z_get: Able to get reply from queryable
get = Pyrun("z_get.py", ["-s=demo/example/zenoh-python-queryable"])
if get.status():
get.dbg()
Expand All @@ -100,7 +95,7 @@ def time(self):
get.dbg()
queryable.dbg()
errors.append("z_get didn't get a response from z_queryable")

## Stop z_queryable
queryable.interrupt()
if queryable.status(KILL):
queryable.dbg()
Expand All @@ -111,7 +106,21 @@ def time(self):
if any(("z_queryable" in error) for error in errors):
queryable.dbg()

# Test z_storage & z_sub
print("=> Test z_storage & z_sub")
storage = Pyrun("z_storage.py")
sub = Pyrun("z_sub.py")
time.sleep(1)
## z_put: Put one message (to storage & sub)
put = Pyrun("z_put.py")
if put.status():
put.dbg()
errors.append(put.status())
time.sleep(1)
## z_pub: Put two messages (to storage & sub)
pub = Pyrun("z_pub.py", ["--iter=2"])
time.sleep(4)
## z_get: Able to get put from storage
get = Pyrun("z_get.py", ["-s=demo/example/zenoh-python-put"])
if get.status():
get.dbg()
Expand All @@ -121,14 +130,14 @@ def time(self):
errors.append("z_get didn't get a response from z_storage about put")
if any(("z_get" in error) for error in errors):
get.dbg()

time.sleep(1)
## z_delete: Delete put in storage
delete = Pyrun("z_delete.py")
if delete.status():
delete.dbg()
errors.append(delete.status())

time.sleep(1)
## z_get: Unable to get put from storage
get = Pyrun("z_get.py", ["-s=demo/example/zenoh-python-put"])
if get.status():
get.dbg()
Expand All @@ -138,7 +147,8 @@ def time(self):
errors.append("z_get did get a response from z_storage about put after delete")
if any(("z_get" in error) for error in errors):
get.dbg()

time.sleep(1)
## z_sub: Should receive put, pub and delete
sub.interrupt()
if sub.status(KILL):
sub.dbg()
Expand All @@ -152,7 +162,7 @@ def time(self):
errors.append("z_sub didn't catch delete")
if any(("z_sub" in error) for error in errors):
sub.dbg()

## z_storage: Should receive put, pub, delete, and query
storage.interrupt()
if storage.status(KILL):
storage.dbg()
Expand All @@ -169,6 +179,40 @@ def time(self):
if any(("z_storage" in error) for error in errors):
storage.dbg()

# Test z_pull & s_sub_queued
print("=> Test z_pull & z_sub_queued")
## Run z_pull and z_sub_queued
sub_queued = Pyrun("z_sub_queued.py")
time.sleep(1)
pull = Pyrun("z_pull.py", ["--size=1", "--interval=5"])
time.sleep(1)
## z_pub: Put two messages (to storage & sub)
pub = Pyrun("z_pub.py", ["--iter=2"])
time.sleep(4)
## z_sub_queued: Should receive two messages
sub_queued.interrupt()
if sub_queued.status(KILL):
sub_queued.dbg()
errors.append(sub_queued.status(KILL))
sub_queued_out = "".join(sub_queued.stdout)
if not ("Received PUT ('demo/example/zenoh-python-pub': '[ 0] Pub from Python!')" in sub_queued_out):
errors.append("z_sub_queued didn't catch the first z_pub")
if not ("Received PUT ('demo/example/zenoh-python-pub': '[ 1] Pub from Python!')" in sub_queued_out):
errors.append("z_sub_queued didn't catch the second z_pub")
if any(("z_sub_queued" in error) for error in errors):
sub_queued.dbg()
## z_pull: Should only receive the last messages
pull.interrupt()
pullout = "".join(pull.stdout)
if ("Received PUT ('demo/example/zenoh-python-pub': '[ 0] Pub from Python!')" in pullout):
errors.append("z_pull shouldn't catch the old z_pub")
if not ("Received PUT ('demo/example/zenoh-python-pub': '[ 1] Pub from Python!')" in pullout):
errors.append("z_pull didn't catch the last z_pub")
if any(("z_pull" in error) for error in errors):
pull.dbg()

# Test z_sub_thr & z_pub_thr
print("=> Test z_sub_thr & z_pub_thr")
sub_thr = Pyrun("z_sub_thr.py")
pub_thr = Pyrun("z_pub_thr.py", ["128"])
time.sleep(5)
Expand All @@ -184,4 +228,6 @@ def time(self):

if len(errors):
message = f"Found {len(errors)} errors: {(ret+tab) + (ret+tab).join(errors)}"
raise Exception(message)
raise Exception(message)
else:
print("Pass examples_check")
2 changes: 1 addition & 1 deletion zenoh/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from .session import Session, Publisher, Subscriber, Info
from .enums import CongestionControl, Encoding, Priority, QueryConsolidation, QueryTarget, Reliability, SampleKind
from .value import into_payload, from_payload, Hello, Sample, ZenohId, Timestamp, Reply
from .closures import Closure, IClosure, IntoClosure, Handler, IHandler, IntoHandler, ListCollector, Queue
from .closures import Closure, IClosure, IntoClosure, Handler, IHandler, IntoHandler, ListCollector, Queue, RingBuffer
from .queryable import Queryable, Query
from typing import Any

Expand Down
Loading