Skip to content

Commit

Permalink
Migrate progress and more test code (#1257)
Browse files Browse the repository at this point in the history
  • Loading branch information
nopcoder authored Jan 21, 2021
1 parent 4f16652 commit 8cc9966
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 34 deletions.
25 changes: 24 additions & 1 deletion catalog/migrate/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Migrate struct {
lastCommit graveler.CommitID
branches map[graveler.BranchID]graveler.CommitID
tags map[graveler.TagID]graveler.CommitID
reporter Reporter
}

type commitRecord struct {
Expand All @@ -45,6 +46,12 @@ type commitRecord struct {
MergeSourceCommit *int64
}

// Reporter receives progress notifications while a Migrate run is in flight.
// NewMigrate installs a no-op implementation; SetReporter replaces it.
type Reporter interface {
// BeginRepository is called with the repository name right before that
// repository is migrated.
BeginRepository(repository string)
// BeginCommit is called at the start of each commit migration with the MVCC
// reference, message, committer and branch name of the commit. It fires
// before the already-tagged lookup, so it may also be reported for commits
// that end up being skipped.
BeginCommit(ref, message, committer, branch string)
// EndRepository is called after a repository migration attempt with the
// error that attempt returned (nil on success).
EndRepository(err error)
}

const (
migrateFetchSize = 1000

Expand All @@ -61,6 +68,7 @@ func NewMigrate(db db.Database, entryCatalog *rocks.EntryCatalog, mvccCataloger
entryCatalog: entryCatalog,
mvccCataloger: mvccCataloger,
log: logging.Default(),
reporter: &nullReporter{},
}, nil
}

Expand Down Expand Up @@ -94,11 +102,13 @@ func (m *Migrate) Run() error {
"storage_namespace": repo.StorageNamespace,
"default_branch": repo.DefaultBranch,
}).Info("Start repository migrate")
m.reporter.BeginRepository(repo.Name)
_, err := m.migrateRepository(ctx, repo)
if err != nil {
m.log.WithError(err).WithField("repository", repo.Name).Error("Migrate repository")
merr = multierror.Append(merr, err)
}
m.reporter.EndRepository(err)
}

