Skip to content

Commit

Permalink
Allow DeclareInterest for any keyexpr (#739)
Browse files Browse the repository at this point in the history
* Allow to DeclareInterest for any keyexpr

* Remove forgotten println
  • Loading branch information
Mallets authored Feb 26, 2024
1 parent d6ffebf commit cc8d4a1
Show file tree
Hide file tree
Showing 3 changed files with 249 additions and 77 deletions.
41 changes: 22 additions & 19 deletions commons/zenoh-codec/src/network/declare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use zenoh_protocol::{
network::{
declare::{
self, common, interest, keyexpr, queryable, subscriber, token, Declare, DeclareBody,
Interest,
},
id, Mapping,
},
Expand Down Expand Up @@ -845,24 +846,20 @@ where
fn write(self, writer: &mut W, x: &interest::DeclareInterest) -> Self::Output {
let interest::DeclareInterest {
id,
interest: _,
wire_expr,
interest,
} = x;

// Header
let mut header = declare::id::D_INTEREST;
if wire_expr.mapping != Mapping::DEFAULT {
header |= subscriber::flag::M;
}
if wire_expr.has_suffix() {
header |= subscriber::flag::N;
}
let header = declare::id::D_INTEREST | x.flags();
self.write(&mut *writer, header)?;

// Body
self.write(&mut *writer, id)?;
self.write(&mut *writer, wire_expr)?;
self.write(&mut *writer, interest.as_u8())?;
self.write(&mut *writer, x.options())?;
if let Some(we) = wire_expr.as_ref() {
self.write(&mut *writer, we)?;
}

Ok(())
}
Expand Down Expand Up @@ -894,14 +891,20 @@ where

// Body
let id: interest::InterestId = self.codec.read(&mut *reader)?;
let ccond = Zenoh080Condition::new(imsg::has_flag(self.header, token::flag::N));
let mut wire_expr: WireExpr<'static> = ccond.read(&mut *reader)?;
wire_expr.mapping = if imsg::has_flag(self.header, token::flag::M) {
Mapping::Sender
} else {
Mapping::Receiver
};
let interest: u8 = self.codec.read(&mut *reader)?;
let options: u8 = self.codec.read(&mut *reader)?;
let interest = Interest::from((imsg::flags(self.header), options));

let mut wire_expr = None;
if interest.restricted() {
let ccond = Zenoh080Condition::new(interest.named());
let mut we: WireExpr<'static> = ccond.read(&mut *reader)?;
we.mapping = if interest.mapping() {
Mapping::Sender
} else {
Mapping::Receiver
};
wire_expr = Some(we);
}

// Extensions
let has_ext = imsg::has_flag(self.header, token::flag::Z);
Expand All @@ -911,8 +914,8 @@ where

Ok(interest::DeclareInterest {
id,
interest,
wire_expr,
interest: interest.into(),
})
}
}
Expand Down
Loading

0 comments on commit cc8d4a1

Please sign in to comment.