diff --git a/catalog/errors.go b/catalog/errors.go
index a70b1da2bf0..1411649ff4d 100644
--- a/catalog/errors.go
+++ b/catalog/errors.go
@@ -33,4 +33,5 @@ var (
 	ErrUnsupportedDelimiter     = errors.New("unsupported delimiter")
 	ErrBadTypeConversion        = errors.New("bad type")
 	ErrExportFailed             = errors.New("export failed")
+	ErrRollbackWithActiveBranch = fmt.Errorf("%w: rollback with active branch", ErrFeatureNotSupported)
 )
diff --git a/catalog/mvcc/cataloger_merge.go b/catalog/mvcc/cataloger_merge.go
index f5ff35a8ba0..83f88990c58 100644
--- a/catalog/mvcc/cataloger_merge.go
+++ b/catalog/mvcc/cataloger_merge.go
@@ -266,6 +266,7 @@ func applyDiffChangesToRightBranch(tx db.Tx, mergeBatch mergeBatchRecords, previ
 	}
 	return nil
 }
+
 func insertMergeCommit(tx db.Tx, relation RelationType, leftID int64, rightID int64, nextCommitID CommitID, previousMaxCommitID CommitID, committer string, msg string, metadata catalog.Metadata) error {
 	var childNewLineage []int64
 	leftLastCommitID, err := getLastCommitIDByBranchID(tx, leftID)
diff --git a/catalog/mvcc/cataloger_rollback_commit.go b/catalog/mvcc/cataloger_rollback_commit.go
index adf5749f05e..be2bdc48b62 100644
--- a/catalog/mvcc/cataloger_rollback_commit.go
+++ b/catalog/mvcc/cataloger_rollback_commit.go
@@ -2,16 +2,74 @@ package mvcc
 
 import (
 	"context"
+	"fmt"
 
 	"github.com/treeverse/lakefs/catalog"
-
-	"github.com/treeverse/lakefs/logging"
+	"github.com/treeverse/lakefs/db"
 )
 
+// RollbackCommit rolls the branch named by reference back to the referenced commit.
+// In a single transaction it deletes the branch's commits and entries created after
+// that commit, and resets max_commit to MaxCommitID on entries whose max_commit is at
+// or after it. It returns catalog.ErrInvalidReference when the parsed commit ID is not
+// above UncommittedID, and catalog.ErrRollbackWithActiveBranch when a 'from_parent'
+// merge has already used a later commit of this branch as its source.
 func (c *cataloger) RollbackCommit(ctx context.Context, repository, reference string) error {
-	c.log.WithContext(ctx).WithFields(logging.Fields{
-		"repository": repository,
-		"reference":  reference,
-	}).Debug("Rollback commit - feature not supported")
-	return catalog.ErrFeatureNotSupported
+	if err := Validate(ValidateFields{
+		{Name: "repository", IsValid: ValidateRepositoryName(repository)},
+		{Name: "reference", IsValid: ValidateReference(reference)},
+	}); err != nil {
+		return err
+	}
+
+	ref, err := ParseRef(reference)
+	if err != nil {
+		return err
+	}
+	if ref.CommitID <= UncommittedID {
+		return catalog.ErrInvalidReference
+	}
+
+	_, err = c.db.Transact(func(tx db.Tx) (interface{}, error) {
+		// extract branch id from reference
+		branchID, err := getBranchID(tx, repository, ref.Branch, LockTypeUpdate)
+		if err != nil {
+			return nil, err
+		}
+
+		// validate that no child branch merged from a commit we are about to remove
+		var count int
+		err = tx.GetPrimitive(&count, `SELECT COUNT(*) from catalog_commits
+			WHERE merge_source_branch = $1 AND merge_source_commit > $2 AND merge_type = 'from_parent'`,
+			branchID, ref.CommitID)
+		if err != nil {
+			return nil, fmt.Errorf("check merge with branch: %w", err)
+		}
+		if count > 0 {
+			return nil, catalog.ErrRollbackWithActiveBranch
+		}
+
+		// delete all commits after this commit on this branch
+		_, err = tx.Exec(`DELETE FROM catalog_commits WHERE branch_id = $1 AND commit_id > $2`,
+			branchID, ref.CommitID)
+		if err != nil {
+			return nil, fmt.Errorf("delete commits on branch %d, after commit %d: %w", branchID, ref.CommitID, err)
+		}
+
+		// delete all entries created after this commit
+		_, err = tx.Exec(`DELETE FROM catalog_entries WHERE branch_id = $1 AND min_commit > $2`,
+			branchID, ref.CommitID)
+		if err != nil {
+			return nil, fmt.Errorf("delete entries %d, after min commit %d: %w", branchID, ref.CommitID, err)
+		}
+
+		// update max_commit to infinite
+		_, err = tx.Exec(`UPDATE catalog_entries SET max_commit = $1 WHERE branch_id = $2 AND max_commit >= $3 AND NOT max_commit = $1`,
+			MaxCommitID, branchID, ref.CommitID)
+		if err != nil {
+			return nil, fmt.Errorf("clear entries %d, max commit %d: %w", branchID, ref.CommitID, err)
+		}
+		return nil, nil
+	}, c.txOpts(ctx, db.ReadCommitted())...)
+	return err
 }
diff --git a/catalog/mvcc/cataloger_rollback_commit_test.go b/catalog/mvcc/cataloger_rollback_commit_test.go
new file mode 100644
index 00000000000..806b3f8e107
--- /dev/null
+++ b/catalog/mvcc/cataloger_rollback_commit_test.go
@@ -0,0 +1,133 @@
+package mvcc
+
+import (
+	"context"
+	"testing"
+
+	"github.com/treeverse/lakefs/catalog"
+	"github.com/treeverse/lakefs/testutil"
+)
+
+func TestCataloger_RollbackCommit_Basic(t *testing.T) {
+	ctx := context.Background()
+	c := testCataloger(t)
+	repository := testCatalogerRepo(t, ctx, c, "repo", "master")
+
+	files := []string{"file1", "file2", "file3"}
+	var refs []string
+	// commit 3 files
+	for _, filename := range files {
+		testCatalogerCreateEntry(t, ctx, c, repository, "master", filename, nil, "")
+
+		commitLog, err := c.Commit(ctx, repository, "master", "first", "tester", nil)
+		testutil.MustDo(t, "first commit", err)
+
+		refs = append(refs, commitLog.Reference)
+	}
+	if len(refs) != len(files) {
+		t.Fatalf("expected %d references for %d commits, got %d", len(files), len(files), len(refs))
+	}
+
+	// rollback to each reference and check all files are there
+	for i := 0; i < len(refs); i++ {
+		filesCount := len(refs) - i
+		ref := refs[filesCount-1]
+		err := c.RollbackCommit(ctx, repository, ref)
+		testutil.MustDo(t, "rollback", err)
+
+		entries, _, err := c.ListEntries(ctx, repository, "master", "", "", "", -1)
+		testutil.MustDo(t, "list entries", err)
+		if len(entries) != filesCount {
+			t.Fatalf("List entries length after revert %d, expected %d", len(entries), filesCount)
+		}
+		for j := 0; j < filesCount; j++ {
+			if entries[j].Path != files[j] {
+				t.Fatalf("List entries after revert, file at index %d: %s, expected %s", j, entries[j].Path, files[j])
+			}
+		}
+	}
+
+	// check only the initial commits and the first file commit remain
+	commits, _, err := c.ListCommits(ctx, repository, "master", "", -1)
+	testutil.MustDo(t, "list commits", err)
+	const expectedCommitsLen = 3 // branch + repo + first commit
+	if len(commits) != expectedCommitsLen {
+		t.Fatalf("List commits len=%d, expected=%d", len(commits), expectedCommitsLen)
+	}
+}
+
+func TestCataloger_RollbackCommit_BlockedByBranch(t *testing.T) {
+	ctx := context.Background()
+	c := testCataloger(t)
+	repository := testCatalogerRepo(t, ctx, c, "repo", "master")
+
+	// get first commit
+	masterReference, err := c.GetBranchReference(ctx, repository, "master")
+	testutil.MustDo(t, "getting master branch reference", err)
+
+	// create a branch
+	_, err = c.CreateBranch(ctx, repository, "branch1", "master")
+	testutil.MustDo(t, "create branch1", err)
+
+	// commit new data to master
+	testCatalogerCreateEntry(t, ctx, c, repository, "master", "fileX", nil, "")
+	_, err = c.Commit(ctx, repository, "master", "commit file x", "tester", nil)
+	testutil.MustDo(t, "commit file x", err)
+
+	// merge changes into the branch1
+	_, err = c.Merge(ctx, repository, "master", "branch1", "tester", "sync file x", nil)
+	testutil.MustDo(t, "merge master to branch1", err)
+
+	// rollback to initial commit should fail
+	err = c.RollbackCommit(ctx, repository, masterReference)
+	if err == nil {
+		t.Fatal("Rollback with blocked branch should fail with error")
+	}
+}
+
+func TestCataloger_RollbackCommit_AfterMerge(t *testing.T) {
+	ctx := context.Background()
+	c := testCataloger(t)
+	repository := testCatalogerRepo(t, ctx, c, "repo", "master")
+
+	// create and commit two files - file1 and file2
+	filenames := []string{"file1", "file2"}
+	for _, filename := range filenames {
+		testCatalogerCreateEntry(t, ctx, c, repository, "master", filename, nil, "")
+	}
+	firstCommit, err := c.Commit(ctx, repository, "master", "first file", "tester", nil)
+	testutil.MustDo(t, "first commit", err)
+
+	// create a branch and commit some changes - add, delete, modify and commit
+	_, err = c.CreateBranch(ctx, repository, "branch1", "master")
+	testutil.MustDo(t, "create branch1", err)
+	// update file1 on branch1
+	testCatalogerCreateEntry(t, ctx, c, repository, "branch1", "file1", nil, "branch1")
+	// delete file2 on branch1
+	err = c.DeleteEntry(ctx, repository, "branch1", "file2")
+	testutil.MustDo(t, "delete file2", err)
+	// add file2 on branch1
+	testCatalogerCreateEntry(t, ctx, c, repository, "branch1", "file2", nil, "branch1")
+	// commit changes
+	_, err = c.Commit(ctx, repository, "branch1", "changes", "tester", nil)
+	testutil.MustDo(t, "commit changes to branch1", err)
+
+	// merge changes from branch1 to master
+	_, err = c.Merge(ctx, repository, "branch1", "master", "tester", "sync branch1 to master", nil)
+	testutil.MustDo(t, "merge branch1 to master", err)
+
+	// rollback to first commit
+	err = c.RollbackCommit(ctx, repository, firstCommit.Reference)
+	testutil.MustDo(t, "rollback to first commit", err)
+
+	// check we have our original files
+	for _, filename := range filenames {
+		ent, err := c.GetEntry(ctx, repository, "master", filename, catalog.GetEntryParams{})
+		testutil.MustDo(t, filename+" get should work", err)
+
+		expectedChecksum := testCreateEntryCalcChecksum(filename, t.Name(), "")
+		if expectedChecksum != ent.Checksum {
+			t.Fatalf("Entry %s after revert checksum %s, expected %s", filename, ent.Checksum, expectedChecksum)
+		}
+	}
+}