Skip to content

Commit

Permalink
pg refs manager for graveler
Browse files Browse the repository at this point in the history
  • Loading branch information
ozkatz authored Dec 7, 2020
1 parent c5a4955 commit 5ee62e0
Show file tree
Hide file tree
Showing 16 changed files with 2,271 additions and 39 deletions.
3 changes: 3 additions & 0 deletions ddl/000017_graveler_refs.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Down migration for the graveler refs schema: drops the three graveler tables.
-- IF EXISTS turns each statement into a no-op when the table is already absent.
DROP TABLE IF EXISTS graveler_repositories;
DROP TABLE IF EXISTS graveler_commits;
DROP TABLE IF EXISTS graveler_branches;
38 changes: 38 additions & 0 deletions ddl/000017_graveler_refs.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
-- graveler_repositories holds one row per repository, keyed by id.
-- IF NOT EXISTS leaves an already-existing table untouched.
CREATE TABLE IF NOT EXISTS graveler_repositories
(
id text NOT NULL,

storage_namespace text NOT NULL,
creation_date timestamptz NOT NULL,
-- Presumably refers to a graveler_branches.id; no foreign key is declared here.
default_branch text NOT NULL,

PRIMARY KEY (id)
);


-- graveler_branches holds one row per branch; a branch id is unique within
-- its repository (composite primary key).
CREATE TABLE IF NOT EXISTS graveler_branches
(
repository_id text NOT NULL,
id text NOT NULL,

-- Both columns are nullable: neither carries a NOT NULL constraint.
staging_token text,
commit_id text,

PRIMARY KEY (repository_id, id)
);


-- graveler_commits holds one row per commit; a commit id is unique within
-- its repository (composite primary key).
CREATE TABLE IF NOT EXISTS graveler_commits
(
repository_id text NOT NULL,
id text NOT NULL,

committer text NOT NULL,
message text NOT NULL,
creation_date timestamptz NOT NULL,
tree_id text NOT NULL,
-- metadata and parents are nullable; parents is an array of text values
-- (presumably parent commit ids -- confirm against the writer code).
metadata jsonb,
parents text[],

PRIMARY KEY (repository_id, id)
);
85 changes: 59 additions & 26 deletions graveler/graveler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ import (
"net/url"
"regexp"
"strings"

"github.com/treeverse/lakefs/ident"

"github.com/treeverse/lakefs/catalog"

"time"
)

Expand Down Expand Up @@ -55,12 +60,15 @@ type Ref string
// TagID represents a named tag pointing at a commit
type TagID string

// CommitID is a content addressable hash representing a Commit object
type CommitID string
// CommitParents is a list of CommitIDs, used as the parents of a Commit
type CommitParents []CommitID

// BranchID is an identifier for a branch
type BranchID string

// CommitID is a content addressable hash representing a Commit object
type CommitID string

// TreeID represents a snapshot of the tree, referenced by a commit
type TreeID string

Expand All @@ -75,13 +83,13 @@ type Metadata map[string]string

// Repository represents repository metadata
type Repository struct {
StorageNamespace StorageNamespace
CreationDate time.Time
DefaultBranchID BranchID
StorageNamespace StorageNamespace `db:"storage_namespace"`
CreationDate time.Time `db:"creation_date"`
DefaultBranchID BranchID `db:"default_branch"`
}

type RepositoryRecord struct {
RepositoryID RepositoryID
RepositoryID RepositoryID `db:"id"`
*Repository
}

Expand All @@ -97,19 +105,41 @@ type ValueRecord struct {
*Value
}

// Identity returns the identity bytes of the parent list. Every parent
// CommitID is converted to a plain string, preserving order, and the
// resulting slice is marshaled through an ident address writer.
func (ps CommitParents) Identity() []byte {
	// Named "ids" rather than "strings" so the local does not shadow the
	// imported strings package for the rest of this method.
	ids := make([]string, len(ps))
	for i, v := range ps {
		ids[i] = string(v)
	}
	buf := ident.NewAddressWriter()
	buf.MarshalStringSlice(ids)
	return buf.Identity()
}

