Skip to content

Commit

Permalink
[v16] In-process metrics registry (#51203)
Browse files Browse the repository at this point in the history
* Use a non-global metrics registry in Teleport (#50913)

* Support a non-global registry in Teleport

* lint

* Update lib/service/service.go

Co-authored-by: rosstimothy <[email protected]>

---------

Co-authored-by: rosstimothy <[email protected]>

* Serve metrics from the local registry in the diagnostic service (#51031)

* Use local metrics registry in the diagnostic service

* Test metrics are served by the diag service

* Init local registry at runtime instead of config (#51074)

* lint

---------

Co-authored-by: rosstimothy <[email protected]>
  • Loading branch information
hugoShaka and rosstimothy authored Jan 21, 2025
1 parent 1c67704 commit 82b7acd
Show file tree
Hide file tree
Showing 3 changed files with 218 additions and 2 deletions.
63 changes: 61 additions & 2 deletions lib/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/gravitational/roundtrip"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
Expand Down Expand Up @@ -457,6 +458,15 @@ type TeleportProcess struct {
// resolver is used to identify the reverse tunnel address when connecting via
// the proxy.
resolver reversetunnelclient.Resolver

// metricRegistry is the prometheus metric registry for the process.
// Every teleport service that wants to register metrics should use this
// instead of the global prometheus.DefaultRegisterer to avoid registration
// conflicts.
//
// Both the metricsRegistry and the default global registry are gathered by
// Telepeort's metric service.
metricsRegistry *prometheus.Registry
}

type keyPairKey struct {
Expand Down Expand Up @@ -893,6 +903,15 @@ func NewTeleport(cfg *servicecfg.Config) (*TeleportProcess, error) {
"pid", fmt.Sprintf("%v.%v", os.Getpid(), processID),
)

// Use the custom metrics registry if specified, else create a new one.
// We must create the registry in NewTeleport, as opposed to ApplyConfig(),
// because some tests are running multiple Teleport instances from the same
// config.
metricsRegistry := cfg.MetricsRegistry
if metricsRegistry == nil {
metricsRegistry = prometheus.NewRegistry()
}

// If FIPS mode was requested make sure binary is build against BoringCrypto.
if cfg.FIPS {
if !modules.GetModules().IsBoringBinary() {
Expand Down Expand Up @@ -1073,6 +1092,7 @@ func NewTeleport(cfg *servicecfg.Config) (*TeleportProcess, error) {
keyPairs: make(map[keyPairKey]KeyPair),
cloudLabels: cloudLabels,
TracingProvider: tracing.NoopProvider(),
metricsRegistry: metricsRegistry,
}

process.registerExpectedServices(cfg)
Expand Down Expand Up @@ -3260,11 +3280,23 @@ func (process *TeleportProcess) initUploaderService() error {
return nil
}

// promHTTPLogAdapter adapts a slog.Logger into a promhttp.Logger.
type promHTTPLogAdapter struct {
ctx context.Context
*slog.Logger
}

// Println implements the promhttp.Logger interface.
func (l promHTTPLogAdapter) Println(v ...interface{}) {
//nolint:sloglint // msg cannot be constant
l.ErrorContext(l.ctx, fmt.Sprint(v...))
}

// initMetricsService starts the metrics service currently serving metrics for
// prometheus consumption
func (process *TeleportProcess) initMetricsService() error {
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler())
mux.Handle("/metrics", process.newMetricsHandler())

logger := process.logger.With(teleport.ComponentKey, teleport.Component(teleport.ComponentMetrics, process.id))

Expand Down Expand Up @@ -3349,6 +3381,33 @@ func (process *TeleportProcess) initMetricsService() error {
return nil
}

// newMetricsHandler creates a new metrics handler serving metrics both from the global prometheus registry and the
// in-process one.
func (process *TeleportProcess) newMetricsHandler() http.Handler {
// We gather metrics both from the in-process registry (preferred metrics registration method)
// and the global registry (used by some Teleport services and many dependencies).
gatherers := prometheus.Gatherers{
process.metricsRegistry,
prometheus.DefaultGatherer,
}

metricsHandler := promhttp.InstrumentMetricHandler(
process.metricsRegistry, promhttp.HandlerFor(gatherers, promhttp.HandlerOpts{
// Errors can happen if metrics are registered with identical names in both the local and the global registry.
// In this case, we log the error but continue collecting metrics. The first collected metric will win
// (the one from the local metrics registry takes precedence).
// As we move more things to the local registry, especially in other tools like tbot, we will have less
// conflicts in tests.
ErrorHandling: promhttp.ContinueOnError,
ErrorLog: promHTTPLogAdapter{
ctx: process.ExitContext(),
Logger: process.logger.With(teleport.ComponentKey, teleport.ComponentMetrics),
},
}),
)
return metricsHandler
}

// initDiagnosticService starts diagnostic service currently serving healthz
// and prometheus endpoints
func (process *TeleportProcess) initDiagnosticService() error {
Expand All @@ -3358,7 +3417,7 @@ func (process *TeleportProcess) initDiagnosticService() error {
// metrics will otherwise be served by the metrics service if it's enabled
// in the config.
if !process.Config.Metrics.Enabled {
mux.Handle("/metrics", promhttp.Handler())
mux.Handle("/metrics", process.newMetricsHandler())
}

if process.Config.Debug {
Expand Down
150 changes: 150 additions & 0 deletions lib/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ import (
"crypto/tls"
"errors"
"fmt"
"io"
"log/slog"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"strings"
Expand All @@ -39,7 +41,9 @@ import (
"github.com/google/uuid"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/crypto/ssh"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -1851,6 +1855,152 @@ func TestInitDatabaseService(t *testing.T) {
}
}

// TestMetricsService tests that the optional metrics service exposes
// metrics from both the in-process and global metrics registry. When the
// service is disabled, metrics are served by the diagnostics service
// (tested in TestMetricsInDiagnosticsService).
func TestMetricsService(t *testing.T) {
t.Parallel()
// Test setup: create a listener for the metrics server, get its file descriptor.

// Note: this code is copied from integrations/helpers/NewListenerOn() to avoid including helpers in a production
// build and avoid a cyclic dependency.
metricsListener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, metricsListener.Close())
})
require.IsType(t, &net.TCPListener{}, metricsListener)
metricsListenerFile, err := metricsListener.(*net.TCPListener).File()
require.NoError(t, err)

// Test setup: create a new teleport process
dataDir := makeTempDir(t)
cfg := servicecfg.MakeDefaultConfig()
cfg.DataDir = dataDir
cfg.SetAuthServerAddress(utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"})
cfg.Auth.Enabled = true
cfg.Proxy.Enabled = false
cfg.SSH.Enabled = false
cfg.DebugService.Enabled = false
cfg.Auth.StorageConfig.Params["path"] = dataDir
cfg.Auth.ListenAddr = utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"}
cfg.Metrics.Enabled = true

// Configure the metrics server to use the listener we previously created.
cfg.Metrics.ListenAddr = &utils.NetAddr{AddrNetwork: "tcp", Addr: metricsListener.Addr().String()}
cfg.FileDescriptors = []*servicecfg.FileDescriptor{
{Type: string(ListenerMetrics), Address: metricsListener.Addr().String(), File: metricsListenerFile},
}

// Create and start the Teleport service.
process, err := NewTeleport(cfg)
require.NoError(t, err)
require.NoError(t, process.Start())
t.Cleanup(func() {
assert.NoError(t, process.Close())
assert.NoError(t, process.Wait())
})

// Test setup: create our test metrics.
nonce := strings.ReplaceAll(uuid.NewString(), "-", "")
localMetric := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "test",
Name: "local_metric_" + nonce,
})
globalMetric := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "test",
Name: "global_metric_" + nonce,
})
require.NoError(t, process.metricsRegistry.Register(localMetric))
require.NoError(t, prometheus.Register(globalMetric))

ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
t.Cleanup(cancel)
_, err = process.WaitForEvent(ctx, MetricsReady)
require.NoError(t, err)

// Test execution: get metrics and check the tests metrics are here.
metricsURL, err := url.Parse("http://" + metricsListener.Addr().String())
require.NoError(t, err)
metricsURL.Path = "/metrics"
resp, err := http.Get(metricsURL.String())
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)

body, err := io.ReadAll(resp.Body)
require.NoError(t, err)
require.NoError(t, resp.Body.Close())

// Test validation: check that the metrics server served both the local and global registry.
require.Contains(t, string(body), "local_metric_"+nonce)
require.Contains(t, string(body), "global_metric_"+nonce)
}

// TestMetricsInDiagnosticsService tests that the diagnostics service exposes
// metrics from both the in-process and global metrics registry when the metrics
// service is disabled.
func TestMetricsInDiagnosticsService(t *testing.T) {
t.Parallel()
// Test setup: create a new teleport process
dataDir := makeTempDir(t)
cfg := servicecfg.MakeDefaultConfig()
cfg.DataDir = dataDir
cfg.SetAuthServerAddress(utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"})
cfg.Auth.Enabled = true
cfg.Proxy.Enabled = false
cfg.SSH.Enabled = false
cfg.DebugService.Enabled = false
cfg.Auth.StorageConfig.Params["path"] = dataDir
cfg.Auth.ListenAddr = utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"}
cfg.DiagnosticAddr = utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"}

// Test setup: Create and start the Teleport service.
process, err := NewTeleport(cfg)
require.NoError(t, err)
require.NoError(t, process.Start())
t.Cleanup(func() {
assert.NoError(t, process.Close())
assert.NoError(t, process.Wait())
})

// Test setup: create our test metrics.
nonce := strings.ReplaceAll(uuid.NewString(), "-", "")
localMetric := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "test",
Name: "local_metric_" + nonce,
})
globalMetric := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "test",
Name: "global_metric_" + nonce,
})
require.NoError(t, process.metricsRegistry.Register(localMetric))
require.NoError(t, prometheus.Register(globalMetric))

ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
t.Cleanup(cancel)
_, err = process.WaitForEvent(ctx, TeleportReadyEvent)
require.NoError(t, err)

// Test execution: query the metrics endpoint and check the tests metrics are here.
diagAddr, err := process.DiagnosticAddr()
require.NoError(t, err)
metricsURL, err := url.Parse("http://" + diagAddr.String())
require.NoError(t, err)
metricsURL.Path = "/metrics"
resp, err := http.Get(metricsURL.String())
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)

body, err := io.ReadAll(resp.Body)
require.NoError(t, err)
require.NoError(t, resp.Body.Close())

// Test validation: check that the metrics server served both the local and global registry.
require.Contains(t, string(body), "local_metric_"+nonce)
require.Contains(t, string(body), "global_metric_"+nonce)
}

// makeTempDir makes a temp dir with a shorter name than t.TempDir() in order to
// avoid https://github.com/golang/go/issues/62614.
func makeTempDir(t *testing.T) string {
Expand Down
7 changes: 7 additions & 0 deletions lib/service/servicecfg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/ghodss/yaml"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh"

Expand Down Expand Up @@ -264,6 +265,12 @@ type Config struct {
// AccessGraph represents AccessGraph server config
AccessGraph AccessGraphConfig

// MetricsRegistry is the prometheus metrics registry used by the Teleport process to register its metrics.
// As of today, not every Teleport metric is registered against this registry. Some Teleport services
// and Teleport dependencies are using the global registry.
// Both the MetricsRegistry and the default global registry are gathered by Teleport's metric service.
MetricsRegistry *prometheus.Registry

// token is either the token needed to join the auth server, or a path pointing to a file
// that contains the token
//
Expand Down

0 comments on commit 82b7acd

Please sign in to comment.