if err = m.postMigrate(); err != nil {
Expand Down Expand Up @@ -202,6 +212,7 @@ func (m *Migrate) getOrCreateTargetRepository(ctx context.Context, repository *c
// migrateCommit migrates a single commit from an MVCC-based repository to the EntryCatalog format
func (m *Migrate) migrateCommit(ctx context.Context, repo *graveler.RepositoryRecord, commit commitRecord) error {
mvccRef := mvcc.MakeReference(commit.BranchName, mvcc.CommitID(commit.CommitID))
m.reporter.BeginCommit(mvccRef, commit.Message, commit.Committer, commit.BranchName)

// lookup tag, skip commit if we already tagged this commit
tagID := graveler.TagID(mvccRef)
Expand Down Expand Up @@ -295,7 +306,7 @@ func (m *Migrate) migrateCommits(ctx context.Context, repo *graveler.RepositoryR
return fmt.Errorf("scan commit record: %w", err)
}
m.log.WithFields(logging.Fields{
"repository": string(repo.RepositoryID),
"repository": repo.RepositoryID.String(),
"commit_id": commit.CommitID,
"branch_name": commit.BranchName,
"committer": commit.Committer,
Expand Down Expand Up @@ -350,8 +361,20 @@ func (m *Migrate) postMigrate() error {
return err
}

// SetReporter installs r as the receiver of migration progress notifications.
// Passing nil restores the no-op reporter, so the unconditional m.reporter
// calls made while migrating never hit a nil interface.
func (m *Migrate) SetReporter(r Reporter) {
	if r == nil {
		m.reporter = &nullReporter{}
		return
	}
	m.reporter = r
}

// Scan reads a single row into c. The destinations are passed in the same
// order as before; they presumably mirror the column order of the commits
// query - TODO confirm against the SELECT that feeds this.
func (c *commitRecord) Scan(rows pgx.Row) error {
	dest := []interface{}{
		&c.BranchID,
		&c.BranchName,
		&c.CommitID,
		&c.PreviousCommitID,
		&c.Committer,
		&c.Message,
		&c.CreationDate,
		&c.Metadata,
		&c.MergeSourceBranch,
		&c.MergeSourceBranchName,
		&c.MergeSourceCommit,
		&c.MergeType,
	}
	return rows.Scan(dest...)
}

// nullReporter is the default Reporter: every notification is dropped.
type nullReporter struct{}

// BeginRepository ignores the repository name.
func (*nullReporter) BeginRepository(_ string) {}

// BeginCommit ignores the commit details.
func (*nullReporter) BeginCommit(_, _, _, _ string) {}

// EndRepository ignores the migration result.
func (*nullReporter) EndRepository(_ error) {}
133 changes: 102 additions & 31 deletions catalog/migrate/migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ import (
"context"
"crypto"
"fmt"
"hash/maphash"
"os"
"hash/fnv"
"strconv"
"testing"
"time"
Expand Down Expand Up @@ -40,12 +39,20 @@ func TestMigrate(t *testing.T) {
ctx := context.Background()
_, err := mvccCataloger.CreateRepository(ctx, "repo1", "mem://", "main")
testutil.Must(t, err)
// create two commits with new files
testCreateEntry(t, ctx, mvccCataloger, "repo1", "main", "file1")
testCreateEntry(t, ctx, mvccCataloger, "repo1", "main", "file2")
testCommit(t, ctx, mvccCataloger, "repo1", "main", "first on main")
testCreateEntry(t, ctx, mvccCataloger, "repo1", "main", "file0")
testCreateEntry(t, ctx, mvccCataloger, "repo1", "main", "file3")
testCommit(t, ctx, mvccCataloger, "repo1", "main", "second on main")
// create a branch b1 with new files
testCreateBranch(t, ctx, mvccCataloger, "repo1", "b1", "main")
testCreateEntry(t, ctx, mvccCataloger, "repo1", "b1", "file11")
testCreateEntry(t, ctx, mvccCataloger, "repo1", "b1", "file22")
testCommit(t, ctx, mvccCataloger, "repo1", "b1", "first on b1")
// merge changes from b1 back to main
testMerge(t, ctx, mvccCataloger, "repo1", "b1", "main", "Merge b1 to main first")

// migrate information
migrateTool, err := NewMigrate(conn, entryCatalog, mvccCataloger)
Expand Down Expand Up @@ -79,8 +86,10 @@ func TestMigrate(t *testing.T) {
commitMessages = append(commitMessages, commit.Message)
}
testutil.Must(t, logIt.Err())

expectedCommits := []string{
"Merge b1 to main first",
"first on b1",
"Branch 'b1' created, source 'main'",
"second on main",
"first on main",
"Repository created",
Expand All @@ -90,36 +99,64 @@ func TestMigrate(t *testing.T) {
if diff := deep.Equal(commitMessages, expectedCommits); diff != nil {
t.Fatal("Log diff found:", diff)
}
}

func newDefaultInstanceParams(name string) *params.InstanceParams {
const totalAllocatedBytes = 15 * 1024 * 1024
const pebbleSSTableCacheSizeBytes = 8 * 1024 * 1024
baseDir := os.TempDir()
return &params.InstanceParams{
SharedParams: params.SharedParams{
Logger: logging.Default(),
Local: params.LocalDiskParams{
TotalAllocatedBytes: totalAllocatedBytes,
BaseDir: baseDir,
},
Adapter: mem.New(),
BlockStoragePrefix: "",
Eviction: nil,
PebbleSSTableCacheSizeBytes: pebbleSSTableCacheSizeBytes,
},
FSName: name,
DiskAllocProportion: 1.0,
// verify branches
mainBranch, err := entryCatalog.GetBranch(ctx, "repo1", "main")
testutil.MustDo(t, "get main branch", err)
// resolve 'b1' itself: looking up "main" a second time would make every
// b1 assertion below silently re-check the main branch
b1Branch, err := entryCatalog.GetBranch(ctx, "repo1", "b1")
testutil.MustDo(t, "get b1 branch", err)
// verify each branch content
for i := 0; i < 4; i++ {
	name := "file" + strconv.Itoa(i)

	// get entry and check address
	ent, err := entryCatalog.GetEntry(ctx, "repo1", mainBranch.CommitID.Ref(), rocks.Path(name))
	testutil.MustDo(t, "get entry "+name, err)
	h := calcPathHash("repo1", "main", name)
	if ent.Address != h {
		t.Errorf("GetEntry main branch, file %s address %s, expected %s", name, ent.Address, h)
	}

	// same should be visible from 'b1' branch
	ent, err = entryCatalog.GetEntry(ctx, "repo1", b1Branch.CommitID.Ref(), rocks.Path(name))
	testutil.MustDo(t, "get entry "+name, err)
	if ent.Address != h {
		t.Errorf("GetEntry b1 branch, file %s address %s, expected %s", name, ent.Address, h)
	}
}
// files created directly on 'b1' carry an address derived from the b1 branch name
for _, i := range []int{11, 22} {
	name := "file" + strconv.Itoa(i)

	// get entry and check address
	ent, err := entryCatalog.GetEntry(ctx, "repo1", b1Branch.CommitID.Ref(), rocks.Path(name))
	testutil.MustDo(t, "get entry "+name, err)
	h := calcPathHash("repo1", "b1", name)
	if ent.Address != h {
		t.Errorf("GetEntry b1 branch, file %s address %s, expected %s", name, ent.Address, h)
	}
}
}

func newEntryCatalogInMem(conn db.Database) (*rocks.EntryCatalog, error) {
metaRangeFS, err := pyramid.NewFS(newDefaultInstanceParams("meta-range"))
// testMerge merges sourceBranch into targetBranch of repo as committer
// "tester" with the given message, failing the test if the merge errors.
// It accepts testing.TB, like the other helpers in this file, so it can be
// used from benchmarks as well as tests; *testing.T callers are unaffected.
func testMerge(t testing.TB, ctx context.Context, cataloger catalog.Cataloger, repo string, sourceBranch string, targetBranch string, msg string) {
	t.Helper()
	_, err := cataloger.Merge(ctx, repo, sourceBranch, targetBranch, "tester", msg, nil)
	testutil.MustDo(t, "merge", err)
}

// testCreateBranch creates branch off parent in repo and hands any error to
// testutil.MustDo under the "create branch" label.
func testCreateBranch(
	t testing.TB,
	ctx context.Context,
	cataloger catalog.Cataloger,
	repo string,
	branch string,
	parent string,
) {
	t.Helper()
	var createErr error
	_, createErr = cataloger.CreateBranch(ctx, repo, branch, parent)
	testutil.MustDo(t, "create branch", createErr)
}

func NewEntryCatalogForTesting(t testing.TB, conn db.Database) (*rocks.EntryCatalog, error) {
t.Helper()
metaRangeFS, err := pyramid.NewFS(newDefaultInstanceParams(t, "meta-range"))
if err != nil {
return nil, fmt.Errorf("create tiered FS for committed meta-range: %w", err)
}

rangeFS, err := pyramid.NewFS(newDefaultInstanceParams("rage"))
rangeFS, err := pyramid.NewFS(newDefaultInstanceParams(t, "rage"))
if err != nil {
return nil, fmt.Errorf("create tiered FS for committed range: %w", err)
}
Expand Down Expand Up @@ -157,25 +194,24 @@ func newEntryCatalogInMem(conn db.Database) (*rocks.EntryCatalog, error) {
}

// testSetupServices opens the test database via testutil.GetDB and returns the
// connection together with an MVCC cataloger and an EntryCatalog built on it.
// Failure to build the EntryCatalog is passed to testutil.MustDo.
func testSetupServices(t testing.TB) (db.Database, catalog.Cataloger, *rocks.EntryCatalog) {
t.Helper()
// the second value returned by GetDB is not needed here
conn, _ := testutil.GetDB(t, databaseURI)
mvccCataloger := mvcc.NewCataloger(conn)
// NOTE(review): this line and the next both declare entryCataloger; this
// looks like a removed/added pair from the diff view - confirm which one the
// real file keeps.
entryCataloger, err := newEntryCatalogInMem(conn)
entryCataloger, err := NewEntryCatalogForTesting(t, conn)
testutil.MustDo(t, "new entry catalog", err)
return conn, mvccCataloger, entryCataloger
}

// testCommit commits the staged changes on branch of repo with message msg as
// committer "tester" and returns a copy of the resulting commit log. Errors
// are passed to testutil.MustDo. It accepts testing.TB, like the other
// helpers in this file; existing *testing.T callers are unaffected.
func testCommit(t testing.TB, ctx context.Context, cataloger catalog.Cataloger, repo string, branch string, msg string) catalog.CommitLog {
	t.Helper()
	commit, err := cataloger.Commit(ctx, repo, branch, msg, "tester", nil)
	testutil.MustDo(t, "commit", err)
	return *commit
}

func testCreateEntry(t testing.TB, ctx context.Context, cataloger catalog.Cataloger, repo, branch, path string) {
var h maphash.Hash
_, _ = h.Write([]byte(repo))
_, _ = h.Write([]byte(branch))
_, _ = h.Write([]byte(path))
sum := h.Sum64()
t.Helper()
sum := calcPathSum(repo, branch, path)
pathHash := strconv.FormatUint(sum, 16)

err := cataloger.CreateEntry(ctx, repo, branch, catalog.Entry{
Expand All @@ -187,3 +223,38 @@ func testCreateEntry(t testing.TB, ctx context.Context, cataloger catalog.Catalo
}, catalog.CreateEntryParams{})
testutil.MustDo(t, "create entry", err)
}

// calcPathSum returns the 64-bit FNV-1 hash of repo, branch and path written
// back to back with no separator between them.
func calcPathSum(repo string, branch string, path string) uint64 {
	hasher := fnv.New64()
	for _, part := range [...]string{repo, branch, path} {
		// the write result is dropped, exactly as before
		_, _ = hasher.Write([]byte(part))
	}
	return hasher.Sum64()
}
// calcPathHash renders calcPathSum(repo, branch, path) as a lowercase
// hexadecimal string without padding.
func calcPathHash(repo string, branch string, path string) string {
	const hexBase = 16
	return strconv.FormatUint(calcPathSum(repo, branch, path), hexBase)
}

// newDefaultInstanceParams builds the pyramid instance params used by the
// tests: FSName is set to name, local storage is rooted at a directory from
// t.TempDir(), and the adapter comes from mem.New().
func newDefaultInstanceParams(t testing.TB, name string) *params.InstanceParams {
	t.Helper()
	const (
		mebibyte       = 1024 * 1024
		localBudget    = 15 * mebibyte
		sstCacheBudget = 8 * mebibyte
	)
	// resolve the temp dir first, as before, so a failure here stops the
	// test before anything else is constructed
	tmpDir := t.TempDir()

	instance := &params.InstanceParams{
		FSName:              name,
		DiskAllocProportion: 1.0,
	}
	instance.SharedParams = params.SharedParams{
		Logger: logging.Default(),
		Local: params.LocalDiskParams{
			TotalAllocatedBytes: localBudget,
			BaseDir:             tmpDir,
		},
		Adapter:                     mem.New(),
		BlockStoragePrefix:          "",
		Eviction:                    nil,
		PebbleSSTableCacheSizeBytes: sstCacheBudget,
	}
	return instance
}
19 changes: 17 additions & 2 deletions cmd/lakefs/cmd/migrate_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ var migrateDBCmd = &cobra.Command{
fmt.Println("Failed to create a new migrate:", err)
os.Exit(1)
}

migrateTool.SetReporter(&migrateReporter{})
repoExpr, _ := cmd.Flags().GetString("repository")
if err := migrateTool.FilterRepository(repoExpr); err != nil {
fmt.Println("Failed to setup repository filter:", err)
Expand All @@ -56,14 +56,29 @@ var migrateDBCmd = &cobra.Command{

err = migrateTool.Run()
if err != nil {
fmt.Println("Migration failed")
fmt.Println(err)
os.Exit(1)
}
fmt.Println("Migrate completed")
},
}

// migrateReporter prints migration progress to standard output.
type migrateReporter struct{}

// BeginRepository prints the name of the repository about to be migrated.
func (*migrateReporter) BeginRepository(repository string) {
	fmt.Println("Migrate repository:", repository)
}

// BeginCommit prints one indented line with the commit message, reference,
// committer and branch.
func (*migrateReporter) BeginCommit(ref, message, committer, branch string) {
	fmt.Printf(" Commit: %s (%s) [%s] <%s>\n", message, ref, committer, branch)
}

// EndRepository prints the failure, if any; a successful migration prints nothing.
func (*migrateReporter) EndRepository(err error) {
	if err == nil {
		return
	}
	fmt.Println("Failed to migrate repository:", err)
}

//nolint:gochecknoinits
func init() {
migrateCmd.AddCommand(migrateDBCmd)
Expand Down

0 comments on commit 8cc9966

Please sign in to comment.