diff --git a/enterprise/server/cmd/executor/executor.go b/enterprise/server/cmd/executor/executor.go
index dab76cbf1a1..679f74fecc8 100644
--- a/enterprise/server/cmd/executor/executor.go
+++ b/enterprise/server/cmd/executor/executor.go
@@ -263,7 +263,6 @@ func main() {
 		log.Fatalf("Error starting task scheduler: %v", err)
 	}
 
-	container.Metrics.Start(rootContext)
 	monitoring.StartMonitoringHandler(env, fmt.Sprintf("%s:%d", *listen, *monitoringPort))
 
 	// Setup SSL for monitoring endpoints (optional).
diff --git a/enterprise/server/remote_execution/container/BUILD b/enterprise/server/remote_execution/container/BUILD
index 07d079c4d54..04b05d786bb 100644
--- a/enterprise/server/remote_execution/container/BUILD
+++ b/enterprise/server/remote_execution/container/BUILD
@@ -15,7 +15,6 @@ go_library(
         "//proto:remote_execution_go_proto",
         "//server/environment",
         "//server/interfaces",
-        "//server/metrics",
         "//server/util/alert",
         "//server/util/authutil",
         "//server/util/flag",
diff --git a/enterprise/server/remote_execution/container/container.go b/enterprise/server/remote_execution/container/container.go
index 5faa3318e3f..63232f95855 100644
--- a/enterprise/server/remote_execution/container/container.go
+++ b/enterprise/server/remote_execution/container/container.go
@@ -15,7 +15,6 @@ import (
 	"github.com/buildbuddy-io/buildbuddy/enterprise/server/util/oci"
 	"github.com/buildbuddy-io/buildbuddy/server/environment"
 	"github.com/buildbuddy-io/buildbuddy/server/interfaces"
-	"github.com/buildbuddy-io/buildbuddy/server/metrics"
 	"github.com/buildbuddy-io/buildbuddy/server/util/alert"
 	"github.com/buildbuddy-io/buildbuddy/server/util/authutil"
 	"github.com/buildbuddy-io/buildbuddy/server/util/flag"
@@ -56,10 +55,6 @@ const (
 )
 
 var (
-	// Metrics is a shared metrics object to handle proper prometheus metrics
-	// accounting across container instances.
-	Metrics = NewContainerMetrics()
-
 	// ErrRemoved is returned by TracedCommandContainer operations when an
 	// operation fails due to the container already being removed.
 	ErrRemoved = status.UnavailableError("container has been removed")
@@ -124,99 +119,9 @@ type Init struct {
 	Publisher *operation.Publisher
 }
 
-// ContainerMetrics handles Prometheus metrics accounting for CommandContainer
-// instances.
-type ContainerMetrics struct {
-	mu sync.Mutex
-	// Latest stats observed, per-container.
-	latest map[CommandContainer]*repb.UsageStats
-	// CPU usage for the current usage interval, per-container. This is cleared
-	// every time we update the CPU gauge.
-	intervalCPUNanos int64
-}
-
-func NewContainerMetrics() *ContainerMetrics {
-	return &ContainerMetrics{
-		latest: make(map[CommandContainer]*repb.UsageStats),
-	}
-}
-
-// Start kicks off a goroutine that periodically updates the CPU gauge.
-func (m *ContainerMetrics) Start(ctx context.Context) {
-	go func() {
-		t := time.NewTicker(cpuUsageUpdateInterval)
-		defer t.Stop()
-		lastTick := time.Now()
-		for {
-			select {
-			case <-ctx.Done():
-				return
-			case <-t.C:
-				tick := time.Now()
-				m.updateCPUMetric(tick.Sub(lastTick))
-				lastTick = tick
-			}
-		}
-	}()
-}
-
-func (m *ContainerMetrics) updateCPUMetric(dt time.Duration) {
-	m.mu.Lock()
-	defer m.mu.Unlock()
-	milliCPU := (float64(m.intervalCPUNanos) / 1e6) / dt.Seconds()
-	metrics.RemoteExecutionCPUUtilization.Set(milliCPU)
-	m.intervalCPUNanos = 0
-}
-
-// Observe records the latest stats for the current container execution.
-func (m *ContainerMetrics) Observe(c CommandContainer, s *repb.UsageStats) {
-	m.mu.Lock()
-	defer m.mu.Unlock()
-	if s == nil {
-		delete(m.latest, c)
-	} else {
-		// Before recording CPU usage, sum the previous CPU usage so we know how
-		// much new usage has been incurred.
-		var prevCPUNanos int64
-		for _, stats := range m.latest {
-			prevCPUNanos += stats.CpuNanos
-		}
-		m.latest[c] = s
-		var cpuNanos int64
-		for _, stats := range m.latest {
-			cpuNanos += stats.CpuNanos
-		}
-		diffCPUNanos := cpuNanos - prevCPUNanos
-		// Note: This > 0 check is here to avoid panicking in case there are
-		// issues with process stats returning non-monotonically-increasing
-		// values for CPU usage.
-		if diffCPUNanos > 0 {
-			metrics.RemoteExecutionUsedMilliCPU.Add(float64(diffCPUNanos) / 1e6)
-			m.intervalCPUNanos += diffCPUNanos
-		}
-	}
-	var totalMemBytes, totalPeakMemBytes int64
-	for _, stats := range m.latest {
-		totalMemBytes += stats.MemoryBytes
-		totalPeakMemBytes += stats.PeakMemoryBytes
-	}
-	metrics.RemoteExecutionMemoryUsageBytes.Set(float64(totalMemBytes))
-	metrics.RemoteExecutionPeakMemoryUsageBytes.Set(float64(totalPeakMemBytes))
-}
-
-// Unregister records that the given container has completed execution. It must
-// be called for each container whose stats are observed via ObserveStats,
-// otherwise a memory leak will occur.
-func (m *ContainerMetrics) Unregister(c CommandContainer) {
-	m.Observe(c, nil)
-}
-
 // UsageStats holds usage stats for a container.
 // It is useful for keeping track of usage relative to when the container
 // last executed a task.
-//
-// TODO: see whether its feasible to execute each task in its own cgroup
-// so that we can avoid this bookkeeping and get stats without polling.
 type UsageStats struct {
 	Clock clockwork.Clock
 
@@ -265,9 +170,8 @@ func (s *UsageStats) clock() clockwork.Clock {
 }
 
 // Reset resets resource usage counters in preparation for a new task, so that
-// the new task's resource usage can be accounted for. It should be called at
-// the beginning of Run() as well as at the beginning of Exec() in the container
-// lifecycle.
+// the new task's resource usage can be accounted for.
+// TODO: make this private - it should only be used by TrackExecution.
 func (s *UsageStats) Reset() {
 	if s.last != nil {
 		s.last.MemoryBytes = 0
@@ -354,6 +258,7 @@ func (s *UsageStats) updateTimeline(now time.Time) {
 // Update updates the usage for the current task, given a reading from the
 // lifetime stats (e.g. cgroup created when the task container was initially
 // created).
+// TODO: make this private - it should only be used by TrackExecution.
 func (s *UsageStats) Update(lifetimeStats *repb.UsageStats) {
 	s.last = lifetimeStats.CloneVT()
 	if lifetimeStats.GetMemoryBytes() > s.peakMemoryUsageBytes {
@@ -364,35 +269,31 @@
 	}
 }
 
-// TrackStats starts a goroutine to monitor the container's resource usage. It
-// polls c.Stats() to get the cumulative usage since the start of the current
-// task.
+// TrackExecution starts a goroutine to monitor a container's resource usage
+// during an execution, periodically calling Update. It polls the given stats
+// function to get the cumulative usage for the lifetime of the container (not
+// just the current task).
 //
 // The returned func stops tracking resource usage. It must be called, or else a
 // goroutine leak may occur. Monitoring can safely be stopped more than once.
-//
-// The returned channel should be received from at most once, *after* calling
-// the returned stop function. The received value can be nil if stats were not
-// successfully sampled at least once.
-func TrackStats(ctx context.Context, c CommandContainer) (stop func(), res <-chan *repb.UsageStats) {
+func (s *UsageStats) TrackExecution(ctx context.Context, lifetimeStatsFn func(ctx context.Context) (*repb.UsageStats, error)) (stop func()) {
+	// Since we're starting a new execution, set the stats baseline to the last
+	// observed value.
+	s.Reset()
+
 	ctx, cancel := context.WithCancel(ctx)
-	result := make(chan *repb.UsageStats, 1)
 	done := make(chan struct{})
 	go func() {
 		defer close(done)
-		defer Metrics.Unregister(c)
-		var last *repb.UsageStats
 		var lastErr error
 
 		start := time.Now()
 		defer func() {
 			// Only log an error if the task ran long enough that we could
 			// reasonably expect to sample stats at least once while it was
-			// executing. Note that we can't sample stats until podman creates
-			// the container, which can take a few hundred ms or possibly longer
-			// if the executor is heavily loaded.
+			// executing.
 			dur := time.Since(start)
-			if last == nil && dur > 1*time.Second && lastErr != nil {
+			if dur > 1*time.Second && lastErr != nil && s.TaskStats() == nil {
 				log.CtxWarningf(ctx, "Failed to read container stats: %s", lastErr)
 			}
 		}()
@@ -402,16 +303,14 @@
 		for {
 			select {
 			case <-ctx.Done():
-				result <- last
 				return
 			case <-t.C:
-				stats, err := c.Stats(ctx)
+				stats, err := lifetimeStatsFn(ctx)
 				if err != nil {
 					lastErr = err
 					continue
 				}
-				Metrics.Observe(c, stats)
-				last = stats
+				s.Update(stats)
 			}
 		}
 	}()
@@ -419,7 +318,7 @@ func TrackStats(ctx context.Context, c CommandContainer) (stop func(), res <-cha
 		cancel()
 		<-done
 	}
-	return stop, result
+	return stop
 }
 
 type FileSystemLayout struct {
diff --git a/enterprise/server/remote_execution/containers/bare/bare.go b/enterprise/server/remote_execution/containers/bare/bare.go
index 6c9395b30b7..ec2f313243d 100644
--- a/enterprise/server/remote_execution/containers/bare/bare.go
+++ b/enterprise/server/remote_execution/containers/bare/bare.go
@@ -83,10 +83,9 @@ func (c *bareCommandContainer) Signal(ctx context.Context, sig syscall.Signal) e
 func (c *bareCommandContainer) exec(ctx context.Context, cmd *repb.Command, workDir string, stdio *interfaces.Stdio) (result *interfaces.CommandResult) {
 	var statsListener procstats.Listener
 	if c.opts.EnableStats {
-		defer container.Metrics.Unregister(c)
-		statsListener = func(stats *repb.UsageStats) {
-			container.Metrics.Observe(c, stats)
-		}
+		// Setting the stats listener to non-nil enables stats reporting in
+		// commandutil.RunWithOpts.
+		statsListener = func(*repb.UsageStats) {}
 	}
 
 	if *enableLogFiles {
diff --git a/enterprise/server/remote_execution/containers/firecracker/firecracker.go b/enterprise/server/remote_execution/containers/firecracker/firecracker.go
index 15add40cc0c..c7522a35c8a 100644
--- a/enterprise/server/remote_execution/containers/firecracker/firecracker.go
+++ b/enterprise/server/remote_execution/containers/firecracker/firecracker.go
@@ -2020,11 +2020,9 @@ func (c *FirecrackerContainer) SendExecRequestToGuest(ctx context.Context, conn
 	client := vmxpb.NewExecClient(conn)
 	health := hlpb.NewHealthClient(conn)
 
-	defer container.Metrics.Unregister(c)
 	var lastObservedStatsMutex sync.Mutex
 	var lastObservedStats *repb.UsageStats
 	statsListener := func(stats *repb.UsageStats) {
-		container.Metrics.Observe(c, stats)
 		lastObservedStatsMutex.Lock()
 		lastObservedStats = stats
 		lastObservedStatsMutex.Unlock()
diff --git a/enterprise/server/remote_execution/containers/ociruntime/ociruntime.go b/enterprise/server/remote_execution/containers/ociruntime/ociruntime.go
index 58305ed35f2..13cbadb7b5a 100644
--- a/enterprise/server/remote_execution/containers/ociruntime/ociruntime.go
+++ b/enterprise/server/remote_execution/containers/ociruntime/ociruntime.go
@@ -530,11 +530,6 @@ func (c *ociContainer) cleanupNetwork(ctx context.Context) error {
 }
 
 func (c *ociContainer) Stats(ctx context.Context) (*repb.UsageStats, error) {
-	lifetimeStats, err := c.cgroupPaths.Stats(ctx, c.cid, c.blockDevice)
-	if err != nil {
-		return nil, err
-	}
-	c.stats.Update(lifetimeStats)
 	return c.stats.TaskStats(), nil
 }
 
@@ -542,15 +537,16 @@
 // metrics are updated while the function is being executed, and that the
 // resource usage results are populated in the returned CommandResult.
 func (c *ociContainer) doWithStatsTracking(ctx context.Context, invokeRuntimeFn func(ctx context.Context) *interfaces.CommandResult) *interfaces.CommandResult {
-	c.stats.Reset()
-	stop, statsCh := container.TrackStats(ctx, c)
+	stop := c.stats.TrackExecution(ctx, func(ctx context.Context) (*repb.UsageStats, error) {
+		return c.cgroupPaths.Stats(ctx, c.cid, c.blockDevice)
+	})
 	res := invokeRuntimeFn(ctx)
 	stop()
-	// statsCh will report stats for processes inside the container, and
+	// c.stats.TaskStats() will report stats for processes inside the container, and
 	// res.UsageStats will report stats for the container runtime itself.
 	// Combine these stats to get the total usage.
 	runtimeProcessStats := res.UsageStats
-	taskStats := <-statsCh
+	taskStats := c.stats.TaskStats()
 	if taskStats == nil {
 		taskStats = &repb.UsageStats{}
 	}
diff --git a/enterprise/server/remote_execution/containers/podman/podman.go b/enterprise/server/remote_execution/containers/podman/podman.go
index 24896894f9e..4707e8ccfb5 100644
--- a/enterprise/server/remote_execution/containers/podman/podman.go
+++ b/enterprise/server/remote_execution/containers/podman/podman.go
@@ -431,15 +431,23 @@ func (c *podmanCommandContainer) Run(ctx context.Context, command *repb.Command,
 // metrics are updated while the function is executing, and that the UsageStats
 // field is populated after execution.
 func (c *podmanCommandContainer) doWithStatsTracking(ctx context.Context, runPodmanFn func(ctx context.Context) *interfaces.CommandResult) *interfaces.CommandResult {
-	c.stats.Reset()
-	stop, statsCh := container.TrackStats(ctx, c)
+	stop := c.stats.TrackExecution(ctx, func(ctx context.Context) (*repb.UsageStats, error) {
+		if !c.options.EnableStats {
+			return nil, nil
+		}
+		cid, err := c.getCID(ctx)
+		if err != nil {
+			return nil, err
+		}
+		return c.cgroupPaths.Stats(ctx, cid, c.blockDevice)
+	})
 	res := runPodmanFn(ctx)
 	stop()
-	// statsCh will report stats for processes inside the container, and
+	// c.stats.TaskStats() will report stats for processes inside the container, and
 	// res.UsageStats will report stats for the podman process itself.
 	// Combine these stats to get the total usage.
 	podmanProcessStats := res.UsageStats
-	taskStats := <-statsCh
+	taskStats := c.stats.TaskStats()
 	if taskStats == nil {
 		taskStats = &repb.UsageStats{}
 	}
@@ -709,20 +717,6 @@ func (c *podmanCommandContainer) Unpause(ctx context.Context) error {
 }
 
 func (c *podmanCommandContainer) Stats(ctx context.Context) (*repb.UsageStats, error) {
-	if !c.options.EnableStats {
-		return nil, nil
-	}
-
-	cid, err := c.getCID(ctx)
-	if err != nil {
-		return nil, err
-	}
-
-	lifetimeStats, err := c.cgroupPaths.Stats(ctx, cid, c.blockDevice)
-	if err != nil {
-		return nil, err
-	}
-	c.stats.Update(lifetimeStats)
 	return c.stats.TaskStats(), nil
 }
diff --git a/server/metrics/metrics.go b/server/metrics/metrics.go
index ba548f27e28..4aa8ee0a846 100644
--- a/server/metrics/metrics.go
+++ b/server/metrics/metrics.go
@@ -1111,38 +1111,6 @@ var (
 		Help: "Maximum total CPU time on the executor that can be allocated for task execution, in **milliCPU** (CPU-milliseconds per second).",
 	})
 
-	RemoteExecutionMemoryUsageBytes = promauto.NewGauge(prometheus.GaugeOpts{
-		Namespace: bbNamespace,
-		Subsystem: "remote_execution",
-		Name:      "memory_usage_bytes",
-		Help:      "Current total task memory usage in **bytes**. This only accounts for tasks which are actively executing. To see memory usage of pooled runners, sum with runner pool memory usage.",
-	})
-
-	// #### Examples
-	//
-	// ```promql
-	// # Total approximate memory usage of active and pooled runners,
-	// # grouped by executor pod.
-	// sum by (pod_name) (
-	//   buildbuddy_remote_execution_memory_usage_bytes
-	//   + buildbuddy_remote_execution_runner_pool_memory_usage_bytes
-	// )
-	// ```
-
-	RemoteExecutionPeakMemoryUsageBytes = promauto.NewGauge(prometheus.GaugeOpts{
-		Namespace: bbNamespace,
-		Subsystem: "remote_execution",
-		Name:      "peak_memory_usage_bytes",
-		Help:      "Current total peak memory usage in **bytes**. This is the sum of the peak memory usage for all tasks currently executing. It is not a very useful metric on its own, and is mainly intended for comparison with `assigned_ram_bytes`.",
-	})
-
-	RemoteExecutionUsedMilliCPU = promauto.NewCounter(prometheus.CounterOpts{
-		Namespace: bbNamespace,
-		Subsystem: "remote_execution",
-		Name:      "used_milli_cpu",
-		Help:      "Approximate cumulative CPU usage of executed tasks, in **CPU-milliseconds**.",
-	})
-
 	RemoteExecutionCPUUtilization = promauto.NewGauge(prometheus.GaugeOpts{
 		Namespace: bbNamespace,
 		Subsystem: "remote_execution",
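
For reference, the call pattern that the new UsageStats.TrackExecution API expects from a container implementation is sketched below. The exampleContainer type and its readLifetimeStats and execWithStats methods are hypothetical names used only for illustration; they are not part of this change:

```go
package example

import (
	"context"

	"github.com/buildbuddy-io/buildbuddy/enterprise/server/remote_execution/container"

	repb "github.com/buildbuddy-io/buildbuddy/proto/remote_execution"
)

// exampleContainer is a hypothetical container implementation.
type exampleContainer struct {
	stats container.UsageStats
}

// readLifetimeStats stands in for a reader (e.g. cgroupfs) that returns
// cumulative usage for the lifetime of the container, not just the current
// task.
func (c *exampleContainer) readLifetimeStats(ctx context.Context) (*repb.UsageStats, error) {
	return &repb.UsageStats{}, nil
}

// execWithStats shows the intended pattern: start tracking, run the task,
// stop tracking, then read the per-task usage via TaskStats.
func (c *exampleContainer) execWithStats(ctx context.Context, run func(ctx context.Context) error) (*repb.UsageStats, error) {
	stop := c.stats.TrackExecution(ctx, c.readLifetimeStats)
	err := run(ctx)
	stop()
	return c.stats.TaskStats(), err
}
```

Implementations that cannot always read stats may return nil, nil from the callback, as the podman implementation above does when stats are disabled.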