diff --git a/pkg/manager/impl/execution_manager.go b/pkg/manager/impl/execution_manager.go index a2ac00ce6..e166fa217 100644 --- a/pkg/manager/impl/execution_manager.go +++ b/pkg/manager/impl/execution_manager.go @@ -323,7 +323,7 @@ func (m *ExecutionManager) getInheritedExecMetadata(ctx context.Context, request } sourceExecutionID = sourceExecutionModel.ID requestSpec.Metadata.Principal = sourceExecutionModel.User - sourceExecution, err := transformers.FromExecutionModel(*sourceExecutionModel) + sourceExecution, err := transformers.FromExecutionModel(*sourceExecutionModel, transformers.DefaultExecutionTransformerOptions) if err != nil { logger.Errorf(ctx, "Failed transform parent execution model for child execution [%+v] with err: %v", workflowExecutionID, err) return parentNodeExecutionID, sourceExecutionID, err @@ -951,7 +951,7 @@ func (m *ExecutionManager) RelaunchExecution( logger.Debugf(ctx, "Failed to get execution model for request [%+v] with err %v", request, err) return nil, err } - existingExecution, err := transformers.FromExecutionModel(*existingExecutionModel) + existingExecution, err := transformers.FromExecutionModel(*existingExecutionModel, transformers.DefaultExecutionTransformerOptions) if err != nil { return nil, err } @@ -1008,7 +1008,7 @@ func (m *ExecutionManager) RecoverExecution( logger.Debugf(ctx, "Failed to get execution model for request [%+v] with err %v", request, err) return nil, err } - existingExecution, err := transformers.FromExecutionModel(*existingExecutionModel) + existingExecution, err := transformers.FromExecutionModel(*existingExecutionModel, transformers.DefaultExecutionTransformerOptions) if err != nil { return nil, err } @@ -1059,7 +1059,7 @@ func (m *ExecutionManager) emitScheduledWorkflowMetrics( return } // Find the reference launch plan to get the kickoff time argument - execution, err := transformers.FromExecutionModel(*executionModel) + execution, err := transformers.FromExecutionModel(*executionModel, transformers.DefaultExecutionTransformerOptions) if err != nil { logger.Warningf(context.Background(), "failed to transform execution model when emitting scheduled workflow execution stats with for "+ @@ -1302,7 +1302,7 @@ func (m *ExecutionManager) GetExecution( logger.Debugf(ctx, "Failed to get execution model for request [%+v] with err: %v", request, err) return nil, err } - execution, transformerErr := transformers.FromExecutionModel(*executionModel) + execution, transformerErr := transformers.FromExecutionModel(*executionModel, transformers.DefaultExecutionTransformerOptions) if transformerErr != nil { logger.Debugf(ctx, "Failed to transform execution model [%+v] to proto object with err: %v", request.Id, transformerErr) @@ -1345,7 +1345,7 @@ func (m *ExecutionManager) GetExecutionData( logger.Debugf(ctx, "Failed to get execution model for request [%+v] with err: %v", request, err) return nil, err } - execution, err := transformers.FromExecutionModel(*executionModel) + execution, err := transformers.FromExecutionModel(*executionModel, transformers.DefaultExecutionTransformerOptions) if err != nil { logger.Debugf(ctx, "Failed to transform execution model [%+v] to proto object with err: %v", request.Id, err) return nil, err @@ -1445,7 +1445,7 @@ func (m *ExecutionManager) ListExecutions( logger.Debugf(ctx, "Failed to list executions using input [%+v] with err %v", listExecutionsInput, err) return nil, err } - executionList, err := transformers.FromExecutionModels(output.Executions) + executionList, err := transformers.FromExecutionModels(output.Executions, transformers.ListExecutionTransformerOptions) if err != nil { logger.Errorf(ctx, "Failed to transform execution models [%+v] with err: %v", output.Executions, err) @@ -1475,7 +1475,7 @@ func (m *ExecutionManager) ListExecutions( func (m *ExecutionManager) publishNotifications(ctx context.Context, request admin.WorkflowExecutionEventRequest, execution models.Execution) error { // Notifications are stored in the Spec object of an admin.Execution object. - adminExecution, err := transformers.FromExecutionModel(execution) + adminExecution, err := transformers.FromExecutionModel(execution, transformers.DefaultExecutionTransformerOptions) if err != nil { // This shouldn't happen because execution manager marshaled the data into models.Execution. m.systemMetrics.TransformerError.Inc() diff --git a/pkg/manager/impl/node_execution_manager.go b/pkg/manager/impl/node_execution_manager.go index ca6d71e03..53a0a3665 100644 --- a/pkg/manager/impl/node_execution_manager.go +++ b/pkg/manager/impl/node_execution_manager.go @@ -308,7 +308,7 @@ func (m *NodeExecutionManager) CreateNodeEvent(ctx context.Context, request admi // Handles making additional database calls, if necessary, to populate IsParent & IsDynamic data using the historical pattern of // preloading child node executions. Otherwise, simply calls transform on the input model. func (m *NodeExecutionManager) transformNodeExecutionModel(ctx context.Context, nodeExecutionModel models.NodeExecution, - nodeExecutionID *core.NodeExecutionIdentifier) (*admin.NodeExecution, error) { + nodeExecutionID *core.NodeExecutionIdentifier, opts *transformers.ExecutionTransformerOptions) (*admin.NodeExecution, error) { internalData, err := transformers.GetNodeExecutionInternalData(nodeExecutionModel.InternalData) if err != nil { return nil, err @@ -323,7 +323,7 @@ func (m *NodeExecutionManager) transformNodeExecutionModel(ctx context.Context, } } - nodeExecution, err := transformers.FromNodeExecutionModel(nodeExecutionModel) + nodeExecution, err := transformers.FromNodeExecutionModel(nodeExecutionModel, opts) if err != nil { logger.Debugf(ctx, "failed to transform node execution model [%+v] to proto with err: %v", nodeExecutionID, err) return nil, err @@ -341,7 +341,7 @@ func (m *NodeExecutionManager) transformNodeExecutionModelList(ctx context.Conte Name: nodeExecutionModel.Name, }, NodeId: nodeExecutionModel.NodeID, - }) + }, transformers.ListExecutionTransformerOptions) if err != nil { return nil, err } @@ -362,7 +362,7 @@ func (m *NodeExecutionManager) GetNodeExecution( request.Id, err) return nil, err } - nodeExecution, err := m.transformNodeExecutionModel(ctx, *nodeExecutionModel, request.Id) + nodeExecution, err := m.transformNodeExecutionModel(ctx, *nodeExecutionModel, request.Id, nil) if err != nil { return nil, err } @@ -498,7 +498,7 @@ func (m *NodeExecutionManager) GetNodeExecutionData( return nil, err } - nodeExecution, err := transformers.FromNodeExecutionModel(*nodeExecutionModel) + nodeExecution, err := transformers.FromNodeExecutionModel(*nodeExecutionModel, transformers.DefaultExecutionTransformerOptions) if err != nil { logger.Debugf(ctx, "failed to transform node execution model [%+v] when fetching data: %v", request.Id, err) return nil, err diff --git a/pkg/manager/impl/node_execution_manager_test.go b/pkg/manager/impl/node_execution_manager_test.go index 039ae2f66..c6b4edb27 100644 --- a/pkg/manager/impl/node_execution_manager_test.go +++ b/pkg/manager/impl/node_execution_manager_test.go @@ -7,6 +7,8 @@ import ( "testing" "time" + "github.com/flyteorg/flyteadmin/pkg/repositories/transformers" + "github.com/flyteorg/flyteadmin/pkg/manager/impl/util" genModel "github.com/flyteorg/flyteadmin/pkg/repositories/gen/models" @@ -444,7 +446,7 @@ func TestTransformNodeExecutionModel(t *testing.T) { manager := NodeExecutionManager{ db: repository, } - nodeExecution, err := manager.transformNodeExecutionModel(ctx, models.NodeExecution{}, nodeExecID) + nodeExecution, err := manager.transformNodeExecutionModel(ctx, models.NodeExecution{}, nodeExecID, transformers.DefaultExecutionTransformerOptions) assert.NoError(t, err) assert.True(t, proto.Equal(nodeExecID, nodeExecution.Id)) assert.True(t, nodeExecution.Metadata.IsParentNode) @@ -474,7 +476,7 @@ func TestTransformNodeExecutionModel(t *testing.T) { Closure: closureBytes, NodeExecutionMetadata: nodeExecutionMetadataBytes, InternalData: internalDataBytes, - }, nodeExecID) + }, nodeExecID, transformers.DefaultExecutionTransformerOptions) assert.NoError(t, err) assert.True(t, nodeExecution.Metadata.IsParentNode) assert.True(t, nodeExecution.Metadata.IsDynamic) @@ -485,7 +487,7 @@ func TestTransformNodeExecutionModel(t *testing.T) { } _, err := manager.transformNodeExecutionModel(ctx, models.NodeExecution{ InternalData: []byte("i'm invalid"), - }, nodeExecID) + }, nodeExecID, transformers.DefaultExecutionTransformerOptions) assert.NotNil(t, err) assert.Equal(t, err.(flyteAdminErrors.FlyteAdminError).Code(), codes.Internal) }) @@ -500,7 +502,7 @@ func TestTransformNodeExecutionModel(t *testing.T) { manager := NodeExecutionManager{ db: repository, } - _, err := manager.transformNodeExecutionModel(ctx, models.NodeExecution{}, nodeExecID) + _, err := manager.transformNodeExecutionModel(ctx, models.NodeExecution{}, nodeExecID, transformers.DefaultExecutionTransformerOptions) assert.Equal(t, err, expectedErr) }) } diff --git a/pkg/manager/impl/task_execution_manager.go b/pkg/manager/impl/task_execution_manager.go index db88b77c8..a870e7d05 100644 --- a/pkg/manager/impl/task_execution_manager.go +++ b/pkg/manager/impl/task_execution_manager.go @@ -233,7 +233,7 @@ func (m *TaskExecutionManager) GetTaskExecution( if err != nil { return nil, err } - taskExecution, err := transformers.FromTaskExecutionModel(*taskExecutionModel) + taskExecution, err := transformers.FromTaskExecutionModel(*taskExecutionModel, transformers.DefaultExecutionTransformerOptions) if err != nil { logger.Debugf(ctx, "Failed to transform task execution model [%+v] to proto: %v", request.Id, err) return nil, err @@ -284,7 +284,7 @@ func (m *TaskExecutionManager) ListTaskExecutions( return nil, err } - taskExecutionList, err := transformers.FromTaskExecutionModels(output.TaskExecutions) + taskExecutionList, err := transformers.FromTaskExecutionModels(output.TaskExecutions, transformers.ListExecutionTransformerOptions) if err != nil { logger.Debugf(ctx, "failed to transform task execution models for request [%+v] with err: %v", request, err) return nil, err diff --git a/pkg/repositories/transformers/execution.go b/pkg/repositories/transformers/execution.go index 7bc708969..5362e088d 100644 --- a/pkg/repositories/transformers/execution.go +++ b/pkg/repositories/transformers/execution.go @@ -22,6 +22,8 @@ import ( "k8s.io/apimachinery/pkg/util/sets" ) +const trimmedErrMessageLen = 100 + var clusterReassignablePhases = sets.NewString(core.WorkflowExecution_UNDEFINED.String(), core.WorkflowExecution_QUEUED.String()) // CreateExecutionModelInput encapsulates request parameters for calls to CreateExecutionModel. @@ -44,6 +46,15 @@ type CreateExecutionModelInput struct { LaunchEntity core.ResourceType } +type ExecutionTransformerOptions struct { + TrimErrorMessage bool +} + +var DefaultExecutionTransformerOptions = &ExecutionTransformerOptions{} +var ListExecutionTransformerOptions = &ExecutionTransformerOptions{ + TrimErrorMessage: true, +} + // CreateExecutionModel transforms a ExecutionCreateRequest to a Execution model func CreateExecutionModel(input CreateExecutionModelInput) (*models.Execution, error) { requestSpec := input.RequestSpec @@ -305,7 +316,7 @@ func GetExecutionIdentifier(executionModel *models.Execution) core.WorkflowExecu } } -func FromExecutionModel(executionModel models.Execution) (*admin.Execution, error) { +func FromExecutionModel(executionModel models.Execution, opts *ExecutionTransformerOptions) (*admin.Execution, error) { var spec admin.ExecutionSpec var err error if err = proto.Unmarshal(executionModel.Spec, &spec); err != nil { @@ -315,6 +326,13 @@ func FromExecutionModel(executionModel models.Execution) (*admin.Execution, erro if err = proto.Unmarshal(executionModel.Closure, &closure); err != nil { return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to unmarshal closure") } + if closure.GetError() != nil && opts != nil && opts.TrimErrorMessage && len(closure.GetError().Message) > 0 { + trimmedErrOutputResult := closure.GetError() + trimmedErrOutputResult.Message = trimmedErrOutputResult.Message[0:trimmedErrMessageLen] + closure.OutputResult = &admin.ExecutionClosure_Error{ + Error: trimmedErrOutputResult, + } + } if closure.StateChangeDetails == nil { // Update execution state details from model for older executions @@ -362,10 +380,10 @@ func PopulateDefaultStateChangeDetails(executionModel models.Execution) (*admin. }, nil } -func FromExecutionModels(executionModels []models.Execution) ([]*admin.Execution, error) { +func FromExecutionModels(executionModels []models.Execution, opts *ExecutionTransformerOptions) ([]*admin.Execution, error) { executions := make([]*admin.Execution, len(executionModels)) for idx, executionModel := range executionModels { - execution, err := FromExecutionModel(executionModel) + execution, err := FromExecutionModel(executionModel, opts) if err != nil { return nil, err } diff --git a/pkg/repositories/transformers/execution_test.go b/pkg/repositories/transformers/execution_test.go index d45094686..fc42a82cd 100644 --- a/pkg/repositories/transformers/execution_test.go +++ b/pkg/repositories/transformers/execution_test.go @@ -528,7 +528,7 @@ func TestFromExecutionModel(t *testing.T) { StartedAt: &startedAt, State: &stateInt, } - execution, err := FromExecutionModel(executionModel) + execution, err := FromExecutionModel(executionModel, DefaultExecutionTransformerOptions) assert.Nil(t, err) assert.True(t, proto.Equal(&admin.Execution{ Id: &core.WorkflowExecutionIdentifier{ @@ -556,7 +556,7 @@ func TestFromExecutionModel_Aborted(t *testing.T) { AbortCause: abortCause, Closure: executionClosureBytes, } - execution, err := FromExecutionModel(executionModel) + execution, err := FromExecutionModel(executionModel, DefaultExecutionTransformerOptions) assert.Nil(t, err) assert.Equal(t, core.WorkflowExecution_ABORTED, execution.Closure.Phase) assert.True(t, proto.Equal(&admin.AbortMetadata{ @@ -564,11 +564,41 @@ func TestFromExecutionModel_Aborted(t *testing.T) { }, execution.Closure.GetAbortMetadata())) executionModel.Phase = core.WorkflowExecution_RUNNING.String() - execution, err = FromExecutionModel(executionModel) + execution, err = FromExecutionModel(executionModel, DefaultExecutionTransformerOptions) assert.Nil(t, err) assert.Empty(t, execution.Closure.GetAbortCause()) } +func TestFromExecutionModel_Error(t *testing.T) { + extraLongErrMsg := string(make([]byte, 2*trimmedErrMessageLen)) + execErr := &core.ExecutionError{ + Code: "CODE", + Message: extraLongErrMsg, + Kind: core.ExecutionError_USER, + } + executionClosureBytes, _ := proto.Marshal(&admin.ExecutionClosure{ + Phase: core.WorkflowExecution_FAILED, + OutputResult: &admin.ExecutionClosure_Error{Error: execErr}, + }) + executionModel := models.Execution{ + ExecutionKey: models.ExecutionKey{ + Project: "project", + Domain: "domain", + Name: "name", + }, + Phase: core.WorkflowExecution_FAILED.String(), + Closure: executionClosureBytes, + } + execution, err := FromExecutionModel(executionModel, &ExecutionTransformerOptions{ + TrimErrorMessage: true, + }) + expectedExecErr := execErr + expectedExecErr.Message = string(make([]byte, trimmedErrMessageLen)) + assert.Nil(t, err) + assert.Equal(t, core.WorkflowExecution_FAILED, execution.Closure.Phase) + assert.True(t, proto.Equal(expectedExecErr, execution.Closure.GetError())) +} + func TestFromExecutionModels(t *testing.T) { spec := testutils.GetExecutionRequest().Spec specBytes, _ := proto.Marshal(spec) @@ -611,7 +641,7 @@ func TestFromExecutionModels(t *testing.T) { State: &stateInt, }, } - executions, err := FromExecutionModels(executionModels) + executions, err := FromExecutionModels(executionModels, DefaultExecutionTransformerOptions) assert.Nil(t, err) assert.Len(t, executions, 1) assert.True(t, proto.Equal(&admin.Execution{ diff --git a/pkg/repositories/transformers/node_execution.go b/pkg/repositories/transformers/node_execution.go index 83f9acbaf..093e70d3d 100644 --- a/pkg/repositories/transformers/node_execution.go +++ b/pkg/repositories/transformers/node_execution.go @@ -278,12 +278,19 @@ func UpdateNodeExecutionModel( return nil } -func FromNodeExecutionModel(nodeExecutionModel models.NodeExecution) (*admin.NodeExecution, error) { +func FromNodeExecutionModel(nodeExecutionModel models.NodeExecution, opts *ExecutionTransformerOptions) (*admin.NodeExecution, error) { var closure admin.NodeExecutionClosure err := proto.Unmarshal(nodeExecutionModel.Closure, &closure) if err != nil { return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to unmarshal closure") } + if closure.GetError() != nil && opts != nil && opts.TrimErrorMessage && len(closure.GetError().Message) > 0 { + trimmedErrOutputResult := closure.GetError() + trimmedErrOutputResult.Message = trimmedErrOutputResult.Message[0:trimmedErrMessageLen] + closure.OutputResult = &admin.NodeExecutionClosure_Error{ + Error: trimmedErrOutputResult, + } + } var nodeExecutionMetadata admin.NodeExecutionMetaData err = proto.Unmarshal(nodeExecutionModel.NodeExecutionMetadata, &nodeExecutionMetadata) diff --git a/pkg/repositories/transformers/node_execution_test.go b/pkg/repositories/transformers/node_execution_test.go index 45a3f47bd..32b2bc899 100644 --- a/pkg/repositories/transformers/node_execution_test.go +++ b/pkg/repositories/transformers/node_execution_test.go @@ -490,7 +490,7 @@ func TestFromNodeExecutionModel(t *testing.T) { NodeExecutionMetadata: nodeExecutionMetadataBytes, InputURI: "input uri", Duration: duration, - }) + }, DefaultExecutionTransformerOptions) assert.Nil(t, err) assert.True(t, proto.Equal(&admin.NodeExecution{ Id: &nodeExecutionIdentifier, @@ -500,6 +500,39 @@ func TestFromNodeExecutionModel(t *testing.T) { }, nodeExecution)) } +func TestFromNodeExecutionModel_Error(t *testing.T) { + extraLongErrMsg := string(make([]byte, 2*trimmedErrMessageLen)) + execErr := &core.ExecutionError{ + Code: "CODE", + Message: extraLongErrMsg, + Kind: core.ExecutionError_USER, + } + executionClosureBytes, _ := proto.Marshal(&admin.ExecutionClosure{ + Phase: core.WorkflowExecution_FAILED, + OutputResult: &admin.ExecutionClosure_Error{Error: execErr}, + }) + nodeExecution, err := FromNodeExecutionModel(models.NodeExecution{ + NodeExecutionKey: models.NodeExecutionKey{ + NodeID: "nodey", + ExecutionKey: models.ExecutionKey{ + Project: "project", + Domain: "domain", + Name: "name", + }, + }, + Closure: executionClosureBytes, + NodeExecutionMetadata: nodeExecutionMetadataBytes, + InputURI: "input uri", + Duration: duration, + }, &ExecutionTransformerOptions{TrimErrorMessage: true}) + assert.Nil(t, err) + + expectedExecErr := execErr + expectedExecErr.Message = string(make([]byte, trimmedErrMessageLen)) + assert.Nil(t, err) + assert.True(t, proto.Equal(expectedExecErr, nodeExecution.Closure.GetError())) +} + func TestFromNodeExecutionModelWithChildren(t *testing.T) { nodeExecutionIdentifier := core.NodeExecutionIdentifier{ NodeId: "nodey", @@ -536,7 +569,7 @@ func TestFromNodeExecutionModelWithChildren(t *testing.T) { } t.Run("dynamic workflow", func(t *testing.T) { nodeExecModel.DynamicWorkflowRemoteClosureReference = "dummy_dynamic_worklfow_ref" - nodeExecution, err := FromNodeExecutionModel(nodeExecModel) + nodeExecution, err := FromNodeExecutionModel(nodeExecModel, DefaultExecutionTransformerOptions) assert.Nil(t, err) assert.True(t, proto.Equal(&admin.NodeExecution{ Id: &nodeExecutionIdentifier, @@ -552,7 +585,7 @@ func TestFromNodeExecutionModelWithChildren(t *testing.T) { }) t.Run("non dynamic workflow", func(t *testing.T) { nodeExecModel.DynamicWorkflowRemoteClosureReference = "" - nodeExecution, err := FromNodeExecutionModel(nodeExecModel) + nodeExecution, err := FromNodeExecutionModel(nodeExecModel, DefaultExecutionTransformerOptions) assert.Nil(t, err) assert.True(t, proto.Equal(&admin.NodeExecution{ Id: &nodeExecutionIdentifier, diff --git a/pkg/repositories/transformers/task_execution.go b/pkg/repositories/transformers/task_execution.go index 0a09f0828..92c066413 100644 --- a/pkg/repositories/transformers/task_execution.go +++ b/pkg/repositories/transformers/task_execution.go @@ -399,12 +399,19 @@ func UpdateTaskExecutionModel(ctx context.Context, request *admin.TaskExecutionE return nil } -func FromTaskExecutionModel(taskExecutionModel models.TaskExecution) (*admin.TaskExecution, error) { +func FromTaskExecutionModel(taskExecutionModel models.TaskExecution, opts *ExecutionTransformerOptions) (*admin.TaskExecution, error) { var closure admin.TaskExecutionClosure err := proto.Unmarshal(taskExecutionModel.Closure, &closure) if err != nil { return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to unmarshal closure") } + if closure.GetError() != nil && opts != nil && opts.TrimErrorMessage && len(closure.GetError().Message) > 0 { + trimmedErrOutputResult := closure.GetError() + trimmedErrOutputResult.Message = trimmedErrOutputResult.Message[0:trimmedErrMessageLen] + closure.OutputResult = &admin.TaskExecutionClosure_Error{ + Error: trimmedErrOutputResult, + } + } taskExecution := &admin.TaskExecution{ Id: &core.TaskExecutionIdentifier{ @@ -435,10 +442,10 @@ func FromTaskExecutionModel(taskExecutionModel models.TaskExecution) (*admin.Tas return taskExecution, nil } -func FromTaskExecutionModels(taskExecutionModels []models.TaskExecution) ([]*admin.TaskExecution, error) { +func FromTaskExecutionModels(taskExecutionModels []models.TaskExecution, opts *ExecutionTransformerOptions) ([]*admin.TaskExecution, error) { taskExecutions := make([]*admin.TaskExecution, len(taskExecutionModels)) for idx, taskExecutionModel := range taskExecutionModels { - taskExecution, err := FromTaskExecutionModel(taskExecutionModel) + taskExecution, err := FromTaskExecutionModel(taskExecutionModel, opts) if err != nil { return nil, err } diff --git a/pkg/repositories/transformers/task_execution_test.go b/pkg/repositories/transformers/task_execution_test.go index a5c07fc47..19ad2d051 100644 --- a/pkg/repositories/transformers/task_execution_test.go +++ b/pkg/repositories/transformers/task_execution_test.go @@ -548,7 +548,7 @@ func TestFromTaskExecutionModel(t *testing.T) { InputURI: "input uri", Duration: duration, Closure: closureBytes, - }) + }, DefaultExecutionTransformerOptions) assert.Nil(t, err) assert.True(t, proto.Equal(&admin.TaskExecution{ Id: &core.TaskExecutionIdentifier{ @@ -574,6 +574,48 @@ func TestFromTaskExecutionModel(t *testing.T) { }, taskExecution)) } +func TestFromTaskExecutionModel_Error(t *testing.T) { + extraLongErrMsg := string(make([]byte, 2*trimmedErrMessageLen)) + execErr := &core.ExecutionError{ + Code: "CODE", + Message: extraLongErrMsg, + Kind: core.ExecutionError_USER, + } + closureBytes, _ := proto.Marshal(&admin.ExecutionClosure{ + Phase: core.WorkflowExecution_FAILED, + OutputResult: &admin.ExecutionClosure_Error{Error: execErr}, + }) + taskExecution, err := FromTaskExecutionModel(models.TaskExecution{ + TaskExecutionKey: models.TaskExecutionKey{ + TaskKey: models.TaskKey{ + Project: "project", + Domain: "domain", + Name: "name", + Version: "version", + }, + NodeExecutionKey: models.NodeExecutionKey{ + NodeID: "node id", + ExecutionKey: models.ExecutionKey{ + Project: "ex project", + Domain: "ex domain", + Name: "ex name", + }, + }, + RetryAttempt: &retryAttemptValue, + }, + InputURI: "input uri", + Duration: duration, + Closure: closureBytes, + }, &ExecutionTransformerOptions{ + TrimErrorMessage: true, + }) + + expectedExecErr := execErr + expectedExecErr.Message = string(make([]byte, trimmedErrMessageLen)) + assert.Nil(t, err) + assert.True(t, proto.Equal(expectedExecErr, taskExecution.Closure.GetError())) +} + func TestFromTaskExecutionModels(t *testing.T) { taskClosure := &admin.TaskExecutionClosure{ Phase: core.TaskExecution_RUNNING, @@ -609,7 +651,7 @@ func TestFromTaskExecutionModels(t *testing.T) { Duration: duration, Closure: closureBytes, }, - }) + }, DefaultExecutionTransformerOptions) assert.Nil(t, err) assert.Len(t, taskExecutions, 1) assert.True(t, proto.Equal(&admin.TaskExecution{