Skip to content

Commit

Permalink
Remove pull API and protocol support (#821)
Browse files Browse the repository at this point in the history
* Remove Pull subscriber

* Fix doctest. Remove unused code.

* Remove routing code for pull subscriptions

* Remove pull mode from DeclareSubscriber

* Remove unsupported Put/Del in Request/Response (#839)

* Address review comments

---------

Co-authored-by: OlivierHecart <[email protected]>
  • Loading branch information
Mallets and OlivierHecart authored Mar 19, 2024
1 parent 665c90f commit 7300f4c
Show file tree
Hide file tree
Showing 34 changed files with 242 additions and 1,079 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 0 additions & 9 deletions commons/zenoh-codec/src/zenoh/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
//
pub mod del;
pub mod err;
pub mod pull;
pub mod put;
pub mod query;
pub mod reply;
Expand Down Expand Up @@ -81,9 +80,6 @@ where
fn write(self, writer: &mut W, x: &RequestBody) -> Self::Output {
match x {
RequestBody::Query(b) => self.write(&mut *writer, b),
RequestBody::Put(b) => self.write(&mut *writer, b),
RequestBody::Del(b) => self.write(&mut *writer, b),
RequestBody::Pull(b) => self.write(&mut *writer, b),
}
}
}
Expand All @@ -100,9 +96,6 @@ where
let codec = Zenoh080Header::new(header);
let body = match imsg::mid(codec.header) {
id::QUERY => RequestBody::Query(codec.read(&mut *reader)?),
id::PUT => RequestBody::Put(codec.read(&mut *reader)?),
id::DEL => RequestBody::Del(codec.read(&mut *reader)?),
id::PULL => RequestBody::Pull(codec.read(&mut *reader)?),
_ => return Err(DidntRead),
};

Expand All @@ -121,7 +114,6 @@ where
match x {
ResponseBody::Reply(b) => self.write(&mut *writer, b),
ResponseBody::Err(b) => self.write(&mut *writer, b),
ResponseBody::Put(b) => self.write(&mut *writer, b),
}
}
}
Expand All @@ -139,7 +131,6 @@ where
let body = match imsg::mid(codec.header) {
id::REPLY => ResponseBody::Reply(codec.read(&mut *reader)?),
id::ERR => ResponseBody::Err(codec.read(&mut *reader)?),
id::PUT => ResponseBody::Put(codec.read(&mut *reader)?),
_ => return Err(DidntRead),
};

Expand Down
93 changes: 0 additions & 93 deletions commons/zenoh-codec/src/zenoh/pull.rs

This file was deleted.

