Skip to content

Commit

Permalink
NEXT-37310 - Added single row import strategy on import error
Browse files Browse the repository at this point in the history
  • Loading branch information
CR0YD committed Aug 28, 2024
1 parent 5666f34 commit d41610a
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# NEXT-RELEASE
- NEXT-37310 - Added single row import strategy when encountering an error that cannot be handled automatically during a chunk import.

# v0.8.0

Expand Down
32 changes: 30 additions & 2 deletions src/data/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,30 @@ fn sync_chunk(
row_indices: &[usize],
mut chunk: Vec<Entity>,
context: &Arc<SyncContext>,
) -> anyhow::Result<()> {
if let Ok(()) = attempt_chunk_sync_with_retries(row_indices, &mut chunk, context) {
return Ok(());
}

println!("chunk import failed; starting with single row import to filter faulty rows");

for (entity, index) in chunk.into_iter().zip(row_indices.iter()) {
match attempt_chunk_sync_with_retries(row_indices, &mut vec![entity], context) {
Ok(_) => {}
Err(error) => {
println!("{error:?}");
println!("invalid entry at row {index} will be skipped");
}
}
}

Ok(())
}

fn attempt_chunk_sync_with_retries(
row_indices: &[usize],
chunk: &mut Vec<Entity>,
context: &Arc<SyncContext>,
) -> anyhow::Result<()> {
let mut try_count = context.try_count.get();
loop {
Expand All @@ -115,7 +139,7 @@ fn sync_chunk(
let (error_status, error_body) =
match context
.sw_client
.sync(&context.profile.entity, SyncAction::Upsert, &chunk)
.sync(&context.profile.entity, SyncAction::Upsert, chunk)
{
Ok(()) => {
return Ok(());
Expand All @@ -138,7 +162,7 @@ fn sync_chunk(
.any(|e| matches!(e, SwError::WriteError { .. })) =>
{
println!("write error occurred; retry initialized");
remove_invalid_entries_from_chunk(row_indices, &mut chunk, body);
remove_invalid_entries_from_chunk(row_indices, chunk, body);

if chunk.is_empty() {
return Ok(());
Expand Down Expand Up @@ -197,6 +221,10 @@ fn remove_invalid_entries_from_chunk(

// sort descending to remove by index
to_be_removed.sort_unstable_by(|a, b| b.cmp(a));

// filtering duplicate rows
to_be_removed.dedup();

for index in to_be_removed {
chunk.remove(index);
}
Expand Down

0 comments on commit d41610a

Please sign in to comment.