Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v15] In-process metrics registry #51204

Merged
merged 4 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 61 additions & 2 deletions lib/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/gravitational/roundtrip"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
Expand Down Expand Up @@ -470,6 +471,15 @@ type TeleportProcess struct {
// resolver is used to identify the reverse tunnel address when connecting via
// the proxy.
resolver reversetunnelclient.Resolver

// metricRegistry is the prometheus metric registry for the process.
// Every teleport service that wants to register metrics should use this
// instead of the global prometheus.DefaultRegisterer to avoid registration
// conflicts.
//
// Both the metricsRegistry and the default global registry are gathered by
// Telepeort's metric service.
metricsRegistry *prometheus.Registry
}

type keyPairKey struct {
Expand Down Expand Up @@ -906,6 +916,15 @@ func NewTeleport(cfg *servicecfg.Config) (*TeleportProcess, error) {
"pid", fmt.Sprintf("%v.%v", os.Getpid(), processID),
)

// Use the custom metrics registry if specified, else create a new one.
// We must create the registry in NewTeleport, as opposed to ApplyConfig(),
// because some tests are running multiple Teleport instances from the same
// config.
metricsRegistry := cfg.MetricsRegistry
if metricsRegistry == nil {
metricsRegistry = prometheus.NewRegistry()
}

// If FIPS mode was requested make sure binary is build against BoringCrypto.
if cfg.FIPS {
if !modules.GetModules().IsBoringBinary() {
Expand Down Expand Up @@ -1086,6 +1105,7 @@ func NewTeleport(cfg *servicecfg.Config) (*TeleportProcess, error) {
keyPairs: make(map[keyPairKey]KeyPair),
cloudLabels: cloudLabels,
TracingProvider: tracing.NoopProvider(),
metricsRegistry: metricsRegistry,
}

process.registerExpectedServices(cfg)
Expand Down Expand Up @@ -3346,11 +3366,23 @@ func (process *TeleportProcess) initUploaderService() error {
return nil
}

// promHTTPLogAdapter adapts a slog.Logger into a promhttp.Logger.
type promHTTPLogAdapter struct {
ctx context.Context
*slog.Logger
}

// Println implements the promhttp.Logger interface.
func (l promHTTPLogAdapter) Println(v ...interface{}) {
//nolint:sloglint // msg cannot be constant
l.ErrorContext(l.ctx, fmt.Sprint(v...))
}

// initMetricsService starts the metrics service currently serving metrics for
// prometheus consumption
func (process *TeleportProcess) initMetricsService() error {
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler())
mux.Handle("/metrics", process.newMetricsHandler())

logger := process.logger.With(teleport.ComponentKey, teleport.Component(teleport.ComponentMetrics, process.id))

Expand Down Expand Up @@ -3435,6 +3467,33 @@ func (process *TeleportProcess) initMetricsService() error {
return nil
}

// newMetricsHandler creates a new metrics handler serving metrics both from the global prometheus registry and the
// in-process one.
func (process *TeleportProcess) newMetricsHandler() http.Handler {
// We gather metrics both from the in-process registry (preferred metrics registration method)
// and the global registry (used by some Teleport services and many dependencies).
gatherers := prometheus.Gatherers{
process.metricsRegistry,
prometheus.DefaultGatherer,
}

metricsHandler := promhttp.InstrumentMetricHandler(
process.metricsRegistry, promhttp.HandlerFor(gatherers, promhttp.HandlerOpts{
// Errors can happen if metrics are registered with identical names in both the local and the global registry.
// In this case, we log the error but continue collecting metrics. The first collected metric will win
// (the one from the local metrics registry takes precedence).
// As we move more things to the local registry, especially in other tools like tbot, we will have less
// conflicts in tests.
ErrorHandling: promhttp.ContinueOnError,
ErrorLog: promHTTPLogAdapter{
ctx: process.ExitContext(),
Logger: process.logger.With(teleport.ComponentKey, teleport.ComponentMetrics),
},
}),
)
return metricsHandler
}

// initDiagnosticService starts diagnostic service currently serving healthz
// and prometheus endpoints
func (process *TeleportProcess) initDiagnosticService() error {
Expand All @@ -3444,7 +3503,7 @@ func (process *TeleportProcess) initDiagnosticService() error {
// metrics will otherwise be served by the metrics service if it's enabled
// in the config.
if !process.Config.Metrics.Enabled {
mux.Handle("/metrics", promhttp.Handler())
mux.Handle("/metrics", process.newMetricsHandler())
}

if process.Config.Debug {
Expand Down
150 changes: 150 additions & 0 deletions lib/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ import (
"crypto/tls"
"errors"
"fmt"
"io"
"log/slog"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"strings"
Expand All @@ -39,7 +41,9 @@ import (
"github.com/google/uuid"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/crypto/ssh"
"google.golang.org/grpc"
Expand Down Expand Up @@ -1755,6 +1759,152 @@ func TestInstanceMetadata(t *testing.T) {
}
}

// TestMetricsService tests that the optional metrics service exposes
// metrics from both the in-process and global metrics registry. When the
// service is disabled, metrics are served by the diagnostics service
// (tested in TestMetricsInDiagnosticsService).
func TestMetricsService(t *testing.T) {
t.Parallel()
// Test setup: create a listener for the metrics server, get its file descriptor.

// Note: this code is copied from integrations/helpers/NewListenerOn() to avoid including helpers in a production
// build and avoid a cyclic dependency.
metricsListener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, metricsListener.Close())
})
require.IsType(t, &net.TCPListener{}, metricsListener)
metricsListenerFile, err := metricsListener.(*net.TCPListener).File()
require.NoError(t, err)

// Test setup: create a new teleport process
dataDir := makeTempDir(t)
cfg := servicecfg.MakeDefaultConfig()
cfg.DataDir = dataDir
cfg.SetAuthServerAddress(utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"})
cfg.Auth.Enabled = true
cfg.Proxy.Enabled = false
cfg.SSH.Enabled = false
cfg.DebugService.Enabled = false
cfg.Auth.StorageConfig.Params["path"] = dataDir
cfg.Auth.ListenAddr = utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"}
cfg.Metrics.Enabled = true

// Configure the metrics server to use the listener we previously created.
cfg.Metrics.ListenAddr = &utils.NetAddr{AddrNetwork: "tcp", Addr: metricsListener.Addr().String()}
cfg.FileDescriptors = []*servicecfg.FileDescriptor{
{Type: string(ListenerMetrics), Address: metricsListener.Addr().String(), File: metricsListenerFile},
}

// Create and start the Teleport service.
process, err := NewTeleport(cfg)
require.NoError(t, err)
require.NoError(t, process.Start())
t.Cleanup(func() {
assert.NoError(t, process.Close())
assert.NoError(t, process.Wait())
})

// Test setup: create our test metrics.
nonce := strings.ReplaceAll(uuid.NewString(), "-", "")
localMetric := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "test",
Name: "local_metric_" + nonce,
})
globalMetric := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "test",
Name: "global_metric_" + nonce,
})
require.NoError(t, process.metricsRegistry.Register(localMetric))
require.NoError(t, prometheus.Register(globalMetric))

ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
t.Cleanup(cancel)
_, err = process.WaitForEvent(ctx, MetricsReady)
require.NoError(t, err)

// Test execution: get metrics and check the tests metrics are here.
metricsURL, err := url.Parse("http://" + metricsListener.Addr().String())
require.NoError(t, err)
metricsURL.Path = "/metrics"
resp, err := http.Get(metricsURL.String())
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)

body, err := io.ReadAll(resp.Body)
require.NoError(t, err)
require.NoError(t, resp.Body.Close())

// Test validation: check that the metrics server served both the local and global registry.
require.Contains(t, string(body), "local_metric_"+nonce)
require.Contains(t, string(body), "global_metric_"+nonce)
}

