diff --git a/pkg/repositories/transformers/execution.go b/pkg/repositories/transformers/execution.go index f9e626ebf..de0d986af 100644 --- a/pkg/repositories/transformers/execution.go +++ b/pkg/repositories/transformers/execution.go @@ -344,9 +344,8 @@ func FromExecutionModel(ctx context.Context, executionModel models.Execution, op } if closure.GetError() != nil && opts != nil && opts.TrimErrorMessage && len(closure.GetError().Message) > 0 { trimmedErrOutputResult := closure.GetError() - if len(trimmedErrOutputResult.Message) > trimmedErrMessageLen { - trimmedErrOutputResult.Message = trimmedErrOutputResult.Message[0:trimmedErrMessageLen] - } + trimmedErrMessage := TrimErrorMessage(trimmedErrOutputResult.GetMessage()) + trimmedErrOutputResult.Message = trimmedErrMessage closure.OutputResult = &admin.ExecutionClosure_Error{ Error: trimmedErrOutputResult, } @@ -409,3 +408,11 @@ func FromExecutionModels(ctx context.Context, executionModels []models.Execution } return executions, nil } + +// TrimErrorMessage return the smallest possible trimmed error message >= trimmedErrMessageLen bytes in length that still forms a valid utf-8 string +func TrimErrorMessage(errMsg string) string { + if len(errMsg) <= trimmedErrMessageLen { + return errMsg + } + return strings.ToValidUTF8(errMsg[:trimmedErrMessageLen], "") +} diff --git a/pkg/repositories/transformers/execution_test.go b/pkg/repositories/transformers/execution_test.go index 43073deb4..215d700c2 100644 --- a/pkg/repositories/transformers/execution_test.go +++ b/pkg/repositories/transformers/execution_test.go @@ -6,6 +6,7 @@ import ( "strings" "testing" "time" + "unicode/utf8" "github.com/flyteorg/flyteadmin/pkg/common" @@ -602,6 +603,30 @@ func TestFromExecutionModel_Error(t *testing.T) { assert.True(t, proto.Equal(expectedExecErr, execution.Closure.GetError())) } +func TestFromExecutionModel_ValidUTF8TrimmedErrorMsg(t *testing.T) { + errMsg := "[1/1] currentAttempt done. Last Error: USER:: │\n│ ❱ 760 │ │ │ │ return __callback(*args, **kwargs) │\n││" + + executionClosureBytes, _ := proto.Marshal(&admin.ExecutionClosure{ + Phase: core.WorkflowExecution_FAILED, + OutputResult: &admin.ExecutionClosure_Error{Error: &core.ExecutionError{Message: errMsg}}, + }) + executionModel := models.Execution{ + ExecutionKey: models.ExecutionKey{ + Project: "project", + Domain: "domain", + Name: "name", + }, + Phase: core.WorkflowExecution_FAILED.String(), + Closure: executionClosureBytes, + } + execution, err := FromExecutionModel(context.TODO(), executionModel, &ExecutionTransformerOptions{ + TrimErrorMessage: true, + }) + assert.NoError(t, err) + errMsgAreValidUTF8 := utf8.Valid([]byte(execution.GetClosure().GetError().GetMessage())) + assert.True(t, errMsgAreValidUTF8) +} + func TestFromExecutionModel_OverwriteNamespace(t *testing.T) { abortCause := "abort cause" executionClosureBytes, _ := proto.Marshal(&admin.ExecutionClosure{ @@ -875,3 +900,10 @@ func TestUpdateExecutionModelStateChangeDetails(t *testing.T) { assert.False(t, strings.Contains(err.Error(), "Failed to unmarshal execution closure")) }) } + +func TestTrimErrorMessage(t *testing.T) { + errMsg := "[1/1] currentAttempt done. Last Error: USER:: │\n│ ❱ 760 │ │ │ │ return __callback(*args, **kwargs) │\n││" + trimmedErrMessage := TrimErrorMessage(errMsg) + errMsgAreValidUTF8 := utf8.Valid([]byte(trimmedErrMessage)) + assert.True(t, errMsgAreValidUTF8) +} diff --git a/pkg/repositories/transformers/node_execution.go b/pkg/repositories/transformers/node_execution.go index f1d90361f..7eb57a70b 100644 --- a/pkg/repositories/transformers/node_execution.go +++ b/pkg/repositories/transformers/node_execution.go @@ -321,11 +321,11 @@ func FromNodeExecutionModel(nodeExecutionModel models.NodeExecution, opts *Execu if err != nil { return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to unmarshal closure") } + if closure.GetError() != nil && opts != nil && opts.TrimErrorMessage && len(closure.GetError().Message) > 0 { trimmedErrOutputResult := closure.GetError() - if len(trimmedErrOutputResult.Message) > trimmedErrMessageLen { - trimmedErrOutputResult.Message = trimmedErrOutputResult.Message[0:trimmedErrMessageLen] - } + trimmedErrMessage := TrimErrorMessage(trimmedErrOutputResult.GetMessage()) + trimmedErrOutputResult.Message = trimmedErrMessage closure.OutputResult = &admin.NodeExecutionClosure_Error{ Error: trimmedErrOutputResult, } diff --git a/pkg/repositories/transformers/task_execution.go b/pkg/repositories/transformers/task_execution.go index f57f4b6b3..edfc32b19 100644 --- a/pkg/repositories/transformers/task_execution.go +++ b/pkg/repositories/transformers/task_execution.go @@ -444,9 +444,8 @@ func FromTaskExecutionModel(taskExecutionModel models.TaskExecution, opts *Execu } if closure.GetError() != nil && opts != nil && opts.TrimErrorMessage && len(closure.GetError().Message) > 0 { trimmedErrOutputResult := closure.GetError() - if len(trimmedErrOutputResult.Message) > trimmedErrMessageLen { - trimmedErrOutputResult.Message = trimmedErrOutputResult.Message[0:trimmedErrMessageLen] - } + trimmedErrMessage := TrimErrorMessage(trimmedErrOutputResult.GetMessage()) + trimmedErrOutputResult.Message = trimmedErrMessage closure.OutputResult = &admin.TaskExecutionClosure_Error{ Error: trimmedErrOutputResult, }