
Merge pull request MaterializeInc#9229 from aljoscha/fix-9123-retraction-takes-too-long

persist: asynchronously retract in filter_and_retract_future_updates()
aljoscha authored Dec 2, 2021
2 parents a67e9ce + ae2b41e commit b731435
Showing 2 changed files with 98 additions and 33 deletions.
117 changes: 90 additions & 27 deletions src/persist/src/operators/stream.rs
@@ -709,14 +709,19 @@ pub trait RetractFutureUpdates<G: Scope<Timestamp = u64>, K: TimelyData, V: Time
/// Passes through each element of the stream and sends retractions to the given
/// [`StreamWriteHandle`] for updates that are beyond the given `upper_ts`.
///
/// This does wait on writing each retraction before emitting more data (or advancing the
/// frontier).
/// This does not wait for retractions to be persisted before passing through the data. We do,
/// however, wait for the retractions to be persisted before allowing the frontier to advance.
/// In other words, this operator holds on to capabilities as long as the retractions belonging
/// to their timestamp have not been persisted.
fn filter_and_retract_future_updates(
&self,
name: &str,
write: StreamWriteHandle<K, V>,
upper_ts: u64,
) -> Stream<G, ((K, V), u64, isize)>;
) -> (
Stream<G, ((K, V), u64, isize)>,
Stream<G, (String, u64, isize)>,
);
}
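
The pattern this commit moves to — start writes eagerly, queue the returned futures, and poll them front-to-back on later activations — can be sketched standalone. A minimal, self-contained sketch, assuming a hypothetical `start_write` stand-in for `StreamWriteHandle::write` (not the operator's actual code):

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use futures_util::future::BoxFuture;
use futures_util::task::noop_waker_ref;
use futures_util::FutureExt;

/// Hypothetical stand-in for `StreamWriteHandle::write`: starts an
/// asynchronous write and returns a future with its outcome (a sequence
/// number on success).
fn start_write(batch: Vec<(String, u64, isize)>) -> BoxFuture<'static, Result<u64, String>> {
    async move { Ok(batch.len() as u64) }.boxed()
}

fn main() {
    let mut pending: VecDeque<BoxFuture<'static, Result<u64, String>>> = VecDeque::new();

    // Input handling: batch retractions and start the write without awaiting
    // it, instead of `write(&[anti_update]).await` per update.
    for ts in 0..3u64 {
        pending.push_back(start_write(vec![(format!("key{}", ts), ts, -1)]));
    }

    // On a later activation: drain completed writes in order, stopping at the
    // first one that is not yet done. The real operator installs a timely
    // SyncActivator as the waker so a completing write reschedules it; a
    // no-op waker suffices here because the toy futures are always ready.
    let mut cx = Context::from_waker(noop_waker_ref());
    while let Some(fut) = pending.front_mut() {
        match Pin::new(fut).poll(&mut cx) {
            Poll::Ready(Ok(seq_no)) => println!("write durable at seq_no {}", seq_no),
            Poll::Ready(Err(e)) => eprintln!("write failed: {}", e),
            Poll::Pending => break, // in-order writes: nothing later can be done
        }
        pending.pop_front();
    }
}
```

The operator below additionally pairs each queued future with retained capabilities, so the frontier only advances once the retractions at that time are durable.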

impl<G, K, V> RetractFutureUpdates<G, K, V> for Stream<G, ((K, V), u64, isize)>
@@ -725,40 +730,44 @@ where
K: TimelyData + Codec + Debug,
V: TimelyData + Codec + Debug,
{
// TODO: This operator is not optimized at all. For example, instead of waiting for each write
// to succeed, we could do the same thing as the persist() operator and batch a bunch of
// futures and only wait on them once the frontier advances.
// For now, that should be ok because this only causes restarts to be slightly slower but
// doesn't affect the steady-state write path.
fn filter_and_retract_future_updates(
&self,
name: &str,
write: StreamWriteHandle<K, V>,
upper_ts: u64,
) -> Stream<G, ((K, V), u64, isize)> {
) -> (
Stream<G, ((K, V), u64, isize)>,
Stream<G, (String, u64, isize)>,
) {
let scope = self.scope();
let operator_name = format!("retract_future_updates({})", name);
let mut persist_op = OperatorBuilder::new(operator_name, self.scope());
let mut persist_op = OperatorBuilder::new(operator_name.clone(), self.scope());

let mut input = persist_op.new_input(&self, Pipeline);

let (mut data_output, data_output_stream) = persist_op.new_output();
let (mut error_output, error_output_stream) = persist_op.new_output();

let mut buffer = Vec::new();
let error_output_port = error_output_stream.name().port;

persist_op.build_async(
self.scope(),
async_op!(|capabilities, _frontiers| {
// Drop initial capabilities
capabilities.clear();
// An activator that allows futures to re-schedule this operator when ready.
let activator = Arc::new(scope.sync_activator_for(&persist_op.operator_info().address[..]));

let mut pending_futures = VecDeque::new();

// Reusable buffer for collecting updates that we need to retract. We retract in batches, to
// avoid making too many persist calls and keeping too many write futures around.
let mut to_retract = Vec::new();

persist_op.build(move |_capabilities| {
move |_frontiers| {
let mut data_output = data_output.activate();
let mut error_output = error_output.activate();

// Write out everything and forward, keeping the write futures.
while let Some((cap, data)) = input.next() {
// TODO(petrosagg): remove this unconditional retain once this is released:
// https://github.com/TimelyDataflow/timely-dataflow/pull/429
let cap = cap.retain();
input.for_each(|cap, data| {
data.swap(&mut buffer);

let mut session = data_output.session(&cap);
for update in buffer.drain(..) {
if update.1 >= upper_ts {
@@ -769,19 +778,73 @@
);
let (data, ts, diff) = update;
let anti_update = (data, ts, -diff);
write
.write(&[anti_update])
.await
.expect("error persisting retraction");
to_retract.push(anti_update);
continue;
}
session.give(update);
}

let write_future = write.write(&to_retract);
to_retract.clear();

// We are not using the capability for the main output later, but we are
// holding on to it to keep the frontier from advancing because that frontier
// is used downstream to track how far we have persisted. This is used, for
// example, by upsert()/persist()/seal()/conditional_seal() operators
// and await_frontier().
pending_futures.push_back((
cap.delayed(cap.time()),
cap.retain_for_output(error_output_port),
write_future,
));
});

// Swing through all pending futures and see if they're ready. Ready futures will
// invoke the Activator, which will make sure that we arrive here, even when there
// are no changes in the input frontier or new input.
let waker = futures_util::task::waker_ref(&activator);
let mut context = Context::from_waker(&waker);

while let Some((cap, error_cap, pending_future)) = pending_futures.front_mut() {
match Pin::new(pending_future).poll(&mut context) {
std::task::Poll::Ready(result) => {
match result {
Ok(seq_no) => {
log::trace!(
"In {}, finished writing for time: {}, seq_no: {:?}",
&operator_name,
cap.time(),
seq_no,
);
}
Err(e) => {
let mut session = error_output.session(&error_cap);
let error = format!(
"In {}, error writing data for time {}: {}",
&operator_name,
error_cap.time(),
e
);
log::error!("{}", error);

// TODO: make error retractable? Probably not...
session.give((error, *error_cap.time(), 1));
}
}

let _ = pending_futures.pop_front().expect("known to exist");
}
std::task::Poll::Pending => {
// We assume that write requests are worked off in order and stop
// polling at the first write that is not done.
break;
}
}
}
}),
);
}
});

data_output_stream
(data_output_stream, error_output_stream)
}
}
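
Holding a capability is what keeps downstream frontiers from advancing past its time. A minimal sketch of that mechanism, adapted from timely's documented `source` operator — the `pending_writes` counter here is an illustrative stand-in for the queue of in-flight write futures:

```rust
use timely::dataflow::operators::generic::operator::source;
use timely::dataflow::operators::Inspect;

fn main() {
    timely::example(|scope| {
        source(scope, "HoldUntilDurable", |capability, info| {
            // An activator lets the operator reschedule itself, mirroring how
            // the retraction operator is re-activated when a write completes.
            let activator = scope.activator_for(&info.address[..]);
            let mut cap = Some(capability);
            let mut pending_writes = 3u64; // pretend three writes are in flight
            move |output| {
                if let Some(cap) = cap.as_mut() {
                    output.session(&cap).give(pending_writes);
                    pending_writes -= 1;
                }
                if pending_writes == 0 {
                    // All "writes" are durable; dropping the capability is
                    // what finally lets downstream frontiers advance.
                    cap = None;
                } else {
                    activator.activate();
                }
            }
        })
        .inspect(|x| println!("emitted while frontier held back: {}", x));
    });
}
```

Once the capability is dropped, downstream consumers such as `seal()` and `await_frontier()` observe the frontier advance — exactly the signal the retraction operator withholds until its writes succeed.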

14 changes: 8 additions & 6 deletions src/persist/src/operators/upsert.rs
@@ -128,12 +128,14 @@ where
let snapshot = persist_config.read_handle.snapshot();
let (restored_oks, restored_errs) =
self.scope().replay(snapshot).ok_err(|x| split_ok_err(x));
let restored_upsert_oks = restored_oks.filter_and_retract_future_updates(
name,
persist_config.write_handle.clone(),
persist_config.upper_seal_ts,
);
(restored_upsert_oks, restored_errs)
let (restored_upsert_oks, retract_errs) = restored_oks
.filter_and_retract_future_updates(
name,
persist_config.write_handle.clone(),
persist_config.upper_seal_ts,
);
let combined_errs = restored_errs.concat(&retract_errs);
(restored_upsert_oks, combined_errs)
};

let mut differential_state_ingester = Some(DifferentialStateIngester::new());
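
The retraction errors are folded into the pre-existing restored error stream with timely's `Concat` operator, so consumers keep seeing a single error collection. A minimal sketch with illustrative stream contents:

```rust
use timely::dataflow::operators::{Concat, Inspect, ToStream};

fn main() {
    timely::example(|scope| {
        // Stand-ins for `restored_errs` and `retract_errs`.
        let restored_errs =
            vec![("decode error".to_string(), 1u64, 1isize)].to_stream(scope);
        let retract_errs =
            vec![("error writing data".to_string(), 2u64, 1isize)].to_stream(scope);

        // `concat` merges both error streams into the single collection
        // handed back to the caller.
        restored_errs
            .concat(&retract_errs)
            .inspect(|(msg, ts, diff)| println!("{} @ {} (diff {})", msg, ts, diff));
    });
}
```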
