From 83e8694e04f4e75f4b8bf71f32de620fba375c39 Mon Sep 17 00:00:00 2001 From: Dan Rammer Date: Wed, 22 Mar 2023 08:33:49 -0500 Subject: [PATCH] Tracking reasons time-series (#540) * tracking reasons time-series Signed-off-by: Daniel Rammer * bumped flyteidl dep and added comment Signed-off-by: Daniel Rammer --------- Signed-off-by: Daniel Rammer --- go.mod | 2 +- go.sum | 4 +-- .../transformers/task_execution.go | 21 +++++++++++++++ .../transformers/task_execution_test.go | 27 ++++++++++++++++++- 4 files changed, 50 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index d346fee51..5f099c8ae 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/cloudevents/sdk-go/v2 v2.8.0 github.com/coreos/go-oidc v2.2.1+incompatible github.com/evanphx/json-patch v4.12.0+incompatible - github.com/flyteorg/flyteidl v1.3.9 + github.com/flyteorg/flyteidl v1.3.13 github.com/flyteorg/flyteplugins v1.0.40 github.com/flyteorg/flytepropeller v1.1.70 github.com/flyteorg/flytestdlib v1.0.15 diff --git a/go.sum b/go.sum index 7e54d5e84..1da6f45e1 100644 --- a/go.sum +++ b/go.sum @@ -312,8 +312,8 @@ github.com/fatih/structs v1.0.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/flyteorg/flyteidl v1.3.9 h1:MHUa89yKwCz58mQC2OxTzYjr0d3fA14qKG462v+RAyk= -github.com/flyteorg/flyteidl v1.3.9/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM= +github.com/flyteorg/flyteidl v1.3.13 h1:jOjiHl6jmSCOGC094QaRdSjjhThhzYPm0jHSxwAZ6UM= +github.com/flyteorg/flyteidl v1.3.13/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM= github.com/flyteorg/flyteplugins v1.0.40 h1:RTsYingqmqr13qBbi4CB2ArXDHNHUOkAF+HTLJQiQ/s= github.com/flyteorg/flyteplugins v1.0.40/go.mod h1:qyUPqVspLcLGJpKxVwHDWf+kBpOGuItOxCaF6zAmDio= 
github.com/flyteorg/flytepropeller v1.1.70 h1:/d1qqz13rdVADM85ST70eerAdBstJJz9UUB/mNSZi0w= diff --git a/pkg/repositories/transformers/task_execution.go b/pkg/repositories/transformers/task_execution.go index 849874e80..f335cc7ea 100644 --- a/pkg/repositories/transformers/task_execution.go +++ b/pkg/repositories/transformers/task_execution.go @@ -151,6 +151,15 @@ func CreateTaskExecutionModel(ctx context.Context, input CreateTaskExecutionMode EventVersion: input.Request.Event.EventVersion, } + if len(input.Request.Event.Reason) > 0 { + closure.Reasons = []*admin.Reason{ + &admin.Reason{ + OccurredAt: input.Request.Event.OccurredAt, + Message: input.Request.Event.Reason, + }, + } + } + eventPhase := input.Request.Event.Phase // Different tasks may report different phases as their first event. @@ -362,6 +371,18 @@ func UpdateTaskExecutionModel(ctx context.Context, request *admin.TaskExecutionE taskExecutionClosure.UpdatedAt = request.Event.OccurredAt taskExecutionClosure.Logs = mergeLogs(taskExecutionClosure.Logs, request.Event.Logs) if len(request.Event.Reason) > 0 { + if taskExecutionClosure.Reason != request.Event.Reason { + // by tracking a time-series of reasons we increase the size of the TaskExecutionClosure in scenarios where + // a task reports a large number of unique reasons. if this size increase becomes problematic this logic + // will need to be revisited. 
+ taskExecutionClosure.Reasons = append( + taskExecutionClosure.Reasons, + &admin.Reason{ + OccurredAt: request.Event.OccurredAt, + Message: request.Event.Reason, + }) + } + taskExecutionClosure.Reason = request.Event.Reason } if existingTaskPhase != core.TaskExecution_RUNNING.String() && taskExecutionModel.Phase == core.TaskExecution_RUNNING.String() { diff --git a/pkg/repositories/transformers/task_execution_test.go b/pkg/repositories/transformers/task_execution_test.go index 0ce13cc94..da7af92a2 100644 --- a/pkg/repositories/transformers/task_execution_test.go +++ b/pkg/repositories/transformers/task_execution_test.go @@ -255,7 +255,13 @@ func TestCreateTaskExecutionModelQueued(t *testing.T) { CreatedAt: taskEventOccurredAtProto, UpdatedAt: taskEventOccurredAtProto, Reason: "Task was scheduled", - TaskType: "sidecar", + Reasons: []*admin.Reason{ + &admin.Reason{ + OccurredAt: taskEventOccurredAtProto, + Message: "Task was scheduled", + }, + }, + TaskType: "sidecar", } expectedClosureBytes, err := proto.Marshal(expectedClosure) @@ -338,6 +344,8 @@ func TestCreateTaskExecutionModelRunning(t *testing.T) { CustomInfo: &customInfo, } + t.Logf("expected %+v %+v\n", expectedClosure.Reason, expectedClosure.Reasons) + expectedClosureBytes, err := proto.Marshal(expectedClosure) assert.Nil(t, err) @@ -386,6 +394,13 @@ func TestUpdateTaskExecutionModelRunningToFailed(t *testing.T) { CustomInfo: transformMapToStructPB(t, map[string]string{ "key1": "value1", }), + Reason: "Task was scheduled", + Reasons: []*admin.Reason{ + &admin.Reason{ + OccurredAt: taskEventOccurredAtProto, + Message: "Task was scheduled", + }, + }, } closureBytes, err := proto.Marshal(existingClosure) @@ -481,6 +496,16 @@ func TestUpdateTaskExecutionModelRunningToFailed(t *testing.T) { "key1": "value1 updated", }), Reason: "task failed", + Reasons: []*admin.Reason{ + &admin.Reason{ + OccurredAt: taskEventOccurredAtProto, + Message: "Task was scheduled", + }, + &admin.Reason{ + OccurredAt: occuredAtProto, 
+ Message: "task failed", + }, + }, } expectedClosureBytes, err := proto.Marshal(expectedClosure)