Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Remove NodeExecutionRepo.ListEvents (#473)
Browse files Browse the repository at this point in the history
* Remove NodeExecutionRepo.ListEvents

Signed-off-by: Andrew Dye <[email protected]>

* Lints

Signed-off-by: Andrew Dye <[email protected]>

Signed-off-by: Andrew Dye <[email protected]>
  • Loading branch information
andrewwdye authored Sep 7, 2022
1 parent f94fd01 commit 72238ad
Show file tree
Hide file tree
Showing 4 changed files with 0 additions and 127 deletions.
34 changes: 0 additions & 34 deletions pkg/repositories/gormimpl/node_execution_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,40 +142,6 @@ func (r *NodeExecutionRepo) List(ctx context.Context, input interfaces.ListResou
}, nil
}

func (r *NodeExecutionRepo) ListEvents(
ctx context.Context, input interfaces.ListResourceInput) (interfaces.NodeExecutionEventCollectionOutput, error) {
// First validate input.
if err := ValidateListInput(input); err != nil {
return interfaces.NodeExecutionEventCollectionOutput{}, err
}
var nodeExecutionEvents []models.NodeExecutionEvent
tx := r.db.Limit(input.Limit).Offset(input.Offset)
// And add join condition (joining multiple tables is fine even we only filter on a subset of table attributes).
// (this query isn't called for deletes).
tx = tx.Joins(innerJoinNodeExecToNodeEvents)
tx = tx.Joins(innerJoinExecToNodeExec)

// Apply filters
tx, err := applyScopedFilters(tx, input.InlineFilters, input.MapFilters)
if err != nil {
return interfaces.NodeExecutionEventCollectionOutput{}, err
}
// Apply sort ordering.
if input.SortParameter != nil {
tx = tx.Order(input.SortParameter.GetGormOrderExpr())
}

timer := r.metrics.ListDuration.Start()
tx = tx.Find(&nodeExecutionEvents)
timer.Stop()
if tx.Error != nil {
return interfaces.NodeExecutionEventCollectionOutput{}, r.errorTransformer.ToFlyteAdminError(tx.Error)
}
return interfaces.NodeExecutionEventCollectionOutput{
NodeExecutionEvents: nodeExecutionEvents,
}, nil
}

func (r *NodeExecutionRepo) Exists(ctx context.Context, input interfaces.NodeExecutionResource) (bool, error) {
var nodeExecution models.NodeExecution
timer := r.metrics.ExistsDuration.Start()
Expand Down
76 changes: 0 additions & 76 deletions pkg/repositories/gormimpl/node_execution_repo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,82 +333,6 @@ func TestListNodeExecutionsForExecution(t *testing.T) {
}
}

func getMockNodeExecutionEventResponseFromDb(expected models.NodeExecutionEvent) map[string]interface{} {
nodeExecutionEvent := make(map[string]interface{})
nodeExecutionEvent["execution_project"] = expected.ExecutionKey.Project
nodeExecutionEvent["execution_domain"] = expected.ExecutionKey.Domain
nodeExecutionEvent["execution_name"] = expected.ExecutionKey.Name
nodeExecutionEvent["node_id"] = expected.NodeExecutionKey.NodeID
nodeExecutionEvent["request_id"] = expected.RequestID
nodeExecutionEvent["phase"] = expected.Phase
nodeExecutionEvent["occurred_at"] = expected.OccurredAt
return nodeExecutionEvent
}

func TestListNodeExecutionEventsForNodeExecutionAndExecution(t *testing.T) {
nodeExecutionRepo := NewNodeExecutionRepo(GetDbForTest(t), errors.NewTestErrorTransformer(), mockScope.NewTestScope())

nodeExecutions := make([]map[string]interface{}, 0)
nodeExecution := getMockNodeExecutionEventResponseFromDb(models.NodeExecutionEvent{
NodeExecutionKey: models.NodeExecutionKey{
ExecutionKey: models.ExecutionKey{
Project: "project",
Domain: "domain",
Name: "1",
},
},
RequestID: "1",
Phase: nodePhase,
OccurredAt: nodeStartedAt,
})
nodeExecutions = append(nodeExecutions, nodeExecution)

GlobalMock := mocket.Catcher.Reset()
query := `SELECT "node_execution_events"."id","node_execution_events"."created_at","node_execution_events"."updated_at","node_execution_events"."deleted_at","node_execution_events"."execution_project","node_execution_events"."execution_domain","node_execution_events"."execution_name","node_execution_events"."node_id","node_execution_events"."request_id","node_execution_events"."occurred_at","node_execution_events"."phase" FROM "node_execution_events" INNER JOIN node_executions ON node_event_executions.node_execution_id = node_executions.id INNER JOIN executions ON node_executions.execution_project = executions.execution_project AND node_executions.execution_domain = executions.execution_domain AND node_executions.execution_name = executions.execution_name WHERE node_executions.execution_id = $1 AND node_executions.node_id = $2 AND node_execution_events.request_id = $3 AND executions.execution_project = $4 AND executions.execution_domain = $5 AND executions.execution_name = $6 LIMIT 20`
GlobalMock.NewMock().WithQuery(query).WithReply(nodeExecutions)

collection, err := nodeExecutionRepo.ListEvents(context.Background(), interfaces.ListResourceInput{
InlineFilters: []common.InlineFilter{
getEqualityFilter(common.NodeExecution, "execution_id", uint(1)),
getEqualityFilter(common.NodeExecution, "node_id", uint(2)),
getEqualityFilter(common.NodeExecutionEvent, "request_id", "1"),
getEqualityFilter(common.Execution, "project", project),
getEqualityFilter(common.Execution, "domain", domain),
getEqualityFilter(common.Execution, "name", name),
},
Limit: 20,
})
assert.NoError(t, err)
assert.NotEmpty(t, collection)
assert.NotEmpty(t, collection.NodeExecutionEvents)
assert.Len(t, collection.NodeExecutionEvents, 1)
for _, event := range collection.NodeExecutionEvents {
assert.Equal(t, "1", event.RequestID)
assert.Equal(t, models.ExecutionKey{
Project: "project",
Domain: "domain",
Name: "1",
}, event.ExecutionKey)
assert.Equal(t, nodePhase, event.Phase)
assert.Equal(t, nodeStartedAt, event.OccurredAt)
}
}

