diff --git a/src/enums.rs b/src/enums.rs index 70651ef8..d6da1bb6 100644 --- a/src/enums.rs +++ b/src/enums.rs @@ -103,7 +103,7 @@ impl _Encoding { } #[pyclass(subclass)] -#[derive(Clone, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct _Priority(pub(crate) Priority); #[pymethods] impl _Priority { @@ -187,7 +187,7 @@ impl core::fmt::Debug for _SampleKind { } #[pyclass(subclass)] -#[derive(Clone, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct _CongestionControl(pub(crate) CongestionControl); #[pymethods] impl _CongestionControl { diff --git a/src/lib.rs b/src/lib.rs index 368398ed..fb4cc3ff 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -83,6 +83,7 @@ fn zenoh(_py: Python, m: &PyModule) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/src/value.rs b/src/value.rs index 10994de5..33157866 100644 --- a/src/value.rs +++ b/src/value.rs @@ -15,6 +15,7 @@ use uhlc::Timestamp; use zenoh::{ prelude::{Encoding, KeyExpr, Sample, Value, ZenohId}, query::Reply, + sample::QoS, scouting::Hello, }; use zenoh_buffers::{ @@ -23,7 +24,7 @@ use zenoh_buffers::{ }; use crate::{ - enums::{_Encoding, _SampleKind}, + enums::{_CongestionControl, _Encoding, _Priority, _SampleKind}, keyexpr::_KeyExpr, ToPyErr, }; @@ -159,6 +160,34 @@ impl PyAnyToValue for &PyAny { } } +#[pyclass(subclass)] +#[derive(Clone, Debug, Default)] +pub struct _QoS(pub(crate) QoS); + +#[pymethods] +impl _QoS { + #[new] + pub fn pynew(this: Self) -> Self { + this + } + #[getter] + pub fn priority(&self) -> _Priority { + _Priority(self.0.priority()) + } + #[getter] + pub fn congestion_control(&self) -> _CongestionControl { + _CongestionControl(self.0.congestion_control()) + } + #[getter] + pub fn express(&self) -> bool { + self.0.express() + } + #[staticmethod] + pub fn new() -> Self { + Self::default() + } +} + #[pyclass(subclass)] #[derive(Clone, Debug)] pub struct _Sample { @@ -166,6 +195,7 @@ pub struct _Sample { value: _Value, kind: _SampleKind, timestamp: Option<_Timestamp>, + qos: _QoS, } impl From for _Sample { fn from(sample: Sample) -> Self { @@ -174,6 +204,7 @@ impl From for _Sample { value, kind, timestamp, + qos, .. } = sample; _Sample { @@ -181,6 +212,7 @@ impl From for _Sample { value: value.into(), kind: _SampleKind(kind), timestamp: timestamp.map(_Timestamp), + qos: _QoS(qos), } } } @@ -267,6 +299,10 @@ impl _Sample { _Encoding(self.value.encoding.clone()) } #[getter] + pub fn qos(&self) -> _QoS { + self.qos.clone() + } + #[getter] pub fn kind(&self) -> _SampleKind { self.kind.clone() } @@ -278,12 +314,14 @@ impl _Sample { pub fn new( key_expr: _KeyExpr, value: _Value, + qos: _QoS, kind: _SampleKind, timestamp: Option<_Timestamp>, ) -> Self { _Sample { key_expr: key_expr.0, value, + qos, kind, timestamp, } @@ -300,10 +338,12 @@ impl From<_Sample> for Sample { value, kind, timestamp, + qos, } = sample; let mut sample = Sample::new(key_expr, value); sample.kind = kind.0; sample.timestamp = timestamp.map(|t| t.0); + sample.qos = qos.0; sample } } diff --git a/tests/test_session.py b/tests/test_session.py index 3890a737..4833323f 100644 --- a/tests/test_session.py +++ b/tests/test_session.py @@ -1,6 +1,6 @@ import zenoh import json -from zenoh import Session, Query, Sample +from zenoh import Session, Query, Sample, Priority, CongestionControl from typing import List, Tuple import time @@ -28,9 +28,9 @@ def open_session(endpoints: List[str]) -> Tuple[Session, Session]: def close_session(peer01: Session, peer02: Session): - print("[ ][01d] Closing peer01 session"); + print("[ ][01e] Closing peer01 session"); peer01.close() - print("[ ][02d] Closing peer02 session"); + print("[ ][02e] Closing peer02 session"); peer02.close() @@ -81,8 +81,52 @@ def queryable_callback(query: Query): queryable.undeclare() +def run_session_pubsub(peer01: Session, peer02: Session): + keyexpr = "test_pub/session" + msg = 'Pub Message'.encode() + + num_received = 0 + num_errors = 0 + + def sub_callback(sample: Sample): + nonlocal num_received + nonlocal num_errors + if sample.key_expr != keyexpr \ + or sample.qos.priority != Priority.DATA_HIGH() \ + or sample.qos.congestion_control != CongestionControl.BLOCK() \ + or sample.payload != msg: + num_errors += 1 + num_received += 1 + + print("[PS][01d] Publisher on peer01 session"); + publisher = peer01.declare_publisher( + keyexpr, + Priority.DATA_HIGH(), + CongestionControl.BLOCK() + ) + time.sleep(SLEEP) + + print(f"[PS][02d] Subscriber on peer02 session. {MSG_COUNT} msgs.") + subscriber = peer02.declare_subscriber(keyexpr, sub_callback) + time.sleep(SLEEP) + + for _ in range(0, MSG_COUNT): + publisher.put('Pub Message') + + time.sleep(SLEEP) + print(f"[PS][02d] Received on peer02 session. {num_received}/{MSG_COUNT} msgs."); + assert num_received == MSG_COUNT + assert num_errors == 0 + + print("[PS][03d] Undeclare publisher on peer01 session"); + publisher.undeclare() + print("[PS][04d] Undeclare subscriber on peer02 session"); + subscriber.undeclare() + + def test_session(): zenoh.init_logger() (peer01, peer02) = open_session(["tcp/127.0.0.1:17447"]) run_session_qryrep(peer01, peer02) - close_session(peer01, peer02) + run_session_pubsub(peer01, peer02) + close_session(peer01, peer02) \ No newline at end of file diff --git a/zenoh/enums.py b/zenoh/enums.py index 8f5da667..1d9281cb 100644 --- a/zenoh/enums.py +++ b/zenoh/enums.py @@ -20,7 +20,7 @@ class Priority(_Priority): They are ordered à la Linux priority: ``Priority.REAL_TIME() < Priority.INTERACTIVE_HIGH() < Priority.INTERACTIVE_LOW() < Priority.DATA() < Priority.BACKGROUND()`` """ - def __new__(cls, inner: _SampleKind): + def __new__(cls, inner: _Priority): return super().__new__(cls, inner) @staticmethod def REAL_TIME() -> 'Priority': @@ -57,6 +57,8 @@ def __gt__(self, other) -> bool: return super().__gt__(other) def __ge__(self, other) -> bool: return super().__ge__(other) + +Priority.DEFAULT = Priority.DATA() class SampleKind(_SampleKind): "Similar to an HTTP METHOD: only PUT and DELETE are currently supported." @@ -94,6 +96,8 @@ def __eq__(self, other) -> bool: return super().__eq__(other) def __ne__(self, other) -> bool: return not self.__eq__(other) + +CongestionControl.DEFAULT = CongestionControl.DROP() class Encoding(_Encoding): def __new__(cls, inner: _Encoding): diff --git a/zenoh/value.py b/zenoh/value.py index e0708f0b..22ca40f2 100644 --- a/zenoh/value.py +++ b/zenoh/value.py @@ -15,8 +15,8 @@ from typing import Union, Tuple, Optional, List import json -from .enums import Encoding, SampleKind -from .zenoh import _Value, _Encoding, _Sample, _SampleKind, _Reply, _ZenohId, _Timestamp, _Hello +from .enums import Encoding, SampleKind, Priority, CongestionControl +from .zenoh import _Value, _Encoding, _Sample, _SampleKind, _Reply, _ZenohId, _Timestamp, _Hello, _QoS from .keyexpr import KeyExpr, IntoKeyExpr class IValue: @@ -129,15 +129,42 @@ def seconds_since_unix_epoch(self) -> float: """ return super().seconds_since_unix_epoch +class QoS(_QoS): + """ + Quality of Service settings. + """ + def __new__(cls): + return super().new() + @property + def priority(self) -> Priority: + "Priority" + return Priority(super().priority) + @property + def congestion_control(self) -> CongestionControl: + "Congestion control" + return CongestionControl(super().congestion_control) + @property + def express(self) -> bool: + "Express flag: if True, the message is not batched during transmission, in order to reduce latency." + return super().express + @staticmethod + def _upgrade_(inner: _QoS) -> 'QoS': + if isinstance(inner, QoS): + return inner + return _QoS.__new__(QoS, inner) + +QoS.DEFAULT = QoS() + IntoSample = Union[_Sample, Tuple[IntoKeyExpr, IntoValue, SampleKind], Tuple[KeyExpr, IntoValue]] class Sample(_Sample): """ A KeyExpr-Value pair, annotated with the kind (PUT or DELETE) of publication used to emit it and a timestamp. """ - def __new__(cls, key: IntoKeyExpr, value: IntoValue, kind: SampleKind = None, timestamp: Timestamp = None): + def __new__(cls, key: IntoKeyExpr, value: IntoValue, kind: SampleKind = None, qos:QoS = None, timestamp: Timestamp = None): kind = _SampleKind.PUT if kind is None else kind - return Sample._upgrade_(super().new(KeyExpr(key), Value(value), kind, timestamp)) + qos = QoS.DEFAULT if qos is None else qos + return Sample._upgrade_(super().new(KeyExpr(key), Value(value), qos, kind, timestamp)) @property def key_expr(self) -> KeyExpr: "The sample's key expression" @@ -163,6 +190,10 @@ def timestamp(self) -> Optional[Timestamp]: "The sample's timestamp. May be None." ts = super().timestamp return None if ts is None else Timestamp._upgrade_(ts) + @property + def qos(self) -> QoS: + "Quality of service settings the sample was sent with" + return QoS._upgrade_(super().qos) @staticmethod def _upgrade_(inner: _Sample) -> 'Sample': if isinstance(inner, Sample):