Skip to content

Commit

Permalink
Log a bunch more fields to clickhouse Executions (#8036)
Browse files Browse the repository at this point in the history
The code is a bit clunky because I have to put a bunch of data in
`ExecutionAuxiliaryMetadata`, then remove it for most of the code in
`execution_server.PublishOperation`, but keep it around so I can pass it
to `execution_server.recordExecution`. Some alternatives I considered:

1. Read this data from Redis. This still can't get everything, like
`isolation_type`.
2. Pass the data in the gRPC context. Not a fan of this as an API, since
it's very hidden.
3. Use the `Operation.metadata.partial_execution_metadata` field instead
of `Operation.response.result.execution_metadata`. Then the executor
would be passing metadata in 2 different ways, depending on what it
thought the app was doing to do with the data. Doesn't seem great.

I wrote some [notes for
myself](https://docs.google.com/document/d/14pCbYj9ERv4G_8lIngUQcN-9XqDyRRgnRH3sUhOAQEg/edit?tab=t.0)
about these tradeoffs.

## Estimated disk write increase

I queried clickhouse system tables 2 times, separated by about 24 hours,
to find the [daily write throughput for each column in the `Executions`
table](https://docs.google.com/spreadsheets/d/13UGAQ79BYCfnytzuWUyvDa51OZra9T98b-I69jl-ZVw/edit?gid=0#gid=0).
Related findings:

- We write about 8GB per day, which is about 3TB per year.
- Estimated task size fields write about 1.5% of the total. I'm adding
10 of these fields.
- Boolean fields like `success` write about 0.1% of the total. I'm
adding one of these (skip_cache_check).
- Small int32 fields like `stage` write about 0.01% of the total. I'm
adding one of these (execution_priority).
- Low cardinality, small string fields like `branch_name` write about
0.9% of the total. I'm adding one of these (isolation_type).

So in total, I think this change would increase write throughput by 16%,
with almost all of that coming from the task size fields.

## Other fields I would maybe like to add:
- requested timeout and effective timeout
- more info about the platform, like OS and architecture
- info about the executor pool

---------

Co-authored-by: Brandon Duffany <[email protected]>
  • Loading branch information
vanja-p and bduffany authored Dec 13, 2024
1 parent f5ce15e commit d72eaed
Show file tree
Hide file tree
Showing 10 changed files with 313 additions and 61 deletions.
2 changes: 2 additions & 0 deletions enterprise/server/remote_execution/execution_server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ go_test(
"//proto:remote_execution_go_proto",
"//proto:resource_go_proto",
"//proto:scheduler_go_proto",
"//proto:stored_invocation_go_proto",
"//server/environment",
"//server/interfaces",
"//server/real_environment",
Expand All @@ -80,6 +81,7 @@ go_test(
"//server/util/prefix",
"//server/util/proto",
"//server/util/status",
"//server/util/testing/flags",
"@com_github_go_redis_redis_v8//:redis",
"@com_github_google_go_cmp//cmp",
"@com_github_google_uuid//:uuid",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,19 +327,16 @@ func (s *ExecutionServer) updateExecution(ctx context.Context, executionID strin
dbErr = status.NotFoundErrorf("Unable to update execution; no execution exists with id %s.", executionID)
}
}

if stage == repb.ExecutionStage_COMPLETED {
if err := s.recordExecution(ctx, executionID, executeResponse.GetResult().GetExecutionMetadata()); err != nil {
log.CtxErrorf(ctx, "failed to record execution %q: %s", executionID, err)
}
}
return dbErr
}

func (s *ExecutionServer) recordExecution(ctx context.Context, executionID string, md *repb.ExecutedActionMetadata) error {
func (s *ExecutionServer) recordExecution(ctx context.Context, executionID string, md *repb.ExecutedActionMetadata, auxMeta *espb.ExecutionAuxiliaryMetadata, properties *platform.Properties) error {
if s.env.GetExecutionCollector() == nil || !olapdbconfig.WriteExecutionsToOLAPDBEnabled() {
return nil
}
if s.env.GetDBHandle() == nil {
return status.FailedPreconditionError("database not configured")
}
var executionPrimaryDB tables.Execution

if err := s.env.GetDBHandle().NewQuery(ctx, "execution_server_lookup_execution").Raw(
Expand Down Expand Up @@ -367,6 +364,28 @@ func (s *ExecutionServer) recordExecution(ctx context.Context, executionID strin
executionProto.DiskBytesWritten = md.GetUsageStats().GetCgroupIoStats().GetWbytes()
executionProto.DiskWriteOperations = md.GetUsageStats().GetCgroupIoStats().GetWios()
executionProto.DiskReadOperations = md.GetUsageStats().GetCgroupIoStats().GetRios()

executionProto.EffectiveIsolationType = auxMeta.GetIsolationType()
executionProto.RequestedIsolationType = platform.CoerceContainerType(properties.WorkloadIsolationType)

executionProto.RequestedComputeUnits = properties.EstimatedComputeUnits
executionProto.RequestedMemoryBytes = properties.EstimatedMemoryBytes
executionProto.RequestedMilliCpu = properties.EstimatedMilliCPU
executionProto.RequestedFreeDiskBytes = properties.EstimatedFreeDiskBytes

schedulingMeta := auxMeta.GetSchedulingMetadata()
executionProto.EstimatedFreeDiskBytes = schedulingMeta.GetTaskSize().GetEstimatedFreeDiskBytes()
executionProto.PreviousMeasuredMemoryBytes = schedulingMeta.GetMeasuredTaskSize().GetEstimatedMemoryBytes()
executionProto.PreviousMeasuredMilliCpu = schedulingMeta.GetMeasuredTaskSize().GetEstimatedMilliCpu()
executionProto.PreviousMeasuredFreeDiskBytes = schedulingMeta.GetMeasuredTaskSize().GetEstimatedFreeDiskBytes()
executionProto.PredictedMemoryBytes = schedulingMeta.GetPredictedTaskSize().GetEstimatedMemoryBytes()
executionProto.PredictedMilliCpu = schedulingMeta.GetPredictedTaskSize().GetEstimatedMilliCpu()
executionProto.PredictedFreeDiskBytes = schedulingMeta.GetPredictedTaskSize().GetEstimatedFreeDiskBytes()

request := auxMeta.GetExecuteRequest()
executionProto.SkipCacheLookup = request.GetSkipCacheLookup()
executionProto.ExecutionPriority = request.GetExecutionPolicy().GetPriority()

inv, err := s.env.GetExecutionCollector().GetInvocation(ctx, link.GetInvocationId())
if err != nil {
log.CtxErrorf(ctx, "failed to get invocation %q from ExecutionCollector: %s", link.GetInvocationId(), err)
Expand Down Expand Up @@ -1007,6 +1026,17 @@ func (s *ExecutionServer) PublishOperation(stream repb.Execution_PublishOperatio
return err
}

response := operation.ExtractExecuteResponse(op)
trimmedResponse := response.CloneVT()
if trimmedResponse.GetResult().GetExecutionMetadata() != nil {
// Auxiliary metadata shouldn't be sent to bazel or saved in
// the action cache.
trimmedResponse.GetResult().GetExecutionMetadata().AuxiliaryMetadata = nil
if err := op.GetResponse().MarshalFrom(trimmedResponse); err != nil {
return status.InternalErrorf("Failed to marshall trimmed response: %s", err)
}
}

mu.Lock()
lastOp = op
taskID = op.GetName()
Expand All @@ -1020,24 +1050,33 @@ func (s *ExecutionServer) PublishOperation(stream repb.Execution_PublishOperatio

log.CtxDebugf(ctx, "PublishOperation: stage: %s", stage)

var response *repb.ExecuteResponse // Only set if stage == COMPLETE
if stage == repb.ExecutionStage_COMPLETED {
response = operation.ExtractExecuteResponse(op)
}
if response != nil { // The execution completed
arn, err := digest.ParseUploadResourceName(taskID)
var auxMeta *espb.ExecutionAuxiliaryMetadata
var properties *platform.Properties
if stage == repb.ExecutionStage_COMPLETED && response != nil {
auxMeta = new(espb.ExecutionAuxiliaryMetadata)
ok, err := rexec.AuxiliaryMetadata(response.GetResult().GetExecutionMetadata(), auxMeta)
if err != nil {
log.CtxWarningf(ctx, "Failed to parse ExecutionAuxiliaryMetadata: %s", err)
} else if !ok {
log.CtxWarningf(ctx, "Failed to find ExecutionAuxiliaryMetadata: %s", err)
}
actionRN, err := digest.ParseUploadResourceName(taskID)
if err != nil {
return status.WrapErrorf(err, "Failed to parse taskID")
}
arn = digest.NewResourceName(arn.GetDigest(), arn.GetInstanceName(), rspb.CacheType_AC, arn.GetDigestFunction())
action, cmd, err := s.fetchActionAndCommand(ctx, arn)
actionRN = digest.NewResourceName(actionRN.GetDigest(), actionRN.GetInstanceName(), rspb.CacheType_AC, actionRN.GetDigestFunction())
action, cmd, err := s.fetchActionAndCommand(ctx, actionRN)
if err != nil {
return status.UnavailableErrorf("Failed to fetch action and command: %s", err)
}
if err := s.cacheActionResult(ctx, arn, response, action); err != nil {
properties, err = platform.ParseProperties(&repb.ExecutionTask{Action: action, Command: cmd, PlatformOverrides: auxMeta.GetPlatformOverrides()})
if err != nil {
log.CtxWarningf(ctx, "Failed to parse platform properties: %s", err)
}
if err := s.cacheActionResult(ctx, actionRN, trimmedResponse, action); err != nil {
return status.UnavailableErrorf("Error uploading action result: %s", err.Error())
}
if err := s.markTaskComplete(ctx, arn, response, action, cmd); err != nil {
if err := s.markTaskComplete(ctx, actionRN, response, action, cmd, properties); err != nil {
// Errors updating the router or recording usage are non-fatal.
log.CtxErrorf(ctx, "Could not update post-completion metadata: %s", err)
}
Expand All @@ -1061,13 +1100,18 @@ func (s *ExecutionServer) PublishOperation(stream repb.Execution_PublishOperatio
return status.WrapErrorf(err, "failed to update execution %q", taskID)
}
lastWrite = time.Now()
if err := s.recordExecution(ctx, taskID, response.GetResult().GetExecutionMetadata(), auxMeta, properties); err != nil {
log.CtxErrorf(ctx, "failed to record execution %q: %s", taskID, err)
}
return nil
}()
if err != nil {
return err
}

if response != nil {
// TODO(vanja) should this be done when the executor got a
// cache hit?
if err := s.cacheExecuteResponse(ctx, taskID, response); err != nil {
log.CtxErrorf(ctx, "Failed to cache execute response: %s", err)
}
Expand Down Expand Up @@ -1112,7 +1156,7 @@ func (s *ExecutionServer) cacheActionResult(ctx context.Context, actionResourceN

// markTaskComplete contains logic to be run when the task is complete but
// before letting the client know that the task has completed.
func (s *ExecutionServer) markTaskComplete(ctx context.Context, actionResourceName *digest.ResourceName, executeResponse *repb.ExecuteResponse, action *repb.Action, cmd *repb.Command) error {
func (s *ExecutionServer) markTaskComplete(ctx context.Context, actionResourceName *digest.ResourceName, executeResponse *repb.ExecuteResponse, action *repb.Action, cmd *repb.Command, properties *platform.Properties) error {
execErr := gstatus.ErrorProto(executeResponse.GetStatus())
router := s.env.GetTaskRouter()
// Only update the router if a task was actually executed
Expand All @@ -1127,20 +1171,22 @@ func (s *ExecutionServer) markTaskComplete(ctx context.Context, actionResourceNa
}

if sizer := s.env.GetTaskSizer(); sizer != nil && execErr == nil && executeResponse.GetResult().GetExitCode() == 0 {
// TODO(vanja) should this be done when the executor got a cache hit?
md := executeResponse.GetResult().GetExecutionMetadata()
if err := sizer.Update(ctx, cmd, md); err != nil {
log.CtxWarningf(ctx, "Failed to update task size: %s", err)
}
}

if err := s.updateUsage(ctx, action, cmd, executeResponse); err != nil {
if err := s.updateUsage(ctx, executeResponse, properties); err != nil {
// TODO(vanja) should this be done when the executor got a cache hit?
log.CtxWarningf(ctx, "Failed to update usage for ExecuteResponse %+v: %s", executeResponse, err)
}

return nil
}

func (s *ExecutionServer) updateUsage(ctx context.Context, action *repb.Action, cmd *repb.Command, executeResponse *repb.ExecuteResponse) error {
func (s *ExecutionServer) updateUsage(ctx context.Context, executeResponse *repb.ExecuteResponse, plat *platform.Properties) error {
ut := s.env.GetUsageTracker()
if ut == nil {
return nil
Expand All @@ -1159,21 +1205,6 @@ func (s *ExecutionServer) updateUsage(ctx context.Context, action *repb.Action,
return err
}

// Fill out an ExecutionTask with enough info to be able to parse the
// effective platform.
task := &repb.ExecutionTask{Action: action, Command: cmd}
md := &espb.ExecutionAuxiliaryMetadata{}
ok, err := rexec.AuxiliaryMetadata(executeResponse.Result.GetExecutionMetadata(), md)
if err != nil {
log.CtxWarningf(ctx, "Failed to parse auxiliary metadata: %s", err)
} else if ok {
task.PlatformOverrides = md.GetPlatformOverrides()
}
plat, err := platform.ParseProperties(task)
if err != nil {
return err
}

pool, err := s.env.GetSchedulerService().GetPoolInfo(ctx, plat.OS, plat.Pool, plat.WorkflowID, plat.PoolType)
if err != nil {
return status.InternalErrorf("failed to determine executor pool: %s", err)
Expand Down
Loading

0 comments on commit d72eaed

Please sign in to comment.