Skip to content

Commit

Permalink
Merge pull request MaterializeInc#9401 from ruchirK/fix-unsealed-batches
Browse files Browse the repository at this point in the history
persist: fix error on empty writes
  • Loading branch information
ruchirK authored Dec 2, 2021
2 parents 899975a + ee77fa1 commit d6c4aee
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 14 deletions.
34 changes: 33 additions & 1 deletion src/persist/src/indexed/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,12 @@ impl Pending {
}
}

/// Queues writes to be persisted into an arrangement in the future.
///
/// Each `(id, records)` pair is appended to the pending writes for `id`.
/// Empty record batches are dropped here, so an empty write never gets
/// queued for that id.
fn add_writes(&mut self, updates: Vec<(Id, ColumnarRecords)>) {
    for (id, updates) in updates {
        // `push` takes ownership of `updates`, so the emptiness check has to
        // come first and the batch must be pushed at most once.
        if updates.len() != 0 {
            self.writes.entry(id).or_default().push(updates);
        }
    }
}

Expand Down Expand Up @@ -856,6 +859,14 @@ impl AppliedState {
return Ok(());
}

// Sanity check the invariant that only non-empty writes get appended to
// unsealed.
if cfg!(debug_assertions) {
for update in updates.iter() {
assert!(update.len() > 0);
}
}

let batch = BlobUnsealedBatch {
desc: desc.clone(),
updates,
Expand Down Expand Up @@ -1337,6 +1348,27 @@ mod tests {
Ok(())
}

// Regression test: writing an empty batch of records is expected to succeed
// and yield the first sequence number.
#[test]
fn regression_empty_unsealed_batch() -> Result<(), Error> {
    let mut index = MemRegistry::new().indexed_no_reentrance()?;
    let id = block_on(|res| index.register("0", "", "", res))?;

    // Deliberately empty: there are no records to write.
    let no_updates = vec![];

    // Write the (empty) data and move it into the unsealed part of the index.
    let written = block_on_drain(&mut index, |index, handle| {
        let records = no_updates.iter().collect::<ColumnarRecords>();
        index.write(vec![(id, records)], handle)
    });
    assert_eq!(written, Ok(SeqNo(1)));

    Ok(())
}

// Regression test for two similar bugs causing unsealed batches with
// non-adjacent seqno boundaries (which violates our invariants).
#[test]
Expand Down
28 changes: 15 additions & 13 deletions src/persist/src/operators/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -784,19 +784,21 @@ where
session.give(update);
}

let write_future = write.write(&to_retract);
to_retract.clear();

// We are not using the capability for the main output later, but we are
// holding on to it to keep the frontier from advancing because that frontier
// is used downstream to track how far we have persisted. This is used, for
// example, by upsert()/persist()/seal()/conditional_seal() operators
// and await_frontier().
pending_futures.push_back((
cap.delayed(cap.time()),
cap.retain_for_output(error_output_port),
write_future,
));
if !to_retract.is_empty() {
let write_future = write.write(&to_retract);
to_retract.clear();

// We are not using the capability for the main output later, but we are
// holding on to it to keep the frontier from advancing because that frontier
// is used downstream to track how far we have persisted. This is used, for
// example, by upsert()/persist()/seal()/conditional_seal() operators
// and await_frontier().
pending_futures.push_back((
cap.delayed(cap.time()),
cap.retain_for_output(error_output_port),
write_future,
));
}
});

// Swing through all pending futures and see if they're ready. Ready futures will
Expand Down

0 comments on commit d6c4aee

Please sign in to comment.