diff --git a/src/persist/src/operators/stream.rs b/src/persist/src/operators/stream.rs index 0cefbd55b479b..9d5fef056ed6f 100644 --- a/src/persist/src/operators/stream.rs +++ b/src/persist/src/operators/stream.rs @@ -709,14 +709,19 @@ pub trait RetractFutureUpdates, K: TimelyData, V: Time /// Passes through each element of the stream and sends retractions to the given /// [`StreamWriteHandle`] for updates that are beyond the given `upper_ts`. /// - /// This does wait on writing each retraction before emitting more data (or advancing the - /// frontier). + /// This does not wait for retractions to be persisted before passing through the data. We do, + /// however, wait for data to be persisted before allowing the frontier to advance. In other + /// words, this operator is holding on to capabilities as long as retractions belonging to + /// their timestamp are not persisted. fn filter_and_retract_future_updates( &self, name: &str, write: StreamWriteHandle, upper_ts: u64, - ) -> Stream; + ) -> ( + Stream, + Stream, + ); } impl RetractFutureUpdates for Stream @@ -725,40 +730,44 @@ where K: TimelyData + Codec + Debug, V: TimelyData + Codec + Debug, { - // TODO: This operator is not optimized at all. For example, instead of waiting for each write - // to succeed, we could do the same thing as the persist() operator and batch a bunch of - // futures and only wait on them once the frontier advances. - // For now, that should be ok because this only causes restarts to be slightly slower but - // doesn't affect the steady-state write path. 
fn filter_and_retract_future_updates( &self, name: &str, write: StreamWriteHandle, upper_ts: u64, - ) -> Stream { + ) -> ( + Stream, + Stream, + ) { + let scope = self.scope(); let operator_name = format!("retract_future_updates({})", name); - let mut persist_op = OperatorBuilder::new(operator_name, self.scope()); + let mut persist_op = OperatorBuilder::new(operator_name.clone(), self.scope()); let mut input = persist_op.new_input(&self, Pipeline); let (mut data_output, data_output_stream) = persist_op.new_output(); + let (mut error_output, error_output_stream) = persist_op.new_output(); let mut buffer = Vec::new(); + let error_output_port = error_output_stream.name().port; - persist_op.build_async( - self.scope(), - async_op!(|capabilities, _frontiers| { - // Drop initial capabilities - capabilities.clear(); + // An activator that allows futures to re-schedule this operator when ready. + let activator = Arc::new(scope.sync_activator_for(&persist_op.operator_info().address[..])); + + let mut pending_futures = VecDeque::new(); + + // Reusable buffer for collecting updates that we need to retract. We retract in batches in + // order to not make too many persist calls and keep too many write futures around. + let mut to_retract = Vec::new(); + + persist_op.build(move |_capabilities| { + move |_frontiers| { let mut data_output = data_output.activate(); + let mut error_output = error_output.activate(); // Write out everything and forward, keeping the write futures. - while let Some((cap, data)) = input.next() { - // TODO(petrosagg): remove this unconditional retain once this is released: - // https://github.com/TimelyDataflow/timely-dataflow/pull/429 - let cap = cap.retain(); + input.for_each(|cap, data| { data.swap(&mut buffer); - let mut session = data_output.session(&cap); for update in buffer.drain(..) 
{ if update.1 >= upper_ts { @@ -769,19 +778,73 @@ where ); let (data, ts, diff) = update; let anti_update = (data, ts, -diff); - write - .write(&[anti_update]) - .await - .expect("error persisting retraction"); + to_retract.push(anti_update); continue; } session.give(update); } + + let write_future = write.write(&to_retract); + to_retract.clear(); + + // We are not using the capability for the main output later, but we are + // holding on to it to keep the frontier from advancing because that frontier + // is used downstream to track how far we have persisted. This is used, for + // example, by upsert()/persist()/seal()/conditional_seal() operators + // and await_frontier(). + pending_futures.push_back(( + cap.delayed(cap.time()), + cap.retain_for_output(error_output_port), + write_future, + )); + }); + + // Swing through all pending futures and see if they're ready. Ready futures will + // invoke the Activator, which will make sure that we arrive here, even when there + // are no changes in the input frontier or new input. + let waker = futures_util::task::waker_ref(&activator); + let mut context = Context::from_waker(&waker); + + while let Some((cap, error_cap, pending_future)) = pending_futures.front_mut() { + match Pin::new(pending_future).poll(&mut context) { + std::task::Poll::Ready(result) => { + match result { + Ok(seq_no) => { + log::trace!( + "In {}, finished writing for time: {}, seq_no: {:?}", + &operator_name, + cap.time(), + seq_no, + ); + } + Err(e) => { + let mut session = error_output.session(&error_cap); + let error = format!( + "In {}, error writing data for time {}: {}", + &operator_name, + error_cap.time(), + e + ); + log::error!("{}", error); + + // TODO: make error retractable? Probably not... 
+ session.give((error, *error_cap.time(), 1)); + } + } + + let _ = pending_futures.pop_front().expect("known to exist"); + } + std::task::Poll::Pending => { + // We assume that write requests are worked off in order and stop + // trying for the first write that is not done. + break; + } + } } - }), - ); + } + }); - data_output_stream + (data_output_stream, error_output_stream) } } diff --git a/src/persist/src/operators/upsert.rs b/src/persist/src/operators/upsert.rs index 05e2470094710..f75b728354e70 100644 --- a/src/persist/src/operators/upsert.rs +++ b/src/persist/src/operators/upsert.rs @@ -128,12 +128,14 @@ where let snapshot = persist_config.read_handle.snapshot(); let (restored_oks, restored_errs) = self.scope().replay(snapshot).ok_err(|x| split_ok_err(x)); - let restored_upsert_oks = restored_oks.filter_and_retract_future_updates( - name, - persist_config.write_handle.clone(), - persist_config.upper_seal_ts, - ); - (restored_upsert_oks, restored_errs) + let (restored_upsert_oks, retract_errs) = restored_oks + .filter_and_retract_future_updates( + name, + persist_config.write_handle.clone(), + persist_config.upper_seal_ts, + ); + let combined_errs = restored_errs.concat(&retract_errs); + (restored_upsert_oks, combined_errs) }; let mut differential_state_ingester = Some(DifferentialStateIngester::new());