// TestMetricsInDiagnosticsService tests that the diagnostics service exposes
// metrics from both the in-process and global metrics registry when the metrics
// service is disabled.
func TestMetricsInDiagnosticsService(t *testing.T) {
t.Parallel()
// Test setup: create a new teleport process
dataDir := makeTempDir(t)
cfg := servicecfg.MakeDefaultConfig()
cfg.DataDir = dataDir
cfg.SetAuthServerAddress(utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"})
cfg.Auth.Enabled = true
cfg.Proxy.Enabled = false
cfg.SSH.Enabled = false
cfg.DebugService.Enabled = false
cfg.Auth.StorageConfig.Params["path"] = dataDir
cfg.Auth.ListenAddr = utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"}
cfg.DiagnosticAddr = utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"}

// Test setup: Create and start the Teleport service.
process, err := NewTeleport(cfg)
require.NoError(t, err)
require.NoError(t, process.Start())
t.Cleanup(func() {
assert.NoError(t, process.Close())
assert.NoError(t, process.Wait())
})

// Test setup: create our test metrics.
nonce := strings.ReplaceAll(uuid.NewString(), "-", "")
localMetric := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "test",
Name: "local_metric_" + nonce,
})
globalMetric := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "test",
Name: "global_metric_" + nonce,
})
require.NoError(t, process.metricsRegistry.Register(localMetric))
require.NoError(t, prometheus.Register(globalMetric))

ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
t.Cleanup(cancel)
_, err = process.WaitForEvent(ctx, TeleportReadyEvent)
require.NoError(t, err)

// Test execution: query the metrics endpoint and check the tests metrics are here.
diagAddr, err := process.DiagnosticAddr()
require.NoError(t, err)
metricsURL, err := url.Parse("http://" + diagAddr.String())
require.NoError(t, err)
metricsURL.Path = "/metrics"
resp, err := http.Get(metricsURL.String())
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)

body, err := io.ReadAll(resp.Body)
require.NoError(t, err)
require.NoError(t, resp.Body.Close())

// Test validation: check that the metrics server served both the local and global registry.
require.Contains(t, string(body), "local_metric_"+nonce)
require.Contains(t, string(body), "global_metric_"+nonce)
}

// makeTempDir makes a temp dir with a shorter name than t.TempDir() in order to
// avoid https://github.com/golang/go/issues/62614.
func makeTempDir(t *testing.T) string {
Expand Down
7 changes: 7 additions & 0 deletions lib/service/servicecfg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/ghodss/yaml"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
"github.com/prometheus/client_golang/prometheus"
"github.com/sashabaranov/go-openai"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh"
Expand Down Expand Up @@ -263,6 +264,12 @@ type Config struct {
// AccessGraph represents AccessGraph server config
AccessGraph AccessGraphConfig

// MetricsRegistry is the prometheus metrics registry used by the Teleport process to register its metrics.
// As of today, not every Teleport metric is registered against this registry. Some Teleport services
// and Teleport dependencies are using the global registry.
// Both the MetricsRegistry and the default global registry are gathered by Teleport's metric service.
MetricsRegistry *prometheus.Registry

// token is either the token needed to join the auth server, or a path pointing to a file
// that contains the token
//
Expand Down
Loading