Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
More data in workflow exists err msg #2522 (#487)
Browse files Browse the repository at this point in the history
  • Loading branch information
jerempy authored Oct 28, 2022
1 parent c80bd7c commit 1817f38
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 15 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/cloudevents/sdk-go/v2 v2.8.0
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/evanphx/json-patch v4.12.0+incompatible
github.com/flyteorg/flyteidl v1.1.21
github.com/flyteorg/flyteidl v1.2.0
github.com/flyteorg/flyteplugins v1.0.10
github.com/flyteorg/flytepropeller v1.1.28
github.com/flyteorg/flytestdlib v1.0.5
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,8 @@ github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8S
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/flyteidl v1.1.21 h1:09/xqYWFdUA22bVWKLjkSzhhSJfaJmDAraczpJ/Yiis=
github.com/flyteorg/flyteidl v1.1.21/go.mod h1:f0AFl7RFycH7+JLq2th0ReH7v+Xse+QTw4jGdIxiS8I=
github.com/flyteorg/flyteidl v1.2.0 h1:snJPpc5a5Gr4GXYiAMX6Io1edT91ZxN/7oE6uhydrvk=
github.com/flyteorg/flyteidl v1.2.0/go.mod h1:f0AFl7RFycH7+JLq2th0ReH7v+Xse+QTw4jGdIxiS8I=
github.com/flyteorg/flyteplugins v1.0.10 h1:XBycM4aOSE/WlI8iP9vqogKGXy4FMfVCUUfzxJus/p4=
github.com/flyteorg/flyteplugins v1.0.10/go.mod h1:GfbmRByI/rSatm/Epoj3bNyrXwIQ9NOXTVwLS6Z0p84=
github.com/flyteorg/flytepropeller v1.1.28 h1:68qQ0QRHoCzagF0oifkW/c4A1L4B4LdgyHCPLKMiY2g=
Expand Down
38 changes: 35 additions & 3 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flytestdlib/logger"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand All @@ -16,10 +17,9 @@ type FlyteAdminError interface {
Error() string
Code() codes.Code
GRPCStatus() *status.Status
WithDetails(details *admin.EventFailureReason) (FlyteAdminError, error)
WithDetails(details proto.Message) (FlyteAdminError, error)
String() string
}

