
Simplify stats polling logic #7987

Open · bduffany wants to merge 2 commits into master
1 change: 0 additions & 1 deletion enterprise/server/cmd/executor/executor.go
@@ -263,7 +263,6 @@ func main() {
log.Fatalf("Error starting task scheduler: %v", err)
}

container.Metrics.Start(rootContext)
monitoring.StartMonitoringHandler(env, fmt.Sprintf("%s:%d", *listen, *monitoringPort))

// Setup SSL for monitoring endpoints (optional).
1 change: 0 additions & 1 deletion enterprise/server/remote_execution/container/BUILD
@@ -15,7 +15,6 @@ go_library(
"//proto:remote_execution_go_proto",
"//server/environment",
"//server/interfaces",
"//server/metrics",
"//server/util/alert",
"//server/util/authutil",
"//server/util/flag",
135 changes: 17 additions & 118 deletions enterprise/server/remote_execution/container/container.go
Contributor:

is the TODO on line 126 completed now?

@bduffany (Member Author) commented on Dec 3, 2024:

Removed this TODO for now - if we wanted to remove this "baseline" tracking logic, but still be able to properly track usage for remote persistent workers, then we'd have to migrate all of the container processes to a new cgroup for each task executed, which seems a bit too complicated. We can maybe revisit later.
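
The "baseline" tracking referred to here relies on the fact that cgroup counters are cumulative over the container's lifetime, so per-task usage has to be computed as a delta from a snapshot taken when the task starts. A stripped-down sketch of that idea (illustrative only; the type, field, and method names below are made up, and only CPU is modeled, unlike the real UsageStats in this diff):

```go
package main

import "fmt"

// lifetimeStats stands in for the cumulative counters read from the cgroup
// (repb.UsageStats in the real code); only CPU is modeled here.
type lifetimeStats struct {
	cpuNanos int64
}

// baselineTracker is a hypothetical, minimal version of the baseline bookkeeping.
type baselineTracker struct {
	baselineCPUNanos int64          // counter value when the current task started
	latest           *lifetimeStats // most recent lifetime reading
}

// reset marks the start of a new task; usage accrued before this point is not
// attributed to it (e.g. a persistent worker's earlier tasks).
func (t *baselineTracker) reset() {
	if t.latest != nil {
		t.baselineCPUNanos = t.latest.cpuNanos
	}
}

// update records a new lifetime reading.
func (t *baselineTracker) update(s *lifetimeStats) { t.latest = s }

// taskCPUNanos reports CPU attributed to the current task only.
func (t *baselineTracker) taskCPUNanos() int64 {
	if t.latest == nil {
		return 0
	}
	return t.latest.cpuNanos - t.baselineCPUNanos
}

func main() {
	t := &baselineTracker{}
	t.update(&lifetimeStats{cpuNanos: 5_000_000_000}) // container already used 5s of CPU
	t.reset()                                         // a new task starts here
	t.update(&lifetimeStats{cpuNanos: 7_000_000_000}) // reading taken while the task runs
	fmt.Println(t.taskCPUNanos())                     // 2000000000: 2s of CPU attributed to the task
}
```

As the comment notes, running each task in its own cgroup would make counters start at zero per task and remove the need for this delta bookkeeping, but it would require migrating a persistent worker's processes to a new cgroup for every task.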

@@ -15,7 +15,6 @@ import (
"github.com/buildbuddy-io/buildbuddy/enterprise/server/util/oci"
"github.com/buildbuddy-io/buildbuddy/server/environment"
"github.com/buildbuddy-io/buildbuddy/server/interfaces"
"github.com/buildbuddy-io/buildbuddy/server/metrics"
"github.com/buildbuddy-io/buildbuddy/server/util/alert"
"github.com/buildbuddy-io/buildbuddy/server/util/authutil"
"github.com/buildbuddy-io/buildbuddy/server/util/flag"
@@ -56,10 +55,6 @@ const (
)

var (
// Metrics is a shared metrics object to handle proper prometheus metrics
// accounting across container instances.
Metrics = NewContainerMetrics()

// ErrRemoved is returned by TracedCommandContainer operations when an
// operation fails due to the container already being removed.
ErrRemoved = status.UnavailableError("container has been removed")
@@ -124,99 +119,9 @@ type Init struct {
Publisher *operation.Publisher
}

// ContainerMetrics handles Prometheus metrics accounting for CommandContainer
// instances.
type ContainerMetrics struct {
mu sync.Mutex
// Latest stats observed, per-container.
latest map[CommandContainer]*repb.UsageStats
// CPU usage for the current usage interval, per-container. This is cleared
// every time we update the CPU gauge.
intervalCPUNanos int64
}

func NewContainerMetrics() *ContainerMetrics {
return &ContainerMetrics{
latest: make(map[CommandContainer]*repb.UsageStats),
}
}

// Start kicks off a goroutine that periodically updates the CPU gauge.
func (m *ContainerMetrics) Start(ctx context.Context) {
go func() {
t := time.NewTicker(cpuUsageUpdateInterval)
defer t.Stop()
lastTick := time.Now()
for {
select {
case <-ctx.Done():
return
case <-t.C:
tick := time.Now()
m.updateCPUMetric(tick.Sub(lastTick))
lastTick = tick
}
}
}()
}

func (m *ContainerMetrics) updateCPUMetric(dt time.Duration) {
m.mu.Lock()
defer m.mu.Unlock()
milliCPU := (float64(m.intervalCPUNanos) / 1e6) / dt.Seconds()
metrics.RemoteExecutionCPUUtilization.Set(milliCPU)
m.intervalCPUNanos = 0
}

// Observe records the latest stats for the current container execution.
func (m *ContainerMetrics) Observe(c CommandContainer, s *repb.UsageStats) {
m.mu.Lock()
defer m.mu.Unlock()
if s == nil {
delete(m.latest, c)
} else {
// Before recording CPU usage, sum the previous CPU usage so we know how
// much new usage has been incurred.
var prevCPUNanos int64
for _, stats := range m.latest {
prevCPUNanos += stats.CpuNanos
}
m.latest[c] = s
var cpuNanos int64
for _, stats := range m.latest {
cpuNanos += stats.CpuNanos
}
diffCPUNanos := cpuNanos - prevCPUNanos
// Note: This > 0 check is here to avoid panicking in case there are
// issues with process stats returning non-monotonically-increasing
// values for CPU usage.
if diffCPUNanos > 0 {
metrics.RemoteExecutionUsedMilliCPU.Add(float64(diffCPUNanos) / 1e6)
m.intervalCPUNanos += diffCPUNanos
}
}
var totalMemBytes, totalPeakMemBytes int64
for _, stats := range m.latest {
totalMemBytes += stats.MemoryBytes
totalPeakMemBytes += stats.PeakMemoryBytes
}
metrics.RemoteExecutionMemoryUsageBytes.Set(float64(totalMemBytes))
metrics.RemoteExecutionPeakMemoryUsageBytes.Set(float64(totalPeakMemBytes))
}

// Unregister records that the given container has completed execution. It must
// be called for each container whose stats are observed via ObserveStats,
// otherwise a memory leak will occur.
func (m *ContainerMetrics) Unregister(c CommandContainer) {
m.Observe(c, nil)
}

// UsageStats holds usage stats for a container.
// It is useful for keeping track of usage relative to when the container
// last executed a task.
//
// TODO: see whether its feasible to execute each task in its own cgroup
// so that we can avoid this bookkeeping and get stats without polling.
type UsageStats struct {
Clock clockwork.Clock

@@ -265,9 +170,8 @@ func (s *UsageStats) clock() clockwork.Clock {
}

// Reset resets resource usage counters in preparation for a new task, so that
// the new task's resource usage can be accounted for. It should be called at
// the beginning of Run() as well as at the beginning of Exec() in the container
// lifecycle.
// the new task's resource usage can be accounted for.
// TODO: make this private - it should only be used by TrackExecution.
func (s *UsageStats) Reset() {
if s.last != nil {
s.last.MemoryBytes = 0
@@ -354,6 +258,7 @@ func (s *UsageStats) updateTimeline(now time.Time) {
// Update updates the usage for the current task, given a reading from the
// lifetime stats (e.g. cgroup created when the task container was initially
// created).
// TODO: make this private - it should only be used by TrackExecution.
func (s *UsageStats) Update(lifetimeStats *repb.UsageStats) {
s.last = lifetimeStats.CloneVT()
if lifetimeStats.GetMemoryBytes() > s.peakMemoryUsageBytes {
@@ -364,35 +269,31 @@ func (s *UsageStats) Update(lifetimeStats *repb.UsageStats) {
}
}

// TrackStats starts a goroutine to monitor the container's resource usage. It
// polls c.Stats() to get the cumulative usage since the start of the current
// task.
// TrackExecution starts a goroutine to monitor a container's resource usage
// during an execution, periodically calling Update. It polls the given stats
// function to get the cumulative usage for the lifetime of the container (not
// just the current task).
//
// The returned func stops tracking resource usage. It must be called, or else a
// goroutine leak may occur. Monitoring can safely be stopped more than once.
//
// The returned channel should be received from at most once, *after* calling
// the returned stop function. The received value can be nil if stats were not
// successfully sampled at least once.
func TrackStats(ctx context.Context, c CommandContainer) (stop func(), res <-chan *repb.UsageStats) {
func (s *UsageStats) TrackExecution(ctx context.Context, lifetimeStatsFn func(ctx context.Context) (*repb.UsageStats, error)) (stop func()) {
// Since we're starting a new execution, set the stats baseline to the last
// observed value.
s.Reset()

ctx, cancel := context.WithCancel(ctx)
result := make(chan *repb.UsageStats, 1)
done := make(chan struct{})
go func() {
defer close(done)
defer Metrics.Unregister(c)
var last *repb.UsageStats
var lastErr error

start := time.Now()
defer func() {
// Only log an error if the task ran long enough that we could
// reasonably expect to sample stats at least once while it was
// executing. Note that we can't sample stats until podman creates
// the container, which can take a few hundred ms or possibly longer
// if the executor is heavily loaded.
// executing.
dur := time.Since(start)
if last == nil && dur > 1*time.Second && lastErr != nil {
if dur > 1*time.Second && lastErr != nil && s.TaskStats() == nil {
log.CtxWarningf(ctx, "Failed to read container stats: %s", lastErr)
}
}()
@@ -402,24 +303,22 @@ func TrackStats(ctx context.Context, c CommandContainer) (stop func(), res <-chan *repb.UsageStats) {
for {
select {
case <-ctx.Done():
result <- last
return
case <-t.C:
stats, err := c.Stats(ctx)
stats, err := lifetimeStatsFn(ctx)
if err != nil {
lastErr = err
continue
}
Metrics.Observe(c, stats)
last = stats
s.Update(stats)
}
}
}()
stop = func() {
cancel()
<-done
}
return stop, result
return stop
}

type FileSystemLayout struct {
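To make the new control flow concrete, here is a minimal usage sketch of TrackExecution as documented above: start tracking before running the command, call stop when it returns, then read the per-task stats. The surrounding type and its readLifetimeStats/invokeRuntime methods are hypothetical stand-ins (the real call sites are in the ociruntime and podman diffs below), and the repb import path is assumed from the BUILD deps shown earlier:

```go
package mycontainer

import (
	"context"

	"github.com/buildbuddy-io/buildbuddy/enterprise/server/remote_execution/container"
	"github.com/buildbuddy-io/buildbuddy/server/interfaces"

	repb "github.com/buildbuddy-io/buildbuddy/proto/remote_execution"
)

// myContainer is a hypothetical container implementation; only what is needed
// to show the TrackExecution call pattern is included.
type myContainer struct {
	stats container.UsageStats
}

// readLifetimeStats is a made-up stand-in for reading the cumulative cgroup
// stats for the container's whole lifetime.
func (c *myContainer) readLifetimeStats(ctx context.Context) (*repb.UsageStats, error) {
	return &repb.UsageStats{}, nil
}

// invokeRuntime is a made-up stand-in for actually running the command.
func (c *myContainer) invokeRuntime(ctx context.Context) *interfaces.CommandResult {
	return &interfaces.CommandResult{}
}

func (c *myContainer) runWithStats(ctx context.Context) *interfaces.CommandResult {
	// Resets the stats baseline and starts the background polling goroutine.
	stop := c.stats.TrackExecution(ctx, c.readLifetimeStats)
	res := c.invokeRuntime(ctx) // run the task while stats are polled
	stop()                      // must be called to avoid a goroutine leak; safe to call more than once
	res.UsageStats = c.stats.TaskStats()
	return res
}
```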
7 changes: 3 additions & 4 deletions enterprise/server/remote_execution/containers/bare/bare.go
@@ -83,10 +83,9 @@ func (c *bareCommandContainer) Signal(ctx context.Context, sig syscall.Signal) error {
func (c *bareCommandContainer) exec(ctx context.Context, cmd *repb.Command, workDir string, stdio *interfaces.Stdio) (result *interfaces.CommandResult) {
var statsListener procstats.Listener
if c.opts.EnableStats {
defer container.Metrics.Unregister(c)
statsListener = func(stats *repb.UsageStats) {
container.Metrics.Observe(c, stats)
}
// Setting the stats listener to non-nil enables stats reporting in
// commandutil.RunWithOpts.
statsListener = func(*repb.UsageStats) {}
}

if *enableLogFiles {
@@ -2020,11 +2020,9 @@ func (c *FirecrackerContainer) SendExecRequestToGuest(ctx context.Context, conn
client := vmxpb.NewExecClient(conn)
health := hlpb.NewHealthClient(conn)

defer container.Metrics.Unregister(c)
var lastObservedStatsMutex sync.Mutex
var lastObservedStats *repb.UsageStats
statsListener := func(stats *repb.UsageStats) {
container.Metrics.Observe(c, stats)
lastObservedStatsMutex.Lock()
lastObservedStats = stats
lastObservedStatsMutex.Unlock()
@@ -530,27 +530,23 @@ func (c *ociContainer) cleanupNetwork(ctx context.Context) error {
}

func (c *ociContainer) Stats(ctx context.Context) (*repb.UsageStats, error) {
lifetimeStats, err := c.cgroupPaths.Stats(ctx, c.cid, c.blockDevice)
if err != nil {
return nil, err
}
c.stats.Update(lifetimeStats)
return c.stats.TaskStats(), nil
}

// Instruments an OCI runtime call with monitor() to ensure that resource usage
// metrics are updated while the function is being executed, and that the
// resource usage results are populated in the returned CommandResult.
func (c *ociContainer) doWithStatsTracking(ctx context.Context, invokeRuntimeFn func(ctx context.Context) *interfaces.CommandResult) *interfaces.CommandResult {
c.stats.Reset()
stop, statsCh := container.TrackStats(ctx, c)
stop := c.stats.TrackExecution(ctx, func(ctx context.Context) (*repb.UsageStats, error) {
return c.cgroupPaths.Stats(ctx, c.cid, c.blockDevice)
})
res := invokeRuntimeFn(ctx)
stop()
// statsCh will report stats for processes inside the container, and
// res.UsageStats will report stats for the container runtime itself.
// Combine these stats to get the total usage.
runtimeProcessStats := res.UsageStats
taskStats := <-statsCh
taskStats := c.stats.TaskStats()
if taskStats == nil {
taskStats = &repb.UsageStats{}
}
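The tail of doWithStatsTracking is collapsed by the diff viewer. Per the comment above, the in-container task stats and the runtime process's own stats are combined into the reported total; the sketch below is a generic illustration of summing two repb.UsageStats readings, not the elided code from this PR, and the repb import path is assumed. The same pattern applies to the podman variant in the next file.

```go
package statsdemo

import repb "github.com/buildbuddy-io/buildbuddy/proto/remote_execution"

// addUsage is an illustrative helper (not from this PR) that sums two usage
// readings field-by-field, for the fields that appear elsewhere in this diff.
func addUsage(a, b *repb.UsageStats) *repb.UsageStats {
	if a == nil {
		a = &repb.UsageStats{}
	}
	if b == nil {
		b = &repb.UsageStats{}
	}
	return &repb.UsageStats{
		CpuNanos:        a.GetCpuNanos() + b.GetCpuNanos(),
		MemoryBytes:     a.GetMemoryBytes() + b.GetMemoryBytes(),
		PeakMemoryBytes: a.GetPeakMemoryBytes() + b.GetPeakMemoryBytes(),
	}
}
```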
28 changes: 11 additions & 17 deletions enterprise/server/remote_execution/containers/podman/podman.go
@@ -431,15 +431,23 @@ func (c *podmanCommandContainer) Run(ctx context.Context, command *repb.Command,
// metrics are updated while the function is executing, and that the UsageStats
// field is populated after execution.
func (c *podmanCommandContainer) doWithStatsTracking(ctx context.Context, runPodmanFn func(ctx context.Context) *interfaces.CommandResult) *interfaces.CommandResult {
c.stats.Reset()
stop, statsCh := container.TrackStats(ctx, c)
stop := c.stats.TrackExecution(ctx, func(ctx context.Context) (*repb.UsageStats, error) {
if !c.options.EnableStats {
return nil, nil
}
cid, err := c.getCID(ctx)
if err != nil {
return nil, err
}
return c.cgroupPaths.Stats(ctx, cid, c.blockDevice)
})
res := runPodmanFn(ctx)
stop()
// statsCh will report stats for processes inside the container, and
// res.UsageStats will report stats for the podman process itself.
// Combine these stats to get the total usage.
podmanProcessStats := res.UsageStats
taskStats := <-statsCh
taskStats := c.stats.TaskStats()
if taskStats == nil {
taskStats = &repb.UsageStats{}
}
@@ -709,20 +717,6 @@ func (c *podmanCommandContainer) Unpause(ctx context.Context) error {
}

func (c *podmanCommandContainer) Stats(ctx context.Context) (*repb.UsageStats, error) {
if !c.options.EnableStats {
return nil, nil
}

cid, err := c.getCID(ctx)
if err != nil {
return nil, err
}

lifetimeStats, err := c.cgroupPaths.Stats(ctx, cid, c.blockDevice)
if err != nil {
return nil, err
}
c.stats.Update(lifetimeStats)
return c.stats.TaskStats(), nil
}

32 changes: 0 additions & 32 deletions server/metrics/metrics.go
@@ -1111,38 +1111,6 @@ var (
Help: "Maximum total CPU time on the executor that can be allocated for task execution, in **milliCPU** (CPU-milliseconds per second).",
})

RemoteExecutionMemoryUsageBytes = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: bbNamespace,
Subsystem: "remote_execution",
Name: "memory_usage_bytes",
Help: "Current total task memory usage in **bytes**. This only accounts for tasks which are actively executing. To see memory usage of pooled runners, sum with runner pool memory usage.",
})

// #### Examples
//
// ```promql
// # Total approximate memory usage of active and pooled runners,
// # grouped by executor pod.
// sum by (pod_name) (
// buildbuddy_remote_execution_memory_usage_bytes
// + buildbuddy_remote_execution_runner_pool_memory_usage_bytes
// )
// ```

RemoteExecutionPeakMemoryUsageBytes = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: bbNamespace,
Subsystem: "remote_execution",
Name: "peak_memory_usage_bytes",
Help: "Current total peak memory usage in **bytes**. This is the sum of the peak memory usage for all tasks currently executing. It is not a very useful metric on its own, and is mainly intended for comparison with `assigned_ram_bytes`.",
})

RemoteExecutionUsedMilliCPU = promauto.NewCounter(prometheus.CounterOpts{
Namespace: bbNamespace,
Subsystem: "remote_execution",
Name: "used_milli_cpu",
Help: "Approximate cumulative CPU usage of executed tasks, in **CPU-milliseconds**.",
})

RemoteExecutionCPUUtilization = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: bbNamespace,
Subsystem: "remote_execution",