Skip to content

Commit

Permalink
Protocol batchsize (#873)
Browse files Browse the repository at this point in the history
* Use BatchSize typedef instead of u16

* Use BatchSize typedef instead of u16 for vsock
  • Loading branch information
Mallets authored Mar 29, 2024
1 parent 1562a17 commit 21fb083
Show file tree
Hide file tree
Showing 28 changed files with 125 additions and 92 deletions.
68 changes: 36 additions & 32 deletions commons/zenoh-codec/src/core/zint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,38 +17,42 @@ use zenoh_buffers::{
writer::{DidntWrite, Writer},
};

const VLE_LEN: usize = 9;
const VLE_LEN_MAX: usize = vle_len(u64::MAX);

const fn vle_len(x: u64) -> usize {
const B1: u64 = u64::MAX << 7;
const B2: u64 = u64::MAX << (7 * 2);
const B3: u64 = u64::MAX << (7 * 3);
const B4: u64 = u64::MAX << (7 * 4);
const B5: u64 = u64::MAX << (7 * 5);
const B6: u64 = u64::MAX << (7 * 6);
const B7: u64 = u64::MAX << (7 * 7);
const B8: u64 = u64::MAX << (7 * 8);

if (x & B1) == 0 {
1
} else if (x & B2) == 0 {
2
} else if (x & B3) == 0 {
3
} else if (x & B4) == 0 {
4
} else if (x & B5) == 0 {
5
} else if (x & B6) == 0 {
6
} else if (x & B7) == 0 {
7
} else if (x & B8) == 0 {
8
} else {
9
}
}

impl LCodec<u64> for Zenoh080 {
fn w_len(self, x: u64) -> usize {
const B1: u64 = u64::MAX << 7;
const B2: u64 = u64::MAX << (7 * 2);
const B3: u64 = u64::MAX << (7 * 3);
const B4: u64 = u64::MAX << (7 * 4);
const B5: u64 = u64::MAX << (7 * 5);
const B6: u64 = u64::MAX << (7 * 6);
const B7: u64 = u64::MAX << (7 * 7);
const B8: u64 = u64::MAX << (7 * 8);

if (x & B1) == 0 {
1
} else if (x & B2) == 0 {
2
} else if (x & B3) == 0 {
3
} else if (x & B4) == 0 {
4
} else if (x & B5) == 0 {
5
} else if (x & B6) == 0 {
6
} else if (x & B7) == 0 {
7
} else if (x & B8) == 0 {
8
} else {
9
}
vle_len(x)
}
}

Expand Down Expand Up @@ -107,7 +111,7 @@ where
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, mut x: u64) -> Self::Output {
writer.with_slot(VLE_LEN, move |buffer| {
writer.with_slot(VLE_LEN_MAX, move |buffer| {
let mut len = 0;
while (x & !0x7f_u64) != 0 {
// SAFETY: buffer is guaranteed to be VLE_LEN long where VLE_LEN is
Expand All @@ -122,7 +126,7 @@ where
}
// In case len == VLE_LEN then all the bits have already been written in the latest iteration.
// Else we haven't written all the necessary bytes yet.
if len != VLE_LEN {
if len != VLE_LEN_MAX {
// SAFETY: buffer is guaranteed to be VLE_LEN long where VLE_LEN is
// the maximum number of bytes a VLE can take once encoded.
// I.e.: x is shifted 7 bits to the right every iteration,
Expand Down Expand Up @@ -151,7 +155,7 @@ where
let mut v = 0;
let mut i = 0;
// 7 * VLE_LEN is beyond the maximum number of shift bits
while (b & 0x80_u8) != 0 && i != 7 * (VLE_LEN - 1) {
while (b & 0x80_u8) != 0 && i != 7 * (VLE_LEN_MAX - 1) {
v |= ((b & 0x7f_u8) as u64) << i;
b = reader.read_u8()?;
i += 7;
Expand Down
4 changes: 2 additions & 2 deletions commons/zenoh-protocol/src/transport/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ impl InitSyn {
let whatami = WhatAmI::rand();
let zid = ZenohId::default();
let resolution = Resolution::rand();
let batch_size: u16 = rng.gen();
let batch_size: BatchSize = rng.gen();
let ext_qos = rng.gen_bool(0.5).then_some(ZExtUnit::rand());
let ext_shm = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
let ext_auth = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
Expand Down Expand Up @@ -221,7 +221,7 @@ impl InitAck {
} else {
Resolution::rand()
};
let batch_size: u16 = rng.gen();
let batch_size: BatchSize = rng.gen();
let cookie = ZSlice::rand(64);
let ext_qos = rng.gen_bool(0.5).then_some(ZExtUnit::rand());
let ext_shm = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
Expand Down
2 changes: 1 addition & 1 deletion commons/zenoh-protocol/src/transport/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ impl Join {
let whatami = WhatAmI::rand();
let zid = ZenohId::default();
let resolution = Resolution::rand();
let batch_size: u16 = rng.gen();
let batch_size: BatchSize = rng.gen();
let lease = if rng.gen_bool(0.5) {
Duration::from_secs(rng.gen())
} else {
Expand Down
1 change: 1 addition & 0 deletions commons/zenoh-protocol/src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use crate::network::NetworkMessage;
/// the boundary of the serialized messages. The length is encoded as little-endian.
/// In any case, the length of a message must not exceed 65_535 bytes.
pub type BatchSize = u16;
pub type AtomicBatchSize = core::sync::atomic::AtomicU16;

pub mod batch_size {
use super::BatchSize;
Expand Down
3 changes: 2 additions & 1 deletion io/zenoh-link-commons/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub use multicast::*;
use serde::Serialize;
pub use unicast::*;
use zenoh_protocol::core::Locator;
use zenoh_protocol::transport::BatchSize;
use zenoh_result::ZResult;

/*************************************/
Expand All @@ -45,7 +46,7 @@ pub struct Link {
pub src: Locator,
pub dst: Locator,
pub group: Option<Locator>,
pub mtu: u16,
pub mtu: BatchSize,
pub is_reliable: bool,
pub is_streamed: bool,
pub interfaces: Vec<String>,
Expand Down
4 changes: 2 additions & 2 deletions io/zenoh-link-commons/src/multicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use zenoh_buffers::{reader::HasReader, writer::HasWriter};
use zenoh_codec::{RCodec, WCodec, Zenoh080};
use zenoh_protocol::{
core::{EndPoint, Locator},
transport::TransportMessage,
transport::{BatchSize, TransportMessage},
};
use zenoh_result::{zerror, ZResult};

Expand All @@ -44,7 +44,7 @@ pub struct LinkMulticast(pub Arc<dyn LinkMulticastTrait>);

#[async_trait]
pub trait LinkMulticastTrait: Send + Sync {
fn get_mtu(&self) -> u16;
fn get_mtu(&self) -> BatchSize;
fn get_src(&self) -> &Locator;
fn get_dst(&self) -> &Locator;
fn is_reliable(&self) -> bool;
Expand Down
7 changes: 5 additions & 2 deletions io/zenoh-link-commons/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ use core::{
ops::Deref,
};
use std::net::SocketAddr;
use zenoh_protocol::core::{EndPoint, Locator};
use zenoh_protocol::{
core::{EndPoint, Locator},
transport::BatchSize,
};
use zenoh_result::ZResult;

pub type LinkManagerUnicast = Arc<dyn LinkManagerUnicastTrait>;
Expand All @@ -41,7 +44,7 @@ pub struct LinkUnicast(pub Arc<dyn LinkUnicastTrait>);

#[async_trait]
pub trait LinkUnicastTrait: Send + Sync {
fn get_mtu(&self) -> u16;
fn get_mtu(&self) -> BatchSize;
fn get_src(&self) -> &Locator;
fn get_dst(&self) -> &Locator;
fn is_reliable(&self) -> bool;
Expand Down
13 changes: 8 additions & 5 deletions io/zenoh-links/zenoh-link-quic/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@ use std::net::SocketAddr;
use zenoh_config::Config;
use zenoh_core::zconfigurable;
use zenoh_link_commons::{ConfigurationInspector, LocatorInspector};
use zenoh_protocol::core::{
endpoint::{Address, Parameters},
Locator,
use zenoh_protocol::{
core::{
endpoint::{Address, Parameters},
Locator,
},
transport::BatchSize,
};
use zenoh_result::{bail, zerror, ZResult};

Expand All @@ -47,7 +50,7 @@ pub const ALPN_QUIC_HTTP: &[&[u8]] = &[b"hq-29"];
// adopted in Zenoh and the usage of 16 bits in Zenoh to encode the
// payload length in byte-streamed, the QUIC MTU is constrained to
// 2^16 - 1 bytes (i.e., 65535).
const QUIC_MAX_MTU: u16 = u16::MAX;
const QUIC_MAX_MTU: BatchSize = BatchSize::MAX;
pub const QUIC_LOCATOR_PREFIX: &str = "quic";

#[derive(Default, Clone, Copy, Debug)]
Expand Down Expand Up @@ -137,7 +140,7 @@ impl ConfigurationInspector<Config> for QuicConfigurator {

zconfigurable! {
// Default MTU (QUIC PDU) in bytes.
static ref QUIC_DEFAULT_MTU: u16 = QUIC_MAX_MTU;
static ref QUIC_DEFAULT_MTU: BatchSize = QUIC_MAX_MTU;
// The LINGER option causes the shutdown() call to block until (1) all application data is delivered
// to the remote end or (2) a timeout expires. The timeout is expressed in seconds.
// More info on the LINGER option and its dynamics can be found at:
Expand Down
3 changes: 2 additions & 1 deletion io/zenoh-links/zenoh-link-quic/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use zenoh_link_commons::{
ListenersUnicastIP, NewLinkChannelSender,
};
use zenoh_protocol::core::{EndPoint, Locator};
use zenoh_protocol::transport::BatchSize;
use zenoh_result::{bail, zerror, ZError, ZResult};

pub struct LinkUnicastQuic {
Expand Down Expand Up @@ -135,7 +136,7 @@ impl LinkUnicastTrait for LinkUnicastQuic {
}

#[inline(always)]
fn get_mtu(&self) -> u16 {
fn get_mtu(&self) -> BatchSize {
*QUIC_DEFAULT_MTU
}

Expand Down
7 changes: 4 additions & 3 deletions io/zenoh-links/zenoh-link-serial/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,23 @@ pub use unicast::*;
use zenoh_core::zconfigurable;
use zenoh_link_commons::LocatorInspector;
use zenoh_protocol::core::{endpoint::Address, EndPoint, Locator};
use zenoh_protocol::transport::BatchSize;
use zenoh_result::ZResult;

// Maximum MTU (Serial PDU) in bytes.
const SERIAL_MAX_MTU: u16 = z_serial::MAX_MTU as u16;
const SERIAL_MAX_MTU: BatchSize = z_serial::MAX_MTU as BatchSize;

const DEFAULT_BAUDRATE: u32 = 9_600;

const DEFAULT_EXCLUSIVE: bool = true;

pub const SERIAL_LOCATOR_PREFIX: &str = "serial";

const SERIAL_MTU_LIMIT: u16 = SERIAL_MAX_MTU;
const SERIAL_MTU_LIMIT: BatchSize = SERIAL_MAX_MTU;

zconfigurable! {
// Default MTU (UDP PDU) in bytes.
static ref SERIAL_DEFAULT_MTU: u16 = SERIAL_MTU_LIMIT;
static ref SERIAL_DEFAULT_MTU: BatchSize = SERIAL_MTU_LIMIT;
// Amount of time in microseconds to throttle the accept loop upon an error.
// Default set to 100 ms.
static ref SERIAL_ACCEPT_THROTTLE_TIME: u64 = 100_000;
Expand Down
3 changes: 2 additions & 1 deletion io/zenoh-links/zenoh-link-serial/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use zenoh_link_commons::{
NewLinkChannelSender,
};
use zenoh_protocol::core::{EndPoint, Locator};
use zenoh_protocol::transport::BatchSize;
use zenoh_result::{zerror, ZResult};

use z_serial::ZSerial;
Expand Down Expand Up @@ -177,7 +178,7 @@ impl LinkUnicastTrait for LinkUnicastSerial {
}

#[inline(always)]
fn get_mtu(&self) -> u16 {
fn get_mtu(&self) -> BatchSize {
*SERIAL_DEFAULT_MTU
}

Expand Down
5 changes: 3 additions & 2 deletions io/zenoh-links/zenoh-link-tcp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::net::SocketAddr;
use zenoh_core::zconfigurable;
use zenoh_link_commons::LocatorInspector;
use zenoh_protocol::core::{endpoint::Address, Locator};
use zenoh_protocol::transport::BatchSize;
use zenoh_result::{zerror, ZResult};

mod unicast;
Expand All @@ -33,7 +34,7 @@ pub use unicast::*;
// adopted in Zenoh and the usage of 16 bits in Zenoh to encode the
// payload length in byte-streamed, the TCP MTU is constrained to
// 2^16 - 1 bytes (i.e., 65535).
const TCP_MAX_MTU: u16 = u16::MAX;
const TCP_MAX_MTU: BatchSize = BatchSize::MAX;

pub const TCP_LOCATOR_PREFIX: &str = "tcp";

Expand All @@ -52,7 +53,7 @@ impl LocatorInspector for TcpLocatorInspector {

zconfigurable! {
// Default MTU (TCP PDU) in bytes.
static ref TCP_DEFAULT_MTU: u16 = TCP_MAX_MTU;
static ref TCP_DEFAULT_MTU: BatchSize = TCP_MAX_MTU;
// The LINGER option causes the shutdown() call to block until (1) all application data is delivered
// to the remote end or (2) a timeout expires. The timeout is expressed in seconds.
// More info on the LINGER option and its dynamics can be found at:
Expand Down
3 changes: 2 additions & 1 deletion io/zenoh-links/zenoh-link-tcp/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use zenoh_link_commons::{
ListenersUnicastIP, NewLinkChannelSender, BIND_INTERFACE,
};
use zenoh_protocol::core::{EndPoint, Locator};
use zenoh_protocol::transport::BatchSize;
use zenoh_result::{bail, zerror, Error as ZError, ZResult};

use super::{
Expand Down Expand Up @@ -145,7 +146,7 @@ impl LinkUnicastTrait for LinkUnicastTcp {
}

#[inline(always)]
fn get_mtu(&self) -> u16 {
fn get_mtu(&self) -> BatchSize {
*TCP_DEFAULT_MTU
}

Expand Down
13 changes: 8 additions & 5 deletions io/zenoh-links/zenoh-link-tls/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,12 @@ use std::{convert::TryFrom, net::SocketAddr};
use zenoh_config::Config;
use zenoh_core::zconfigurable;
use zenoh_link_commons::{ConfigurationInspector, LocatorInspector};
use zenoh_protocol::core::{
endpoint::{self, Address},
Locator,
use zenoh_protocol::{
core::{
endpoint::{self, Address},
Locator,
},
transport::BatchSize,
};
use zenoh_result::{bail, zerror, ZResult};

Expand All @@ -45,7 +48,7 @@ pub use unicast::*;
// adopted in Zenoh and the usage of 16 bits in Zenoh to encode the
// payload length in byte-streamed, the TLS MTU is constrained to
// 2^16 - 1 bytes (i.e., 65535).
const TLS_MAX_MTU: u16 = u16::MAX;
const TLS_MAX_MTU: BatchSize = BatchSize::MAX;
pub const TLS_LOCATOR_PREFIX: &str = "tls";

#[derive(Default, Clone, Copy)]
Expand Down Expand Up @@ -172,7 +175,7 @@ impl ConfigurationInspector<Config> for TlsConfigurator {

zconfigurable! {
// Default MTU (TLS PDU) in bytes.
static ref TLS_DEFAULT_MTU: u16 = TLS_MAX_MTU;
static ref TLS_DEFAULT_MTU: BatchSize = TLS_MAX_MTU;
// The LINGER option causes the shutdown() call to block until (1) all application data is delivered
// to the remote end or (2) a timeout expires. The timeout is expressed in seconds.
// More info on the LINGER option and its dynamics can be found at:
Expand Down
4 changes: 2 additions & 2 deletions io/zenoh-links/zenoh-link-tls/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ use zenoh_link_commons::{
get_ip_interface_names, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait,
ListenersUnicastIP, NewLinkChannelSender,
};
use zenoh_protocol::core::endpoint::Config;
use zenoh_protocol::core::{EndPoint, Locator};
use zenoh_protocol::{core::endpoint::Config, transport::BatchSize};
use zenoh_result::{bail, zerror, ZError, ZResult};

pub struct LinkUnicastTls {
Expand Down Expand Up @@ -180,7 +180,7 @@ impl LinkUnicastTrait for LinkUnicastTls {
}

#[inline(always)]
fn get_mtu(&self) -> u16 {
fn get_mtu(&self) -> BatchSize {
*TLS_DEFAULT_MTU
}

Expand Down
Loading

0 comments on commit 21fb083

Please sign in to comment.