diff --git a/block/gs/adapter.go b/block/gs/adapter.go index 87d21cef976..06368351744 100644 --- a/block/gs/adapter.go +++ b/block/gs/adapter.go @@ -424,7 +424,7 @@ func (a *Adapter) ValidateConfiguration(_ string) error { return nil } -func (a *Adapter) GenerateInventory(_ context.Context, _ logging.Logger, _ string, _ bool) (block.Inventory, error) { +func (a *Adapter) GenerateInventory(_ context.Context, _ logging.Logger, _ string, _ bool, _ []string) (block.Inventory, error) { return nil, fmt.Errorf("inventory %w", ErrNotImplemented) } diff --git a/block/inventory.go b/block/inventory.go index b4824c189f3..e418c99c9b5 100644 --- a/block/inventory.go +++ b/block/inventory.go @@ -9,7 +9,7 @@ import ( ) type InventoryGenerator interface { - GenerateInventory(ctx context.Context, logger logging.Logger, inventoryURL string, shouldSort bool) (Inventory, error) + GenerateInventory(ctx context.Context, logger logging.Logger, inventoryURL string, shouldSort bool, prefixes []string) (Inventory, error) } // Inventory represents a snapshot of the storage space diff --git a/block/local/adapter.go b/block/local/adapter.go index 5353ceb0852..f19c86ad941 100644 --- a/block/local/adapter.go +++ b/block/local/adapter.go @@ -331,7 +331,7 @@ func (l *Adapter) ValidateConfiguration(_ string) error { return nil } -func (l *Adapter) GenerateInventory(_ context.Context, _ logging.Logger, _ string, _ bool) (block.Inventory, error) { +func (l *Adapter) GenerateInventory(_ context.Context, _ logging.Logger, _ string, _ bool, _ []string) (block.Inventory, error) { return nil, ErrInventoryNotSupported } diff --git a/block/mem/adapter.go b/block/mem/adapter.go index d5dcf7abdf3..2999b01ece0 100644 --- a/block/mem/adapter.go +++ b/block/mem/adapter.go @@ -227,7 +227,7 @@ func (a *Adapter) ValidateConfiguration(_ string) error { return nil } -func (a *Adapter) GenerateInventory(_ context.Context, _ logging.Logger, _ string, _ bool) (block.Inventory, error) { +func (a *Adapter) GenerateInventory(_ context.Context, _ logging.Logger, _ string, _ bool, _ []string) (block.Inventory, error) { return nil, ErrInventoryNotImplemented } diff --git a/block/s3/inventory.go b/block/s3/inventory.go index 92c97f63c22..8b720efaa83 100644 --- a/block/s3/inventory.go +++ b/block/s3/inventory.go @@ -7,6 +7,7 @@ import ( "fmt" "net/url" "sort" + "strings" "github.com/aws/aws-sdk-go/aws/arn" "github.com/aws/aws-sdk-go/service/s3" @@ -29,14 +30,16 @@ type Manifest struct { } type inventoryFile struct { - Key string `json:"key"` // an s3 key for an inventory list file + Key string `json:"key"` // an s3 key for an inventory list file + firstKey string + lastKey string } -func (a *Adapter) GenerateInventory(ctx context.Context, logger logging.Logger, manifestURL string, shouldSort bool) (block.Inventory, error) { - return GenerateInventory(logger, manifestURL, a.s3, s3inventory.NewReader(ctx, a.s3, logger), shouldSort) +func (a *Adapter) GenerateInventory(ctx context.Context, logger logging.Logger, manifestURL string, shouldSort bool, prefixes []string) (block.Inventory, error) { + return GenerateInventory(logger, manifestURL, a.s3, s3inventory.NewReader(ctx, a.s3, logger), shouldSort, prefixes) } -func GenerateInventory(logger logging.Logger, manifestURL string, s3 s3iface.S3API, inventoryReader s3inventory.IReader, shouldSort bool) (block.Inventory, error) { +func GenerateInventory(logger logging.Logger, manifestURL string, s3 s3iface.S3API, inventoryReader s3inventory.IReader, shouldSort bool, prefixes []string) (block.Inventory, error) { if logger == nil { logger = logging.Default() } @@ -44,13 +47,18 @@ func GenerateInventory(logger logging.Logger, manifestURL string, s3 s3iface.S3A if err != nil { return nil, err } - if shouldSort { + if shouldSort || len(prefixes) > 0 { err = sortManifest(m, logger, inventoryReader) } if err != nil { return nil, err } - return &Inventory{Manifest: m, logger: logger, shouldSort: shouldSort, reader: inventoryReader}, nil + if len(prefixes) > 0 { + manifestFileCount := len(m.Files) + m.Files = filterFiles(m.Files, prefixes) + logger.Debugf("manifest filtered from %d to %d files", manifestFileCount, len(m.Files)) + } + return &Inventory{Manifest: m, logger: logger, shouldSort: shouldSort, reader: inventoryReader, prefixes: prefixes}, nil } type Inventory struct { @@ -58,6 +66,7 @@ type Inventory struct { logger logging.Logger shouldSort bool reader s3inventory.IReader + prefixes []string } func (inv *Inventory) Iterator() block.InventoryIterator { @@ -98,30 +107,58 @@ func loadManifest(manifestURL string, s3svc s3iface.S3API) (*Manifest, error) { return &m, nil } +func filterFiles(files []inventoryFile, prefixes []string) []inventoryFile { + sort.Strings(prefixes) + currentPrefixIdx := 0 + filteredFiles := make([]inventoryFile, 0) + for i := 0; i < len(files); i++ { + for { + // find a prefix that may have suitable keys in the current file + if prefixes[currentPrefixIdx] >= files[i].firstKey { + // prefix may be in scope of current file + break + } + if strings.HasPrefix(files[i].firstKey, prefixes[currentPrefixIdx]) { + // first object in file starts with prefix + break + } + // current prefix ends before this file - move to next prefix + currentPrefixIdx++ + if currentPrefixIdx == len(prefixes) { + // no more prefixes - other files are irrelevant + return filteredFiles + } + } + if strings.HasPrefix(files[i].firstKey, prefixes[currentPrefixIdx]) || + (prefixes[currentPrefixIdx] >= files[i].firstKey && prefixes[currentPrefixIdx] < files[i].lastKey) { + // file may contain keys starting with this prefix + filteredFiles = append(filteredFiles, files[i]) + } + } + return filteredFiles +} + func sortManifest(m *Manifest, logger logging.Logger, reader s3inventory.IReader) error { - firstKeyByInventoryFile := make(map[string]string) - lastKeyByInventoryFile := make(map[string]string) - for _, f := range m.Files { + for i, f := range m.Files { mr, err := reader.GetMetadataReader(m.Format, m.inventoryBucket, f.Key) if err != nil { return fmt.Errorf("failed to sort inventory files in manifest: %w", err) } - firstKeyByInventoryFile[f.Key] = mr.FirstObjectKey() - lastKeyByInventoryFile[f.Key] = mr.LastObjectKey() + m.Files[i].firstKey = mr.FirstObjectKey() + m.Files[i].lastKey = mr.LastObjectKey() err = mr.Close() if err != nil { logger.Errorf("failed to close inventory file. file=%s, err=%w", f, err) } } sort.Slice(m.Files, func(i, j int) bool { - return firstKeyByInventoryFile[m.Files[i].Key] < firstKeyByInventoryFile[m.Files[j].Key] || - (firstKeyByInventoryFile[m.Files[i].Key] == firstKeyByInventoryFile[m.Files[j].Key] && - lastKeyByInventoryFile[m.Files[i].Key] < lastKeyByInventoryFile[m.Files[j].Key]) + return m.Files[i].firstKey < m.Files[j].firstKey || + (m.Files[i].firstKey == m.Files[j].firstKey && m.Files[i].lastKey < m.Files[j].lastKey) }) // validate sorting: if a file begins before the next one ends - the files cover overlapping ranges, // which we don't know how to handle. for i := 0; i < len(m.Files)-1; i++ { - if firstKeyByInventoryFile[m.Files[i+1].Key] < lastKeyByInventoryFile[m.Files[i].Key] { + if m.Files[i+1].firstKey < m.Files[i].lastKey { return ErrInventoryFilesRangesOverlap } } diff --git a/block/s3/inventory_iterator.go b/block/s3/inventory_iterator.go index f1002d069d8..942e3991f4e 100644 --- a/block/s3/inventory_iterator.go +++ b/block/s3/inventory_iterator.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "strconv" + "strings" "time" "github.com/treeverse/lakefs/block" @@ -22,6 +23,7 @@ type InventoryIterator struct { valIndexInBuffer int inventoryFileProgress *cmdutils.Progress currentFileProgress *cmdutils.Progress + currentPrefix int } func NewInventoryIterator(inv *Inventory) *InventoryIterator { @@ -112,6 +114,24 @@ func (it *InventoryIterator) nextFromBuffer() *block.InventoryObject { if !obj.IsLatest || obj.IsDeleteMarker { continue } + if len(it.prefixes) > 0 { + // check file against prefix filter + // loop while current prefix may fit current object + for it.prefixes[it.currentPrefix] < obj.Key { + if strings.HasPrefix(obj.Key, it.prefixes[it.currentPrefix]) { + // found current prefix + break + } + it.currentPrefix++ + if it.currentPrefix == len(it.prefixes) { + // no more prefixes + return nil + } + } + if !strings.HasPrefix(obj.Key, it.prefixes[it.currentPrefix]) { + continue + } + } res := block.InventoryObject{ Bucket: obj.Bucket, Key: obj.Key, diff --git a/block/s3/inventory_test.go b/block/s3/inventory_test.go index 505779e8e68..a8287f1f192 100644 --- a/block/s3/inventory_test.go +++ b/block/s3/inventory_test.go @@ -66,6 +66,11 @@ var fileContents = map[string][]string{ "f_overlap3": {"fo_row2", "fo_row6"}, "f_overlap4": {"fo_row1", "fo_row4"}, "f_overlap5": {"fo_row2", "fo_row4"}, + "f1_prefix": {"a1", "a2", "b1", "b2"}, + "f2_prefix": {"b3", "b4", "c1", "c2"}, + "f3_prefix": {"d1", "d2", "e1", "e2"}, + "f4_prefix": {"a1", "a2", "b1", "b2", "c1", "c2", "d1", "d2"}, + "f5_prefix": {"e1", "e2", "f1", "f2", "g1", "g2", "h1", "h2"}, } func TestIterator(t *testing.T) { @@ -76,136 +81,189 @@ func TestIterator(t *testing.T) { lastModified[r] = now.Add(time.Hour * time.Duration(-i)) } } - testdata := []struct { - InventoryFiles []string - ExpectedObjects []string - ErrExpected error - ShouldSort bool + testdata := map[string]struct { + InventoryFiles []string + ExpectedObjects []string + Prefixes []string + ErrExpected error + ExpectedCountReadRows int + ExpectedCountGetFileReader int + ShouldSort bool }{ - { + "new inventory": { InventoryFiles: []string{"f1", "f2", "f3"}, ExpectedObjects: []string{"f1row2", "f1row3", "f2row1", "f2row2", "f3row1", "f3row2"}, }, - { + "new inventory - sort before": { InventoryFiles: []string{"f3", "f2", "f1"}, ShouldSort: true, ExpectedObjects: []string{"f1row2", "f1row3", "f2row1", "f2row2", "f3row1", "f3row2"}, }, - { + "new inventory - unsorted": { + InventoryFiles: []string{"f3", "f2", "f1"}, + ExpectedObjects: []string{"f3row1", "f3row2", "f2row1", "f2row2", "f1row2", "f1row3"}, + }, + "empty inventory": { InventoryFiles: []string{}, ExpectedObjects: []string{}, }, - { + "single file": { InventoryFiles: []string{"f4"}, ExpectedObjects: []string{"f4row1", "f4row2", "f4row3", "f4row4", "f4row5", "f4row6", "f4row7"}, }, - { - InventoryFiles: []string{"f1", "f4"}, - ExpectedObjects: []string{"f1row2", "f1row3", "f4row1", "f4row2", "f4row3", "f4row4", "f4row5", "f4row6", "f4row7"}, - }, - { - InventoryFiles: []string{"f5", "f6"}, - ExpectedObjects: []string{"f5row1", "f5row2", "f5row3", "f6row1", "f6row2", "f6row3", "f6row4"}, - }, - { + "unsorted inventory file": { InventoryFiles: []string{"f1", "unsorted_file"}, ErrExpected: s3.ErrInventoryNotSorted, ShouldSort: true, }, - { + "file with error": { InventoryFiles: []string{"f5", "err_file1"}, ErrExpected: ErrReadFile, }, - { + "file with error in the middle": { InventoryFiles: []string{"f1", "f2", "f3", "f4", "f5", "f6", "err_file2"}, ErrExpected: ErrReadFile, }, - { + "file with many deletions": { InventoryFiles: []string{"f7"}, ExpectedObjects: []string{"f7row1", "f7row11"}, }, - { + "inventory with everything deleted": { InventoryFiles: []string{"all_deleted1", "all_deleted2", "all_deleted3"}, ExpectedObjects: []string{}, }, - { + "many files with everything deleted": { InventoryFiles: []string{"all_deleted1", "all_deleted2", "f1", "all_deleted3", "all_deleted4", "all_deleted5", "all_deleted6", "all_deleted7", "f2", "all_deleted8", "all_deleted9"}, ExpectedObjects: []string{"f1row2", "f1row3", "f2row1", "f2row2"}, }, - { + "many files with everything deleted - sort before test": { InventoryFiles: []string{"all_deleted1", "all_deleted2", "f2", "all_deleted3", "all_deleted4", "all_deleted5", "all_deleted6", "all_deleted7", "f1", "all_deleted8", "all_deleted9"}, ExpectedObjects: []string{"f1row2", "f1row3", "f2row1", "f2row2"}, ShouldSort: true, }, - { + "empty file": { InventoryFiles: []string{"empty_file"}, ExpectedObjects: []string{}, }, - { + "overlapping inventory files": { InventoryFiles: []string{"f_overlap1", "f_overlap2"}, ShouldSort: true, ErrExpected: s3.ErrInventoryFilesRangesOverlap, }, - { + "overlapping inventory files - type 2": { InventoryFiles: []string{"f_overlap1", "f_overlap3"}, ShouldSort: true, ErrExpected: s3.ErrInventoryFilesRangesOverlap, }, - { + "overlapping inventory files - type 3": { InventoryFiles: []string{"f_overlap1", "f_overlap4"}, ShouldSort: true, ErrExpected: s3.ErrInventoryFilesRangesOverlap, }, - { + "overlapping inventory files - type 4": { InventoryFiles: []string{"f_overlap4", "f_overlap5"}, ShouldSort: true, ErrExpected: s3.ErrInventoryFilesRangesOverlap, }, + "import with prefix": { + InventoryFiles: []string{"f1_prefix", "f2_prefix"}, + Prefixes: []string{"b"}, + ExpectedObjects: []string{"b1", "b2", "b3", "b4"}, + ExpectedCountReadRows: 8, + ExpectedCountGetFileReader: 2, + }, + "import with prefix - skip entire file": { + InventoryFiles: []string{"f1_prefix", "f2_prefix", "f3_prefix"}, + Prefixes: []string{"b"}, + ExpectedObjects: []string{"b1", "b2", "b3", "b4"}, + ExpectedCountReadRows: 8, + ExpectedCountGetFileReader: 2, + }, + "import with prefix - skip first file": { + InventoryFiles: []string{"f1", "f2", "f3"}, + Prefixes: []string{"f2", "f3"}, + ExpectedObjects: []string{"f2row1", "f2row2", "f3row1", "f3row2"}, + ExpectedCountReadRows: 4, + ExpectedCountGetFileReader: 2, + }, + "import with prefix - unsorted prefixes": { + InventoryFiles: []string{"f1", "f2", "f3"}, + Prefixes: []string{"f3", "f2"}, + ExpectedObjects: []string{"f2row1", "f2row2", "f3row1", "f3row2"}, + ExpectedCountReadRows: 4, + ExpectedCountGetFileReader: 2, + }, + "import with prefix - unsorted inventory": { + InventoryFiles: []string{"f3", "f2", "f1"}, + Prefixes: []string{"f2", "f3"}, + ExpectedObjects: []string{"f2row1", "f2row2", "f3row1", "f3row2"}, + ShouldSort: true, + ExpectedCountReadRows: 4, + ExpectedCountGetFileReader: 2, + }, + "import with prefix - prefix in middle": { + InventoryFiles: []string{"f4_prefix", "f5_prefix"}, + Prefixes: []string{"b", "f"}, + ExpectedObjects: []string{"b1", "b2", "f1", "f2"}, + ExpectedCountReadRows: 16, + ExpectedCountGetFileReader: 2, + }, } manifestURL := "s3://example-bucket/manifest1.json" - for _, test := range testdata { - s3api := &mockS3Client{ - FilesByManifestURL: map[string][]string{manifestURL: test.InventoryFiles}, - } - reader := &mockInventoryReader{openFiles: make(map[string]bool), lastModified: lastModified} - inv, err := s3.GenerateInventory(logging.Default(), manifestURL, s3api, reader, test.ShouldSort) - if err != nil { - if errors.Is(err, test.ErrExpected) { - continue + for name, test := range testdata { + t.Run(name, func(t *testing.T) { + + s3api := &mockS3Client{ + FilesByManifestURL: map[string][]string{manifestURL: test.InventoryFiles}, } - t.Fatalf("error: %v", err) - } - it := inv.Iterator() - objects := make([]*block.InventoryObject, 0, len(test.ExpectedObjects)) - for it.Next() { - objects = append(objects, it.Get()) - } - if len(reader.openFiles) != 0 { - t.Errorf("some files stayed open: %v", reader.openFiles) - } - if !errors.Is(it.Err(), test.ErrExpected) { - t.Fatalf("got unexpected error. expected=%v, got=%v.", test.ErrExpected, it.Err()) - } - if test.ErrExpected != nil { - continue - } - if len(objects) != len(test.ExpectedObjects) { - t.Fatalf("unexpected number of objects in inventory. expected=%d, got=%d", len(test.ExpectedObjects), len(objects)) - } - for i, obj := range objects { - if obj.Key != test.ExpectedObjects[i] { - t.Fatalf("at index %d: expected=%s, got=%s", i, test.ExpectedObjects[i], obj.Key) + reader := &mockInventoryReader{openFiles: make(map[string]bool), lastModified: lastModified} + inv, err := s3.GenerateInventory(logging.Default(), manifestURL, s3api, reader, test.ShouldSort, test.Prefixes) + if err != nil { + if errors.Is(err, test.ErrExpected) { + return + } + t.Fatalf("error: %v", err) } - if *obj.LastModified != lastModified[obj.Key] { - t.Fatalf("last modified for object in index %d different than expected. expected=%v, got=%v", i, lastModified[obj.Key], obj.LastModified) + it := inv.Iterator() + objects := make([]*block.InventoryObject, 0, len(test.ExpectedObjects)) + for it.Next() { + objects = append(objects, it.Get()) } - } + if len(reader.openFiles) != 0 { + t.Errorf("some files stayed open: %v", reader.openFiles) + } + if !errors.Is(it.Err(), test.ErrExpected) { + t.Fatalf("got unexpected error. expected=%v, got=%v.", test.ErrExpected, it.Err()) + } + if test.ErrExpected != nil { + return + } + if len(objects) != len(test.ExpectedObjects) { + t.Fatalf("unexpected number of objects in inventory. expected=%d, got=%d", len(test.ExpectedObjects), len(objects)) + } + if test.ExpectedCountReadRows > 0 && test.ExpectedCountReadRows != reader.countReadRows { + t.Fatalf("total number of read rows different than expected. expected=%d, got=%d", test.ExpectedCountReadRows, reader.countReadRows) + } + if test.ExpectedCountGetFileReader > 0 && test.ExpectedCountGetFileReader != reader.countGetFileReader { + t.Fatalf("total number of get file reader different than expected. expected=%d, got=%d", test.ExpectedCountGetFileReader, reader.countGetFileReader) + } + for i, obj := range objects { + if obj.Key != test.ExpectedObjects[i] { + t.Fatalf("at index %d: expected=%s, got=%s", i, test.ExpectedObjects[i], obj.Key) + } + if *obj.LastModified != lastModified[obj.Key] { + t.Fatalf("last modified for object in index %d different than expected. expected=%v, got=%v", i, lastModified[obj.Key], obj.LastModified) + } + } + }) } } type mockInventoryReader struct { - openFiles map[string]bool - lastModified map[string]time.Time + openFiles map[string]bool + lastModified map[string]time.Time + countReadRows int + countGetFileReader int } type mockInventoryFileReader struct { @@ -254,6 +312,7 @@ func (m *mockInventoryFileReader) Read(n int) ([]*s3inventory.InventoryObject, e res = append(res, m.rows[i]) } m.nextIdx = m.nextIdx + len(res) + m.inventoryReader.countReadRows += len(res) return res, nil } @@ -263,6 +322,7 @@ func (m *mockInventoryFileReader) GetNumRows() int64 { func (m *mockInventoryReader) GetFileReader(_ string, _ string, key string) (s3inventory.FileReader, error) { m.openFiles[key] = true + m.countGetFileReader++ return &mockInventoryFileReader{rows: rows(fileContents[key], m.lastModified), inventoryReader: m, key: key}, nil } diff --git a/block/transient/adapter.go b/block/transient/adapter.go index 496a4d9b4c3..5ff6efe8da5 100644 --- a/block/transient/adapter.go +++ b/block/transient/adapter.go @@ -110,7 +110,7 @@ func (a *Adapter) ValidateConfiguration(_ string) error { return nil } -func (a *Adapter) GenerateInventory(_ context.Context, _ logging.Logger, _ string, _ bool) (block.Inventory, error) { +func (a *Adapter) GenerateInventory(_ context.Context, _ logging.Logger, _ string, _ bool, _ []string) (block.Inventory, error) { return nil, ErrInventoryNotImplemented } diff --git a/cmd/lakefs/cmd/import.go b/cmd/lakefs/cmd/import.go index aac2bf547e8..211c218d658 100644 --- a/cmd/lakefs/cmd/import.go +++ b/cmd/lakefs/cmd/import.go @@ -1,6 +1,7 @@ package cmd import ( + "bufio" "context" "errors" "fmt" @@ -22,13 +23,14 @@ import ( ) const ( - DryRunFlagName = "dry-run" - WithMergeFlagName = "with-merge" - HideProgress = "hide-progress" - ManifestURLFlagName = "manifest" - ManifestURLFormat = "s3://example-bucket/inventory/YYYY-MM-DDT00-00Z/manifest.json" - ImportCmdNumArgs = 1 - CommitterName = "lakefs" + DryRunFlagName = "dry-run" + WithMergeFlagName = "with-merge" + HideProgressFlagName = "hide-progress" + ManifestURLFlagName = "manifest" + PrefixesFileFlagName = "prefix-file" + ManifestURLFormat = "s3://example-bucket/inventory/YYYY-MM-DDT00-00Z/manifest.json" + ImportCmdNumArgs = 1 + CommitterName = "lakefs" ) var importCmd = &cobra.Command{ @@ -44,7 +46,9 @@ var importCmd = &cobra.Command{ dryRun, _ := flags.GetBool(DryRunFlagName) manifestURL, _ := flags.GetString(ManifestURLFlagName) withMerge, _ := flags.GetBool(WithMergeFlagName) - hideProgress, _ := flags.GetBool(HideProgress) + hideProgress, _ := flags.GetBool(HideProgressFlagName) + prefixFile, _ := flags.GetString(PrefixesFileFlagName) + ctx := context.Background() conf := config.NewConfig() err := db.ValidateSchemaUpToDate(conf.GetDatabaseParams()) @@ -92,12 +96,36 @@ var importCmd = &cobra.Command{ if dryRun { fmt.Print("Starting import dry run. Will not perform any changes.\n\n") } + var prefixes []string + if prefixFile != "" { + file, err := os.Open(prefixFile) + if err != nil { + fmt.Printf("Failed to read prefix filter: %s\n", err) + os.Exit(1) + } + defer func() { + _ = file.Close() + }() + scanner := bufio.NewScanner(file) + for scanner.Scan() { + prefix := scanner.Text() + if prefix != "" { + prefixes = append(prefixes, prefix) + } + } + if err := scanner.Err(); err != nil { + fmt.Printf("Failed to read prefix filter: %s\n", err) + os.Exit(1) + } + fmt.Printf("Filtering according to %d prefixes\n", len(prefixes)) + } importConfig := &onboard.Config{ CommitUsername: CommitterName, InventoryURL: manifestURL, Repository: repoName, InventoryGenerator: blockStore, Cataloger: cataloger, + KeyPrefixes: prefixes, } importer, err := onboard.CreateImporter(ctx, logger, importConfig) if err != nil { @@ -183,5 +211,6 @@ func init() { importCmd.Flags().StringP(ManifestURLFlagName, "m", "", fmt.Sprintf("S3 uri to the manifest.json to use for the import. Format: %s", ManifestURLFormat)) _ = importCmd.MarkFlagRequired(ManifestURLFlagName) importCmd.Flags().Bool(WithMergeFlagName, false, "Merge imported data to the repository's main branch") - importCmd.Flags().Bool(HideProgress, false, "Suppress progress bar") + importCmd.Flags().Bool(HideProgressFlagName, false, "Suppress progress bar") + importCmd.Flags().StringP(PrefixesFileFlagName, "p", "", "File with a list of key prefixes. Imported object keys will be filtered according to these prefixes") } diff --git a/gateway/operations/mock_adapter_test.go b/gateway/operations/mock_adapter_test.go index 98e96397ef6..1baedb3e77a 100644 --- a/gateway/operations/mock_adapter_test.go +++ b/gateway/operations/mock_adapter_test.go @@ -82,7 +82,7 @@ func (a *mockAdapter) ValidateConfiguration(_ string) error { return nil } -func (a *mockAdapter) GenerateInventory(_ context.Context, _ logging.Logger, _ string, _ bool) (block.Inventory, error) { +func (a *mockAdapter) GenerateInventory(_ context.Context, _ logging.Logger, _ string, _ bool, _ []string) (block.Inventory, error) { return nil, nil } diff --git a/onboard/import.go b/onboard/import.go index 67d17c631f9..30cd9ebe032 100644 --- a/onboard/import.go +++ b/onboard/import.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "strings" "time" "github.com/treeverse/lakefs/block" @@ -22,6 +23,7 @@ type Importer struct { logger logging.Logger previousCommit *catalog.CommitLog progress []*cmdutils.Progress + prefixes []string } type Config struct { @@ -31,6 +33,7 @@ type Config struct { InventoryGenerator block.InventoryGenerator Cataloger catalog.Cataloger CatalogActions RepoActions + KeyPrefixes []string } type Stats struct { @@ -45,6 +48,7 @@ type Stats struct { var ( ErrNoInventoryURL = errors.New("no inventory_url in commit Metadata") ErrInventoryAlreadyImported = errors.New("given inventory was already imported") + ErrIncompatiblePrefixes = errors.New("prefix filter should cover at least the same keys as the previous import") ) func CreateImporter(ctx context.Context, logger logging.Logger, config *Config) (importer *Importer, err error) { @@ -62,7 +66,12 @@ func CreateImporter(ctx context.Context, logger logging.Logger, config *Config) return nil, fmt.Errorf("failed to get previous commit: %w", err) } res.previousCommit = previousCommit - res.inventory, err = config.InventoryGenerator.GenerateInventory(ctx, logger, config.InventoryURL, res.previousCommit != nil) + shouldSort := res.previousCommit != nil + res.inventory, err = config.InventoryGenerator.GenerateInventory(ctx, logger, config.InventoryURL, shouldSort, config.KeyPrefixes) + res.prefixes = config.KeyPrefixes + if !res.validatePrefixes() { + return nil, ErrIncompatiblePrefixes + } if err != nil { return nil, err } @@ -77,7 +86,8 @@ func (s *Importer) diffIterator(ctx context.Context, commit catalog.CommitLog) ( if previousInventoryURL == s.inventory.InventoryURL() { return nil, fmt.Errorf("%w. commit_ref=%s", ErrInventoryAlreadyImported, commit.Reference) } - previousInv, err := s.inventoryGenerator.GenerateInventory(ctx, s.logger, previousInventoryURL, true) + previousPrefixes := ExtractPrefixes(commit.Metadata) + previousInv, err := s.inventoryGenerator.GenerateInventory(ctx, s.logger, previousInventoryURL, true, previousPrefixes) if err != nil { return nil, fmt.Errorf("failed to create inventory for previous state: %w", err) } @@ -110,7 +120,7 @@ func (s *Importer) Import(ctx context.Context, dryRun bool) (*Stats, error) { stats.PreviousInventoryURL = s.previousCommit.Metadata["inventory_url"] } if !dryRun { - commitMetadata := CreateCommitMetadata(s.inventory, *stats) + commitMetadata := CreateCommitMetadata(s.inventory, *stats, s.prefixes) commitLog, err := s.CatalogActions.Commit(ctx, fmt.Sprintf(CommitMsgTemplate, s.inventory.SourceName()), commitMetadata) if err != nil { return nil, err @@ -120,6 +130,27 @@ func (s *Importer) Import(ctx context.Context, dryRun bool) (*Stats, error) { return stats, nil } +// validatePrefixes validates that the new set of prefixes covers all strings covered by the previous set. +func (s *Importer) validatePrefixes() bool { + if s.previousCommit == nil { + return true + } + previousPrefixes := ExtractPrefixes(s.previousCommit.Metadata) + for _, p1 := range previousPrefixes { + ok := false + for _, p2 := range s.prefixes { + if strings.HasPrefix(p1, p2) { + ok = true + break + } + } + if !ok { + return false + } + } + return true +} + func (s *Importer) Progress() []*cmdutils.Progress { return s.progress } diff --git a/onboard/import_test.go b/onboard/import_test.go index 0a440e97411..89e2e80d0bd 100644 --- a/onboard/import_test.go +++ b/onboard/import_test.go @@ -2,6 +2,7 @@ package onboard_test import ( "context" + "errors" "reflect" "strconv" "testing" @@ -11,141 +12,218 @@ import ( ) func TestImport(t *testing.T) { - testdata := []struct { + testdata := map[string]struct { NewInventory []string PreviousInventory []string ExpectedAdded []string ExpectedDeleted []string - ExpectedErr bool + ExpectedErr error OverrideNewInventoryURL string OverridePreviousInventoryURL string + Prefixes []string + PreviousPrefixes []string }{ - { + "new inventory": { NewInventory: []string{"f1", "f2"}, ExpectedAdded: []string{"f1", "f2"}, }, - { + "new inventory - unsorted": { + NewInventory: []string{"f2", "f1"}, + ExpectedAdded: []string{"f2", "f1"}, + }, + "with previous inventory": { NewInventory: []string{"f1", "f2", "f3", "f4"}, PreviousInventory: []string{"f1", "f2"}, ExpectedAdded: []string{"f3", "f4"}, ExpectedDeleted: nil, }, - { + "with previous inventory - unsorted": { + NewInventory: []string{"f4", "f3", "f2", "f1"}, + PreviousInventory: []string{"f1", "f2"}, + ExpectedAdded: []string{"f3", "f4"}, + ExpectedDeleted: nil, + }, + "delete objects": { NewInventory: []string{"f1", "f2"}, PreviousInventory: []string{"f1", "f2", "f3", "f4"}, ExpectedAdded: nil, ExpectedDeleted: []string{"f3", "f4"}, }, - { + "add and delete objects": { NewInventory: []string{"f1", "f2", "s1"}, PreviousInventory: []string{"f1", "f2", "f3", "f4"}, ExpectedAdded: []string{"s1"}, ExpectedDeleted: []string{"f3", "f4"}, }, - { + "delete all objects": { NewInventory: []string{}, PreviousInventory: []string{"f1", "f2", "f3", "f4"}, ExpectedAdded: nil, ExpectedDeleted: []string{"f1", "f2", "f3", "f4"}, }, - { + "do nothing": { // do nothing, expect no errors }, - { + "complex add and delete": { NewInventory: []string{"a01", "a02", "a03", "a04", "a05", "a06", "a07"}, PreviousInventory: []string{"a01", "a02", "a04", "a08", "a09", "a10"}, ExpectedDeleted: []string{"a08", "a09", "a10"}, ExpectedAdded: []string{"a03", "a05", "a06", "a07"}, }, - { + "add many objects": { NewInventory: []string{"a1", "a2", "a3", "a4", "a5", "a6", "a7"}, ExpectedAdded: []string{"a1", "a2", "a3", "a4", "a5", "a6", "a7"}, }, + "import with prefix - prefix at start": { + NewInventory: []string{"a1", "a2", "b1", "b2"}, + Prefixes: []string{"a"}, + ExpectedAdded: []string{"a1", "a2"}, + }, + "import with prefix - multiple prefixes": { + NewInventory: []string{"a1", "a2", "b1", "b2", "c1", "c2", "d1", "d2"}, + Prefixes: []string{"b", "d"}, + ExpectedAdded: []string{"b1", "b2", "d1", "d2"}, + }, + "import with prefix - prefixes unsorted": { + NewInventory: []string{"a1", "a2", "b1", "b2", "c1", "c2", "d1", "d2"}, + Prefixes: []string{"d", "b"}, + ExpectedAdded: []string{"b1", "b2", "d1", "d2"}, + }, + "import with prefix - prefix at end": { + NewInventory: []string{"a1", "a2", "b1", "b2"}, + Prefixes: []string{"b"}, + ExpectedAdded: []string{"b1", "b2"}, + }, + "prefix incompatible with previous": { + NewInventory: []string{"a1", "a2", "b1", "b2", "c1", "c2"}, + PreviousInventory: []string{"a1", "a2", "b1", "b2"}, + Prefixes: []string{"a"}, + PreviousPrefixes: []string{"a", "c"}, + ExpectedErr: onboard.ErrIncompatiblePrefixes, + }, + "import with prefix - with previous import": { + NewInventory: []string{"a1", "a2", "b1", "b2", "c1", "c2"}, + PreviousInventory: []string{"a1", "a2", "a3", "b1", "b2", "b3"}, + Prefixes: []string{"a"}, + PreviousPrefixes: []string{"a"}, + ExpectedDeleted: []string{"a3"}, + }, + "prefix when previous import was without prefixes": { + NewInventory: []string{"a1", "a2", "b1", "b2", "c1", "c2"}, + PreviousInventory: []string{"a1", "a2", "a3", "b1", "b2", "b3"}, + PreviousPrefixes: []string{"a"}, + ExpectedErr: onboard.ErrIncompatiblePrefixes, + }, + "new prefixes superset of previous": { + NewInventory: []string{"a1", "a2", "b1", "b2", "c1", "c2"}, + PreviousInventory: []string{"a1", "a2", "a3", "b1", "b2"}, + Prefixes: []string{"a"}, + PreviousPrefixes: []string{"a1", "a2"}, + }, + "new prefixes superset of previous - type 2": { + NewInventory: []string{"aa1", "ab1", "ab2", "b1", "b2"}, + PreviousInventory: []string{"aa1", "aa2", "ab1", "b1", "b2"}, + Prefixes: []string{"a"}, + PreviousPrefixes: []string{"aa", "ac"}, + ExpectedAdded: []string{"ab1", "ab2"}, + ExpectedDeleted: []string{"aa2"}, + }, + "prefix incompatible with previous - type 2": { + NewInventory: []string{"a1", "a2", "b1", "b2", "c1", "c2"}, + PreviousInventory: []string{"a1", "a2", "a3", "b1", "b2"}, + Prefixes: []string{"a"}, + PreviousPrefixes: []string{"a1", "a2", "b"}, + ExpectedErr: onboard.ErrIncompatiblePrefixes, + }, } for _, dryRun := range []bool{true, false} { - for _, test := range testdata { - newInventoryURL := NewInventoryURL - previousInventoryURL := PreviousInventoryURL - if test.OverrideNewInventoryURL != "" { - newInventoryURL = test.OverrideNewInventoryURL - } - if test.OverridePreviousInventoryURL != "" { - newInventoryURL = test.OverridePreviousInventoryURL - } - catalogActionsMock := mockCatalogActions{} - if len(test.PreviousInventory) > 0 { - catalogActionsMock = mockCatalogActions{ - previousCommitInventory: previousInventoryURL, - } - } - inventoryGenerator := &mockInventoryGenerator{ - newInventoryURL: newInventoryURL, - previousInventoryURL: previousInventoryURL, - newInventory: test.NewInventory, - previousInventory: test.PreviousInventory, - sourceBucket: "example-repo", - } - config := &onboard.Config{ - CommitUsername: "committer", - InventoryURL: newInventoryURL, - Repository: "example-repo", - InventoryGenerator: inventoryGenerator, - CatalogActions: &catalogActionsMock, - } - importer, err := onboard.CreateImporter(context.TODO(), logging.Default(), config) - if err != nil { - t.Fatalf("failed to create importer: %v", err) - } - stats, err := importer.Import(context.Background(), dryRun) - if err != nil { - if !test.ExpectedErr { + for name, test := range testdata { + t.Run(name, func(t *testing.T) { + newInventoryURL := NewInventoryURL + previousInventoryURL := PreviousInventoryURL + if test.OverrideNewInventoryURL != "" { + newInventoryURL = test.OverrideNewInventoryURL + } + if test.OverridePreviousInventoryURL != "" { + newInventoryURL = test.OverridePreviousInventoryURL + } + catalogActionsMock := mockCatalogActions{} + if len(test.PreviousInventory) > 0 { + catalogActionsMock = mockCatalogActions{ + previousCommitInventory: previousInventoryURL, + previousCommitPrefixes: test.PreviousPrefixes, + } + } + inventoryGenerator := &mockInventoryGenerator{ + newInventoryURL: newInventoryURL, + previousInventoryURL: previousInventoryURL, + newInventory: test.NewInventory, + previousInventory: test.PreviousInventory, + sourceBucket: "example-repo", + } + config := &onboard.Config{ + CommitUsername: "committer", + InventoryURL: newInventoryURL, + Repository: "example-repo", + InventoryGenerator: inventoryGenerator, + CatalogActions: &catalogActionsMock, + KeyPrefixes: test.Prefixes, + } + importer, err := onboard.CreateImporter(context.TODO(), logging.Default(), config) + if err != nil { + if !errors.Is(err, test.ExpectedErr) { + t.Fatalf("unexpected error: %v", err) + } else { + return + } + } + if test.ExpectedErr != nil { + t.Fatalf("error was expected but none was returned") + } + stats, err := importer.Import(context.Background(), dryRun) + if err != nil { t.Fatalf("unexpected error: %v", err) - } else { - continue } - } - if test.ExpectedErr { - t.Fatalf("error was expected but none was returned") - } - - if !reflect.DeepEqual(stats.AddedOrChanged, len(test.ExpectedAdded)) { - t.Fatalf("number of added objects in return value different than expected. expected=%v, got=%v", len(test.ExpectedAdded), stats.AddedOrChanged) - } - if !reflect.DeepEqual(stats.Deleted, len(test.ExpectedDeleted)) { - t.Fatalf("number of deleted objects in return value different than expected. expected=%v, got=%v", len(test.ExpectedDeleted), stats.Deleted) - } - var expectedAddedToCatalog, expectedDeletedFromCatalog []string - if !dryRun { - expectedAddedToCatalog = test.ExpectedAdded - expectedDeletedFromCatalog = test.ExpectedDeleted - } - if !reflect.DeepEqual(catalogActionsMock.objectActions.Added, expectedAddedToCatalog) { - t.Fatalf("objects added to catalog different than expected. expected=%v, got=%v.", expectedAddedToCatalog, catalogActionsMock.objectActions.Added) - } - if !reflect.DeepEqual(catalogActionsMock.objectActions.Deleted, expectedDeletedFromCatalog) { - t.Fatalf("objects deleted from catalog different than expected. expected=%v, got=%v.", expectedDeletedFromCatalog, catalogActionsMock.objectActions.Deleted) - } - if stats.DryRun != dryRun { - t.Fatalf("dryRun boolean on return value different than expected, expected=%t, got=%t", dryRun, stats.DryRun) - } - if dryRun { - if len(catalogActionsMock.lastCommitMetadata) > 0 { - t.Fatalf("found commit metadata in dry run: %v", catalogActionsMock.lastCommitMetadata) - } - continue - } - if catalogActionsMock.lastCommitMetadata["inventory_url"] != newInventoryURL { - t.Fatalf("unexpected inventory_url in commit metadata. expected=%s, got=%s", newInventoryURL, catalogActionsMock.lastCommitMetadata["inventory_url"]) - } + if !reflect.DeepEqual(stats.AddedOrChanged, len(test.ExpectedAdded)) { + t.Fatalf("number of added objects in return value different than expected. expected=%v, got=%v", len(test.ExpectedAdded), stats.AddedOrChanged) + } + if !reflect.DeepEqual(stats.Deleted, len(test.ExpectedDeleted)) { + t.Fatalf("number of deleted objects in return value different than expected. expected=%v, got=%v", len(test.ExpectedDeleted), stats.Deleted) + } + var expectedAddedToCatalog, expectedDeletedFromCatalog []string + if !dryRun { + expectedAddedToCatalog = test.ExpectedAdded + expectedDeletedFromCatalog = test.ExpectedDeleted + } + if !reflect.DeepEqual(catalogActionsMock.objectActions.Added, expectedAddedToCatalog) { + t.Fatalf("objects added to catalog different than expected. expected=%v, got=%v.", expectedAddedToCatalog, catalogActionsMock.objectActions.Added) + } + if !reflect.DeepEqual(catalogActionsMock.objectActions.Deleted, expectedDeletedFromCatalog) { + t.Fatalf("objects deleted from catalog different than expected. expected=%v, got=%v.", expectedDeletedFromCatalog, catalogActionsMock.objectActions.Deleted) + } + if stats.DryRun != dryRun { + t.Fatalf("dryRun boolean on return value different than expected, expected=%t, got=%t", dryRun, stats.DryRun) + } + if dryRun { + if len(catalogActionsMock.lastCommitMetadata) > 0 { + t.Fatalf("found commit metadata in dry run: %v", catalogActionsMock.lastCommitMetadata) + } + return + } + if catalogActionsMock.lastCommitMetadata["inventory_url"] != newInventoryURL { + t.Fatalf("unexpected inventory_url in commit metadata. expected=%s, got=%s", newInventoryURL, catalogActionsMock.lastCommitMetadata["inventory_url"]) + } - addedOrChangedCount, err := strconv.Atoi(catalogActionsMock.lastCommitMetadata["added_or_changed_objects"]) - if err != nil || addedOrChangedCount != len(expectedAddedToCatalog) { - t.Fatalf("unexpected added_or_changed_objects in commit metadata. expected=%d, got=%d", len(expectedDeletedFromCatalog), addedOrChangedCount) - } - deletedCount, err := strconv.Atoi(catalogActionsMock.lastCommitMetadata["deleted_objects"]) - if err != nil || deletedCount != len(expectedDeletedFromCatalog) { - t.Fatalf("unexpected deleted_objects in commit metadata. expected=%d, got=%d", len(expectedDeletedFromCatalog), deletedCount) - } + addedOrChangedCount, err := strconv.Atoi(catalogActionsMock.lastCommitMetadata["added_or_changed_objects"]) + if err != nil || addedOrChangedCount != len(expectedAddedToCatalog) { + t.Fatalf("unexpected added_or_changed_objects in commit metadata. expected=%d, got=%d", len(expectedDeletedFromCatalog), addedOrChangedCount) + } + deletedCount, err := strconv.Atoi(catalogActionsMock.lastCommitMetadata["deleted_objects"]) + if err != nil || deletedCount != len(expectedDeletedFromCatalog) { + t.Fatalf("unexpected deleted_objects in commit metadata. expected=%d, got=%d", len(expectedDeletedFromCatalog), deletedCount) + } + }) } + } } diff --git a/onboard/inventory.go b/onboard/inventory.go index 430ca1e1db9..c7fb4d9596d 100644 --- a/onboard/inventory.go +++ b/onboard/inventory.go @@ -1,6 +1,7 @@ package onboard import ( + "encoding/json" "fmt" "strconv" "time" @@ -116,13 +117,24 @@ func CompareKeys(row1 *block.InventoryObject, row2 *block.InventoryObject) bool return row1.Key < row2.Key } -func CreateCommitMetadata(inv block.Inventory, stats Stats) catalog.Metadata { - return catalog.Metadata{ +func CreateCommitMetadata(inv block.Inventory, stats Stats, prefixes []string) catalog.Metadata { + metadata := catalog.Metadata{ "inventory_url": inv.InventoryURL(), "source": inv.SourceName(), "added_or_changed_objects": strconv.Itoa(stats.AddedOrChanged), "deleted_objects": strconv.Itoa(stats.Deleted), } + if len(prefixes) > 0 { + prefixesSerialized, _ := json.Marshal(prefixes) + metadata["key_prefixes"] = string(prefixesSerialized) + } + return metadata +} + +func ExtractPrefixes(metadata catalog.Metadata) []string { + var prefixes []string + _ = json.Unmarshal([]byte(metadata["key_prefixes"]), &prefixes) + return prefixes } func ExtractInventoryURL(metadata catalog.Metadata) string { diff --git a/onboard/inventory_test.go b/onboard/inventory_test.go index 573a05d66d4..4dac198f6ee 100644 --- a/onboard/inventory_test.go +++ b/onboard/inventory_test.go @@ -15,6 +15,7 @@ func generateLastModified(keys []string, times map[string]time.Time) []time.Time } return res } + func TestDiff(t *testing.T) { data := []struct { LeftInv []string diff --git a/onboard/utils_test.go b/onboard/utils_test.go index 4c3a4c26420..719f177365c 100644 --- a/onboard/utils_test.go +++ b/onboard/utils_test.go @@ -2,8 +2,10 @@ package onboard_test import ( "context" + "encoding/json" "errors" "sort" + "strings" "time" "github.com/go-openapi/swag" @@ -26,6 +28,7 @@ type mockInventory struct { shouldSort bool lastModified []time.Time checksum func(string) string + prefixes []string } type objectActions struct { @@ -35,6 +38,7 @@ type objectActions struct { type mockCatalogActions struct { previousCommitInventory string + previousCommitPrefixes []string objectActions objectActions lastCommitMetadata catalog.Metadata } @@ -47,12 +51,12 @@ type mockInventoryGenerator struct { sourceBucket string } -func (m mockInventoryGenerator) GenerateInventory(_ context.Context, _ logging.Logger, inventoryURL string, shouldSort bool) (block.Inventory, error) { +func (m mockInventoryGenerator) GenerateInventory(_ context.Context, _ logging.Logger, inventoryURL string, shouldSort bool, prefixes []string) (block.Inventory, error) { if inventoryURL == m.newInventoryURL { - return &mockInventory{keys: m.newInventory, inventoryURL: inventoryURL, sourceBucket: m.sourceBucket, shouldSort: shouldSort}, nil + return &mockInventory{keys: m.newInventory, inventoryURL: inventoryURL, sourceBucket: m.sourceBucket, shouldSort: shouldSort, prefixes: prefixes}, nil } if inventoryURL == m.previousInventoryURL { - return &mockInventory{keys: m.previousInventory, inventoryURL: inventoryURL, sourceBucket: m.sourceBucket, shouldSort: shouldSort}, nil + return &mockInventory{keys: m.previousInventory, inventoryURL: inventoryURL, sourceBucket: m.sourceBucket, shouldSort: shouldSort, prefixes: prefixes}, nil } return nil, errors.New("failed to create inventory") } @@ -65,8 +69,20 @@ func (m *mockInventory) rows() []block.InventoryObject { if m.checksum == nil { m.checksum = func(s string) string { return s } } + sort.Strings(m.prefixes) + currentPrefix := 0 for i, key := range m.keys { - + if len(m.prefixes) > 0 { + for currentPrefix < len(m.prefixes) && m.prefixes[currentPrefix] < key && !strings.HasPrefix(key, m.prefixes[currentPrefix]) { + currentPrefix++ + } + if currentPrefix == len(m.prefixes) { + break + } + if !strings.HasPrefix(key, m.prefixes[currentPrefix]) { + continue + } + } res = append(res, block.InventoryObject{Key: key, LastModified: swag.Time(m.lastModified[i%len(m.lastModified)]), Checksum: m.checksum(key)}) } return res @@ -95,10 +111,15 @@ func (m *mockCatalogActions) ApplyImport(_ context.Context, it onboard.Iterator, } func (m *mockCatalogActions) GetPreviousCommit(_ context.Context) (commit *catalog.CommitLog, err error) { - if m.previousCommitInventory != "" { - return &catalog.CommitLog{Metadata: catalog.Metadata{"inventory_url": m.previousCommitInventory}}, nil + if m.previousCommitInventory == "" { + return nil, nil + } + metadata := catalog.Metadata{"inventory_url": m.previousCommitInventory} + if len(m.previousCommitPrefixes) > 0 { + prefixesSerialized, _ := json.Marshal(m.previousCommitPrefixes) + metadata["key_prefixes"] = string(prefixesSerialized) } - return nil, nil + return &catalog.CommitLog{Metadata: metadata}, nil } func (m *mockCatalogActions) Commit(_ context.Context, _ string, metadata catalog.Metadata) (*catalog.CommitLog, error) {