type flyteAdminErrorImpl struct {
status *status.Status
}
Expand All @@ -43,7 +43,7 @@ func (e *flyteAdminErrorImpl) String() string {
// enclose the error in the format that grpc server expect from golang:
//
// https://github.com/grpc/grpc-go/blob/master/status/status.go#L133
func (e *flyteAdminErrorImpl) WithDetails(details *admin.EventFailureReason) (FlyteAdminError, error) {
func (e *flyteAdminErrorImpl) WithDetails(details proto.Message) (FlyteAdminError, error) {
s, err := e.status.WithDetails(details)
if err != nil {
return nil, err
Expand Down Expand Up @@ -107,3 +107,35 @@ func NewIncompatibleClusterError(ctx context.Context, errorMsg, curCluster strin
}
return statusErr
}

func NewWorkflowExistsDifferentStructureError(ctx context.Context, request *admin.WorkflowCreateRequest) FlyteAdminError {
errorMsg := "workflow with different structure already exists"
statusErr, transformationErr := NewFlyteAdminError(codes.InvalidArgument, errorMsg).WithDetails(&admin.CreateWorkflowFailureReason{
Reason: &admin.CreateWorkflowFailureReason_ExistsDifferentStructure{
ExistsDifferentStructure: &admin.WorkflowErrorExistsDifferentStructure{
Id: request.Id,
},
},
})
if transformationErr != nil {
logger.Panicf(ctx, "Failed to wrap grpc status in type 'Error': %v", transformationErr)
return NewFlyteAdminErrorf(codes.InvalidArgument, errorMsg)
}
return statusErr
}

func NewWorkflowExistsIdenticalStructureError(ctx context.Context, request *admin.WorkflowCreateRequest) FlyteAdminError {
errorMsg := "workflow with identical structure already exists"
statusErr, transformationErr := NewFlyteAdminError(codes.AlreadyExists, errorMsg).WithDetails(&admin.CreateWorkflowFailureReason{
Reason: &admin.CreateWorkflowFailureReason_ExistsIdenticalStructure{
ExistsIdenticalStructure: &admin.WorkflowErrorExistsIdenticalStructure{
Id: request.Id,
},
},
})
if transformationErr != nil {
logger.Panicf(ctx, "Failed to wrap grpc status in type 'Error': %v", transformationErr)
return NewFlyteAdminErrorf(codes.AlreadyExists, errorMsg)
}
return statusErr
}
47 changes: 47 additions & 0 deletions pkg/errors/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"google.golang.org/grpc/status"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -43,3 +44,49 @@ func TestNewIncompatibleClusterError(t *testing.T) {
_, ok = details.GetReason().(*admin.EventFailureReason_IncompatibleCluster)
assert.True(t, ok)
}

func TestNewWorkflowExistsDifferentStructureError(t *testing.T) {
wf := &admin.WorkflowCreateRequest{
Id: &core.Identifier{
ResourceType: core.ResourceType_WORKFLOW,
Project: "testProj",
Domain: "domain",
Name: "name",
Version: "ver",
},
}
statusErr := NewWorkflowExistsDifferentStructureError(context.Background(), wf)
assert.NotNil(t, statusErr)
s, ok := status.FromError(statusErr)
assert.True(t, ok)
assert.Equal(t, codes.InvalidArgument, s.Code())
assert.Equal(t, "workflow with different structure already exists", s.Message())

details, ok := s.Details()[0].(*admin.CreateWorkflowFailureReason)
assert.True(t, ok)
_, ok = details.GetReason().(*admin.CreateWorkflowFailureReason_ExistsDifferentStructure)
assert.True(t, ok)
}

func TestNewWorkflowExistsIdenticalStructureError(t *testing.T) {
wf := &admin.WorkflowCreateRequest{
Id: &core.Identifier{
ResourceType: core.ResourceType_WORKFLOW,
Project: "testProj",
Domain: "domain",
Name: "name",
Version: "ver",
},
}
statusErr := NewWorkflowExistsIdenticalStructureError(context.Background(), wf)
assert.NotNil(t, statusErr)
s, ok := status.FromError(statusErr)
assert.True(t, ok)
assert.Equal(t, codes.AlreadyExists, s.Code())
assert.Equal(t, "workflow with identical structure already exists", s.Message())

details, ok := s.Details()[0].(*admin.CreateWorkflowFailureReason)
assert.True(t, ok)
_, ok = details.GetReason().(*admin.CreateWorkflowFailureReason_ExistsIdenticalStructure)
assert.True(t, ok)
}
11 changes: 5 additions & 6 deletions pkg/manager/impl/workflow_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,16 +161,15 @@ func (w *WorkflowManager) CreateWorkflow(
}

// Assert that a matching workflow doesn't already exist before uploading the workflow closure.
existingMatchingWorkflow, err := util.GetWorkflowModel(ctx, w.db, *request.Id)
existingWorkflowModel, err := util.GetWorkflowModel(ctx, w.db, *request.Id)
// Check that no identical or conflicting workflows exist.
if err == nil {
// A workflow's structure is uniquely defined by its collection of nodes.
if bytes.Equal(workflowDigest, existingMatchingWorkflow.Digest) {
return nil, errors.NewFlyteAdminErrorf(
codes.AlreadyExists, "identical workflow already exists with id %v", request.Id)
if bytes.Equal(workflowDigest, existingWorkflowModel.Digest) {
return nil, errors.NewWorkflowExistsIdenticalStructureError(ctx, &request)
}
return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument,
"workflow with different structure already exists with id %v", request.Id)
// A workflow exists with different structure
return nil, errors.NewWorkflowExistsDifferentStructureError(ctx, &request)
} else if flyteAdminError, ok := err.(errors.FlyteAdminError); !ok || flyteAdminError.Code() != codes.NotFound {
logger.Debugf(ctx, "Failed to get workflow for comparison in CreateWorkflow with ID [%+v] with err %v",
request.Id, err)
Expand Down
11 changes: 6 additions & 5 deletions pkg/manager/impl/workflow_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/flyteorg/flyteadmin/pkg/repositories/models"
"github.com/golang/protobuf/proto"

flyteErrors "github.com/flyteorg/flyteadmin/pkg/errors"
runtimeInterfaces "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces"
runtimeMocks "github.com/flyteorg/flyteadmin/pkg/runtime/mocks"
workflowengineInterfaces "github.com/flyteorg/flyteadmin/pkg/workflowengine/interfaces"
Expand Down Expand Up @@ -177,12 +178,11 @@ func TestCreateWorkflow_ExistingWorkflow(t *testing.T) {
getMockWorkflowConfigProvider(), getMockWorkflowCompiler(), mockStorageClient, storagePrefix, mockScope.NewTestScope())
request := testutils.GetWorkflowRequest()
response, err := workflowManager.CreateWorkflow(context.Background(), request)
assert.EqualError(t, err, "workflow with different structure already exists with id "+
"resource_type:WORKFLOW project:\"project\" domain:\"domain\" name:\"name\" version:\"version\" ")
assert.EqualError(t, err, "workflow with different structure already exists")
assert.Nil(t, response)
}

func TestCreateWorkflow_ExistingWorkflow_NotIdentical(t *testing.T) {
func TestCreateWorkflow_ExistingWorkflow_Different(t *testing.T) {
mockStorageClient := commonMocks.GetMockStorageClient()

mockStorageClient.ComposedProtobufStore.(*commonMocks.TestDataStore).ReadProtobufCb =
Expand All @@ -196,8 +196,9 @@ func TestCreateWorkflow_ExistingWorkflow_NotIdentical(t *testing.T) {

request := testutils.GetWorkflowRequest()
response, err := workflowManager.CreateWorkflow(context.Background(), request)
assert.EqualError(t, err, "workflow with different structure already exists with id "+
"resource_type:WORKFLOW project:\"project\" domain:\"domain\" name:\"name\" version:\"version\" ")
assert.EqualError(t, err, "workflow with different structure already exists")
flyteErr := err.(flyteErrors.FlyteAdminError)
assert.Equal(t, codes.InvalidArgument, flyteErr.Code())
assert.Nil(t, response)
}

Expand Down

0 comments on commit 1817f38

Please sign in to comment.