diff --git a/ddl/000017_graveler_refs.down.sql b/ddl/000017_graveler_refs.down.sql new file mode 100644 index 00000000000..de0fd42d1c3 --- /dev/null +++ b/ddl/000017_graveler_refs.down.sql @@ -0,0 +1,3 @@ +DROP TABLE IF EXISTS graveler_repositories; +DROP TABLE IF EXISTS graveler_commits; +DROP TABLE IF EXISTS graveler_branches; diff --git a/ddl/000017_graveler_refs.up.sql b/ddl/000017_graveler_refs.up.sql new file mode 100644 index 00000000000..1e2788e8552 --- /dev/null +++ b/ddl/000017_graveler_refs.up.sql @@ -0,0 +1,38 @@ +CREATE TABLE IF NOT EXISTS graveler_repositories +( + id text NOT NULL, + + storage_namespace text NOT NULL, + creation_date timestamptz NOT NULL, + default_branch text NOT NULL, + + PRIMARY KEY (id) +); + + +CREATE TABLE IF NOT EXISTS graveler_branches +( + repository_id text NOT NULL, + id text NOT NULL, + + staging_token text, + commit_id text, + + PRIMARY KEY (repository_id, id) +); + + +CREATE TABLE IF NOT EXISTS graveler_commits +( + repository_id text NOT NULL, + id text NOT NULL, + + committer text NOT NULL, + message text NOT NULL, + creation_date timestamptz NOT NULL, + tree_id text NOT NULL, + metadata jsonb, + parents text[], + + PRIMARY KEY (repository_id, id) +); diff --git a/graveler/graveler.go b/graveler/graveler.go index 100d73d365a..17eeea03e95 100644 --- a/graveler/graveler.go +++ b/graveler/graveler.go @@ -8,6 +8,11 @@ import ( "net/url" "regexp" "strings" + + "github.com/treeverse/lakefs/ident" + + "github.com/treeverse/lakefs/catalog" + "time" ) @@ -55,12 +60,15 @@ type Ref string // TagID represents a named tag pointing at a commit type TagID string -// CommitID is a content addressable hash representing a Commit object -type CommitID string +// CommitParents +type CommitParents []CommitID // BranchID is an identifier for a branch type BranchID string +// CommitID is a content addressable hash representing a Commit object +type CommitID string + // TreeID represents a snapshot of the tree, referenced by a commit type TreeID string @@ -75,13 +83,13 @@ type Metadata map[string]string // Repository represents repository metadata type Repository struct { - StorageNamespace StorageNamespace - CreationDate time.Time - DefaultBranchID BranchID + StorageNamespace StorageNamespace `db:"storage_namespace"` + CreationDate time.Time `db:"creation_date"` + DefaultBranchID BranchID `db:"default_branch"` } type RepositoryRecord struct { - RepositoryID RepositoryID + RepositoryID RepositoryID `db:"id"` *Repository } @@ -97,19 +105,41 @@ type ValueRecord struct { *Value } +func (ps CommitParents) Identity() []byte { + strings := make([]string, len(ps)) + for i, v := range ps { + strings[i] = string(v) + } + buf := ident.NewAddressWriter() + buf.MarshalStringSlice(strings) + return buf.Identity() +} + // Commit represents commit metadata (author, time, tree ID) type Commit struct { - Committer string - Message string - TreeID TreeID - CreationDate time.Time - Parents []CommitID - Metadata map[string]string + Committer string `db:"committer"` + Message string `db:"message"` + TreeID TreeID `db:"tree_id"` + CreationDate time.Time `db:"creation_date"` + Parents CommitParents `db:"parents"` + Metadata catalog.Metadata `db:"metadata"` +} + +func (c Commit) Identity() []byte { + b := ident.NewAddressWriter() + b.MarshalString("commit:v1") + b.MarshalString(c.Committer) + b.MarshalString(c.Message) + b.MarshalString(string(c.TreeID)) + b.MarshalInt64(c.CreationDate.Unix()) + b.MarshalStringMap(c.Metadata) + b.MarshalIdentifiable(c.Parents) + return b.Identity() } // CommitRecords holds CommitID with the associated Commit data type CommitRecord struct { - CommitID CommitID + CommitID CommitID `db:"id"` *Commit } @@ -184,7 +214,7 @@ type VersionController interface { Log(ctx context.Context, repositoryID RepositoryID, commitID CommitID) (CommitIterator, error) // ListBranches lists branches on repositories - ListBranches(ctx context.Context, repositoryID RepositoryID) (BranchIterator, error) + ListBranches(ctx context.Context, repositoryID RepositoryID, from BranchID) (BranchIterator, error) // DeleteBranch deletes branch from repository DeleteBranch(ctx context.Context, repositoryID RepositoryID, branchID BranchID) error @@ -209,7 +239,7 @@ type VersionController interface { Merge(ctx context.Context, repositoryID RepositoryID, from Ref, to BranchID) (CommitID, error) // DiffUncommitted returns iterator to scan the changes made on the branch - DiffUncommitted(ctx context.Context, repositoryID RepositoryID, branchID BranchID) (DiffIterator, error) + DiffUncommitted(ctx context.Context, repositoryID RepositoryID, branchID BranchID, from Key) (DiffIterator, error) // Diff returns the changes between 'left' and 'right' ref, starting from the 'from' key Diff(ctx context.Context, repositoryID RepositoryID, left, right Ref, from Key) (DiffIterator, error) @@ -295,7 +325,7 @@ type RefManager interface { CreateRepository(ctx context.Context, repositoryID RepositoryID, repository Repository, branch Branch) error // ListRepositories lists repositories - ListRepositories(ctx context.Context) (RepositoryIterator, error) + ListRepositories(ctx context.Context, from RepositoryID) (RepositoryIterator, error) // DeleteRepository deletes the repository DeleteRepository(ctx context.Context, repositoryID RepositoryID) error @@ -313,9 +343,9 @@ type RefManager interface { DeleteBranch(ctx context.Context, repositoryID RepositoryID, branchID BranchID) error // ListBranches lists branches - ListBranches(ctx context.Context, repositoryID RepositoryID) (BranchIterator, error) + ListBranches(ctx context.Context, repositoryID RepositoryID, from BranchID) (BranchIterator, error) - // GetCommit returns the Commit metadata object for the given CommitID + // GetCommit returns the Commit metadata object for the given CommitID. GetCommit(ctx context.Context, repositoryID RepositoryID, commitID CommitID) (*Commit, error) // AddCommit stores the Commit object, returning its ID @@ -337,11 +367,11 @@ type CommittedManager interface { Get(ctx context.Context, ns StorageNamespace, treeID TreeID, key Key) (*Value, error) // List takes a given tree and returns an ValueIterator - List(ctx context.Context, ns StorageNamespace, treeID TreeID) (ValueIterator, error) + List(ctx context.Context, ns StorageNamespace, treeID TreeID, from Key) (ValueIterator, error) // Diff receives two trees and a 3rd merge base tree used to resolve the change type // it tracks changes from left to right, returning an iterator of Diff entries - Diff(ctx context.Context, ns StorageNamespace, left, right, base TreeID) (DiffIterator, error) + Diff(ctx context.Context, ns StorageNamespace, left, right, base TreeID, from Key) (DiffIterator, error) // Merge receives two trees and a 3rd merge base tree used to resolve the change type // it applies that changes from left to right, resulting in a new tree that @@ -381,13 +411,16 @@ var ( // Graveler errors var ( ErrNotFound = errors.New("not found") + ErrNotUnique = errors.New("not unique") ErrInvalidValue = errors.New("invalid value") - ErrInvalidStorageNamespace = fmt.Errorf("storage namespace %w", ErrInvalidValue) - ErrInvalidRepositoryID = fmt.Errorf("repository id %w", ErrInvalidValue) - ErrInvalidBranchID = fmt.Errorf("branch id %w", ErrInvalidValue) - ErrInvalidRef = fmt.Errorf("ref %w", ErrInvalidValue) - ErrInvalidCommitID = fmt.Errorf("commit id %w", ErrInvalidValue) - ErrCommitNotFound = fmt.Errorf("commit %w", ErrNotFound) + ErrInvalidMergeBase = fmt.Errorf("only 2 commits allowed in FindMergeBase: %w", ErrInvalidValue) + ErrInvalidStorageNamespace = fmt.Errorf("storage namespace: %w", ErrInvalidValue) + ErrInvalidRepositoryID = fmt.Errorf("repository id: %w", ErrInvalidValue) + ErrInvalidBranchID = fmt.Errorf("branch id: %w", ErrInvalidValue) + ErrInvalidRef = fmt.Errorf("ref: %w", ErrInvalidValue) + ErrInvalidCommitID = fmt.Errorf("commit id: %w", ErrInvalidValue) + ErrCommitNotFound = fmt.Errorf("commit: %w", ErrNotFound) + ErrCommitIDAmbiguous = fmt.Errorf("commit ID is ambiguous: %w", ErrNotFound) ) func NewRepositoryID(id string) (RepositoryID, error) { diff --git a/graveler/main_test.go b/graveler/main_test.go index 021716a43f9..f7b0e9be023 100644 --- a/graveler/main_test.go +++ b/graveler/main_test.go @@ -1,4 +1,4 @@ -package graveler +package graveler_test import ( "flag" @@ -6,9 +6,12 @@ import ( "os" "testing" + "github.com/treeverse/lakefs/db" + "github.com/treeverse/lakefs/graveler" + "github.com/treeverse/lakefs/testutil" + "github.com/ory/dockertest/v3" "github.com/sirupsen/logrus" - "github.com/treeverse/lakefs/testutil" ) var ( @@ -16,6 +19,18 @@ var ( databaseURI string ) +func testRefManager(t testing.TB) graveler.RefManager { + t.Helper() + conn, _ := testutil.GetDB(t, databaseURI, testutil.WithGetDBApplyDDL(true)) + return graveler.NewPGRefManager(conn) +} + +func testRefManagerWithDB(t testing.TB) (graveler.RefManager, db.Database) { + t.Helper() + conn, _ := testutil.GetDB(t, databaseURI, testutil.WithGetDBApplyDDL(true)) + return graveler.NewPGRefManager(conn), conn +} + func TestMain(m *testing.M) { flag.Parse() if !testing.Verbose() { diff --git a/graveler/pgiterator.go b/graveler/pgiterator.go new file mode 100644 index 00000000000..3a4af262317 --- /dev/null +++ b/graveler/pgiterator.go @@ -0,0 +1,313 @@ +package graveler + +import ( + "context" + + "github.com/treeverse/lakefs/db" +) + +const ( + // IteratorPrefetchSize is the amount of records to fetch from PG + IteratorPrefetchSize = 1000 +) + +type pgBranchRecord struct { + BranchID `db:"id"` + *PgBranch +} + +type PGRepositoryIterator struct { + db db.Database + ctx context.Context + + value *RepositoryRecord + buf []*RepositoryRecord + + offset string + fetchSize int + shouldFetch bool + + err error +} + +func NewRepositoryIterator(ctx context.Context, db db.Database, fetchSize int, offset string) *PGRepositoryIterator { + return &PGRepositoryIterator{ + db: db, + ctx: ctx, + fetchSize: fetchSize, + shouldFetch: true, + offset: offset, + } +} + +func (ri *PGRepositoryIterator) Next() bool { + // no buffer is initialized + if ri.buf == nil { + ri.fetch(true) // initial fetch + } else if len(ri.buf) == 0 { + ri.fetch(false) // paginating since we're out of values + } + + if len(ri.buf) == 0 { + return false + } + + // stage a value and increment offset + ri.value = ri.buf[0] + ri.offset = string(ri.value.RepositoryID) + if len(ri.buf) > 1 { + ri.buf = ri.buf[1:] + } else { + ri.buf = make([]*RepositoryRecord, 0) + } + + return true +} + +func (ri *PGRepositoryIterator) fetch(initial bool) { + const ( + offsetGE = ">=" + offsetGT = ">" + ) + if !ri.shouldFetch { + return + } + offsetCondition := offsetGT + if initial { + offsetCondition = offsetGE + } + err := ri.db.WithContext(ri.ctx).Select(&ri.buf, ` + SELECT id, storage_namespace, creation_date, default_branch + FROM graveler_repositories + WHERE id `+offsetCondition+` $1 + ORDER BY id ASC + LIMIT $2`, ri.offset, ri.fetchSize) + if err != nil { + ri.err = err + return + } + if len(ri.buf) < ri.fetchSize { + ri.shouldFetch = false + } +} + +func (ri *PGRepositoryIterator) SeekGE(id RepositoryID) bool { + ri.offset = string(id) + ri.shouldFetch = true + ri.buf = make([]*RepositoryRecord, 0) + ri.fetch(true) // do a new initial fetch + + if len(ri.buf) == 0 { + return false + } + + // stage a value and increment offset + ri.value = ri.buf[0] + ri.offset = string(ri.value.RepositoryID) + if len(ri.buf) > 1 { + ri.buf = ri.buf[0 : len(ri.buf)-1] + } else { + ri.buf = make([]*RepositoryRecord, 0) + } + return true +} + +func (ri *PGRepositoryIterator) Value() *RepositoryRecord { + return ri.value +} + +func (ri *PGRepositoryIterator) Err() error { + return ri.err +} + +func (ri *PGRepositoryIterator) Close() { + +} + +type PGBranchIterator struct { + db db.Database + ctx context.Context + + repositoryID RepositoryID + value *BranchRecord + buf []*BranchRecord + + offset string + fetchSize int + shouldFetch bool + + err error +} + +func NewBranchIterator(ctx context.Context, db db.Database, repositoryID RepositoryID, prefetchSize int, offset string) *PGBranchIterator { + return &PGBranchIterator{ + db: db, + ctx: ctx, + repositoryID: repositoryID, + fetchSize: prefetchSize, + shouldFetch: true, + offset: offset, + } +} + +func (ri *PGBranchIterator) Next() bool { + // no buffer is initialized + if ri.buf == nil { + ri.fetch(true) // initial fetch + } else if len(ri.buf) == 0 { + ri.fetch(false) // paging size we're out of values + } + + if len(ri.buf) == 0 { + return false + } + + // stage a value and increment offset + ri.value = ri.buf[0] + ri.offset = string(ri.value.BranchID) + if len(ri.buf) > 1 { + ri.buf = ri.buf[1:] + } else { + ri.buf = make([]*BranchRecord, 0) + } + + return true +} + +func (ri *PGBranchIterator) fetch(initial bool) { + const ( + offsetGE = ">=" + offsetGT = ">" + ) + if !ri.shouldFetch { + return + } + offsetCondition := offsetGT + if initial { + offsetCondition = offsetGE + } + buf := make([]*pgBranchRecord, 0) + err := ri.db.WithContext(ri.ctx).Select(&buf, ` + SELECT id, staging_token, commit_id + FROM graveler_branches + WHERE repository_id = $1 + AND id `+offsetCondition+` $2 + ORDER BY id ASC + LIMIT $3`, ri.repositoryID, ri.offset, ri.fetchSize) + if err != nil { + ri.err = err + return + } + if len(buf) < ri.fetchSize { + ri.shouldFetch = false + } + ri.buf = make([]*BranchRecord, len(buf)) + for i, b := range buf { + ri.buf[i] = &BranchRecord{ + BranchID: b.BranchID, + Branch: &Branch{ + CommitID: b.CommitID, + stagingToken: b.StagingToken, + }, + } + } +} + +func (ri *PGBranchIterator) SeekGE(id BranchID) bool { + ri.offset = string(id) + ri.shouldFetch = true + ri.buf = make([]*BranchRecord, 0) + ri.fetch(true) // do a new initial fetch + + if len(ri.buf) == 0 { + return false + } + + // stage a value and increment offset + ri.value = ri.buf[0] + ri.offset = string(ri.value.BranchID) + if len(ri.buf) > 1 { + ri.buf = ri.buf[0 : len(ri.buf)-1] + } else { + ri.buf = make([]*BranchRecord, 0) + } + return true +} + +func (ri *PGBranchIterator) Value() *BranchRecord { + return ri.value +} + +func (ri *PGBranchIterator) Err() error { + return ri.err +} + +func (ri *PGBranchIterator) Close() { + +} + +type PGCommitIterator struct { + db db.Database + ctx context.Context + repositoryID RepositoryID + + value *CommitRecord + next CommitID + + err error +} + +func NewCommitIterator(ctx context.Context, db db.Database, repositoryID RepositoryID, start CommitID) *PGCommitIterator { + return &PGCommitIterator{ + db: db, + ctx: ctx, + repositoryID: repositoryID, + next: start, + } +} + +func (ci *PGCommitIterator) Next() bool { + if ci.value == nil { + return ci.fetch() + } + if len(ci.value.Commit.Parents) > 0 { + ci.next = ci.value.Commit.Parents[0] + return ci.fetch() + } + return false +} + +func (ci *PGCommitIterator) fetch() bool { + if ci.next == "" { + return false + } + record := &CommitRecord{} + err := ci.db.WithContext(ci.ctx).Get(record, ` + SELECT id, committer, message, creation_date, parents, tree_id, metadata + FROM graveler_commits + WHERE repository_id = $1 AND id = $2 + `, ci.repositoryID, ci.next) + if err != nil { + ci.err = err + return false + } + ci.value = record + return true +} + +func (ci *PGCommitIterator) SeekGE(id CommitID) bool { + ci.next = id + return ci.fetch() +} + +func (ci *PGCommitIterator) Value() *CommitRecord { + return ci.value +} + +func (ci *PGCommitIterator) Err() error { + return ci.err +} + +func (ci *PGCommitIterator) Close() { + +} diff --git a/graveler/pgiterator_test.go b/graveler/pgiterator_test.go new file mode 100644 index 00000000000..0689e24e844 --- /dev/null +++ b/graveler/pgiterator_test.go @@ -0,0 +1,182 @@ +package graveler_test + +import ( + "context" + "testing" + "time" + + "github.com/go-test/deep" + + "github.com/treeverse/lakefs/graveler" + + "github.com/treeverse/lakefs/testutil" +) + +func TestPGRepositoryIterator(t *testing.T) { + r, db := testRefManagerWithDB(t) + repos := []graveler.RepositoryID{"a", "aa", "b", "c", "e", "d"} + + // prepare data + for _, repoId := range repos { + testutil.Must(t, r.CreateRepository(context.Background(), repoId, graveler.Repository{ + StorageNamespace: "s3://foo", + CreationDate: time.Now(), + DefaultBranchID: "master", + }, graveler.Branch{})) + } + + t.Run("listing all repos", func(t *testing.T) { + iter := graveler.NewRepositoryIterator(context.Background(), db, 3, "") + repoIds := make([]graveler.RepositoryID, 0) + for iter.Next() { + repo := iter.Value() + repoIds = append(repoIds, repo.RepositoryID) + } + if iter.Err() != nil { + t.Fatalf("unexpected error: %v", iter.Err()) + } + iter.Close() + + if diffs := deep.Equal(repoIds, []graveler.RepositoryID{"a", "aa", "b", "c", "d", "e"}); diffs != nil { + t.Fatalf("got wrong list of repo IDs: %v", diffs) + } + }) + + t.Run("listing repos from prefix", func(t *testing.T) { + iter := graveler.NewRepositoryIterator(context.Background(), db, 3, "b") + repoIds := make([]graveler.RepositoryID, 0) + for iter.Next() { + repo := iter.Value() + repoIds = append(repoIds, repo.RepositoryID) + } + if iter.Err() != nil { + t.Fatalf("unexpected error: %v", iter.Err()) + } + iter.Close() + + if diffs := deep.Equal(repoIds, []graveler.RepositoryID{"b", "c", "d", "e"}); diffs != nil { + t.Fatalf("got wrong list of repo IDs: %v", diffs) + } + }) + + t.Run("listing repos SeekGE", func(t *testing.T) { + iter := graveler.NewRepositoryIterator(context.Background(), db, 3, "b") + repoIds := make([]graveler.RepositoryID, 0) + for iter.Next() { + repo := iter.Value() + repoIds = append(repoIds, repo.RepositoryID) + } + if iter.Err() != nil { + t.Fatalf("unexpected error: %v", iter.Err()) + } + iter.Close() + + if diffs := deep.Equal(repoIds, []graveler.RepositoryID{"b", "c", "d", "e"}); diffs != nil { + t.Fatalf("got wrong list of repo IDs: %v", diffs) + } + + // now let's seek + if !iter.SeekGE("aa") { + t.Fatalf("we should have values here") + } + + repoIds = make([]graveler.RepositoryID, 0) + for iter.Next() { + repo := iter.Value() + repoIds = append(repoIds, repo.RepositoryID) + } + if iter.Err() != nil { + t.Fatalf("unexpected error: %v", iter.Err()) + } + iter.Close() + + if diffs := deep.Equal(repoIds, []graveler.RepositoryID{"aa", "b", "c", "d", "e"}); diffs != nil { + t.Fatalf("got wrong list of repo IDs: %v", diffs) + } + }) +} + +func TestPGBranchIterator(t *testing.T) { + r, db := testRefManagerWithDB(t) + branches := []graveler.BranchID{"a", "aa", "b", "c", "e", "d"} + testutil.Must(t, r.CreateRepository(context.Background(), "repo1", graveler.Repository{ + StorageNamespace: "s3://foo", + CreationDate: time.Now(), + DefaultBranchID: "master", + }, graveler.Branch{})) + + // prepare data + for _, b := range branches { + testutil.Must(t, r.SetBranch(context.Background(), "repo1", b, graveler.Branch{CommitID: "c1"})) + } + + t.Run("listing all branches", func(t *testing.T) { + iter := graveler.NewBranchIterator(context.Background(), db, "repo1", 3, "") + ids := make([]graveler.BranchID, 0) + for iter.Next() { + b := iter.Value() + ids = append(ids, b.BranchID) + } + if iter.Err() != nil { + t.Fatalf("unexpected error: %v", iter.Err()) + } + iter.Close() + + if diffs := deep.Equal(ids, []graveler.BranchID{"a", "aa", "b", "c", "d", "e", "master"}); diffs != nil { + t.Fatalf("got wrong list of IDs: %v", diffs) + } + }) + + t.Run("listing branches from prefix", func(t *testing.T) { + iter := graveler.NewBranchIterator(context.Background(), db, "repo1", 3, "b") + ids := make([]graveler.BranchID, 0) + for iter.Next() { + b := iter.Value() + ids = append(ids, b.BranchID) + } + if iter.Err() != nil { + t.Fatalf("unexpected error: %v", iter.Err()) + } + iter.Close() + + if diffs := deep.Equal(ids, []graveler.BranchID{"b", "c", "d", "e", "master"}); diffs != nil { + t.Fatalf("got wrong list of branch IDs: %v", diffs) + } + }) + + t.Run("listing branches SeekGE", func(t *testing.T) { + iter := graveler.NewBranchIterator(context.Background(), db, "repo1", 3, "b") + ids := make([]graveler.BranchID, 0) + for iter.Next() { + b := iter.Value() + ids = append(ids, b.BranchID) + } + if iter.Err() != nil { + t.Fatalf("unexpected error: %v", iter.Err()) + } + iter.Close() + + if diffs := deep.Equal(ids, []graveler.BranchID{"b", "c", "d", "e", "master"}); diffs != nil { + t.Fatalf("got wrong list of branch IDs: %v", diffs) + } + + // now let's seek + if !iter.SeekGE("aa") { + t.Fatalf("we should have values here") + } + + ids = make([]graveler.BranchID, 0) + for iter.Next() { + b := iter.Value() + ids = append(ids, b.BranchID) + } + if iter.Err() != nil { + t.Fatalf("unexpected error: %v", iter.Err()) + } + iter.Close() + + if diffs := deep.Equal(ids, []graveler.BranchID{"aa", "b", "c", "d", "e", "master"}); diffs != nil { + t.Fatalf("got wrong list of branch IDs") + } + }) +} diff --git a/graveler/pgrefs.go b/graveler/pgrefs.go new file mode 100644 index 00000000000..b15a9883c5f --- /dev/null +++ b/graveler/pgrefs.go @@ -0,0 +1,265 @@ +package graveler + +import ( + "context" + "database/sql/driver" + "errors" + "strings" + + "github.com/treeverse/lakefs/ident" + + "github.com/jackc/pgtype" + "github.com/treeverse/lakefs/db" +) + +type PgBranch struct { + CommitID CommitID `db:"commit_id"` + StagingToken StagingToken `db:"staging_token"` +} + +func (ps CommitParents) Value() (driver.Value, error) { + if ps == nil { + return []string{}, nil + } + vs := make([]string, len(ps)) + for i, v := range ps { + vs[i] = string(v) + } + return vs, nil +} + +func (ps *CommitParents) Scan(src interface{}) error { + p := pgtype.TextArray{} + err := p.Scan(src) + if err != nil { + return err + } + for _, v := range p.Elements { + *ps = append(*ps, CommitID(v.String)) + } + return nil +} + +type PGRefManager struct { + db db.Database +} + +func NewPGRefManager(db db.Database) *PGRefManager { + return &PGRefManager{db} +} + +func (m *PGRefManager) GetRepository(ctx context.Context, repositoryID RepositoryID) (*Repository, error) { + repository, err := m.db.Transact(func(tx db.Tx) (interface{}, error) { + repository := &Repository{} + err := tx.Get(repository, + `SELECT storage_namespace, creation_date, default_branch FROM graveler_repositories WHERE id = $1`, + repositoryID) + if err != nil { + return nil, err + } + return repository, nil + }, db.ReadOnly(), db.WithContext(ctx)) + if errors.Is(err, db.ErrNotFound) { + return nil, ErrNotFound + } + if err != nil { + return nil, err + } + return repository.(*Repository), nil +} + +func (m *PGRefManager) CreateRepository(ctx context.Context, repositoryID RepositoryID, repository Repository, branch Branch) error { + _, err := m.db.Transact(func(tx db.Tx) (interface{}, error) { + _, err := tx.Exec( + `INSERT INTO graveler_repositories (id, storage_namespace, creation_date, default_branch) VALUES ($1, $2, $3, $4)`, + repositoryID, repository.StorageNamespace, repository.CreationDate, repository.DefaultBranchID) + if errors.Is(err, db.ErrAlreadyExists) { + return nil, ErrNotUnique + } + if err != nil { + return nil, err + } + _, err = tx.Exec(` + INSERT INTO graveler_branches (repository_id, id, staging_token, commit_id) + VALUES ($1, $2, $3, $4)`, + repositoryID, repository.DefaultBranchID, branch.stagingToken, branch.CommitID) + return nil, err + }, db.WithContext(ctx)) + return err +} + +func (m *PGRefManager) ListRepositories(ctx context.Context, from RepositoryID) (RepositoryIterator, error) { + return NewRepositoryIterator(ctx, m.db, IteratorPrefetchSize, string(from)), nil +} + +func (m *PGRefManager) DeleteRepository(ctx context.Context, repositoryID RepositoryID) error { + _, err := m.db.Transact(func(tx db.Tx) (interface{}, error) { + var err error + _, err = tx.Exec(`DELETE FROM graveler_branches WHERE repository_id = $1`, repositoryID) + if err != nil { + return nil, err + } + _, err = tx.Exec(`DELETE FROM graveler_commits WHERE repository_id = $1`, repositoryID) + if err != nil { + return nil, err + } + _, err = tx.Exec(`DELETE FROM graveler_repositories WHERE id = $1`, repositoryID) + return nil, err + }, db.WithContext(ctx)) + return err +} + +func (m *PGRefManager) RevParse(ctx context.Context, repositoryID RepositoryID, ref Ref) (Reference, error) { + return ResolveRef(ctx, m, repositoryID, ref) +} + +func (m *PGRefManager) GetBranch(ctx context.Context, repositoryID RepositoryID, branchID BranchID) (*Branch, error) { + branch, err := m.db.Transact(func(tx db.Tx) (interface{}, error) { + pbranch := &PgBranch{} + err := tx.Get(pbranch, + `SELECT staging_token, commit_id FROM graveler_branches WHERE repository_id = $1 AND id = $2`, + repositoryID, branchID) + if err != nil { + return nil, err + } + return &Branch{ + CommitID: pbranch.CommitID, + stagingToken: pbranch.StagingToken, + }, nil + }, db.ReadOnly(), db.WithContext(ctx)) + if errors.Is(err, db.ErrNotFound) { + return nil, ErrNotFound + } + if err != nil { + return nil, err + } + return branch.(*Branch), nil +} + +func (m *PGRefManager) SetBranch(ctx context.Context, repositoryID RepositoryID, branchID BranchID, branch Branch) error { + _, err := m.db.Transact(func(tx db.Tx) (interface{}, error) { + _, err := tx.Exec(` + INSERT INTO graveler_branches (repository_id, id, staging_token, commit_id) + VALUES ($1, $2, $3, $4) + ON CONFLICT (repository_id, id) + DO UPDATE SET staging_token = $3, commit_id = $4`, + repositoryID, branchID, branch.stagingToken, branch.CommitID) + return nil, err + }, db.WithContext(ctx)) + return err +} + +func (m *PGRefManager) DeleteBranch(ctx context.Context, repositoryID RepositoryID, branchID BranchID) error { + _, err := m.db.Transact(func(tx db.Tx) (interface{}, error) { + r, err := tx.Exec( + `DELETE FROM graveler_branches WHERE repository_id = $1 AND id = $2`, + repositoryID, branchID) + if err != nil { + return nil, err + } + if r.RowsAffected() == 0 { + return nil, ErrNotFound + } + return nil, nil + }, db.WithContext(ctx)) + return err +} + +func (m *PGRefManager) ListBranches(ctx context.Context, repositoryID RepositoryID, from BranchID) (BranchIterator, error) { + return NewBranchIterator(ctx, m.db, repositoryID, IteratorPrefetchSize, string(from)), nil +} + +func (m *PGRefManager) GetCommitByPrefix(ctx context.Context, repositoryID RepositoryID, prefix CommitID) (*Commit, error) { + commit, err := m.db.Transact(func(tx db.Tx) (interface{}, error) { + records := make([]*CommitRecord, 0) + // LIMIT 2 is used to test if a truncated commit ID resolves to *one* commit. + // if we get 2 results that start with the truncated ID, that's enough to determine this prefix is not unique + err := tx.Select(&records, ` + SELECT id, committer, message, creation_date, parents, tree_id, metadata + FROM graveler_commits + WHERE repository_id = $1 AND id >= $2 + LIMIT 2`, + repositoryID, prefix) + if errors.Is(err, db.ErrNotFound) { + return nil, ErrNotFound + } + if err != nil { + return nil, err + } + startWith := make([]*Commit, 0) + for _, c := range records { + if strings.HasPrefix(string(c.CommitID), string(prefix)) { + startWith = append(startWith, c.Commit) + } + } + if len(startWith) == 0 { + return "", ErrNotFound + } + if len(startWith) > 1 { + return "", ErrCommitIDAmbiguous // more than 1 commit starts with the ID prefix + } + return startWith[0], nil + }, db.ReadOnly(), db.WithContext(ctx)) + if errors.Is(err, db.ErrNotFound) { + return nil, ErrNotFound + } + if err != nil { + return nil, err + } + return commit.(*Commit), nil +} + +func (m *PGRefManager) GetCommit(ctx context.Context, repositoryID RepositoryID, commitID CommitID) (*Commit, error) { + commit, err := m.db.Transact(func(tx db.Tx) (interface{}, error) { + commit := &Commit{} + err := tx.Get(commit, ` + SELECT committer, message, creation_date, parents, tree_id, metadata + FROM graveler_commits WHERE repository_id = $1 AND id = $2`, + repositoryID, commitID) + if errors.Is(err, db.ErrNotFound) { + return nil, ErrNotFound + } + if err != nil { + return nil, err + } + return commit, nil + }, db.ReadOnly(), db.WithContext(ctx)) + if errors.Is(err, db.ErrNotFound) { + return nil, ErrNotFound + } + if err != nil { + return nil, err + } + return commit.(*Commit), nil +} + +func (m *PGRefManager) AddCommit(ctx context.Context, repositoryID RepositoryID, commit Commit) (CommitID, error) { + _, err := m.db.Transact(func(tx db.Tx) (interface{}, error) { + // commits are written based on their content hash, if we insert the same ID again, + // it will necessarily have the same attributes as the existing one, so no need to overwrite it + _, err := tx.Exec(` + INSERT INTO graveler_commits + (repository_id, id, committer, message, creation_date, parents, tree_id, metadata) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + ON CONFLICT DO NOTHING`, + repositoryID, ident.ContentAddress(commit), commit.Committer, commit.Message, + commit.CreationDate, commit.Parents, commit.TreeID, commit.Metadata) + return nil, err + }, db.WithContext(ctx)) + if err != nil { + return "", err + } + return CommitID(ident.ContentAddress(commit)), err +} + +func (m *PGRefManager) FindMergeBase(ctx context.Context, repositoryID RepositoryID, commitIDs ...CommitID) (*Commit, error) { + const allowedCommitsToCompare = 2 + if len(commitIDs) != allowedCommitsToCompare { + return nil, ErrInvalidMergeBase + } + return FindLowestCommonAncestor(ctx, m, repositoryID, commitIDs[0], commitIDs[1]) +} + +func (m *PGRefManager) Log(ctx context.Context, repositoryID RepositoryID, from CommitID) (CommitIterator, error) { + return NewCommitIterator(ctx, m.db, repositoryID, from), nil +} diff --git a/graveler/pgrefs_test.go b/graveler/pgrefs_test.go new file mode 100644 index 00000000000..3401c0a505d --- /dev/null +++ b/graveler/pgrefs_test.go @@ -0,0 +1,370 @@ +package graveler_test + +import ( + "context" + "reflect" + "testing" + "time" + + "github.com/treeverse/lakefs/graveler" + + "github.com/treeverse/lakefs/catalog" + + "github.com/treeverse/lakefs/testutil" +) + +func TestPGRefManager_GetRepository(t *testing.T) { + r := testRefManager(t) + t.Run("repo_doesnt_exist", func(t *testing.T) { + _, err := r.GetRepository(context.Background(), "example-repo") + if err != graveler.ErrNotFound { + t.Fatalf("expected ErrNotFound got error: %v", err) + } + }) + t.Run("repo_exists", func(t *testing.T) { + testutil.Must(t, r.CreateRepository(context.Background(), "example-repo", graveler.Repository{ + StorageNamespace: "s3://foo", + CreationDate: time.Now(), + DefaultBranchID: "weird-branch", + }, graveler.Branch{})) + + repo, err := r.GetRepository(context.Background(), "example-repo") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if repo.DefaultBranchID != "weird-branch" { + t.Fatalf("got unexpected branch ID: %s", repo.DefaultBranchID) + } + }) +} + +func TestPGRefManager_ListRepositories(t *testing.T) { + r := testRefManager(t) + repos := []graveler.RepositoryID{"a", "aa", "b", "c", "e", "d"} + for _, repoId := range repos { + testutil.Must(t, r.CreateRepository(context.Background(), repoId, graveler.Repository{ + StorageNamespace: "s3://foo", + CreationDate: time.Now(), + DefaultBranchID: "master", + }, graveler.Branch{})) + } + + t.Run("listing all repos", func(t *testing.T) { + iter, err := r.ListRepositories(context.Background(), "") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + repoIds := make([]graveler.RepositoryID, 0) + for iter.Next() { + repo := iter.Value() + repoIds = append(repoIds, repo.RepositoryID) + } + if iter.Err() != nil { + t.Fatalf("unexpected error: %v", iter.Err()) + } + iter.Close() + + if !reflect.DeepEqual(repoIds, []graveler.RepositoryID{"a", "aa", "b", "c", "d", "e"}) { + t.Fatalf("got wrong list of repo IDs") + } + }) + + t.Run("listing repos from prefix", func(t *testing.T) { + iter, err := r.ListRepositories(context.Background(), "aaa") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + repoIds := make([]graveler.RepositoryID, 0) + for iter.Next() { + repo := iter.Value() + repoIds = append(repoIds, repo.RepositoryID) + } + if iter.Err() != nil { + t.Fatalf("unexpected error: %v", iter.Err()) + } + iter.Close() + + if !reflect.DeepEqual(repoIds, []graveler.RepositoryID{"b", "c", "d", "e"}) { + t.Fatalf("got wrong list of repo IDs") + } + }) +} + +func TestPGRefManager_DeleteRepository(t *testing.T) { + r := testRefManager(t) + t.Run("repo_exists", func(t *testing.T) { + testutil.Must(t, r.CreateRepository(context.Background(), "example-repo", graveler.Repository{ + StorageNamespace: "s3://foo", + CreationDate: time.Now(), + DefaultBranchID: "weird-branch", + }, graveler.Branch{})) + + _, err := r.GetRepository(context.Background(), "example-repo") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + err = r.DeleteRepository(context.Background(), "example-repo") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + _, err = r.GetRepository(context.Background(), "example-repo") + if err != graveler.ErrNotFound { + t.Fatalf("expected ErrNotFound, got: %v", err) + } + }) + + t.Run("repo_does_not_exists", func(t *testing.T) { + // delete repo always returns success even if the repo doesn't exist + err := r.DeleteRepository(context.Background(), "example-repo11111") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + }) +} + +func TestPGRefManager_GetBranch(t *testing.T) { + r := testRefManager(t) + t.Run("get_branch_exists", func(t *testing.T) { + testutil.Must(t, r.CreateRepository(context.Background(), "repo1", graveler.Repository{ + StorageNamespace: "s3://", + CreationDate: time.Now(), + DefaultBranchID: "master", + }, graveler.Branch{ + CommitID: "c1", + })) + branch, err := r.GetBranch(context.Background(), "repo1", "master") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if branch.CommitID != "c1" { + t.Fatalf("unexpected branch recevied: %s - expected c1", branch.CommitID) + } + }) + + t.Run("get_branch_doesnt_exists", func(t *testing.T) { + _, err := r.GetBranch(context.Background(), "repo1", "masterrrrr") + if err != graveler.ErrNotFound { + t.Fatalf("expected ErrNotFound, got error: %v", err) + } + }) +} + +func TestPGRefManager_SetBranch(t *testing.T) { + r := testRefManager(t) + testutil.Must(t, r.CreateRepository(context.Background(), "repo1", graveler.Repository{ + StorageNamespace: "s3://", + CreationDate: time.Now(), + DefaultBranchID: "master", + }, graveler.Branch{ + CommitID: "c1", + })) + + testutil.Must(t, r.SetBranch(context.Background(), "repo1", "branch2", graveler.Branch{ + CommitID: "c2", + })) + + b, err := r.GetBranch(context.Background(), "repo1", "branch2") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if b.CommitID != "c2" { + t.Fatalf("unexpected commit for branch2: %s - expected: c2", b.CommitID) + } + + // overwrite + testutil.Must(t, r.SetBranch(context.Background(), "repo1", "branch2", graveler.Branch{ + CommitID: "c3", + })) + + b, err = r.GetBranch(context.Background(), "repo1", "branch2") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if b.CommitID != "c3" { + t.Fatalf("unexpected commit for branch2: %s - expected: c3", b.CommitID) + } + +} + +func TestPGRefManager_DeleteBranch(t *testing.T) { + r := testRefManager(t) + testutil.Must(t, r.CreateRepository(context.Background(), "repo1", graveler.Repository{ + StorageNamespace: "s3://", + CreationDate: time.Now(), + DefaultBranchID: "master", + }, graveler.Branch{ + CommitID: "c1", + })) + + testutil.Must(t, r.SetBranch(context.Background(), "repo1", "branch2", graveler.Branch{ + CommitID: "c2", + })) + + testutil.Must(t, r.DeleteBranch(context.Background(), "repo1", "branch2")) + + _, err := r.GetBranch(context.Background(), "repo1", "branch2") + if err != graveler.ErrNotFound { + t.Fatalf("unexpected error: %v", err) + } + +} + +func TestPGRefManager_ListBranches(t *testing.T) { + r := testRefManager(t) + testutil.Must(t, r.CreateRepository(context.Background(), "repo1", graveler.Repository{ + StorageNamespace: "s3://", + CreationDate: time.Now(), + DefaultBranchID: "master", + }, graveler.Branch{ + CommitID: "c1", + })) + + for _, b := range []graveler.BranchID{"a", "aa", "c", "b", "z", "f"} { + testutil.Must(t, r.SetBranch(context.Background(), "repo1", b, graveler.Branch{ + CommitID: "c2", + })) + } + + iter, err := r.ListBranches(context.Background(), "repo1", "") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + var bs []graveler.BranchID + for iter.Next() { + b := iter.Value() + bs = append(bs, b.BranchID) + } + if iter.Err() != nil { + t.Fatalf("unexpected error: %v", iter.Err()) + } + if !reflect.DeepEqual(bs, []graveler.BranchID{"a", "aa", "b", "c", "f", "master", "z"}) { + t.Fatalf("unexpected branch list: %v", bs) + } +} + +func TestPGRefManager_AddCommit(t *testing.T) { + r := testRefManager(t) + testutil.Must(t, r.CreateRepository(context.Background(), "repo1", graveler.Repository{ + StorageNamespace: "s3://", + CreationDate: time.Now(), + DefaultBranchID: "master", + }, graveler.Branch{ + CommitID: "c1", + })) + + ts, _ := time.Parse(time.RFC3339, "2020-12-01T15:00:00Z00:00") + c := graveler.Commit{ + Committer: "user1", + Message: "message1", + TreeID: "deadbeef123", + CreationDate: ts, + Parents: graveler.CommitParents{"deadbeef1", "deadbeef12"}, + Metadata: catalog.Metadata{"foo": "bar"}, + } + + cid, err := r.AddCommit(context.Background(), "repo1", c) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if cid != "2277b5abd2d3ba6b4d35c48a0e358b0c4bcf5cd6d891c67437fb4c4af0d2fd4b" { + t.Fatalf("unexpected commit ID: %s", cid) + } + + commit, err := r.GetCommit(context.Background(), "repo1", cid) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if commit.Parents[0] != "deadbeef1" { + t.Fatalf("expected parent1 to be deadbeef1, got %v", commit.Parents) + } + + if commit.Metadata["foo"] != "bar" { + t.Fatalf("unexpected metadata value for foo: %v", commit.Metadata["foo"]) + } +} + +func TestPGRefManager_Log(t *testing.T) { + r := testRefManager(t) + testutil.Must(t, r.CreateRepository(context.Background(), "repo1", graveler.Repository{ + StorageNamespace: "s3://", + CreationDate: time.Now(), + DefaultBranchID: "master", + }, graveler.Branch{ + CommitID: "c1", + })) + + ts, _ := time.Parse(time.RFC3339, "2020-12-01T15:00:00Z") + var previous graveler.CommitID + for i := 0; i < 20; i++ { + c := graveler.Commit{ + Committer: "user1", + Message: "message1", + TreeID: "deadbeef123", + CreationDate: ts, + Parents: graveler.CommitParents{previous}, + Metadata: catalog.Metadata{"foo": "bar"}, + } + cid, err := r.AddCommit(context.Background(), "repo1", c) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + previous = cid + } + + iter, err := r.Log(context.Background(), "repo1", previous) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + ids := make([]graveler.CommitID, 0) + for iter.Next() { + c := iter.Value() + ids = append(ids, c.CommitID) + } + if iter.Err() != nil { + t.Fatalf("unexpected error: %v", iter.Err()) + } + + expected := graveler.CommitParents{ + "c3f815d633789cd7c1325352277d4de528844c758a9beedfa8a3cfcfb5c75627", + "8549d7544244ba1b63b5967b6b328b331658f627369cb89bd442684719c318ae", + "13dafa9c45bcf67e6997776039cbf8ab571ace560ce9e13665f383434a495774", + "7de38592b9e6046ffb55915a40848f05749f168531f0cd6a2aa61fe6e8d92d02", + "94c7773c89650e99671c33d46e31226230cdaeed79a77cdbd7c7419ea68b91ca", + "0efcb6e81db6bdd2cfeb77664b6573a7d69f555bbe561fd1fd018a4e4cac7603", + "d85e4ae46b63f641b439afde9ebab794a3c39c203a42190c0b9d7773ab71a60e", + "a766cfdb311fe5f18f489d90d283f65ed522e719fe1ad5397277339eee0d1964", + "67ea954d570e20172775f41ac9763905d16d73490d9b72731d353db33f85d437", + "d3b16c2cf7f5b9adc2770976bcabe463a5bdd3b5dbf740034f09a9c663620aed", + "d420fbf793716d6d53798218d7a247f38a5bbed095d57df71ee79e05446e46ec", + "cc72bda1adade1a72b3de617472c16af187063c79e7edc7921c04e883b44de4c", + "752581ac60bd8e38a2e65a754591a93a1703dc6c658f91380b8836013188c566", + "3cf70857454c71fd0bbf69af8a5360671ba98f6ac9371b047144208c58c672a2", + "bfa1e0382ff3c51905dc62ced0a67588b5219c1bba71a517ae7e7857f0c26afe", + "d2248dcc1a4de004e10e3bc6b820655e649b8d986d983b60ec98a357a0df194b", + "a2d98d820f6ff3f221223dbe6a22548f78549830d3b19286b101f13a0ee34085", + "4f13621ec00d4e44e8a0f0ad340224f9d51db9b6518ee7bef17f598aea9e0431", + "df87d5329f4438662d6ecb9b90ee17c0bdc9a78a884acc93c0c4fe9f0f79d059", + "29706d36de7219e0796c31b278f87201ef835e8cdafbcc3c907d292cd31f77d5", + } + + if len(expected) != len(ids) { + t.Fatalf("wrong size of log: %d - expected %d", len(ids), len(expected)) + } + + for i, cid := range ids { + if cid != expected[i] { + t.Fatalf("wrong commit ID at index %d: got %v expected %v", i, cid, expected[i]) + } + } +} diff --git a/graveler/refmergebase.go b/graveler/refmergebase.go new file mode 100644 index 00000000000..ddbfd7023a5 --- /dev/null +++ b/graveler/refmergebase.go @@ -0,0 +1,101 @@ +package graveler + +import ( + "context" + + "github.com/treeverse/lakefs/ident" +) + +// taken from history: +// https://github.com/treeverse/lakeFS/blob/606bf07969c14a569a60efe9c92831f424fa7f36/index/dag/commit_iterator.go +type CommitGetter interface { + GetCommit(ctx context.Context, repositoryID RepositoryID, commitID CommitID) (*Commit, error) +} + +type CommitWalker struct { + getter CommitGetter + ctx context.Context + + repositoryID RepositoryID + + queue []CommitID + discoveredSet map[CommitID]struct{} + value *Commit + err error +} + +func NewCommitWalker(ctx context.Context, getter CommitGetter, repositoryID RepositoryID, startID CommitID) *CommitWalker { + return &CommitWalker{ + getter: getter, + ctx: ctx, + repositoryID: repositoryID, + queue: []CommitID{startID}, + discoveredSet: make(map[CommitID]struct{}), + } +} + +func (w *CommitWalker) Next() bool { + if w.err != nil || len(w.queue) == 0 { + w.value = nil + return false // no more values to walk! + } + + // pop + addr := w.queue[0] + w.queue = w.queue[1:] + commit, err := w.getter.GetCommit(w.ctx, w.repositoryID, addr) + if err != nil { + w.err = err + w.value = nil + return false + } + + // fill queue + for _, parent := range commit.Parents { + if _, wasDiscovered := w.discoveredSet[parent]; !wasDiscovered { + w.queue = append(w.queue, parent) + w.discoveredSet[parent] = struct{}{} + } + } + w.value = commit + return true +} + +func (w *CommitWalker) Value() *Commit { + return w.value +} + +func (w *CommitWalker) Err() error { + return w.err +} + +func FindLowestCommonAncestor(ctx context.Context, getter CommitGetter, repositoryID RepositoryID, left, right CommitID) (*Commit, error) { + discoveredSet := make(map[string]struct{}) + iterLeft := NewCommitWalker(ctx, getter, repositoryID, left) + iterRight := NewCommitWalker(ctx, getter, repositoryID, right) + for { + commit, err := findLowestCommonAncestorNextIter(discoveredSet, iterLeft) + if commit != nil || err != nil { + return commit, err + } + commit, err = findLowestCommonAncestorNextIter(discoveredSet, iterRight) + if commit != nil || err != nil { + return commit, err + } + if iterLeft.Value() == nil && iterRight.Value() == nil { + break + } + } + return nil, nil +} + +func findLowestCommonAncestorNextIter(discoveredSet map[string]struct{}, iter *CommitWalker) (*Commit, error) { + if iter.Next() { + commit := iter.Value() + if _, wasDiscovered := discoveredSet[ident.ContentAddress(commit)]; wasDiscovered { + return commit, nil + } + discoveredSet[ident.ContentAddress(commit)] = struct{}{} + } + return nil, iter.Err() +} diff --git a/graveler/refmergebase_test.go b/graveler/refmergebase_test.go new file mode 100644 index 00000000000..27273094399 --- /dev/null +++ b/graveler/refmergebase_test.go @@ -0,0 +1,169 @@ +package graveler_test + +import ( + "context" + "testing" + + "github.com/treeverse/lakefs/graveler" + "github.com/treeverse/lakefs/ident" +) + +type MockCommitGetter struct { + kv map[graveler.CommitID]*graveler.Commit + visited map[graveler.CommitID]interface{} +} + +func (g *MockCommitGetter) GetCommit(ctx context.Context, repositoryID graveler.RepositoryID, commitID graveler.CommitID) (*graveler.Commit, error) { + for _, v := range g.kv { + if caddr(v) == commitID { + g.visited[commitID] = struct{}{} + return v, nil + } + } + return nil, graveler.ErrNotFound +} + +func newReader(kv map[graveler.CommitID]*graveler.Commit) *MockCommitGetter { + return &MockCommitGetter{ + kv: kv, + visited: make(map[graveler.CommitID]interface{}), + } +} + +func caddr(commit *graveler.Commit) graveler.CommitID { + if commit == nil { + return graveler.CommitID("") + } + return graveler.CommitID(ident.ContentAddress(commit)) +} + +func TestFindLowestCommonAncestor(t *testing.T) { + cases := []struct { + Name string + Left graveler.CommitID + Right graveler.CommitID + Getter func() *MockCommitGetter + Expected graveler.CommitID + NoVisitExpected []graveler.CommitID + }{ + { + Name: "root_match", + Left: "c7", + Right: "c6", + Getter: func() *MockCommitGetter { + c0 := &graveler.Commit{Message: "0", Parents: []graveler.CommitID{}} + c1 := &graveler.Commit{Message: "1", Parents: []graveler.CommitID{caddr(c0)}} + c2 := &graveler.Commit{Message: "2", Parents: []graveler.CommitID{caddr(c0)}} + c3 := &graveler.Commit{Message: "3", Parents: []graveler.CommitID{caddr(c1)}} + c4 := &graveler.Commit{Message: "4", Parents: []graveler.CommitID{caddr(c2)}} + c5 := &graveler.Commit{Message: "5", Parents: []graveler.CommitID{caddr(c3)}} + c6 := &graveler.Commit{Message: "6", Parents: []graveler.CommitID{caddr(c4)}} + c7 := &graveler.Commit{Message: "7", Parents: []graveler.CommitID{caddr(c5)}} + return newReader(map[graveler.CommitID]*graveler.Commit{ + "c0": c0, "c1": c1, "c2": c2, "c3": c3, "c4": c4, "c5": c5, "c6": c6, "c7": c7, + }) + }, + Expected: "c0", + }, + { + Name: "close_ancestor", + Left: "c3", + Right: "c4", + Getter: func() *MockCommitGetter { + c0 := &graveler.Commit{Message: "0", Parents: []graveler.CommitID{}} + c1 := &graveler.Commit{Message: "1", Parents: []graveler.CommitID{caddr(c0)}} + c2 := &graveler.Commit{Message: "2", Parents: []graveler.CommitID{caddr(c1)}} + c3 := &graveler.Commit{Message: "3", Parents: []graveler.CommitID{caddr(c2)}} + c4 := &graveler.Commit{Message: "4", Parents: []graveler.CommitID{caddr(c2)}} + return newReader(map[graveler.CommitID]*graveler.Commit{ + "c0": c0, "c1": c1, "c2": c2, "c3": c3, "c4": c4, + }) + }, + Expected: "c2", + NoVisitExpected: []graveler.CommitID{"c0"}, + }, + { + Name: "criss_cross", + Left: "c5", + Right: "c6", + Getter: func() *MockCommitGetter { + c0 := &graveler.Commit{Message: "0", Parents: []graveler.CommitID{}} + c1 := &graveler.Commit{Message: "1", Parents: []graveler.CommitID{caddr(c0)}} + c2 := &graveler.Commit{Message: "2", Parents: []graveler.CommitID{caddr(c0)}} + c3 := &graveler.Commit{Message: "3", Parents: []graveler.CommitID{caddr(c1), caddr(c2)}} + c4 := &graveler.Commit{Message: "4", Parents: []graveler.CommitID{caddr(c1), caddr(c2)}} + c5 := &graveler.Commit{Message: "5", Parents: []graveler.CommitID{caddr(c3)}} + c6 := &graveler.Commit{Message: "6", Parents: []graveler.CommitID{caddr(c4)}} + return newReader(map[graveler.CommitID]*graveler.Commit{ + "c0": c0, "c1": c1, "c2": c2, "c3": c3, "c4": c4, "c5": c5, "c6": c6, + }) + }, + Expected: "c1", + NoVisitExpected: []graveler.CommitID{"c0"}, + }, + { + Name: "contained", + Left: "c2", + Right: "c1", + Getter: func() *MockCommitGetter { + c0 := &graveler.Commit{Message: "0", Parents: []graveler.CommitID{}} + c1 := &graveler.Commit{Message: "1", Parents: []graveler.CommitID{caddr(c0)}} + c2 := &graveler.Commit{Message: "2", Parents: []graveler.CommitID{caddr(c1)}} + return newReader(map[graveler.CommitID]*graveler.Commit{ + "c0": c0, "c1": c1, "c2": c2, + }) + }, + Expected: "c1", + }, + { + Name: "parallel", + Left: "c7", + Right: "c3", + Getter: func() *MockCommitGetter { + c0 := &graveler.Commit{Message: "0", Parents: []graveler.CommitID{}} + c1 := &graveler.Commit{Message: "1", Parents: []graveler.CommitID{caddr(c0)}} + c2 := &graveler.Commit{Message: "2", Parents: []graveler.CommitID{caddr(c1)}} + c3 := &graveler.Commit{Message: "3", Parents: []graveler.CommitID{caddr(c2)}} + c4 := &graveler.Commit{Message: "4", Parents: []graveler.CommitID{}} + c5 := &graveler.Commit{Message: "5", Parents: []graveler.CommitID{caddr(c4)}} + c6 := &graveler.Commit{Message: "6", Parents: []graveler.CommitID{caddr(c5)}} + c7 := &graveler.Commit{Message: "7", Parents: []graveler.CommitID{caddr(c6)}} + return newReader(map[graveler.CommitID]*graveler.Commit{ + "c0": c0, "c1": c1, "c2": c2, "c3": c3, "c4": c4, "c5": c5, "c6": c6, "c7": c7, + }) + }, + Expected: "", + }, + } + for _, cas := range cases { + t.Run(cas.Name, func(t *testing.T) { + getter := cas.Getter() + base, err := graveler.FindLowestCommonAncestor( + context.Background(), getter, "", caddr(getter.kv[cas.Left]), caddr(getter.kv[cas.Right])) + if err != nil { + t.Fatal(err) + } + var addr graveler.CommitID + if base != nil { + addr = caddr(base) + } + if addr != caddr(getter.kv[cas.Expected]) { + key := "unknown" + for k, v := range getter.kv { + if addr == caddr(v) { + key = string(k) + break + } + } + t.Fatalf("expected %v (%v) got %v (%v)", cas.Expected, caddr(getter.kv[cas.Expected]), key, addr) + } + + //check efficiency i.e check that we didn't iterate over unnecessary nodes + for _, addr := range cas.NoVisitExpected { + if getter.visited[addr] != nil { + t.Fatalf("commit %s should not be visited", addr) + } + } + }) + } +} diff --git a/graveler/refresolver.go b/graveler/refresolver.go new file mode 100644 index 00000000000..ad6836a8443 --- /dev/null +++ b/graveler/refresolver.go @@ -0,0 +1,132 @@ +package graveler + +import ( + "context" + "errors" + + "github.com/treeverse/lakefs/ident" +) + +type RefStore interface { + GetBranch(ctx context.Context, repositoryID RepositoryID, branchID BranchID) (*Branch, error) + GetCommitByPrefix(ctx context.Context, repositoryID RepositoryID, prefix CommitID) (*Commit, error) + Log(ctx context.Context, repositoryID RepositoryID, from CommitID) (CommitIterator, error) +} + +type reference struct { + typ ReferenceType + branch *Branch + commitID *CommitID +} + +func (r reference) Type() ReferenceType { + return r.typ +} + +func (r reference) Branch() Branch { + return *r.branch +} + +func (r reference) CommitID() CommitID { + return *r.commitID +} + +func ResolveRef(ctx context.Context, store RefStore, repositoryID RepositoryID, ref Ref) (Reference, error) { + // first we need to parse-rev to get a list references + // valid revs: branch, tag, commit ID, commit ID prefix (as long as unambiguous) + // valid modifiers: ~N + parsed, err := RevParse(ref) + if err != nil { + return nil, err + } + + var baseCommit CommitID + if isAHash(parsed.BaseRev) { + commit, err := store.GetCommitByPrefix(ctx, repositoryID, CommitID(parsed.BaseRev)) + if err != nil && !errors.Is(err, ErrNotFound) { + // couldn't check if it's a commit + return nil, err + } + if err == nil { + baseCommit = CommitID(ident.ContentAddress(commit)) + } + // otherwise, simply not a commit. Moving on. + } + + if baseCommit == "" { + // check if it's a branch + branch, err := store.GetBranch(ctx, repositoryID, BranchID(parsed.BaseRev)) + if err != nil && !errors.Is(err, ErrNotFound) { + return nil, err + } + if err == nil { + baseCommit = branch.CommitID + } + + if err == nil && len(parsed.Modifiers) == 0 { + return &reference{ + typ: ReferenceTypeBranch, + branch: branch, + commitID: &branch.CommitID, + }, nil + } + } + + // TODO(ozkatz): once we have tags, they should also be resolved + if baseCommit == "" { + return nil, ErrNotFound + } + + for _, mod := range parsed.Modifiers { + // lastly, apply modifier + switch mod.Type { + case RevModTypeTilde: + // skip mod.ValueNumeric iterations + iter, err := store.Log(ctx, repositoryID, baseCommit) + if err != nil { + return nil, err + } + i := 0 + found := false + for iter.Next() { + i++ // adding 1 because we start at base commit + if i == mod.Value+1 { + baseCommit = iter.Value().CommitID + found = true + break + } + } + if iter.Err() != nil { + return nil, iter.Err() + } + iter.Close() + // went too far! + if !found { + return nil, ErrNotFound + } + case RevModTypeCaret: + switch mod.Value { + case 0: + continue // ^0 = the commit itself + default: + // get the commit and extract parents + c, err := store.GetCommitByPrefix(ctx, repositoryID, baseCommit) + if err != nil { + return nil, err + } + if mod.Value > len(c.Parents) { + return nil, ErrInvalidRef + } + baseCommit = c.Parents[mod.Value-1] + } + + default: + return nil, ErrInvalidRef + } + } + + return reference{ + typ: ReferenceTypeCommit, + commitID: &baseCommit, + }, nil +} diff --git a/graveler/refresolver_test.go b/graveler/refresolver_test.go new file mode 100644 index 00000000000..b6d78e3bad0 --- /dev/null +++ b/graveler/refresolver_test.go @@ -0,0 +1,303 @@ +package graveler_test + +import ( + "context" + "errors" + "fmt" + "testing" + "time" + + "github.com/treeverse/lakefs/graveler" + + "github.com/treeverse/lakefs/catalog" + "github.com/treeverse/lakefs/testutil" +) + +func TestPGRefManager_Dereference(t *testing.T) { + r := testRefManager(t) + testutil.Must(t, r.CreateRepository(context.Background(), "repo1", graveler.Repository{ + StorageNamespace: "s3://", + CreationDate: time.Now(), + DefaultBranchID: "master", + }, graveler.Branch{})) + + ts, _ := time.Parse(time.RFC3339, "2020-12-01T15:00:00Z") + var previous graveler.CommitID + for i := 0; i < 20; i++ { + c := graveler.Commit{ + Committer: "user1", + Message: "message1", + TreeID: "deadbeef123", + CreationDate: ts, + Parents: graveler.CommitParents{previous}, + Metadata: catalog.Metadata{"foo": "bar"}, + } + cid, err := r.AddCommit(context.Background(), "repo1", c) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + previous = cid + } + + iter, err := r.Log(context.Background(), "repo1", previous) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + ids := make([]graveler.CommitID, 0) + for iter.Next() { + c := iter.Value() + ids = append(ids, c.CommitID) + } + if iter.Err() != nil { + t.Fatalf("unexpected error: %v", iter.Err()) + } + + // commit log: + // "c3f815d633789cd7c1325352277d4de528844c758a9beedfa8a3cfcfb5c75627", + // "8549d7544244ba1b63b5967b6b328b331658f627369cb89bd442684719c318ae", + // "13dafa9c45bcf67e6997776039cbf8ab571ace560ce9e13665f383434a495774", + // "7de38592b9e6046ffb55915a40848f05749f168531f0cd6a2aa61fe6e8d92d02", + // "94c7773c89650e99671c33d46e31226230cdaeed79a77cdbd7c7419ea68b91ca", + // "0efcb6e81db6bdd2cfeb77664b6573a7d69f555bbe561fd1fd018a4e4cac7603", + // "d85e4ae46b63f641b439afde9ebab794a3c39c203a42190c0b9d7773ab71a60e", + // "a766cfdb311fe5f18f489d90d283f65ed522e719fe1ad5397277339eee0d1964", + // "67ea954d570e20172775f41ac9763905d16d73490d9b72731d353db33f85d437", + // "d3b16c2cf7f5b9adc2770976bcabe463a5bdd3b5dbf740034f09a9c663620aed", + // "d420fbf793716d6d53798218d7a247f38a5bbed095d57df71ee79e05446e46ec", + // "cc72bda1adade1a72b3de617472c16af187063c79e7edc7921c04e883b44de4c", + // "752581ac60bd8e38a2e65a754591a93a1703dc6c658f91380b8836013188c566", + // "3cf70857454c71fd0bbf69af8a5360671ba98f6ac9371b047144208c58c672a2", + // "bfa1e0382ff3c51905dc62ced0a67588b5219c1bba71a517ae7e7857f0c26afe", + // "d2248dcc1a4de004e10e3bc6b820655e649b8d986d983b60ec98a357a0df194b", + // "a2d98d820f6ff3f221223dbe6a22548f78549830d3b19286b101f13a0ee34085", + // "4f13621ec00d4e44e8a0f0ad340224f9d51db9b6518ee7bef17f598aea9e0431", + // "df87d5329f4438662d6ecb9b90ee17c0bdc9a78a884acc93c0c4fe9f0f79d059", + // "29706d36de7219e0796c31b278f87201ef835e8cdafbcc3c907d292cd31f77d5", + + testutil.Must(t, r.SetBranch(context.Background(), "repo1", "branch1", graveler.Branch{ + CommitID: "13dafa9c45bcf67e6997776039cbf8ab571ace560ce9e13665f383434a495774", + })) + + testutil.Must(t, r.SetBranch(context.Background(), "repo1", "branch2", graveler.Branch{ + CommitID: "d420fbf793716d6d53798218d7a247f38a5bbed095d57df71ee79e05446e46ec", + })) + + table := []struct { + Name string + Ref graveler.Ref + Expected graveler.CommitID + ExpectedErr error + }{ + { + Name: "branch_exist", + Ref: graveler.Ref("branch1"), + Expected: graveler.CommitID("13dafa9c45bcf67e6997776039cbf8ab571ace560ce9e13665f383434a495774"), + }, + { + Name: "branch_doesnt_exist", + Ref: graveler.Ref("branch3"), + ExpectedErr: graveler.ErrNotFound, + }, + { + Name: "commit", + Ref: graveler.Ref("13dafa9c45bcf67e6997776039cbf8ab571ace560ce9e13665f383434a495774"), + Expected: graveler.CommitID("13dafa9c45bcf67e6997776039cbf8ab571ace560ce9e13665f383434a495774"), + }, + { + Name: "commit_prefix_good", + Ref: graveler.Ref("13daf"), + Expected: graveler.CommitID("13dafa9c45bcf67e6997776039cbf8ab571ace560ce9e13665f383434a495774"), + }, + { + Name: "commit_prefix_ambiguous", + Ref: graveler.Ref("a"), + ExpectedErr: graveler.ErrNotFound, + }, + { + Name: "commit_prefix_missing", + Ref: graveler.Ref("66666"), + ExpectedErr: graveler.ErrNotFound, + }, + { + Name: "branch_with_modifier", + Ref: graveler.Ref("branch1~2"), + Expected: graveler.CommitID("94c7773c89650e99671c33d46e31226230cdaeed79a77cdbd7c7419ea68b91ca"), + }, + { + Name: "commit_with_modifier", + Ref: graveler.Ref("13dafa9c45bcf67e6997776039cbf8ab571ace560ce9e13665f383434a495774~2"), + Expected: graveler.CommitID("94c7773c89650e99671c33d46e31226230cdaeed79a77cdbd7c7419ea68b91ca"), + }, + { + Name: "commit_prefix_with_modifier", + Ref: graveler.Ref("13dafa~2"), + Expected: graveler.CommitID("94c7773c89650e99671c33d46e31226230cdaeed79a77cdbd7c7419ea68b91ca"), + }, + { + Name: "commit_prefix_with_modifier_too_big", + Ref: graveler.Ref("2c14ddd9b097a8f96db3f27a454877c9513378635d313ba0f0277d793a183e72~200"), + ExpectedErr: graveler.ErrNotFound, + }, + } + + for _, cas := range table { + t.Run(cas.Name, func(t *testing.T) { + ref, err := r.RevParse(context.Background(), "repo1", cas.Ref) + if err != nil { + if cas.ExpectedErr == nil || !errors.Is(err, cas.ExpectedErr) { + t.Fatalf("unexpected error: %v", err) + } + return + } + if cas.Expected != ref.CommitID() { + t.Fatalf("got unexpected commit ID: %s - expected %s", ref.CommitID(), cas.Expected) + } + }) + } +} + +func TestPGRefManager_DereferenceWithGraph(t *testing.T) { + /* + + This is taken from `git help rev-parse` - let's run these tests + + G H I J + \ / \ / + D E F + \ | / \ + \ | / | + \|/ | + B C + \ / + \ / + A + + A = = A^0 + B = A^ = A^1 = A~1 + C = = A^2 + D = A^^ = A^1^1 = A~2 + E = B^2 = A^^2 + F = B^3 = A^^3 + G = A^^^ = A^1^1^1 = A~3 + H = D^2 = B^^2 = A^^^2 = A~2^2 + I = F^ = B^3^ = A^^3^ + J = F^2 = B^3^2 = A^^3^2 + + */ + r := testRefManager(t) + testutil.Must(t, r.CreateRepository(context.Background(), "repo1", graveler.Repository{ + StorageNamespace: "s3://", + CreationDate: time.Now(), + DefaultBranchID: "master", + }, graveler.Branch{})) + + G, err := r.AddCommit(context.Background(), "repo1", graveler.Commit{ + Parents: graveler.CommitParents{}, + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + H, err := r.AddCommit(context.Background(), "repo1", graveler.Commit{ + Parents: graveler.CommitParents{}, + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + I, err := r.AddCommit(context.Background(), "repo1", graveler.Commit{ + Parents: graveler.CommitParents{}, + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + J, err := r.AddCommit(context.Background(), "repo1", graveler.Commit{ + Parents: graveler.CommitParents{}, + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + D, err := r.AddCommit(context.Background(), "repo1", graveler.Commit{ + Parents: graveler.CommitParents{G, H}, + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + E, err := r.AddCommit(context.Background(), "repo1", graveler.Commit{ + Parents: graveler.CommitParents{}, + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + F, err := r.AddCommit(context.Background(), "repo1", graveler.Commit{ + Parents: graveler.CommitParents{I, J}, + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + B, err := r.AddCommit(context.Background(), "repo1", graveler.Commit{ + Parents: graveler.CommitParents{D, E, F}, + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + C, err := r.AddCommit(context.Background(), "repo1", graveler.Commit{ + Parents: graveler.CommitParents{F}, + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + A, err := r.AddCommit(context.Background(), "repo1", graveler.Commit{ + Parents: graveler.CommitParents{B, C}, + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + resolve := func(base graveler.CommitID, mod string, expected graveler.CommitID) { + ref := fmt.Sprintf("%s%s", base, mod) + resolved, err := r.RevParse(context.Background(), "repo1", graveler.Ref(ref)) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if resolved.CommitID() != expected { + t.Fatalf("expected %s == %s", ref, expected) + } + } + + // now the tests: + resolve(A, "^0", A) + resolve(A, "^", B) + resolve(A, "^1", B) + resolve(A, "~1", B) + resolve(A, "^2", C) + resolve(A, "^^", D) + resolve(A, "^1^1", D) + resolve(A, "~2", D) + resolve(B, "^2", E) + resolve(A, "^^2", E) + resolve(B, "^2", E) + resolve(A, "^^2", E) + resolve(B, "^3", F) + resolve(A, "^^3", F) + resolve(A, "^^^", G) + resolve(A, "^1^1^1", G) + resolve(A, "~3", G) + resolve(D, "^2", H) + resolve(B, "^^2", H) + resolve(A, "^^^2", H) + resolve(A, "~2^2", H) + resolve(F, "^", I) + resolve(B, "^3^", I) + resolve(A, "^^3^", I) + resolve(F, "^2", J) + resolve(B, "^3^2", J) + resolve(A, "^^3^2", J) +} diff --git a/graveler/revparse.go b/graveler/revparse.go new file mode 100644 index 00000000000..fca90b5433e --- /dev/null +++ b/graveler/revparse.go @@ -0,0 +1,77 @@ +package graveler + +import ( + "fmt" + "regexp" + "strconv" +) + +var ( + hashRegexp = regexp.MustCompile("^[a-fA-F0-9]{1,64}$") + modifiersRegexp = regexp.MustCompile("(^|[~^])[^^~]*") +) + +type RevModType uint8 + +const ( + RevModTypeTilde RevModType = iota + RevModTypeCaret +) + +func isAHash(part string) bool { + return hashRegexp.MatchString(part) +} + +type ParsedRev struct { + BaseRev string + Modifiers []RevModifier +} + +type RevModifier struct { + Type RevModType + Value int +} + +func parseMod(buf string) (RevModifier, error) { + amount := 1 + var err error + if len(buf) > 1 { + amount, err = strconv.Atoi(buf[1:]) + if err != nil { + return RevModifier{}, fmt.Errorf("could not parse modifier %s: %w", buf, ErrInvalidRef) + } + } + var typ RevModType + switch buf[0] { + case '~': + typ = RevModTypeTilde + case '^': + typ = RevModTypeCaret + default: + return RevModifier{}, ErrInvalidRef + } + + return RevModifier{ + Type: typ, + Value: amount, + }, nil +} + +func RevParse(r Ref) (ParsedRev, error) { + p := ParsedRev{ + Modifiers: make([]RevModifier, 0), + } + parts := modifiersRegexp.FindAllString(string(r), -1) + p.BaseRev = parts[0] + if p.BaseRev == "" { + return p, ErrInvalidRef + } + for _, part := range parts[1:] { + mod, err := parseMod(part) + if err != nil { + return p, err + } + p.Modifiers = append(p.Modifiers, mod) + } + return p, nil +} diff --git a/graveler/revparse_test.go b/graveler/revparse_test.go new file mode 100644 index 00000000000..4683db49bc1 --- /dev/null +++ b/graveler/revparse_test.go @@ -0,0 +1,138 @@ +package graveler_test + +import ( + "errors" + "testing" + + "github.com/treeverse/lakefs/graveler" +) + +func TestRevParse(t *testing.T) { + table := []struct { + Name string + Input string + Expected graveler.ParsedRev + ExpectedErr error + }{ + { + Name: "just_branch", + Input: "master", + Expected: graveler.ParsedRev{ + BaseRev: "master", + Modifiers: make([]graveler.RevModifier, 0), + }, + }, + { + Name: "branch_one_caret", + Input: "master^", + Expected: graveler.ParsedRev{ + BaseRev: "master", + Modifiers: []graveler.RevModifier{ + { + Type: graveler.RevModTypeCaret, + Value: 1, + }, + }, + }, + }, + { + Name: "branch_two_caret", + Input: "master^^", + Expected: graveler.ParsedRev{ + BaseRev: "master", + Modifiers: []graveler.RevModifier{ + { + Type: graveler.RevModTypeCaret, + Value: 1, + }, + { + Type: graveler.RevModTypeCaret, + Value: 1, + }, + }, + }, + }, + { + Name: "branch_two_caret_one_qualified", + Input: "master^2^", + Expected: graveler.ParsedRev{ + BaseRev: "master", + Modifiers: []graveler.RevModifier{ + { + Type: graveler.RevModTypeCaret, + Value: 2, + }, + { + Type: graveler.RevModTypeCaret, + Value: 1, + }, + }, + }, + }, + { + Name: "branch_tilde_caret_tilde", + Input: "master~^~3", + Expected: graveler.ParsedRev{ + BaseRev: "master", + Modifiers: []graveler.RevModifier{ + { + Type: graveler.RevModTypeTilde, + Value: 1, + }, + { + Type: graveler.RevModTypeCaret, + Value: 1, + }, + { + Type: graveler.RevModTypeTilde, + Value: 3, + }, + }, + }, + }, + { + Name: "no_base", + Input: "^^^3", + ExpectedErr: graveler.ErrInvalidRef, + }, + { + Name: "non_numeric_qualifier", + Input: "master^a", + ExpectedErr: graveler.ErrInvalidRef, + }, + } + + for _, cas := range table { + t.Run(cas.Name, func(t *testing.T) { + got, err := graveler.RevParse(graveler.Ref(cas.Input)) + if cas.ExpectedErr != nil { + if !errors.Is(err, cas.ExpectedErr) { + t.Fatalf("expected error of type: %s, got %v", cas.ExpectedErr, err) + } + return + } else if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if got.BaseRev != cas.Expected.BaseRev { + t.Fatalf("expected base rev: %s got %s", cas.Expected.BaseRev, got.BaseRev) + } + + if len(got.Modifiers) != len(cas.Expected.Modifiers) { + t.Fatalf("got wrong number of modifiers, expected %d got %d", + len(cas.Expected.Modifiers), len(got.Modifiers)) + } + + for i, m := range got.Modifiers { + if m.Type != cas.Expected.Modifiers[i].Type { + t.Fatalf("unexpected modifier at index %d: expected type %d got %d", + i, cas.Expected.Modifiers[i].Type, m.Type) + } + if m.Value != cas.Expected.Modifiers[i].Value { + t.Fatalf("unexpected modifier at index %d: expected value %d got %d", + i, cas.Expected.Modifiers[i].Value, m.Value) + } + } + }) + } +} diff --git a/graveler/staging_test.go b/graveler/staging_test.go index 6439cd07f8c..b3db89e4f7c 100644 --- a/graveler/staging_test.go +++ b/graveler/staging_test.go @@ -1,4 +1,4 @@ -package graveler +package graveler_test import ( "bytes" @@ -8,13 +8,14 @@ import ( "testing" "github.com/treeverse/lakefs/db" + "github.com/treeverse/lakefs/graveler" "github.com/treeverse/lakefs/testutil" ) -func newTestStagingManager(t *testing.T) StagingManager { +func newTestStagingManager(t *testing.T) graveler.StagingManager { t.Helper() conn, _ := testutil.GetDB(t, databaseURI) - return NewStagingManager(conn) + return graveler.NewStagingManager(conn) } func TestSetGet(t *testing.T) { @@ -120,14 +121,14 @@ func TestDrop(t *testing.T) { func TestList(t *testing.T) { s := newTestStagingManager(t) for _, numOfValues := range []int{1, 100, 1000, 1500, 2500} { - token := StagingToken(fmt.Sprintf("t_%d", numOfValues)) + token := graveler.StagingToken(fmt.Sprintf("t_%d", numOfValues)) for i := 0; i < numOfValues; i++ { err := s.Set(context.Background(), token, []byte(fmt.Sprintf("key%04d", i)), newTestValue(fmt.Sprintf("identity%d", i), fmt.Sprintf("value%d", i))) if err != nil { t.Fatalf("got unexpected error: %v", err) } } - res := make([]*ValueRecord, 0, numOfValues) + res := make([]*graveler.ValueRecord, 0, numOfValues) it, _ := s.List(context.Background(), token) for it.Next() { res = append(res, it.Value()) @@ -192,12 +193,12 @@ func TestNilIdentity(t *testing.T) { if err != nil { t.Fatalf("got unexpected error: %v", err) } - err = s.Set(context.Background(), "t1", []byte("key1"), Value{ + err = s.Set(context.Background(), "t1", []byte("key1"), graveler.Value{ Identity: nil, Data: []byte("value1"), }) - if !errors.Is(err, ErrInvalidValue) { - t.Fatalf("got unexpected error. expected=%v, got=%v", ErrInvalidValue, err) + if !errors.Is(err, graveler.ErrInvalidValue) { + t.Fatalf("got unexpected error. expected=%v, got=%v", graveler.ErrInvalidValue, err) } e, err := s.Get(context.Background(), "t1", []byte("key1")) if err != nil { @@ -215,7 +216,7 @@ func TestDeleteAndTombstone(t *testing.T) { if !errors.Is(err, db.ErrNotFound) { t.Fatalf("error different than expected. expected=%v, got=%v", db.ErrNotFound, err) } - tombstoneValues := []Value{ + tombstoneValues := []graveler.Value{ { Identity: []byte("identity1"), Data: make([]byte, 0), @@ -276,8 +277,8 @@ func TestDeleteAndTombstone(t *testing.T) { } } -func newTestValue(identity, data string) Value { - return Value{ +func newTestValue(identity, data string) graveler.Value { + return graveler.Value{ Identity: []byte(identity), Data: []byte(data), } diff --git a/ident/ident.go b/ident/ident.go new file mode 100644 index 00000000000..c3e3dfd4d86 --- /dev/null +++ b/ident/ident.go @@ -0,0 +1,92 @@ +package ident + +import ( + "crypto/sha256" + "encoding/binary" + "encoding/hex" + "hash" + "sort" +) + +type Identifiable interface { + Identity() []byte +} + +func ContentAddress(entity Identifiable) string { + return hex.EncodeToString(entity.Identity()) +} + +type AddressType uint8 + +const ( + AddressTypeBytes AddressType = iota + AddressTypeString + AddressTypeInt64 + AddressTypeStringSlice + AddressTypeStringMap + AddressTypeEmbeddedIdentifiable +) + +type AddressWriter struct { + hash.Hash +} + +func NewAddressWriter() *AddressWriter { + return &AddressWriter{sha256.New()} +} + +func (b *AddressWriter) marshalType(addressType AddressType) { + _, _ = b.Write([]byte{byte(addressType)}) +} + +func (b *AddressWriter) MarshalBytes(v []byte) { + b.marshalType(AddressTypeBytes) + b.MarshalInt64(int64(len(v))) + _, _ = b.Write(v) +} + +func (b *AddressWriter) MarshalString(v string) { + b.marshalType(AddressTypeString) + b.MarshalInt64(int64(len(v))) + _, _ = b.Write([]byte(v)) +} + +func (b *AddressWriter) MarshalInt64(v int64) { + b.marshalType(AddressTypeInt64) + _, _ = b.Write([]byte{8}) + bytes := make([]byte, 8) + binary.BigEndian.PutUint64(bytes, uint64(v)) + _, _ = b.Write(bytes) +} + +func (b *AddressWriter) MarshalStringSlice(v []string) { + b.marshalType(AddressTypeStringSlice) + b.MarshalInt64(int64(len(v))) + for _, item := range v { + b.MarshalString(item) + } +} + +func (b *AddressWriter) MarshalStringMap(v map[string]string) { + b.marshalType(AddressTypeStringMap) + b.MarshalInt64(int64(len(v))) + keys := make([]string, len(v)) + i := 0 + for k := range v { + keys[i] = k + } + sort.Strings(keys) + for _, k := range keys { + b.MarshalString(k) + b.MarshalString(v[k]) + } +} + +func (b *AddressWriter) MarshalIdentifiable(v Identifiable) { + b.marshalType(AddressTypeEmbeddedIdentifiable) + b.MarshalBytes(v.Identity()) +} + +func (b *AddressWriter) Identity() []byte { + return b.Sum(nil) +}