Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/priority in sample #145

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/enums.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl _Encoding {
}

#[pyclass(subclass)]
#[derive(Clone, PartialEq, Eq)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct _Priority(pub(crate) Priority);
#[pymethods]
impl _Priority {
Expand Down Expand Up @@ -187,7 +187,7 @@ impl core::fmt::Debug for _SampleKind {
}

#[pyclass(subclass)]
#[derive(Clone, PartialEq, Eq)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct _CongestionControl(pub(crate) CongestionControl);
#[pymethods]
impl _CongestionControl {
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ fn zenoh(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<queryable::_Queryable>()?;
m.add_class::<value::_Value>()?;
m.add_class::<value::_Sample>()?;
m.add_class::<value::_QoS>()?;
m.add_class::<value::_Reply>()?;
m.add_class::<value::_Timestamp>()?;
m.add_class::<value::_Hello>()?;
Expand Down
42 changes: 41 additions & 1 deletion src/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use uhlc::Timestamp;
use zenoh::{
prelude::{Encoding, KeyExpr, Sample, Value, ZenohId},
query::Reply,
sample::QoS,
scouting::Hello,
};
use zenoh_buffers::{
Expand All @@ -23,7 +24,7 @@ use zenoh_buffers::{
};

use crate::{
enums::{_Encoding, _SampleKind},
enums::{_CongestionControl, _Encoding, _Priority, _SampleKind},
keyexpr::_KeyExpr,
ToPyErr,
};
Expand Down Expand Up @@ -159,13 +160,42 @@ impl PyAnyToValue for &PyAny {
}
}

#[pyclass(subclass)]
#[derive(Clone, Debug, Default)]
pub struct _QoS(pub(crate) QoS);

#[pymethods]
impl _QoS {
#[new]
pub fn pynew(this: Self) -> Self {
this
}
#[getter]
pub fn priority(&self) -> _Priority {
_Priority(self.0.priority())
}
#[getter]
pub fn congestion_control(&self) -> _CongestionControl {
_CongestionControl(self.0.congestion_control())
}
#[getter]
pub fn express(&self) -> bool {
self.0.express()
}
#[staticmethod]
pub fn new() -> Self {
Self::default()
}
}

#[pyclass(subclass)]
#[derive(Clone, Debug)]
pub struct _Sample {
key_expr: KeyExpr<'static>,
value: _Value,
kind: _SampleKind,
timestamp: Option<_Timestamp>,
qos: _QoS,
}
impl From<Sample> for _Sample {
fn from(sample: Sample) -> Self {
Expand All @@ -174,13 +204,15 @@ impl From<Sample> for _Sample {
value,
kind,
timestamp,
qos,
..
} = sample;
_Sample {
key_expr,
value: value.into(),
kind: _SampleKind(kind),
timestamp: timestamp.map(_Timestamp),
qos: _QoS(qos),
}
}
}
Expand Down Expand Up @@ -267,6 +299,10 @@ impl _Sample {
_Encoding(self.value.encoding.clone())
}
#[getter]
pub fn qos(&self) -> _QoS {
self.qos.clone()
}
#[getter]
pub fn kind(&self) -> _SampleKind {
self.kind.clone()
}
Expand All @@ -278,12 +314,14 @@ impl _Sample {
pub fn new(
key_expr: _KeyExpr,
value: _Value,
qos: _QoS,
kind: _SampleKind,
timestamp: Option<_Timestamp>,
) -> Self {
_Sample {
key_expr: key_expr.0,
value,
qos,
kind,
timestamp,
}
Expand All @@ -300,10 +338,12 @@ impl From<_Sample> for Sample {
value,
kind,
timestamp,
qos,
} = sample;
let mut sample = Sample::new(key_expr, value);
sample.kind = kind.0;
sample.timestamp = timestamp.map(|t| t.0);
sample.qos = qos.0;
sample
}
}
Expand Down
52 changes: 48 additions & 4 deletions tests/test_session.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import zenoh
import json
from zenoh import Session, Query, Sample
from zenoh import Session, Query, Sample, Priority, CongestionControl
from typing import List, Tuple
import time

Expand Down Expand Up @@ -28,9 +28,9 @@ def open_session(endpoints: List[str]) -> Tuple[Session, Session]:


def close_session(peer01: Session, peer02: Session):
print("[ ][01d] Closing peer01 session");
print("[ ][01e] Closing peer01 session");
peer01.close()
print("[ ][02d] Closing peer02 session");
print("[ ][02e] Closing peer02 session");
peer02.close()


Expand Down Expand Up @@ -81,8 +81,52 @@ def queryable_callback(query: Query):
queryable.undeclare()


def run_session_pubsub(peer01: Session, peer02: Session):
keyexpr = "test_pub/session"
msg = 'Pub Message'.encode()

num_received = 0
num_errors = 0

def sub_callback(sample: Sample):
nonlocal num_received
nonlocal num_errors
if sample.key_expr != keyexpr \
or sample.qos.priority != Priority.DATA_HIGH() \
or sample.qos.congestion_control != CongestionControl.BLOCK() \
or sample.payload != msg:
num_errors += 1
num_received += 1

print("[PS][01d] Publisher on peer01 session");
publisher = peer01.declare_publisher(
keyexpr,
Priority.DATA_HIGH(),
CongestionControl.BLOCK()
)
time.sleep(SLEEP)

print(f"[PS][02d] Subscriber on peer02 session. {MSG_COUNT} msgs.")
subscriber = peer02.declare_subscriber(keyexpr, sub_callback)
time.sleep(SLEEP)

for _ in range(0, MSG_COUNT):
publisher.put('Pub Message')

time.sleep(SLEEP)
print(f"[PS][02d] Received on peer02 session. {num_received}/{MSG_COUNT} msgs.");
assert num_received == MSG_COUNT
assert num_errors == 0

print("[PS][03d] Undeclare publisher on peer01 session");
publisher.undeclare()
print("[PS][04d] Undeclare subscriber on peer02 session");
subscriber.undeclare()


def test_session():
zenoh.init_logger()
(peer01, peer02) = open_session(["tcp/127.0.0.1:17447"])
run_session_qryrep(peer01, peer02)
close_session(peer01, peer02)
run_session_pubsub(peer01, peer02)
close_session(peer01, peer02)
6 changes: 5 additions & 1 deletion zenoh/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class Priority(_Priority):
They are ordered à la Linux priority:
``Priority.REAL_TIME() < Priority.INTERACTIVE_HIGH() < Priority.INTERACTIVE_LOW() < Priority.DATA() < Priority.BACKGROUND()``
"""
def __new__(cls, inner: _SampleKind):
def __new__(cls, inner: _Priority):
return super().__new__(cls, inner)
@staticmethod
def REAL_TIME() -> 'Priority':
Expand Down Expand Up @@ -57,6 +57,8 @@ def __gt__(self, other) -> bool:
return super().__gt__(other)
def __ge__(self, other) -> bool:
return super().__ge__(other)

Priority.DEFAULT = Priority.DATA()

class SampleKind(_SampleKind):
"Similar to an HTTP METHOD: only PUT and DELETE are currently supported."
Expand Down Expand Up @@ -94,6 +96,8 @@ def __eq__(self, other) -> bool:
return super().__eq__(other)
def __ne__(self, other) -> bool:
return not self.__eq__(other)

CongestionControl.DEFAULT = CongestionControl.DROP()

class Encoding(_Encoding):
def __new__(cls, inner: _Encoding):
Expand Down
39 changes: 35 additions & 4 deletions zenoh/value.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
from typing import Union, Tuple, Optional, List
import json

from .enums import Encoding, SampleKind
from .zenoh import _Value, _Encoding, _Sample, _SampleKind, _Reply, _ZenohId, _Timestamp, _Hello
from .enums import Encoding, SampleKind, Priority, CongestionControl
from .zenoh import _Value, _Encoding, _Sample, _SampleKind, _Reply, _ZenohId, _Timestamp, _Hello, _QoS
from .keyexpr import KeyExpr, IntoKeyExpr

class IValue:
Expand Down Expand Up @@ -129,15 +129,42 @@ def seconds_since_unix_epoch(self) -> float:
"""
return super().seconds_since_unix_epoch

class QoS(_QoS):
"""
Quality of Service settings.
"""
def __new__(cls):
return super().new()
@property
def priority(self) -> Priority:
"Priority"
return Priority(super().priority)
@property
def congestion_control(self) -> CongestionControl:
"Congestion control"
return CongestionControl(super().congestion_control)
@property
def express(self) -> bool:
"Express flag: if True, the message is not batched during transmission, in order to reduce latency."
return super().express
@staticmethod
def _upgrade_(inner: _QoS) -> 'QoS':
if isinstance(inner, QoS):
return inner
return _QoS.__new__(QoS, inner)

QoS.DEFAULT = QoS()


IntoSample = Union[_Sample, Tuple[IntoKeyExpr, IntoValue, SampleKind], Tuple[KeyExpr, IntoValue]]
class Sample(_Sample):
"""
A KeyExpr-Value pair, annotated with the kind (PUT or DELETE) of publication used to emit it and a timestamp.
"""
def __new__(cls, key: IntoKeyExpr, value: IntoValue, kind: SampleKind = None, timestamp: Timestamp = None):
def __new__(cls, key: IntoKeyExpr, value: IntoValue, kind: SampleKind = None, qos:QoS = None, timestamp: Timestamp = None):
kind = _SampleKind.PUT if kind is None else kind
return Sample._upgrade_(super().new(KeyExpr(key), Value(value), kind, timestamp))
qos = QoS.DEFAULT if qos is None else qos
return Sample._upgrade_(super().new(KeyExpr(key), Value(value), qos, kind, timestamp))
@property
def key_expr(self) -> KeyExpr:
"The sample's key expression"
Expand All @@ -163,6 +190,10 @@ def timestamp(self) -> Optional[Timestamp]:
"The sample's timestamp. May be None."
ts = super().timestamp
return None if ts is None else Timestamp._upgrade_(ts)
@property
def qos(self) -> QoS:
"Quality of service settings the sample was sent with"
return QoS._upgrade_(super().qos)
@staticmethod
def _upgrade_(inner: _Sample) -> 'Sample':
if isinstance(inner, Sample):
Expand Down