diff --git a/catalog/migrate/migrate.go b/catalog/migrate/migrate.go index d6761261b52..e93fb24ec8a 100644 --- a/catalog/migrate/migrate.go +++ b/catalog/migrate/migrate.go @@ -28,6 +28,7 @@ type Migrate struct { lastCommit graveler.CommitID branches map[graveler.BranchID]graveler.CommitID tags map[graveler.TagID]graveler.CommitID + reporter Reporter } type commitRecord struct { @@ -45,6 +46,12 @@ type commitRecord struct { MergeSourceCommit *int64 } +type Reporter interface { + BeginRepository(repository string) + BeginCommit(ref, message, committer, branch string) + EndRepository(err error) +} + const ( migrateFetchSize = 1000 @@ -61,6 +68,7 @@ func NewMigrate(db db.Database, entryCatalog *rocks.EntryCatalog, mvccCataloger entryCatalog: entryCatalog, mvccCataloger: mvccCataloger, log: logging.Default(), + reporter: &nullReporter{}, }, nil } @@ -94,11 +102,13 @@ func (m *Migrate) Run() error { "storage_namespace": repo.StorageNamespace, "default_branch": repo.DefaultBranch, }).Info("Start repository migrate") + m.reporter.BeginRepository(repo.Name) _, err := m.migrateRepository(ctx, repo) if err != nil { m.log.WithError(err).WithField("repository", repo.Name).Error("Migrate repository") merr = multierror.Append(merr, err) } + m.reporter.EndRepository(err) } if err = m.postMigrate(); err != nil { @@ -202,6 +212,7 @@ func (m *Migrate) getOrCreateTargetRepository(ctx context.Context, repository *c // migrateCommit migrate single commit from MVCC based repository to EntryCatalog format func (m *Migrate) migrateCommit(ctx context.Context, repo *graveler.RepositoryRecord, commit commitRecord) error { mvccRef := mvcc.MakeReference(commit.BranchName, mvcc.CommitID(commit.CommitID)) + m.reporter.BeginCommit(mvccRef, commit.Message, commit.Committer, commit.BranchName) // lookup tag, skip commit if we already tagged this commit tagID := graveler.TagID(mvccRef) @@ -295,7 +306,7 @@ func (m *Migrate) migrateCommits(ctx context.Context, repo *graveler.RepositoryR return fmt.Errorf("scan commit record: %w", err) } m.log.WithFields(logging.Fields{ - "repository": string(repo.RepositoryID), + "repository": repo.RepositoryID.String(), "commit_id": commit.CommitID, "branch_name": commit.BranchName, "committer": commit.Committer, @@ -350,8 +361,20 @@ func (m *Migrate) postMigrate() error { return err } +func (m *Migrate) SetReporter(r Reporter) { + m.reporter = r +} + func (c *commitRecord) Scan(rows pgx.Row) error { return rows.Scan(&c.BranchID, &c.BranchName, &c.CommitID, &c.PreviousCommitID, &c.Committer, &c.Message, &c.CreationDate, &c.Metadata, &c.MergeSourceBranch, &c.MergeSourceBranchName, &c.MergeSourceCommit, &c.MergeType) } + +type nullReporter struct{} + +func (n *nullReporter) BeginRepository(string) {} + +func (n *nullReporter) BeginCommit(string, string, string, string) {} + +func (n *nullReporter) EndRepository(error) {} diff --git a/catalog/migrate/migrate_test.go b/catalog/migrate/migrate_test.go index 6bae1fe99ab..e3750a004f4 100644 --- a/catalog/migrate/migrate_test.go +++ b/catalog/migrate/migrate_test.go @@ -4,8 +4,7 @@ import ( "context" "crypto" "fmt" - "hash/maphash" - "os" + "hash/fnv" "strconv" "testing" "time" @@ -40,12 +39,20 @@ func TestMigrate(t *testing.T) { ctx := context.Background() _, err := mvccCataloger.CreateRepository(ctx, "repo1", "mem://", "main") testutil.Must(t, err) + // create two commits with new files testCreateEntry(t, ctx, mvccCataloger, "repo1", "main", "file1") testCreateEntry(t, ctx, mvccCataloger, "repo1", "main", "file2") testCommit(t, ctx, mvccCataloger, "repo1", "main", "first on main") testCreateEntry(t, ctx, mvccCataloger, "repo1", "main", "file0") testCreateEntry(t, ctx, mvccCataloger, "repo1", "main", "file3") testCommit(t, ctx, mvccCataloger, "repo1", "main", "second on main") + // create a branch b1 with new files + testCreateBranch(t, ctx, mvccCataloger, "repo1", "b1", "main") + testCreateEntry(t, ctx, mvccCataloger, "repo1", "b1", "file11") + testCreateEntry(t, ctx, mvccCataloger, "repo1", "b1", "file22") + testCommit(t, ctx, mvccCataloger, "repo1", "b1", "first on b1") + // merge changes from b1 back to main + testMerge(t, ctx, mvccCataloger, "repo1", "b1", "main", "Merge b1 to main first") // migrate information migrateTool, err := NewMigrate(conn, entryCatalog, mvccCataloger) @@ -79,8 +86,10 @@ func TestMigrate(t *testing.T) { commitMessages = append(commitMessages, commit.Message) } testutil.Must(t, logIt.Err()) - expectedCommits := []string{ + "Merge b1 to main first", + "first on b1", + "Branch 'b1' created, source 'main'", "second on main", "first on main", "Repository created", @@ -90,36 +99,64 @@ func TestMigrate(t *testing.T) { if diff := deep.Equal(commitMessages, expectedCommits); diff != nil { t.Fatal("Log diff found:", diff) } -} -func newDefaultInstanceParams(name string) *params.InstanceParams { - const totalAllocatedBytes = 15 * 1024 * 1024 - const pebbleSSTableCacheSizeBytes = 8 * 1024 * 1024 - baseDir := os.TempDir() - return ¶ms.InstanceParams{ - SharedParams: params.SharedParams{ - Logger: logging.Default(), - Local: params.LocalDiskParams{ - TotalAllocatedBytes: totalAllocatedBytes, - BaseDir: baseDir, - }, - Adapter: mem.New(), - BlockStoragePrefix: "", - Eviction: nil, - PebbleSSTableCacheSizeBytes: pebbleSSTableCacheSizeBytes, - }, - FSName: name, - DiskAllocProportion: 1.0, + // verify branches + mainBranch, err := entryCatalog.GetBranch(ctx, "repo1", "main") + testutil.MustDo(t, "get main branch", err) + b1Branch, err := entryCatalog.GetBranch(ctx, "repo1", "main") + testutil.MustDo(t, "get b1 branch", err) + // verify each branch content + for i := 0; i < 4; i++ { + name := "file" + strconv.Itoa(i) + + // get entry and check address + ent, err := entryCatalog.GetEntry(ctx, "repo1", mainBranch.CommitID.Ref(), rocks.Path(name)) + testutil.MustDo(t, "get entry "+name, err) + h := calcPathHash("repo1", "main", name) + if ent.Address != h { + t.Errorf("GetEntry main branch, file %s address %s, expected %s", name, ent.Address, h) + } + + // same should be visible from 'b1' branch + ent, err = entryCatalog.GetEntry(ctx, "repo1", b1Branch.CommitID.Ref(), rocks.Path(name)) + testutil.MustDo(t, "get entry "+name, err) + if ent.Address != h { + t.Errorf("GetEntry b1 branch, file %s address %s, expected %s", name, ent.Address, h) + } + } + for _, i := range []int{11, 22} { + name := "file" + strconv.Itoa(i) + + // get entry and check address + ent, err := entryCatalog.GetEntry(ctx, "repo1", b1Branch.CommitID.Ref(), rocks.Path(name)) + testutil.MustDo(t, "get entry "+name, err) + h := calcPathHash("repo1", "b1", name) + if ent.Address != h { + t.Errorf("GetEntry b1 branch, file %s address %s, expected %s", name, ent.Address, h) + } } } -func newEntryCatalogInMem(conn db.Database) (*rocks.EntryCatalog, error) { - metaRangeFS, err := pyramid.NewFS(newDefaultInstanceParams("meta-range")) +func testMerge(t *testing.T, ctx context.Context, cataloger catalog.Cataloger, repo string, sourceBranch string, targetBranch string, msg string) { + t.Helper() + _, err := cataloger.Merge(ctx, repo, sourceBranch, targetBranch, "tester", msg, nil) + testutil.MustDo(t, "merge", err) +} + +func testCreateBranch(t testing.TB, ctx context.Context, cataloger catalog.Cataloger, repo string, branch string, parent string) { + t.Helper() + _, err := cataloger.CreateBranch(ctx, repo, branch, parent) + testutil.MustDo(t, "create branch", err) +} + +func NewEntryCatalogForTesting(t testing.TB, conn db.Database) (*rocks.EntryCatalog, error) { + t.Helper() + metaRangeFS, err := pyramid.NewFS(newDefaultInstanceParams(t, "meta-range")) if err != nil { return nil, fmt.Errorf("create tiered FS for committed meta-range: %w", err) } - rangeFS, err := pyramid.NewFS(newDefaultInstanceParams("rage")) + rangeFS, err := pyramid.NewFS(newDefaultInstanceParams(t, "rage")) if err != nil { return nil, fmt.Errorf("create tiered FS for committed range: %w", err) } @@ -157,25 +194,24 @@ func newEntryCatalogInMem(conn db.Database) (*rocks.EntryCatalog, error) { } func testSetupServices(t testing.TB) (db.Database, catalog.Cataloger, *rocks.EntryCatalog) { + t.Helper() conn, _ := testutil.GetDB(t, databaseURI) mvccCataloger := mvcc.NewCataloger(conn) - entryCataloger, err := newEntryCatalogInMem(conn) + entryCataloger, err := NewEntryCatalogForTesting(t, conn) testutil.MustDo(t, "new entry catalog", err) return conn, mvccCataloger, entryCataloger } func testCommit(t *testing.T, ctx context.Context, cataloger catalog.Cataloger, repo string, branch string, msg string) catalog.CommitLog { + t.Helper() commit, err := cataloger.Commit(ctx, repo, branch, msg, "tester", nil) testutil.MustDo(t, "commit", err) return *commit } func testCreateEntry(t testing.TB, ctx context.Context, cataloger catalog.Cataloger, repo, branch, path string) { - var h maphash.Hash - _, _ = h.Write([]byte(repo)) - _, _ = h.Write([]byte(branch)) - _, _ = h.Write([]byte(path)) - sum := h.Sum64() + t.Helper() + sum := calcPathSum(repo, branch, path) pathHash := strconv.FormatUint(sum, 16) err := cataloger.CreateEntry(ctx, repo, branch, catalog.Entry{ @@ -187,3 +223,38 @@ func testCreateEntry(t testing.TB, ctx context.Context, cataloger catalog.Catalo }, catalog.CreateEntryParams{}) testutil.MustDo(t, "create entry", err) } + +func calcPathSum(repo string, branch string, path string) uint64 { + h := fnv.New64() + _, _ = h.Write([]byte(repo)) + _, _ = h.Write([]byte(branch)) + _, _ = h.Write([]byte(path)) + sum := h.Sum64() + return sum +} +func calcPathHash(repo string, branch string, path string) string { + sum := calcPathSum(repo, branch, path) + return strconv.FormatUint(sum, 16) +} + +func newDefaultInstanceParams(t testing.TB, name string) *params.InstanceParams { + t.Helper() + const totalAllocatedBytes = 15 * 1024 * 1024 + const pebbleSSTableCacheSizeBytes = 8 * 1024 * 1024 + baseDir := t.TempDir() + return ¶ms.InstanceParams{ + SharedParams: params.SharedParams{ + Logger: logging.Default(), + Local: params.LocalDiskParams{ + TotalAllocatedBytes: totalAllocatedBytes, + BaseDir: baseDir, + }, + Adapter: mem.New(), + BlockStoragePrefix: "", + Eviction: nil, + PebbleSSTableCacheSizeBytes: pebbleSSTableCacheSizeBytes, + }, + FSName: name, + DiskAllocProportion: 1.0, + } +} diff --git a/cmd/lakefs/cmd/migrate_db.go b/cmd/lakefs/cmd/migrate_db.go index 058bea66e32..d43ceeabeae 100644 --- a/cmd/lakefs/cmd/migrate_db.go +++ b/cmd/lakefs/cmd/migrate_db.go @@ -47,7 +47,7 @@ var migrateDBCmd = &cobra.Command{ fmt.Println("Failed to create a new migrate:", err) os.Exit(1) } - + migrateTool.SetReporter(&migrateReporter{}) repoExpr, _ := cmd.Flags().GetString("repository") if err := migrateTool.FilterRepository(repoExpr); err != nil { fmt.Println("Failed to setup repository filter:", err) @@ -56,7 +56,6 @@ var migrateDBCmd = &cobra.Command{ err = migrateTool.Run() if err != nil { - fmt.Println("Migration failed") fmt.Println(err) os.Exit(1) } @@ -64,6 +63,22 @@ var migrateDBCmd = &cobra.Command{ }, } +type migrateReporter struct{} + +func (m *migrateReporter) BeginRepository(repository string) { + fmt.Println("Migrate repository:", repository) +} + +func (m *migrateReporter) BeginCommit(ref, message, committer, branch string) { + fmt.Printf(" Commit: %s (%s) [%s] <%s>\n", message, ref, committer, branch) +} + +func (m *migrateReporter) EndRepository(err error) { + if err != nil { + fmt.Println("Failed to migrate repository:", err) + } +} + //nolint:gochecknoinits func init() { migrateCmd.AddCommand(migrateDBCmd)