Skip to content

Commit

Permalink
Entry catalog enable first commit and listing using graveler (#1169)
Browse files Browse the repository at this point in the history
  • Loading branch information
nopcoder authored Jan 11, 2021
1 parent 7089ff5 commit 66a3989
Show file tree
Hide file tree
Showing 16 changed files with 272 additions and 115 deletions.
3 changes: 1 addition & 2 deletions cache/reference_counted.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"sync/atomic"

"github.com/hnlq715/golang-lru/simplelru"

"github.com/treeverse/lakefs/logging"
)

Expand All @@ -35,7 +34,7 @@ type ParamsWithDisposal struct {
}

// shardedCacheWithDisposal shards a CacheWithDisposal across its keys. It requires that its
// keyus have a String() method that is compatible with their equality, i.e. k1.GoString() ==
// keys have a String() method that is compatible with their equality, i.e. k1.GoString() ==
// k2.GoString() implies k1 equals k2 in the sense of map comparison.
type shardedCacheWithDisposal struct {
name string
Expand Down
26 changes: 26 additions & 0 deletions graveler/committed/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,29 @@ func (rvi *iterator) SeekGE(key graveler.Key) {
// Ready to call Next to see values.
rvi.err = rvi.it.Err()
}

// emptyIterator is an Iterator implementation that never yields anything:
// every advance reports false, Value returns nil results and Err is always nil.
type emptyIterator struct{}

// NewEmptyIterator returns an Iterator that holds no ranges and no values.
func NewEmptyIterator() Iterator {
	return new(emptyIterator)
}

// Next always reports false, there is no value to advance to.
func (*emptyIterator) Next() bool { return false }

// NextRange always reports false, there is no range to advance to.
func (*emptyIterator) NextRange() bool { return false }

// Value returns a nil value record and a nil range.
func (*emptyIterator) Value() (*graveler.ValueRecord, *Range) {
	return nil, nil
}

// SeekGE is a no-op; the requested key is ignored.
func (*emptyIterator) SeekGE(_ graveler.Key) {}

// Err always returns nil.
func (*emptyIterator) Err() error { return nil }

// Close is a no-op.
func (*emptyIterator) Close() {}
6 changes: 5 additions & 1 deletion graveler/committed/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ func (c *committedManager) Get(ctx context.Context, ns graveler.StorageNamespace
}

// List returns a graveler.ValueIterator over the values of the metarange
// identified by rangeID in the storage namespace ns.
//
// It opens a metarange iterator through the metarange manager and wraps it with
// NewValueIterator. An error from opening the metarange iterator is returned
// unchanged, together with a nil iterator.
//
// ctx is accepted as part of the method signature but is not passed on to the
// metarange manager.
func (c *committedManager) List(ctx context.Context, ns graveler.StorageNamespace, rangeID graveler.MetaRangeID) (graveler.ValueIterator, error) {
	it, err := c.metaRangeManager.NewMetaRangeIterator(ns, rangeID)
	if err != nil {
		return nil, err
	}
	return NewValueIterator(it), nil
}

func (c *committedManager) WriteMetaRange(ctx context.Context, ns graveler.StorageNamespace, it graveler.ValueIterator) (*graveler.MetaRangeID, error) {
Expand Down
3 changes: 3 additions & 0 deletions graveler/committed/meta_range_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ func (m *metaRangeManager) NewWriter(ns graveler.StorageNamespace) MetaRangeWrit
}

func (m *metaRangeManager) NewMetaRangeIterator(ns graveler.StorageNamespace, id graveler.MetaRangeID) (Iterator, error) {
if id == "" {
return NewEmptyIterator(), nil
}
rangesIt, err := m.metaManager.NewRangeIterator(Namespace(ns), ID(id))
if err != nil {
return nil, fmt.Errorf("manage metarange %s: %w", id, err)
Expand Down
39 changes: 39 additions & 0 deletions graveler/committed/value_iterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package committed

import "github.com/treeverse/lakefs/graveler"

// valueIterator adapts an Iterator to graveler.ValueIterator. It exposes only
// the value record part of the wrapped iterator's Value result and skips
// positions at which that record is nil.
type valueIterator struct {
	it Iterator
}

// NewValueIterator wraps it so that it can be used as a graveler.ValueIterator.
func NewValueIterator(it Iterator) graveler.ValueIterator {
	wrapped := valueIterator{it: it}
	return &wrapped
}

// Next advances the wrapped iterator until it is positioned on a non-nil value
// record. It reports false once the wrapped iterator cannot advance any further.
func (vi *valueIterator) Next() bool {
	for {
		if !vi.it.Next() {
			return false
		}
		record, _ := vi.it.Value()
		if record != nil {
			return true
		}
	}
}

// SeekGE forwards the seek request for id to the wrapped iterator.
func (vi *valueIterator) SeekGE(id graveler.Key) {
	vi.it.SeekGE(id)
}

// Value returns the value record at the wrapped iterator's current position,
// discarding the range returned alongside it.
func (vi *valueIterator) Value() *graveler.ValueRecord {
	record, _ := vi.it.Value()
	return record
}

// Err returns the error reported by the wrapped iterator.
func (vi *valueIterator) Err() error {
	return vi.it.Err()
}

// Close closes the wrapped iterator.
func (vi *valueIterator) Close() {
	vi.it.Close()
}
5 changes: 4 additions & 1 deletion graveler/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ var (
ErrInvalidBranchID = fmt.Errorf("branch id: %w", ErrInvalidValue)
ErrInvalidRef = fmt.Errorf("ref: %w", ErrInvalidValue)
ErrInvalidCommitID = fmt.Errorf("commit id: %w", ErrInvalidValue)
ErrCommitNotFound = fmt.Errorf("commit: %w", ErrNotFound)
ErrCommitNotFound = fmt.Errorf("commit %w", ErrNotFound)
ErrRepositoryNotFound = fmt.Errorf("repository %w", ErrNotFound)
ErrBranchNotFound = fmt.Errorf("branch %w", ErrNotFound)
ErrTagNotFound = fmt.Errorf("tag %w", ErrNotFound)
ErrRefAmbiguous = fmt.Errorf("reference is ambiguous: %w", ErrNotFound)
ErrConflictFound = errors.New("conflict found")
ErrBranchExists = errors.New("branch already exists")
Expand Down
67 changes: 50 additions & 17 deletions graveler/graveler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"strings"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -55,7 +56,6 @@ type Ref string
// TagID represents a named tag pointing at a commit
type TagID string

// CommitParents commit's parents slice
type CommitParents []CommitID

// BranchID is an identifier for a branch
Expand Down Expand Up @@ -744,12 +744,17 @@ func (g *graveler) List(ctx context.Context, repositoryID RepositoryID, ref Ref)
if err != nil {
return nil, err
}
commit, err := g.RefManager.GetCommit(ctx, repositoryID, reference.CommitID())
if err != nil {
return nil, err
commitID := reference.CommitID()
var metaRangeID MetaRangeID
if commitID != "" {
commit, err := g.RefManager.GetCommit(ctx, repositoryID, commitID)
if err != nil {
return nil, err
}
metaRangeID = commit.MetaRangeID
}

listing, err := g.CommittedManager.List(ctx, repo.StorageNamespace, commit.MetaRangeID)
listing, err := g.CommittedManager.List(ctx, repo.StorageNamespace, metaRangeID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -777,42 +782,65 @@ func (g *graveler) Commit(ctx context.Context, repositoryID RepositoryID, branch
if err != nil {
return "", fmt.Errorf("get branch: %w", err)
}
commit, err := g.RefManager.GetCommit(ctx, repositoryID, branch.CommitID)
if err != nil {
return "", fmt.Errorf("get commit: %w", err)
var branchMetaRangeID MetaRangeID
if branch.CommitID != "" {
commit, err := g.RefManager.GetCommit(ctx, repositoryID, branch.CommitID)
if err != nil {
return "", fmt.Errorf("get commit: %w", err)
}
branchMetaRangeID = commit.MetaRangeID
}

changes, err := g.StagingManager.List(ctx, branch.StagingToken)
if err != nil {
return "", fmt.Errorf("staging list: %w", err)
}
metaRangeID, err := g.CommittedManager.Apply(ctx, repo.StorageNamespace, commit.MetaRangeID, changes)
metaRangeID, err := g.CommittedManager.Apply(ctx, repo.StorageNamespace, branchMetaRangeID, changes)
if err != nil {
return "", fmt.Errorf("apply: %w", err)
}
newCommit, err := g.RefManager.AddCommit(ctx, repositoryID, Commit{

// fill and add commit
commit := Commit{
Committer: committer,
Message: message,
MetaRangeID: metaRangeID,
CreationDate: time.Now(),
Parents: CommitParents{branch.CommitID},
Metadata: metadata,
})
}
if branch.CommitID != "" {
commit.Parents = CommitParents{branch.CommitID}
}

newCommit, err := g.RefManager.AddCommit(ctx, repositoryID, commit)
if err != nil {
return "", fmt.Errorf("add commit: %w", err)
}
err = g.RefManager.SetBranch(ctx, repositoryID, branchID, Branch{
CommitID: newCommit,
StagingToken: newStagingToken(repositoryID, branchID),
})
if err != nil {
return "", fmt.Errorf("set branch commit %s: %w", newCommit, err)
}
err = g.StagingManager.Drop(ctx, branch.StagingToken)
if err != nil {
g.log.WithContext(ctx).WithFields(logging.Fields{
"repository_id": repositoryID,
"branch_id": branchID,
"commit_id": *commit,
"commit_id": branch.CommitID,
"message": message,
"staging_token": branch.StagingToken,
}).Error("Failed to drop staging data")
}
return newCommit, nil
}

// newStagingToken builds a staging token by joining the repository ID, the
// branch ID and a newly generated UUID with "-" separators.
func newStagingToken(repositoryID RepositoryID, branchID BranchID) StagingToken {
	parts := []string{
		repositoryID.String(),
		branchID.String(),
		uuid.New().String(),
	}
	return StagingToken(strings.Join(parts, "-"))
}

func (g *graveler) CommitExistingMetaRange(ctx context.Context, repositoryID RepositoryID, branchID BranchID, metaRangeID MetaRangeID, committer string, message string, metadata Metadata) (CommitID, error) {
cancel, err := g.branchLocker.AquireMetadataUpdate(repositoryID, branchID)
if err != nil {
Expand Down Expand Up @@ -960,15 +988,20 @@ func (g *graveler) DiffUncommitted(ctx context.Context, repositoryID RepositoryI
if err != nil {
return nil, err
}
commit, err := g.RefManager.GetCommit(ctx, repositoryID, branch.CommitID)
if err != nil {
return nil, err
var metaRangeID MetaRangeID
if branch.CommitID != "" {
commit, err := g.RefManager.GetCommit(ctx, repositoryID, branch.CommitID)
if err != nil {
return nil, err
}
metaRangeID = commit.MetaRangeID
}

valueIterator, err := g.StagingManager.List(ctx, branch.StagingToken)
if err != nil {
return nil, err
}
return NewUncommittedDiffIterator(ctx, g.CommittedManager, valueIterator, repo.StorageNamespace, commit.MetaRangeID), nil
return NewUncommittedDiffIterator(ctx, g.CommittedManager, valueIterator, repo.StorageNamespace, metaRangeID), nil
}

func (g *graveler) getCommitRecordFromRef(ctx context.Context, repositoryID RepositoryID, ref Ref) (*CommitRecord, error) {
Expand Down
11 changes: 6 additions & 5 deletions graveler/graveler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func TestGraveler_DiffUncommitted(t *testing.T) {
name: "no changes",
r: graveler.NewGraveler(&testutil.CommittedFake{ValueIterator: testutil.NewValueIteratorFake([]graveler.ValueRecord{{Key: graveler.Key("foo/one"), Value: &graveler.Value{}}}), Err: graveler.ErrNotFound},
&testutil.StagingFake{ValueIterator: testutil.NewValueIteratorFake([]graveler.ValueRecord{})},
&testutil.RefsFake{Branch: &graveler.Branch{}, Commit: &graveler.Commit{}},
&testutil.RefsFake{Branch: &graveler.Branch{CommitID: "c1"}, Commit: &graveler.Commit{MetaRangeID: "mri1"}},
),
amount: 10,
expectedDiff: testutil.NewDiffIter([]graveler.Diff{}),
Expand All @@ -168,7 +168,7 @@ func TestGraveler_DiffUncommitted(t *testing.T) {
name: "added one",
r: graveler.NewGraveler(&testutil.CommittedFake{ValueIterator: testutil.NewValueIteratorFake([]graveler.ValueRecord{}), Err: graveler.ErrNotFound},
&testutil.StagingFake{ValueIterator: testutil.NewValueIteratorFake([]graveler.ValueRecord{{Key: graveler.Key("foo/one"), Value: &graveler.Value{}}})},
&testutil.RefsFake{Branch: &graveler.Branch{}, Commit: &graveler.Commit{}},
&testutil.RefsFake{Branch: &graveler.Branch{CommitID: "c1"}, Commit: &graveler.Commit{MetaRangeID: "mri1"}},
),
amount: 10,
expectedDiff: testutil.NewDiffIter([]graveler.Diff{{
Expand All @@ -181,7 +181,7 @@ func TestGraveler_DiffUncommitted(t *testing.T) {
name: "changed one",
r: graveler.NewGraveler(&testutil.CommittedFake{ValueIterator: testutil.NewValueIteratorFake([]graveler.ValueRecord{{Key: graveler.Key("foo/one"), Value: &graveler.Value{}}})},
&testutil.StagingFake{ValueIterator: testutil.NewValueIteratorFake([]graveler.ValueRecord{{Key: graveler.Key("foo/one"), Value: &graveler.Value{}}})},
&testutil.RefsFake{Branch: &graveler.Branch{}, Commit: &graveler.Commit{}},
&testutil.RefsFake{Branch: &graveler.Branch{CommitID: "c1"}, Commit: &graveler.Commit{MetaRangeID: "mri1"}},
),
amount: 10,
expectedDiff: testutil.NewDiffIter([]graveler.Diff{{
Expand All @@ -194,7 +194,7 @@ func TestGraveler_DiffUncommitted(t *testing.T) {
name: "removed one",
r: graveler.NewGraveler(&testutil.CommittedFake{ValueIterator: testutil.NewValueIteratorFake([]graveler.ValueRecord{{Key: graveler.Key("foo/one"), Value: &graveler.Value{}}})},
&testutil.StagingFake{ValueIterator: testutil.NewValueIteratorFake([]graveler.ValueRecord{{Key: graveler.Key("foo/one"), Value: nil}})},
&testutil.RefsFake{Branch: &graveler.Branch{}, Commit: &graveler.Commit{}},
&testutil.RefsFake{Branch: &graveler.Branch{CommitID: "c1"}, Commit: &graveler.Commit{MetaRangeID: "mri1"}},
),
amount: 10,
expectedDiff: testutil.NewDiffIter([]graveler.Diff{{
Expand All @@ -206,7 +206,8 @@ func TestGraveler_DiffUncommitted(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
diff, err := tt.r.DiffUncommitted(context.Background(), "repo", "branch")
ctx := context.Background()
diff, err := tt.r.DiffUncommitted(ctx, "repo", "branch")
if err != tt.expectedErr {
t.Fatalf("wrong error, expected:%s got:%s", tt.expectedErr, err)
}
Expand Down
22 changes: 14 additions & 8 deletions graveler/ref/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (m *Manager) GetRepository(ctx context.Context, repositoryID graveler.Repos
return repository, nil
}, db.ReadOnly(), db.WithContext(ctx))
if errors.Is(err, db.ErrNotFound) {
return nil, graveler.ErrNotFound
return nil, graveler.ErrRepositoryNotFound
}
if err != nil {
return nil, err
Expand Down Expand Up @@ -79,6 +79,9 @@ func (m *Manager) DeleteRepository(ctx context.Context, repositoryID graveler.Re
_, err = tx.Exec(`DELETE FROM graveler_repositories WHERE id = $1`, repositoryID)
return nil, err
}, db.WithContext(ctx))
if errors.Is(err, db.ErrNotFound) {
return graveler.ErrRepositoryNotFound
}
return err
}

Expand All @@ -100,7 +103,7 @@ func (m *Manager) GetBranch(ctx context.Context, repositoryID graveler.Repositor
}, nil
}, db.ReadOnly(), db.WithContext(ctx))
if errors.Is(err, db.ErrNotFound) {
return nil, graveler.ErrNotFound
return nil, graveler.ErrBranchNotFound
}
if err != nil {
return nil, err
Expand Down Expand Up @@ -134,6 +137,9 @@ func (m *Manager) DeleteBranch(ctx context.Context, repositoryID graveler.Reposi
}
return nil, nil
}, db.WithContext(ctx))
if errors.Is(err, db.ErrNotFound) {
return graveler.ErrBranchNotFound
}
return err
}

Expand All @@ -152,7 +158,7 @@ func (m *Manager) GetTag(ctx context.Context, repositoryID graveler.RepositoryID
return &commitID, nil
}, db.ReadOnly(), db.WithContext(ctx))
if errors.Is(err, db.ErrNotFound) {
return nil, graveler.ErrNotFound
return nil, graveler.ErrTagNotFound
}
if err != nil {
return nil, err
Expand Down Expand Up @@ -189,6 +195,9 @@ func (m *Manager) DeleteTag(ctx context.Context, repositoryID graveler.Repositor
}
return nil, nil
}, db.WithContext(ctx))
if errors.Is(err, db.ErrNotFound) {
return graveler.ErrTagNotFound
}
return err
}

Expand Down Expand Up @@ -228,7 +237,7 @@ func (m *Manager) GetCommitByPrefix(ctx context.Context, repositoryID graveler.R
return startWith[0], nil
}, db.ReadOnly(), db.WithContext(ctx))
if errors.Is(err, db.ErrNotFound) {
return nil, graveler.ErrNotFound
return nil, graveler.ErrCommitNotFound
}
if err != nil {
return nil, err
Expand All @@ -243,16 +252,13 @@ func (m *Manager) GetCommit(ctx context.Context, repositoryID graveler.Repositor
SELECT committer, message, creation_date, parents, meta_range_id, metadata
FROM graveler_commits WHERE repository_id = $1 AND id = $2`,
repositoryID, commitID)
if errors.Is(err, db.ErrNotFound) {
return nil, graveler.ErrNotFound
}
if err != nil {
return nil, err
}
return rec.toGravelerCommit(), nil
}, db.ReadOnly(), db.WithContext(ctx))
if errors.Is(err, db.ErrNotFound) {
return nil, graveler.ErrNotFound
return nil, graveler.ErrCommitNotFound
}
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit 66a3989

Please sign in to comment.