func TestListNodeExecutionEvents_MissingParameters(t *testing.T) {
nodeExecutionRepo := NewNodeExecutionRepo(GetDbForTest(t), errors.NewTestErrorTransformer(), mockScope.NewTestScope())
_, err := nodeExecutionRepo.ListEvents(context.Background(), interfaces.ListResourceInput{
InlineFilters: []common.InlineFilter{
getEqualityFilter(common.NodeExecution, "node_id", "1234"),
},
})
assert.EqualError(t, err, "missing and/or invalid parameters: limit")

_, err = nodeExecutionRepo.List(context.Background(), interfaces.ListResourceInput{
Limit: 20,
})
assert.EqualError(t, err, "missing and/or invalid parameters: filters")
}

func TestNodeExecutionExists(t *testing.T) {
nodeExecutionRepo := NewNodeExecutionRepo(GetDbForTest(t), errors.NewTestErrorTransformer(), mockScope.NewTestScope())
id := uint(10)
Expand Down
2 changes: 0 additions & 2 deletions pkg/repositories/interfaces/node_execution_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ type NodeExecutionRepoInterface interface {
GetWithChildren(ctx context.Context, input NodeExecutionResource) (models.NodeExecution, error)
// List returns node executions matching query parameters. A limit must be provided for the results page size.
List(ctx context.Context, input ListResourceInput) (NodeExecutionCollectionOutput, error)
// ListEvents returns node execution events matching query parameters. A limit must be provided for the results page size.
ListEvents(ctx context.Context, input ListResourceInput) (NodeExecutionEventCollectionOutput, error)
// Exists returns whether a matching execution exists.
Exists(ctx context.Context, input NodeExecutionResource) (bool, error)
// Returns count of node executions matching query parameters.
Expand Down
15 changes: 0 additions & 15 deletions pkg/repositories/mocks/node_execution_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ type UpdateNodeExecutionFunc func(ctx context.Context, nodeExecution *models.Nod
type GetNodeExecutionFunc func(ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error)
type ListNodeExecutionFunc func(ctx context.Context, input interfaces.ListResourceInput) (
interfaces.NodeExecutionCollectionOutput, error)
type ListNodeExecutionEventFunc func(ctx context.Context, input interfaces.ListResourceInput) (
interfaces.NodeExecutionEventCollectionOutput, error)
type ExistsNodeExecutionFunc func(ctx context.Context, input interfaces.NodeExecutionResource) (bool, error)
type CountNodeExecutionFunc func(ctx context.Context, input interfaces.CountResourceInput) (int64, error)

Expand All @@ -23,7 +21,6 @@ type MockNodeExecutionRepo struct {
getFunction GetNodeExecutionFunc
getWithChildrenFunction GetNodeExecutionFunc
listFunction ListNodeExecutionFunc
listEventFunction ListNodeExecutionEventFunc
existsFunction ExistsNodeExecutionFunc
countFunction CountNodeExecutionFunc
}
Expand Down Expand Up @@ -84,18 +81,6 @@ func (r *MockNodeExecutionRepo) SetListCallback(listFunction ListNodeExecutionFu
r.listFunction = listFunction
}

func (r *MockNodeExecutionRepo) ListEvents(ctx context.Context, input interfaces.ListResourceInput) (
interfaces.NodeExecutionEventCollectionOutput, error) {
if r.listFunction != nil {
return r.listEventFunction(ctx, input)
}
return interfaces.NodeExecutionEventCollectionOutput{}, nil
}

func (r *MockNodeExecutionRepo) SetListEventCallback(listEventFunction ListNodeExecutionEventFunc) {
r.listEventFunction = listEventFunction
}

func (r *MockNodeExecutionRepo) Exists(ctx context.Context, input interfaces.NodeExecutionResource) (bool, error) {
if r.existsFunction != nil {
return r.existsFunction(ctx, input)
Expand Down

0 comments on commit 72238ad

Please sign in to comment.