From ae2b41e45535e15080246e575d1fa521feb07568 Mon Sep 17 00:00:00 2001
From: Aljoscha Krettek
Date: Mon, 22 Nov 2021 17:15:18 +0100
Subject: [PATCH] persist: asynchronously retract in
 filter_and_retract_future_updates()

Before, we would write retractions individually and block on each write.
This was very slow when retracting larger amounts of data. Now, we instead
write retractions in batches and asynchronously wait for the write futures
to complete, similarly to how we do it in the `persist()` operator.

Fixes #9123
---
 src/persist/src/operators/stream.rs | 117 +++++++++++++++++++++-------
 src/persist/src/operators/upsert.rs |  14 ++--
 2 files changed, 98 insertions(+), 33 deletions(-)

diff --git a/src/persist/src/operators/stream.rs b/src/persist/src/operators/stream.rs
index 0cefbd55b479b..9d5fef056ed6f 100644
--- a/src/persist/src/operators/stream.rs
+++ b/src/persist/src/operators/stream.rs
@@ -709,14 +709,19 @@ pub trait RetractFutureUpdates<G: Scope<Timestamp = u64>, K: TimelyData, V: Time
     /// Passes through each element of the stream and sends retractions to the given
     /// [`StreamWriteHandle`] for updates that are beyond the given `upper_ts`.
     ///
-    /// This does wait on writing each retraction before emitting more data (or advancing the
-    /// frontier).
+    /// This does not wait for retractions to be persisted before passing through the data. We
+    /// do, however, wait for retractions to be persisted before allowing the frontier to
+    /// advance. In other words, this operator holds on to capabilities until the retractions
+    /// belonging to their timestamp have been persisted.
     fn filter_and_retract_future_updates(
         &self,
         name: &str,
         write: StreamWriteHandle<K, V>,
         upper_ts: u64,
-    ) -> Stream<G, ((K, V), u64, isize)>;
+    ) -> (
+        Stream<G, ((K, V), u64, isize)>,
+        Stream<G, (String, u64, isize)>,
+    );
 }
 
 impl<G: Scope<Timestamp = u64>, K, V> RetractFutureUpdates<G, K, V> for Stream<G, ((K, V), u64, isize)>
@@ -725,40 +730,44 @@ where
     K: TimelyData + Codec + Debug,
     V: TimelyData + Codec + Debug,
 {
-    // TODO: This operator is not optimized at all. For example, instead of waiting for each write
-    // to succeed, we could do the same thing as the persist() operator and batch a bunch of
-    // futures and only wait on them once the frontier advances.
-    // For now, that should be ok because this only causes restarts to be slightly slower but
-    // doesn't affect the steady-state write path.
     fn filter_and_retract_future_updates(
         &self,
         name: &str,
         write: StreamWriteHandle<K, V>,
         upper_ts: u64,
-    ) -> Stream<G, ((K, V), u64, isize)> {
+    ) -> (
+        Stream<G, ((K, V), u64, isize)>,
+        Stream<G, (String, u64, isize)>,
+    ) {
+        let scope = self.scope();
         let operator_name = format!("retract_future_updates({})", name);
-        let mut persist_op = OperatorBuilder::new(operator_name, self.scope());
+        let mut persist_op = OperatorBuilder::new(operator_name.clone(), self.scope());
 
         let mut input = persist_op.new_input(&self, Pipeline);
 
         let (mut data_output, data_output_stream) = persist_op.new_output();
+        let (mut error_output, error_output_stream) = persist_op.new_output();
 
         let mut buffer = Vec::new();
+        let error_output_port = error_output_stream.name().port;
 
-        persist_op.build_async(
-            self.scope(),
-            async_op!(|capabilities, _frontiers| {
-                // Drop initial capabilities
-                capabilities.clear();
+        // An activator that allows futures to re-schedule this operator when ready.
+        let activator = Arc::new(scope.sync_activator_for(&persist_op.operator_info().address[..]));
+
+        let mut pending_futures = VecDeque::new();
+
+        // Reusable buffer for collecting updates that we need to retract. We retract in
+        // batches, so that we neither issue one persist call per update nor keep too many
+        // write futures around.
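+        // The buffer is cleared, not dropped, after each write below, so its allocation
+        // is reused across operator invocations.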
+        let mut to_retract = Vec::new();
+
+        persist_op.build(move |_capabilities| {
+            move |_frontiers| {
                 let mut data_output = data_output.activate();
+                let mut error_output = error_output.activate();
 
                 // Write out everything and forward, keeping the write futures.
-                while let Some((cap, data)) = input.next() {
-                    // TODO(petrosagg): remove this unconditional retain once this is released:
-                    // https://github.com/TimelyDataflow/timely-dataflow/pull/429
-                    let cap = cap.retain();
+                input.for_each(|cap, data| {
                     data.swap(&mut buffer);
                     let mut session = data_output.session(&cap);
                     for update in buffer.drain(..) {
                         if update.1 >= upper_ts {
@@ -769,19 +778,73 @@ where
                             );
                             let (data, ts, diff) = update;
                             let anti_update = (data, ts, -diff);
-                            write
-                                .write(&[anti_update])
-                                .await
-                                .expect("error persisting retraction");
+                            to_retract.push(anti_update);
                             continue;
                         }
                         session.give(update);
                     }
+
+                    let write_future = write.write(&to_retract);
+                    to_retract.clear();
+
+                    // We are not using the capability for the main output later, but we hold
+                    // on to it to keep the frontier from advancing, because that frontier is
+                    // used downstream to track how far we have persisted. This is used, for
+                    // example, by the upsert()/persist()/seal()/conditional_seal() operators
+                    // and await_frontier().
+                    pending_futures.push_back((
+                        cap.delayed(cap.time()),
+                        cap.retain_for_output(error_output_port),
+                        write_future,
+                    ));
+                });
+
+                // Swing through all pending futures and see if they're ready. Futures that
+                // complete invoke the activator (their waker), which makes sure that we
+                // arrive back here even when there are no changes in the input frontier and
+                // no new input.
+                let waker = futures_util::task::waker_ref(&activator);
+                let mut context = Context::from_waker(&waker);
+
+                while let Some((cap, error_cap, pending_future)) = pending_futures.front_mut() {
+                    match Pin::new(pending_future).poll(&mut context) {
+                        std::task::Poll::Ready(result) => {
+                            match result {
+                                Ok(seq_no) => {
+                                    log::trace!(
+                                        "In {}, finished writing for time: {}, seq_no: {:?}",
+                                        &operator_name,
+                                        cap.time(),
+                                        seq_no,
+                                    );
+                                }
+                                Err(e) => {
+                                    let mut session = error_output.session(&error_cap);
+                                    let error = format!(
+                                        "In {}, error writing data for time {}: {}",
+                                        &operator_name,
+                                        error_cap.time(),
+                                        e
+                                    );
+                                    log::error!("{}", error);
+
+                                    // TODO: make error retractable? Probably not...
+                                    session.give((error, *error_cap.time(), 1));
+                                }
+                            }
+
+                            let _ = pending_futures.pop_front().expect("known to exist");
+                        }
+                        std::task::Poll::Pending => {
+                            // We assume that write requests are worked off in order, so we
+                            // stop polling at the first write that is not yet done.
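+                            // Stopping at the first pending write is conservative: a
+                            // capability is only released once the write at the front of
+                            // the queue completes, so an out-of-order completion can only
+                            // delay frontier progress, never advance it too early.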
+                            break;
+                        }
+                    }
                 }
-            }),
-        );
+            }
+        });
 
-        data_output_stream
+        (data_output_stream, error_output_stream)
     }
 }
diff --git a/src/persist/src/operators/upsert.rs b/src/persist/src/operators/upsert.rs
index 05e2470094710..f75b728354e70 100644
--- a/src/persist/src/operators/upsert.rs
+++ b/src/persist/src/operators/upsert.rs
@@ -128,12 +128,14 @@ where
             let snapshot = persist_config.read_handle.snapshot();
             let (restored_oks, restored_errs) =
                 self.scope().replay(snapshot).ok_err(|x| split_ok_err(x));
-            let restored_upsert_oks = restored_oks.filter_and_retract_future_updates(
-                name,
-                persist_config.write_handle.clone(),
-                persist_config.upper_seal_ts,
-            );
-            (restored_upsert_oks, restored_errs)
+            let (restored_upsert_oks, retract_errs) = restored_oks
+                .filter_and_retract_future_updates(
+                    name,
+                    persist_config.write_handle.clone(),
+                    persist_config.upper_seal_ts,
+                );
+            let combined_errs = restored_errs.concat(&retract_errs);
+            (restored_upsert_oks, combined_errs)
         };
 
         let mut differential_state_ingester = Some(DifferentialStateIngester::new());
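
Reviewer note: the queue-polling pattern introduced above can be sketched in
isolation. The following standalone example is illustrative only: `FakeWrite`
and the no-op waker are stand-ins for the future returned by
`StreamWriteHandle::write` and for the Timely sync activator, and a plain
`u64` plays the role of a capability's timestamp. None of these names are
part of the actual persist API.

use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use futures_util::task::noop_waker;

/// A toy write future that becomes ready after `polls_left` additional polls.
struct FakeWrite {
    polls_left: usize,
}

impl Future for FakeWrite {
    // A sequence number on success, an error message on failure.
    type Output = Result<u64, String>;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.polls_left == 0 {
            Poll::Ready(Ok(42))
        } else {
            self.polls_left -= 1;
            Poll::Pending
        }
    }
}

fn main() {
    // Pending writes, oldest first; the u64 stands in for a capability's time.
    let mut pending: VecDeque<(u64, FakeWrite)> = VecDeque::new();
    pending.push_back((1, FakeWrite { polls_left: 0 }));
    pending.push_back((2, FakeWrite { polls_left: 1 }));

    // In the operator, the waker is a Timely activator that re-schedules the
    // operator when a write completes; a no-op waker is enough here.
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);

    // One "operator invocation": pop completed writes off the front of the
    // queue and stop at the first one that is still in flight.
    while let Some((time, write)) = pending.front_mut() {
        match Pin::new(write).poll(&mut cx) {
            Poll::Ready(result) => {
                println!("write for time {} finished: {:?}", time, result);
                pending.pop_front();
            }
            Poll::Pending => break,
        }
    }

    // The write for time 2 is still pending, so its (conceptual) capability
    // would still be held and the downstream frontier could not advance.
    assert_eq!(pending.len(), 1);
}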