diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5
index 27af64ef93..babdaa354b 100644
--- a/DEFAULT_CONFIG.json5
+++ b/DEFAULT_CONFIG.json5
@@ -353,8 +353,6 @@
         /// Therefore, the maximum batch size is 2^16-1 (i.e. 65535).
         /// The default batch size value is the maximum batch size: 65535.
         batch_size: 65535,
-        /// Perform batching of messages if they are smaller of the batch_size
-        batching: true,
         /// Each zenoh link has a transmission queue that can be configured
         queue: {
           /// The size of each priority queue indicates the number of batches a given queue can contain.
@@ -380,9 +378,16 @@
             /// The maximum time in microseconds to wait for an available batch before dropping the message if still no batch is available.
             wait_before_drop: 1000,
           },
-          /// The initial exponential backoff time in nanoseconds to allow the batching to eventually progress.
-          /// Higher values lead to a more aggressive batching but it will introduce additional latency.
-          backoff: 100,
+          /// Perform batching of messages if they are smaller than the batch_size
+          batching: {
+            /// Perform adaptive batching of messages if they are smaller than the batch_size.
+            /// When the network is detected to not be fast enough to transmit every message individually, many small messages may be
+            /// batched together and sent all at once on the wire, reducing the overall network overhead. This is typical of high-throughput
+            /// scenarios mainly composed of small messages. In other words, batching is activated by the network back-pressure.
+            enabled: true,
+            /// The maximum time limit (in ms) a message should be retained for batching when back-pressure happens.
+            time_limit: 1,
+          }
         },
       },
       /// Configure the zenoh RX parameters of a link
diff --git a/commons/zenoh-config/src/defaults.rs b/commons/zenoh-config/src/defaults.rs
index a00edbacc4..cc6bf5854a 100644
--- a/commons/zenoh-config/src/defaults.rs
+++ b/commons/zenoh-config/src/defaults.rs
@@ -191,17 +191,6 @@ impl Default for LinkTxConf {
             batch_size: BatchSize::MAX,
             queue: QueueConf::default(),
             threads: num,
-            batching: true,
         }
     }
 }
-
-impl Default for QueueConf {
-    fn default() -> Self {
-        Self {
-            size: QueueSizeConf::default(),
-            congestion_control: CongestionControlConf::default(),
-            backoff: 1_000_000,
-        }
-    }
-}
@@ -234,6 +223,15 @@ impl Default for CongestionControlConf {
     }
 }
 
+impl Default for BatchingConf {
+    fn default() -> Self {
+        BatchingConf {
+            enabled: true,
+            time_limit: 1,
+        }
+    }
+}
+
 impl Default for LinkRxConf {
     fn default() -> Self {
         Self {
diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs
index b7b63e1602..2d361b8d96 100644
--- a/commons/zenoh-config/src/lib.rs
+++ b/commons/zenoh-config/src/lib.rs
@@ -407,9 +407,8 @@ validated_struct::validator! {
             keep_alive: usize,
             /// Zenoh's MTU equivalent (default: 2^16-1) (max: 2^16-1)
             batch_size: BatchSize,
-            /// Perform batching of messages if they are smaller of the batch_size
-            batching: bool,
-            pub queue: QueueConf {
+            pub queue: #[derive(Default)]
+            QueueConf {
                 /// The size of each priority queue indicates the number of batches a given queue can contain.
                 /// The amount of memory being allocated for each queue is then SIZE_XXX * BATCH_SIZE.
                 /// In the case of the transport link MTU being smaller than the ZN_BATCH_SIZE,
@@ -432,9 +431,15 @@
                 /// The maximum time in microseconds to wait for an available batch before dropping the message if still no batch is available.
                 pub wait_before_drop: u64,
               },
-              /// The initial exponential backoff time in nanoseconds to allow the batching to eventually progress.
-              /// Higher values lead to a more aggressive batching but it will introduce additional latency.
-              backoff: u64,
+              pub batching: BatchingConf {
+                /// Perform adaptive batching of messages if they are smaller than the batch_size.
+                /// When the network is detected to not be fast enough to transmit every message individually, many small messages may be
+                /// batched together and sent all at once on the wire, reducing the overall network overhead. This is typical of high-throughput
+                /// scenarios mainly composed of small messages. In other words, batching is activated by the network back-pressure.
+                enabled: bool,
+                /// The maximum time limit (in ms) a message should be retained for batching when back-pressure happens.
+                time_limit: u64,
+              },
             },
             // Number of threads used for TX
             threads: usize,
diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs
index 530a6c0ced..6dc2fe9f23 100644
--- a/io/zenoh-transport/src/common/pipeline.rs
+++ b/io/zenoh-transport/src/common/pipeline.rs
@@ -513,8 +513,8 @@ pub(crate) struct TransmissionPipelineConf {
     pub(crate) batch: BatchConfig,
     pub(crate) queue_size: [usize; Priority::NUM],
     pub(crate) wait_before_drop: Duration,
-    pub(crate) batching: bool,
-    pub(crate) backoff: Duration,
+    pub(crate) batching_enabled: bool,
+    pub(crate) batching_time_limit: Duration,
 }
 
 // A 2-stage transmission pipeline
@@ -578,7 +578,7 @@ impl TransmissionPipeline {
                     priority: priority[prio].clone(),
                 },
                 fragbuf: ZBuf::empty(),
-                batching: config.batching,
+                batching: config.batching_enabled,
             }));
 
             // The stage out for this priority
@@ -586,7 +586,7 @@ impl TransmissionPipeline {
                 s_in: StageOutIn {
                     s_out_r,
                     current,
-                    backoff: Backoff::new(config.backoff, bytes),
+                    backoff: Backoff::new(config.batching_time_limit, bytes),
                 },
                 s_ref: StageOutRefill { n_ref_w, s_ref_w },
             });
@@ -788,9 +788,9 @@ mod tests {
             is_compression: true,
         },
         queue_size: [1; Priority::NUM],
-        batching: true,
+        batching_enabled: true,
         wait_before_drop: Duration::from_millis(1),
-        backoff: Duration::from_micros(1),
+        batching_time_limit: Duration::from_micros(1),
     };
 
     const CONFIG_NOT_STREAMED: TransmissionPipelineConf = TransmissionPipelineConf {
@@ -801,9 +801,9 @@ mod tests {
             is_compression: false,
         },
         queue_size: [1; Priority::NUM],
-        batching: true,
+        batching_enabled: true,
         wait_before_drop: Duration::from_millis(1),
-        backoff: Duration::from_micros(1),
+        batching_time_limit: Duration::from_micros(1),
     };
 
     #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
diff --git a/io/zenoh-transport/src/manager.rs b/io/zenoh-transport/src/manager.rs
index 669744838f..305ccab574 100644
--- a/io/zenoh-transport/src/manager.rs
+++ b/io/zenoh-transport/src/manager.rs
@@ -130,10 +130,10 @@ pub struct TransportManagerBuilder {
    whatami: WhatAmI,
    resolution: Resolution,
    batch_size: BatchSize,
-    batching: bool,
+    batching_enabled: bool,
+    batching_time_limit: Duration,
    wait_before_drop: Duration,
    queue_size: QueueSizeConf,
-    queue_backoff: Duration,
    defrag_buff_size: usize,
    link_rx_buffer_size: usize,
    unicast: TransportManagerBuilderUnicast,
@@ -172,8 +172,13 @@ impl TransportManagerBuilder {
        self
    }
 
-    pub fn batching(mut self, batching: bool) -> Self {
-        self.batching = batching;
+    pub fn batching_enabled(mut self, batching_enabled: bool) -> Self {
+        self.batching_enabled = batching_enabled;
+        self
+    }
+
+    pub fn batching_time_limit(mut self, batching_time_limit: Duration) -> Self {
+        self.batching_time_limit = batching_time_limit;
         self
     }
 
@@ -187,11 +192,6 @@ impl TransportManagerBuilder {
         self
     }
 
-    pub fn queue_backoff(mut self, queue_backoff: Duration) -> Self {
-        self.queue_backoff = queue_backoff;
-        self
-    }
-
     pub fn defrag_buff_size(mut self, defrag_buff_size: usize) -> Self {
         self.defrag_buff_size = defrag_buff_size;
         self
@@ -238,14 +238,16 @@ impl TransportManagerBuilder {
         resolution.set(Field::FrameSN, *link.tx().sequence_number_resolution());
         self = self.resolution(resolution);
         self = self.batch_size(*link.tx().batch_size());
-        self = self.batching(*link.tx().batching());
+        self = self.batching_enabled(*link.tx().queue().batching().enabled());
+        self = self.batching_time_limit(Duration::from_millis(
+            *link.tx().queue().batching().time_limit(),
+        ));
         self = self.defrag_buff_size(*link.rx().max_message_size());
         self = self.link_rx_buffer_size(*link.rx().buffer_size());
         self = self.wait_before_drop(Duration::from_micros(
             *link.tx().queue().congestion_control().wait_before_drop(),
         ));
         self = self.queue_size(link.tx().queue().size().clone());
-        self = self.queue_backoff(Duration::from_nanos(*link.tx().queue().backoff()));
         self = self.tx_threads(*link.tx().threads());
 
         self = self.protocols(link.protocols().clone());
@@ -301,10 +303,10 @@ impl TransportManagerBuilder {
             whatami: self.whatami,
             resolution: self.resolution,
             batch_size: self.batch_size,
-            batching: self.batching,
+            batching: self.batching_enabled,
             wait_before_drop: self.wait_before_drop,
             queue_size,
-            queue_backoff: self.queue_backoff,
+            queue_backoff: self.batching_time_limit,
             defrag_buff_size: self.defrag_buff_size,
             link_rx_buffer_size: self.link_rx_buffer_size,
             unicast: unicast.config,
@@ -340,7 +342,7 @@ impl Default for TransportManagerBuilder {
     fn default() -> Self {
         let link_rx = LinkRxConf::default();
         let queue = QueueConf::default();
-        let backoff = *queue.backoff();
+        let backoff = *queue.batching().time_limit();
         let wait_before_drop = *queue.congestion_control().wait_before_drop();
         Self {
             version: VERSION,
@@ -348,10 +350,10 @@
             whatami: zenoh_config::defaults::mode,
             resolution: Resolution::default(),
             batch_size: BatchSize::MAX,
-            batching: true,
+            batching_enabled: true,
             wait_before_drop: Duration::from_micros(wait_before_drop),
             queue_size: queue.size,
-            queue_backoff: Duration::from_nanos(backoff),
+            batching_time_limit: Duration::from_millis(backoff),
             defrag_buff_size: *link_rx.max_message_size(),
             link_rx_buffer_size: *link_rx.buffer_size(),
             endpoints: HashMap::new(),
diff --git a/io/zenoh-transport/src/multicast/link.rs b/io/zenoh-transport/src/multicast/link.rs
index 90999d32ce..d0d5ef4fb0 100644
--- a/io/zenoh-transport/src/multicast/link.rs
+++ b/io/zenoh-transport/src/multicast/link.rs
@@ -321,8 +321,8 @@ impl TransportLinkMulticastUniversal {
                 batch: self.link.config.batch,
                 queue_size: self.transport.manager.config.queue_size,
                 wait_before_drop: self.transport.manager.config.wait_before_drop,
-                batching: self.transport.manager.config.batching,
-                backoff: self.transport.manager.config.queue_backoff,
+                batching_enabled: self.transport.manager.config.batching,
+                batching_time_limit: self.transport.manager.config.queue_backoff,
             };
             // The pipeline
             let (producer, consumer) = TransmissionPipeline::make(tpc, &priority_tx);
diff --git a/io/zenoh-transport/src/unicast/universal/link.rs b/io/zenoh-transport/src/unicast/universal/link.rs
index cc3afc06e5..fff842c255 100644
--- a/io/zenoh-transport/src/unicast/universal/link.rs
+++ b/io/zenoh-transport/src/unicast/universal/link.rs
@@ -62,8 +62,8 @@ impl TransportLinkUnicastUniversal {
             },
             queue_size: transport.manager.config.queue_size,
             wait_before_drop: transport.manager.config.wait_before_drop,
-            batching: transport.manager.config.batching,
-            backoff: transport.manager.config.queue_backoff,
+            batching_enabled: transport.manager.config.batching,
+            batching_time_limit: transport.manager.config.queue_backoff,
         };
         // The pipeline
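
For reference, below is a minimal, illustrative sketch of how the new adaptive-batching knobs could be set in a user configuration file. Only the `batching.enabled` and `batching.time_limit` keys come from the DEFAULT_CONFIG.json5 change above; the surrounding `transport.link.tx.queue` nesting is assumed from the default configuration layout and is not part of this diff.

```json5
// Illustrative user-config sketch (not part of the diff): enabling adaptive batching
// on the transport TX queue. The transport/link/tx/queue nesting is assumed from the
// default configuration layout.
{
  transport: {
    link: {
      tx: {
        queue: {
          batching: {
            // Batch small messages together when network back-pressure is detected.
            enabled: true,
            // Retain a message for batching for at most 1 ms under back-pressure.
            time_limit: 1,
          },
        },
      },
    },
  },
}
```

The values shown mirror the new defaults introduced in commons/zenoh-config/src/defaults.rs (`enabled: true`, `time_limit: 1` ms).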