Skip to content

Commit

Permalink
rollback committed changes blocked by child branches (#980)
Browse files Browse the repository at this point in the history
  • Loading branch information
nopcoder authored Dec 1, 2020
1 parent c152e2c commit 7b08240
Show file tree
Hide file tree
Showing 4 changed files with 194 additions and 7 deletions.
1 change: 1 addition & 0 deletions catalog/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,5 @@ var (
ErrUnsupportedDelimiter = errors.New("unsupported delimiter")
ErrBadTypeConversion = errors.New("bad type")
ErrExportFailed = errors.New("export failed")
ErrRollbackWithActiveBranch = fmt.Errorf("%w: rollback with active branch", ErrFeatureNotSupported)
)
1 change: 1 addition & 0 deletions catalog/mvcc/cataloger_merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ func applyDiffChangesToRightBranch(tx db.Tx, mergeBatch mergeBatchRecords, previ
}
return nil
}

func insertMergeCommit(tx db.Tx, relation RelationType, leftID int64, rightID int64, nextCommitID CommitID, previousMaxCommitID CommitID, committer string, msg string, metadata catalog.Metadata) error {
var childNewLineage []int64
leftLastCommitID, err := getLastCommitIDByBranchID(tx, leftID)
Expand Down
66 changes: 59 additions & 7 deletions catalog/mvcc/cataloger_rollback_commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,68 @@ package mvcc

import (
"context"
"fmt"

"github.com/treeverse/lakefs/catalog"

"github.com/treeverse/lakefs/logging"
"github.com/treeverse/lakefs/db"
)

func (c *cataloger) RollbackCommit(ctx context.Context, repository, reference string) error {
c.log.WithContext(ctx).WithFields(logging.Fields{
"repository": repository,
"reference": reference,
}).Debug("Rollback commit - feature not supported")
return catalog.ErrFeatureNotSupported
if err := Validate(ValidateFields{
{Name: "repository", IsValid: ValidateRepositoryName(repository)},
{Name: "reference", IsValid: ValidateReference(reference)},
}); err != nil {
return err
}

ref, err := ParseRef(reference)
if err != nil {
return err
}
if ref.CommitID <= UncommittedID {
return catalog.ErrInvalidReference
}

_, err = c.db.Transact(func(tx db.Tx) (interface{}, error) {
// extract branch id from reference
branchID, err := getBranchID(tx, repository, ref.Branch, LockTypeUpdate)
if err != nil {
return nil, err
}

// validate no child branch point to parent commit
var count int
err = tx.GetPrimitive(&count, `SELECT COUNT(*) from catalog_commits
WHERE merge_source_branch = $1 AND merge_source_commit > $2 AND merge_type = 'from_parent'`,
branchID, ref.CommitID)
if err != nil {
return nil, fmt.Errorf("check merge with branch: %w", err)
}
if count > 0 {
return nil, catalog.ErrRollbackWithActiveBranch
}

// delete all commits after this commit on this branch
_, err = tx.Exec(`DELETE FROM catalog_commits WHERE branch_id = $1 AND commit_id > $2`,
branchID, ref.CommitID)
if err != nil {
return nil, fmt.Errorf("delete commits on branch %d, after commit %d: %w", branchID, ref.CommitID, err)
}

// delete all entries created after this commit
_, err = tx.Exec(`DELETE FROM catalog_entries WHERE branch_id = $1 AND min_commit > $2`,
branchID, ref.CommitID)
if err != nil {
return nil, fmt.Errorf("delete entries %d, after min commit %d: %w", branchID, ref.CommitID, err)
}

// update max_commit to infinite
_, err = tx.Exec(`UPDATE catalog_entries SET max_commit = $1 WHERE branch_id = $2 AND max_commit >= $3 AND NOT max_commit = $1`,
MaxCommitID, branchID, ref.CommitID)
if err != nil {
return nil, fmt.Errorf("clear entries %d, max commit %d: %w", branchID, ref.CommitID, err)
}
return nil, nil
}, c.txOpts(ctx, db.ReadCommitted())...)
return err
}
133 changes: 133 additions & 0 deletions catalog/mvcc/cataloger_rollback_commit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package mvcc

import (
"context"
"testing"

"github.com/treeverse/lakefs/catalog"
"github.com/treeverse/lakefs/testutil"
)

func TestCataloger_RollbackCommit_Basic(t *testing.T) {
ctx := context.Background()
c := testCataloger(t)
repository := testCatalogerRepo(t, ctx, c, "repo", "master")

files := []string{"file1", "file2", "file3"}
var refs []string
// commit 3 files
for _, filename := range files {
testCatalogerCreateEntry(t, ctx, c, repository, "master", filename, nil, "")

commitLog, err := c.Commit(ctx, repository, "master", "first", "tester", nil)
testutil.MustDo(t, "first commit", err)

refs = append(refs, commitLog.Reference)
}
if refs == nil || len(refs) != 3 {
t.Fatalf("expected 3 references for 3 commits, got %d", len(refs))
}

// rollback to each reference and check all files are there
for i := 0; i < len(refs); i++ {
filesCount := len(refs) - i
ref := refs[filesCount-1]
err := c.RollbackCommit(ctx, repository, ref)
testutil.MustDo(t, "rollback", err)

entries, _, err := c.ListEntries(ctx, repository, "master", "", "", "", -1)
testutil.MustDo(t, "list entries", err)
if len(entries) != filesCount {
t.Fatalf("List entries length after revert %d, expected %d", len(entries), filesCount)
}
for i := 0; i < filesCount; i++ {
if entries[i].Path != files[i] {
t.Fatalf("List entries after revert, file at index %d: %s, expected %s", i, entries[i].Path, files[i])
}
}
}

// check there are no commits
commits, _, err := c.ListCommits(ctx, repository, "master", "", -1)
testutil.MustDo(t, "list commits", err)
const expectedCommitsLen = 3 // branch + repo + first commit
if len(commits) != expectedCommitsLen {
t.Fatalf("List commits len=%d, expected=%d", len(commits), expectedCommitsLen)
}
}

func TestCataloger_RollbackCommit_BlockedByBranch(t *testing.T) {
ctx := context.Background()
c := testCataloger(t)
repository := testCatalogerRepo(t, ctx, c, "repo", "master")

// get first commit
masterReference, err := c.GetBranchReference(ctx, repository, "master")
testutil.MustDo(t, "getting master branch reference", err)

// create a branch
_, err = c.CreateBranch(ctx, repository, "branch1", "master")
testutil.MustDo(t, "create branch1", err)

// commit new data to master
testCatalogerCreateEntry(t, ctx, c, repository, "master", "fileX", nil, "")
_, err = c.Commit(ctx, repository, "master", "commit file x", "tester", nil)
testutil.MustDo(t, "commit file x", err)

// merge changes into the branch1
_, err = c.Merge(ctx, repository, "master", "branch1", "tester", "sync file x", nil)
testutil.MustDo(t, "merge master to branch1", err)

// rollback to initial commit should fail
err = c.RollbackCommit(ctx, repository, masterReference)
if err == nil {
t.Fatal("Rollback with blocked branch should fail with error")
}
}

func TestCataloger_RollbackCommit_AfterMerge(t *testing.T) {
ctx := context.Background()
c := testCataloger(t)
repository := testCatalogerRepo(t, ctx, c, "repo", "master")

// create and commit a file - fileFile
filenames := []string{"file1", "file2"}
for _, filename := range filenames {
testCatalogerCreateEntry(t, ctx, c, repository, "master", filename, nil, "")
}
firstCommit, err := c.Commit(ctx, repository, "master", "first file", "tester", nil)
testutil.MustDo(t, "first commit", err)

// create a branch and commit some changes - add, delete, modify and commit
_, err = c.CreateBranch(ctx, repository, "branch1", "master")
testutil.MustDo(t, "create branch1", err)
// update file1 on branch1
testCatalogerCreateEntry(t, ctx, c, repository, "branch1", "file1", nil, "branch1")
// delete file2 on branch1
err = c.DeleteEntry(ctx, repository, "branch1", "file2")
testutil.MustDo(t, "delete file2", err)
// add file2 on branch1
testCatalogerCreateEntry(t, ctx, c, repository, "branch1", "file2", nil, "branch1")
// commit changes
_, err = c.Commit(ctx, repository, "branch1", "tester", "changes", nil)
testutil.MustDo(t, "commit changes to branch1", err)

// merge changes from branch1 to master
_, err = c.Merge(ctx, repository, "branch1", "master", "tester", "sync branch1 to master", nil)
testutil.MustDo(t, "merge branch1 to master", err)

// rollback to first commit
err = c.RollbackCommit(ctx, repository, firstCommit.Reference)
testutil.MustDo(t, "rollback to first commit", err)

// check we have our original files
for _, filename := range filenames {
ent, err := c.GetEntry(ctx, repository, "master", filename, catalog.GetEntryParams{})
testutil.MustDo(t, filename+" get should work", err)

expectedChecksum := testCreateEntryCalcChecksum(filename, t.Name(), "")
if expectedChecksum != ent.Checksum {
t.Fatalf("Entry file1 after revert checksum %s, expected %s", ent.Checksum, expectedChecksum)
}
}
}

0 comments on commit 7b08240

Please sign in to comment.