From cfae470f390f54209db13d0851df4b8c27e30612 Mon Sep 17 00:00:00 2001 From: "Ariel Shaqed (Scolnicov)" Date: Tue, 17 Nov 2020 17:11:05 +0200 Subject: [PATCH 01/16] Continuous export: try to start a new export after each export Handles a commit or merge occurring concurrently with an export. --- export/export.go | 35 +++++++++++++++++++++++++++++------ export/export_handler.go | 6 +++--- 2 files changed, 32 insertions(+), 9 deletions(-) diff --git a/export/export.go b/export/export.go index c97758de82f..f3b8e170612 100644 --- a/export/export.go +++ b/export/export.go @@ -7,6 +7,7 @@ import ( nanoid "github.com/matoous/go-nanoid" + "github.com/treeverse/lakefs/logging" "github.com/treeverse/lakefs/parade" "github.com/treeverse/lakefs/catalog" @@ -23,8 +24,8 @@ func getExportID(repo, branch, commitRef string) (string, error) { var ErrExportInProgress = errors.New("export currently in progress") // ExportBranchStart inserts a start task on branch, sets branch export state to pending. -// It returns an error if an export is already in progress. -func ExportBranchStart(paradeDB parade.Parade, cataloger catalog.Cataloger, repo, branch string) (string, error) { +// It returns ErrExportInProgress if an export is already in progress. +func ExportBranchStart(parade parade.Parade, cataloger catalog.Cataloger, repo, branch string) (string, error) { commit, err := cataloger.GetCommit(context.Background(), repo, branch) if err != nil { return "", err @@ -50,7 +51,7 @@ func ExportBranchStart(paradeDB parade.Parade, cataloger catalog.Cataloger, repo return oldRef, "", nil, err } - err = paradeDB.InsertTasks(context.Background(), tasks) + err = parade.InsertTasks(context.Background(), tasks) if err != nil { return "", "", nil, err } @@ -62,10 +63,32 @@ func ExportBranchStart(paradeDB parade.Parade, cataloger catalog.Cataloger, repo var ErrConflictingRefs = errors.New("conflicting references") // ExportBranchDone ends the export branch process by changing the status -func ExportBranchDone(cataloger catalog.Cataloger, status catalog.CatalogBranchExportStatus, statusMsg *string, repo, branch, commitRef string) error { - err := cataloger.ExportStateSet(repo, branch, func(oldRef string, state catalog.CatalogBranchExportStatus) (newRef string, newState catalog.CatalogBranchExportStatus, newMessage *string, err error) { +func ExportBranchDone(parade parade.Parade, cataloger catalog.Cataloger, status catalog.CatalogBranchExportStatus, statusMsg *string, repo, branch, commitRef string) error { + if status == catalog.ExportStatusSuccess { + // Start the next export if continuous. + exportConfiguration, err := cataloger.GetExportConfigurationForBranch(repo, branch) + if err != nil { + return fmt.Errorf("check whether export configuration is continuous for repo %s branch %s: %w", repo, branch, err) + } + if exportConfiguration.IsContinuous { + _, err := ExportBranchStart(parade, cataloger, repo, branch) + if err == ErrExportInProgress { + logging.Default().WithFields(logging.Fields{ + "repo": repo, + "branch": branch, + }).Info("export already in progress when restarting continuous export (unlikely)") + err = nil + } + if err != nil { + return fmt.Errorf("restart continuous export repo %s branch %s: %w", repo, branch, err) + } + return nil + } + } + + err := cataloger.ExportStateSet(repo, branch, func(oldRef string, oldStatus catalog.CatalogBranchExportStatus) (newRef string, newStatus catalog.CatalogBranchExportStatus, newMessage *string, err error) { if commitRef != oldRef { - return "", "", nil, fmt.Errorf("ExportBranchDone: currentRef:%s, newRef:%s: %w", oldRef, commitRef, ErrConflictingRefs) + return "", "", nil, fmt.Errorf("ExportBranchDone: currentRef: %s, newRef: %s: %w", oldRef, commitRef, ErrConflictingRefs) } return oldRef, status, statusMsg, nil }) diff --git a/export/export_handler.go b/export/export_handler.go index e0391d80bbf..33868b659f3 100644 --- a/export/export_handler.go +++ b/export/export_handler.go @@ -162,11 +162,11 @@ func getFinishBodyString(repo, branch, commitRef, statusPath string) (string, er CommitRef: commitRef, StatusPath: statusPath, } - finisBody, err := json.Marshal(finishData) + finishBody, err := json.Marshal(finishData) if err != nil { return "", err } - return string(finisBody), nil + return string(finishBody), nil } func (h *Handler) copy(body *string) error { @@ -245,7 +245,7 @@ func (h *Handler) done(body *string, signalledErrors int) error { if err != nil { return err } - return ExportBranchDone(h.cataloger, status, msg, finishData.Repo, finishData.Branch, finishData.CommitRef) + return ExportBranchDone(h.parade, h.cataloger, status, msg, finishData.Repo, finishData.Branch, finishData.CommitRef) } var errUnknownAction = errors.New("unknown action") From a4b3ec04ec3f35b642c0d6158a407bbde9d54b93 Mon Sep 17 00:00:00 2001 From: "Ariel Shaqed (Scolnicov)" Date: Tue, 17 Nov 2020 18:27:16 +0200 Subject: [PATCH 02/16] Continuous export: start a new export after each commit or merge Hook onto commits and merges to start an export. This export will attempt to export to the branch tip. --- catalog/cataloger.go | 11 ++++--- catalog/mvcc/cataloger_commit.go | 2 +- catalog/mvcc/cataloger_commit_test.go | 25 ++++++++------ catalog/mvcc/cataloger_merge.go | 2 +- catalog/mvcc/cataloger_merge_test.go | 21 ++++++++---- export/export.go | 20 +++++++++--- export/export_handler.go | 47 +++++++++++++++++++++++++-- 7 files changed, 99 insertions(+), 29 deletions(-) diff --git a/catalog/cataloger.go b/catalog/cataloger.go index 1a50a808f8e..55b70b83156 100644 --- a/catalog/cataloger.go +++ b/catalog/cataloger.go @@ -145,24 +145,27 @@ type Cataloger interface { // ExportStateCallback returns the new ref, state and message regarding the old ref and state type ExportStateCallback func(oldRef string, state CatalogBranchExportStatus) (newRef string, newState CatalogBranchExportStatus, newMessage *string, err error) +type PostCommitFunc = func(ctx context.Context, tx db.Tx, repo, branch string, commitLog *CommitLog) error +type PostMergeFunc = func(ctx context.Context, tx db.Tx, repo, branch string, mergeResult *MergeResult) error + // CatalogerHooks describes the hooks available for some operations on the catalog. Hooks are // called in a current transaction context; if they return an error the transaction is rolled // back. Because these transactions are current, the hook can see the effect the operation only // on the passed transaction. type CatalogerHooks struct { // PostCommit hooks are called at the end of a commit. - PostCommit []func(ctx context.Context, tx db.Tx, commitLog *CommitLog) error + PostCommit []PostCommitFunc // PostMerge hooks are called at the end of a merge. - PostMerge []func(ctx context.Context, tx db.Tx, mergeResult *MergeResult) error + PostMerge []PostMergeFunc } -func (h *CatalogerHooks) AddPostCommit(f func(context.Context, db.Tx, *CommitLog) error) *CatalogerHooks { +func (h *CatalogerHooks) AddPostCommit(f PostCommitFunc) *CatalogerHooks { h.PostCommit = append(h.PostCommit, f) return h } -func (h *CatalogerHooks) AddPostMerge(f func(context.Context, db.Tx, *MergeResult) error) *CatalogerHooks { +func (h *CatalogerHooks) AddPostMerge(f PostMergeFunc) *CatalogerHooks { h.PostMerge = append(h.PostMerge, f) return h } diff --git a/catalog/mvcc/cataloger_commit.go b/catalog/mvcc/cataloger_commit.go index 7757034c418..25ba145f4a6 100644 --- a/catalog/mvcc/cataloger_commit.go +++ b/catalog/mvcc/cataloger_commit.go @@ -77,7 +77,7 @@ func (c *cataloger) Commit(ctx context.Context, repository, branch string, messa } for _, hook := range c.hooks.PostCommit { - err = hook(ctx, tx, commitLog) + err = hook(ctx, tx, repository, branch, commitLog) if err != nil { // Roll tx back if a hook failed return nil, err diff --git a/catalog/mvcc/cataloger_commit_test.go b/catalog/mvcc/cataloger_commit_test.go index e24b939b6c3..1a2045e6cb3 100644 --- a/catalog/mvcc/cataloger_commit_test.go +++ b/catalog/mvcc/cataloger_commit_test.go @@ -4,13 +4,13 @@ import ( "context" "errors" "fmt" - "reflect" "strconv" "strings" "testing" "time" - "github.com/davecgh/go-spew/spew" + "github.com/go-test/deep" + "github.com/treeverse/lakefs/catalog" "github.com/treeverse/lakefs/db" "github.com/treeverse/lakefs/testutil" @@ -112,8 +112,8 @@ func TestCataloger_Commit(t *testing.T) { got.CreationDate = tt.want.CreationDate } } - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("Commit() got = %s, want = %s", spew.Sdump(got), spew.Sdump(tt.want)) + if diffs := deep.Equal(got, tt.want); diffs != nil { + t.Errorf("unexpected Commit(): %s", diffs) } }) } @@ -290,18 +290,23 @@ func TestCataloger_CommitTombstoneShouldNotChangeHistory(t *testing.T) { } } +type CommitData struct { + Repo, Branch string + Log CommitLog +} + // CommitHookLogger - commit hook that will return an error if set by Err. // When no Err is set it will log commit log into Logs. type CommitHookLogger struct { - Err error - Logs []*catalog.CommitLog + Err error + Commits []CommitData } -func (h *CommitHookLogger) Hook(_ context.Context, _ db.Tx, log *catalog.CommitLog) error { +func (h *CommitHookLogger) Hook(_ context.Context, _ db.Tx, repo, branch string, log *CommitLog) error { if h.Err != nil { return h.Err } - h.Logs = append(h.Logs, log) + h.Commits = append(h.Commits, CommitData{Repo: repo, Branch: branch, Log: *log}) return nil } @@ -349,8 +354,8 @@ func TestCataloger_CommitHooks(t *testing.T) { return } for i := range hooks { - if len(hooks[i].Logs) != 1 || hooks[i].Logs[0] != commitLog { - t.Errorf("hook %d: expected one commit %+v but got logs: %s", i, commitLog, spew.Sprint(hooks[i].Logs)) + if diffs := deep.Equal(hooks[i].Commits, []CommitData{{Repo: repository, Branch: "master", Log: *commitLog}}); diffs != nil { + t.Errorf("hook %d: unexpected commit logs: %s", i, diffs) } } }) diff --git a/catalog/mvcc/cataloger_merge.go b/catalog/mvcc/cataloger_merge.go index 36bdaafb0a9..3b669c04d8a 100644 --- a/catalog/mvcc/cataloger_merge.go +++ b/catalog/mvcc/cataloger_merge.go @@ -90,7 +90,7 @@ func (c *cataloger) Merge(ctx context.Context, repository, leftBranch, rightBran } mergeResult.Reference = MakeReference(rightBranch, nextCommitID) for _, hook := range c.hooks.PostMerge { - err = hook(ctx, tx, mergeResult) + err = hook(ctx, tx, repository, rightBranch, mergeResult) if err != nil { // Roll tx back if a hook failed return nil, err diff --git a/catalog/mvcc/cataloger_merge_test.go b/catalog/mvcc/cataloger_merge_test.go index 855687f4824..11d73b2908b 100644 --- a/catalog/mvcc/cataloger_merge_test.go +++ b/catalog/mvcc/cataloger_merge_test.go @@ -1183,18 +1183,23 @@ func TestCataloger_MergeFromChildAfterMergeFromParent(t *testing.T) { } } +type MergeData struct { + Repo, Branch string + Result MergeResult +} + // MergeHookLogger - merge hook that will return an error if set by Err. // When no Err is set it will log merge log into Logs. type MergeHookLogger struct { - Err error - Logs []*catalog.MergeResult + Err error + Merges []MergeData } -func (h *MergeHookLogger) Hook(_ context.Context, _ db.Tx, log *catalog.MergeResult) error { +func (h *MergeHookLogger) Hook(_ context.Context, _ db.Tx, repo, branch string, result *MergeResult) error { if h.Err != nil { return h.Err } - h.Logs = append(h.Logs, log) + h.Merges = append(h.Merges, MergeData{Repo: repo, Branch: branch, Result: *result}) return nil } @@ -1242,9 +1247,13 @@ func TestCataloger_Merge_Hooks(t *testing.T) { {Path: "/file1"}, }) - expected := []*catalog.MergeResult{res} + expected := []MergeData{{ + Repo: repository, + Branch: "master", + Result: *res, + }} for _, hook := range hooks { - if diffs := deep.Equal(expected, hook.Logs); diffs != nil { + if diffs := deep.Equal(expected, hook.Merges); diffs != nil { t.Error("hook received unexpected merge result: ", diffs) } } diff --git a/export/export.go b/export/export.go index f3b8e170612..8171ef17824 100644 --- a/export/export.go +++ b/export/export.go @@ -2,6 +2,7 @@ package export import ( "context" + "db" "errors" "fmt" @@ -66,13 +67,16 @@ var ErrConflictingRefs = errors.New("conflicting references") func ExportBranchDone(parade parade.Parade, cataloger catalog.Cataloger, status catalog.CatalogBranchExportStatus, statusMsg *string, repo, branch, commitRef string) error { if status == catalog.ExportStatusSuccess { // Start the next export if continuous. - exportConfiguration, err := cataloger.GetExportConfigurationForBranch(repo, branch) + isContinuous, err := hasContinuousExport(cataloger, repo, branch) if err != nil { - return fmt.Errorf("check whether export configuration is continuous for repo %s branch %s: %w", repo, branch, err) + // Consider branch export failed: it was supposed to be continuous but + // might have stopped. So set an error for the admin to fix before + // re-enabling continuous export. + return err } - if exportConfiguration.IsContinuous { + if isContinuous { _, err := ExportBranchStart(parade, cataloger, repo, branch) - if err == ErrExportInProgress { + if errors.Is(err, ErrExportInProgress) { logging.Default().WithFields(logging.Fields{ "repo": repo, "branch": branch, @@ -107,3 +111,11 @@ func ExportBranchRepair(cataloger catalog.Cataloger, repo, branch string) error return oldRef, catalog.ExportStatusRepaired, nil, nil }) } + +func hasContinuousExport(c catalog.Cataloger, repo, branch string) (bool, error) { + exportConfiguration, err := c.GetExportConfigurationForBranch(repo, branch) + if err != nil { + return false, fmt.Errorf("check whether export configuration is continuous for repo %s branch %s: %w", repo, branch, err) + } + return exportConfiguration.IsContinuous, nil +} diff --git a/export/export_handler.go b/export/export_handler.go index 33868b659f3..8e70a8e73f5 100644 --- a/export/export_handler.go +++ b/export/export_handler.go @@ -9,9 +9,8 @@ import ( "regexp" "strings" - "github.com/treeverse/lakefs/catalog" - "github.com/treeverse/lakefs/block" + "github.com/treeverse/lakefs/catalog" "github.com/treeverse/lakefs/logging" "github.com/treeverse/lakefs/parade" ) @@ -25,11 +24,17 @@ type Handler struct { } func NewHandler(adapter block.Adapter, cataloger catalog.Cataloger, parade parade.Parade) *Handler { - return &Handler{ + ret := &Handler{ adapter: adapter, cataloger: cataloger, parade: parade, } + if cataloger != nil { + hooks := cataloger.Hooks() + hooks.AddPostCommit(ret.exportCommitHook) + hooks.AddPostMerge(ret.exportMergeHook) + } + return ret } type TaskBody struct { @@ -291,3 +296,39 @@ func (h *Handler) Actions() []string { func (h *Handler) ActorID() parade.ActorID { return actorName } + +// exportCommitHook is a cataloger PostCommit hook for continuous export. +func (h *Handler) exportCommitHook(ctx context.Context, _ db.Tx, repo, branch string, log *catalog.CommitLog) error { + isContinuous, err := hasContinuousExport(h.cataloger, repo, branch) + if err != nil { + // FAIL this commit: if we were meant to export it and did not then in practice + // there was no commit. + return fmt.Errorf("check continuous export for commit %+v: %w", *log, err) + } + if !isContinuous { + return nil + } + _, err = ExportBranchStart(h.parade, h.cataloger, repo, branch) + if errors.Is(err, ErrExportInProgress) { + err = nil + } + return err +} + +// exportMergeHook is a cataloger PostMerge hook for continuous export. +func (h *Handler) exportMergeHook(ctx context.Context, _ db.Tx, repo, branch string, merge *catalog.MergeResult) error { + isContinuous, err := hasContinuousExport(h.cataloger, repo, branch) + if err != nil { + // FAIL this merge: if we were meant to export it and did not then in practice + // there was no merge. + return fmt.Errorf("check continuous export for merge %+v: %w", *merge, err) + } + if !isContinuous { + return nil + } + _, err = ExportBranchStart(h.parade, h.cataloger, repo, branch) + if errors.Is(err, ErrExportInProgress) { + err = nil + } + return err +} From 35a8ecb9a8194b5d0f85233520fa1065e55560d9 Mon Sep 17 00:00:00 2001 From: "Ariel Shaqed (Scolnicov)" Date: Wed, 18 Nov 2020 17:37:52 +0200 Subject: [PATCH 03/16] Clean up time.Duration type multiplication in Tx retries This is actually in [the docs](https://golang.org/pkg/time/#pkg-constants): To convert an integer number of units to a Duration, multiply: ``` seconds := 10 fmt.Print(time.Duration(seconds)*time.Second) // prints 10s ``` That's due to shortcomings of the Go type system. But this way is safer, it avoids conversion to an int type. --- db/database.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db/database.go b/db/database.go index 39de9a428e1..95b3fe4c09a 100644 --- a/db/database.go +++ b/db/database.go @@ -157,7 +157,7 @@ func (d *PgxDatabase) Transact(fn TxFunc, opts ...TxOpt) (interface{}, error) { var ret interface{} for attempt < SerializationRetryMaxAttempts { if attempt > 0 { - duration := time.Duration(int(SerializationRetryStartInterval) * attempt) + duration := SerializationRetryStartInterval * time.Duration(attempt) dbRetriesCount.Inc() options.logger. WithField("attempt", attempt). From f32712f7e6d92300762091df334546f5e111fc12 Mon Sep 17 00:00:00 2001 From: "Ariel Shaqed (Scolnicov)" Date: Sun, 22 Nov 2020 17:05:22 +0200 Subject: [PATCH 04/16] Test (empty) diff between a ref and itself --- catalog/mvcc/cataloger_diff_test.go | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/catalog/mvcc/cataloger_diff_test.go b/catalog/mvcc/cataloger_diff_test.go index 605faf16c2c..13e74ec44ba 100644 --- a/catalog/mvcc/cataloger_diff_test.go +++ b/catalog/mvcc/cataloger_diff_test.go @@ -11,6 +11,31 @@ import ( "github.com/treeverse/lakefs/testutil" ) +func TestCataloger_DiffEmpty(t *testing.T) { + ctx := context.Background() + c := testCataloger(t) + repository := testCatalogerRepo(t, ctx, c, "repo", "master") + + // create N files and commit + commitChanges := func(n int, msg, branch string) { + for i := 0; i < n; i++ { + testCatalogerCreateEntry(t, ctx, c, repository, branch, "/file"+strconv.Itoa(i), nil, branch) + } + _, err := c.Commit(ctx, repository, branch, msg, "tester", nil) + testutil.MustDo(t, msg, err) + } + commitChanges(10, "Changes on master", "master") + + res, hasMore, err := c.Diff(ctx, repository, "master", "master", DiffParams{Limit: 10}) + testutil.MustDo(t, "Diff", err) + if len(res) != 0 { + t.Errorf("Diff: got %+v but expected nothing", res) + } + if hasMore { + t.Errorf("Diff: got *more* diffs but expected nothing") + } +} + func TestCataloger_Diff(t *testing.T) { ctx := context.Background() c := testCataloger(t) From b3acce764cb60f53ca0f474d5077011bfa67b569 Mon Sep 17 00:00:00 2001 From: "Ariel Shaqed (Scolnicov)" Date: Sun, 22 Nov 2020 17:11:55 +0200 Subject: [PATCH 05/16] Improve logging and clean up flows Most notably, avoid double updates in ExportBranchDone. --- catalog/mvcc/cataloger_export.go | 70 ++++++++++++++++++++------------ db/database.go | 5 +++ export/export.go | 38 +++++++++++++---- export/export_handler.go | 6 ++- 4 files changed, 83 insertions(+), 36 deletions(-) diff --git a/catalog/mvcc/cataloger_export.go b/catalog/mvcc/cataloger_export.go index 0418856df8c..f0bbc045c15 100644 --- a/catalog/mvcc/cataloger_export.go +++ b/catalog/mvcc/cataloger_export.go @@ -5,9 +5,13 @@ import ( "fmt" "regexp" + "github.com/jackc/pgconn" + "github.com/lib/pq" + "github.com/georgysavva/scany/pgxscan" "github.com/treeverse/lakefs/catalog" "github.com/treeverse/lakefs/db" + "github.com/treeverse/lakefs/logging" ) func (c *cataloger) GetExportConfigurationForBranch(repository string, branch string) (catalog.ExportConfiguration, error) { @@ -89,58 +93,72 @@ func (c *cataloger) GetExportState(repo string, branch string) (catalog.ExportSt return res.(catalog.ExportState), err } -func (c *cataloger) ExportStateSet(repo, branch string, cb catalog.ExportStateCallback) error { - _, err := c.db.Transact(func(tx db.Tx) (interface{}, error) { +func (c *cataloger) ExportStateSet(repo, branch string, cb ExportStateCallback) error { + _, err := c.db.Transact(db.Void(func(tx db.Tx) error { var res struct { CurrentRef string - State catalog.CatalogBranchExportStatus + Status catalog.CatalogBranchExportStatus ErrorMessage *string } branchID, err := c.getBranchIDCache(tx, repo, branch) if err != nil { - return nil, err + return err } // get current state err = tx.Get(&res, ` - SELECT current_ref, state, error_message - FROM catalog_branches_export_state - WHERE branch_id=$1 FOR UPDATE`, + SELECT current_ref, state, error_message + FROM catalog_branches_export_state + WHERE branch_id=$1 FOR NO KEY UPDATE`, branchID) missing := errors.Is(err, db.ErrNotFound) if err != nil && !missing { - err = fmt.Errorf("ExportStateSet: failed to get existing state: %w", err) - return nil, err + return fmt.Errorf("ExportStateMarkStart: failed to get existing state: %w", err) } oldRef := res.CurrentRef - state := res.State + oldStatus := res.Status + + l := logging.Default().WithFields(logging.Fields{ + "old_ref": oldRef, + "old_status": oldStatus, + "repo": repo, + "branch": branch, + "branch_id": branchID, + }) // run callback - newRef, newState, newMsg, err := cb(oldRef, state) + newRef, newStatus, newMessage, err := cb(oldRef, oldStatus) if err != nil { - return nil, err + return err } + l = l.WithFields(logging.Fields{ + "new_ref": newRef, + "new_status": newStatus, + }) + // update new state - var query string + var tag pgconn.CommandTag if missing { - query = ` - INSERT INTO catalog_branches_export_state (branch_id, current_ref, state, error_message) - VALUES ($1, $2, $3, $4)` + l.Info("insert on DB") + tag, err = tx.Exec(` + INSERT INTO catalog_branches_export_state (branch_id, current_ref, state, error_message) + VALUES ($1, $2, 'in-progress', $3)`, + branchID, newRef, newMessage) } else { - query = ` - UPDATE catalog_branches_export_state - SET current_ref=$2, state=$3, error_message=$4 - WHERE branch_id=$1` + l.Info("update on DB") + tag, err = tx.Exec(` + UPDATE catalog_branches_export_state + SET current_ref=$2, state='in-progress', error_message=NULL + WHERE branch_id=$1`, + branchID, newRef) } - - tag, err := tx.Exec(query, branchID, newRef, newState, newMsg) if err != nil { - return nil, fmt.Errorf("ExportStateSet: update state: %w", err) + return err } if tag.RowsAffected() != 1 { - return nil, fmt.Errorf("ExportStateSet: could not update single row %s: %w", tag, catalog.ErrExportFailed) + return fmt.Errorf("[I] ExportMarkSet: could not update single row %s: %w", tag, catalog.ErrEntryNotFound) } - return nil, err - }) + return err + })) return err } diff --git a/db/database.go b/db/database.go index 95b3fe4c09a..3c0a0d0c48f 100644 --- a/db/database.go +++ b/db/database.go @@ -28,6 +28,11 @@ type Database interface { Pool() *pgxpool.Pool } +// Void wraps a procedure with no return value as a TxFunc +func Void(fn func(tx Tx) error) TxFunc { + return func(tx Tx) (interface{}, error) { return nil, fn(tx) } +} + type QueryOptions struct { logger logging.Logger ctx context.Context diff --git a/export/export.go b/export/export.go index 8171ef17824..e01b5800555 100644 --- a/export/export.go +++ b/export/export.go @@ -2,10 +2,10 @@ package export import ( "context" - "db" "errors" "fmt" + "github.com/jackc/pgx/v4" nanoid "github.com/matoous/go-nanoid" "github.com/treeverse/lakefs/logging" @@ -36,7 +36,15 @@ func ExportBranchStart(parade parade.Parade, cataloger catalog.Cataloger, repo, if err != nil { return "", err } + l := logging.Default().WithFields(logging.Fields{ + "repo": repo, + "branch": branch, + "commit_ref": commit, + "export_id": exportID, + }) + err = cataloger.ExportStateSet(repo, branch, func(oldRef string, state catalog.CatalogBranchExportStatus) (newRef string, newState catalog.CatalogBranchExportStatus, newMessage *string, err error) { + l.WithField("initial_state", state).Info("export update state") if state == catalog.ExportStatusInProgress { return oldRef, state, nil, ErrExportInProgress } @@ -52,6 +60,9 @@ func ExportBranchStart(parade parade.Parade, cataloger catalog.Cataloger, repo, return oldRef, "", nil, err } + l.WithFields(logging.Fields{"num_tasks": len(tasks), "target_path": config.Path}). + Info("insert export tasks") + err = parade.InsertTasks(context.Background(), tasks) if err != nil { return "", "", nil, err @@ -61,10 +72,14 @@ func ExportBranchStart(parade parade.Parade, cataloger catalog.Cataloger, repo, return exportID, err } -var ErrConflictingRefs = errors.New("conflicting references") +var ( + ErrConflictingRefs = errors.New("conflicting references") + ErrWrongStatus = errors.New("incorrect status") +) // ExportBranchDone ends the export branch process by changing the status func ExportBranchDone(parade parade.Parade, cataloger catalog.Cataloger, status catalog.CatalogBranchExportStatus, statusMsg *string, repo, branch, commitRef string) error { + l := logging.Default().WithFields(logging.Fields{"repo": repo, "branch": branch, "commit_ref": commitRef, "status": status, "status_message": statusMsg}) if status == catalog.ExportStatusSuccess { // Start the next export if continuous. isContinuous, err := hasContinuousExport(cataloger, repo, branch) @@ -75,12 +90,10 @@ func ExportBranchDone(parade parade.Parade, cataloger catalog.Cataloger, status return err } if isContinuous { + l.Info("start new continuous export") _, err := ExportBranchStart(parade, cataloger, repo, branch) if errors.Is(err, ErrExportInProgress) { - logging.Default().WithFields(logging.Fields{ - "repo": repo, - "branch": branch, - }).Info("export already in progress when restarting continuous export (unlikely)") + l.Info("export already in progress when restarting continuous export (unlikely)") err = nil } if err != nil { @@ -88,25 +101,29 @@ func ExportBranchDone(parade parade.Parade, cataloger catalog.Cataloger, status } return nil } + return err } err := cataloger.ExportStateSet(repo, branch, func(oldRef string, oldStatus catalog.CatalogBranchExportStatus) (newRef string, newStatus catalog.CatalogBranchExportStatus, newMessage *string, err error) { if commitRef != oldRef { return "", "", nil, fmt.Errorf("ExportBranchDone: currentRef: %s, newRef: %s: %w", oldRef, commitRef, ErrConflictingRefs) } + if oldStatus != catalog.ExportStatusInProgress { + l.WithField("old_state", oldStatus).Error("expected old_state to be in-progress") + return "", "", nil, fmt.Errorf("expected old status to be in-progress not %s: %w", oldStatus, ErrWrongStatus) + } + l.WithField("old_state", oldStatus).Info("updating branch export") return oldRef, status, statusMsg, nil }) return err } -var ErrRepairWrongStatus = errors.New("incorrect status") - // ExportBranchRepair changes state from Failed To Repair and starts a new export. // It fails if the current state is not ExportStatusFailed. func ExportBranchRepair(cataloger catalog.Cataloger, repo, branch string) error { return cataloger.ExportStateSet(repo, branch, func(oldRef string, state catalog.CatalogBranchExportStatus) (newRef string, newState catalog.CatalogBranchExportStatus, newMessage *string, err error) { if state != catalog.ExportStatusFailed { - return oldRef, "", nil, ErrRepairWrongStatus + return oldRef, "", nil, ErrWrongStatus } return oldRef, catalog.ExportStatusRepaired, nil, nil }) @@ -114,6 +131,9 @@ func ExportBranchRepair(cataloger catalog.Cataloger, repo, branch string) error func hasContinuousExport(c catalog.Cataloger, repo, branch string) (bool, error) { exportConfiguration, err := c.GetExportConfigurationForBranch(repo, branch) + if errors.Is(err, pgx.ErrNoRows) { + return false, nil + } if err != nil { return false, fmt.Errorf("check whether export configuration is continuous for repo %s branch %s: %w", repo, branch, err) } diff --git a/export/export_handler.go b/export/export_handler.go index 8e70a8e73f5..383c0e25419 100644 --- a/export/export_handler.go +++ b/export/export_handler.go @@ -11,6 +11,7 @@ import ( "github.com/treeverse/lakefs/block" "github.com/treeverse/lakefs/catalog" + "github.com/treeverse/lakefs/db" "github.com/treeverse/lakefs/logging" "github.com/treeverse/lakefs/parade" ) @@ -299,6 +300,8 @@ func (h *Handler) ActorID() parade.ActorID { // exportCommitHook is a cataloger PostCommit hook for continuous export. func (h *Handler) exportCommitHook(ctx context.Context, _ db.Tx, repo, branch string, log *catalog.CommitLog) error { + l := logging.Default(). + WithFields(logging.Fields{"repo": repo, "branch": branch, "message": log.Message, "at": log.CreationDate.String()}) isContinuous, err := hasContinuousExport(h.cataloger, repo, branch) if err != nil { // FAIL this commit: if we were meant to export it and did not then in practice @@ -308,7 +311,8 @@ func (h *Handler) exportCommitHook(ctx context.Context, _ db.Tx, repo, branch st if !isContinuous { return nil } - _, err = ExportBranchStart(h.parade, h.cataloger, repo, branch) + exportID, err := ExportBranchStart(h.parade, h.cataloger, repo, branch) + l.WithField("export_id", exportID).Info("continuous export started") if errors.Is(err, ErrExportInProgress) { err = nil } From 4871e4931f70abeaa7429874e5ffb3e471635511 Mon Sep 17 00:00:00 2001 From: "Ariel Shaqed (Scolnicov)" Date: Mon, 23 Nov 2020 11:18:30 +0200 Subject: [PATCH 06/16] [checks] use a shorter error line to pass err113 golangci check Yup, it ends up checking for the line length. 132 columns are back!!?! (http://www.righto.com/2019/01/accounting-machines-ibm-1403-and-why.html) --- export/export.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/export/export.go b/export/export.go index e01b5800555..f333f60722d 100644 --- a/export/export.go +++ b/export/export.go @@ -135,7 +135,7 @@ func hasContinuousExport(c catalog.Cataloger, repo, branch string) (bool, error) return false, nil } if err != nil { - return false, fmt.Errorf("check whether export configuration is continuous for repo %s branch %s: %w", repo, branch, err) + return false, fmt.Errorf("check if export configuration is continuous for repo %s branch %s %w", repo, branch, err) } return exportConfiguration.IsContinuous, nil } From 40fec4bea45afb025641962ddacffbd3c4bc24fa Mon Sep 17 00:00:00 2001 From: "Ariel Shaqed (Scolnicov)" Date: Mon, 23 Nov 2020 12:02:18 +0200 Subject: [PATCH 07/16] [CR] Refactor merge & commit hooks Also extract a smaller interface `cataloger.Exporter` --- catalog/cataloger.go | 1 - export/export.go | 4 ++-- export/export_handler.go | 37 +++++++++++++++---------------------- 3 files changed, 17 insertions(+), 25 deletions(-) diff --git a/catalog/cataloger.go b/catalog/cataloger.go index 55b70b83156..6e31f5af719 100644 --- a/catalog/cataloger.go +++ b/catalog/cataloger.go @@ -2,7 +2,6 @@ package catalog import ( "context" - "io" "time" "github.com/treeverse/lakefs/db" diff --git a/export/export.go b/export/export.go index f333f60722d..57a0582d532 100644 --- a/export/export.go +++ b/export/export.go @@ -5,9 +5,9 @@ import ( "errors" "fmt" - "github.com/jackc/pgx/v4" nanoid "github.com/matoous/go-nanoid" + "github.com/treeverse/lakefs/db" "github.com/treeverse/lakefs/logging" "github.com/treeverse/lakefs/parade" @@ -131,7 +131,7 @@ func ExportBranchRepair(cataloger catalog.Cataloger, repo, branch string) error func hasContinuousExport(c catalog.Cataloger, repo, branch string) (bool, error) { exportConfiguration, err := c.GetExportConfigurationForBranch(repo, branch) - if errors.Is(err, pgx.ErrNoRows) { + if errors.Is(err, db.ErrNotFound) { return false, nil } if err != nil { diff --git a/export/export_handler.go b/export/export_handler.go index 383c0e25419..fc6e70dca5d 100644 --- a/export/export_handler.go +++ b/export/export_handler.go @@ -74,7 +74,7 @@ func (h *Handler) start(body *string) error { return h.generateTasks(startData, startData.ExportConfig, &finishBodyStr, repo.StorageNamespace) } -func (h *Handler) generateTasks(startData StartData, config catalog.ExportConfiguration, finishBodyStr *string, storageNamespace string) error { +func (h *Handler) generateTasks(startData StartData, config catalog.Cataloger, finishBodyStr *string, storageNamespace string) error { tasksGenerator := NewTasksGenerator(startData.ExportID, config.Path, getGenerateSuccess(config.LastKeysInPrefixRegexp), finishBodyStr, storageNamespace) var diffs catalog.Differences var err error @@ -298,20 +298,17 @@ func (h *Handler) ActorID() parade.ActorID { return actorName } -// exportCommitHook is a cataloger PostCommit hook for continuous export. -func (h *Handler) exportCommitHook(ctx context.Context, _ db.Tx, repo, branch string, log *catalog.CommitLog) error { - l := logging.Default(). - WithFields(logging.Fields{"repo": repo, "branch": branch, "message": log.Message, "at": log.CreationDate.String()}) - isContinuous, err := hasContinuousExport(h.cataloger, repo, branch) +func startExport(l logging.Logger, p parade.Parade, c catalog.Cataloger, op interface{}, repo, branch string) error { + isContinuous, err := hasContinuousExport(c, repo, branch) if err != nil { // FAIL this commit: if we were meant to export it and did not then in practice // there was no commit. - return fmt.Errorf("check continuous export for commit %+v: %w", *log, err) + return fmt.Errorf("check continuous export for %+v: %w", op, err) } if !isContinuous { return nil } - exportID, err := ExportBranchStart(h.parade, h.cataloger, repo, branch) + exportID, err := ExportBranchStart(p, c, repo, branch) l.WithField("export_id", exportID).Info("continuous export started") if errors.Is(err, ErrExportInProgress) { err = nil @@ -319,20 +316,16 @@ func (h *Handler) exportCommitHook(ctx context.Context, _ db.Tx, repo, branch st return err } +// exportCommitHook is a cataloger PostCommit hook for continuous export. +func (h *Handler) exportCommitHook(ctx context.Context, _ db.Tx, repo, branch string, log *catalog.CommitLog) error { + l := logging.Default(). + WithFields(logging.Fields{"repo": repo, "branch": branch, "message": log.Message, "at": log.CreationDate.String()}) + return startExport(l, h.parade, h.cataloger, *log, repo, branch) +} + // exportMergeHook is a cataloger PostMerge hook for continuous export. func (h *Handler) exportMergeHook(ctx context.Context, _ db.Tx, repo, branch string, merge *catalog.MergeResult) error { - isContinuous, err := hasContinuousExport(h.cataloger, repo, branch) - if err != nil { - // FAIL this merge: if we were meant to export it and did not then in practice - // there was no merge. - return fmt.Errorf("check continuous export for merge %+v: %w", *merge, err) - } - if !isContinuous { - return nil - } - _, err = ExportBranchStart(h.parade, h.cataloger, repo, branch) - if errors.Is(err, ErrExportInProgress) { - err = nil - } - return err + l := logging.Default(). + WithFields(logging.Fields{"repo": repo, "branch": branch, "reference": merge.Reference}) + return startExport(l, h.parade, h.cataloger, *merge, repo, branch) } From d55eb5d843809649577a249a17e69aade1b7a4f6 Mon Sep 17 00:00:00 2001 From: "Ariel Shaqed (Scolnicov)" Date: Tue, 24 Nov 2020 10:20:21 +0200 Subject: [PATCH 08/16] [post-rebase] Revert "status" back to "state" in field names --- catalog/mvcc/cataloger_export.go | 88 +++++++++++++++++++++++++++----- 1 file changed, 75 insertions(+), 13 deletions(-) diff --git a/catalog/mvcc/cataloger_export.go b/catalog/mvcc/cataloger_export.go index f0bbc045c15..3d015c48265 100644 --- a/catalog/mvcc/cataloger_export.go +++ b/catalog/mvcc/cataloger_export.go @@ -14,13 +14,79 @@ import ( "github.com/treeverse/lakefs/logging" ) -func (c *cataloger) GetExportConfigurationForBranch(repository string, branch string) (catalog.ExportConfiguration, error) { +// ExportConfiguration describes the export configuration of a branch, as passed on wire, used +// internally, and stored in DB. +type ExportConfiguration struct { + Path string `db:"export_path" json:"export_path"` + StatusPath string `db:"export_status_path" json:"export_status_path"` + LastKeysInPrefixRegexp pq.StringArray `db:"last_keys_in_prefix_regexp" json:"last_keys_in_prefix_regexp"` + IsContinuous bool `db:"continuous" json:"is_continuous"` +} + +// ExportConfigurationForBranch describes how to export BranchID. It is stored in the database. +// Unfortunately golang sql doesn't know about embedded structs, so you get a useless copy of +// ExportConfiguration embedded here. +type ExportConfigurationForBranch struct { + Repository string `db:"repository"` + Branch string `db:"branch"` + + Path string `db:"export_path"` + StatusPath string `db:"export_status_path"` + LastKeysInPrefixRegexp pq.StringArray `db:"last_keys_in_prefix_regexp"` + IsContinuous bool `db:"continuous"` +} + +type CatalogBranchExportStatus string + +const ( + ExportStatusInProgress = CatalogBranchExportStatus("in-progress") + ExportStatusSuccess = CatalogBranchExportStatus("exported-successfully") + ExportStatusFailed = CatalogBranchExportStatus("export-failed") + ExportStatusRepaired = CatalogBranchExportStatus("export-repaired") + ExportStatusUnknown = CatalogBranchExportStatus("[unknown]") +) + +// ExportStatus describes the current export status of a branch, as passed on wire, used +// internally, and stored in DB. +type ExportStatus struct { + CurrentRef string `db:"current_ref"` + State CatalogBranchExportStatus `db:"state"` +} + +var ErrBadTypeConversion = errors.New("bad type") + +// nolint: stylecheck +func (dst *CatalogBranchExportStatus) Scan(src interface{}) error { + var sc CatalogBranchExportStatus + switch s := src.(type) { + case string: + sc = CatalogBranchExportStatus(strings.ToLower(s)) + case []byte: + sc = CatalogBranchExportStatus(strings.ToLower(string(s))) + default: + return fmt.Errorf("cannot convert %T to CatalogBranchExportStatus: %w", src, ErrBadTypeConversion) + } + + if !(sc == ExportStatusInProgress || sc == ExportStatusSuccess || sc == ExportStatusFailed) { + // not a failure, "just" be a newer enum value than known + *dst = ExportStatusUnknown + return nil + } + *dst = sc + return nil +} + +func (src CatalogBranchExportStatus) Value() (driver.Value, error) { + return string(src), nil +} + +func (c *cataloger) GetExportConfigurationForBranch(repository string, branch string) (ExportConfiguration, error) { ret, err := c.db.Transact(func(tx db.Tx) (interface{}, error) { branchID, err := c.getBranchIDCache(tx, repository, branch) - var ret catalog.ExportConfiguration if err != nil { - return nil, err + return nil, fmt.Errorf("repository %s branch %s: %w", repository, branch, err) } + var ret ExportConfiguration err = c.db.Get(&ret, `SELECT export_path, export_status_path, last_keys_in_prefix_regexp, continuous FROM catalog_branches_export @@ -95,11 +161,7 @@ func (c *cataloger) GetExportState(repo string, branch string) (catalog.ExportSt func (c *cataloger) ExportStateSet(repo, branch string, cb ExportStateCallback) error { _, err := c.db.Transact(db.Void(func(tx db.Tx) error { - var res struct { - CurrentRef string - Status catalog.CatalogBranchExportStatus - ErrorMessage *string - } + var res catalog.ExportState branchID, err := c.getBranchIDCache(tx, repo, branch) if err != nil { @@ -116,7 +178,7 @@ func (c *cataloger) ExportStateSet(repo, branch string, cb ExportStateCallback) return fmt.Errorf("ExportStateMarkStart: failed to get existing state: %w", err) } oldRef := res.CurrentRef - oldStatus := res.Status + oldStatus := res.State l := logging.Default().WithFields(logging.Fields{ "old_ref": oldRef, @@ -142,15 +204,15 @@ func (c *cataloger) ExportStateSet(repo, branch string, cb ExportStateCallback) l.Info("insert on DB") tag, err = tx.Exec(` INSERT INTO catalog_branches_export_state (branch_id, current_ref, state, error_message) - VALUES ($1, $2, 'in-progress', $3)`, - branchID, newRef, newMessage) + VALUES ($1, $2, $3, $4)`, + branchID, newRef, newStatus, newMessage) } else { l.Info("update on DB") tag, err = tx.Exec(` UPDATE catalog_branches_export_state - SET current_ref=$2, state='in-progress', error_message=NULL + SET current_ref=$2, state=$3, error_message=$4 WHERE branch_id=$1`, - branchID, newRef) + branchID, newRef, newStatus, newMessage) } if err != nil { return err From 62de3b9f8ea48d2ac59cd24b1cd0e0c7979ac3af Mon Sep 17 00:00:00 2001 From: "Ariel Shaqed (Scolnicov)" Date: Tue, 24 Nov 2020 11:34:56 +0200 Subject: [PATCH 09/16] [bugfix] return ErrNotFound correctly Package `scany` depends on a different version of `pgx` than the rest of lakeFS. So `errors.Is(err, pgx.ErrNoRows)` fails. Luckily it (sort-of) knows of this issue and wraps this call inside it as `pgxscan.NotFound`. Also make `ErrNotFound` wrap `pgx.ErrNoRows` rather than a new error. --- db/tx.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/db/tx.go b/db/tx.go index 4a15ef84bc8..701f135d05b 100644 --- a/db/tx.go +++ b/db/tx.go @@ -2,7 +2,6 @@ package db import ( "context" - "errors" "fmt" "strings" "time" @@ -75,8 +74,6 @@ func (d *dbTx) Get(dest interface{}, query string, args ...interface{}) error { }) err := pgxscan.Get(context.Background(), d.tx, dest, query, args...) if pgxscan.NotFound(err) { - // Don't wrap this err: it might come from a different version of pgx and then - // !errors.Is(err, pgx.ErrNoRows). log.Trace("SQL query returned no results") return ErrNotFound } @@ -99,7 +96,9 @@ func (d *dbTx) GetPrimitive(dest interface{}, query string, args ...interface{}) }) row := d.tx.QueryRow(context.Background(), query, args...) err := row.Scan(dest) - if errors.Is(err, pgx.ErrNoRows) { + if pgxscan.NotFound(err) { + // Don't wrap err: it might come from a different version of pgx and then + // !errors.Is(err, pgx.ErrNoRows). log.Trace("SQL query returned no results") return ErrNotFound } From 57432091d6b78559eb0e7bd3e85ecf4981e396c3 Mon Sep 17 00:00:00 2001 From: "Ariel Shaqed (Scolnicov)" Date: Tue, 24 Nov 2020 12:14:11 +0200 Subject: [PATCH 10/16] WIP try to fix db notfound errs --- db/tx.go | 9 ++++++--- go.sum | 1 + 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/db/tx.go b/db/tx.go index 701f135d05b..5a01dd26481 100644 --- a/db/tx.go +++ b/db/tx.go @@ -2,6 +2,7 @@ package db import ( "context" + "errors" "fmt" "strings" "time" @@ -74,6 +75,8 @@ func (d *dbTx) Get(dest interface{}, query string, args ...interface{}) error { }) err := pgxscan.Get(context.Background(), d.tx, dest, query, args...) if pgxscan.NotFound(err) { + // This err comes directly from scany, not directly from pgx, so *must* use + // pgxscan.NotFound. log.Trace("SQL query returned no results") return ErrNotFound } @@ -96,9 +99,9 @@ func (d *dbTx) GetPrimitive(dest interface{}, query string, args ...interface{}) }) row := d.tx.QueryRow(context.Background(), query, args...) err := row.Scan(dest) - if pgxscan.NotFound(err) { - // Don't wrap err: it might come from a different version of pgx and then - // !errors.Is(err, pgx.ErrNoRows). + if errors.Is(err, pgx.ErrNoRows) { + // This err comes directly from pgx, not via scany, so *cannot* use + // pgxscan.NotFound. log.Trace("SQL query returned no results") return ErrNotFound } diff --git a/go.sum b/go.sum index a1b7e2c193b..6cd3623f37b 100644 --- a/go.sum +++ b/go.sum @@ -600,6 +600,7 @@ github.com/jackc/pgconn v1.5.0/go.mod h1:QeD3lBfpTFe8WUnPZWN5KY/mB8FGMIYRdd8P8Jr github.com/jackc/pgconn v1.5.1-0.20200601181101-fa742c524853/go.mod h1:QeD3lBfpTFe8WUnPZWN5KY/mB8FGMIYRdd8P8Jr0fAI= github.com/jackc/pgconn v1.6.4 h1:S7T6cx5o2OqmxdHaXLH1ZeD1SbI8jBznyYE9Ec0RCQ8= github.com/jackc/pgconn v1.6.4/go.mod h1:w2pne1C2tZgP+TvjqLpOigGzNqjBgQW9dUw/4Chex78= +github.com/jackc/pgconn v1.7.2 h1:195tt17jkjy+FrFlY0pgyrul5kRLb7BGXY3JTrNxeXU= github.com/jackc/pgerrcode v0.0.0-20190803225404-afa3381909a6 h1:geJ1mgTGd0WQo67wEd+H4OjFG5uA2e3cEBz9D5+pftU= github.com/jackc/pgerrcode v0.0.0-20190803225404-afa3381909a6/go.mod h1:a/s9Lp5W7n/DD0VrVoyJ00FbP2ytTPDVOivvn2bMlds= github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE= From 3e633225fd367911958ea579762494b82a0eb92c Mon Sep 17 00:00:00 2001 From: "Ariel Shaqed (Scolnicov)" Date: Tue, 24 Nov 2020 13:11:22 +0200 Subject: [PATCH 11/16] [CR] GetPrimitive doesn't call pgxscan, use pgx directly there --- db/tx.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/db/tx.go b/db/tx.go index 5a01dd26481..4722118930b 100644 --- a/db/tx.go +++ b/db/tx.go @@ -100,8 +100,6 @@ func (d *dbTx) GetPrimitive(dest interface{}, query string, args ...interface{}) row := d.tx.QueryRow(context.Background(), query, args...) err := row.Scan(dest) if errors.Is(err, pgx.ErrNoRows) { - // This err comes directly from pgx, not via scany, so *cannot* use - // pgxscan.NotFound. log.Trace("SQL query returned no results") return ErrNotFound } From 43e6261140453ba26a4cff6389c9d6da2475c092 Mon Sep 17 00:00:00 2001 From: "Ariel Shaqed (Scolnicov)" Date: Tue, 24 Nov 2020 15:06:20 +0200 Subject: [PATCH 12/16] [WIP] avoid empty exports --- export/export.go | 51 +++++++++++++++++++++++++++------------- export/export_handler.go | 24 ++++++++++++++++++- 2 files changed, 58 insertions(+), 17 deletions(-) diff --git a/export/export.go b/export/export.go index 57a0582d532..dc1738d29d9 100644 --- a/export/export.go +++ b/export/export.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" + "github.com/jackc/pgx/v4" nanoid "github.com/matoous/go-nanoid" "github.com/treeverse/lakefs/db" @@ -22,7 +23,10 @@ func getExportID(repo, branch, commitRef string) (string, error) { return fmt.Sprintf("%s-%s-%s-%s", repo, branch, commitRef, nid), nil } -var ErrExportInProgress = errors.New("export currently in progress") +var ( + ErrExportInProgress = errors.New("export currently in progress") + ErrNothingToExport = errors.New("nothing to export") +) // ExportBranchStart inserts a start task on branch, sets branch export state to pending. // It returns ErrExportInProgress if an export is already in progress. @@ -39,18 +43,25 @@ func ExportBranchStart(parade parade.Parade, cataloger catalog.Cataloger, repo, l := logging.Default().WithFields(logging.Fields{ "repo": repo, "branch": branch, - "commit_ref": commit, + "commit_ref": commitRef, "export_id": exportID, }) err = cataloger.ExportStateSet(repo, branch, func(oldRef string, state catalog.CatalogBranchExportStatus) (newRef string, newState catalog.CatalogBranchExportStatus, newMessage *string, err error) { - l.WithField("initial_state", state).Info("export update state") + l.WithFields(logging.Fields{ + "initial_state": state, + "old_ref": oldRef, + "old_state": state, + }).Info("export update state") if state == catalog.ExportStatusInProgress { return oldRef, state, nil, ErrExportInProgress } if state == catalog.ExportStatusFailed { return oldRef, state, nil, catalog.ErrExportFailed } + if oldRef == commitRef { + return oldRef, state, nil, ErrNothingToExport + } config, err := cataloger.GetExportConfigurationForBranch(repo, branch) if err != nil { return oldRef, "", nil, err @@ -80,6 +91,22 @@ var ( // ExportBranchDone ends the export branch process by changing the status func ExportBranchDone(parade parade.Parade, cataloger catalog.Cataloger, status catalog.CatalogBranchExportStatus, statusMsg *string, repo, branch, commitRef string) error { l := logging.Default().WithFields(logging.Fields{"repo": repo, "branch": branch, "commit_ref": commitRef, "status": status, "status_message": statusMsg}) + + err := cataloger.ExportStateSet(repo, branch, func(oldRef string, oldStatus catalog.CatalogBranchExportStatus) (newRef string, newStatus catalog.CatalogBranchExportStatus, newMessage *string, err error) { + if commitRef != oldRef { + return "", "", nil, fmt.Errorf("ExportBranchDone: currentRef: %s, newRef: %s: %w", oldRef, commitRef, ErrConflictingRefs) + } + if oldStatus != catalog.ExportStatusInProgress { + l.WithField("old_state", oldStatus).Error("expected old_state to be in-progress") + return "", "", nil, fmt.Errorf("expected old status to be in-progress not %s: %w", oldStatus, ErrWrongStatus) + } + l.WithField("old_state", oldStatus).Info("updating branch export") + return oldRef, status, statusMsg, nil + }) + if err != nil { + return err + } + if status == catalog.ExportStatusSuccess { // Start the next export if continuous. isContinuous, err := hasContinuousExport(cataloger, repo, branch) @@ -95,6 +122,9 @@ func ExportBranchDone(parade parade.Parade, cataloger catalog.Cataloger, status if errors.Is(err, ErrExportInProgress) { l.Info("export already in progress when restarting continuous export (unlikely)") err = nil + } else if errors.Is(err, ErrNothingToExport) { + l.Info("nothing further to export continuously; stop (likely)") + err = nil } if err != nil { return fmt.Errorf("restart continuous export repo %s branch %s: %w", repo, branch, err) @@ -104,18 +134,7 @@ func ExportBranchDone(parade parade.Parade, cataloger catalog.Cataloger, status return err } - err := cataloger.ExportStateSet(repo, branch, func(oldRef string, oldStatus catalog.CatalogBranchExportStatus) (newRef string, newStatus catalog.CatalogBranchExportStatus, newMessage *string, err error) { - if commitRef != oldRef { - return "", "", nil, fmt.Errorf("ExportBranchDone: currentRef: %s, newRef: %s: %w", oldRef, commitRef, ErrConflictingRefs) - } - if oldStatus != catalog.ExportStatusInProgress { - l.WithField("old_state", oldStatus).Error("expected old_state to be in-progress") - return "", "", nil, fmt.Errorf("expected old status to be in-progress not %s: %w", oldStatus, ErrWrongStatus) - } - l.WithField("old_state", oldStatus).Info("updating branch export") - return oldRef, status, statusMsg, nil - }) - return err + return nil } // ExportBranchRepair changes state from Failed To Repair and starts a new export. @@ -131,7 +150,7 @@ func ExportBranchRepair(cataloger catalog.Cataloger, repo, branch string) error func hasContinuousExport(c catalog.Cataloger, repo, branch string) (bool, error) { exportConfiguration, err := c.GetExportConfigurationForBranch(repo, branch) - if errors.Is(err, db.ErrNotFound) { + if errors.Is(err, db.ErrNotFound) || errors.Is(err, pgx.ErrNoRows) { return false, nil } if err != nil { diff --git a/export/export_handler.go b/export/export_handler.go index fc6e70dca5d..5999ba09c35 100644 --- a/export/export_handler.go +++ b/export/export_handler.go @@ -71,6 +71,13 @@ func (h *Handler) start(body *string) error { if err != nil { return err } + logging.Default().WithFields(logging.Fields{ + "repo": startData.Repo, + "branch": startData.Branch, + "from_ref": startData.FromCommitRef, + "to_ref": startData.ToCommitRef, + "export_id": startData.ExportID, + }).Info("action: start export") return h.generateTasks(startData, startData.ExportConfig, &finishBodyStr, repo.StorageNamespace) } @@ -247,6 +254,14 @@ func (h *Handler) done(body *string, signalledErrors int) error { return err } status, msg := getStatus(signalledErrors) + logging.Default().WithFields(logging.Fields{ + "repo": finishData.Repo, + "branch": finishData.Branch, + "commit_ref": finishData.CommitRef, + "status_path": finishData.StatusPath, + "status": status, + "status_message": msg, + }).Info("action: export done") err = h.updateStatus(finishData, status, signalledErrors) if err != nil { return err @@ -309,10 +324,17 @@ func startExport(l logging.Logger, p parade.Parade, c catalog.Cataloger, op inte return nil } exportID, err := ExportBranchStart(p, c, repo, branch) - l.WithField("export_id", exportID).Info("continuous export started") + if err != nil { + l = l.WithError(err) + } if errors.Is(err, ErrExportInProgress) { + l = l.WithField("skipped", "export already in progress") + err = nil + } else if errors.Is(err, ErrNothingToExport) { + l = l.WithField("skipped", "nothing further to export") err = nil } + l.WithField("export_id", exportID).Info("continuous export started") return err } From 5e77681e49848c0e09a5f8fb282904714f2fd79d Mon Sep 17 00:00:00 2001 From: "Ariel Shaqed (Scolnicov)" Date: Tue, 24 Nov 2020 15:18:34 +0200 Subject: [PATCH 13/16] Run hooks outside of transactions Catalog methods do not support transactions anyway. So there is no way for a hook to use its transaction. OTOH, running inside the transaction means all catalog methods that the hook calls will NOT see the commit or merge. So fix that. --- catalog/cataloger.go | 9 ++-- catalog/mvcc/cataloger_commit.go | 18 +++---- catalog/mvcc/cataloger_commit_test.go | 70 ++++++++------------------- catalog/mvcc/cataloger_merge.go | 17 ++++--- catalog/mvcc/cataloger_merge_test.go | 11 +++-- export/export_handler.go | 5 +- 6 files changed, 51 insertions(+), 79 deletions(-) diff --git a/catalog/cataloger.go b/catalog/cataloger.go index 6e31f5af719..8c698ba526f 100644 --- a/catalog/cataloger.go +++ b/catalog/cataloger.go @@ -2,6 +2,7 @@ package catalog import ( "context" + "io" "time" "github.com/treeverse/lakefs/db" @@ -144,13 +145,11 @@ type Cataloger interface { // ExportStateCallback returns the new ref, state and message regarding the old ref and state type ExportStateCallback func(oldRef string, state CatalogBranchExportStatus) (newRef string, newState CatalogBranchExportStatus, newMessage *string, err error) -type PostCommitFunc = func(ctx context.Context, tx db.Tx, repo, branch string, commitLog *CommitLog) error -type PostMergeFunc = func(ctx context.Context, tx db.Tx, repo, branch string, mergeResult *MergeResult) error +type PostCommitFunc = func(ctx context.Context, repo, branch string, commitLog *CommitLog) error +type PostMergeFunc = func(ctx context.Context, repo, branch string, mergeResult *MergeResult) error // CatalogerHooks describes the hooks available for some operations on the catalog. Hooks are -// called in a current transaction context; if they return an error the transaction is rolled -// back. Because these transactions are current, the hook can see the effect the operation only -// on the passed transaction. +// called after the transaction ends; if they return an error they do not affect commit/merge. type CatalogerHooks struct { // PostCommit hooks are called at the end of a commit. PostCommit []PostCommitFunc diff --git a/catalog/mvcc/cataloger_commit.go b/catalog/mvcc/cataloger_commit.go index 25ba145f4a6..d2bc0a1299c 100644 --- a/catalog/mvcc/cataloger_commit.go +++ b/catalog/mvcc/cataloger_commit.go @@ -76,20 +76,20 @@ func (c *cataloger) Commit(ctx context.Context, repository, branch string, messa Parents: []string{parentReference}, } - for _, hook := range c.hooks.PostCommit { - err = hook(ctx, tx, repository, branch, commitLog) - if err != nil { - // Roll tx back if a hook failed - return nil, err - } - } - return commitLog, nil }, c.txOpts(ctx)...) if err != nil { return nil, err } - return res.(*catalog.CommitLog), nil + commitLog := res.(*catalog.CommitLog) + for _, hook := range c.Hooks().PostCommit { + anotherErr := hook(ctx, repository, branch, commitLog) + if anotherErr != nil && err == nil { + err = anotherErr + } + } + + return commitLog, nil } func commitUpdateCommittedEntriesWithMaxCommit(tx db.Tx, branchID int64, commitID CommitID) (int64, error) { diff --git a/catalog/mvcc/cataloger_commit_test.go b/catalog/mvcc/cataloger_commit_test.go index 1a2045e6cb3..cffb47111d3 100644 --- a/catalog/mvcc/cataloger_commit_test.go +++ b/catalog/mvcc/cataloger_commit_test.go @@ -12,7 +12,6 @@ import ( "github.com/go-test/deep" "github.com/treeverse/lakefs/catalog" - "github.com/treeverse/lakefs/db" "github.com/treeverse/lakefs/testutil" ) @@ -292,72 +291,41 @@ func TestCataloger_CommitTombstoneShouldNotChangeHistory(t *testing.T) { type CommitData struct { Repo, Branch string - Log CommitLog + Log catalog.CommitLog } // CommitHookLogger - commit hook that will return an error if set by Err. // When no Err is set it will log commit log into Logs. type CommitHookLogger struct { - Err error Commits []CommitData } -func (h *CommitHookLogger) Hook(_ context.Context, _ db.Tx, repo, branch string, log *CommitLog) error { - if h.Err != nil { - return h.Err - } +func (h *CommitHookLogger) Hook(_ context.Context, repo, branch string, log *catalog.CommitLog) error { h.Commits = append(h.Commits, CommitData{Repo: repo, Branch: branch, Log: *log}) return nil } func TestCataloger_CommitHooks(t *testing.T) { - errHookFailed := errors.New("for testing") - tests := []struct { - name string - path string - hookErr error - wantErr error - }{ - { - name: "no_block", - hookErr: nil, - }, - { - name: "block", - hookErr: errHookFailed, - }, + ctx := context.Background() + c := testCataloger(t) + + // register hooks (more than one to verify all get called) + hooks := make([]CommitHookLogger, 2) + for i := range hooks { + c.Hooks().AddPostCommit(hooks[i].Hook) } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - ctx := context.Background() - c := testCataloger(t) - // register hooks (more than one to verify all get called) - hooks := []CommitHookLogger{ - {Err: tt.hookErr}, - {Err: tt.hookErr}, - } - for i := range hooks { - c.Hooks().AddPostCommit(hooks[i].Hook) - } + repository := testCatalogerRepo(t, ctx, c, "repository", "master") + _ = testCatalogerCreateEntry(t, ctx, c, repository, catalog.DefaultBranchName, "/file1", nil, "") - repository := testCatalogerRepo(t, ctx, c, "repository", "master") - _ = testCatalogerCreateEntry(t, ctx, c, repository, catalog.DefaultBranchName, "/file1", nil, "") + commitLog, err := c.Commit(ctx, repository, "master", "commit "+t.Name(), "tester", catalog.Metadata{"foo": "bar"}) + if err != nil { + t.Fatalf("Commit err=%s", err) + } - commitLog, err := c.Commit(ctx, repository, "master", "commit "+t.Name(), "tester", catalog.Metadata{"foo": "bar"}) - // check that hook err is the commit error - if !errors.Is(tt.hookErr, err) { - t.Fatalf("Commit err=%s, expected=%s", err, tt.hookErr) - } - // on successful commit the commit log should be found on hook's logs - if err != nil { - return - } - for i := range hooks { - if diffs := deep.Equal(hooks[i].Commits, []CommitData{{Repo: repository, Branch: "master", Log: *commitLog}}); diffs != nil { - t.Errorf("hook %d: unexpected commit logs: %s", i, diffs) - } - } - }) + for i := range hooks { + if diffs := deep.Equal(hooks[i].Commits, []CommitData{{Repo: repository, Branch: "master", Log: *commitLog}}); diffs != nil { + t.Errorf("hook %d: unexpected commit logs: %s", i, diffs) + } } } diff --git a/catalog/mvcc/cataloger_merge.go b/catalog/mvcc/cataloger_merge.go index 3b669c04d8a..8fd91a1895f 100644 --- a/catalog/mvcc/cataloger_merge.go +++ b/catalog/mvcc/cataloger_merge.go @@ -89,15 +89,18 @@ func (c *cataloger) Merge(ctx context.Context, repository, leftBranch, rightBran return nil, err } mergeResult.Reference = MakeReference(rightBranch, nextCommitID) - for _, hook := range c.hooks.PostMerge { - err = hook(ctx, tx, repository, rightBranch, mergeResult) - if err != nil { - // Roll tx back if a hook failed - return nil, err - } - } return nil, nil }, c.txOpts(ctx, db.ReadCommitted())...) + + if err == nil { + for _, hook := range c.Hooks().PostMerge { + anotherErr := hook(ctx, repository, rightBranch, mergeResult) + if anotherErr != nil && err == nil { + err = anotherErr + } + } + } + return mergeResult, err } diff --git a/catalog/mvcc/cataloger_merge_test.go b/catalog/mvcc/cataloger_merge_test.go index 11d73b2908b..dead34b702c 100644 --- a/catalog/mvcc/cataloger_merge_test.go +++ b/catalog/mvcc/cataloger_merge_test.go @@ -8,7 +8,6 @@ import ( "github.com/go-test/deep" "github.com/treeverse/lakefs/catalog" - "github.com/treeverse/lakefs/db" "github.com/treeverse/lakefs/testutil" ) @@ -139,8 +138,12 @@ func TestCataloger_Merge_FromParentConflicts(t *testing.T) { if !errors.Is(err, catalog.ErrConflictFound) { t.Errorf("Merge err = %s, expected conflict with err = %s", err, catalog.ErrConflictFound) } - if res.Reference != "" { - t.Errorf("Merge reference = %s, expected to be empty", res.Reference) + if res == nil { + t.Errorf("Merge returned nil, err %s", err) + } else { + if res.Reference != "" { + t.Errorf("Merge reference = %s, expected to be empty", res.Reference) + } } } @@ -1195,7 +1198,7 @@ type MergeHookLogger struct { Merges []MergeData } -func (h *MergeHookLogger) Hook(_ context.Context, _ db.Tx, repo, branch string, result *MergeResult) error { +func (h *MergeHookLogger) Hook(_ context.Context, repo, branch string, result *MergeResult) error { if h.Err != nil { return h.Err } diff --git a/export/export_handler.go b/export/export_handler.go index 5999ba09c35..23ede9b9c20 100644 --- a/export/export_handler.go +++ b/export/export_handler.go @@ -11,7 +11,6 @@ import ( "github.com/treeverse/lakefs/block" "github.com/treeverse/lakefs/catalog" - "github.com/treeverse/lakefs/db" "github.com/treeverse/lakefs/logging" "github.com/treeverse/lakefs/parade" ) @@ -339,14 +338,14 @@ func startExport(l logging.Logger, p parade.Parade, c catalog.Cataloger, op inte } // exportCommitHook is a cataloger PostCommit hook for continuous export. -func (h *Handler) exportCommitHook(ctx context.Context, _ db.Tx, repo, branch string, log *catalog.CommitLog) error { +func (h *Handler) exportCommitHook(ctx context.Context, repo, branch string, log *catalog.CommitLog) error { l := logging.Default(). WithFields(logging.Fields{"repo": repo, "branch": branch, "message": log.Message, "at": log.CreationDate.String()}) return startExport(l, h.parade, h.cataloger, *log, repo, branch) } // exportMergeHook is a cataloger PostMerge hook for continuous export. -func (h *Handler) exportMergeHook(ctx context.Context, _ db.Tx, repo, branch string, merge *catalog.MergeResult) error { +func (h *Handler) exportMergeHook(ctx context.Context, repo, branch string, merge *catalog.MergeResult) error { l := logging.Default(). WithFields(logging.Fields{"repo": repo, "branch": branch, "reference": merge.Reference}) return startExport(l, h.parade, h.cataloger, *merge, repo, branch) From 74527c9ff8deb5d4b9d37c94e9597f61e99e62ad Mon Sep 17 00:00:00 2001 From: "Ariel Shaqed (Scolnicov)" Date: Wed, 25 Nov 2020 10:23:16 +0200 Subject: [PATCH 14/16] Post-rebase fixes --- catalog/cataloger.go | 24 ++++++++- catalog/export.go | 35 ++----------- catalog/mvcc/cataloger_diff_test.go | 2 +- catalog/mvcc/cataloger_export.go | 75 ++-------------------------- catalog/mvcc/cataloger_merge_test.go | 4 +- export/export_handler.go | 2 +- 6 files changed, 34 insertions(+), 108 deletions(-) diff --git a/catalog/cataloger.go b/catalog/cataloger.go index 8c698ba526f..6bbbc8c8b1d 100644 --- a/catalog/cataloger.go +++ b/catalog/cataloger.go @@ -5,7 +5,7 @@ import ( "io" "time" - "github.com/treeverse/lakefs/db" + "github.com/lib/pq" ) const ( @@ -145,6 +145,28 @@ type Cataloger interface { // ExportStateCallback returns the new ref, state and message regarding the old ref and state type ExportStateCallback func(oldRef string, state CatalogBranchExportStatus) (newRef string, newState CatalogBranchExportStatus, newMessage *string, err error) +// ExportConfiguration describes the export configuration of a branch, as passed on wire, used +// internally, and stored in DB. +type ExportConfiguration struct { + Path string `db:"export_path" json:"export_path"` + StatusPath string `db:"export_status_path" json:"export_status_path"` + LastKeysInPrefixRegexp pq.StringArray `db:"last_keys_in_prefix_regexp" json:"last_keys_in_prefix_regexp"` + IsContinuous bool `db:"continuous" json:"is_continuous"` +} + +// ExportConfigurationForBranch describes how to export BranchID. It is stored in the database. +// Unfortunately golang sql doesn't know about embedded structs, so you get a useless copy of +// ExportConfiguration embedded here. +type ExportConfigurationForBranch struct { + Repository string `db:"repository"` + Branch string `db:"branch"` + + Path string `db:"export_path"` + StatusPath string `db:"export_status_path"` + LastKeysInPrefixRegexp pq.StringArray `db:"last_keys_in_prefix_regexp"` + IsContinuous bool `db:"continuous"` +} + type PostCommitFunc = func(ctx context.Context, repo, branch string, commitLog *CommitLog) error type PostMergeFunc = func(ctx context.Context, repo, branch string, mergeResult *MergeResult) error diff --git a/catalog/export.go b/catalog/export.go index 4a0d7d897a9..fdd7142b21e 100644 --- a/catalog/export.go +++ b/catalog/export.go @@ -4,32 +4,8 @@ import ( "database/sql/driver" "fmt" "strings" - - "github.com/lib/pq" ) -// ExportConfiguration describes the export configuration of a branch, as passed on wire, used -// internally, and stored in DB. -type ExportConfiguration struct { - Path string `db:"export_path" json:"export_path"` - StatusPath string `db:"export_status_path" json:"export_status_path"` - LastKeysInPrefixRegexp pq.StringArray `db:"last_keys_in_prefix_regexp" json:"last_keys_in_prefix_regexp"` - IsContinuous bool `db:"continuous" json:"is_continuous"` -} - -// ExportConfigurationForBranch describes how to export BranchID. It is stored in the database. -// Unfortunately golang sql doesn't know about embedded structs, so you get a useless copy of -// ExportConfiguration embedded here. -type ExportConfigurationForBranch struct { - Repository string `db:"repository"` - Branch string `db:"branch"` - - Path string `db:"export_path"` - StatusPath string `db:"export_status_path"` - LastKeysInPrefixRegexp pq.StringArray `db:"last_keys_in_prefix_regexp"` - IsContinuous bool `db:"continuous"` -} - type CatalogBranchExportStatus string const ( @@ -42,15 +18,10 @@ const ( // ExportStatus describes the current export status of a branch, as passed on wire, used // internally, and stored in DB. -type ExportStatus struct { - CurrentRef string `db:"current_ref"` - State CatalogBranchExportStatus -} - type ExportState struct { - CurrentRef string - State CatalogBranchExportStatus - ErrorMessage *string + CurrentRef string `db:"current_ref"` + State CatalogBranchExportStatus `db:"state"` + ErrorMessage *string `db:"error_message"` } // nolint: stylecheck diff --git a/catalog/mvcc/cataloger_diff_test.go b/catalog/mvcc/cataloger_diff_test.go index 13e74ec44ba..17332412fcc 100644 --- a/catalog/mvcc/cataloger_diff_test.go +++ b/catalog/mvcc/cataloger_diff_test.go @@ -26,7 +26,7 @@ func TestCataloger_DiffEmpty(t *testing.T) { } commitChanges(10, "Changes on master", "master") - res, hasMore, err := c.Diff(ctx, repository, "master", "master", DiffParams{Limit: 10}) + res, hasMore, err := c.Diff(ctx, repository, "master", "master", catalog.DiffParams{Limit: 10}) testutil.MustDo(t, "Diff", err) if len(res) != 0 { t.Errorf("Diff: got %+v but expected nothing", res) diff --git a/catalog/mvcc/cataloger_export.go b/catalog/mvcc/cataloger_export.go index 3d015c48265..c36610374e2 100644 --- a/catalog/mvcc/cataloger_export.go +++ b/catalog/mvcc/cataloger_export.go @@ -5,88 +5,21 @@ import ( "fmt" "regexp" + "github.com/georgysavva/scany/pgxscan" "github.com/jackc/pgconn" - "github.com/lib/pq" - "github.com/georgysavva/scany/pgxscan" "github.com/treeverse/lakefs/catalog" "github.com/treeverse/lakefs/db" "github.com/treeverse/lakefs/logging" ) -// ExportConfiguration describes the export configuration of a branch, as passed on wire, used -// internally, and stored in DB. -type ExportConfiguration struct { - Path string `db:"export_path" json:"export_path"` - StatusPath string `db:"export_status_path" json:"export_status_path"` - LastKeysInPrefixRegexp pq.StringArray `db:"last_keys_in_prefix_regexp" json:"last_keys_in_prefix_regexp"` - IsContinuous bool `db:"continuous" json:"is_continuous"` -} - -// ExportConfigurationForBranch describes how to export BranchID. It is stored in the database. -// Unfortunately golang sql doesn't know about embedded structs, so you get a useless copy of -// ExportConfiguration embedded here. -type ExportConfigurationForBranch struct { - Repository string `db:"repository"` - Branch string `db:"branch"` - - Path string `db:"export_path"` - StatusPath string `db:"export_status_path"` - LastKeysInPrefixRegexp pq.StringArray `db:"last_keys_in_prefix_regexp"` - IsContinuous bool `db:"continuous"` -} - -type CatalogBranchExportStatus string - -const ( - ExportStatusInProgress = CatalogBranchExportStatus("in-progress") - ExportStatusSuccess = CatalogBranchExportStatus("exported-successfully") - ExportStatusFailed = CatalogBranchExportStatus("export-failed") - ExportStatusRepaired = CatalogBranchExportStatus("export-repaired") - ExportStatusUnknown = CatalogBranchExportStatus("[unknown]") -) - -// ExportStatus describes the current export status of a branch, as passed on wire, used -// internally, and stored in DB. -type ExportStatus struct { - CurrentRef string `db:"current_ref"` - State CatalogBranchExportStatus `db:"state"` -} - -var ErrBadTypeConversion = errors.New("bad type") - -// nolint: stylecheck -func (dst *CatalogBranchExportStatus) Scan(src interface{}) error { - var sc CatalogBranchExportStatus - switch s := src.(type) { - case string: - sc = CatalogBranchExportStatus(strings.ToLower(s)) - case []byte: - sc = CatalogBranchExportStatus(strings.ToLower(string(s))) - default: - return fmt.Errorf("cannot convert %T to CatalogBranchExportStatus: %w", src, ErrBadTypeConversion) - } - - if !(sc == ExportStatusInProgress || sc == ExportStatusSuccess || sc == ExportStatusFailed) { - // not a failure, "just" be a newer enum value than known - *dst = ExportStatusUnknown - return nil - } - *dst = sc - return nil -} - -func (src CatalogBranchExportStatus) Value() (driver.Value, error) { - return string(src), nil -} - -func (c *cataloger) GetExportConfigurationForBranch(repository string, branch string) (ExportConfiguration, error) { +func (c *cataloger) GetExportConfigurationForBranch(repository string, branch string) (catalog.ExportConfiguration, error) { ret, err := c.db.Transact(func(tx db.Tx) (interface{}, error) { branchID, err := c.getBranchIDCache(tx, repository, branch) if err != nil { return nil, fmt.Errorf("repository %s branch %s: %w", repository, branch, err) } - var ret ExportConfiguration + var ret catalog.ExportConfiguration err = c.db.Get(&ret, `SELECT export_path, export_status_path, last_keys_in_prefix_regexp, continuous FROM catalog_branches_export @@ -159,7 +92,7 @@ func (c *cataloger) GetExportState(repo string, branch string) (catalog.ExportSt return res.(catalog.ExportState), err } -func (c *cataloger) ExportStateSet(repo, branch string, cb ExportStateCallback) error { +func (c *cataloger) ExportStateSet(repo, branch string, cb catalog.ExportStateCallback) error { _, err := c.db.Transact(db.Void(func(tx db.Tx) error { var res catalog.ExportState diff --git a/catalog/mvcc/cataloger_merge_test.go b/catalog/mvcc/cataloger_merge_test.go index dead34b702c..d9ed6b6ea73 100644 --- a/catalog/mvcc/cataloger_merge_test.go +++ b/catalog/mvcc/cataloger_merge_test.go @@ -1188,7 +1188,7 @@ func TestCataloger_MergeFromChildAfterMergeFromParent(t *testing.T) { type MergeData struct { Repo, Branch string - Result MergeResult + Result catalog.MergeResult } // MergeHookLogger - merge hook that will return an error if set by Err. @@ -1198,7 +1198,7 @@ type MergeHookLogger struct { Merges []MergeData } -func (h *MergeHookLogger) Hook(_ context.Context, repo, branch string, result *MergeResult) error { +func (h *MergeHookLogger) Hook(_ context.Context, repo, branch string, result *catalog.MergeResult) error { if h.Err != nil { return h.Err } diff --git a/export/export_handler.go b/export/export_handler.go index 23ede9b9c20..6798037dfba 100644 --- a/export/export_handler.go +++ b/export/export_handler.go @@ -80,7 +80,7 @@ func (h *Handler) start(body *string) error { return h.generateTasks(startData, startData.ExportConfig, &finishBodyStr, repo.StorageNamespace) } -func (h *Handler) generateTasks(startData StartData, config catalog.Cataloger, finishBodyStr *string, storageNamespace string) error { +func (h *Handler) generateTasks(startData StartData, config catalog.ExportConfiguration, finishBodyStr *string, storageNamespace string) error { tasksGenerator := NewTasksGenerator(startData.ExportID, config.Path, getGenerateSuccess(config.LastKeysInPrefixRegexp), finishBodyStr, storageNamespace) var diffs catalog.Differences var err error From b35b36e35bc7017d305fbd6ff1037d8c51030509 Mon Sep 17 00:00:00 2001 From: arielshaqed Date: Wed, 25 Nov 2020 13:26:28 +0200 Subject: [PATCH 15/16] [CR] Flatten if/elses in catalog/mvcc/cataloger_merge_test.go Co-authored-by: Barak Amar --- catalog/mvcc/cataloger_merge_test.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/catalog/mvcc/cataloger_merge_test.go b/catalog/mvcc/cataloger_merge_test.go index d9ed6b6ea73..a01a3ad3b1c 100644 --- a/catalog/mvcc/cataloger_merge_test.go +++ b/catalog/mvcc/cataloger_merge_test.go @@ -140,10 +140,9 @@ func TestCataloger_Merge_FromParentConflicts(t *testing.T) { } if res == nil { t.Errorf("Merge returned nil, err %s", err) - } else { - if res.Reference != "" { - t.Errorf("Merge reference = %s, expected to be empty", res.Reference) - } + } else if res.Reference != "" { + t.Errorf("Merge reference = %s, expected to be empty", res.Reference) + } } } From 99770f851aa5318a77f3d93073f8b47f83af5d91 Mon Sep 17 00:00:00 2001 From: "Ariel Shaqed (Scolnicov)" Date: Wed, 25 Nov 2020 13:58:05 +0200 Subject: [PATCH 16/16] [CR] pass hook values by value --- catalog/cataloger.go | 4 ++-- catalog/mvcc/cataloger_commit.go | 2 +- catalog/mvcc/cataloger_commit_test.go | 9 +++++---- catalog/mvcc/cataloger_merge.go | 2 +- catalog/mvcc/cataloger_merge_test.go | 9 +++++---- export/export_handler.go | 8 ++++---- 6 files changed, 18 insertions(+), 16 deletions(-) diff --git a/catalog/cataloger.go b/catalog/cataloger.go index 6bbbc8c8b1d..deafd993c81 100644 --- a/catalog/cataloger.go +++ b/catalog/cataloger.go @@ -167,8 +167,8 @@ type ExportConfigurationForBranch struct { IsContinuous bool `db:"continuous"` } -type PostCommitFunc = func(ctx context.Context, repo, branch string, commitLog *CommitLog) error -type PostMergeFunc = func(ctx context.Context, repo, branch string, mergeResult *MergeResult) error +type PostCommitFunc func(ctx context.Context, repo, branch string, commitLog CommitLog) error +type PostMergeFunc func(ctx context.Context, repo, branch string, mergeResult MergeResult) error // CatalogerHooks describes the hooks available for some operations on the catalog. Hooks are // called after the transaction ends; if they return an error they do not affect commit/merge. diff --git a/catalog/mvcc/cataloger_commit.go b/catalog/mvcc/cataloger_commit.go index d2bc0a1299c..165f37d73f4 100644 --- a/catalog/mvcc/cataloger_commit.go +++ b/catalog/mvcc/cataloger_commit.go @@ -83,7 +83,7 @@ func (c *cataloger) Commit(ctx context.Context, repository, branch string, messa } commitLog := res.(*catalog.CommitLog) for _, hook := range c.Hooks().PostCommit { - anotherErr := hook(ctx, repository, branch, commitLog) + anotherErr := hook(ctx, repository, branch, *commitLog) if anotherErr != nil && err == nil { err = anotherErr } diff --git a/catalog/mvcc/cataloger_commit_test.go b/catalog/mvcc/cataloger_commit_test.go index cffb47111d3..ec01874eb86 100644 --- a/catalog/mvcc/cataloger_commit_test.go +++ b/catalog/mvcc/cataloger_commit_test.go @@ -290,8 +290,9 @@ func TestCataloger_CommitTombstoneShouldNotChangeHistory(t *testing.T) { } type CommitData struct { - Repo, Branch string - Log catalog.CommitLog + Repo string + Branch string + Log catalog.CommitLog } // CommitHookLogger - commit hook that will return an error if set by Err. @@ -300,8 +301,8 @@ type CommitHookLogger struct { Commits []CommitData } -func (h *CommitHookLogger) Hook(_ context.Context, repo, branch string, log *catalog.CommitLog) error { - h.Commits = append(h.Commits, CommitData{Repo: repo, Branch: branch, Log: *log}) +func (h *CommitHookLogger) Hook(_ context.Context, repo, branch string, log catalog.CommitLog) error { + h.Commits = append(h.Commits, CommitData{Repo: repo, Branch: branch, Log: log}) return nil } diff --git a/catalog/mvcc/cataloger_merge.go b/catalog/mvcc/cataloger_merge.go index 8fd91a1895f..f5ff35a8ba0 100644 --- a/catalog/mvcc/cataloger_merge.go +++ b/catalog/mvcc/cataloger_merge.go @@ -94,7 +94,7 @@ func (c *cataloger) Merge(ctx context.Context, repository, leftBranch, rightBran if err == nil { for _, hook := range c.Hooks().PostMerge { - anotherErr := hook(ctx, repository, rightBranch, mergeResult) + anotherErr := hook(ctx, repository, rightBranch, *mergeResult) if anotherErr != nil && err == nil { err = anotherErr } diff --git a/catalog/mvcc/cataloger_merge_test.go b/catalog/mvcc/cataloger_merge_test.go index d9ed6b6ea73..c5f19fbff36 100644 --- a/catalog/mvcc/cataloger_merge_test.go +++ b/catalog/mvcc/cataloger_merge_test.go @@ -1187,8 +1187,9 @@ func TestCataloger_MergeFromChildAfterMergeFromParent(t *testing.T) { } type MergeData struct { - Repo, Branch string - Result catalog.MergeResult + Repo string + Branch string + Result catalog.MergeResult } // MergeHookLogger - merge hook that will return an error if set by Err. @@ -1198,11 +1199,11 @@ type MergeHookLogger struct { Merges []MergeData } -func (h *MergeHookLogger) Hook(_ context.Context, repo, branch string, result *catalog.MergeResult) error { +func (h *MergeHookLogger) Hook(_ context.Context, repo, branch string, result catalog.MergeResult) error { if h.Err != nil { return h.Err } - h.Merges = append(h.Merges, MergeData{Repo: repo, Branch: branch, Result: *result}) + h.Merges = append(h.Merges, MergeData{Repo: repo, Branch: branch, Result: result}) return nil } diff --git a/export/export_handler.go b/export/export_handler.go index 6798037dfba..014f5027a30 100644 --- a/export/export_handler.go +++ b/export/export_handler.go @@ -338,15 +338,15 @@ func startExport(l logging.Logger, p parade.Parade, c catalog.Cataloger, op inte } // exportCommitHook is a cataloger PostCommit hook for continuous export. -func (h *Handler) exportCommitHook(ctx context.Context, repo, branch string, log *catalog.CommitLog) error { +func (h *Handler) exportCommitHook(ctx context.Context, repo, branch string, log catalog.CommitLog) error { l := logging.Default(). WithFields(logging.Fields{"repo": repo, "branch": branch, "message": log.Message, "at": log.CreationDate.String()}) - return startExport(l, h.parade, h.cataloger, *log, repo, branch) + return startExport(l, h.parade, h.cataloger, log, repo, branch) } // exportMergeHook is a cataloger PostMerge hook for continuous export. -func (h *Handler) exportMergeHook(ctx context.Context, repo, branch string, merge *catalog.MergeResult) error { +func (h *Handler) exportMergeHook(ctx context.Context, repo, branch string, merge catalog.MergeResult) error { l := logging.Default(). WithFields(logging.Fields{"repo": repo, "branch": branch, "reference": merge.Reference}) - return startExport(l, h.parade, h.cataloger, *merge, repo, branch) + return startExport(l, h.parade, h.cataloger, merge, repo, branch) }