diff --git a/CHANGELOG.md b/CHANGELOG.md index e6798c1..785b79d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ # NEXT-RELEASE +- NEXT-37310 - Added single row import strategy when encountering an error that cannot be handled automatically during a chunk import. # v0.8.0 diff --git a/src/data/import.rs b/src/data/import.rs index 5c8199c..3f58da0 100644 --- a/src/data/import.rs +++ b/src/data/import.rs @@ -105,6 +105,30 @@ fn sync_chunk( row_indices: &[usize], mut chunk: Vec, context: &Arc, +) -> anyhow::Result<()> { + if let Ok(()) = attempt_chunk_sync_with_retries(row_indices, &mut chunk, context) { + return Ok(()); + } + + println!("chunk import failed; starting with single row import to filter faulty rows"); + + for (entity, index) in chunk.into_iter().zip(row_indices.iter()) { + match attempt_chunk_sync_with_retries(row_indices, &mut vec![entity], context) { + Ok(_) => {} + Err(error) => { + println!("{error:?}"); + println!("invalid entry at row {index} will be skipped"); + } + } + } + + Ok(()) +} + +fn attempt_chunk_sync_with_retries( + row_indices: &[usize], + chunk: &mut Vec, + context: &Arc, ) -> anyhow::Result<()> { let mut try_count = context.try_count.get(); loop { @@ -115,7 +139,7 @@ fn sync_chunk( let (error_status, error_body) = match context .sw_client - .sync(&context.profile.entity, SyncAction::Upsert, &chunk) + .sync(&context.profile.entity, SyncAction::Upsert, chunk) { Ok(()) => { return Ok(()); @@ -138,7 +162,7 @@ fn sync_chunk( .any(|e| matches!(e, SwError::WriteError { .. })) => { println!("write error occurred; retry initialized"); - remove_invalid_entries_from_chunk(row_indices, &mut chunk, body); + remove_invalid_entries_from_chunk(row_indices, chunk, body); if chunk.is_empty() { return Ok(()); @@ -197,6 +221,10 @@ fn remove_invalid_entries_from_chunk( // sort descending to remove by index to_be_removed.sort_unstable_by(|a, b| b.cmp(a)); + + // filtering duplicate rows + to_be_removed.dedup(); + for index in to_be_removed { chunk.remove(index); }