diff --git a/catalog/cataloger.go b/catalog/cataloger.go index 1a50a808f8e..deafd993c81 100644 --- a/catalog/cataloger.go +++ b/catalog/cataloger.go @@ -5,7 +5,7 @@ import ( "io" "time" - "github.com/treeverse/lakefs/db" + "github.com/lib/pq" ) const ( @@ -145,24 +145,47 @@ type Cataloger interface { // ExportStateCallback returns the new ref, state and message regarding the old ref and state type ExportStateCallback func(oldRef string, state CatalogBranchExportStatus) (newRef string, newState CatalogBranchExportStatus, newMessage *string, err error) +// ExportConfiguration describes the export configuration of a branch, as passed on wire, used +// internally, and stored in DB. +type ExportConfiguration struct { + Path string `db:"export_path" json:"export_path"` + StatusPath string `db:"export_status_path" json:"export_status_path"` + LastKeysInPrefixRegexp pq.StringArray `db:"last_keys_in_prefix_regexp" json:"last_keys_in_prefix_regexp"` + IsContinuous bool `db:"continuous" json:"is_continuous"` +} + +// ExportConfigurationForBranch describes how to export BranchID. It is stored in the database. +// Unfortunately golang sql doesn't know about embedded structs, so you get a useless copy of +// ExportConfiguration embedded here. +type ExportConfigurationForBranch struct { + Repository string `db:"repository"` + Branch string `db:"branch"` + + Path string `db:"export_path"` + StatusPath string `db:"export_status_path"` + LastKeysInPrefixRegexp pq.StringArray `db:"last_keys_in_prefix_regexp"` + IsContinuous bool `db:"continuous"` +} + +type PostCommitFunc func(ctx context.Context, repo, branch string, commitLog CommitLog) error +type PostMergeFunc func(ctx context.Context, repo, branch string, mergeResult MergeResult) error + // CatalogerHooks describes the hooks available for some operations on the catalog. Hooks are -// called in a current transaction context; if they return an error the transaction is rolled -// back. Because these transactions are current, the hook can see the effect the operation only -// on the passed transaction. +// called after the transaction ends; if they return an error they do not affect commit/merge. type CatalogerHooks struct { // PostCommit hooks are called at the end of a commit. - PostCommit []func(ctx context.Context, tx db.Tx, commitLog *CommitLog) error + PostCommit []PostCommitFunc // PostMerge hooks are called at the end of a merge. - PostMerge []func(ctx context.Context, tx db.Tx, mergeResult *MergeResult) error + PostMerge []PostMergeFunc } -func (h *CatalogerHooks) AddPostCommit(f func(context.Context, db.Tx, *CommitLog) error) *CatalogerHooks { +func (h *CatalogerHooks) AddPostCommit(f PostCommitFunc) *CatalogerHooks { h.PostCommit = append(h.PostCommit, f) return h } -func (h *CatalogerHooks) AddPostMerge(f func(context.Context, db.Tx, *MergeResult) error) *CatalogerHooks { +func (h *CatalogerHooks) AddPostMerge(f PostMergeFunc) *CatalogerHooks { h.PostMerge = append(h.PostMerge, f) return h } diff --git a/catalog/export.go b/catalog/export.go index 4a0d7d897a9..fdd7142b21e 100644 --- a/catalog/export.go +++ b/catalog/export.go @@ -4,32 +4,8 @@ import ( "database/sql/driver" "fmt" "strings" - - "github.com/lib/pq" ) -// ExportConfiguration describes the export configuration of a branch, as passed on wire, used -// internally, and stored in DB. -type ExportConfiguration struct { - Path string `db:"export_path" json:"export_path"` - StatusPath string `db:"export_status_path" json:"export_status_path"` - LastKeysInPrefixRegexp pq.StringArray `db:"last_keys_in_prefix_regexp" json:"last_keys_in_prefix_regexp"` - IsContinuous bool `db:"continuous" json:"is_continuous"` -} - -// ExportConfigurationForBranch describes how to export BranchID. It is stored in the database. -// Unfortunately golang sql doesn't know about embedded structs, so you get a useless copy of -// ExportConfiguration embedded here. -type ExportConfigurationForBranch struct { - Repository string `db:"repository"` - Branch string `db:"branch"` - - Path string `db:"export_path"` - StatusPath string `db:"export_status_path"` - LastKeysInPrefixRegexp pq.StringArray `db:"last_keys_in_prefix_regexp"` - IsContinuous bool `db:"continuous"` -} - type CatalogBranchExportStatus string const ( @@ -42,15 +18,10 @@ const ( // ExportStatus describes the current export status of a branch, as passed on wire, used // internally, and stored in DB. -type ExportStatus struct { - CurrentRef string `db:"current_ref"` - State CatalogBranchExportStatus -} - type ExportState struct { - CurrentRef string - State CatalogBranchExportStatus - ErrorMessage *string + CurrentRef string `db:"current_ref"` + State CatalogBranchExportStatus `db:"state"` + ErrorMessage *string `db:"error_message"` } // nolint: stylecheck diff --git a/catalog/mvcc/cataloger_commit.go b/catalog/mvcc/cataloger_commit.go index 7757034c418..165f37d73f4 100644 --- a/catalog/mvcc/cataloger_commit.go +++ b/catalog/mvcc/cataloger_commit.go @@ -76,20 +76,20 @@ func (c *cataloger) Commit(ctx context.Context, repository, branch string, messa Parents: []string{parentReference}, } - for _, hook := range c.hooks.PostCommit { - err = hook(ctx, tx, commitLog) - if err != nil { - // Roll tx back if a hook failed - return nil, err - } - } - return commitLog, nil }, c.txOpts(ctx)...) if err != nil { return nil, err } - return res.(*catalog.CommitLog), nil + commitLog := res.(*catalog.CommitLog) + for _, hook := range c.Hooks().PostCommit { + anotherErr := hook(ctx, repository, branch, *commitLog) + if anotherErr != nil && err == nil { + err = anotherErr + } + } + + return commitLog, nil } func commitUpdateCommittedEntriesWithMaxCommit(tx db.Tx, branchID int64, commitID CommitID) (int64, error) { diff --git a/catalog/mvcc/cataloger_commit_test.go b/catalog/mvcc/cataloger_commit_test.go index e24b939b6c3..ec01874eb86 100644 --- a/catalog/mvcc/cataloger_commit_test.go +++ b/catalog/mvcc/cataloger_commit_test.go @@ -4,15 +4,14 @@ import ( "context" "errors" "fmt" - "reflect" "strconv" "strings" "testing" "time" - "github.com/davecgh/go-spew/spew" + "github.com/go-test/deep" + "github.com/treeverse/lakefs/catalog" - "github.com/treeverse/lakefs/db" "github.com/treeverse/lakefs/testutil" ) @@ -112,8 +111,8 @@ func TestCataloger_Commit(t *testing.T) { got.CreationDate = tt.want.CreationDate } } - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("Commit() got = %s, want = %s", spew.Sdump(got), spew.Sdump(tt.want)) + if diffs := deep.Equal(got, tt.want); diffs != nil { + t.Errorf("unexpected Commit(): %s", diffs) } }) } @@ -290,69 +289,44 @@ func TestCataloger_CommitTombstoneShouldNotChangeHistory(t *testing.T) { } } +type CommitData struct { + Repo string + Branch string + Log catalog.CommitLog +} + // CommitHookLogger - commit hook that will return an error if set by Err. // When no Err is set it will log commit log into Logs. type CommitHookLogger struct { - Err error - Logs []*catalog.CommitLog + Commits []CommitData } -func (h *CommitHookLogger) Hook(_ context.Context, _ db.Tx, log *catalog.CommitLog) error { - if h.Err != nil { - return h.Err - } - h.Logs = append(h.Logs, log) +func (h *CommitHookLogger) Hook(_ context.Context, repo, branch string, log catalog.CommitLog) error { + h.Commits = append(h.Commits, CommitData{Repo: repo, Branch: branch, Log: log}) return nil } func TestCataloger_CommitHooks(t *testing.T) { - errHookFailed := errors.New("for testing") - tests := []struct { - name string - path string - hookErr error - wantErr error - }{ - { - name: "no_block", - hookErr: nil, - }, - { - name: "block", - hookErr: errHookFailed, - }, + ctx := context.Background() + c := testCataloger(t) + + // register hooks (more than one to verify all get called) + hooks := make([]CommitHookLogger, 2) + for i := range hooks { + c.Hooks().AddPostCommit(hooks[i].Hook) } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - ctx := context.Background() - c := testCataloger(t) - // register hooks (more than one to verify all get called) - hooks := []CommitHookLogger{ - {Err: tt.hookErr}, - {Err: tt.hookErr}, - } - for i := range hooks { - c.Hooks().AddPostCommit(hooks[i].Hook) - } + repository := testCatalogerRepo(t, ctx, c, "repository", "master") + _ = testCatalogerCreateEntry(t, ctx, c, repository, catalog.DefaultBranchName, "/file1", nil, "") - repository := testCatalogerRepo(t, ctx, c, "repository", "master") - _ = testCatalogerCreateEntry(t, ctx, c, repository, catalog.DefaultBranchName, "/file1", nil, "") + commitLog, err := c.Commit(ctx, repository, "master", "commit "+t.Name(), "tester", catalog.Metadata{"foo": "bar"}) + if err != nil { + t.Fatalf("Commit err=%s", err) + } - commitLog, err := c.Commit(ctx, repository, "master", "commit "+t.Name(), "tester", catalog.Metadata{"foo": "bar"}) - // check that hook err is the commit error - if !errors.Is(tt.hookErr, err) { - t.Fatalf("Commit err=%s, expected=%s", err, tt.hookErr) - } - // on successful commit the commit log should be found on hook's logs - if err != nil { - return - } - for i := range hooks { - if len(hooks[i].Logs) != 1 || hooks[i].Logs[0] != commitLog { - t.Errorf("hook %d: expected one commit %+v but got logs: %s", i, commitLog, spew.Sprint(hooks[i].Logs)) - } - } - }) + for i := range hooks { + if diffs := deep.Equal(hooks[i].Commits, []CommitData{{Repo: repository, Branch: "master", Log: *commitLog}}); diffs != nil { + t.Errorf("hook %d: unexpected commit logs: %s", i, diffs) + } } } diff --git a/catalog/mvcc/cataloger_diff_test.go b/catalog/mvcc/cataloger_diff_test.go index 605faf16c2c..17332412fcc 100644 --- a/catalog/mvcc/cataloger_diff_test.go +++ b/catalog/mvcc/cataloger_diff_test.go @@ -11,6 +11,31 @@ import ( "github.com/treeverse/lakefs/testutil" ) +func TestCataloger_DiffEmpty(t *testing.T) { + ctx := context.Background() + c := testCataloger(t) + repository := testCatalogerRepo(t, ctx, c, "repo", "master") + + // create N files and commit + commitChanges := func(n int, msg, branch string) { + for i := 0; i < n; i++ { + testCatalogerCreateEntry(t, ctx, c, repository, branch, "/file"+strconv.Itoa(i), nil, branch) + } + _, err := c.Commit(ctx, repository, branch, msg, "tester", nil) + testutil.MustDo(t, msg, err) + } + commitChanges(10, "Changes on master", "master") + + res, hasMore, err := c.Diff(ctx, repository, "master", "master", catalog.DiffParams{Limit: 10}) + testutil.MustDo(t, "Diff", err) + if len(res) != 0 { + t.Errorf("Diff: got %+v but expected nothing", res) + } + if hasMore { + t.Errorf("Diff: got *more* diffs but expected nothing") + } +} + func TestCataloger_Diff(t *testing.T) { ctx := context.Background() c := testCataloger(t) diff --git a/catalog/mvcc/cataloger_export.go b/catalog/mvcc/cataloger_export.go index 0418856df8c..c36610374e2 100644 --- a/catalog/mvcc/cataloger_export.go +++ b/catalog/mvcc/cataloger_export.go @@ -6,17 +6,20 @@ import ( "regexp" "github.com/georgysavva/scany/pgxscan" + "github.com/jackc/pgconn" + "github.com/treeverse/lakefs/catalog" "github.com/treeverse/lakefs/db" + "github.com/treeverse/lakefs/logging" ) func (c *cataloger) GetExportConfigurationForBranch(repository string, branch string) (catalog.ExportConfiguration, error) { ret, err := c.db.Transact(func(tx db.Tx) (interface{}, error) { branchID, err := c.getBranchIDCache(tx, repository, branch) - var ret catalog.ExportConfiguration if err != nil { - return nil, err + return nil, fmt.Errorf("repository %s branch %s: %w", repository, branch, err) } + var ret catalog.ExportConfiguration err = c.db.Get(&ret, `SELECT export_path, export_status_path, last_keys_in_prefix_regexp, continuous FROM catalog_branches_export @@ -90,57 +93,67 @@ func (c *cataloger) GetExportState(repo string, branch string) (catalog.ExportSt } func (c *cataloger) ExportStateSet(repo, branch string, cb catalog.ExportStateCallback) error { - _, err := c.db.Transact(func(tx db.Tx) (interface{}, error) { - var res struct { - CurrentRef string - State catalog.CatalogBranchExportStatus - ErrorMessage *string - } + _, err := c.db.Transact(db.Void(func(tx db.Tx) error { + var res catalog.ExportState branchID, err := c.getBranchIDCache(tx, repo, branch) if err != nil { - return nil, err + return err } // get current state err = tx.Get(&res, ` - SELECT current_ref, state, error_message - FROM catalog_branches_export_state - WHERE branch_id=$1 FOR UPDATE`, + SELECT current_ref, state, error_message + FROM catalog_branches_export_state + WHERE branch_id=$1 FOR NO KEY UPDATE`, branchID) missing := errors.Is(err, db.ErrNotFound) if err != nil && !missing { - err = fmt.Errorf("ExportStateSet: failed to get existing state: %w", err) - return nil, err + return fmt.Errorf("ExportStateMarkStart: failed to get existing state: %w", err) } oldRef := res.CurrentRef - state := res.State + oldStatus := res.State + + l := logging.Default().WithFields(logging.Fields{ + "old_ref": oldRef, + "old_status": oldStatus, + "repo": repo, + "branch": branch, + "branch_id": branchID, + }) // run callback - newRef, newState, newMsg, err := cb(oldRef, state) + newRef, newStatus, newMessage, err := cb(oldRef, oldStatus) if err != nil { - return nil, err + return err } + l = l.WithFields(logging.Fields{ + "new_ref": newRef, + "new_status": newStatus, + }) + // update new state - var query string + var tag pgconn.CommandTag if missing { - query = ` - INSERT INTO catalog_branches_export_state (branch_id, current_ref, state, error_message) - VALUES ($1, $2, $3, $4)` + l.Info("insert on DB") + tag, err = tx.Exec(` + INSERT INTO catalog_branches_export_state (branch_id, current_ref, state, error_message) + VALUES ($1, $2, $3, $4)`, + branchID, newRef, newStatus, newMessage) } else { - query = ` - UPDATE catalog_branches_export_state - SET current_ref=$2, state=$3, error_message=$4 - WHERE branch_id=$1` + l.Info("update on DB") + tag, err = tx.Exec(` + UPDATE catalog_branches_export_state + SET current_ref=$2, state=$3, error_message=$4 + WHERE branch_id=$1`, + branchID, newRef, newStatus, newMessage) } - - tag, err := tx.Exec(query, branchID, newRef, newState, newMsg) if err != nil { - return nil, fmt.Errorf("ExportStateSet: update state: %w", err) + return err } if tag.RowsAffected() != 1 { - return nil, fmt.Errorf("ExportStateSet: could not update single row %s: %w", tag, catalog.ErrExportFailed) + return fmt.Errorf("[I] ExportMarkSet: could not update single row %s: %w", tag, catalog.ErrEntryNotFound) } - return nil, err - }) + return err + })) return err } diff --git a/catalog/mvcc/cataloger_merge.go b/catalog/mvcc/cataloger_merge.go index 36bdaafb0a9..f5ff35a8ba0 100644 --- a/catalog/mvcc/cataloger_merge.go +++ b/catalog/mvcc/cataloger_merge.go @@ -89,15 +89,18 @@ func (c *cataloger) Merge(ctx context.Context, repository, leftBranch, rightBran return nil, err } mergeResult.Reference = MakeReference(rightBranch, nextCommitID) - for _, hook := range c.hooks.PostMerge { - err = hook(ctx, tx, mergeResult) - if err != nil { - // Roll tx back if a hook failed - return nil, err - } - } return nil, nil }, c.txOpts(ctx, db.ReadCommitted())...) + + if err == nil { + for _, hook := range c.Hooks().PostMerge { + anotherErr := hook(ctx, repository, rightBranch, *mergeResult) + if anotherErr != nil && err == nil { + err = anotherErr + } + } + } + return mergeResult, err } diff --git a/catalog/mvcc/cataloger_merge_test.go b/catalog/mvcc/cataloger_merge_test.go index 855687f4824..c5f19fbff36 100644 --- a/catalog/mvcc/cataloger_merge_test.go +++ b/catalog/mvcc/cataloger_merge_test.go @@ -8,7 +8,6 @@ import ( "github.com/go-test/deep" "github.com/treeverse/lakefs/catalog" - "github.com/treeverse/lakefs/db" "github.com/treeverse/lakefs/testutil" ) @@ -139,8 +138,12 @@ func TestCataloger_Merge_FromParentConflicts(t *testing.T) { if !errors.Is(err, catalog.ErrConflictFound) { t.Errorf("Merge err = %s, expected conflict with err = %s", err, catalog.ErrConflictFound) } - if res.Reference != "" { - t.Errorf("Merge reference = %s, expected to be empty", res.Reference) + if res == nil { + t.Errorf("Merge returned nil, err %s", err) + } else { + if res.Reference != "" { + t.Errorf("Merge reference = %s, expected to be empty", res.Reference) + } } } @@ -1183,18 +1186,24 @@ func TestCataloger_MergeFromChildAfterMergeFromParent(t *testing.T) { } } +type MergeData struct { + Repo string + Branch string + Result catalog.MergeResult +} + // MergeHookLogger - merge hook that will return an error if set by Err. // When no Err is set it will log merge log into Logs. type MergeHookLogger struct { - Err error - Logs []*catalog.MergeResult + Err error + Merges []MergeData } -func (h *MergeHookLogger) Hook(_ context.Context, _ db.Tx, log *catalog.MergeResult) error { +func (h *MergeHookLogger) Hook(_ context.Context, repo, branch string, result catalog.MergeResult) error { if h.Err != nil { return h.Err } - h.Logs = append(h.Logs, log) + h.Merges = append(h.Merges, MergeData{Repo: repo, Branch: branch, Result: result}) return nil } @@ -1242,9 +1251,13 @@ func TestCataloger_Merge_Hooks(t *testing.T) { {Path: "/file1"}, }) - expected := []*catalog.MergeResult{res} + expected := []MergeData{{ + Repo: repository, + Branch: "master", + Result: *res, + }} for _, hook := range hooks { - if diffs := deep.Equal(expected, hook.Logs); diffs != nil { + if diffs := deep.Equal(expected, hook.Merges); diffs != nil { t.Error("hook received unexpected merge result: ", diffs) } } diff --git a/db/database.go b/db/database.go index 39de9a428e1..3c0a0d0c48f 100644 --- a/db/database.go +++ b/db/database.go @@ -28,6 +28,11 @@ type Database interface { Pool() *pgxpool.Pool } +// Void wraps a procedure with no return value as a TxFunc +func Void(fn func(tx Tx) error) TxFunc { + return func(tx Tx) (interface{}, error) { return nil, fn(tx) } +} + type QueryOptions struct { logger logging.Logger ctx context.Context @@ -157,7 +162,7 @@ func (d *PgxDatabase) Transact(fn TxFunc, opts ...TxOpt) (interface{}, error) { var ret interface{} for attempt < SerializationRetryMaxAttempts { if attempt > 0 { - duration := time.Duration(int(SerializationRetryStartInterval) * attempt) + duration := SerializationRetryStartInterval * time.Duration(attempt) dbRetriesCount.Inc() options.logger. WithField("attempt", attempt). diff --git a/db/tx.go b/db/tx.go index 4a15ef84bc8..4722118930b 100644 --- a/db/tx.go +++ b/db/tx.go @@ -75,8 +75,8 @@ func (d *dbTx) Get(dest interface{}, query string, args ...interface{}) error { }) err := pgxscan.Get(context.Background(), d.tx, dest, query, args...) if pgxscan.NotFound(err) { - // Don't wrap this err: it might come from a different version of pgx and then - // !errors.Is(err, pgx.ErrNoRows). + // This err comes directly from scany, not directly from pgx, so *must* use + // pgxscan.NotFound. log.Trace("SQL query returned no results") return ErrNotFound } diff --git a/export/export.go b/export/export.go index c97758de82f..dc1738d29d9 100644 --- a/export/export.go +++ b/export/export.go @@ -5,8 +5,11 @@ import ( "errors" "fmt" + "github.com/jackc/pgx/v4" nanoid "github.com/matoous/go-nanoid" + "github.com/treeverse/lakefs/db" + "github.com/treeverse/lakefs/logging" "github.com/treeverse/lakefs/parade" "github.com/treeverse/lakefs/catalog" @@ -20,11 +23,14 @@ func getExportID(repo, branch, commitRef string) (string, error) { return fmt.Sprintf("%s-%s-%s-%s", repo, branch, commitRef, nid), nil } -var ErrExportInProgress = errors.New("export currently in progress") +var ( + ErrExportInProgress = errors.New("export currently in progress") + ErrNothingToExport = errors.New("nothing to export") +) // ExportBranchStart inserts a start task on branch, sets branch export state to pending. -// It returns an error if an export is already in progress. -func ExportBranchStart(paradeDB parade.Parade, cataloger catalog.Cataloger, repo, branch string) (string, error) { +// It returns ErrExportInProgress if an export is already in progress. +func ExportBranchStart(parade parade.Parade, cataloger catalog.Cataloger, repo, branch string) (string, error) { commit, err := cataloger.GetCommit(context.Background(), repo, branch) if err != nil { return "", err @@ -34,13 +40,28 @@ func ExportBranchStart(paradeDB parade.Parade, cataloger catalog.Cataloger, repo if err != nil { return "", err } + l := logging.Default().WithFields(logging.Fields{ + "repo": repo, + "branch": branch, + "commit_ref": commitRef, + "export_id": exportID, + }) + err = cataloger.ExportStateSet(repo, branch, func(oldRef string, state catalog.CatalogBranchExportStatus) (newRef string, newState catalog.CatalogBranchExportStatus, newMessage *string, err error) { + l.WithFields(logging.Fields{ + "initial_state": state, + "old_ref": oldRef, + "old_state": state, + }).Info("export update state") if state == catalog.ExportStatusInProgress { return oldRef, state, nil, ErrExportInProgress } if state == catalog.ExportStatusFailed { return oldRef, state, nil, catalog.ErrExportFailed } + if oldRef == commitRef { + return oldRef, state, nil, ErrNothingToExport + } config, err := cataloger.GetExportConfigurationForBranch(repo, branch) if err != nil { return oldRef, "", nil, err @@ -50,7 +71,10 @@ func ExportBranchStart(paradeDB parade.Parade, cataloger catalog.Cataloger, repo return oldRef, "", nil, err } - err = paradeDB.InsertTasks(context.Background(), tasks) + l.WithFields(logging.Fields{"num_tasks": len(tasks), "target_path": config.Path}). + Info("insert export tasks") + + err = parade.InsertTasks(context.Background(), tasks) if err != nil { return "", "", nil, err } @@ -59,28 +83,78 @@ func ExportBranchStart(paradeDB parade.Parade, cataloger catalog.Cataloger, repo return exportID, err } -var ErrConflictingRefs = errors.New("conflicting references") +var ( + ErrConflictingRefs = errors.New("conflicting references") + ErrWrongStatus = errors.New("incorrect status") +) // ExportBranchDone ends the export branch process by changing the status -func ExportBranchDone(cataloger catalog.Cataloger, status catalog.CatalogBranchExportStatus, statusMsg *string, repo, branch, commitRef string) error { - err := cataloger.ExportStateSet(repo, branch, func(oldRef string, state catalog.CatalogBranchExportStatus) (newRef string, newState catalog.CatalogBranchExportStatus, newMessage *string, err error) { +func ExportBranchDone(parade parade.Parade, cataloger catalog.Cataloger, status catalog.CatalogBranchExportStatus, statusMsg *string, repo, branch, commitRef string) error { + l := logging.Default().WithFields(logging.Fields{"repo": repo, "branch": branch, "commit_ref": commitRef, "status": status, "status_message": statusMsg}) + + err := cataloger.ExportStateSet(repo, branch, func(oldRef string, oldStatus catalog.CatalogBranchExportStatus) (newRef string, newStatus catalog.CatalogBranchExportStatus, newMessage *string, err error) { if commitRef != oldRef { - return "", "", nil, fmt.Errorf("ExportBranchDone: currentRef:%s, newRef:%s: %w", oldRef, commitRef, ErrConflictingRefs) + return "", "", nil, fmt.Errorf("ExportBranchDone: currentRef: %s, newRef: %s: %w", oldRef, commitRef, ErrConflictingRefs) + } + if oldStatus != catalog.ExportStatusInProgress { + l.WithField("old_state", oldStatus).Error("expected old_state to be in-progress") + return "", "", nil, fmt.Errorf("expected old status to be in-progress not %s: %w", oldStatus, ErrWrongStatus) } + l.WithField("old_state", oldStatus).Info("updating branch export") return oldRef, status, statusMsg, nil }) - return err -} + if err != nil { + return err + } -var ErrRepairWrongStatus = errors.New("incorrect status") + if status == catalog.ExportStatusSuccess { + // Start the next export if continuous. + isContinuous, err := hasContinuousExport(cataloger, repo, branch) + if err != nil { + // Consider branch export failed: it was supposed to be continuous but + // might have stopped. So set an error for the admin to fix before + // re-enabling continuous export. + return err + } + if isContinuous { + l.Info("start new continuous export") + _, err := ExportBranchStart(parade, cataloger, repo, branch) + if errors.Is(err, ErrExportInProgress) { + l.Info("export already in progress when restarting continuous export (unlikely)") + err = nil + } else if errors.Is(err, ErrNothingToExport) { + l.Info("nothing further to export continuously; stop (likely)") + err = nil + } + if err != nil { + return fmt.Errorf("restart continuous export repo %s branch %s: %w", repo, branch, err) + } + return nil + } + return err + } + + return nil +} // ExportBranchRepair changes state from Failed To Repair and starts a new export. // It fails if the current state is not ExportStatusFailed. func ExportBranchRepair(cataloger catalog.Cataloger, repo, branch string) error { return cataloger.ExportStateSet(repo, branch, func(oldRef string, state catalog.CatalogBranchExportStatus) (newRef string, newState catalog.CatalogBranchExportStatus, newMessage *string, err error) { if state != catalog.ExportStatusFailed { - return oldRef, "", nil, ErrRepairWrongStatus + return oldRef, "", nil, ErrWrongStatus } return oldRef, catalog.ExportStatusRepaired, nil, nil }) } + +func hasContinuousExport(c catalog.Cataloger, repo, branch string) (bool, error) { + exportConfiguration, err := c.GetExportConfigurationForBranch(repo, branch) + if errors.Is(err, db.ErrNotFound) || errors.Is(err, pgx.ErrNoRows) { + return false, nil + } + if err != nil { + return false, fmt.Errorf("check if export configuration is continuous for repo %s branch %s %w", repo, branch, err) + } + return exportConfiguration.IsContinuous, nil +} diff --git a/export/export_handler.go b/export/export_handler.go index e0391d80bbf..014f5027a30 100644 --- a/export/export_handler.go +++ b/export/export_handler.go @@ -9,9 +9,8 @@ import ( "regexp" "strings" - "github.com/treeverse/lakefs/catalog" - "github.com/treeverse/lakefs/block" + "github.com/treeverse/lakefs/catalog" "github.com/treeverse/lakefs/logging" "github.com/treeverse/lakefs/parade" ) @@ -25,11 +24,17 @@ type Handler struct { } func NewHandler(adapter block.Adapter, cataloger catalog.Cataloger, parade parade.Parade) *Handler { - return &Handler{ + ret := &Handler{ adapter: adapter, cataloger: cataloger, parade: parade, } + if cataloger != nil { + hooks := cataloger.Hooks() + hooks.AddPostCommit(ret.exportCommitHook) + hooks.AddPostMerge(ret.exportMergeHook) + } + return ret } type TaskBody struct { @@ -65,6 +70,13 @@ func (h *Handler) start(body *string) error { if err != nil { return err } + logging.Default().WithFields(logging.Fields{ + "repo": startData.Repo, + "branch": startData.Branch, + "from_ref": startData.FromCommitRef, + "to_ref": startData.ToCommitRef, + "export_id": startData.ExportID, + }).Info("action: start export") return h.generateTasks(startData, startData.ExportConfig, &finishBodyStr, repo.StorageNamespace) } @@ -162,11 +174,11 @@ func getFinishBodyString(repo, branch, commitRef, statusPath string) (string, er CommitRef: commitRef, StatusPath: statusPath, } - finisBody, err := json.Marshal(finishData) + finishBody, err := json.Marshal(finishData) if err != nil { return "", err } - return string(finisBody), nil + return string(finishBody), nil } func (h *Handler) copy(body *string) error { @@ -241,11 +253,19 @@ func (h *Handler) done(body *string, signalledErrors int) error { return err } status, msg := getStatus(signalledErrors) + logging.Default().WithFields(logging.Fields{ + "repo": finishData.Repo, + "branch": finishData.Branch, + "commit_ref": finishData.CommitRef, + "status_path": finishData.StatusPath, + "status": status, + "status_message": msg, + }).Info("action: export done") err = h.updateStatus(finishData, status, signalledErrors) if err != nil { return err } - return ExportBranchDone(h.cataloger, status, msg, finishData.Repo, finishData.Branch, finishData.CommitRef) + return ExportBranchDone(h.parade, h.cataloger, status, msg, finishData.Repo, finishData.Branch, finishData.CommitRef) } var errUnknownAction = errors.New("unknown action") @@ -291,3 +311,42 @@ func (h *Handler) Actions() []string { func (h *Handler) ActorID() parade.ActorID { return actorName } + +func startExport(l logging.Logger, p parade.Parade, c catalog.Cataloger, op interface{}, repo, branch string) error { + isContinuous, err := hasContinuousExport(c, repo, branch) + if err != nil { + // FAIL this commit: if we were meant to export it and did not then in practice + // there was no commit. + return fmt.Errorf("check continuous export for %+v: %w", op, err) + } + if !isContinuous { + return nil + } + exportID, err := ExportBranchStart(p, c, repo, branch) + if err != nil { + l = l.WithError(err) + } + if errors.Is(err, ErrExportInProgress) { + l = l.WithField("skipped", "export already in progress") + err = nil + } else if errors.Is(err, ErrNothingToExport) { + l = l.WithField("skipped", "nothing further to export") + err = nil + } + l.WithField("export_id", exportID).Info("continuous export started") + return err +} + +// exportCommitHook is a cataloger PostCommit hook for continuous export. +func (h *Handler) exportCommitHook(ctx context.Context, repo, branch string, log catalog.CommitLog) error { + l := logging.Default(). + WithFields(logging.Fields{"repo": repo, "branch": branch, "message": log.Message, "at": log.CreationDate.String()}) + return startExport(l, h.parade, h.cataloger, log, repo, branch) +} + +// exportMergeHook is a cataloger PostMerge hook for continuous export. +func (h *Handler) exportMergeHook(ctx context.Context, repo, branch string, merge catalog.MergeResult) error { + l := logging.Default(). + WithFields(logging.Fields{"repo": repo, "branch": branch, "reference": merge.Reference}) + return startExport(l, h.parade, h.cataloger, merge, repo, branch) +} diff --git a/go.sum b/go.sum index a1b7e2c193b..6cd3623f37b 100644 --- a/go.sum +++ b/go.sum @@ -600,6 +600,7 @@ github.com/jackc/pgconn v1.5.0/go.mod h1:QeD3lBfpTFe8WUnPZWN5KY/mB8FGMIYRdd8P8Jr github.com/jackc/pgconn v1.5.1-0.20200601181101-fa742c524853/go.mod h1:QeD3lBfpTFe8WUnPZWN5KY/mB8FGMIYRdd8P8Jr0fAI= github.com/jackc/pgconn v1.6.4 h1:S7T6cx5o2OqmxdHaXLH1ZeD1SbI8jBznyYE9Ec0RCQ8= github.com/jackc/pgconn v1.6.4/go.mod h1:w2pne1C2tZgP+TvjqLpOigGzNqjBgQW9dUw/4Chex78= +github.com/jackc/pgconn v1.7.2 h1:195tt17jkjy+FrFlY0pgyrul5kRLb7BGXY3JTrNxeXU= github.com/jackc/pgerrcode v0.0.0-20190803225404-afa3381909a6 h1:geJ1mgTGd0WQo67wEd+H4OjFG5uA2e3cEBz9D5+pftU= github.com/jackc/pgerrcode v0.0.0-20190803225404-afa3381909a6/go.mod h1:a/s9Lp5W7n/DD0VrVoyJ00FbP2ytTPDVOivvn2bMlds= github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE=