diff --git a/export/export_handler.go b/export/export_handler.go index 014f5027a30..03f9f6d232a 100644 --- a/export/export_handler.go +++ b/export/export_handler.go @@ -92,7 +92,7 @@ func (h *Handler) generateTasks(startData StartData, config catalog.ExportConfig if diffFromBase { diffs, hasMore, err = getDiffFromBase(context.Background(), startData.Repo, startData.ToCommitRef, after, limit, h.cataloger) } else { - // Todo(guys) change this to work with diff iterator once it is available outside of cataloger + // TODO(guys) change this to work with diff iterator once it is available outside of cataloger diffs, hasMore, err = h.cataloger.Diff(context.Background(), startData.Repo, startData.ToCommitRef, startData.FromCommitRef, catalog.DiffParams{ Limit: limit, After: after, diff --git a/graveler/combined_iterator.go b/graveler/combined_iterator.go new file mode 100644 index 00000000000..f86ff8d407c --- /dev/null +++ b/graveler/combined_iterator.go @@ -0,0 +1,100 @@ +package graveler + +import "bytes" + +// combinedIterator iterates over two listing iterators, +// in case of duplication (in values or in errors) returns value in iterA +type combinedIterator struct { + iterA ListingIterator + iterB ListingIterator + p ListingIterator +} + +func NewCombinedIterator(iterA, iterB ListingIterator) ListingIterator { + return &combinedIterator{ + iterA: iterA, + iterB: iterB, + p: nil, + } +} + +func (c *combinedIterator) Next() bool { + // call next with the relevant iterators + valA := c.iterA.Value() + valB := c.iterB.Value() + + switch { + case c.p == nil: + // first + c.iterA.Next() + c.iterB.Next() + case valA == nil && valB == nil: + // last + return false + case valA == nil: + c.p = c.iterB + return c.iterB.Next() + case valB == nil: + c.p = c.iterA + return c.iterA.Next() + case bytes.Equal(valA.Key, valB.Key): + c.iterA.Next() + c.iterB.Next() + case bytes.Compare(valA.Key, valB.Key) < 0: + c.iterA.Next() + default: + // value of iterA < value of iterB + c.iterB.Next() + } + + if c.iterA.Err() != nil { + c.p = c.iterA + return false + } + if c.iterB.Err() != nil { + c.p = c.iterB + return false + } + // get the current pointer + valA = c.iterA.Value() + valB = c.iterB.Value() + switch { + case valA == nil && valB == nil: + c.p = c.iterA // in order not to be stuck in start state + return false + case valA == nil: + c.p = c.iterB + case valB == nil: + c.p = c.iterA + case bytes.Compare(valA.Key, valB.Key) <= 0: + c.p = c.iterA + default: + c.p = c.iterB + } + return true +} + +func (c *combinedIterator) SeekGE(id Key) { + c.p = nil + c.iterA.SeekGE(id) + c.iterB.SeekGE(id) +} + +func (c *combinedIterator) Value() *Listing { + if c.p == nil { + return nil + } + return c.p.Value() +} + +func (c *combinedIterator) Err() error { + if c.p == nil { + return nil + } + return c.p.Err() +} + +func (c *combinedIterator) Close() { + c.iterA.Close() + c.iterB.Close() +} diff --git a/graveler/graveler.go b/graveler/graveler.go index cb4d2af40eb..1e6b78b1676 100644 --- a/graveler/graveler.go +++ b/graveler/graveler.go @@ -9,6 +9,7 @@ import ( "regexp" "strings" + "github.com/google/uuid" "github.com/treeverse/lakefs/ident" "github.com/treeverse/lakefs/catalog" @@ -202,13 +203,13 @@ type VersionController interface { DeleteRepository(ctx context.Context, repositoryID RepositoryID) error // CreateBranch creates branch on repository pointing to ref - CreateBranch(ctx context.Context, repositoryID RepositoryID, branchID BranchID, ref Ref) (Branch, error) + CreateBranch(ctx context.Context, repositoryID RepositoryID, branchID BranchID, ref Ref) (*Branch, error) // UpdateBranch updates branch on repository pointing to ref - UpdateBranch(ctx context.Context, repositoryID RepositoryID, branchID BranchID, ref Ref) (Branch, error) + UpdateBranch(ctx context.Context, repositoryID RepositoryID, branchID BranchID, ref Ref) (*Branch, error) // GetBranch gets branch information by branch / repository id - GetBranch(ctx context.Context, repositoryID RepositoryID, branchID BranchID) (Branch, error) + GetBranch(ctx context.Context, repositoryID RepositoryID, branchID BranchID) (*Branch, error) // Log returns an iterator starting at commit ID up to repository root Log(ctx context.Context, repositoryID RepositoryID, commitID CommitID) (CommitIterator, error) @@ -421,7 +422,9 @@ var ( ErrInvalidRef = fmt.Errorf("ref: %w", ErrInvalidValue) ErrInvalidCommitID = fmt.Errorf("commit id: %w", ErrInvalidValue) ErrCommitNotFound = fmt.Errorf("commit: %w", ErrNotFound) - ErrCommitIDAmbiguous = fmt.Errorf("commit ID is ambiguous: %w", ErrNotFound) + ErrRefAmbiguous = fmt.Errorf("reference is ambiguous: %w", ErrNotFound) + ErrConflictFound = errors.New("conflict found") + ErrBranchExists = errors.New("branch already exists") ) func NewRepositoryID(id string) (RepositoryID, error) { @@ -488,3 +491,326 @@ func NewCommitID(id string) (CommitID, error) { func (id CommitID) String() string { return string(id) } + +type graveler struct { + CommittedManager CommittedManager + StagingManager StagingManager + RefManager RefManager +} + +func NewGraveler(committedManager CommittedManager, stagingManager StagingManager, refManager RefManager) Graveler { + return &graveler{ + CommittedManager: committedManager, + StagingManager: stagingManager, + RefManager: refManager, + } +} + +func (g *graveler) GetRepository(ctx context.Context, repositoryID RepositoryID) (*Repository, error) { + return g.RefManager.GetRepository(ctx, repositoryID) +} + +func (g *graveler) CreateRepository(ctx context.Context, repositoryID RepositoryID, storageNamespace StorageNamespace, branchID BranchID) (*Repository, error) { + repo := Repository{ + StorageNamespace: storageNamespace, + CreationDate: time.Now(), + DefaultBranchID: branchID, + } + branch := Branch{ + stagingToken: generateStagingToken(repositoryID, branchID), + } + err := g.RefManager.CreateRepository(ctx, repositoryID, repo, branch) + if err != nil { + return nil, err + } + return &repo, nil +} + +func (g *graveler) ListRepositories(ctx context.Context, from RepositoryID) (RepositoryIterator, error) { + return g.RefManager.ListRepositories(ctx, from) +} + +func (g *graveler) DeleteRepository(ctx context.Context, repositoryID RepositoryID) error { + return g.RefManager.DeleteRepository(ctx, repositoryID) +} + +func (g *graveler) GetCommit(ctx context.Context, repositoryID RepositoryID, commitID CommitID) (*Commit, error) { + return g.RefManager.GetCommit(ctx, repositoryID, commitID) +} + +func generateStagingToken(repositoryID RepositoryID, branchID BranchID) StagingToken { + // TODO(Guys): initial implementation, change this + uid := uuid.New().String() + return StagingToken(fmt.Sprintf("%s-%s:%s", repositoryID, branchID, uid)) +} + +func (g *graveler) CreateBranch(ctx context.Context, repositoryID RepositoryID, branchID BranchID, ref Ref) (*Branch, error) { + // check if branch exists + _, err := g.RefManager.GetBranch(ctx, repositoryID, branchID) + if !errors.Is(err, ErrNotFound) { + if err == nil { + err = ErrBranchExists + } + return nil, err + } + + reference, err := g.RefManager.RevParse(ctx, repositoryID, ref) + if err != nil { + return nil, err + } + + newBranch := Branch{ + CommitID: reference.CommitID(), + stagingToken: generateStagingToken(repositoryID, branchID), + } + err = g.RefManager.SetBranch(ctx, repositoryID, branchID, newBranch) + if err != nil { + return nil, err + } + return &newBranch, nil +} + +func (g *graveler) UpdateBranch(ctx context.Context, repositoryID RepositoryID, branchID BranchID, ref Ref) (*Branch, error) { + reference, err := g.RefManager.RevParse(ctx, repositoryID, ref) + if err != nil { + return nil, err + } + + curBranch, err := g.RefManager.GetBranch(ctx, repositoryID, branchID) + if err != nil { + return nil, err + } + // validate no conflict + // TODO(Guys) return error only on conflicts, currently returns error for any changes on staging + iter, err := g.StagingManager.List(ctx, curBranch.stagingToken) + if err != nil { + return nil, err + } + defer iter.Close() + if iter.Next() { + return nil, ErrConflictFound + } + + newBranch := Branch{ + CommitID: reference.CommitID(), + stagingToken: curBranch.stagingToken, + } + err = g.RefManager.SetBranch(ctx, repositoryID, branchID, newBranch) + if err != nil { + return nil, err + } + return &newBranch, nil +} + +func (g *graveler) GetBranch(ctx context.Context, repositoryID RepositoryID, branchID BranchID) (*Branch, error) { + return g.RefManager.GetBranch(ctx, repositoryID, branchID) +} + +func (g *graveler) Dereference(ctx context.Context, repositoryID RepositoryID, ref Ref) (CommitID, error) { + reference, err := g.RefManager.RevParse(ctx, repositoryID, ref) + if err != nil { + return "", err + } + return reference.CommitID(), nil +} + +func (g *graveler) Log(ctx context.Context, repositoryID RepositoryID, commitID CommitID) (CommitIterator, error) { + return g.RefManager.Log(ctx, repositoryID, commitID) +} + +func (g *graveler) ListBranches(ctx context.Context, repositoryID RepositoryID, from BranchID) (BranchIterator, error) { + return g.RefManager.ListBranches(ctx, repositoryID, from) +} + +func (g *graveler) DeleteBranch(ctx context.Context, repositoryID RepositoryID, branchID BranchID) error { + branch, err := g.RefManager.GetBranch(ctx, repositoryID, branchID) + if err != nil { + return err + } + err = g.StagingManager.Drop(ctx, branch.stagingToken) + if err != nil && !errors.Is(err, ErrNotFound) { + return err + } + return g.RefManager.DeleteBranch(ctx, repositoryID, branchID) +} + +func (g *graveler) Get(ctx context.Context, repositoryID RepositoryID, ref Ref, key Key) (*Value, error) { + repo, err := g.RefManager.GetRepository(ctx, repositoryID) + if err != nil { + return nil, err + } + reference, err := g.RefManager.RevParse(ctx, repositoryID, ref) + if err != nil { + return nil, err + } + if reference.Type() == ReferenceTypeBranch { + // try to get from staging, if not found proceed to committed + branch := reference.Branch() + value, err := g.StagingManager.Get(ctx, branch.stagingToken, key) + if !errors.Is(err, ErrNotFound) { + if err != nil { + return nil, err + } + if value == nil { + // tombstone + return nil, ErrNotFound + } + return value, nil + } + } + commitID := reference.CommitID() + commit, err := g.RefManager.GetCommit(ctx, repositoryID, commitID) + if err != nil { + return nil, err + } + return g.CommittedManager.Get(ctx, repo.StorageNamespace, commit.TreeID, key) +} + +func (g *graveler) Set(ctx context.Context, repositoryID RepositoryID, branchID BranchID, key Key, value Value) error { + branch, err := g.GetBranch(ctx, repositoryID, branchID) + if err != nil { + return err + } + return g.StagingManager.Set(ctx, branch.stagingToken, key, value) +} + +func (g *graveler) Delete(ctx context.Context, repositoryID RepositoryID, branchID BranchID, key Key) error { + branch, err := g.GetBranch(ctx, repositoryID, branchID) + if err != nil { + return err + } + return g.StagingManager.Delete(ctx, branch.stagingToken, key) +} + +func (g *graveler) List(ctx context.Context, repositoryID RepositoryID, ref Ref, prefix, from, delimiter Key) (ListingIterator, error) { + repo, err := g.RefManager.GetRepository(ctx, repositoryID) + if err != nil { + return nil, err + } + reference, err := g.RefManager.RevParse(ctx, repositoryID, ref) + if err != nil { + return nil, err + } + commit, err := g.RefManager.GetCommit(ctx, repositoryID, reference.CommitID()) + if err != nil { + return nil, err + } + + var listing ListingIterator + committedValues, err := g.CommittedManager.List(ctx, repo.StorageNamespace, commit.TreeID, from) + if err != nil { + return nil, err + } + if reference.Type() == ReferenceTypeBranch { + stagingList, err := g.StagingManager.List(ctx, reference.Branch().stagingToken) + if err != nil { + return nil, err + } + listing = NewCombinedIterator(NewListingIterator(NewPrefixIterator(stagingList, prefix), delimiter, prefix), NewListingIterator(NewPrefixIterator(committedValues, prefix), delimiter, prefix)) + } else { + listing = NewListingIterator(NewPrefixIterator(committedValues, prefix), delimiter, prefix) + } + return listing, nil +} + +func (g *graveler) Commit(ctx context.Context, repositoryID RepositoryID, branchID BranchID, committer string, message string, metadata Metadata) (CommitID, error) { + panic("implement me") +} + +func (g *graveler) Reset(ctx context.Context, repositoryID RepositoryID, branchID BranchID) error { + panic("implement me") // waiting for staging reset +} + +func (g *graveler) Revert(ctx context.Context, repositoryID RepositoryID, branchID BranchID, ref Ref) (CommitID, error) { + panic("implement me") +} + +func (g *graveler) Merge(ctx context.Context, repositoryID RepositoryID, from Ref, to BranchID) (CommitID, error) { + repo, err := g.RefManager.GetRepository(ctx, repositoryID) + if err != nil { + return "", err + } + + fromCommit, err := g.getCommitRecordFromRef(ctx, repositoryID, from) + if err != nil { + return "", err + } + toCommit, err := g.getCommitRecordFromRef(ctx, repositoryID, Ref(to)) + if err != nil { + return "", err + } + baseCommit, err := g.RefManager.FindMergeBase(ctx, repositoryID, fromCommit.CommitID, toCommit.CommitID) + if err != nil { + return "", err + } + + treeID, err := g.CommittedManager.Merge(ctx, repo.StorageNamespace, fromCommit.TreeID, toCommit.TreeID, baseCommit.TreeID) + if err != nil { + return "", err + } + commit := Commit{ + Committer: "unknown", // TODO(Guys): pass committer or enter default value + Message: "merge message", // TODO(Guys): get merge message + TreeID: treeID, + CreationDate: time.Time{}, + Parents: []CommitID{fromCommit.CommitID, toCommit.CommitID}, + Metadata: nil, // TODO(Guys): pass metadata + } + return g.RefManager.AddCommit(ctx, repositoryID, commit) +} + +func (g *graveler) DiffUncommitted(ctx context.Context, repositoryID RepositoryID, branchID BranchID, from Key) (DiffIterator, error) { + repo, err := g.RefManager.GetRepository(ctx, repositoryID) + if err != nil { + return nil, err + } + branch, err := g.RefManager.GetBranch(ctx, repositoryID, branchID) + if err != nil { + return nil, err + } + commit, err := g.RefManager.GetCommit(ctx, repositoryID, branch.CommitID) + if err != nil { + return nil, err + } + valueIterator, err := g.StagingManager.List(ctx, branch.stagingToken) + if err != nil { + return nil, err + } + return NewUncommittedDiffIterator(ctx, g.CommittedManager, valueIterator, repo.StorageNamespace, commit.TreeID), nil +} + +func (g *graveler) getCommitRecordFromRef(ctx context.Context, repositoryID RepositoryID, ref Ref) (*CommitRecord, error) { + reference, err := g.RefManager.RevParse(ctx, repositoryID, ref) + if err != nil { + return nil, err + } + commit, err := g.RefManager.GetCommit(ctx, repositoryID, reference.CommitID()) + if err != nil { + return nil, err + } + return &CommitRecord{ + CommitID: reference.CommitID(), + Commit: commit, + }, nil +} + +func (g *graveler) Diff(ctx context.Context, repositoryID RepositoryID, left, right Ref, from Key) (DiffIterator, error) { + repo, err := g.RefManager.GetRepository(ctx, repositoryID) + if err != nil { + return nil, err + } + leftCommit, err := g.getCommitRecordFromRef(ctx, repositoryID, left) + if err != nil { + return nil, err + } + rightCommit, err := g.getCommitRecordFromRef(ctx, repositoryID, right) + if err != nil { + return nil, err + } + baseCommit, err := g.RefManager.FindMergeBase(ctx, repositoryID, leftCommit.CommitID, rightCommit.CommitID) + if err != nil { + return nil, err + } + + return g.CommittedManager.Diff(ctx, repo.StorageNamespace, leftCommit.TreeID, rightCommit.TreeID, baseCommit.TreeID, from) +} diff --git a/graveler/graveler_mock_test.go b/graveler/graveler_mock_test.go new file mode 100644 index 00000000000..ec39dae886e --- /dev/null +++ b/graveler/graveler_mock_test.go @@ -0,0 +1,338 @@ +package graveler_test + +import ( + "bytes" + "context" + "testing" + + "github.com/go-test/deep" + "github.com/treeverse/lakefs/graveler" +) + +const defaultBranchID = graveler.BranchID("master") + +type committedMock struct { + Value *graveler.Value + ValueIterator graveler.ValueIterator + diffIterator graveler.DiffIterator + err error + treeID graveler.TreeID +} + +func (c *committedMock) Get(_ context.Context, _ graveler.StorageNamespace, _ graveler.TreeID, _ graveler.Key) (*graveler.Value, error) { + if c.err != nil { + return nil, c.err + } + return c.Value, nil +} + +func (c *committedMock) List(_ context.Context, _ graveler.StorageNamespace, _ graveler.TreeID, _ graveler.Key) (graveler.ValueIterator, error) { + if c.err != nil { + return nil, c.err + } + return c.ValueIterator, nil +} + +func (c *committedMock) Diff(_ context.Context, _ graveler.StorageNamespace, _, _, _ graveler.TreeID, _ graveler.Key) (graveler.DiffIterator, error) { + if c.err != nil { + return nil, c.err + } + return c.diffIterator, nil +} + +func (c *committedMock) Merge(_ context.Context, _ graveler.StorageNamespace, _, _, _ graveler.TreeID) (graveler.TreeID, error) { + if c.err != nil { + return "", c.err + } + return c.treeID, nil +} + +func (c *committedMock) Apply(_ context.Context, _ graveler.StorageNamespace, _ graveler.TreeID, _ graveler.ValueIterator) (graveler.TreeID, error) { + if c.err != nil { + return "", c.err + } + return c.treeID, nil +} + +type stagingMock struct { + err error + Value *graveler.Value + ValueIterator graveler.ValueIterator + stagingToken graveler.StagingToken +} + +func (s *stagingMock) Drop(_ context.Context, _ graveler.StagingToken) error { + if s.err != nil { + return s.err + } + return nil +} + +func (s *stagingMock) Get(_ context.Context, _ graveler.StagingToken, _ graveler.Key) (*graveler.Value, error) { + if s.err != nil { + return nil, s.err + } + return s.Value, nil +} + +func (s *stagingMock) Set(_ context.Context, _ graveler.StagingToken, _ graveler.Key, _ graveler.Value) error { + if s.err != nil { + return s.err + } + return nil +} + +func (s *stagingMock) Delete(_ context.Context, _ graveler.StagingToken, _ graveler.Key) error { + return nil +} + +func (s *stagingMock) List(_ context.Context, _ graveler.StagingToken) (graveler.ValueIterator, error) { + if s.err != nil { + return nil, s.err + } + return s.ValueIterator, nil +} + +func (s *stagingMock) Snapshot(_ context.Context, _ graveler.StagingToken) (graveler.StagingToken, error) { + if s.err != nil { + return "", s.err + } + return s.stagingToken, nil +} + +func (s *stagingMock) ListSnapshot(_ context.Context, _ graveler.StagingToken, _ graveler.Key) (graveler.ValueIterator, error) { + if s.err != nil { + return nil, s.err + } + return s.ValueIterator, nil +} + +type mockRefs struct { + listRepositoriesRes graveler.RepositoryIterator + listBranchesRes graveler.BranchIterator + commitIter graveler.CommitIterator + refType graveler.ReferenceType + branch *graveler.Branch + branchErr error +} + +func (m *mockRefs) RevParse(_ context.Context, _ graveler.RepositoryID, _ graveler.Ref) (graveler.Reference, error) { + var branch graveler.BranchID + if m.refType == graveler.ReferenceTypeBranch { + branch = defaultBranchID + } + return newMockReference(m.refType, branch, ""), nil +} + +func (m *mockRefs) GetRepository(_ context.Context, _ graveler.RepositoryID) (*graveler.Repository, error) { + return &graveler.Repository{}, nil +} + +func (m *mockRefs) CreateRepository(_ context.Context, _ graveler.RepositoryID, _ graveler.Repository, _ graveler.Branch) error { + return nil +} + +func (m *mockRefs) ListRepositories(_ context.Context, _ graveler.RepositoryID) (graveler.RepositoryIterator, error) { + return m.listRepositoriesRes, nil +} + +func (m *mockRefs) DeleteRepository(_ context.Context, _ graveler.RepositoryID) error { + return nil +} + +func (m *mockRefs) GetBranch(_ context.Context, _ graveler.RepositoryID, _ graveler.BranchID) (*graveler.Branch, error) { + return m.branch, m.branchErr +} + +func (m *mockRefs) SetBranch(_ context.Context, _ graveler.RepositoryID, _ graveler.BranchID, _ graveler.Branch) error { + return nil +} + +func (m *mockRefs) DeleteBranch(_ context.Context, _ graveler.RepositoryID, _ graveler.BranchID) error { + return nil +} + +func (m *mockRefs) ListBranches(_ context.Context, _ graveler.RepositoryID, _ graveler.BranchID) (graveler.BranchIterator, error) { + return m.listBranchesRes, nil +} + +func (m *mockRefs) GetCommit(_ context.Context, _ graveler.RepositoryID, _ graveler.CommitID) (*graveler.Commit, error) { + return &graveler.Commit{}, nil +} + +func (m *mockRefs) AddCommit(_ context.Context, _ graveler.RepositoryID, _ graveler.Commit) (graveler.CommitID, error) { + return "", nil +} + +func (m *mockRefs) FindMergeBase(_ context.Context, _ graveler.RepositoryID, _ ...graveler.CommitID) (*graveler.Commit, error) { + return &graveler.Commit{}, nil +} + +func (m *mockRefs) Log(_ context.Context, _ graveler.RepositoryID, _ graveler.CommitID) (graveler.CommitIterator, error) { + return m.commitIter, nil +} + +type ListingIter struct { + current int + listings []graveler.Listing + err error +} + +func newListingIter(listings []graveler.Listing) *ListingIter { + return &ListingIter{listings: listings, current: -1} +} + +func (r *ListingIter) Next() bool { + r.current++ + return r.current < len(r.listings) +} + +func (r *ListingIter) SeekGE(id graveler.Key) { + for i, listing := range r.listings { + if bytes.Compare(id, listing.Key) >= 0 { + r.current = i - 1 + } + } + r.current = len(r.listings) +} + +func (r *ListingIter) Value() *graveler.Listing { + if r.current < 0 || r.current >= len(r.listings) { + return nil + } + return &r.listings[r.current] +} + +func (r *ListingIter) Err() error { + return r.err +} + +func (r *ListingIter) Close() { + return +} + +type diffIter struct { + current int + records []graveler.Diff + err error +} + +func newDiffIter(records []graveler.Diff) *diffIter { + return &diffIter{records: records, current: -1} +} +func (r *diffIter) Next() bool { + r.current++ + return r.current < len(r.records) +} + +func (r *diffIter) SeekGE(id graveler.Key) { + for i, record := range r.records { + if bytes.Compare(id, record.Key) >= 0 { + r.current = i - 1 + } + } + r.current = len(r.records) +} + +func (r *diffIter) Value() *graveler.Diff { + if r.current < 0 || r.current >= len(r.records) { + return nil + } + return &r.records[r.current] +} + +func (r *diffIter) Err() error { + return r.err +} + +func (r *diffIter) Close() { + return +} + +type mockValueIterator struct { + current int + records []graveler.ValueRecord + err error +} + +func newMockValueIterator(records []graveler.ValueRecord) graveler.ValueIterator { + return &mockValueIterator{records: records, current: -1} +} + +func (r *mockValueIterator) Next() bool { + r.current++ + return r.current < len(r.records) +} + +func (r *mockValueIterator) SeekGE(id graveler.Key) { + for i, record := range r.records { + if bytes.Compare(record.Key, id) >= 0 { + r.current = i - 1 + return + } + } + r.current = len(r.records) +} + +func (r *mockValueIterator) Value() *graveler.ValueRecord { + if r.current < 0 || r.current >= len(r.records) { + return nil + } + return &r.records[r.current] +} + +func (r *mockValueIterator) Err() error { + return r.err +} + +func (r *mockValueIterator) Close() { + return +} + +type mockReference struct { + refType graveler.ReferenceType + branch graveler.Branch + commitId graveler.CommitID +} + +// newMockReference returns a mockReference +// if branch parameter is empty branch record will be nil +func newMockReference(refType graveler.ReferenceType, branchID graveler.BranchID, commitId graveler.CommitID) *mockReference { + var branch graveler.Branch + if branchID != "" { + branch = graveler.Branch{CommitID: commitId} + + } + return &mockReference{ + refType: refType, + branch: branch, + commitId: commitId, + } +} + +func (m *mockReference) Type() graveler.ReferenceType { + return m.refType +} + +func (m *mockReference) Branch() graveler.Branch { + return m.branch +} + +func (m *mockReference) CommitID() graveler.CommitID { + return m.commitId +} + +func compareListingIterators(t *testing.T, got, expected graveler.ListingIterator) { + t.Helper() + for got.Next() { + if !expected.Next() { + t.Fatalf("got next returned true where expected next returned false") + } + if diff := deep.Equal(got.Value(), expected.Value()); diff != nil { + t.Errorf("unexpected diff %s", diff) + } + } + if expected.Next() { + t.Fatalf("expected next returned true where got next returned false") + } +} diff --git a/graveler/graveler_test.go b/graveler/graveler_test.go new file mode 100644 index 00000000000..ca378d11be2 --- /dev/null +++ b/graveler/graveler_test.go @@ -0,0 +1,449 @@ +package graveler_test + +import ( + "context" + "errors" + "testing" + + "github.com/go-test/deep" + "github.com/treeverse/lakefs/graveler" +) + +func TestGraveler_PrefixIterator(t *testing.T) { + tests := []struct { + name string + valueIter graveler.ValueIterator + prefix []byte + seekTo []byte + expectedPrefixIter graveler.ValueIterator + }{ + { + name: "no prefix", + valueIter: newMockValueIterator([]graveler.ValueRecord{{Key: []byte("foo")}}), + expectedPrefixIter: newMockValueIterator([]graveler.ValueRecord{{Key: []byte("foo")}}), + }, + { + name: "no files", + valueIter: newMockValueIterator([]graveler.ValueRecord{{Key: []byte("other/path/foo")}}), + prefix: []byte("path/"), + expectedPrefixIter: newMockValueIterator([]graveler.ValueRecord{}), + }, + { + name: "one file", + valueIter: newMockValueIterator([]graveler.ValueRecord{{Key: []byte("path/foo")}}), + prefix: []byte("path/"), + expectedPrefixIter: newMockValueIterator([]graveler.ValueRecord{{Key: []byte("path/foo"), Value: nil}}), + }, + { + name: "one file in prefix", + prefix: []byte("path/"), + valueIter: newMockValueIterator([]graveler.ValueRecord{{Key: []byte("before/foo")}, {Key: []byte("path/foo")}, {Key: []byte("last/foo")}}), + expectedPrefixIter: newMockValueIterator([]graveler.ValueRecord{{Key: []byte("path/foo"), Value: nil}}), + }, + { + name: "seek before", + prefix: []byte("path/"), + valueIter: newMockValueIterator([]graveler.ValueRecord{{Key: []byte("before/foo")}, {Key: []byte("path/foo")}, {Key: []byte("last/foo")}}), + seekTo: []byte("before/"), + expectedPrefixIter: newMockValueIterator([]graveler.ValueRecord{{Key: []byte("path/foo"), Value: nil}}), + }, + { + name: "seek after", + prefix: []byte("path/"), + valueIter: newMockValueIterator([]graveler.ValueRecord{{Key: []byte("before/foo")}, {Key: []byte("path/foo")}, {Key: []byte("z_after/foo")}}), + seekTo: []byte("z_after/"), + expectedPrefixIter: newMockValueIterator([]graveler.ValueRecord{}), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + prefixIter := graveler.NewPrefixIterator(tt.valueIter, tt.prefix) + defer prefixIter.Close() + prefixIter.SeekGE(tt.seekTo) + // compare iterators + for prefixIter.Next() { + if !tt.expectedPrefixIter.Next() { + t.Fatalf("listing next returned true where expected listing next returned false") + } + if diff := deep.Equal(prefixIter.Value(), tt.expectedPrefixIter.Value()); diff != nil { + t.Errorf("unexpected diff %s", diff) + } + } + if tt.expectedPrefixIter.Next() { + t.Fatalf("expected listing next returned true where listing next returned false") + } + + }) + } +} + +func TestGraveler_ListingIterator(t *testing.T) { + tests := []struct { + name string + valueIter graveler.ValueIterator + delimiter []byte + prefix []byte + expectedListingIter graveler.ListingIterator + }{ + { + name: "no file", + valueIter: newMockValueIterator([]graveler.ValueRecord{}), + delimiter: []byte("/"), + prefix: nil, + expectedListingIter: newListingIter([]graveler.Listing{}), + }, + { + name: "one file no delimiter", + valueIter: newMockValueIterator([]graveler.ValueRecord{{Key: graveler.Key("foo")}}), + delimiter: nil, + prefix: nil, + expectedListingIter: newListingIter([]graveler.Listing{{CommonPrefix: false, Key: graveler.Key("foo")}}), + }, + { + name: "one file", + valueIter: newMockValueIterator([]graveler.ValueRecord{{Key: graveler.Key("foo")}}), + delimiter: []byte("/"), + prefix: nil, + expectedListingIter: newListingIter([]graveler.Listing{{CommonPrefix: false, Key: graveler.Key("foo")}}), + }, + { + name: "one common prefix", + valueIter: newMockValueIterator([]graveler.ValueRecord{{Key: graveler.Key("foo/bar")}, {Key: graveler.Key("foo/bar2")}}), + delimiter: []byte("/"), + prefix: nil, + expectedListingIter: newListingIter([]graveler.Listing{{CommonPrefix: true, Key: graveler.Key("foo/")}}), + }, + { + name: "one common prefix one file", + valueIter: newMockValueIterator([]graveler.ValueRecord{{Key: graveler.Key("foo/bar")}, {Key: graveler.Key("foo/bar2")}, {Key: graveler.Key("foo/bar3")}, {Key: graveler.Key("foo/bar4")}, {Key: graveler.Key("fooFighter")}}), + delimiter: []byte("/"), + prefix: nil, + expectedListingIter: newListingIter([]graveler.Listing{{CommonPrefix: true, Key: graveler.Key("foo/")}, {CommonPrefix: false, Key: graveler.Key("fooFighter")}}), + }, + { + name: "one file with prefix", + valueIter: newMockValueIterator([]graveler.ValueRecord{{Key: graveler.Key("path/to/foo")}}), + delimiter: []byte("/"), + prefix: []byte("path/to/"), + expectedListingIter: newListingIter([]graveler.Listing{{CommonPrefix: false, Key: graveler.Key("path/to/foo")}}), + }, + { + name: "one common prefix with prefix", + valueIter: newMockValueIterator([]graveler.ValueRecord{{Key: graveler.Key("path/to/foo/bar")}, {Key: graveler.Key("path/to/foo/bar2")}}), + delimiter: []byte("/"), + prefix: []byte("path/to/"), + expectedListingIter: newListingIter([]graveler.Listing{{CommonPrefix: true, Key: graveler.Key("path/to/foo/")}}), + }, + { + name: "one common prefix one file with prefix", + valueIter: newMockValueIterator([]graveler.ValueRecord{{Key: graveler.Key("path/to/foo/bar")}, {Key: graveler.Key("path/to/foo/bar2")}, {Key: graveler.Key("path/to/foo/bar3")}, {Key: graveler.Key("path/to/foo/bar4")}, {Key: graveler.Key("path/to/fooFighter")}}), + delimiter: []byte("/"), + prefix: []byte("path/to/"), + expectedListingIter: newListingIter([]graveler.Listing{{CommonPrefix: true, Key: graveler.Key("path/to/foo/")}, {CommonPrefix: false, Key: graveler.Key("path/to/fooFighter")}}), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + listingIter := graveler.NewListingIterator(tt.valueIter, tt.delimiter, tt.prefix) + defer listingIter.Close() + compareListingIterators(t, listingIter, tt.expectedListingIter) + }) + } +} + +func TestGraveler_List(t *testing.T) { + tests := []struct { + name string + r graveler.Graveler + amount int + delimiter graveler.Key + from graveler.Key + prefix graveler.Key + expectedErr error + expectedHasMore bool + expectedListing graveler.ListingIterator + }{ + { + name: "one committed one staged no paths", + r: graveler.NewGraveler(&committedMock{ValueIterator: newMockValueIterator([]graveler.ValueRecord{{Key: graveler.Key("foo"), Value: &graveler.Value{}}})}, + &stagingMock{ValueIterator: newMockValueIterator([]graveler.ValueRecord{{Key: graveler.Key("bar"), Value: &graveler.Value{}}})}, + &mockRefs{refType: graveler.ReferenceTypeBranch}, + ), + delimiter: graveler.Key("/"), + prefix: graveler.Key(""), + amount: 10, + expectedListing: newListingIter([]graveler.Listing{{Key: graveler.Key("bar"), Value: &graveler.Value{}}, {Key: graveler.Key("foo"), Value: &graveler.Value{}}}), + }, + { + name: "same path different file", + r: graveler.NewGraveler(&committedMock{ValueIterator: newMockValueIterator([]graveler.ValueRecord{{Key: graveler.Key("foo"), Value: &graveler.Value{Identity: []byte("original")}}})}, + &stagingMock{ValueIterator: newMockValueIterator([]graveler.ValueRecord{{Key: graveler.Key("foo"), Value: &graveler.Value{Identity: []byte("other")}}})}, + &mockRefs{refType: graveler.ReferenceTypeBranch}, + ), + delimiter: graveler.Key("/"), + prefix: graveler.Key(""), + amount: 10, + expectedListing: newListingIter([]graveler.Listing{{Key: graveler.Key("foo"), Value: &graveler.Value{Identity: []byte("other")}}}), + }, + { + name: "one committed one staged no paths - with prefix", + r: graveler.NewGraveler(&committedMock{ValueIterator: newMockValueIterator([]graveler.ValueRecord{{Key: graveler.Key("prefix/foo"), Value: &graveler.Value{}}})}, + &stagingMock{ValueIterator: newMockValueIterator([]graveler.ValueRecord{{Key: graveler.Key("prefix/bar"), Value: &graveler.Value{}}})}, + &mockRefs{refType: graveler.ReferenceTypeBranch}, + ), + delimiter: graveler.Key("/"), + prefix: graveler.Key("prefix/"), + amount: 10, + expectedListing: newListingIter([]graveler.Listing{{Key: graveler.Key("prefix/bar"), Value: &graveler.Value{}}, {Key: graveler.Key("prefix/foo"), Value: &graveler.Value{}}}), + }, + { + name: "objects and paths in both committed and staging", + r: graveler.NewGraveler(&committedMock{ValueIterator: newMockValueIterator([]graveler.ValueRecord{{Key: graveler.Key("prefix/pathA/foo"), Value: &graveler.Value{}}, {Key: graveler.Key("prefix/pathA/foo2"), Value: &graveler.Value{}}, {Key: graveler.Key("prefix/pathB/foo"), Value: &graveler.Value{}}})}, + &stagingMock{ValueIterator: newMockValueIterator([]graveler.ValueRecord{{Key: graveler.Key("prefix/file"), Value: &graveler.Value{}}, {Key: graveler.Key("prefix/pathA/bar"), Value: &graveler.Value{}}, {Key: graveler.Key("prefix/pathB/bar"), Value: &graveler.Value{}}})}, + &mockRefs{refType: graveler.ReferenceTypeBranch}, + ), + delimiter: graveler.Key("/"), + prefix: graveler.Key("prefix/"), + amount: 10, + expectedListing: newListingIter([]graveler.Listing{ + { + CommonPrefix: false, + Key: graveler.Key("prefix/file"), + Value: &graveler.Value{}, + }, { + CommonPrefix: true, + Key: graveler.Key("prefix/pathA/"), + Value: nil, + }, { + CommonPrefix: true, + Key: graveler.Key("prefix/pathB/"), + Value: nil, + }}), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + listing, err := tt.r.List(context.Background(), "", "", tt.prefix, tt.from, tt.delimiter) + if err != tt.expectedErr { + t.Fatalf("wrong error, expected:%s got:%s", tt.expectedErr, err) + } + if err != nil { + return // err == tt.expectedErr + } + defer listing.Close() + // compare iterators + compareListingIterators(t, listing, tt.expectedListing) + }) + } +} + +func TestGraveler_Get(t *testing.T) { + var ErrTest = errors.New("some kind of err") + tests := []struct { + name string + r graveler.Graveler + expectedValueResult graveler.Value + expectedErr error + }{ + { + name: "commit - exists", + r: graveler.NewGraveler(&committedMock{Value: &graveler.Value{Identity: []byte("committed")}}, nil, + &mockRefs{refType: graveler.ReferenceTypeCommit}, + ), + expectedValueResult: graveler.Value{Identity: []byte("committed")}, + }, + { + name: "commit - not found", + r: graveler.NewGraveler(&committedMock{err: graveler.ErrNotFound}, nil, + &mockRefs{refType: graveler.ReferenceTypeCommit}, + ), expectedErr: graveler.ErrNotFound, + }, + { + name: "commit - error", + r: graveler.NewGraveler(&committedMock{err: ErrTest}, nil, + &mockRefs{refType: graveler.ReferenceTypeCommit}, + ), expectedErr: ErrTest, + }, + { + name: "branch - only staged", + r: graveler.NewGraveler(&committedMock{err: graveler.ErrNotFound}, &stagingMock{Value: &graveler.Value{Identity: []byte("staged")}}, + &mockRefs{refType: graveler.ReferenceTypeBranch}, + ), + expectedValueResult: graveler.Value{Identity: []byte("staged")}, + }, + { + name: "branch - committed and staged", + r: graveler.NewGraveler(&committedMock{Value: &graveler.Value{Identity: []byte("committed")}}, &stagingMock{Value: &graveler.Value{Identity: []byte("staged")}}, + + &mockRefs{refType: graveler.ReferenceTypeBranch}, + ), + expectedValueResult: graveler.Value{Identity: []byte("staged")}, + }, + { + name: "branch - only committed", + r: graveler.NewGraveler(&committedMock{Value: &graveler.Value{Identity: []byte("committed")}}, &stagingMock{err: graveler.ErrNotFound}, + + &mockRefs{refType: graveler.ReferenceTypeBranch}, + ), + expectedValueResult: graveler.Value{Identity: []byte("committed")}, + }, + { + name: "branch - tombstone", + r: graveler.NewGraveler(&committedMock{Value: &graveler.Value{Identity: []byte("committed")}}, &stagingMock{Value: nil}, + + &mockRefs{refType: graveler.ReferenceTypeBranch}, + ), + expectedErr: graveler.ErrNotFound, + }, + { + name: "branch - staged return error", + r: graveler.NewGraveler(&committedMock{}, &stagingMock{err: ErrTest}, + &mockRefs{refType: graveler.ReferenceTypeBranch}, + ), + expectedErr: ErrTest, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + Value, err := tt.r.Get(context.Background(), "", "", nil) + if err != tt.expectedErr { + t.Fatalf("wrong error, expected:%s got:%s", tt.expectedErr, err) + } + if err != nil { + return // err == tt.expected error + } + if string(tt.expectedValueResult.Identity) != string(Value.Identity) { + t.Errorf("wrong Value address, expected:%s got:%s", tt.expectedValueResult.Identity, Value.Identity) + } + }) + } +} + +func TestGraveler_DiffUncommitted(t *testing.T) { + tests := []struct { + name string + r graveler.Graveler + amount int + expectedErr error + expectedHasMore bool + expectedDiff graveler.DiffIterator + }{ + { + name: "no changes", + r: graveler.NewGraveler(&committedMock{ValueIterator: newMockValueIterator([]graveler.ValueRecord{{Key: graveler.Key("foo/one"), Value: &graveler.Value{}}}), err: graveler.ErrNotFound}, + &stagingMock{ValueIterator: newMockValueIterator([]graveler.ValueRecord{})}, + &mockRefs{branch: &graveler.Branch{}}, + ), + amount: 10, + expectedDiff: newDiffIter([]graveler.Diff{}), + }, + { + name: "added one", + r: graveler.NewGraveler(&committedMock{ValueIterator: newMockValueIterator([]graveler.ValueRecord{}), err: graveler.ErrNotFound}, + &stagingMock{ValueIterator: newMockValueIterator([]graveler.ValueRecord{{Key: graveler.Key("foo/one"), Value: &graveler.Value{}}})}, + &mockRefs{branch: &graveler.Branch{}}, + ), + amount: 10, + expectedDiff: newDiffIter([]graveler.Diff{{ + Key: graveler.Key("foo/one"), + Type: graveler.DiffTypeAdded, + Value: &graveler.Value{}, + }}), + }, + { + name: "changed one", + r: graveler.NewGraveler(&committedMock{ValueIterator: newMockValueIterator([]graveler.ValueRecord{{Key: graveler.Key("foo/one"), Value: &graveler.Value{}}})}, + &stagingMock{ValueIterator: newMockValueIterator([]graveler.ValueRecord{{Key: graveler.Key("foo/one"), Value: &graveler.Value{}}})}, + &mockRefs{branch: &graveler.Branch{}}, + ), + amount: 10, + expectedDiff: newDiffIter([]graveler.Diff{{ + Key: graveler.Key("foo/one"), + Type: graveler.DiffTypeChanged, + Value: &graveler.Value{}, + }}), + }, + { + name: "removed one", + r: graveler.NewGraveler(&committedMock{ValueIterator: newMockValueIterator([]graveler.ValueRecord{{Key: graveler.Key("foo/one"), Value: &graveler.Value{}}})}, + &stagingMock{ValueIterator: newMockValueIterator([]graveler.ValueRecord{{Key: graveler.Key("foo/one"), Value: nil}})}, + &mockRefs{branch: &graveler.Branch{}}, + ), + amount: 10, + expectedDiff: newDiffIter([]graveler.Diff{{ + Key: graveler.Key("foo/one"), + Type: graveler.DiffTypeRemoved, + }}), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + diff, err := tt.r.DiffUncommitted(context.Background(), "repo", "branch", graveler.Key("from")) + if err != tt.expectedErr { + t.Fatalf("wrong error, expected:%s got:%s", tt.expectedErr, err) + } + if err != nil { + return // err == tt.expectedErr + } + + // compare iterators + for diff.Next() { + if !tt.expectedDiff.Next() { + t.Fatalf("listing next returned true where expected listing next returned false") + } + if diff := deep.Equal(diff.Value(), tt.expectedDiff.Value()); diff != nil { + t.Errorf("unexpected diff %s", diff) + } + } + if tt.expectedDiff.Next() { + t.Fatalf("expected listing next returned true where listing next returned false") + } + }) + } +} + +func TestGraveler_CreateBranch(t *testing.T) { + gravel := graveler.NewGraveler(nil, + nil, + &mockRefs{ + branchErr: graveler.ErrNotFound, + }, + ) + _, err := gravel.CreateBranch(context.Background(), "", "", "") + if err != nil { + t.Fatal("unexpected error on create branch", err) + } + // test create branch when branch exists + gravel = graveler.NewGraveler(nil, + nil, + &mockRefs{ + branch: &graveler.Branch{}, + }, + ) + _, err = gravel.CreateBranch(context.Background(), "", "", "") + if !errors.Is(err, graveler.ErrBranchExists) { + t.Fatal("did not get expected error, expected ErrBranchExists") + } +} + +func TestGraveler_UpdateBranch(t *testing.T) { + gravel := graveler.NewGraveler(nil, + &stagingMock{ValueIterator: newMockValueIterator([]graveler.ValueRecord{{Key: graveler.Key("foo/one"), Value: &graveler.Value{}}})}, + &mockRefs{branch: &graveler.Branch{}}, + ) + _, err := gravel.UpdateBranch(context.Background(), "", "", "") + if !errors.Is(err, graveler.ErrConflictFound) { + t.Fatal("expected update to fail on conflict") + } + gravel = graveler.NewGraveler(nil, + &stagingMock{ValueIterator: newMockValueIterator([]graveler.ValueRecord{})}, + &mockRefs{branch: &graveler.Branch{}}, + ) + _, err = gravel.UpdateBranch(context.Background(), "", "", "") + if err != nil { + t.Fatal("did not expect to get error") + } +} diff --git a/graveler/listing_iterator.go b/graveler/listing_iterator.go new file mode 100644 index 00000000000..d4e8be1e763 --- /dev/null +++ b/graveler/listing_iterator.go @@ -0,0 +1,111 @@ +package graveler + +import ( + "bytes" + "math" +) + +// listingIterator implements a ListingIterator using a ValueIterator +// assumes all values in valueIterator start with prefix +type listingIterator struct { + valueIterator ValueIterator + delimiter Key + prefix Key + current *Listing + nextFunc func() bool +} + +// getFollowingValue returns the following value (i.e will increase the last byte by 1) +// in the following cases will return received value: empty value, the last byte is math.MaxUint8 +func getFollowingValue(value []byte) []byte { + if len(value) == 0 || value[len(value)-1] == math.MaxUint8 { + return value + } + copiedDelimiter := make([]byte, len(value)) + copy(copiedDelimiter, value) + copiedDelimiter[len(copiedDelimiter)-1]++ + return copiedDelimiter +} + +func NewListingIterator(iterator ValueIterator, delimiter, prefix Key) ListingIterator { + l := &listingIterator{ + valueIterator: iterator, + delimiter: delimiter, + prefix: prefix, + } + if len(delimiter) == 0 { + l.nextFunc = l.nextNoDelimiter + } else { + l.nextFunc = l.nextWithDelimiter + } + return l +} + +func (l *listingIterator) nextNoDelimiter() bool { + hasNext := l.valueIterator.Next() + if !hasNext { + l.current = nil + return false + } + val := l.valueIterator.Value() + l.current = &Listing{ + CommonPrefix: false, + Key: val.Key, + Value: val.Value, + } + return true +} + +func (l *listingIterator) nextWithDelimiter() bool { + if l.current != nil && l.current.CommonPrefix { + nextKey := getFollowingValue(l.current.Key) + l.valueIterator.SeekGE(nextKey) + } + hasNext := l.valueIterator.Next() + if hasNext { + l.current = l.getListingFromValue(*l.valueIterator.Value()) + } else { + l.current = nil + } + return hasNext +} + +func (l *listingIterator) Next() bool { + return l.nextFunc() +} + +func (l *listingIterator) getListingFromValue(value ValueRecord) *Listing { + relevantKey := value.Key[len(l.prefix):] + delimiterIndex := bytes.Index(relevantKey, l.delimiter) + if delimiterIndex == -1 { + // return listing for non common prefix with value + return &Listing{ + CommonPrefix: false, + Key: value.Key, + Value: value.Value, + } + } + // return listing for common prefix key + commonPrefixKey := value.Key[:len(l.prefix)+delimiterIndex+len(l.delimiter)] + return &Listing{ + CommonPrefix: true, + Key: commonPrefixKey, + } +} + +func (l *listingIterator) SeekGE(id Key) { + l.current = nil + l.valueIterator.SeekGE(id) +} + +func (l *listingIterator) Value() *Listing { + return l.current +} + +func (l *listingIterator) Err() error { + return l.valueIterator.Err() +} + +func (l *listingIterator) Close() { + l.valueIterator.Close() +} diff --git a/graveler/pgrefs.go b/graveler/pgrefs.go index b15a9883c5f..77bcea22235 100644 --- a/graveler/pgrefs.go +++ b/graveler/pgrefs.go @@ -196,7 +196,7 @@ func (m *PGRefManager) GetCommitByPrefix(ctx context.Context, repositoryID Repos return "", ErrNotFound } if len(startWith) > 1 { - return "", ErrCommitIDAmbiguous // more than 1 commit starts with the ID prefix + return "", ErrRefAmbiguous // more than 1 commit starts with the ID prefix } return startWith[0], nil }, db.ReadOnly(), db.WithContext(ctx)) diff --git a/graveler/prefix_iterator.go b/graveler/prefix_iterator.go new file mode 100644 index 00000000000..77b88cb4882 --- /dev/null +++ b/graveler/prefix_iterator.go @@ -0,0 +1,54 @@ +package graveler + +import "bytes" + +// prefixIterator holds a ValueIterator and iterates only over values the their Key starts with the prefix +type prefixIterator struct { + prefix Key + iterator ValueIterator + ended bool +} + +func NewPrefixIterator(iterator ValueIterator, prefix Key) ValueIterator { + iterator.SeekGE(prefix) + return &prefixIterator{ + prefix: prefix, + iterator: iterator, + } +} + +func (p *prefixIterator) Next() bool { + if p.ended { + return false + } + // prefix iterator ends when there is no more data, or the next value doesn't match the prefix + if !p.iterator.Next() || !bytes.HasPrefix(p.iterator.Value().Key, p.prefix) { + p.ended = true + return false + } + return true +} + +func (p *prefixIterator) SeekGE(id Key) { + from := id + if bytes.Compare(id, p.prefix) <= 0 { + from = p.prefix + } + p.iterator.SeekGE(from) + p.ended = false +} + +func (p *prefixIterator) Value() *ValueRecord { + if p.ended { + return nil + } + return p.iterator.Value() +} + +func (p *prefixIterator) Err() error { + return p.iterator.Err() +} + +func (p *prefixIterator) Close() { + p.iterator.Close() +} diff --git a/graveler/uncommitted_diff_iterator.go b/graveler/uncommitted_diff_iterator.go new file mode 100644 index 00000000000..a0663e815cd --- /dev/null +++ b/graveler/uncommitted_diff_iterator.go @@ -0,0 +1,98 @@ +package graveler + +import ( + "context" + "errors" + + "github.com/treeverse/lakefs/logging" +) + +type uncommittedDiffIterator struct { + committedManager CommittedManager + list ValueIterator + storageNamespace StorageNamespace + treeID TreeID + value *Diff + err error + ctx context.Context +} + +func NewUncommittedDiffIterator(ctx context.Context, manager CommittedManager, list ValueIterator, sn StorageNamespace, treeItreeID TreeID) DiffIterator { + return &uncommittedDiffIterator{ + ctx: ctx, + committedManager: manager, + list: list, + storageNamespace: sn, + treeID: treeItreeID, + } +} + +func (d *uncommittedDiffIterator) valueExistsInCommitted(val ValueRecord) (bool, error) { + _, err := d.committedManager.Get(d.ctx, d.storageNamespace, d.treeID, val.Key) + if errors.Is(err, ErrNotFound) { + return false, nil + } + if err != nil { + return false, err + } + return true, nil +} + +func (d *uncommittedDiffIterator) getDiffType(val ValueRecord) (DiffType, error) { + existsInCommitted, err := d.valueExistsInCommitted(val) + if err != nil { + return 0, err + } + + if val.Value == nil { + // tombstone + if !existsInCommitted { + logging.Default(). + WithFields(logging.Fields{"tree_id": d.treeID, "storage_namespace": d.storageNamespace, "key": val.Key}). + Warn("tombstone for a file that does not exist") + } + return DiffTypeRemoved, nil + } + if existsInCommitted { + return DiffTypeChanged, nil + } + return DiffTypeAdded, nil +} + +func (d *uncommittedDiffIterator) Next() bool { + if !d.list.Next() { + d.value = nil + return false + } + val := d.list.Value() + diffType, err := d.getDiffType(*val) + if err != nil { + d.value = nil + d.err = err + return false + } + d.value = &Diff{ + Type: diffType, + Key: val.Key, + Value: val.Value, + } + return true +} + +func (d *uncommittedDiffIterator) SeekGE(id Key) { + d.value = nil + d.err = nil + d.list.SeekGE(id) +} + +func (d *uncommittedDiffIterator) Value() *Diff { + return d.value +} + +func (d *uncommittedDiffIterator) Err() error { + return d.err +} + +func (d *uncommittedDiffIterator) Close() { + d.list.Close() +} diff --git a/parade/action_manager.go b/parade/action_manager.go index 5be495865f9..15e3d7bd81d 100644 --- a/parade/action_manager.go +++ b/parade/action_manager.go @@ -16,7 +16,7 @@ const ( defaultMaxTasks = 500 defaultWaitTime = time.Millisecond * 300 defaultErrWaitTime = time.Second * 3 - defaultMaxDuration = time.Minute * 30 // Todo(guys): change this + defaultMaxDuration = time.Minute * 30 // TODO(guys): change this ) // ManagerProperties defines the configuration properties of an ActionManager