diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index a2dd3e1b0..0633c8c4d 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -563,15 +563,22 @@ where // Capabilities for the lower envelope of updates in `batcher`. let mut capabilities = Antichain::<Capability<G::Timestamp>>::new(); - let (activator, effort) = + let activator = Some(self.scope().activator_for(&info.address[..])); + let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator); + + // If idle merge effort exists, configure aggressive idle merging logic. if let Some(effort) = self.inner.scope().config().get::<isize>("differential/idle_merge_effort").cloned() { - (Some(self.scope().activator_for(&info.address[..])), Some(effort)) + empty_trace.set_exert_logic(Some(Box::new(move |batches| { + let mut non_empty = 0; + for (_index, count, length) in batches { + if count > 1 { return Some(effort as usize); } + if length > 0 { non_empty += 1; } + if non_empty > 1 { return Some(effort as usize); } + } + None + }))); } - else { - (None, None) - }; - let empty_trace = Tr::new(info.clone(), logger.clone(), activator); let (reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger); *reader = Some(reader_local); @@ -672,9 +679,7 @@ where prev_frontier.extend(input.frontier().frontier().iter().cloned()); } - if let Some(mut fuel) = effort.clone() { - writer.exert(&mut fuel); - } + writer.exert(); } }) }; diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs index 468398ae6..dbcd38c8e 100644 --- a/src/operators/arrange/upsert.rs +++ b/src/operators/arrange/upsert.rs @@ -165,20 +165,25 @@ where register.get::<::logging::DifferentialEvent>("differential/arrange") }; - // Establish compaction effort to apply even without updates. 
- let (activator, effort) = - if let Some(effort) = stream.scope().config().get::<isize>("differential/idle_merge_effort").cloned() { - (Some(stream.scope().activator_for(&info.address[..])), Some(effort)) - } - else { - (None, None) - }; - // Tracks the lower envelope of times in `priority_queue`. let mut capabilities = Antichain::<Capability<G::Timestamp>>::new(); let mut buffer = Vec::new(); // Form the trace we will both use internally and publish. - let empty_trace = Tr::new(info.clone(), logger.clone(), activator); + let activator = Some(stream.scope().activator_for(&info.address[..])); + let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator); + // If idle merge effort exists, configure aggressive idle merging logic. + if let Some(effort) = stream.scope().config().get::<isize>("differential/idle_merge_effort").cloned() { + empty_trace.set_exert_logic(Some(Box::new(move |batches| { + let mut non_empty = 0; + for (_index, count, length) in batches { + if count > 1 { return Some(effort as usize); } + if length > 0 { non_empty += 1; } + if non_empty > 1 { return Some(effort as usize); } + } + None + }))); + } + let (mut reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger); // Capture the reader outside the builder scope. *reader = Some(reader_local.clone()); @@ -334,9 +339,7 @@ where reader_local.set_physical_compaction(prev_frontier.borrow()); } - if let Some(mut fuel) = effort.clone() { - writer.exert(&mut fuel); - } + writer.exert(); } }) }; diff --git a/src/operators/arrange/writer.rs b/src/operators/arrange/writer.rs index a42924b75..b53f298f1 100644 --- a/src/operators/arrange/writer.rs +++ b/src/operators/arrange/writer.rs @@ -52,9 +52,9 @@ where } /// Exerts merge effort, even without additional updates. 
- pub fn exert(&mut self, fuel: &mut isize) { + pub fn exert(&mut self) { if let Some(trace) = self.trace.upgrade() { - trace.borrow_mut().trace.exert(fuel); + trace.borrow_mut().trace.exert(); } } diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index 45ad92460..712843e1e 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -351,17 +351,21 @@ where register.get::<::logging::DifferentialEvent>("differential/arrange") }; - // Determine if we should regularly exert the trace maintenance machinery, - // and with what amount of effort each time. - let (activator, effort) = + let activator = Some(self.stream.scope().activator_for(&operator_info.address[..])); + let mut empty = T2::new(operator_info.clone(), logger.clone(), activator); + // If idle merge effort exists, configure aggressive idle merging logic. if let Some(effort) = self.stream.scope().config().get::<isize>("differential/idle_merge_effort").cloned() { - (Some(self.stream.scope().activator_for(&operator_info.address[..])), Some(effort)) + empty.set_exert_logic(Some(Box::new(move |batches| { + let mut non_empty = 0; + for (_index, count, length) in batches { + if count > 1 { return Some(effort as usize); } + if length > 0 { non_empty += 1; } + if non_empty > 1 { return Some(effort as usize); } + } + None + }))); } - else { - (None, None) - }; - let empty = T2::new(operator_info.clone(), logger.clone(), activator); let mut source_trace = self.trace.clone(); let (mut output_reader, mut output_writer) = TraceAgent::new(empty, operator_info, logger); @@ -629,9 +633,7 @@ where } // Exert trace maintenance if we have been so requested. 
- if let Some(mut fuel) = effort.clone() { - output_writer.exert(&mut fuel); - } + output_writer.exert(); } } ) diff --git a/src/trace/implementations/spine_fueled.rs b/src/trace/implementations/spine_fueled.rs index f3b61ec94..4425b46c5 100644 --- a/src/trace/implementations/spine_fueled.rs +++ b/src/trace/implementations/spine_fueled.rs @@ -97,6 +97,8 @@ pub struct Spine<B: Batch> where B::Time: Lattice+Ord, B::R: Semigroup { upper: Antichain<B::Time>, effort: usize, activator: Option<timely::scheduling::activate::Activator>, + /// Logic to indicate whether and how many records we should introduce in the absence of actual updates. + exert_logic: Option<Box<dyn for<'a> Fn(Box<dyn Iterator<Item=(usize, usize, usize)>+'a>)->Option<usize>>>, } impl<B> TraceReader for Spine<B> @@ -264,22 +266,21 @@ where /// Apply some amount of effort to trace maintenance. /// - /// The units of effort are updates, and the method should be - /// thought of as analogous to inserting as many empty updates, - /// where the trace is permitted to perform proportionate work. - fn exert(&mut self, effort: &mut isize) { + /// Whether and how much effort to apply is determined by `self.exert_logic`, a closure the user can set. + fn exert(&mut self) { // If there is work to be done, ... self.tidy_layers(); - if !self.reduced() { + // Determine whether we should apply effort independent of updates. + if let Some(effort) = self.exert_effort() { // If any merges exist, we can directly call `apply_fuel`. if self.merging.iter().any(|b| b.is_double()) { - self.apply_fuel(effort); + self.apply_fuel(&mut (effort as isize)); } // Otherwise, we'll need to introduce fake updates to move merges along. else { // Introduce an empty batch with roughly *effort number of virtual updates. - let level = (*effort as usize).next_power_of_two().trailing_zeros() as usize; + let level = effort.next_power_of_two().trailing_zeros() as usize; self.introduce_batch(None, level); } // We were not in reduced form, so let's check again in the future. 
@@ -289,6 +290,10 @@ where } } + fn set_exert_logic(&mut self, logic: Option<Box<dyn for<'a> Fn(Box<dyn Iterator<Item=(usize, usize, usize)>+'a>)->Option<usize>>>) { + self.exert_logic = logic; + } + // Ideally, this method acts as insertion of `batch`, even if we are not yet able to begin // merging the batch. This means it is a good time to perform amortized work proportional // to the size of batch. @@ -388,19 +393,20 @@ where B::Time: Lattice+timely::progress::Timestamp+Ord+Clone+Debug, B::R: Semigroup, { - /// True iff there is at most one non-empty batch in `self.merging`. + /// Determine the amount of effort we should exert in the absence of updates. /// - /// When true, there is no maintenance work to perform in the trace, other than compaction. - /// We do not yet have logic in place to determine if compaction would improve a trace, so - /// for now we are ignoring that. - fn reduced(&self) -> bool { - let mut non_empty = 0; - for index in 0 .. self.merging.len() { - if self.merging[index].is_double() { return false; } - if self.merging[index].len() > 0 { non_empty += 1; } - if non_empty > 1 { return false; } - } - true + /// This method prepares an iterator over batches, including the level, count, and length of each layer. + /// It supplies this to `self.exert_logic`, which returns the amount of effort to apply, if any. + fn exert_effort(&self) -> Option<usize> { + self.exert_logic.as_ref().and_then(|l| (**l)( + Box::new(self.merging.iter().enumerate().rev().map(|(index, batch)| { + match batch { + MergeState::Vacant => (index, 0, 0), + MergeState::Single(_) => (index, 1, batch.len()), + MergeState::Double(_) => (index, 2, batch.len()), + } + })) + )) } /// Describes the merge progress of layers in the trace. @@ -443,6 +449,7 @@ where upper: Antichain::from_elem(<B::Time as timely::progress::Timestamp>::minimum()), effort, activator, + exert_logic: None, } } @@ -483,7 +490,7 @@ where } // Having performed all of our work, if more than one batch remains reschedule ourself. 
- if !self.reduced() { + if self.exert_effort().is_some() { if let Some(activator) = &self.activator { activator.activate(); } diff --git a/src/trace/mod.rs b/src/trace/mod.rs index 6e411475e..4593629e6 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -208,8 +208,15 @@ where <Self as TraceReader>::Batch: Batch { activator: Option<timely::scheduling::activate::Activator>, ) -> Self; - /// Exert merge effort, even without updates. - fn exert(&mut self, effort: &mut isize); + /// Exert merge effort, even without updates. + fn exert(&mut self); + + /// Sets the logic for exertion in the absence of updates. + /// + /// The function receives an iterator over batch levels, from large to small, as triples `(level, count, length)`, + /// indicating the level, the number of batches, and their total length in updates. It should return a number of + /// updates to perform, or `None` if no work is required. + fn set_exert_logic(&mut self, logic: Option<Box<dyn for<'a> Fn(Box<dyn Iterator<Item=(usize, usize, usize)>+'a>)->Option<usize>>>); /// Introduces a batch of updates to the trace. ///