Skip to content

Commit

Permalink
import data change to use filebatchproducer
Browse files Browse the repository at this point in the history
  • Loading branch information
makalaaneesh committed Jan 16, 2025
1 parent a9a294c commit 1f32f7a
Showing 1 changed file with 20 additions and 7 deletions.
27 changes: 20 additions & 7 deletions yb-voyager/cmd/importData.go
Original file line number Diff line number Diff line change
Expand Up @@ -1010,17 +1010,30 @@ func importFile(state *ImportDataState, task *ImportFileTask, updateProgressFn f
if err != nil {
utils.ErrExit("preparing for file import: %s", err)
}
log.Infof("Collect all interrupted/remaining splits.")
pendingBatches, lastBatchNumber, lastOffset, fileFullySplit, err := state.Recover(task.FilePath, task.TableNameTup)

fileBatchProducer, err := NewFileBatchProducer(task, state)
if err != nil {
utils.ErrExit("recovering state for table: %q: %s", task.TableNameTup, err)
utils.ErrExit("creating file batch producer: %s", err)
}
for _, batch := range pendingBatches {

for !fileBatchProducer.Done() {
batch, err := fileBatchProducer.NextBatch()
if err != nil {
utils.ErrExit("getting next batch: %s", err)
}
submitBatch(batch, updateProgressFn, importBatchArgsProto)
}
if !fileFullySplit {
splitFilesForTable(state, origDataFile, task.TableNameTup, lastBatchNumber, lastOffset, updateProgressFn, importBatchArgsProto)
}
// log.Infof("Collect all interrupted/remaining splits.")
// pendingBatches, lastBatchNumber, lastOffset, fileFullySplit, err := state.Recover(task.FilePath, task.TableNameTup)
// if err != nil {
// utils.ErrExit("recovering state for table: %q: %s", task.TableNameTup, err)
// }
// for _, batch := range pendingBatches {
// submitBatch(batch, updateProgressFn, importBatchArgsProto)
// }
// if !fileFullySplit {
// splitFilesForTable(state, origDataFile, task.TableNameTup, lastBatchNumber, lastOffset, updateProgressFn, importBatchArgsProto)
// }
}

func splitFilesForTable(state *ImportDataState, filePath string, t sqlname.NameTuple,
Expand Down

0 comments on commit 1f32f7a

Please sign in to comment.