// Commit represents commit metadata (author, time, tree ID)
type Commit struct {
Committer string
Message string
TreeID TreeID
CreationDate time.Time
Parents []CommitID
Metadata map[string]string
Committer string `db:"committer"`
Message string `db:"message"`
TreeID TreeID `db:"tree_id"`
CreationDate time.Time `db:"creation_date"`
Parents CommitParents `db:"parents"`
Metadata catalog.Metadata `db:"metadata"`
}

// Identity computes the identity bytes of the commit by feeding its fields,
// in a fixed order, into an ident address writer.
//
// The creation date contributes at whole-second resolution (Unix()), so
// sub-second differences between two commits do not change the result.
func (c Commit) Identity() []byte {
	w := ident.NewAddressWriter()

	// String fields, written in this exact order. The leading "commit:v1"
	// literal is presumably a type/version tag for the encoding -- confirm.
	for _, s := range []string{
		"commit:v1",
		c.Committer,
		c.Message,
		string(c.TreeID),
	} {
		w.MarshalString(s)
	}

	w.MarshalInt64(c.CreationDate.Unix())
	w.MarshalStringMap(c.Metadata)
	w.MarshalIdentifiable(c.Parents)

	return w.Identity()
}

// CommitRecord holds a CommitID together with its associated Commit data
type CommitRecord struct {
CommitID CommitID
CommitID CommitID `db:"id"`
*Commit
}

