persist: asynchronously retract in filter_and_retract_future_updates()
Before, we would write retractions individually and block on each write.
This was very slow when retracting large amounts of data.

Now, we instead write retractions in batches and asynchronously wait for
the write futures to complete, similarly to how we do it in the
`persist()` operator.

Fixes #9123
aljoscha committed Dec 1, 2021
1 parent 47c3d72 commit ae2b41e
Showing 2 changed files with 98 additions and 33 deletions.
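
To make the change concrete, here is a minimal sketch of the pattern the commit message describes, outside of any timely operator. `WriteHandle`, `Update`, `WriteFuture`, and the function names are hypothetical stand-ins for the persist types; the real operator is `filter_and_retract_future_updates` in the diff below.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

// Hypothetical stand-ins for the persist types.
type Update = ((String, String), u64, i64);
type WriteFuture = Pin<Box<dyn Future<Output = Result<u64, String>>>>;

struct WriteHandle;

impl WriteHandle {
    /// Stand-in for `StreamWriteHandle::write`: one call persists a whole
    /// batch and returns a future that resolves once the batch is durable.
    fn write(&self, batch: &[Update]) -> WriteFuture {
        let num_updates = batch.len() as u64;
        Box::pin(async move { Ok(num_updates) })
    }
}

/// Before: one blocking write per retraction. After (this sketch): one
/// batched write, whose future is queued instead of awaited inline.
fn flush_batch(
    write: &WriteHandle,
    to_retract: &mut Vec<Update>,
    pending: &mut VecDeque<WriteFuture>,
) {
    pending.push_back(write.write(to_retract));
    to_retract.clear();
}

/// On a later operator invocation, drain whatever writes have completed.
/// Writes finish in order, so we stop at the first one still pending.
fn drain_ready(pending: &mut VecDeque<WriteFuture>, cx: &mut Context<'_>) {
    while let Some(fut) = pending.front_mut() {
        match fut.as_mut().poll(cx) {
            Poll::Ready(_seq_no) => {
                pending.pop_front();
            }
            Poll::Pending => break,
        }
    }
}
```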
src/persist/src/operators/stream.rs: 90 additions, 27 deletions
@@ -709,14 +709,19 @@ pub trait RetractFutureUpdates<G: Scope<Timestamp = u64>, K: TimelyData, V: Time
     /// Passes through each element of the stream and sends retractions to the given
     /// [`StreamWriteHandle`] for updates that are beyond the given `upper_ts`.
     ///
-    /// This does wait on writing each retraction before emitting more data (or advancing the
-    /// frontier).
+    /// This does not wait for retractions to be persisted before passing through the data. We do,
+    /// however, wait for data to be persisted before allowing the frontier to advance. In other
+    /// words, this operator holds on to capabilities as long as retractions belonging to
+    /// their timestamp are not persisted.
     fn filter_and_retract_future_updates(
         &self,
         name: &str,
         write: StreamWriteHandle<K, V>,
         upper_ts: u64,
-    ) -> Stream<G, ((K, V), u64, isize)>;
+    ) -> (
+        Stream<G, ((K, V), u64, isize)>,
+        Stream<G, (String, u64, isize)>,
+    );
 }

 impl<G, K, V> RetractFutureUpdates<G, K, V> for Stream<G, ((K, V), u64, isize)>
@@ -725,40 +730,44 @@ where
     K: TimelyData + Codec + Debug,
     V: TimelyData + Codec + Debug,
 {
-    // TODO: This operator is not optimized at all. For example, instead of waiting for each write
-    // to succeed, we could do the same thing as the persist() operator and batch a bunch of
-    // futures and only wait on them once the frontier advances.
-    // For now, that should be ok because this only causes restarts to be slightly slower but
-    // doesn't affect the steady-state write path.
     fn filter_and_retract_future_updates(
         &self,
         name: &str,
         write: StreamWriteHandle<K, V>,
         upper_ts: u64,
-    ) -> Stream<G, ((K, V), u64, isize)> {
+    ) -> (
+        Stream<G, ((K, V), u64, isize)>,
+        Stream<G, (String, u64, isize)>,
+    ) {
+        let scope = self.scope();
         let operator_name = format!("retract_future_updates({})", name);
-        let mut persist_op = OperatorBuilder::new(operator_name, self.scope());
+        let mut persist_op = OperatorBuilder::new(operator_name.clone(), self.scope());

         let mut input = persist_op.new_input(&self, Pipeline);

         let (mut data_output, data_output_stream) = persist_op.new_output();
+        let (mut error_output, error_output_stream) = persist_op.new_output();

         let mut buffer = Vec::new();
+        let error_output_port = error_output_stream.name().port;

-        persist_op.build_async(
-            self.scope(),
-            async_op!(|capabilities, _frontiers| {
-                // Drop initial capabilities
-                capabilities.clear();
+        // An activator that allows futures to re-schedule this operator when ready.
+        let activator = Arc::new(scope.sync_activator_for(&persist_op.operator_info().address[..]));
+
+        let mut pending_futures = VecDeque::new();
+
+        // Reusable buffer for collecting updates that we need to retract. We retract in batches
+        // in order to not make too many persist calls or keep too many write futures around.
+        let mut to_retract = Vec::new();
+
+        persist_op.build(move |_capabilities| {
+            move |_frontiers| {
+                let mut data_output = data_output.activate();
+                let mut error_output = error_output.activate();

                 // Write out everything and forward, keeping the write futures.
-                while let Some((cap, data)) = input.next() {
-                    // TODO(petrosagg): remove this unconditional retain once this is released:
-                    // https://github.com/TimelyDataflow/timely-dataflow/pull/429
-                    let cap = cap.retain();
+                input.for_each(|cap, data| {
                     data.swap(&mut buffer);

                     let mut session = data_output.session(&cap);
                     for update in buffer.drain(..) {
                         if update.1 >= upper_ts {
@@ -769,19 +778,73 @@
                             );
                             let (data, ts, diff) = update;
                             let anti_update = (data, ts, -diff);
-                            write
-                                .write(&[anti_update])
-                                .await
-                                .expect("error persisting retraction");
+                            to_retract.push(anti_update);
                             continue;
                         }
                         session.give(update);
                     }
-                }
+
+                    let write_future = write.write(&to_retract);
+                    to_retract.clear();
+
+                    // We are not using the capability for the main output later, but we are
+                    // holding on to it to keep the frontier from advancing because that frontier
+                    // is used downstream to track how far we have persisted. This is used, for
+                    // example, by the upsert()/persist()/seal()/conditional_seal() operators
+                    // and await_frontier().
+                    pending_futures.push_back((
+                        cap.delayed(cap.time()),
+                        cap.retain_for_output(error_output_port),
+                        write_future,
+                    ));
+                });
+
+                // Swing through all pending futures and see if they're ready. Ready futures will
+                // invoke the Activator, which will make sure that we arrive here, even when there
+                // are no changes in the input frontier or new input.
+                let waker = futures_util::task::waker_ref(&activator);
+                let mut context = Context::from_waker(&waker);
+
+                while let Some((cap, error_cap, pending_future)) = pending_futures.front_mut() {
+                    match Pin::new(pending_future).poll(&mut context) {
+                        std::task::Poll::Ready(result) => {
+                            match result {
+                                Ok(seq_no) => {
+                                    log::trace!(
+                                        "In {}, finished writing for time: {}, seq_no: {:?}",
+                                        &operator_name,
+                                        cap.time(),
+                                        seq_no,
+                                    );
+                                }
+                                Err(e) => {
+                                    let mut session = error_output.session(&error_cap);
+                                    let error = format!(
+                                        "In {}, error writing data for time {}: {}",
+                                        &operator_name,
+                                        error_cap.time(),
+                                        e
+                                    );
+                                    log::error!("{}", error);
+
+                                    // TODO: make error retractable? Probably not...
+                                    session.give((error, *error_cap.time(), 1));
+                                }
+                            }
+
+                            let _ = pending_futures.pop_front().expect("known to exist");
+                        }
+                        std::task::Poll::Pending => {
+                            // We assume that write requests are worked off in order and stop
+                            // trying at the first write that is not done.
+                            break;
+                        }
+                    }
+                }
-            }),
-        );
+            }
+        });

-        data_output_stream
+        (data_output_stream, error_output_stream)
     }
 }

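The polling loop above only works because ready futures wake the operator: the waker handed to `poll` is built from a timely activator, so a completing write re-schedules the operator even when no new input arrives and no frontier changes, while the delayed capabilities in `pending_futures` hold the frontier back until their writes land. Below is a self-contained sketch of that waker wiring, with `OperatorActivator` and `poll_once` as assumed stand-ins for timely's `SyncActivator` and the loop body above.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use futures_util::task::{waker_ref, ArcWake};

/// Stand-in for timely's `SyncActivator`: waking it re-schedules the operator.
struct OperatorActivator;

impl OperatorActivator {
    fn activate(&self) {
        // In timely, this would enqueue the operator for another invocation.
        println!("operator re-scheduled");
    }
}

impl ArcWake for OperatorActivator {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        arc_self.activate();
    }
}

/// Poll a stored write future once. A `Pending` future keeps a clone of the
/// activator-backed waker; when the write completes it wakes that waker,
/// which re-activates the operator so the queue is drained without
/// busy-waiting.
fn poll_once<F: Future + Unpin>(
    fut: &mut F,
    activator: &Arc<OperatorActivator>,
) -> Poll<F::Output> {
    let waker = waker_ref(activator);
    let mut cx = Context::from_waker(&waker);
    Pin::new(fut).poll(&mut cx)
}
```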
src/persist/src/operators/upsert.rs: 8 additions, 6 deletions
@@ -128,12 +128,14 @@ where
         let snapshot = persist_config.read_handle.snapshot();
         let (restored_oks, restored_errs) =
             self.scope().replay(snapshot).ok_err(|x| split_ok_err(x));
-        let restored_upsert_oks = restored_oks.filter_and_retract_future_updates(
-            name,
-            persist_config.write_handle.clone(),
-            persist_config.upper_seal_ts,
-        );
-        (restored_upsert_oks, restored_errs)
+        let (restored_upsert_oks, retract_errs) = restored_oks
+            .filter_and_retract_future_updates(
+                name,
+                persist_config.write_handle.clone(),
+                persist_config.upper_seal_ts,
+            );
+        let combined_errs = restored_errs.concat(&retract_errs);
+        (restored_upsert_oks, combined_errs)
     };

     let mut differential_state_ingester = Some(DifferentialStateIngester::new());
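
The only subtlety in this hunk is the error plumbing: the operator now returns a second stream of `(String, u64, isize)` errors, and timely's `Concat` merges it with the errors restored from the snapshot so downstream consumers see both. A small runnable sketch of that merge, with made-up stream contents:

```rust
use timely::dataflow::operators::{Concat, Inspect, ToStream};

fn main() {
    timely::example(|scope| {
        // Two error streams of the same shape: (message, time, diff).
        let restored_errs =
            vec![("restored error".to_string(), 0u64, 1isize)].to_stream(scope);
        let retract_errs =
            vec![("retraction error".to_string(), 1u64, 1isize)].to_stream(scope);

        // Downstream consumers observe errors from either source.
        restored_errs
            .concat(&retract_errs)
            .inspect(|err| println!("error: {:?}", err));
    });
}
```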