5 changes: 0 additions & 5 deletions commons/zenoh-codec/tests/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,8 +600,3 @@ fn codec_reply() {
fn codec_err() {
run!(zenoh::Err, zenoh::Err::rand());
}

#[test]
fn codec_pull() {
run!(zenoh::Pull, zenoh::Pull::rand());
}
9 changes: 9 additions & 0 deletions commons/zenoh-collections/src/ring_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ impl<T> RingBuffer<T> {
Some(elem)
}

#[inline]
pub fn push_force(&mut self, elem: T) -> Option<T> {
self.push(elem).and_then(|elem| {
let ret = self.buffer.pop_front();
self.buffer.push_back(elem);
ret
})
}

#[inline]
pub fn pull(&mut self) -> Option<T> {
let x = self.buffer.pop_front();
Expand Down
48 changes: 4 additions & 44 deletions commons/zenoh-protocol/src/network/declare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,31 +146,6 @@ impl Declare {
}
}

#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)]
#[repr(u8)]
pub enum Mode {
#[default]
Push,
Pull,
}

impl Mode {
pub const DEFAULT: Self = Self::Push;

#[cfg(feature = "test")]
fn rand() -> Self {
use rand::Rng;

let mut rng = rand::thread_rng();

if rng.gen_bool(0.5) {
Mode::Push
} else {
Mode::Pull
}
}
}

pub mod common {
use super::*;

Expand Down Expand Up @@ -320,9 +295,7 @@ pub mod subscriber {
/// ~ [decl_exts] ~ if Z==1
/// +---------------+
///
/// - if R==1 then the subscription is reliable, else it is best effort
/// - if P==1 then the subscription is pull, else it is push
///
/// - if R==1 then the subscription is reliable, else it is best effort ///
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeclareSubscriber {
Expand All @@ -343,34 +316,29 @@ pub mod subscriber {
/// +-+-+-+-+-+-+-+-+
/// |Z|0_1| ID |
/// +-+-+-+---------+
/// % reserved |P|R%
/// % reserved |R%
/// +---------------+
///
/// - if R==1 then the subscription is reliable, else it is best effort
/// - if P==1 then the subscription is pull, else it is push
/// - rsv: Reserved
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SubscriberInfo {
pub reliability: Reliability,
pub mode: Mode,
}

impl SubscriberInfo {
pub const R: u64 = 1;
pub const P: u64 = 1 << 1;

pub const DEFAULT: Self = Self {
reliability: Reliability::DEFAULT,
mode: Mode::DEFAULT,
};

#[cfg(feature = "test")]
pub fn rand() -> Self {
let reliability = Reliability::rand();
let mode = Mode::rand();

Self { reliability, mode }
Self { reliability }
}
}

Expand All @@ -387,12 +355,7 @@ pub mod subscriber {
} else {
Reliability::BestEffort
};
let mode = if imsg::has_option(ext.value, SubscriberInfo::P) {
Mode::Pull
} else {
Mode::Push
};
Self { reliability, mode }
Self { reliability }
}
}

Expand All @@ -402,9 +365,6 @@ pub mod subscriber {
if ext.reliability == Reliability::Reliable {
v |= SubscriberInfo::R;
}
if ext.mode == Mode::Pull {
v |= SubscriberInfo::P;
}
Info::new(v)
}
}
Expand Down
27 changes: 2 additions & 25 deletions commons/zenoh-protocol/src/zenoh/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,13 @@
//
pub mod del;
pub mod err;
pub mod pull;
pub mod put;
pub mod query;
pub mod reply;

use crate::core::Encoding;
pub use del::Del;
pub use err::Err;
pub use pull::Pull;
pub use put::Put;
pub use query::{Consolidation, Query};
pub use reply::Reply;
Expand All @@ -33,7 +31,6 @@ pub mod id {
pub const QUERY: u8 = 0x03;
pub const REPLY: u8 = 0x04;
pub const ERR: u8 = 0x05;
pub const PULL: u8 = 0x06;
}

// DataInfo
Expand Down Expand Up @@ -80,9 +77,6 @@ impl From<Del> for PushBody {
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RequestBody {
Query(Query),
Put(Put),
Del(Del),
Pull(Pull),
}

impl RequestBody {
Expand All @@ -92,11 +86,8 @@ impl RequestBody {

let mut rng = rand::thread_rng();

match rng.gen_range(0..4) {
match rng.gen_range(0..1) {
0 => RequestBody::Query(Query::rand()),
1 => RequestBody::Put(Put::rand()),
2 => RequestBody::Del(Del::rand()),
3 => RequestBody::Pull(Pull::rand()),
_ => unreachable!(),
}
}
Expand All @@ -108,24 +99,11 @@ impl From<Query> for RequestBody {
}
}

impl From<Put> for RequestBody {
fn from(p: Put) -> RequestBody {
RequestBody::Put(p)
}
}

impl From<Del> for RequestBody {
fn from(d: Del) -> RequestBody {
RequestBody::Del(d)
}
}

// Response
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ResponseBody {
Reply(Reply),
Err(Err),
Put(Put),
}

impl ResponseBody {
Expand All @@ -134,10 +112,9 @@ impl ResponseBody {
use rand::Rng;
let mut rng = rand::thread_rng();

match rng.gen_range(0..3) {
match rng.gen_range(0..2) {
0 => ResponseBody::Reply(Reply::rand()),
1 => ResponseBody::Err(Err::rand()),
2 => ResponseBody::Put(Put::rand()),
_ => unreachable!(),
}
}
Expand Down
Loading

0 comments on commit 7300f4c

Please sign in to comment.