Skip to content

Commit

Permalink
Feature/priority in sample (#145)
Browse files Browse the repository at this point in the history
* add support for qos in sample

* fixed sample new

* remove empty line

* update for 1-byte qos with accessors

* add default trait to _QoS to make clippy happy

* fix zenoh branch to main
  • Loading branch information
DenisBiryukov91 authored Feb 23, 2024
1 parent 043ab84 commit 352a1d7
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 12 deletions.
4 changes: 2 additions & 2 deletions src/enums.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl _Encoding {
}

#[pyclass(subclass)]
#[derive(Clone, PartialEq, Eq)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct _Priority(pub(crate) Priority);
#[pymethods]
impl _Priority {
Expand Down Expand Up @@ -187,7 +187,7 @@ impl core::fmt::Debug for _SampleKind {
}

#[pyclass(subclass)]
#[derive(Clone, PartialEq, Eq)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct _CongestionControl(pub(crate) CongestionControl);
#[pymethods]
impl _CongestionControl {
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ fn zenoh(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<queryable::_Queryable>()?;
m.add_class::<value::_Value>()?;
m.add_class::<value::_Sample>()?;
m.add_class::<value::_QoS>()?;
m.add_class::<value::_Reply>()?;
m.add_class::<value::_Timestamp>()?;
m.add_class::<value::_Hello>()?;
Expand Down
42 changes: 41 additions & 1 deletion src/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use uhlc::Timestamp;
use zenoh::{
prelude::{Encoding, KeyExpr, Sample, Value, ZenohId},
query::Reply,
sample::QoS,
scouting::Hello,
};
use zenoh_buffers::{
Expand All @@ -23,7 +24,7 @@ use zenoh_buffers::{
};

use crate::{
enums::{_Encoding, _SampleKind},
enums::{_CongestionControl, _Encoding, _Priority, _SampleKind},
keyexpr::_KeyExpr,
ToPyErr,
};
Expand Down Expand Up @@ -159,13 +160,42 @@ impl PyAnyToValue for &PyAny {
}
}

#[pyclass(subclass)]
#[derive(Clone, Debug, Default)]
pub struct _QoS(pub(crate) QoS);

#[pymethods]
impl _QoS {
#[new]
pub fn pynew(this: Self) -> Self {
this
}
#[getter]
pub fn priority(&self) -> _Priority {
_Priority(self.0.priority())
}
#[getter]
pub fn congestion_control(&self) -> _CongestionControl {
_CongestionControl(self.0.congestion_control())
}
#[getter]
pub fn express(&self) -> bool {
self.0.express()
}
#[staticmethod]
pub fn new() -> Self {
Self::default()
}
}

#[pyclass(subclass)]
#[derive(Clone, Debug)]
pub struct _Sample {
key_expr: KeyExpr<'static>,
value: _Value,
kind: _SampleKind,
timestamp: Option<_Timestamp>,
qos: _QoS,
}
impl From<Sample> for _Sample {
fn from(sample: Sample) -> Self {
Expand All @@ -174,13 +204,15 @@ impl From<Sample> for _Sample {
value,
kind,
timestamp,
qos,
..
} = sample;
_Sample {
key_expr,
value: value.into(),
kind: _SampleKind(kind),
timestamp: timestamp.map(_Timestamp),
qos: _QoS(qos),
}
}
}
Expand Down Expand Up @@ -267,6 +299,10 @@ impl _Sample {
_Encoding(self.value.encoding.clone())
}
#[getter]
pub fn qos(&self) -> _QoS {
self.qos.clone()
}
#[getter]
pub fn kind(&self) -> _SampleKind {
self.kind.clone()
}
Expand All @@ -278,12 +314,14 @@ impl _Sample {
pub fn new(
key_expr: _KeyExpr,
value: _Value,
qos: _QoS,
kind: _SampleKind,
timestamp: Option<_Timestamp>,
) -> Self {
_Sample {
key_expr: key_expr.0,
value,
qos,
kind,
timestamp,
}
Expand All @@ -300,10 +338,12 @@ impl From<_Sample> for Sample {
value,
kind,
timestamp,
qos,
} = sample;
let mut sample = Sample::new(key_expr, value);
sample.kind = kind.0;
sample.timestamp = timestamp.map(|t| t.0);
sample.qos = qos.0;
sample
}
}
Expand Down
52 changes: 48 additions & 4 deletions tests/test_session.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import zenoh
import json
from zenoh import Session, Query, Sample
from zenoh import Session, Query, Sample, Priority, CongestionControl
from typing import List, Tuple
import time

Expand Down Expand Up @@ -28,9 +28,9 @@ def open_session(endpoints: List[str]) -> Tuple[Session, Session]:


def close_session(peer01: Session, peer02: Session):
print("[ ][01d] Closing peer01 session");
print("[ ][01e] Closing peer01 session");
peer01.close()
print("[ ][02d] Closing peer02 session");
print("[ ][02e] Closing peer02 session");
peer02.close()


Expand Down Expand Up @@ -81,8 +81,52 @@ def queryable_callback(query: Query):
queryable.undeclare()


def run_session_pubsub(peer01: Session, peer02: Session):
keyexpr = "test_pub/session"
msg = 'Pub Message'.encode()

num_received = 0
num_errors = 0

def sub_callback(sample: Sample):
nonlocal num_received
nonlocal num_errors
if sample.key_expr != keyexpr \
or sample.qos.priority != Priority.DATA_HIGH() \
or sample.qos.congestion_control != CongestionControl.BLOCK() \
or sample.payload != msg:
num_errors += 1
num_received += 1

print("[PS][01d] Publisher on peer01 session");
publisher = peer01.declare_publisher(
keyexpr,
Priority.DATA_HIGH(),
CongestionControl.BLOCK()
)
time.sleep(SLEEP)

print(f"[PS][02d] Subscriber on peer02 session. {MSG_COUNT} msgs.")
subscriber = peer02.declare_subscriber(keyexpr, sub_callback)
time.sleep(SLEEP)

for _ in range(0, MSG_COUNT):
publisher.put('Pub Message')

time.sleep(SLEEP)
print(f"[PS][02d] Received on peer02 session. {num_received}/{MSG_COUNT} msgs.");
assert num_received == MSG_COUNT
assert num_errors == 0

print("[PS][03d] Undeclare publisher on peer01 session");
publisher.undeclare()
print("[PS][04d] Undeclare subscriber on peer02 session");
subscriber.undeclare()


def test_session():
zenoh.init_logger()
(peer01, peer02) = open_session(["tcp/127.0.0.1:17447"])
run_session_qryrep(peer01, peer02)
close_session(peer01, peer02)
run_session_pubsub(peer01, peer02)
close_session(peer01, peer02)
6 changes: 5 additions & 1 deletion zenoh/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class Priority(_Priority):
They are ordered à la Linux priority:
``Priority.REAL_TIME() < Priority.INTERACTIVE_HIGH() < Priority.INTERACTIVE_LOW() < Priority.DATA() < Priority.BACKGROUND()``
"""
def __new__(cls, inner: _SampleKind):
def __new__(cls, inner: _Priority):
return super().__new__(cls, inner)
@staticmethod
def REAL_TIME() -> 'Priority':
Expand Down Expand Up @@ -57,6 +57,8 @@ def __gt__(self, other) -> bool:
return super().__gt__(other)
def __ge__(self, other) -> bool:
return super().__ge__(other)

Priority.DEFAULT = Priority.DATA()

class SampleKind(_SampleKind):
"Similar to an HTTP METHOD: only PUT and DELETE are currently supported."
Expand Down Expand Up @@ -94,6 +96,8 @@ def __eq__(self, other) -> bool:
return super().__eq__(other)
def __ne__(self, other) -> bool:
return not self.__eq__(other)

CongestionControl.DEFAULT = CongestionControl.DROP()

class Encoding(_Encoding):
def __new__(cls, inner: _Encoding):
Expand Down
39 changes: 35 additions & 4 deletions zenoh/value.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
from typing import Union, Tuple, Optional, List
import json

from .enums import Encoding, SampleKind
from .zenoh import _Value, _Encoding, _Sample, _SampleKind, _Reply, _ZenohId, _Timestamp, _Hello
from .enums import Encoding, SampleKind, Priority, CongestionControl
from .zenoh import _Value, _Encoding, _Sample, _SampleKind, _Reply, _ZenohId, _Timestamp, _Hello, _QoS
from .keyexpr import KeyExpr, IntoKeyExpr

class IValue:
Expand Down Expand Up @@ -129,15 +129,42 @@ def seconds_since_unix_epoch(self) -> float:
"""
return super().seconds_since_unix_epoch

class QoS(_QoS):
"""
Quality of Service settings.
"""
def __new__(cls):
return super().new()
@property
def priority(self) -> Priority:
"Priority"
return Priority(super().priority)
@property
def congestion_control(self) -> CongestionControl:
"Congestion control"
return CongestionControl(super().congestion_control)
@property
def express(self) -> bool:
"Express flag: if True, the message is not batched during transmission, in order to reduce latency."
return super().express
@staticmethod
def _upgrade_(inner: _QoS) -> 'QoS':
if isinstance(inner, QoS):
return inner
return _QoS.__new__(QoS, inner)

QoS.DEFAULT = QoS()


IntoSample = Union[_Sample, Tuple[IntoKeyExpr, IntoValue, SampleKind], Tuple[KeyExpr, IntoValue]]
class Sample(_Sample):
"""
A KeyExpr-Value pair, annotated with the kind (PUT or DELETE) of publication used to emit it and a timestamp.
"""
def __new__(cls, key: IntoKeyExpr, value: IntoValue, kind: SampleKind = None, timestamp: Timestamp = None):
def __new__(cls, key: IntoKeyExpr, value: IntoValue, kind: SampleKind = None, qos:QoS = None, timestamp: Timestamp = None):
kind = _SampleKind.PUT if kind is None else kind
return Sample._upgrade_(super().new(KeyExpr(key), Value(value), kind, timestamp))
qos = QoS.DEFAULT if qos is None else qos
return Sample._upgrade_(super().new(KeyExpr(key), Value(value), qos, kind, timestamp))
@property
def key_expr(self) -> KeyExpr:
"The sample's key expression"
Expand All @@ -163,6 +190,10 @@ def timestamp(self) -> Optional[Timestamp]:
"The sample's timestamp. May be None."
ts = super().timestamp
return None if ts is None else Timestamp._upgrade_(ts)
@property
def qos(self) -> QoS:
"Quality of service settings the sample was sent with"
return QoS._upgrade_(super().qos)
@staticmethod
def _upgrade_(inner: _Sample) -> 'Sample':
if isinstance(inner, Sample):
Expand Down

0 comments on commit 352a1d7

Please sign in to comment.