From d1ebaaea3b2025d78ce7c34e6881fa140143bb06 Mon Sep 17 00:00:00 2001 From: Andrew Dye Date: Thu, 6 Oct 2022 08:51:20 -0700 Subject: [PATCH] Save CheckpointUri in NodeExecution.Closure (#479) * Save CheckpointUri in NodeExecution.Closure Signed-off-by: Andrew Dye * Test failures Signed-off-by: Andrew Dye Signed-off-by: Andrew Dye --- pkg/manager/impl/node_execution_manager_test.go | 8 ++++++++ pkg/repositories/transformers/node_execution.go | 13 ++++++++----- .../transformers/node_execution_test.go | 6 ++++-- 3 files changed, 20 insertions(+), 7 deletions(-) diff --git a/pkg/manager/impl/node_execution_manager_test.go b/pkg/manager/impl/node_execution_manager_test.go index 477bdcb16..1b47f16bf 100644 --- a/pkg/manager/impl/node_execution_manager_test.go +++ b/pkg/manager/impl/node_execution_manager_test.go @@ -197,6 +197,9 @@ func TestCreateNodeEvent_Update(t *testing.T) { StartedAt: occurredAtProto, Phase: core.NodeExecution_RUNNING, UpdatedAt: occurredAtProto, + TargetMetadata: &admin.NodeExecutionClosure_TaskNodeMetadata{ + TaskNodeMetadata: &admin.TaskNodeMetadata{}, + }, } expectedClosureBytes, _ := proto.Marshal(&expectedClosure) actualClosure := admin.NodeExecutionClosure{} @@ -541,6 +544,11 @@ func TestGetNodeExecution(t *testing.T) { repository := repositoryMocks.NewMockRepository() expectedClosure := admin.NodeExecutionClosure{ Phase: core.NodeExecution_SUCCEEDED, + TargetMetadata: &admin.NodeExecutionClosure_TaskNodeMetadata{ + TaskNodeMetadata: &admin.TaskNodeMetadata{ + CheckpointUri: "last checkpoint uri", + }, + }, } expectedMetadata := admin.NodeExecutionMetaData{ SpecNodeId: "spec_node_id", diff --git a/pkg/repositories/transformers/node_execution.go b/pkg/repositories/transformers/node_execution.go index c56a1c028..2029f69f7 100644 --- a/pkg/repositories/transformers/node_execution.go +++ b/pkg/repositories/transformers/node_execution.go @@ -211,16 +211,19 @@ func UpdateNodeExecutionModel( } // Update TaskNodeMetadata, which includes caching information today. - if request.Event.GetTaskNodeMetadata() != nil && request.Event.GetTaskNodeMetadata().CatalogKey != nil { - st := request.Event.GetTaskNodeMetadata().GetCacheStatus().String() + if request.Event.GetTaskNodeMetadata() != nil { targetMetadata := &admin.NodeExecutionClosure_TaskNodeMetadata{ TaskNodeMetadata: &admin.TaskNodeMetadata{ - CacheStatus: request.Event.GetTaskNodeMetadata().GetCacheStatus(), - CatalogKey: request.Event.GetTaskNodeMetadata().GetCatalogKey(), + CheckpointUri: request.Event.GetTaskNodeMetadata().CheckpointUri, }, } + if request.Event.GetTaskNodeMetadata().CatalogKey != nil { + st := request.Event.GetTaskNodeMetadata().GetCacheStatus().String() + targetMetadata.TaskNodeMetadata.CacheStatus = request.Event.GetTaskNodeMetadata().GetCacheStatus() + targetMetadata.TaskNodeMetadata.CatalogKey = request.Event.GetTaskNodeMetadata().GetCatalogKey() + nodeExecutionModel.CacheStatus = &st + } nodeExecutionClosure.TargetMetadata = targetMetadata - nodeExecutionModel.CacheStatus = &st } marshaledClosure, err := proto.Marshal(&nodeExecutionClosure) diff --git a/pkg/repositories/transformers/node_execution_test.go b/pkg/repositories/transformers/node_execution_test.go index de755fe2a..a06d762c7 100644 --- a/pkg/repositories/transformers/node_execution_test.go +++ b/pkg/repositories/transformers/node_execution_test.go @@ -328,6 +328,7 @@ func TestUpdateNodeExecutionModel(t *testing.T) { }, }, }, + CheckpointUri: "last checkpoint uri", }, }, }, @@ -351,8 +352,9 @@ func TestUpdateNodeExecutionModel(t *testing.T) { UpdatedAt: occurredAtProto, TargetMetadata: &admin.NodeExecutionClosure_TaskNodeMetadata{ TaskNodeMetadata: &admin.TaskNodeMetadata{ - CacheStatus: request.Event.GetTaskNodeMetadata().CacheStatus, - CatalogKey: request.Event.GetTaskNodeMetadata().CatalogKey, + CacheStatus: request.Event.GetTaskNodeMetadata().CacheStatus, + CatalogKey: request.Event.GetTaskNodeMetadata().CatalogKey, + CheckpointUri: request.Event.GetTaskNodeMetadata().CheckpointUri, }, }, }