From ce447f0ce37647b7636f57604afe00d0066a4a89 Mon Sep 17 00:00:00 2001 From: Andrew Wormald Date: Mon, 2 Sep 2024 15:15:20 +0100 Subject: [PATCH] workflow: Use smaller logger dependency with override support --- _examples/callback/callback_test.go | 4 +- .../gettingstarted/gettingstarted_test.go | 4 +- _examples/schedule/schedule_test.go | 10 +- _examples/timeout/timeout_test.go | 4 +- adapters/adaptertest/eventstreaming.go | 5 +- adapters/adaptertest/recordstore.go | 71 ++++++------ adapters/adaptertest/rolescheduler.go | 11 +- adapters/adaptertest/timeoutstore.go | 33 +++--- adapters/kafkastreamer/kafka.go | 5 +- adapters/kafkastreamer/kafka_test.go | 4 +- adapters/memrecordstore/memrecordstore.go | 6 +- adapters/memstreamer/connector.go | 8 +- adapters/reflexstreamer/connector.go | 10 +- adapters/reflexstreamer/reflex_test.go | 11 +- adapters/reflexstreamer/util_test.go | 5 +- await.go | 3 +- await_test.go | 5 +- builder.go | 23 ++++ builder_test.go | 2 +- callback.go | 5 +- callback_internal_test.go | 20 ++-- consumer.go | 8 +- consumer_internal_test.go | 15 ++- delete.go | 6 +- delete_test.go | 15 ++- errors.go | 28 ++--- eventfilter_test.go | 3 +- go.mod | 5 +- go.sum | 18 +-- internal/errorcounter/errorcounter_test.go | 2 +- internal/errors/errors.go | 96 ++++++++++++---- internal/errors/errors_test.go | 105 ++++++++++++++++++ internal/logger/logger.go | 39 +++++++ internal/logger/logger_test.go | 45 ++++++++ logger.go | 6 +- metrics_test.go | 35 +++--- outbox.go | 9 +- run_test.go | 5 +- runstate.go | 7 +- runstate_internal_test.go | 10 +- runstate_test.go | 27 +++-- schedule.go | 9 +- schedule_test.go | 24 ++-- testing.go | 15 ++- testing_test.go | 4 +- timeout.go | 6 +- timeout_internal_test.go | 5 +- trigger.go | 12 +- trigger_internal_test.go | 18 ++- update.go | 7 +- update_internal_test.go | 14 +-- visualiser_test.go | 4 +- workflow.go | 3 +- workflow_test.go | 30 ++--- workflowpb/util.go | 6 +- workflowpb/util_test.go | 5 +- 56 files changed, 568 insertions(+), 327 deletions(-) create mode 100644 internal/errors/errors_test.go create mode 100644 internal/logger/logger.go create mode 100644 internal/logger/logger_test.go diff --git a/_examples/callback/callback_test.go b/_examples/callback/callback_test.go index 3c58a0d..495b9c1 100644 --- a/_examples/callback/callback_test.go +++ b/_examples/callback/callback_test.go @@ -4,11 +4,11 @@ import ( "context" "testing" - "github.com/luno/jettison/jtest" "github.com/luno/workflow" "github.com/luno/workflow/adapters/memrecordstore" "github.com/luno/workflow/adapters/memrolescheduler" "github.com/luno/workflow/adapters/memstreamer" + "github.com/stretchr/testify/require" "github.com/luno/workflow/_examples/callback" ) @@ -26,7 +26,7 @@ func TestCallbackWorkflow(t *testing.T) { foreignID := "andrew" runID, err := wf.Trigger(ctx, foreignID, callback.StatusStarted) - jtest.RequireNil(t, err) + require.Nil(t, err) workflow.TriggerCallbackOn(t, wf, foreignID, runID, callback.StatusStarted, callback.EmailConfirmationResponse{ Confirmed: true, diff --git a/_examples/gettingstarted/gettingstarted_test.go b/_examples/gettingstarted/gettingstarted_test.go index 8d9e572..d33e240 100644 --- a/_examples/gettingstarted/gettingstarted_test.go +++ b/_examples/gettingstarted/gettingstarted_test.go @@ -4,12 +4,12 @@ import ( "context" "testing" - "github.com/luno/jettison/jtest" "github.com/luno/workflow" "github.com/luno/workflow/adapters/memrecordstore" "github.com/luno/workflow/adapters/memrolescheduler" "github.com/luno/workflow/adapters/memstreamer" "github.com/luno/workflow/adapters/memtimeoutstore" + "github.com/stretchr/testify/require" "github.com/luno/workflow/_examples/gettingstarted" ) @@ -28,7 +28,7 @@ func TestWorkflow(t *testing.T) { foreignID := "82347982374982374" _, err := wf.Trigger(ctx, foreignID, gettingstarted.StatusStarted) - jtest.RequireNil(t, err) + require.Nil(t, err) workflow.Require(t, wf, foreignID, gettingstarted.StatusReadTheDocs, gettingstarted.GettingStarted{ ReadTheDocs: "✅", diff --git a/_examples/schedule/schedule_test.go b/_examples/schedule/schedule_test.go index 36330de..2b40f22 100644 --- a/_examples/schedule/schedule_test.go +++ b/_examples/schedule/schedule_test.go @@ -2,10 +2,10 @@ package schedule_test import ( "context" + "errors" "testing" "time" - "github.com/luno/jettison/jtest" "github.com/luno/workflow" "github.com/luno/workflow/adapters/memrecordstore" "github.com/luno/workflow/adapters/memrolescheduler" @@ -37,7 +37,7 @@ func TestExampleWorkflow(t *testing.T) { go func() { err := wf.Schedule(foreignID, schedule.StatusStarted, "@hourly") - jtest.RequireNil(t, err) + require.Nil(t, err) }() // Give time for go routine to spin up @@ -45,7 +45,7 @@ func TestExampleWorkflow(t *testing.T) { _, err := recordStore.Latest(ctx, wf.Name, foreignID) // Expect there to be no entries yet - jtest.Require(t, workflow.ErrRecordNotFound, err) + require.True(t, errors.Is(err, workflow.ErrRecordNotFound)) clock.Step(time.Hour) @@ -53,7 +53,7 @@ func TestExampleWorkflow(t *testing.T) { time.Sleep(200 * time.Millisecond) firstScheduled, err := recordStore.Latest(ctx, wf.Name, foreignID) - jtest.RequireNil(t, err) + require.Nil(t, err) require.Equal(t, "schedule trigger example", firstScheduled.WorkflowName) require.Equal(t, "hourly-run", firstScheduled.ForeignID) @@ -64,7 +64,7 @@ func TestExampleWorkflow(t *testing.T) { time.Sleep(200 * time.Millisecond) secondScheduled, err := recordStore.Latest(ctx, wf.Name, foreignID) - jtest.RequireNil(t, err) + require.Nil(t, err) require.Equal(t, "schedule trigger example", secondScheduled.WorkflowName) require.Equal(t, "hourly-run", secondScheduled.ForeignID) diff --git a/_examples/timeout/timeout_test.go b/_examples/timeout/timeout_test.go index be32281..5b7ec72 100644 --- a/_examples/timeout/timeout_test.go +++ b/_examples/timeout/timeout_test.go @@ -5,12 +5,12 @@ import ( "testing" "time" - "github.com/luno/jettison/jtest" "github.com/luno/workflow" "github.com/luno/workflow/adapters/memrecordstore" "github.com/luno/workflow/adapters/memrolescheduler" "github.com/luno/workflow/adapters/memstreamer" "github.com/luno/workflow/adapters/memtimeoutstore" + "github.com/stretchr/testify/require" clocktesting "k8s.io/utils/clock/testing" "github.com/luno/workflow/_examples/timeout" @@ -33,7 +33,7 @@ func TestTimeoutWorkflow(t *testing.T) { foreignID := "andrew" runID, err := wf.Trigger(ctx, foreignID, timeout.StatusStarted) - jtest.RequireNil(t, err) + require.Nil(t, err) workflow.AwaitTimeoutInsert(t, wf, foreignID, runID, timeout.StatusStarted) diff --git a/adapters/adaptertest/eventstreaming.go b/adapters/adaptertest/eventstreaming.go index 098ac92..82c22ae 100644 --- a/adapters/adaptertest/eventstreaming.go +++ b/adapters/adaptertest/eventstreaming.go @@ -6,7 +6,6 @@ import ( "time" "github.com/google/uuid" - "github.com/luno/jettison/jtest" "github.com/stretchr/testify/require" clock_testing "k8s.io/utils/clock/testing" @@ -91,14 +90,14 @@ func RunEventStreamerTest(t *testing.T, constructor workflow.EventStreamer) { CountryCode: "GB", } runId, err := wf.Trigger(ctx, foreignID, SyncStatusStarted, workflow.WithInitialValue[User, SyncStatus](&u)) - jtest.RequireNil(t, err) + require.Nil(t, err) workflow.AwaitTimeoutInsert(t, wf, foreignID, runId, SyncStatusEmailSet) clock.Step(time.Hour) record, err := wf.Await(ctx, foreignID, runId, SyncStatusCompleted) - jtest.RequireNil(t, err) + require.Nil(t, err) require.Equal(t, "andrew@workflow.com", record.Object.Email) require.Equal(t, SyncStatusCompleted.String(), record.Status.String()) diff --git a/adapters/adaptertest/recordstore.go b/adapters/adaptertest/recordstore.go index bc15dff..474e08c 100644 --- a/adapters/adaptertest/recordstore.go +++ b/adapters/adaptertest/recordstore.go @@ -8,7 +8,6 @@ import ( "time" "github.com/google/uuid" - "github.com/luno/jettison/jtest" "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" @@ -39,20 +38,20 @@ func testLatest(t *testing.T, factory func() workflow.RecordStore) { maker := func(recordID int64) (workflow.OutboxEventData, error) { return workflow.OutboxEventData{}, nil } err := store.Store(ctx, expected, maker) - jtest.RequireNil(t, err) + require.Nil(t, err) latest, err := store.Latest(ctx, expected.WorkflowName, expected.ForeignID) - jtest.RequireNil(t, err) + require.Nil(t, err) recordIsEqual(t, *expected, *latest) expected.Status = int(statusEnd) expected.RunState = workflow.RunStateCompleted err = store.Store(ctx, expected, maker) - jtest.RequireNil(t, err) + require.Nil(t, err) latest, err = store.Latest(ctx, expected.WorkflowName, expected.ForeignID) - jtest.RequireNil(t, err) + require.Nil(t, err) recordIsEqual(t, *expected, *latest) }) } @@ -65,10 +64,10 @@ func testLookup(t *testing.T, factory func() workflow.RecordStore) { maker := func(recordID int64) (workflow.OutboxEventData, error) { return workflow.OutboxEventData{}, nil } err := store.Store(ctx, expected, maker) - jtest.RequireNil(t, err) + require.Nil(t, err) latest, err := store.Lookup(ctx, 1) - jtest.RequireNil(t, err) + require.Nil(t, err) recordIsEqual(t, *expected, *latest) }) @@ -82,27 +81,27 @@ func testStore(t *testing.T, factory func() workflow.RecordStore) { maker := func(recordID int64) (workflow.OutboxEventData, error) { return workflow.OutboxEventData{}, nil } err := store.Store(ctx, expected, maker) - jtest.RequireNil(t, err) + require.Nil(t, err) latest, err := store.Lookup(ctx, 1) - jtest.RequireNil(t, err) + require.Nil(t, err) latest.Status = int(statusMiddle) expected.Status = int(statusMiddle) err = store.Store(ctx, latest, maker) - jtest.RequireNil(t, err) + require.Nil(t, err) recordIsEqual(t, *expected, *latest) latest, err = store.Lookup(ctx, 1) - jtest.RequireNil(t, err) + require.Nil(t, err) latest.Status = int(statusEnd) expected.Status = int(statusEnd) err = store.Store(ctx, latest, maker) - jtest.RequireNil(t, err) + require.Nil(t, err) recordIsEqual(t, *expected, *latest) }) @@ -121,10 +120,10 @@ func testListOutboxEvents(t *testing.T, factory func() workflow.RecordStore) { } err := store.Store(ctx, expected, maker) - jtest.RequireNil(t, err) + require.Nil(t, err) ls, err := store.ListOutboxEvents(ctx, expected.WorkflowName, 1000) - jtest.RequireNil(t, err) + require.Nil(t, err) require.Equal(t, 1, len(ls)) @@ -133,7 +132,7 @@ func testListOutboxEvents(t *testing.T, factory func() workflow.RecordStore) { var r outboxpb.OutboxRecord err = proto.Unmarshal(ls[0].Data, &r) - jtest.RequireNil(t, err) + require.Nil(t, err) require.Equal(t, int32(statusStarted), r.Type) require.Equal(t, "my_workflow-1", r.Headers[string(workflow.HeaderTopic)]) @@ -155,21 +154,21 @@ func testDeleteOutboxEvent(t *testing.T, factory func() workflow.RecordStore) { } err := store.Store(ctx, expected, maker) - jtest.RequireNil(t, err) + require.Nil(t, err) latest, err := store.Lookup(ctx, 1) - jtest.RequireNil(t, err) + require.Nil(t, err) latest.Status = int(statusMiddle) ls, err := store.ListOutboxEvents(ctx, expected.WorkflowName, 1000) - jtest.RequireNil(t, err) + require.Nil(t, err) err = store.DeleteOutboxEvent(ctx, ls[0].ID) - jtest.RequireNil(t, err) + require.Nil(t, err) ls, err = store.ListOutboxEvents(ctx, expected.WorkflowName, 1000) - jtest.RequireNil(t, err) + require.Nil(t, err) require.Equal(t, 0, len(ls)) }) @@ -186,22 +185,22 @@ func testList(t *testing.T, factory func() workflow.RecordStore) { seedCount := 1000 for i := 0; i < seedCount; i++ { err := store.Store(ctx, dummyWireRecord(t), maker) - jtest.RequireNil(t, err) + require.Nil(t, err) } ls, err := store.List(ctx, workflowName, 0, 53, workflow.OrderTypeAscending) - jtest.RequireNil(t, err) + require.Nil(t, err) require.Equal(t, 53, len(ls)) ls2, err := store.List(ctx, workflowName, 53, 100, workflow.OrderTypeAscending) - jtest.RequireNil(t, err) + require.Nil(t, err) require.Equal(t, 100, len(ls2)) // Make sure the last of the first page is not the same as the first of the next page require.NotEqual(t, ls[52].ID, ls2[0]) ls3, err := store.List(ctx, workflowName, 153, seedCount-153, workflow.OrderTypeAscending) - jtest.RequireNil(t, err) + require.Nil(t, err) require.Equal(t, seedCount-153, len(ls3)) // Make sure the last of the first page is not the same as the first of the next page @@ -209,12 +208,12 @@ func testList(t *testing.T, factory func() workflow.RecordStore) { // Make sure that if 950 is the offset and we only have 1000 then only 1 item would be returned lastPageAsc, err := store.List(ctx, workflowName, 950, 1000, workflow.OrderTypeAscending) - jtest.RequireNil(t, err) + require.Nil(t, err) require.Equal(t, 50, len(lastPageAsc)) require.Equal(t, int64(1000), lastPageAsc[len(lastPageAsc)-1].ID) lastPageDesc, err := store.List(ctx, workflowName, 950, 1000, workflow.OrderTypeDescending) - jtest.RequireNil(t, err) + require.Nil(t, err) require.Equal(t, 50, len(lastPageDesc)) require.Equal(t, int64(1000), lastPageDesc[0].ID) }) @@ -229,20 +228,20 @@ func testList(t *testing.T, factory func() workflow.RecordStore) { wr.ForeignID = foreignID err := store.Store(ctx, wr, maker) - jtest.RequireNil(t, err) + require.Nil(t, err) } } ls, err := store.List(ctx, workflowName, 0, 100, workflow.OrderTypeAscending, workflow.FilterByForeignID(foreignIDs[0])) - jtest.RequireNil(t, err) + require.Nil(t, err) require.Equal(t, 20, len(ls)) ls2, err := store.List(ctx, workflowName, 0, 100, workflow.OrderTypeAscending, workflow.FilterByForeignID(foreignIDs[1])) - jtest.RequireNil(t, err) + require.Nil(t, err) require.Equal(t, 20, len(ls2)) ls3, err := store.List(ctx, workflowName, 0, 100, workflow.OrderTypeAscending, workflow.FilterByForeignID("random")) - jtest.RequireNil(t, err) + require.Nil(t, err) require.Equal(t, 0, len(ls3)) }) @@ -263,13 +262,13 @@ func testList(t *testing.T, factory func() workflow.RecordStore) { wr.RunState = runState err := store.Store(ctx, wr, maker) - jtest.RequireNil(t, err) + require.Nil(t, err) } } for runState, count := range config { ls, err := store.List(ctx, workflowName, 0, 100, workflow.OrderTypeAscending, workflow.FilterByRunState(runState)) - jtest.RequireNil(t, err) + require.Nil(t, err) require.Equal(t, count, len(ls), fmt.Sprintf("Expected to have %v entries of %v", count, runState.String())) for _, l := range ls { @@ -292,13 +291,13 @@ func testList(t *testing.T, factory func() workflow.RecordStore) { newRecord.Status = int(status) err := store.Store(ctx, newRecord, maker) - jtest.RequireNil(t, err) + require.Nil(t, err) } } for status, count := range config { ls, err := store.List(ctx, workflowName, 0, 100, workflow.OrderTypeAscending, workflow.FilterByStatus(int64(status))) - jtest.RequireNil(t, err) + require.Nil(t, err) require.Equal(t, count, len(ls)) for _, l := range ls { @@ -316,7 +315,7 @@ func dummyWireRecordWithID(t *testing.T, id int64) *workflow.Record { workflowName := "my_workflow" foreignID := "Andrew Wormald" runID, err := uuid.NewUUID() - jtest.RequireNil(t, err) + require.Nil(t, err) type example struct { name string @@ -324,7 +323,7 @@ func dummyWireRecordWithID(t *testing.T, id int64) *workflow.Record { e := example{name: foreignID} b, err := json.Marshal(e) - jtest.RequireNil(t, err) + require.Nil(t, err) createdAt := time.Now() diff --git a/adapters/adaptertest/rolescheduler.go b/adapters/adaptertest/rolescheduler.go index 73df61a..98c357b 100644 --- a/adapters/adaptertest/rolescheduler.go +++ b/adapters/adaptertest/rolescheduler.go @@ -5,7 +5,6 @@ import ( "testing" "time" - "github.com/luno/jettison/jtest" "github.com/stretchr/testify/require" "github.com/luno/workflow" @@ -30,7 +29,7 @@ func testReturnedContext(t *testing.T, factory func() workflow.RoleScheduler) { ctxWithValue := context.WithValue(ctx, "parent", "context") ctx2, cancel, err := rs.Await(ctxWithValue, "leader") - jtest.RequireNil(t, err) + require.Nil(t, err) t.Cleanup(cancel) @@ -45,14 +44,14 @@ func testLocking(t *testing.T, factory func() workflow.RoleScheduler) { ctxWithValue := context.WithValue(ctx, "parent", "context") ctx2, cancel, err := rs.Await(ctxWithValue, "leader") - jtest.RequireNil(t, err) + require.Nil(t, err) t.Cleanup(cancel) roleReleased := make(chan bool, 1) go func(done chan bool) { _, _, err := rs.Await(ctxWithValue, "leader") - jtest.RequireNil(t, err) + require.Nil(t, err) roleReleased <- true @@ -78,7 +77,7 @@ func testReleasing(t *testing.T, factory func() workflow.RoleScheduler) { ctx := context.Background() _, cancel, err := rs.Await(ctx, "leader") - jtest.RequireNil(t, err) + require.Nil(t, err) t.Cleanup(cancel) @@ -88,7 +87,7 @@ func testReleasing(t *testing.T, factory func() workflow.RoleScheduler) { roleReleased := make(chan bool, 1) go func(ctx context.Context, done chan bool) { _, _, err := rs.Await(ctx2, "leader") - jtest.RequireNil(t, err) + require.Nil(t, err) roleReleased <- true }(ctx2, roleReleased) diff --git a/adapters/adaptertest/timeoutstore.go b/adapters/adaptertest/timeoutstore.go index ef6b0ba..c233af9 100644 --- a/adapters/adaptertest/timeoutstore.go +++ b/adapters/adaptertest/timeoutstore.go @@ -5,7 +5,6 @@ import ( "testing" "time" - "github.com/luno/jettison/jtest" "github.com/stretchr/testify/require" "github.com/luno/workflow" @@ -28,16 +27,16 @@ func testCancelTimeout(t *testing.T, factory func() workflow.TimeoutStore) { ctx := context.Background() err := store.Create(ctx, "example", "andrew", "1", int(statusStarted), time.Now().Add(-time.Hour)) - jtest.RequireNil(t, err) + require.Nil(t, err) err = store.Create(ctx, "example", "andrew", "2", int(statusStarted), time.Now().Add(-time.Hour)) - jtest.RequireNil(t, err) + require.Nil(t, err) err = store.Create(ctx, "example", "andrew", "3", int(statusStarted), time.Now().Add(-time.Hour)) - jtest.RequireNil(t, err) + require.Nil(t, err) timeout, err := store.ListValid(ctx, "example", int(statusStarted), time.Now()) - jtest.RequireNil(t, err) + require.Nil(t, err) require.Equal(t, 3, len(timeout)) @@ -63,10 +62,10 @@ func testCancelTimeout(t *testing.T, factory func() workflow.TimeoutStore) { require.WithinDuration(t, time.Now(), timeout[2].CreatedAt, time.Second) err = store.Cancel(ctx, 2) - jtest.RequireNil(t, err) + require.Nil(t, err) timeout, err = store.ListValid(ctx, "example", int(statusStarted), time.Now()) - jtest.RequireNil(t, err) + require.Nil(t, err) require.Equal(t, 2, len(timeout)) @@ -90,16 +89,16 @@ func testCompleteTimeout(t *testing.T, factory func() workflow.TimeoutStore) { ctx := context.Background() err := store.Create(ctx, "example", "andrew", "1", int(statusStarted), time.Now().Add(-time.Hour)) - jtest.RequireNil(t, err) + require.Nil(t, err) err = store.Create(ctx, "example", "andrew", "2", int(statusStarted), time.Now().Add(-time.Hour)) - jtest.RequireNil(t, err) + require.Nil(t, err) err = store.Create(ctx, "example", "andrew", "3", int(statusStarted), time.Now().Add(-time.Hour)) - jtest.RequireNil(t, err) + require.Nil(t, err) timeout, err := store.ListValid(ctx, "example", int(statusStarted), time.Now()) - jtest.RequireNil(t, err) + require.Nil(t, err) require.Equal(t, 3, len(timeout)) @@ -125,10 +124,10 @@ func testCompleteTimeout(t *testing.T, factory func() workflow.TimeoutStore) { require.WithinDuration(t, time.Now(), timeout[2].CreatedAt, time.Second) err = store.Complete(ctx, 2) - jtest.RequireNil(t, err) + require.Nil(t, err) timeout, err = store.ListValid(ctx, "example", int(statusStarted), time.Now()) - jtest.RequireNil(t, err) + require.Nil(t, err) require.Equal(t, 2, len(timeout)) @@ -152,16 +151,16 @@ func testListTimeout(t *testing.T, factory func() workflow.TimeoutStore) { ctx := context.Background() err := store.Create(ctx, "example", "andrew", "1", int(statusStarted), time.Now().Add(-time.Hour)) - jtest.RequireNil(t, err) + require.Nil(t, err) err = store.Create(ctx, "example", "andrew", "2", int(statusMiddle), time.Now().Add(time.Hour)) - jtest.RequireNil(t, err) + require.Nil(t, err) err = store.Create(ctx, "example", "andrew", "3", int(statusEnd), time.Now().Add(time.Hour*2)) - jtest.RequireNil(t, err) + require.Nil(t, err) timeout, err := store.List(ctx, "example") - jtest.RequireNil(t, err) + require.Nil(t, err) require.Equal(t, 3, len(timeout)) } diff --git a/adapters/kafkastreamer/kafka.go b/adapters/kafkastreamer/kafka.go index 958ed3f..c96820f 100644 --- a/adapters/kafkastreamer/kafka.go +++ b/adapters/kafkastreamer/kafka.go @@ -2,13 +2,12 @@ package kafkastreamer import ( "context" + "errors" "strconv" "time" - "github.com/luno/jettison/errors" - "github.com/segmentio/kafka-go" - "github.com/luno/workflow" + "github.com/segmentio/kafka-go" ) func New(brokers []string) *StreamConstructor { diff --git a/adapters/kafkastreamer/kafka_test.go b/adapters/kafkastreamer/kafka_test.go index bc696ce..a8e8bb5 100644 --- a/adapters/kafkastreamer/kafka_test.go +++ b/adapters/kafkastreamer/kafka_test.go @@ -6,10 +6,10 @@ import ( "testing" "time" - "github.com/luno/jettison/jtest" "github.com/luno/workflow" "github.com/luno/workflow/adapters/adaptertest" "github.com/segmentio/kafka-go" + "github.com/stretchr/testify/require" "github.com/luno/workflow/adapters/kafkastreamer" ) @@ -58,7 +58,7 @@ func TestConnector(t *testing.T) { Key: []byte(e.ForeignID), } err := writer.WriteMessages(ctx, m) - jtest.RequireNil(t, err) + require.Nil(t, err) } return constructor diff --git a/adapters/memrecordstore/memrecordstore.go b/adapters/memrecordstore/memrecordstore.go index f5c13b8..83010e2 100644 --- a/adapters/memrecordstore/memrecordstore.go +++ b/adapters/memrecordstore/memrecordstore.go @@ -6,10 +6,10 @@ import ( "strconv" "sync" - "github.com/luno/jettison/errors" "k8s.io/utils/clock" "github.com/luno/workflow" + werrors "github.com/luno/workflow/internal/errors" ) func New(opts ...Option) *Store { @@ -70,7 +70,7 @@ func (s *Store) Lookup(ctx context.Context, id int64) (*workflow.Record, error) record, ok := s.store[id] if !ok { - return nil, errors.Wrap(workflow.ErrRecordNotFound, "") + return nil, werrors.Wrap(workflow.ErrRecordNotFound, "") } // Return a new pointer so modifications don't affect the store. @@ -128,7 +128,7 @@ func (s *Store) Latest(ctx context.Context, workflowName, foreignID string) (*wo uk := uniqueKey(workflowName, foreignID) record, ok := s.keyIndex[uk] if !ok { - return nil, errors.Wrap(workflow.ErrRecordNotFound, "") + return nil, werrors.Wrap(workflow.ErrRecordNotFound, "") } // Return a new pointer so modifications don't affect the store. diff --git a/adapters/memstreamer/connector.go b/adapters/memstreamer/connector.go index 6d489d7..6e5f138 100644 --- a/adapters/memstreamer/connector.go +++ b/adapters/memstreamer/connector.go @@ -2,14 +2,14 @@ package memstreamer import ( "context" + "errors" "sync" "time" - "github.com/luno/jettison/errors" - "github.com/luno/jettison/j" "k8s.io/utils/clock" "github.com/luno/workflow" + werrors "github.com/luno/workflow/internal/errors" ) func NewConnector(events []workflow.ConnectorEvent, opts ...Option) *connector { @@ -78,7 +78,7 @@ func (c *consumer) Recv(ctx context.Context) (*workflow.ConnectorEvent, workflow return nil, nil, ctx.Err() } -var errReachedHeadOfStream = errors.New("reached head of stream", j.C("ERR_547682425078cf6d")) +var errReachedHeadOfStream = errors.New("reached head of stream") func (c *consumer) next() (*workflow.ConnectorEvent, error) { c.mu.Lock() @@ -88,7 +88,7 @@ func (c *consumer) next() (*workflow.ConnectorEvent, error) { cursorOffset := c.cursorStore.Get(c.cursorName) if len(log)-1 < cursorOffset { - return nil, errors.Wrap(errReachedHeadOfStream, "") + return nil, werrors.Wrap(errReachedHeadOfStream, "") } return log[cursorOffset], nil diff --git a/adapters/reflexstreamer/connector.go b/adapters/reflexstreamer/connector.go index 6c020e8..ff7764b 100644 --- a/adapters/reflexstreamer/connector.go +++ b/adapters/reflexstreamer/connector.go @@ -4,11 +4,9 @@ import ( "context" "io" - "github.com/luno/jettison/errors" - "github.com/luno/jettison/j" "github.com/luno/reflex" - "github.com/luno/workflow" + werrors "github.com/luno/workflow/internal/errors" ) func NewConnector(streamFn reflex.StreamFunc, cursorStore reflex.CursorStore, t ReflexTranslator) *connector { @@ -30,7 +28,7 @@ type connector struct { func (c *connector) Make(ctx context.Context, name string) (workflow.ConnectorConsumer, error) { cursor, err := c.cursorStore.GetCursor(ctx, name) if err != nil { - return nil, errors.Wrap(err, "failed to collect cursor") + return nil, werrors.Wrap(err, "failed to collect cursor") } streamClient, err := c.streamFn(ctx, cursor) @@ -68,7 +66,7 @@ func (c consumer) Recv(ctx context.Context) (*workflow.ConnectorEvent, workflow. return event, func() error { // Increment cursor for consumer only if ack function is called. if err := c.cursorStore.SetCursor(ctx, c.cursorName, event.ID); err != nil { - return errors.Wrap(err, "failed to set cursor", j.MKV{ + return werrors.WrapWithMeta(err, "failed to set cursor", map[string]string{ "cursor_name": c.cursorName, "event_id": reflexEvent.ID, "event_fid": reflexEvent.ForeignID, @@ -87,7 +85,7 @@ func (c consumer) Close() error { // Provide new context for flushing of cursor values to underlying store err := c.cursorStore.Flush(context.Background()) if err != nil { - return errors.Wrap(err, "failed here") + return werrors.Wrap(err, "failed to flush cursor") } if closer, ok := c.streamClient.(io.Closer); ok { diff --git a/adapters/reflexstreamer/reflex_test.go b/adapters/reflexstreamer/reflex_test.go index dd3546d..8acadb1 100644 --- a/adapters/reflexstreamer/reflex_test.go +++ b/adapters/reflexstreamer/reflex_test.go @@ -4,7 +4,6 @@ import ( "context" "testing" - "github.com/luno/jettison/jtest" "github.com/luno/reflex" "github.com/luno/reflex/rpatterns" "github.com/luno/reflex/rsql" @@ -100,7 +99,7 @@ func TestStreamFunc(t *testing.T) { fid := "23847923847" _, err := wf.Trigger(ctx, fid, statusStart) - jtest.RequireNil(t, err) + require.Nil(t, err) workflow.Require(t, wf, fid, statusEnd, "Started and Completed in a Workflow") @@ -131,7 +130,7 @@ func TestStreamFunc(t *testing.T) { ) err = reflex.Run(ctx, spec) - jtest.Require(t, context.Canceled, err) + require.Equal(t, context.Canceled, err) } func TestConnector(t *testing.T) { @@ -142,14 +141,14 @@ func TestConnector(t *testing.T) { ctx := context.Background() tx, err := dbc.BeginTx(ctx, nil) - jtest.RequireNil(t, err) + require.Nil(t, err) for _, event := range seedEvents { notify, err := eventsTable.Insert(ctx, tx, event.ForeignID, reflexstreamer.EventType(1)) if err != nil { originalErr := err err = tx.Rollback() - jtest.RequireNil(t, err) + require.Nil(t, err) t.Fatal("failed to insert event", event.ForeignID, originalErr.Error()) } @@ -157,7 +156,7 @@ func TestConnector(t *testing.T) { } err = tx.Commit() - jtest.RequireNil(t, err) + require.Nil(t, err) return reflexstreamer.NewConnector(eventsTable.ToStream(dbc), cTable.ToStore(dbc), reflexstreamer.DefaultReflexTranslator) }) diff --git a/adapters/reflexstreamer/util_test.go b/adapters/reflexstreamer/util_test.go index e0b07f8..a2be1d6 100644 --- a/adapters/reflexstreamer/util_test.go +++ b/adapters/reflexstreamer/util_test.go @@ -6,7 +6,6 @@ import ( "time" "github.com/corverroos/truss" - "github.com/luno/jettison/jtest" "github.com/luno/reflex" "github.com/stretchr/testify/require" @@ -63,10 +62,10 @@ func TestDefaultTranslators(t *testing.T) { } connectorEvent, err := reflexstreamer.DefaultReflexTranslator(&reflexEvent) - jtest.RequireNil(t, err) + require.Nil(t, err) translated, err := reflexstreamer.DefaultConnectorTranslator(connectorEvent) - jtest.RequireNil(t, err) + require.Nil(t, err) require.Equal(t, reflexEvent.ID, translated.ID) require.Equal(t, reflexEvent.Type.ReflexType(), translated.Type.ReflexType()) diff --git a/await.go b/await.go index 7e88a41..c2495f6 100644 --- a/await.go +++ b/await.go @@ -2,10 +2,9 @@ package workflow import ( "context" + "errors" "strconv" "time" - - "github.com/luno/jettison/errors" ) func (w *Workflow[Type, Status]) Await(ctx context.Context, foreignID, runID string, status Status, opts ...AwaitOption) (*Run[Type, Status], error) { diff --git a/await_test.go b/await_test.go index 64a3187..1411023 100644 --- a/await_test.go +++ b/await_test.go @@ -5,7 +5,6 @@ import ( "testing" "time" - "github.com/luno/jettison/jtest" "github.com/stretchr/testify/require" "github.com/luno/workflow" @@ -35,10 +34,10 @@ func TestAwait(t *testing.T) { t.Cleanup(wf.Stop) runID, err := wf.Trigger(ctx, "1", StatusStart) - jtest.RequireNil(t, err) + require.Nil(t, err) res, err := wf.Await(ctx, "1", runID, StatusEnd, workflow.WithAwaitPollingFrequency(10*time.Nanosecond)) - jtest.RequireNil(t, err) + require.Nil(t, err) require.Equal(t, StatusEnd, res.Status) require.Equal(t, "hello world", *res.Object) diff --git a/builder.go b/builder.go index 93b45d0..0a1e1e1 100644 --- a/builder.go +++ b/builder.go @@ -2,12 +2,14 @@ package workflow import ( "context" + "os" "time" "k8s.io/utils/clock" "github.com/luno/workflow/internal/errorcounter" "github.com/luno/workflow/internal/graph" + "github.com/luno/workflow/internal/logger" ) const ( @@ -31,6 +33,7 @@ func NewBuilder[Type any, Status StatusType](name string) *Builder[Type, Status] statusGraph: graph.New(), errorCounter: errorcounter.New(), internalState: make(map[string]State), + logger: logger.New(os.Stdout), }, } } @@ -217,6 +220,7 @@ type buildOptions struct { defaultOptions options outboxConfig outboxConfig timeoutStore TimeoutStore + logger Logger } func defaultBuildOptions() buildOptions { @@ -228,24 +232,40 @@ func defaultBuildOptions() buildOptions { type BuildOption func(w *buildOptions) +// WithTimeoutStore allows the configuration of a TimeoutStore which is required when using timeouts in a workflow. It is +// not required by default as timeouts are less common of a feature requirement but when needed the abstraction +// of complexity of handling scheduling, expiring, and executing are incredibly useful and is included as one of the +// three key feature offerings of workflow which are sequential steps, callbacks, and timeouts. func WithTimeoutStore(s TimeoutStore) BuildOption { return func(w *buildOptions) { w.timeoutStore = s } } +// WithClock allows the configuring of workflow's use and access of time. Instead of using time.Now() and other +// associated functionality from the time package a clock is used instead in order to make it testable. func WithClock(c clock.Clock) BuildOption { return func(bo *buildOptions) { bo.clock = c } } +// WithDebugMode enabled debug mode for a workflow which results in increased logs such as when processes ar launched, +// shutdown, events are skipped etc. func WithDebugMode() BuildOption { return func(bo *buildOptions) { bo.debugMode = true } } +// WithLogger allows for specifying a custom logger. The default is to use a wrapped version of log/slog's Logger. +func WithLogger(l Logger) BuildOption { + return func(bo *buildOptions) { + bo.logger = l + } +} + +// WithDefaultOptions applies the provided options to the entire workflow and not just to an individual process. func WithDefaultOptions(opts ...Option) BuildOption { return func(bo *buildOptions) { var o options @@ -257,6 +277,9 @@ func WithDefaultOptions(opts ...Option) BuildOption { } } +// WithCustomDelete allows for specifying a custom deleter function for scrubbing PII data when a workflow Run enters +// RunStateRequestedDataDeleted and is the function that once executed successfully allows for the RunState to move to +// RunStateDataDeleted. func WithCustomDelete[Type any](fn func(object *Type) error) BuildOption { return func(bo *buildOptions) { bo.customDelete = func(wr *Record) ([]byte, error) { diff --git a/builder_test.go b/builder_test.go index 414b790..60aaf67 100644 --- a/builder_test.go +++ b/builder_test.go @@ -2,6 +2,7 @@ package workflow import ( "context" + "errors" "fmt" "io" "reflect" @@ -9,7 +10,6 @@ import ( "testing" "time" - "github.com/luno/jettison/errors" "github.com/stretchr/testify/require" clock_testing "k8s.io/utils/clock/testing" diff --git a/callback.go b/callback.go index f72f4bb..f3d77d3 100644 --- a/callback.go +++ b/callback.go @@ -6,7 +6,7 @@ import ( "io" "strconv" - internal_errors "github.com/luno/workflow/internal/errors" + werrors "github.com/luno/workflow/internal/errors" ) type callback[Type any, Status StatusType] struct { @@ -43,8 +43,7 @@ func processCallback[Type any, Status StatusType]( ) error { wr, err := latest(ctx, w.Name, foreignID) if err != nil { - - return internal_errors.Wrap(err, "failed to latest record for callback", map[string]string{ + return werrors.WrapWithMeta(err, "failed to find latest record for callback", map[string]string{ "foreign_id": foreignID, }) } diff --git a/callback_internal_test.go b/callback_internal_test.go index 1178439..cf292ab 100644 --- a/callback_internal_test.go +++ b/callback_internal_test.go @@ -2,12 +2,11 @@ package workflow import ( "context" + "errors" "io" "testing" "time" - "github.com/luno/jettison/errors" - "github.com/luno/jettison/jtest" "github.com/stretchr/testify/require" clock_testing "k8s.io/utils/clock/testing" @@ -27,7 +26,7 @@ func TestProcessCallback(t *testing.T) { value := "data" b, err := Marshal(&value) - jtest.RequireNil(t, err) + require.Nil(t, err) current := &Record{ ID: 1, @@ -79,7 +78,7 @@ func TestProcessCallback(t *testing.T) { } err := processCallback(ctx, w, testStatus(current.Status), callbackFn, current.ForeignID, nil, latestLookup, store, updater) - jtest.RequireNil(t, err) + require.Nil(t, err) expectedCalls := map[string]int{ "callbackFunc": 1, @@ -119,7 +118,7 @@ func TestProcessCallback(t *testing.T) { } err := processCallback(ctx, w, testStatus(current.Status), callbackFn, current.ForeignID, nil, latestLookup, store, updater) - jtest.RequireNil(t, err) + require.Nil(t, err) expectedCalls := map[string]int{ "callbackFunc": 1, @@ -157,7 +156,7 @@ func TestProcessCallback(t *testing.T) { } err := processCallback(ctx, w, testStatus(current.Status), callbackFn, current.ForeignID, nil, latestLookup, store, updater) - jtest.RequireNil(t, err) + require.Nil(t, err) expectedCalls := map[string]int{ "callbackFunc": 1, @@ -168,12 +167,13 @@ func TestProcessCallback(t *testing.T) { }) t.Run("Return on lookup error", func(t *testing.T) { + testErr := errors.New("test error") latestLookup := func(ctx context.Context, workflowName, foreignID string) (*Record, error) { - return nil, errors.New("test error") + return nil, testErr } err := processCallback(ctx, w, testStatus(current.Status), nil, current.ForeignID, nil, latestLookup, nil, nil) - jtest.Require(t, errors.New("failed to latest record for callback"), err) + require.Truef(t, errors.Is(err, testErr), "actual: %s", err.Error()) }) t.Run("Return on callbackFunc error", func(t *testing.T) { @@ -186,7 +186,7 @@ func TestProcessCallback(t *testing.T) { }) err := processCallback(ctx, w, testStatus(current.Status), callbackFn, current.ForeignID, nil, latestLookup, nil, nil) - jtest.Require(t, errors.New("test error"), err) + require.Equal(t, errors.New("test error"), err) }) t.Run("Ignore if record is in different state", func(t *testing.T) { @@ -199,6 +199,6 @@ func TestProcessCallback(t *testing.T) { } err := processCallback(ctx, w, statusStart, nil, current.ForeignID, nil, latestLookup, nil, nil) - jtest.RequireNil(t, err) + require.Nil(t, err) }) } diff --git a/consumer.go b/consumer.go index 5d60b59..310080f 100644 --- a/consumer.go +++ b/consumer.go @@ -2,11 +2,11 @@ package workflow import ( "context" - internal_errors "github.com/luno/workflow/internal/errors" + "errors" "strconv" "time" - "github.com/luno/jettison/errors" + werrors "github.com/luno/workflow/internal/errors" "github.com/luno/workflow/internal/metrics" ) @@ -238,7 +238,7 @@ func consume[Type any, Status StatusType]( originalErr := err _, err := record.Pause(ctx) if err != nil { - return internal_errors.Wrap(err, "failed to pause record after exceeding allowed error count", map[string]string{ + return werrors.WrapWithMeta(err, "failed to pause record after exceeding allowed error count", map[string]string{ "workflow_name": record.WorkflowName, "foreign_id": record.ForeignID, "current_status": record.Status.String(), @@ -252,7 +252,7 @@ func consume[Type any, Status StatusType]( } } - return internal_errors.Wrap(err, "failed to consume", map[string]string{ + return werrors.WrapWithMeta(err, "failed to consume", map[string]string{ "workflow_name": record.WorkflowName, "foreign_id": record.ForeignID, "current_status": record.Status.String(), diff --git a/consumer_internal_test.go b/consumer_internal_test.go index 309005b..e4d2cb5 100644 --- a/consumer_internal_test.go +++ b/consumer_internal_test.go @@ -6,7 +6,6 @@ import ( "testing" "time" - "github.com/luno/jettison/jtest" "github.com/stretchr/testify/require" clock_testing "k8s.io/utils/clock/testing" @@ -24,7 +23,7 @@ func TestConsume(t *testing.T) { value := "data" b, err := Marshal(&value) - jtest.RequireNil(t, err) + require.Nil(t, err) current := &Record{ ID: 1, @@ -76,7 +75,7 @@ func TestConsume(t *testing.T) { } err := consume(ctx, w, currentRecord, consumer, ack, store, updater, "processName", 0) - jtest.RequireNil(t, err) + require.Nil(t, err) expectedCalls := map[string]int{ "consumerFunc": 1, @@ -116,7 +115,7 @@ func TestConsume(t *testing.T) { } err := consume(ctx, w, current, consumer, ack, store, updater, "processName", 0) - jtest.RequireNil(t, err) + require.Nil(t, err) expectedCalls := map[string]int{ "consumerFunc": 1, @@ -155,7 +154,7 @@ func TestConsume(t *testing.T) { } err := consume(ctx, w, current, consumer, ack, store, updater, "processName", 0) - jtest.RequireNil(t, err) + require.Nil(t, err) expectedCalls := map[string]int{ "consumerFunc": 1, @@ -199,13 +198,13 @@ func TestConsume(t *testing.T) { } err := consume(ctx, w, current, consumer, ack, store, updater, "processName", 3) - jtest.Require(t, testErr, err) + require.True(t, errors.Is(err, testErr)) err = consume(ctx, w, current, consumer, ack, store, updater, "processName", 3) - jtest.Require(t, testErr, err) + require.True(t, errors.Is(err, testErr)) err = consume(ctx, w, current, consumer, ack, store, updater, "processName", 3) - jtest.RequireNil(t, err) + require.Nil(t, err) expectedCalls := map[string]int{ "consumerFunc": 3, diff --git a/delete.go b/delete.go index aa03b86..78be2f0 100644 --- a/delete.go +++ b/delete.go @@ -2,12 +2,12 @@ package workflow import ( "context" - internal_errors "github.com/luno/workflow/internal/errors" + "errors" "time" - "github.com/luno/jettison/errors" "k8s.io/utils/clock" + werrors "github.com/luno/workflow/internal/errors" "github.com/luno/workflow/internal/metrics" ) @@ -91,7 +91,7 @@ func DeleteForever( record.RunState = RunStateDataDeleted err = updateWireRecord(ctx, store, record, RunStateRequestedDataDeleted) if err != nil { - return internal_errors.Wrap(err, "unable to delete record data", map[string]string{}) + return werrors.Wrap(err, "unable to delete record data") } metrics.ProcessLatency.WithLabelValues(workflowName, processName).Observe(clock.Since(t2).Seconds()) diff --git a/delete_test.go b/delete_test.go index 7ae2dc7..7a4eed1 100644 --- a/delete_test.go +++ b/delete_test.go @@ -6,7 +6,6 @@ import ( "testing" "time" - "github.com/luno/jettison/jtest" "github.com/stretchr/testify/require" "k8s.io/utils/clock" @@ -44,7 +43,7 @@ func TestDeleteForever(t *testing.T) { } b, err := workflow.Marshal(&o) - jtest.RequireNil(t, err) + require.Nil(t, err) return &workflow.Record{ Object: b, @@ -55,7 +54,7 @@ func TestDeleteForever(t *testing.T) { var o object err := workflow.Unmarshal(wr.Object, &o) - jtest.RequireNil(t, err) + require.Nil(t, err) o.pii = "" @@ -78,7 +77,7 @@ func TestDeleteForever(t *testing.T) { } b, err := workflow.Marshal(&o) - jtest.RequireNil(t, err) + require.Nil(t, err) return &workflow.Record{ Object: b, @@ -115,7 +114,7 @@ func TestDeleteForever(t *testing.T) { workflowName := "example" producer, err := streamer.NewProducer(ctx, workflow.DeleteTopic(workflowName)) - jtest.RequireNil(t, err) + require.Nil(t, err) t.Cleanup(func() { producer.Close() }) @@ -128,16 +127,16 @@ func TestDeleteForever(t *testing.T) { workflow.HeaderRunState: workflow.RunStateRequestedDataDeleted.String(), workflow.HeaderPreviousRunState: workflow.RunStateCompleted.String(), }) - jtest.RequireNil(t, err) + require.Nil(t, err) consumer, err := streamer.NewConsumer(ctx, workflow.DeleteTopic(workflowName), "consumer-1") - jtest.RequireNil(t, err) + require.Nil(t, err) t.Cleanup(func() { consumer.Close() }) err = workflow.DeleteForever(ctx, workflowName, "process-name", consumer, tc.storeFn, tc.lookupFn, tc.deleteFn, time.Hour, clock.RealClock{}) - jtest.Require(t, tc.expectedErr, err) + require.True(t, errors.Is(err, tc.expectedErr)) }) } } diff --git a/errors.go b/errors.go index c407e27..a45bee5 100644 --- a/errors.go +++ b/errors.go @@ -1,24 +1,18 @@ package workflow import ( - "github.com/luno/jettison/errors" - "github.com/luno/jettison/j" + "github.com/luno/workflow/internal/errors" ) var ( - ErrRecordNotFound = errors.New("record not found", j.C("ERR_6d982e73339f351a")) - ErrTimeoutNotFound = errors.New("timeout not found", j.C("ERR_2f6c8edf63f7ac8d")) - ErrWorkflowInProgress = errors.New("current workflow still in progress - retry once complete", j.C("ERR_cd79765555450db7")) - ErrWorkflowNotRunning = errors.New("trigger failed - workflow is not running", j.C("ERR_6b414d1eb843a681")) - ErrStatusProvidedNotConfigured = errors.New("status provided is not configured for workflow", j.C("ERR_169c7465995cf7aa")) - ErrOutboxRecordNotFound = errors.New("outbox record not found", j.C("ERR_1ef1afdf9f7ae684")) - ErrUnableToPause = errors.New("run is unable to be paused", j.C("ERR_3b776661fe2c56c7")) - ErrUnableToResume = errors.New("run is unable to be resumed", j.C("ERR_fdbedb1059368f3e")) - ErrUnableToCancel = errors.New("run is unable to be cancelled", j.C("ERR_9128169c3d47eb1d")) - ErrUnableToDelete = errors.New("cannot delete data as run has not finished", j.C("ERR_2dec819246977dd9")) - ErrUnableToMarkAsRunning = errors.New("run is unable to be marked as Running", j.C("ERR_704b88a1eddad3dc")) + ErrRecordNotFound = errors.New("record not found") + ErrTimeoutNotFound = errors.New("timeout not found") + ErrWorkflowInProgress = errors.New("current workflow still in progress - retry once complete") + ErrWorkflowNotRunning = errors.New("trigger failed - workflow is not running") + ErrStatusProvidedNotConfigured = errors.New("status provided is not configured for workflow") + ErrOutboxRecordNotFound = errors.New("outbox record not found") + ErrUnableToPause = errors.New("run is unable to be paused") + ErrUnableToResume = errors.New("run is unable to be resumed") + ErrUnableToCancel = errors.New("run is unable to be cancelled") + ErrUnableToDelete = errors.New("cannot delete data as run has not finished") ) - -type Error interface { - error -} diff --git a/eventfilter_test.go b/eventfilter_test.go index 6f30f1d..160f11e 100644 --- a/eventfilter_test.go +++ b/eventfilter_test.go @@ -4,7 +4,6 @@ import ( "testing" "github.com/google/uuid" - "github.com/luno/jettison/jtest" "github.com/stretchr/testify/require" ) @@ -59,7 +58,7 @@ func TestShardNonNumerical(t *testing.T) { ) for i := 0; i < total; i++ { uid, err := uuid.NewUUID() - jtest.RequireNil(t, err) + require.Nil(t, err) e := &ConnectorEvent{ ID: uid.String(), diff --git a/go.mod b/go.mod index 5442327..387a463 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,6 @@ go 1.22.3 require ( github.com/google/uuid v1.3.0 - github.com/luno/jettison v0.0.0-20230912135954-09d6084f5df9 github.com/prometheus/client_golang v1.15.0 github.com/robfig/cron/v3 v3.0.0 github.com/stretchr/testify v1.8.3 @@ -16,7 +15,6 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect - github.com/go-stack/stack v1.8.1 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/kr/text v0.2.0 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect @@ -24,8 +22,7 @@ require ( github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/common v0.42.0 // indirect github.com/prometheus/procfs v0.9.0 // indirect + github.com/rogpeppe/go-internal v1.8.1 // indirect golang.org/x/sys v0.9.0 // indirect - golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect - gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index d0d95b4..a6800c1 100644 --- a/go.sum +++ b/go.sum @@ -5,8 +5,6 @@ github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/go-stack/stack v1.8.1 h1:ntEHSVwIt7PNXNpgPmVfMrNhLtgjlmnZha2kOpuRiDw= -github.com/go-stack/stack v1.8.1/go.mod h1:dcoOX6HbPZSZptuspn9bctJ+N/CnF5gGygcUP3XYfe4= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= @@ -17,14 +15,16 @@ github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/luno/jettison v0.0.0-20230912135954-09d6084f5df9 h1:ykQ0/3gmOFypz1tNTyisrEM51rW7bx/UCGqEqrm/VyM= -github.com/luno/jettison v0.0.0-20230912135954-09d6084f5df9/go.mod h1:Fc8KRfQEydIud0rjBHUNxUyyEKSidxP0kOlg4Dwn3JA= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v1.15.0 h1:5fCgGYogn0hFdhyhLbw7hEsWxufKtY9klyvdNfFlFhM= @@ -39,27 +39,21 @@ github.com/robfig/cron/v3 v3.0.0 h1:kQ6Cb7aHOHTSzNVNEhmp8EcWKLb4CbiMW9h9VyIhO4E= github.com/robfig/cron/v3 v3.0.0/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.8.1 h1:geMPLpDpQOgVyCg5z5GoRwLHepNdb71NXb67XFkP+Eg= github.com/rogpeppe/go-internal v1.8.1/go.mod h1:JeRgkft04UBgHMgCIwADu4Pn6Mtm5d4nPKWu0nJ5d+o= -github.com/sebdah/goldie/v2 v2.5.3 h1:9ES/mNN+HNUbNWpVAlrzuZ7jE+Nrczbj8uFRjM7624Y= -github.com/sebdah/goldie/v2 v2.5.3/go.mod h1:oZ9fp0+se1eapSRjfYbsV/0Hqhbuu3bJVvKI/NNtssI= -github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ= -github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/stretchr/testify v1.8.3 h1:RP3t2pwF7cMEbC1dqtB6poj3niw/9gnV4Cjg5oW5gtY= github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.9.0 h1:KS/R3tvhPqvJvwcKfnBHJwwthS11LRhmM5D59eEXa0s= golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk= -golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= -gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= -gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= k8s.io/utils v0.0.0-20230505201702-9f6742963106 h1:EObNQ3TW2D+WptiYXlApGNLVy0zm/JIBVY9i+M4wpAU= diff --git a/internal/errorcounter/errorcounter_test.go b/internal/errorcounter/errorcounter_test.go index a7057d7..798f294 100644 --- a/internal/errorcounter/errorcounter_test.go +++ b/internal/errorcounter/errorcounter_test.go @@ -1,9 +1,9 @@ package errorcounter_test import ( + "errors" "testing" - "github.com/luno/jettison/errors" "github.com/stretchr/testify/require" "github.com/luno/workflow/internal/errorcounter" diff --git a/internal/errors/errors.go b/internal/errors/errors.go index 8042cfa..eb6f8a9 100644 --- a/internal/errors/errors.go +++ b/internal/errors/errors.go @@ -2,45 +2,97 @@ package errors import ( "errors" - "slices" - "strings" ) +// Error is a trivial implementation of error that allows key value +// metadata to accompany the error along with support for wrapping errors. +// Error's Error method formats the wrapped errors and collected metadata. +// Note that the metadata is kept at a single level and thus keys need to +// be unique if they are not to over-written. type Error struct { - err error + msg string - // Meta contains fields that help pin down and find where the error originates and/or + // child relates to the next error in the linked-list of errors or "chain" or errors. + child error + + // meta contains fields that help pin down and find where the error originates and/or // what it relates to. - Meta map[string]string + meta map[string]string +} + +func (e *Error) Unwrap() error { + return e.child +} + +func (e *Error) Is(target error) bool { + if target == nil { + return e.msg == "" && e.child == nil + } + + return e.msg == target.Error() } -func Wrap(err error, msg string, meta map[string]string) *Error { +func New(msg string) error { + if msg == "" { + return nil + } + return &Error{ - err: errors.Join(errors.New(msg), err), - Meta: meta, + msg: msg, + meta: make(map[string]string), } } -func (e *Error) Error() string { - var sb strings.Builder - sb.WriteString(e.err.Error()) +func Wrap(err error, msg string) error { + return WrapWithMeta(err, msg, map[string]string{}) +} - keys := make([]string, 0, len(e.Meta)) - for key := range e.Meta { - keys = append(keys, key) +func WrapWithMeta(err error, msg string, meta map[string]string) error { + if err == nil { + return nil } - slices.Sort(keys) + e, isInternalError := err.(*Error) + if isInternalError { + var newErr Error + newErr.meta = make(map[string]string) + for k, v := range meta { + newErr.meta[k] = v + } - for i, key := range keys { - if i > 0 { - sb.WriteString(", ") + for k, v := range e.meta { + newErr.meta[k] = v } - sb.WriteString(key) - sb.WriteString(": ") - sb.WriteString(e.Meta[key]) + newErr.msg = msg + newErr.child = err + // metadata is kept as a single copy at the top level to avoid needing to traverse the tree to collect + // the meta key value pairs. The trade-off here is losing record of where the key value pairs originate from + // and having a top level namespacing restriction. + e.meta = nil + + return &newErr + } + + return &Error{ + msg: msg, + child: err, + meta: meta, + } +} + +func (e *Error) Error() string { + if e.child == nil && e.msg != "" { + return e.msg + } + + if e.msg == "" { + return e.child.Error() } - return sb.String() + return errors.Join(errors.New(e.msg), e.child).Error() +} + +func (e *Error) Meta() map[string]string { + return e.meta } diff --git a/internal/errors/errors_test.go b/internal/errors/errors_test.go new file mode 100644 index 0000000..6479bc3 --- /dev/null +++ b/internal/errors/errors_test.go @@ -0,0 +1,105 @@ +package errors_test + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/require" + + werrors "github.com/luno/workflow/internal/errors" +) + +func TestError_Is(t *testing.T) { + testErr := errors.New("level 1") + err := werrors.Wrap(testErr, "level 2") + require.True(t, errors.Is(err, testErr)) + + e, ok := err.(*werrors.Error) + require.True(t, ok) + require.False(t, e.Is(nil)) +} + +func TestWrappingNil(t *testing.T) { + require.Nil(t, werrors.Wrap(nil, "")) + require.Nil(t, werrors.WrapWithMeta(nil, "", map[string]string{})) +} + +func TestNewBlankMsgNilError(t *testing.T) { + require.Nil(t, werrors.New("")) +} + +func TestWrap(t *testing.T) { + testCases := []struct { + name string + err error + msg string + expected string + }{ + { + name: "Test error wrapped with message", + err: errors.New("test error"), + msg: "", + expected: "test error", + }, + { + name: "Test error wrapped with message", + err: werrors.New("test error"), + msg: "test", + expected: "test\ntest error", + }, + { + name: "Test error wrapped with message", + err: werrors.Wrap(werrors.New("level 1"), "level 2"), + msg: "level 3", + expected: "level 3\nlevel 2\nlevel 1", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + err := werrors.Wrap(tc.err, tc.msg) + require.Equal(t, tc.expected, err.Error()) + }) + } +} + +func TestWrapWithMeta(t *testing.T) { + testCases := []struct { + name string + err error + msg string + meta map[string]string + expectedError string + expectedMeta map[string]string + }{ + { + name: "Test error wrapped with message and meta", + err: errors.New("test error"), + msg: "test", + meta: map[string]string{"key1": "value1", "key2": "value2"}, + expectedError: "test\ntest error", + expectedMeta: map[string]string{"key1": "value1", "key2": "value2"}, + }, + { + name: "Test wrapped error with message and wrapped meta", + err: werrors.WrapWithMeta(werrors.New("level 1"), "level 2", map[string]string{ + "wrappedKey": "wrappedValue", + }), + msg: "level 3", + meta: map[string]string{"key1": "value1", "key2": "value2"}, + expectedError: "level 3\nlevel 2\nlevel 1", + expectedMeta: map[string]string{"key1": "value1", "key2": "value2", "wrappedKey": "wrappedValue"}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + err := werrors.WrapWithMeta(tc.err, tc.msg, tc.meta) + require.Equal(t, tc.expectedError, err.Error()) + + e, ok := err.(*werrors.Error) + require.True(t, ok) + require.Equal(t, tc.expectedMeta, e.Meta()) + }) + } +} diff --git a/internal/logger/logger.go b/internal/logger/logger.go new file mode 100644 index 0000000..f462b7c --- /dev/null +++ b/internal/logger/logger.go @@ -0,0 +1,39 @@ +package logger + +import ( + "context" + "io" + "log/slog" + + "github.com/luno/workflow/internal/errors" +) + +type logger struct { + log *slog.Logger +} + +func (l logger) Debug(ctx context.Context, msg string, meta map[string]string) { + l.log.DebugContext(ctx, msg, "meta", meta) +} + +func (l logger) Error(ctx context.Context, err error, meta map[string]string) { + e, isInternalError := err.(*errors.Error) + if isInternalError { + for k, v := range e.Meta() { + meta[k] = v + } + } + + l.log.ErrorContext(ctx, err.Error(), "meta", meta) +} + +func New(w io.Writer) *logger { + // LevelDebug is set by default as the workflow has a debug configuration. + opts := slog.HandlerOptions{ + Level: slog.LevelDebug, + } + sl := slog.New(slog.NewJSONHandler(w, &opts)) + return &logger{ + log: sl, + } +} diff --git a/internal/logger/logger_test.go b/internal/logger/logger_test.go new file mode 100644 index 0000000..562de6a --- /dev/null +++ b/internal/logger/logger_test.go @@ -0,0 +1,45 @@ +package logger_test + +import ( + "bytes" + "context" + "errors" + "testing" + + "github.com/stretchr/testify/require" + + werrors "github.com/luno/workflow/internal/errors" + "github.com/luno/workflow/internal/logger" +) + +func TestLoggerDebug(t *testing.T) { + var buf bytes.Buffer + log := logger.New(&buf) + + ctx := context.Background() + log.Debug(ctx, "test message", map[string]string{"key": "value"}) + + require.Contains(t, buf.String(), "\"level\":\"DEBUG\",\"msg\":\"test message\",\"meta\":{\"key\":\"value\"}") +} + +func TestLogger_Error(t *testing.T) { + var buf bytes.Buffer + log := logger.New(&buf) + + ctx := context.Background() + log.Error(ctx, errors.New("test error"), map[string]string{"key": "value"}) + + require.Contains(t, buf.String(), "\"level\":\"ERROR\",\"msg\":\"test error\",\"meta\":{\"key\":\"value\"}") +} + +func TestLogger_InternalError(t *testing.T) { + var buf bytes.Buffer + log := logger.New(&buf) + + ctx := context.Background() + log.Error(ctx, werrors.WrapWithMeta(werrors.New("test error"), "", map[string]string{ + "errorMetaKey": "errorMetaValue", + }), map[string]string{}) + + require.Contains(t, buf.String(), "\"level\":\"ERROR\",\"msg\":\"test error\",\"meta\":{\"errorMetaKey\":\"errorMetaValue\"}") +} diff --git a/logger.go b/logger.go index 97c2efe..8e1258b 100644 --- a/logger.go +++ b/logger.go @@ -4,10 +4,10 @@ import "context" type Logger interface { // Debug will be used by workflow for debug logs when in debug mode. - Debug(ctx context.Context, msg string, meta MKV) + Debug(ctx context.Context, msg string, meta map[string]string) // Error is used when writing errors to the logs. - Error(ctx context.Context, err error) + Error(ctx context.Context, err error, meta map[string]string) } -// MKV is a multiple key value store for the logger to format into its output. +// MKV is alias for ma[string]string to simplify the passing of Multiple Key Values to the logger. type MKV map[string]string diff --git a/metrics_test.go b/metrics_test.go index bf9f035..5edef31 100644 --- a/metrics_test.go +++ b/metrics_test.go @@ -2,14 +2,13 @@ package workflow_test import ( "context" + "errors" "strings" "sync" "testing" "time" "github.com/google/uuid" - "github.com/luno/jettison/errors" - "github.com/luno/jettison/jtest" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/require" clock_testing "k8s.io/utils/clock/testing" @@ -46,13 +45,13 @@ func runWorkflow(t *testing.T) *workflow.Workflow[string, status] { ctx := context.Background() uid, err := uuid.NewUUID() - jtest.RequireNil(t, err) + require.Nil(t, err) runID := uid.String() var s string payload, err := workflow.Marshal(&s) - jtest.RequireNil(t, err) + require.Nil(t, err) err = update(ctx, recordStore, &workflow.Record{ WorkflowName: "example", @@ -63,7 +62,7 @@ func runWorkflow(t *testing.T) *workflow.Workflow[string, status] { Object: payload, CreatedAt: clock.Now(), }) - jtest.RequireNil(t, err) + require.Nil(t, err) // 1 hour = 3600 seconds clock.Step(time.Hour) @@ -89,7 +88,7 @@ workflow_process_lag_seconds{process_name="start-consumer-1-of-1",workflow_name= ` err := testutil.CollectAndCompare(metrics.ConsumerLag, strings.NewReader(expected)) - jtest.RequireNil(t, err) + require.Nil(t, err) metrics.ConsumerLag.Reset() } @@ -123,7 +122,7 @@ workflow_process_states{process_name="start-consumer-1-of-1",workflow_name="exam ` err := testutil.CollectAndCompare(metrics.ProcessStates, strings.NewReader(expected)) - jtest.RequireNil(t, err) + require.Nil(t, err) w.Stop() @@ -138,7 +137,7 @@ workflow_process_states{process_name="outbox-consumer-2-of-2",workflow_name="exa ` err = testutil.CollectAndCompare(metrics.ProcessStates, strings.NewReader(expected)) - jtest.RequireNil(t, err) + require.Nil(t, err) metrics.ProcessStates.Reset() } @@ -222,7 +221,7 @@ workflow_process_states{process_name="outbox-consumer-1-of-1",workflow_name="exa ` err := testutil.CollectAndCompare(metrics.ProcessStates, strings.NewReader(expected)) - jtest.RequireNil(t, err) + require.Nil(t, err) scheduler.mu.Lock() scheduler.allow = true @@ -241,7 +240,7 @@ workflow_process_states{process_name="outbox-consumer-1-of-1",workflow_name="exa ` err = testutil.CollectAndCompare(metrics.ProcessStates, strings.NewReader(expected)) - jtest.RequireNil(t, err) + require.Nil(t, err) wf.Stop() @@ -256,7 +255,7 @@ workflow_process_states{process_name="outbox-consumer-1-of-1",workflow_name="exa ` err = testutil.CollectAndCompare(metrics.ProcessStates, strings.NewReader(expected)) - jtest.RequireNil(t, err) + require.Nil(t, err) metrics.ProcessStates.Reset() } @@ -298,13 +297,13 @@ func TestMetricProcessErrors(t *testing.T) { ctx := context.Background() uid, err := uuid.NewUUID() - jtest.RequireNil(t, err) + require.Nil(t, err) runID := uid.String() var s string payload, err := workflow.Marshal(&s) - jtest.RequireNil(t, err) + require.Nil(t, err) err = update(ctx, recordStore, &workflow.Record{ WorkflowName: "example", @@ -315,7 +314,7 @@ func TestMetricProcessErrors(t *testing.T) { Object: payload, CreatedAt: clock.Now(), }) - jtest.RequireNil(t, err) + require.Nil(t, err) wf.Run(ctx) t.Cleanup(wf.Stop) @@ -358,7 +357,7 @@ func TestRunStateChanges(t *testing.T) { t.Cleanup(w.Stop) _, err := w.Trigger(ctx, "983467934", StatusStart) - jtest.RequireNil(t, err) + require.Nil(t, err) time.Sleep(time.Millisecond * 500) @@ -391,13 +390,13 @@ func TestMetricProcessSkippedEvents(t *testing.T) { t.Cleanup(w.Stop) _, err := w.Trigger(ctx, "9834679343", StatusStart) - jtest.RequireNil(t, err) + require.Nil(t, err) _, err = w.Trigger(ctx, "2349839483", StatusStart) - jtest.RequireNil(t, err) + require.Nil(t, err) _, err = w.Trigger(ctx, "7548702398", StatusStart) - jtest.RequireNil(t, err) + require.Nil(t, err) time.Sleep(time.Millisecond * 500) diff --git a/outbox.go b/outbox.go index d0caf17..e71a0cc 100644 --- a/outbox.go +++ b/outbox.go @@ -2,14 +2,13 @@ package workflow import ( "context" - internal_errors "github.com/luno/workflow/internal/errors" "strconv" "time" - "github.com/luno/jettison/errors" "google.golang.org/protobuf/proto" "k8s.io/utils/clock" + werrors "github.com/luno/workflow/internal/errors" "github.com/luno/workflow/internal/metrics" "github.com/luno/workflow/internal/outboxpb" ) @@ -125,7 +124,7 @@ func purgeOutbox[Type any, Status StatusType]( var outboxRecord outboxpb.OutboxRecord err := proto.Unmarshal(e.Data, &outboxRecord) if err != nil { - return internal_errors.Wrap(err, "Unable to proto unmarshal outbox record", map[string]string) + return werrors.Wrap(err, "Unable to proto unmarshal outbox record") } headers := make(map[Header]string) @@ -154,12 +153,12 @@ func purgeOutbox[Type any, Status StatusType]( topic := headers[HeaderTopic] producer, err := streamer.NewProducer(ctx, topic) if err != nil { - return internal_errors.Wrap(err, "Unable to construct new producer for outbox purging", map[string]string{}) + return werrors.Wrap(err, "Unable to construct new producer for outbox purging") } err = producer.Send(ctx, event.ForeignID, event.Type, event.Headers) if err != nil { - return internal_errors.Wrap(err, "Unable to send outbox event to event streamer", map[string]string{}) + return werrors.Wrap(err, "Unable to send outbox event to event streamer") } err = recordStore.DeleteOutboxEvent(ctx, event.ID) diff --git a/run_test.go b/run_test.go index c411270..52f3f7c 100644 --- a/run_test.go +++ b/run_test.go @@ -4,7 +4,6 @@ import ( "context" "testing" - "github.com/luno/jettison/jtest" "github.com/stretchr/testify/require" "github.com/luno/workflow" @@ -15,10 +14,10 @@ func TestNewTestingRun(t *testing.T) { ctx := context.Background() pauseStatus, err := r.Pause(ctx) - jtest.RequireNil(t, err) + require.Nil(t, err) require.Equal(t, status(workflow.SkipTypeRunStateUpdate), pauseStatus) cancelStatus, err := r.Cancel(ctx) - jtest.RequireNil(t, err) + require.Nil(t, err) require.Equal(t, status(workflow.SkipTypeRunStateUpdate), cancelStatus) } diff --git a/runstate.go b/runstate.go index 232977e..d01d24c 100644 --- a/runstate.go +++ b/runstate.go @@ -2,8 +2,9 @@ package workflow import ( "context" - internal_errors "github.com/luno/workflow/internal/errors" "strconv" + + werrors "github.com/luno/workflow/internal/errors" ) type RunState int @@ -131,7 +132,7 @@ func (rsc *runStateControllerImpl) update(ctx context.Context, rs RunState, inva func validateRunStateTransition(record *Record, runState RunState, sentinelErr error) error { valid, ok := runStateTransitions[record.RunState] if !ok { - return internal_errors.Wrap(sentinelErr, "current run state is terminal", map[string]string{ + return werrors.WrapWithMeta(sentinelErr, "current run state is terminal", map[string]string{ "record_id": strconv.FormatInt(record.ID, 10), "workflow_name": record.WorkflowName, "run_state": record.RunState.String(), @@ -141,7 +142,7 @@ func validateRunStateTransition(record *Record, runState RunState, sentinelErr e if !valid[runState] { msg := "Current run state cannot transition to " + runState.String() - return internal_errors.Wrap(sentinelErr, msg, map[string]string{ + return werrors.WrapWithMeta(sentinelErr, msg, map[string]string{ "record_id": strconv.FormatInt(record.ID, 10), "workflow_name": record.WorkflowName, "run_state": record.RunState.String(), diff --git a/runstate_internal_test.go b/runstate_internal_test.go index f209ca9..6449428 100644 --- a/runstate_internal_test.go +++ b/runstate_internal_test.go @@ -4,7 +4,7 @@ import ( "context" "testing" - "github.com/luno/jettison/jtest" + "github.com/stretchr/testify/require" ) func TestNoopRunStateController(t *testing.T) { @@ -12,14 +12,14 @@ func TestNoopRunStateController(t *testing.T) { ctx := context.Background() err := ctrl.Pause(ctx) - jtest.RequireNil(t, err) + require.Nil(t, err) err = ctrl.Resume(ctx) - jtest.RequireNil(t, err) + require.Nil(t, err) err = ctrl.Cancel(ctx) - jtest.RequireNil(t, err) + require.Nil(t, err) err = ctrl.DeleteData(ctx) - jtest.RequireNil(t, err) + require.Nil(t, err) } diff --git a/runstate_test.go b/runstate_test.go index a875208..1b05793 100644 --- a/runstate_test.go +++ b/runstate_test.go @@ -5,7 +5,6 @@ import ( "testing" "time" - "github.com/luno/jettison/jtest" "github.com/stretchr/testify/require" "github.com/luno/workflow" @@ -72,7 +71,7 @@ func TestRunState(t *testing.T) { // Trigger workflow before it's running to assert that the initial state is workflow.RunStateInitiated runID, err := w.Trigger(ctx, "fid", StatusStart) - jtest.RequireNil(t, err) + require.Nil(t, err) time.Sleep(time.Second) @@ -125,7 +124,7 @@ func TestWorkflowRunStateController(t *testing.T) { Name: "Andrew Wormald", Car: "Audi", }) - jtest.RequireNil(t, err) + require.Nil(t, err) ctx := context.Background() workflowName := "test-workflow" @@ -140,49 +139,49 @@ func TestWorkflowRunStateController(t *testing.T) { }) record, err := recordStore.Latest(ctx, workflowName, foreignID) - jtest.RequireNil(t, err) + require.Nil(t, err) time.Sleep(time.Millisecond * 500) record, err = recordStore.Latest(ctx, workflowName, foreignID) - jtest.RequireNil(t, err) + require.Nil(t, err) require.Equal(t, workflow.RunStateInitiated, record.RunState) wr, err := recordStore.Lookup(ctx, 1) - jtest.RequireNil(t, err) + require.Nil(t, err) rsc := workflow.NewRunStateController(recordStore.Store, wr) err = rsc.Pause(ctx) - jtest.RequireNil(t, err) + require.Nil(t, err) record, err = recordStore.Latest(ctx, workflowName, foreignID) - jtest.RequireNil(t, err) + require.Nil(t, err) require.Equal(t, workflow.RunStatePaused, record.RunState) err = rsc.Resume(ctx) - jtest.RequireNil(t, err) + require.Nil(t, err) record, err = recordStore.Latest(ctx, workflowName, foreignID) - jtest.RequireNil(t, err) + require.Nil(t, err) require.Equal(t, workflow.RunStateRunning, record.RunState) err = rsc.Cancel(ctx) - jtest.RequireNil(t, err) + require.Nil(t, err) record, err = recordStore.Latest(ctx, workflowName, foreignID) - jtest.RequireNil(t, err) + require.Nil(t, err) require.Equal(t, workflow.RunStateCancelled, record.RunState) err = rsc.DeleteData(ctx) - jtest.RequireNil(t, err) + require.Nil(t, err) record, err = recordStore.Latest(ctx, workflowName, foreignID) - jtest.RequireNil(t, err) + require.Nil(t, err) require.Equal(t, workflow.RunStateRequestedDataDeleted, record.RunState) } diff --git a/schedule.go b/schedule.go index 94209a5..7cd6a70 100644 --- a/schedule.go +++ b/schedule.go @@ -2,23 +2,24 @@ package workflow import ( "context" + "errors" "fmt" - internal_errors "github.com/luno/workflow/internal/errors" "strconv" "time" - "github.com/luno/jettison/errors" "github.com/robfig/cron/v3" "k8s.io/utils/clock" + + werrors "github.com/luno/workflow/internal/errors" ) func (w *Workflow[Type, Status]) Schedule(foreignID string, startingStatus Status, spec string, opts ...ScheduleOption[Type, Status]) error { if !w.calledRun { - return internal_errors.Wrap(ErrWorkflowNotRunning, "ensure Run() is called before attempting to trigger the workflow", map[string]string{}) + return werrors.Wrap(ErrWorkflowNotRunning, "ensure Run() is called before attempting to trigger the workflow") } if !w.statusGraph.IsValid(int(startingStatus)) { - return internal_errors.Wrap(ErrStatusProvidedNotConfigured, fmt.Sprintf("ensure %v is configured for workflow: %v", startingStatus, w.Name), map[string]string{}) + return werrors.Wrap(ErrStatusProvidedNotConfigured, fmt.Sprintf("ensure %v is configured for workflow: %v", startingStatus, w.Name)) } var options scheduleOpts[Type, Status] diff --git a/schedule_test.go b/schedule_test.go index d5d926e..3169705 100644 --- a/schedule_test.go +++ b/schedule_test.go @@ -2,11 +2,11 @@ package workflow_test import ( "context" + "errors" "sync" "testing" "time" - "github.com/luno/jettison/jtest" "github.com/stretchr/testify/require" clock_testing "k8s.io/utils/clock/testing" @@ -47,7 +47,7 @@ func TestSchedule(t *testing.T) { go func() { err := wf.Schedule("andrew", StatusStart, "@monthly") - jtest.RequireNil(t, err) + require.Nil(t, err) }() // Allow scheduling to take place @@ -55,7 +55,7 @@ func TestSchedule(t *testing.T) { _, err := recordStore.Latest(ctx, workflowName, "andrew") // Expect there to be no entries yet - jtest.Require(t, workflow.ErrRecordNotFound, err) + require.True(t, errors.Is(err, workflow.ErrRecordNotFound)) // Grab the time from the clock for expectation as to the time we expect the entry to have expectedTimestamp := time.Date(2023, time.May, 1, 0, 0, 0, 0, time.UTC) @@ -65,10 +65,10 @@ func TestSchedule(t *testing.T) { time.Sleep(200 * time.Millisecond) firstScheduled, err := recordStore.Latest(ctx, workflowName, "andrew") - jtest.RequireNil(t, err) + require.Nil(t, err) _, err = wf.Await(ctx, firstScheduled.ForeignID, firstScheduled.RunID, StatusEnd) - jtest.RequireNil(t, err) + require.Nil(t, err) expectedTimestamp = time.Date(2023, time.June, 1, 0, 0, 0, 0, time.UTC) clock.SetTime(expectedTimestamp) @@ -77,7 +77,7 @@ func TestSchedule(t *testing.T) { time.Sleep(200 * time.Millisecond) secondScheduled, err := recordStore.Latest(ctx, workflowName, "andrew") - jtest.RequireNil(t, err) + require.Nil(t, err) require.NotEqual(t, firstScheduled.RunID, secondScheduled.RunID) } @@ -106,7 +106,7 @@ func TestWorkflow_ScheduleShutdown(t *testing.T) { go func() { wg.Done() err := wf.Schedule("andrew", StatusStart, "@monthly") - jtest.RequireNil(t, err) + require.Nil(t, err) }() wg.Wait() @@ -168,7 +168,7 @@ func TestWorkflow_ScheduleFilter(t *testing.T) { go func() { err := wf.Schedule("andrew", StatusStart, "@monthly", opt) - jtest.RequireNil(t, err) + require.Nil(t, err) }() // Allow scheduling to take place @@ -176,7 +176,7 @@ func TestWorkflow_ScheduleFilter(t *testing.T) { _, err := recordStore.Latest(ctx, workflowName, "andrew") // Expect there to be no entries yet - jtest.Require(t, workflow.ErrRecordNotFound, err) + require.True(t, errors.Is(err, workflow.ErrRecordNotFound)) // Grab the time from the clock for expectation as to the time we expect the entry to have expectedTimestamp := time.Date(2023, time.May, 1, 0, 0, 0, 0, time.UTC) @@ -187,7 +187,7 @@ func TestWorkflow_ScheduleFilter(t *testing.T) { _, err = recordStore.Latest(ctx, workflowName, "andrew") // Expect there to be no entries yet - jtest.Require(t, workflow.ErrRecordNotFound, err) + require.True(t, errors.Is(err, workflow.ErrRecordNotFound)) // Disable the filter to enable scheduling *shouldSkip = true @@ -199,10 +199,10 @@ func TestWorkflow_ScheduleFilter(t *testing.T) { time.Sleep(200 * time.Millisecond) latest, err := recordStore.Latest(ctx, workflowName, "andrew") - jtest.RequireNil(t, err) + require.Nil(t, err) resp, err := wf.Await(ctx, latest.ForeignID, latest.RunID, StatusEnd) - jtest.RequireNil(t, err) + require.Nil(t, err) require.Equal(t, expectedTimestamp, resp.CreatedAt) } diff --git a/testing.go b/testing.go index 53c6414..732c14b 100644 --- a/testing.go +++ b/testing.go @@ -4,11 +4,10 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "testing" - "github.com/luno/jettison/errors" - "github.com/luno/jettison/jtest" "github.com/stretchr/testify/require" ) @@ -20,13 +19,13 @@ func TriggerCallbackOn[Type any, Status StatusType, Payload any](t testing.TB, w ctx := context.TODO() _, err := w.Await(ctx, foreignID, runID, waitFor) - jtest.RequireNil(t, err) + require.Nil(t, err) b, err := json.Marshal(p) - jtest.RequireNil(t, err) + require.Nil(t, err) err = w.Callback(ctx, foreignID, waitFor, bytes.NewReader(b)) - jtest.RequireNil(t, err) + require.Nil(t, err) } func AwaitTimeoutInsert[Type any, Status StatusType](t testing.TB, w *Workflow[Type, Status], foreignID, runID string, waitFor Status) { @@ -41,7 +40,7 @@ func AwaitTimeoutInsert[Type any, Status StatusType](t testing.TB, w *Workflow[T } ls, err := w.timeoutStore.List(w.ctx, w.Name) - jtest.RequireNil(t, err) + require.Nil(t, err) for _, l := range ls { if l.Status != int(waitFor) { @@ -83,7 +82,7 @@ func Require[Type any, Status StatusType](t testing.TB, w *Workflow[Type, Status if errors.Is(err, ErrRecordNotFound) { continue } else { - jtest.RequireNil(t, err) + require.Nil(t, err) } runID = latest.RunID @@ -108,7 +107,7 @@ func Require[Type any, Status StatusType](t testing.TB, w *Workflow[Type, Status var typ Type err := json.Unmarshal(wr.Object, &typ) - jtest.RequireNil(t, err) + require.Nil(t, err) actual := &Run[Type, Status]{ Record: *wr, diff --git a/testing_test.go b/testing_test.go index f7fa57d..67d7a58 100644 --- a/testing_test.go +++ b/testing_test.go @@ -4,7 +4,7 @@ import ( "context" "testing" - "github.com/luno/jettison/jtest" + "github.com/stretchr/testify/require" "github.com/luno/workflow" "github.com/luno/workflow/adapters/memrecordstore" @@ -43,7 +43,7 @@ func TestRequireForCircularStatus(t *testing.T) { fid := "10298309123" _, err := wf.Trigger(ctx, fid, StatusStart) - jtest.RequireNil(t, err) + require.Nil(t, err) workflow.Require(t, wf, fid, StatusStart, Counter{Count: 0}) workflow.Require(t, wf, fid, StatusMiddle, Counter{Count: 1}) diff --git a/timeout.go b/timeout.go index 40383a6..83d9e86 100644 --- a/timeout.go +++ b/timeout.go @@ -2,10 +2,10 @@ package workflow import ( "context" - internal_errors "github.com/luno/workflow/internal/errors" "strconv" "time" + werrors "github.com/luno/workflow/internal/errors" "github.com/luno/workflow/internal/metrics" ) @@ -123,7 +123,7 @@ func processTimeout[Type any, Status StatusType]( originalErr := err _, err := record.Pause(ctx) if err != nil { - return internal_errors.Wrap(err, "failed to pause record after exceeding allowed error count", map[string]string{ + return werrors.WrapWithMeta(err, "failed to pause record after exceeding allowed error count", map[string]string{ "workflow_name": record.WorkflowName, "foreign_id": record.ForeignID, "current_status": record.Status.String(), @@ -136,7 +136,7 @@ func processTimeout[Type any, Status StatusType]( return nil } } - return internal_errors.Wrap(err, "failed to process timeout", map[string]string{ + return werrors.WrapWithMeta(err, "failed to process timeout", map[string]string{ "workflow_name": record.WorkflowName, "foreign_id": record.ForeignID, "current_status": record.Status.String(), diff --git a/timeout_internal_test.go b/timeout_internal_test.go index e3f4db4..bbcc56d 100644 --- a/timeout_internal_test.go +++ b/timeout_internal_test.go @@ -6,7 +6,6 @@ import ( "testing" "time" - "github.com/luno/jettison/jtest" "github.com/stretchr/testify/require" clock_testing "k8s.io/utils/clock/testing" @@ -24,7 +23,7 @@ func TestProcessTimeout(t *testing.T) { value := "data" b, err := Marshal(&value) - jtest.RequireNil(t, err) + require.Nil(t, err) type calls struct { updater func(ctx context.Context, current testStatus, next testStatus, record *Run[string, testStatus]) error @@ -193,7 +192,7 @@ func TestProcessTimeout(t *testing.T) { } err := processTimeout(ctx, w, timeout, tc.record, tr, tc.caller(calls).completeFunc, tc.caller(calls).store, tc.caller(calls).updater, "processName", 1) - jtest.RequireNil(t, err) + require.Nil(t, err) require.Equal(t, tc.expectedCalls, calls) }) diff --git a/trigger.go b/trigger.go index 5481448..6f5f18c 100644 --- a/trigger.go +++ b/trigger.go @@ -2,12 +2,12 @@ package workflow import ( "context" + "errors" "fmt" - internal_errors "github.com/luno/workflow/internal/errors" "github.com/google/uuid" - "github.com/luno/jettison/errors" - "github.com/luno/jettison/j" + + werrors "github.com/luno/workflow/internal/errors" ) func (w *Workflow[Type, Status]) Trigger(ctx context.Context, foreignID string, startingStatus Status, opts ...TriggerOption[Type, Status]) (runID string, err error) { @@ -16,11 +16,11 @@ func (w *Workflow[Type, Status]) Trigger(ctx context.Context, foreignID string, func trigger[Type any, Status StatusType](ctx context.Context, w *Workflow[Type, Status], lookup latestLookup, foreignID string, startingStatus Status, opts ...TriggerOption[Type, Status]) (runID string, err error) { if !w.calledRun { - return "", internal_errors.Wrap(ErrWorkflowNotRunning, "ensure Run() is called before attempting to trigger the workflow", map[string]string{}) + return "", werrors.Wrap(ErrWorkflowNotRunning, "ensure Run() is called before attempting to trigger the workflow") } if !w.statusGraph.IsValid(int(startingStatus)) { - return "", internal_errors.Wrap(ErrStatusProvidedNotConfigured, fmt.Sprintf("ensure %v is configured for workflow: %v", startingStatus, w.Name), map[string]string{}) + return "", werrors.Wrap(ErrStatusProvidedNotConfigured, fmt.Sprintf("ensure %v is configured for workflow: %v", startingStatus, w.Name)) } var o triggerOpts[Type, Status] @@ -48,7 +48,7 @@ func trigger[Type any, Status StatusType](ctx context.Context, w *Workflow[Type, // Check that the last run has completed before triggering a new run. if lastRecord.RunState.Valid() && !lastRecord.RunState.Finished() { // Cannot trigger a new run for this foreignID if there is a workflow in progress. - return "", errors.Wrap(ErrWorkflowInProgress, "", j.MKV{ + return "", werrors.WrapWithMeta(ErrWorkflowInProgress, "", map[string]string{ "run_id": lastRecord.RunID, "run_state": lastRecord.RunState.String(), "status": Status(lastRecord.Status).String(), diff --git a/trigger_internal_test.go b/trigger_internal_test.go index 97a8e24..3f19c51 100644 --- a/trigger_internal_test.go +++ b/trigger_internal_test.go @@ -2,9 +2,13 @@ package workflow import ( "context" + "errors" + "fmt" "testing" - "github.com/luno/jettison/jtest" + "github.com/stretchr/testify/require" + + werrors "github.com/luno/workflow/internal/errors" ) func Test_trigger(t *testing.T) { @@ -17,7 +21,13 @@ func Test_trigger(t *testing.T) { t.Run("Expected ErrWorkflowNotRunning when Trigger called before Run()", func(t *testing.T) { ctx := context.Background() _, err := trigger(ctx, w, nil, "1", statusStart) - jtest.Require(t, ErrWorkflowNotRunning, err) + + eval, is := err.(*werrors.Error) + if is { + fmt.Println(*eval) + } + + require.Truef(t, errors.Is(err, ErrWorkflowNotRunning), "actual: %s", err.Error()) }) t.Run("Expects ErrStatusProvidedNotConfigured when starting status is not configured", func(t *testing.T) { @@ -25,7 +35,7 @@ func Test_trigger(t *testing.T) { w.calledRun = true _, err := trigger(ctx, w, nil, "1", statusEnd) - jtest.Require(t, ErrStatusProvidedNotConfigured, err) + require.True(t, errors.Is(err, ErrStatusProvidedNotConfigured)) }) t.Run("Expects ErrWorkflowInProgress if a workflow run is already in progress", func(t *testing.T) { @@ -41,6 +51,6 @@ func Test_trigger(t *testing.T) { Status: int(statusMiddle), }, nil }, "1", statusStart) - jtest.Require(t, ErrWorkflowInProgress, err) + require.True(t, errors.Is(err, ErrWorkflowInProgress)) }) } diff --git a/update.go b/update.go index 60f1eb6..cc65c83 100644 --- a/update.go +++ b/update.go @@ -3,10 +3,9 @@ package workflow import ( "context" - "github.com/luno/jettison/errors" - "github.com/luno/jettison/j" "k8s.io/utils/clock" + "github.com/luno/workflow/internal/errors" "github.com/luno/workflow/internal/graph" "github.com/luno/workflow/internal/metrics" ) @@ -74,7 +73,7 @@ func validateTransition[Status StatusType](current, next Status, graph *graph.Gr // Lookup all available transitions from the current status nodes := graph.Transitions(int(current)) if len(nodes) == 0 { - return errors.New("current status not predefined", j.MKV{ + return errors.WrapWithMeta(errors.New("current status not predefined"), "", map[string]string{ "current_status": current.String(), }) } @@ -90,7 +89,7 @@ func validateTransition[Status StatusType](current, next Status, graph *graph.Gr // If no valid transition matches that of the next status then error. if !found { - return errors.New("invalid transition attempted", j.MKV{ + return errors.WrapWithMeta(errors.New("invalid transition attempted"), "", map[string]string{ "current_status": current.String(), "next_status": next.String(), }) diff --git a/update_internal_test.go b/update_internal_test.go index a7a6b62..b6e5c10 100644 --- a/update_internal_test.go +++ b/update_internal_test.go @@ -2,14 +2,14 @@ package workflow import ( "context" + "errors" "testing" "time" - "github.com/luno/jettison/errors" - "github.com/luno/jettison/jtest" "github.com/stretchr/testify/require" clock_testing "k8s.io/utils/clock/testing" + werrors "github.com/luno/workflow/internal/errors" "github.com/luno/workflow/internal/graph" ) @@ -94,9 +94,9 @@ func TestUpdater(t *testing.T) { { name: "Return error on lookup", lookup: func(ctx context.Context, id int64) (*Record, error) { - return nil, errors.New("lookup error") + return nil, werrors.New("lookup error") }, - expectedErr: errors.New("lookup error"), + expectedErr: werrors.New("lookup error"), }, { name: "Exit early if lookup record status has changed", @@ -129,7 +129,7 @@ func TestUpdater(t *testing.T) { To: int(statusEnd), }, }, - expectedErr: errors.New("invalid transition attempted"), + expectedErr: werrors.New("invalid transition attempted"), }, } for _, tc := range testCases { @@ -144,13 +144,13 @@ func TestUpdater(t *testing.T) { store := func(ctx context.Context, r *Record, maker OutboxEventDataMaker) error { require.Equal(t, tc.expectedRunState, r.RunState) _, err := maker(1) - jtest.RequireNil(t, err) + require.Nil(t, err) return nil } updater := newUpdater[string, testStatus](tc.lookup, store, g, c) err := updater(ctx, tc.current, tc.update.Status, &tc.update) - jtest.Require(t, tc.expectedErr, err) + require.True(t, errors.Is(err, tc.expectedErr)) }) } } diff --git a/visualiser_test.go b/visualiser_test.go index 9d5f871..4257e35 100644 --- a/visualiser_test.go +++ b/visualiser_test.go @@ -4,7 +4,7 @@ import ( "context" "testing" - "github.com/luno/jettison/jtest" + "github.com/stretchr/testify/require" "github.com/luno/workflow" ) @@ -23,5 +23,5 @@ func TestVisualiser(t *testing.T) { wf := b.Build(nil, nil, nil) err := workflow.MermaidDiagram(wf, "./testdata/graph-visualisation.md", workflow.LeftToRightDirection) - jtest.RequireNil(t, err) + require.Nil(t, err) } diff --git a/workflow.go b/workflow.go index 852a91d..36fa167 100644 --- a/workflow.go +++ b/workflow.go @@ -10,6 +10,7 @@ import ( "k8s.io/utils/clock" "github.com/luno/workflow/internal/errorcounter" + werrors "github.com/luno/workflow/internal/errors" "github.com/luno/workflow/internal/graph" "github.com/luno/workflow/internal/metrics" ) @@ -203,7 +204,7 @@ func runOnce[Type any, Status StatusType](w *Workflow[Type, Status], role, proce // and if the parent context was cancelled then that will exit safely. return nil } else if err != nil { - w.logger.Error(ctx, errors.Join(errors.New("process error"), err), MKV{ + w.logger.Error(ctx, werrors.Wrap(err, "process error"), MKV{ "role": role, }) metrics.ProcessErrors.WithLabelValues(w.Name, processName).Inc() diff --git a/workflow_test.go b/workflow_test.go index 8d1bc57..03c2604 100644 --- a/workflow_test.go +++ b/workflow_test.go @@ -3,13 +3,13 @@ package workflow_test import ( "context" "encoding/json" + "errors" "fmt" "io" "strconv" "testing" "time" - "github.com/luno/jettison/jtest" "github.com/stretchr/testify/require" clock_testing "k8s.io/utils/clock/testing" @@ -134,7 +134,7 @@ func TestWorkflowAcceptanceTest(t *testing.T) { } runID, err := wf.Trigger(ctx, fid, StatusInitiated, workflow.WithInitialValue[MyType, status](&mt)) - jtest.RequireNil(t, err) + require.Nil(t, err) // Once in the correct status, trigger third party callbacks workflow.TriggerCallbackOn(t, wf, fid, runID, StatusEmailConfirmationSent, ExternalEmailVerified{ @@ -156,15 +156,15 @@ func TestWorkflowAcceptanceTest(t *testing.T) { clock.Step(time.Hour) _, err = wf.Await(ctx, fid, runID, StatusCompleted) - jtest.RequireNil(t, err) + require.Nil(t, err) r, err := recordStore.Latest(ctx, "user sign up", fid) - jtest.RequireNil(t, err) + require.Nil(t, err) require.Equal(t, int(expectedFinalStatus), r.Status) var actual MyType err = workflow.Unmarshal(r.Object, &actual) - jtest.RequireNil(t, err) + require.Nil(t, err) require.Equal(t, expectedUserID, actual.UserID) require.Equal(t, strconv.FormatInt(expectedUserID, 10), actual.ForeignID()) @@ -223,7 +223,9 @@ func benchmarkWorkflow(b *testing.B, numberOfSteps int) { } for range b.N { _, err := wf.Trigger(ctx, fid, 0, workflow.WithInitialValue[MyType, status](&mt)) - jtest.RequireNil(b, err) + if err != nil { + b.Fatal(err) + } workflow.Require(b, wf, fid, status(numberOfSteps), MyType{ UserID: expectedUserID, @@ -265,7 +267,7 @@ func TestTimeout(t *testing.T) { start := time.Now() runID, err := wf.Trigger(ctx, "example", StatusInitiated) - jtest.RequireNil(t, err) + require.Nil(t, err) workflow.AwaitTimeoutInsert(t, wf, "example", runID, StatusProfileCreated) @@ -273,7 +275,7 @@ func TestTimeout(t *testing.T) { clock.Step(time.Hour) _, err = wf.Await(ctx, "example", runID, StatusCompleted) - jtest.RequireNil(t, err) + require.Nil(t, err) end := time.Now() @@ -390,10 +392,10 @@ func TestWorkflow_ErrWorkflowNotRunning(t *testing.T) { }) _, err := wf.Trigger(ctx, "andrew", StatusStart) - jtest.Require(t, workflow.ErrWorkflowNotRunning, err) + require.True(t, errors.Is(err, workflow.ErrWorkflowNotRunning)) err = wf.Schedule("andrew", StatusStart, "@monthly") - jtest.Require(t, workflow.ErrWorkflowNotRunning, err) + require.True(t, errors.Is(err, workflow.ErrWorkflowNotRunning)) } func TestWorkflow_TestingRequire(t *testing.T) { @@ -425,7 +427,7 @@ func TestWorkflow_TestingRequire(t *testing.T) { foreignID := "andrew" _, err := wf.Trigger(ctx, foreignID, StatusStart) - jtest.RequireNil(t, err) + require.Nil(t, err) expected := MyType{ Email: "andrew@workflow.com", @@ -478,7 +480,7 @@ func TestTimeTimerFunc(t *testing.T) { t.Cleanup(wf.Stop) runID, err := wf.Trigger(ctx, "Andrew Wormald", StatusStart) - jtest.RequireNil(t, err) + require.Nil(t, err) workflow.AwaitTimeoutInsert(t, wf, "Andrew Wormald", runID, StatusStart) @@ -592,12 +594,12 @@ func TestStepConsumerLag(t *testing.T) { _, err := wf.Trigger(ctx, foreignID, StatusStart, workflow.WithInitialValue[TimeWatcher, status](&TimeWatcher{ StartTime: clock.Now(), })) - jtest.RequireNil(t, err) + require.Nil(t, err) time.Sleep(time.Second) latest, err := recordStore.Latest(ctx, wf.Name, foreignID) - jtest.RequireNil(t, err) + require.Nil(t, err) // Ensure that the record has not been consumer or updated require.Equal(t, int64(1), latest.ID) diff --git a/workflowpb/util.go b/workflowpb/util.go index 7a594de..c8e11a5 100644 --- a/workflowpb/util.go +++ b/workflowpb/util.go @@ -1,17 +1,17 @@ package workflowpb import ( - "github.com/luno/jettison/errors" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" "github.com/luno/workflow" + werrors "github.com/luno/workflow/internal/errors" ) func ProtoMarshal(r *workflow.Record) ([]byte, error) { pb, err := proto.Marshal(ToProto(r)) if err != nil { - return nil, errors.Wrap(err, "failed to proto marshal record") + return nil, werrors.Wrap(err, "failed to proto marshal record") } return pb, nil @@ -35,7 +35,7 @@ func UnmarshalRecord(b []byte) (*workflow.Record, error) { var wpb Record err := proto.Unmarshal(b, &wpb) if err != nil { - return nil, errors.Wrap(err, "failed to proto marshal record") + return nil, werrors.Wrap(err, "failed to proto marshal record") } return &workflow.Record{ diff --git a/workflowpb/util_test.go b/workflowpb/util_test.go index 7948834..5529965 100644 --- a/workflowpb/util_test.go +++ b/workflowpb/util_test.go @@ -4,7 +4,6 @@ import ( "testing" "time" - "github.com/luno/jettison/jtest" "github.com/stretchr/testify/require" "github.com/luno/workflow" @@ -26,10 +25,10 @@ func TestProtoMarshalAndUnmarshal(t *testing.T) { } protoBytes, err := workflowpb.ProtoMarshal(&wireRecord) - jtest.RequireNil(t, err) + require.Nil(t, err) deserialised, err := workflowpb.UnmarshalRecord(protoBytes) - jtest.RequireNil(t, err) + require.Nil(t, err) require.Equal(t, wireRecord, *deserialised) }