Expand Down Expand Up @@ -184,7 +214,7 @@ type VersionController interface {
Log(ctx context.Context, repositoryID RepositoryID, commitID CommitID) (CommitIterator, error)

// ListBranches lists branches on repositories
ListBranches(ctx context.Context, repositoryID RepositoryID) (BranchIterator, error)
ListBranches(ctx context.Context, repositoryID RepositoryID, from BranchID) (BranchIterator, error)

// DeleteBranch deletes branch from repository
DeleteBranch(ctx context.Context, repositoryID RepositoryID, branchID BranchID) error
Expand All @@ -209,7 +239,7 @@ type VersionController interface {
Merge(ctx context.Context, repositoryID RepositoryID, from Ref, to BranchID) (CommitID, error)

// DiffUncommitted returns iterator to scan the changes made on the branch
DiffUncommitted(ctx context.Context, repositoryID RepositoryID, branchID BranchID) (DiffIterator, error)
DiffUncommitted(ctx context.Context, repositoryID RepositoryID, branchID BranchID, from Key) (DiffIterator, error)

// Diff returns the changes between 'left' and 'right' ref, starting from the 'from' key
Diff(ctx context.Context, repositoryID RepositoryID, left, right Ref, from Key) (DiffIterator, error)
Expand Down Expand Up @@ -295,7 +325,7 @@ type RefManager interface {
CreateRepository(ctx context.Context, repositoryID RepositoryID, repository Repository, branch Branch) error

// ListRepositories lists repositories
ListRepositories(ctx context.Context) (RepositoryIterator, error)
ListRepositories(ctx context.Context, from RepositoryID) (RepositoryIterator, error)

// DeleteRepository deletes the repository
DeleteRepository(ctx context.Context, repositoryID RepositoryID) error
Expand All @@ -313,9 +343,9 @@ type RefManager interface {
DeleteBranch(ctx context.Context, repositoryID RepositoryID, branchID BranchID) error

// ListBranches lists branches
ListBranches(ctx context.Context, repositoryID RepositoryID) (BranchIterator, error)
ListBranches(ctx context.Context, repositoryID RepositoryID, from BranchID) (BranchIterator, error)

// GetCommit returns the Commit metadata object for the given CommitID
// GetCommit returns the Commit metadata object for the given CommitID.
GetCommit(ctx context.Context, repositoryID RepositoryID, commitID CommitID) (*Commit, error)

// AddCommit stores the Commit object, returning its ID
Expand All @@ -337,11 +367,11 @@ type CommittedManager interface {
Get(ctx context.Context, ns StorageNamespace, treeID TreeID, key Key) (*Value, error)

// List takes a given tree and returns a ValueIterator
List(ctx context.Context, ns StorageNamespace, treeID TreeID) (ValueIterator, error)
List(ctx context.Context, ns StorageNamespace, treeID TreeID, from Key) (ValueIterator, error)

// Diff receives two trees and a 3rd merge base tree used to resolve the change type
// it tracks changes from left to right, returning an iterator of Diff entries
Diff(ctx context.Context, ns StorageNamespace, left, right, base TreeID) (DiffIterator, error)
Diff(ctx context.Context, ns StorageNamespace, left, right, base TreeID, from Key) (DiffIterator, error)

// Merge receives two trees and a 3rd merge base tree used to resolve the change type
// it applies the changes from left to right, resulting in a new tree that
Expand Down Expand Up @@ -381,13 +411,16 @@ var (
// Graveler errors
var (
ErrNotFound = errors.New("not found")
ErrNotUnique = errors.New("not unique")
ErrInvalidValue = errors.New("invalid value")
ErrInvalidStorageNamespace = fmt.Errorf("storage namespace %w", ErrInvalidValue)
ErrInvalidRepositoryID = fmt.Errorf("repository id %w", ErrInvalidValue)
ErrInvalidBranchID = fmt.Errorf("branch id %w", ErrInvalidValue)
ErrInvalidRef = fmt.Errorf("ref %w", ErrInvalidValue)
ErrInvalidCommitID = fmt.Errorf("commit id %w", ErrInvalidValue)
ErrCommitNotFound = fmt.Errorf("commit %w", ErrNotFound)
ErrInvalidMergeBase = fmt.Errorf("only 2 commits allowed in FindMergeBase: %w", ErrInvalidValue)
ErrInvalidStorageNamespace = fmt.Errorf("storage namespace: %w", ErrInvalidValue)
ErrInvalidRepositoryID = fmt.Errorf("repository id: %w", ErrInvalidValue)
ErrInvalidBranchID = fmt.Errorf("branch id: %w", ErrInvalidValue)
ErrInvalidRef = fmt.Errorf("ref: %w", ErrInvalidValue)
ErrInvalidCommitID = fmt.Errorf("commit id: %w", ErrInvalidValue)
ErrCommitNotFound = fmt.Errorf("commit: %w", ErrNotFound)
ErrCommitIDAmbiguous = fmt.Errorf("commit ID is ambiguous: %w", ErrNotFound)
)

func NewRepositoryID(id string) (RepositoryID, error) {
Expand Down
19 changes: 17 additions & 2 deletions graveler/main_test.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,36 @@
package graveler
package graveler_test

import (
"flag"
"log"
"os"
"testing"

"github.com/treeverse/lakefs/db"
"github.com/treeverse/lakefs/graveler"
"github.com/treeverse/lakefs/testutil"

"github.com/ory/dockertest/v3"
"github.com/sirupsen/logrus"
"github.com/treeverse/lakefs/testutil"
)

var (
pool *dockertest.Pool
databaseURI string
)

// testRefManager returns a graveler.RefManager for use in tests.
//
// It delegates to testRefManagerWithDB so the database setup lives in a
// single place, and drops the database handle that helper also returns.
func testRefManager(t testing.TB) graveler.RefManager {
	t.Helper()
	// The discarded value is the db.Database handle, not an error.
	refManager, _ := testRefManagerWithDB(t)
	return refManager
}

// testRefManagerWithDB obtains a database through testutil.GetDB (with the
// apply-DDL option enabled, against the package-level databaseURI) and returns
// a PG ref manager built on it together with the database handle itself.
func testRefManagerWithDB(t testing.TB) (graveler.RefManager, db.Database) {
	t.Helper()

	applyDDL := testutil.WithGetDBApplyDDL(true)
	// The second result of GetDB is not needed by this helper.
	database, _ := testutil.GetDB(t, databaseURI, applyDDL)

	return graveler.NewPGRefManager(database), database
}

func TestMain(m *testing.M) {
flag.Parse()
if !testing.Verbose() {
Expand Down
Loading

0 comments on commit 5ee62e0

Please sign in to comment.