Clean up cgroup polling logic
bduffany committed Dec 2, 2024
1 parent 0f5f692 commit 667f2b8
Showing 6 changed files with 32 additions and 172 deletions.
1 change: 0 additions & 1 deletion enterprise/server/remote_execution/container/BUILD
@@ -15,7 +15,6 @@ go_library(
"//proto:remote_execution_go_proto",
"//server/environment",
"//server/interfaces",
"//server/metrics",
"//server/util/alert",
"//server/util/authutil",
"//server/util/flag",
122 changes: 13 additions & 109 deletions enterprise/server/remote_execution/container/container.go
@@ -15,7 +15,6 @@ import (
"github.com/buildbuddy-io/buildbuddy/enterprise/server/util/oci"
"github.com/buildbuddy-io/buildbuddy/server/environment"
"github.com/buildbuddy-io/buildbuddy/server/interfaces"
"github.com/buildbuddy-io/buildbuddy/server/metrics"
"github.com/buildbuddy-io/buildbuddy/server/util/alert"
"github.com/buildbuddy-io/buildbuddy/server/util/authutil"
"github.com/buildbuddy-io/buildbuddy/server/util/flag"
@@ -56,10 +55,6 @@ const (
)

var (
// Metrics is a shared metrics object to handle proper prometheus metrics
// accounting across container instances.
Metrics = NewContainerMetrics()

// ErrRemoved is returned by TracedCommandContainer operations when an
// operation fails due to the container already being removed.
ErrRemoved = status.UnavailableError("container has been removed")
@@ -124,93 +119,6 @@ type Init struct {
Publisher *operation.Publisher
}

// ContainerMetrics handles Prometheus metrics accounting for CommandContainer
// instances.
type ContainerMetrics struct {
mu sync.Mutex
// Latest stats observed, per-container.
latest map[CommandContainer]*repb.UsageStats
// CPU usage for the current usage interval, per-container. This is cleared
// every time we update the CPU gauge.
intervalCPUNanos int64
}

func NewContainerMetrics() *ContainerMetrics {
return &ContainerMetrics{
latest: make(map[CommandContainer]*repb.UsageStats),
}
}

// Start kicks off a goroutine that periodically updates the CPU gauge.
func (m *ContainerMetrics) Start(ctx context.Context) {
go func() {
t := time.NewTicker(cpuUsageUpdateInterval)
defer t.Stop()
lastTick := time.Now()
for {
select {
case <-ctx.Done():
return
case <-t.C:
tick := time.Now()
m.updateCPUMetric(tick.Sub(lastTick))
lastTick = tick
}
}
}()
}

func (m *ContainerMetrics) updateCPUMetric(dt time.Duration) {
m.mu.Lock()
defer m.mu.Unlock()
milliCPU := (float64(m.intervalCPUNanos) / 1e6) / dt.Seconds()
metrics.RemoteExecutionCPUUtilization.Set(milliCPU)
m.intervalCPUNanos = 0
}
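// Worked example for the formula above (an editorial illustration, not part
// of the commit): with intervalCPUNanos = 2e9 CPU-nanoseconds accumulated
// over a dt = 1s tick, the gauge is set to (2e9 / 1e6) / 1 = 2000 milliCPU,
// i.e. two full cores.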

// Observe records the latest stats for the current container execution.
func (m *ContainerMetrics) Observe(c CommandContainer, s *repb.UsageStats) {
m.mu.Lock()
defer m.mu.Unlock()
if s == nil {
delete(m.latest, c)
} else {
// Before recording CPU usage, sum the previous CPU usage so we know how
// much new usage has been incurred.
var prevCPUNanos int64
for _, stats := range m.latest {
prevCPUNanos += stats.CpuNanos
}
m.latest[c] = s
var cpuNanos int64
for _, stats := range m.latest {
cpuNanos += stats.CpuNanos
}
diffCPUNanos := cpuNanos - prevCPUNanos
// Note: This > 0 check is here to avoid panicking in case there are
// issues with process stats returning non-monotonically-increasing
// values for CPU usage.
if diffCPUNanos > 0 {
metrics.RemoteExecutionUsedMilliCPU.Add(float64(diffCPUNanos) / 1e6)
m.intervalCPUNanos += diffCPUNanos
}
}
var totalMemBytes, totalPeakMemBytes int64
for _, stats := range m.latest {
totalMemBytes += stats.MemoryBytes
totalPeakMemBytes += stats.PeakMemoryBytes
}
metrics.RemoteExecutionMemoryUsageBytes.Set(float64(totalMemBytes))
metrics.RemoteExecutionPeakMemoryUsageBytes.Set(float64(totalPeakMemBytes))
}

// Unregister records that the given container has completed execution. It must
// be called for each container whose stats are observed via ObserveStats,
// otherwise a memory leak will occur.
func (m *ContainerMetrics) Unregister(c CommandContainer) {
m.Observe(c, nil)
}

// UsageStats holds usage stats for a container.
// It is useful for keeping track of usage relative to when the container
// last executed a task.
@@ -364,24 +272,22 @@ func (s *UsageStats) Update(lifetimeStats *repb.UsageStats) {
}
}

// TrackStats starts a goroutine to monitor the container's resource usage. It
// polls c.Stats() to get the cumulative usage since the start of the current
// task.
// TrackExecution starts a goroutine to monitor a container's resource usage
// during an execution, periodically calling Update. It polls the given stats
// function to get the cumulative usage for the lifetime of the container (not
// just the current task).
//
// The returned func stops tracking resource usage. It must be called, or else a
// goroutine leak may occur. Monitoring can safely be stopped more than once.
//
// The returned channel should be received from at most once, *after* calling
// the returned stop function. The received value can be nil if stats were not
// successfully sampled at least once.
func TrackStats(ctx context.Context, c CommandContainer) (stop func(), res <-chan *repb.UsageStats) {
func (s *UsageStats) TrackExecution(ctx context.Context, lifetimeStatsFn func(ctx context.Context) (*repb.UsageStats, error)) (stop func()) {
// Since we're starting a new execution, set the stats baseline to the last
// observed value.
s.Reset()

ctx, cancel := context.WithCancel(ctx)
result := make(chan *repb.UsageStats, 1)
done := make(chan struct{})
go func() {
defer close(done)
defer Metrics.Unregister(c)
var last *repb.UsageStats
var lastErr error

start := time.Now()
@@ -392,7 +298,7 @@ func TrackStats(ctx context.Context, c CommandContainer) (stop func(), res <-chan *repb.UsageStats) {
// the container, which can take a few hundred ms or possibly longer
// if the executor is heavily loaded.
dur := time.Since(start)
if last == nil && dur > 1*time.Second && lastErr != nil {
if dur > 1*time.Second && lastErr != nil && s.TaskStats() == nil {
log.CtxWarningf(ctx, "Failed to read container stats: %s", lastErr)
}
}()
@@ -402,24 +308,22 @@ func TrackStats(ctx context.Context, c CommandContainer) (stop func(), res <-chan *repb.UsageStats) {
for {
select {
case <-ctx.Done():
result <- last
return
case <-t.C:
stats, err := c.Stats(ctx)
stats, err := lifetimeStatsFn(ctx)
if err != nil {
lastErr = err
continue
}
Metrics.Observe(c, stats)
last = stats
s.Update(stats)
}
}
}()
stop = func() {
cancel()
<-done
}
return stop, result
return stop
}

type FileSystemLayout struct {
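An aside on the new API before the call-site changes below. This is a minimal caller sketch, assuming only the TrackExecution/TaskStats contract documented above; runWithTracking, runTask, and statsFn are hypothetical stand-ins, not code from this commit.

package example

import (
	"context"

	"github.com/buildbuddy-io/buildbuddy/enterprise/server/remote_execution/container"
	repb "github.com/buildbuddy-io/buildbuddy/proto/remote_execution"
)

// runWithTracking is a hypothetical caller of UsageStats.TrackExecution,
// mirroring the pattern the OCI and podman containers adopt in this commit.
func runWithTracking(ctx context.Context, s *container.UsageStats,
	statsFn func(context.Context) (*repb.UsageStats, error),
	runTask func(context.Context) error) (*repb.UsageStats, error) {
	stop := s.TrackExecution(ctx, statsFn) // also resets the stats baseline
	err := runTask(ctx)                    // stats are polled while this runs
	stop()                                 // required, or the poller goroutine leaks
	return s.TaskStats(), err              // may be nil if stats were never sampled
}

Since the doc comment above says monitoring can safely be stopped more than once, a deferred stop() alongside an explicit one would also be fine.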
9 changes: 4 additions & 5 deletions enterprise/server/remote_execution/containers/bare/bare.go
@@ -83,10 +83,9 @@ func (c *bareCommandContainer) Signal(ctx context.Context, sig syscall.Signal) error {
func (c *bareCommandContainer) exec(ctx context.Context, cmd *repb.Command, workDir string, stdio *interfaces.Stdio) (result *interfaces.CommandResult) {
var statsListener procstats.Listener
if c.opts.EnableStats {
defer container.Metrics.Unregister(c)
statsListener = func(stats *repb.UsageStats) {
container.Metrics.Observe(c, stats)
}
// Setting the stats listener to non-nil enables stats reporting in
// commandutil.RunWithOpts.
statsListener = func(*repb.UsageStats) {}
}

if *enableLogFiles {
@@ -129,8 +128,8 @@ func (c *bareCommandContainer) exec(ctx context.Context, cmd *repb.Command, workDir string, stdio *interfaces.Stdio) (result *interfaces.CommandResult) {

return commandutil.RunWithOpts(ctx, cmd, &commandutil.RunOpts{
Dir: workDir,
StatsListener: statsListener,
Stdio: stdio,
StatsListener: statsListener,
Signal: c.signal,
})
}
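The bare-runner change relies on commandutil treating a nil StatsListener as "stats disabled". Below is a minimal self-contained sketch of that gating pattern, using stand-in types rather than the real commandutil API; the gate itself is an assumption inferred from the comment in the diff.

package example

// Stats and Listener stand in for repb.UsageStats and procstats.Listener.
type Stats struct{ CPUNanos int64 }
type Listener func(*Stats)

// runWithOpts mimics (as an assumption, not the actual commandutil
// implementation) how a nil listener disables stats collection.
func runWithOpts(listener Listener) {
	if listener == nil {
		return // no listener: skip stats sampling entirely
	}
	// Any non-nil listener, even a no-op, turns sampling on. The UsageStats
	// bookkeeping itself now happens elsewhere, via TrackExecution.
	listener(&Stats{CPUNanos: 1e9})
}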
12 changes: 4 additions & 8 deletions enterprise/server/remote_execution/containers/ociruntime/ociruntime.go
@@ -530,27 +530,23 @@ func (c *ociContainer) cleanupNetwork(ctx context.Context) error {
}

func (c *ociContainer) Stats(ctx context.Context) (*repb.UsageStats, error) {
lifetimeStats, err := c.cgroupPaths.Stats(ctx, c.cid, c.blockDevice)
if err != nil {
return nil, err
}
c.stats.Update(lifetimeStats)
return c.stats.TaskStats(), nil
}

// Instruments an OCI runtime call with monitor() to ensure that resource usage
// metrics are updated while the function is being executed, and that the
// resource usage results are populated in the returned CommandResult.
func (c *ociContainer) doWithStatsTracking(ctx context.Context, invokeRuntimeFn func(ctx context.Context) *interfaces.CommandResult) *interfaces.CommandResult {
c.stats.Reset()
stop, statsCh := container.TrackStats(ctx, c)
stop := c.stats.TrackExecution(ctx, func(ctx context.Context) (*repb.UsageStats, error) {
return c.cgroupPaths.Stats(ctx, c.cid, c.blockDevice)
})
res := invokeRuntimeFn(ctx)
stop()
// statsCh will report stats for processes inside the container, and
// res.UsageStats will report stats for the container runtime itself.
// Combine these stats to get the total usage.
runtimeProcessStats := res.UsageStats
taskStats := <-statsCh
taskStats := c.stats.TaskStats()
if taskStats == nil {
taskStats = &repb.UsageStats{}
}
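An editorial sketch of the combining step described in the comment above. The field-by-field logic is an assumption (the actual combination code sits in the elided part of this hunk); only fields that appear elsewhere in this diff are used.

package example

import repb "github.com/buildbuddy-io/buildbuddy/proto/remote_execution"

// combineUsage sums stats for processes inside the container with stats for
// the container runtime process itself, yielding the total usage.
func combineUsage(task, runtime *repb.UsageStats) *repb.UsageStats {
	if task == nil {
		task = &repb.UsageStats{} // stats may never have been sampled
	}
	if runtime == nil {
		runtime = &repb.UsageStats{}
	}
	return &repb.UsageStats{
		CpuNanos:        task.GetCpuNanos() + runtime.GetCpuNanos(),
		MemoryBytes:     task.GetMemoryBytes() + runtime.GetMemoryBytes(),
		PeakMemoryBytes: task.GetPeakMemoryBytes() + runtime.GetPeakMemoryBytes(),
	}
}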
28 changes: 11 additions & 17 deletions enterprise/server/remote_execution/containers/podman/podman.go
@@ -431,15 +431,23 @@ func (c *podmanCommandContainer) Run(ctx context.Context, command *repb.Command,
// metrics are updated while the function is executing, and that the UsageStats
// field is populated after execution.
func (c *podmanCommandContainer) doWithStatsTracking(ctx context.Context, runPodmanFn func(ctx context.Context) *interfaces.CommandResult) *interfaces.CommandResult {
c.stats.Reset()
stop, statsCh := container.TrackStats(ctx, c)
stop := c.stats.TrackExecution(ctx, func(ctx context.Context) (*repb.UsageStats, error) {
if !c.options.EnableStats {
return nil, nil
}
cid, err := c.getCID(ctx)
if err != nil {
return nil, err
}
return c.cgroupPaths.Stats(ctx, cid, c.blockDevice)
})
res := runPodmanFn(ctx)
stop()
// statsCh will report stats for processes inside the container, and
// res.UsageStats will report stats for the podman process itself.
// Combine these stats to get the total usage.
podmanProcessStats := res.UsageStats
taskStats := <-statsCh
taskStats := c.stats.TaskStats()
if taskStats == nil {
taskStats = &repb.UsageStats{}
}
@@ -709,20 +717,6 @@ func (c *podmanCommandContainer) Unpause(ctx context.Context) error {
}

func (c *podmanCommandContainer) Stats(ctx context.Context) (*repb.UsageStats, error) {
if !c.options.EnableStats {
return nil, nil
}

cid, err := c.getCID(ctx)
if err != nil {
return nil, err
}

lifetimeStats, err := c.cgroupPaths.Stats(ctx, cid, c.blockDevice)
if err != nil {
return nil, err
}
c.stats.Update(lifetimeStats)
return c.stats.TaskStats(), nil
}

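An editorial sketch of the closure pattern podman adopts above: the EnableStats check and container-ID lookup move into the stats function itself, so the poller records nothing while stats are unavailable. getCID and readCgroupStats are hypothetical stand-ins for the methods used in the diff.

package example

import (
	"context"

	repb "github.com/buildbuddy-io/buildbuddy/proto/remote_execution"
)

// makeStatsFn builds a lifetime-stats function in the style of the podman
// change. Returning (nil, nil) means "no stats right now" rather than an
// error (assumption: UsageStats.Update tolerates a nil sample).
func makeStatsFn(enableStats bool,
	getCID func(context.Context) (string, error),
	readCgroupStats func(context.Context, string) (*repb.UsageStats, error)) func(context.Context) (*repb.UsageStats, error) {
	return func(ctx context.Context) (*repb.UsageStats, error) {
		if !enableStats {
			return nil, nil // stats disabled for this container
		}
		cid, err := getCID(ctx) // the container ID may not exist until started
		if err != nil {
			return nil, err
		}
		return readCgroupStats(ctx, cid)
	}
}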
32 changes: 0 additions & 32 deletions server/metrics/metrics.go
@@ -1111,38 +1111,6 @@ var (
Help: "Maximum total CPU time on the executor that can be allocated for task execution, in **milliCPU** (CPU-milliseconds per second).",
})

RemoteExecutionMemoryUsageBytes = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: bbNamespace,
Subsystem: "remote_execution",
Name: "memory_usage_bytes",
Help: "Current total task memory usage in **bytes**. This only accounts for tasks which are actively executing. To see memory usage of pooled runners, sum with runner pool memory usage.",
})

// #### Examples
//
// ```promql
// # Total approximate memory usage of active and pooled runners,
// # grouped by executor pod.
// sum by (pod_name) (
// buildbuddy_remote_execution_memory_usage_bytes
// + buildbuddy_remote_execution_runner_pool_memory_usage_bytes
// )
// ```

RemoteExecutionPeakMemoryUsageBytes = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: bbNamespace,
Subsystem: "remote_execution",
Name: "peak_memory_usage_bytes",
Help: "Current total peak memory usage in **bytes**. This is the sum of the peak memory usage for all tasks currently executing. It is not a very useful metric on its own, and is mainly intended for comparison with `assigned_ram_bytes`.",
})

RemoteExecutionUsedMilliCPU = promauto.NewCounter(prometheus.CounterOpts{
Namespace: bbNamespace,
Subsystem: "remote_execution",
Name: "used_milli_cpu",
Help: "Approximate cumulative CPU usage of executed tasks, in **CPU-milliseconds**.",
})
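
// #### Example (editorial illustration, not from the original metric docs)
//
// ```promql
// # Approximate current task CPU usage in milliCPU: the per-second rate of
// # a cumulative CPU-milliseconds counter is milliCPU.
// rate(buildbuddy_remote_execution_used_milli_cpu[5m])
// ```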

RemoteExecutionCPUUtilization = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: bbNamespace,
Subsystem: "remote_execution",
