diff --git a/README.md b/README.md
index eaa0085..fb6894f 100644
--- a/README.md
+++ b/README.md
@@ -2,16 +2,22 @@
 # otelpgx
 
-Provides [OpenTelemetry](https://github.com/open-telemetry/opentelemetry-go)
+Provides [OpenTelemetry](https://github.com/open-telemetry/opentelemetry-go) 
 instrumentation for the [jackc/pgx](https://github.com/jackc/pgx) library.
 
 ## Requirements
 
-- go 1.18 (or higher)
+- go 1.22 (or higher)
 - pgx v5 (or higher)
 
 ## Usage
 
+Make sure you have a suitable pgx version:
+
+```bash
+go get github.com/jackc/pgx/v5
+```
+
 Install the library:
 
 ```go
@@ -28,10 +34,14 @@ if err != nil {
 
 cfg.ConnConfig.Tracer = otelpgx.NewTracer()
 
-conn, err := pgxpool.NewWithConfig(ctx, cfg)
+conn, err := pgxpool.NewWithConfig(ctx, cfg)
 if err != nil {
 	return nil, fmt.Errorf("connect to database: %w", err)
 }
+
+if err := otelpgx.RecordStats(conn); err != nil {
+	return nil, fmt.Errorf("unable to record database stats: %w", err)
+}
 ```
 
-See [options.go](options.go) for the full list of options.
+See [options.go](options.go) for the full list of options.
\ No newline at end of file
diff --git a/go.mod b/go.mod
index 2673704..04385ee 100644
--- a/go.mod
+++ b/go.mod
@@ -1,20 +1,23 @@
 module github.com/exaring/otelpgx
 
-go 1.20
+go 1.22.0
+
+toolchain go1.23.4
 
 require (
 	github.com/jackc/pgx/v5 v5.6.0
-	go.opentelemetry.io/otel v1.23.1
-	go.opentelemetry.io/otel/trace v1.23.1
+	go.opentelemetry.io/otel v1.34.0
+	go.opentelemetry.io/otel/metric v1.34.0
+	go.opentelemetry.io/otel/trace v1.34.0
 )
 
 require (
-	github.com/go-logr/logr v1.4.1 // indirect
+	github.com/go-logr/logr v1.4.2 // indirect
 	github.com/go-logr/stdr v1.2.2 // indirect
 	github.com/jackc/pgpassfile v1.0.0 // indirect
 	github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
 	github.com/jackc/puddle/v2 v2.2.1 // indirect
-	go.opentelemetry.io/otel/metric v1.23.1 // indirect
+	go.opentelemetry.io/auto/sdk v1.1.0 // indirect
 	golang.org/x/crypto v0.17.0 // indirect
 	golang.org/x/sync v0.1.0 // indirect
 	golang.org/x/text v0.14.0 // indirect
diff --git a/go.sum b/go.sum
index 8b8b9b7..ec4b330 100644
--- a/go.sum
+++ b/go.sum
@@ -1,11 +1,13 @@
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
-github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
-github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
+github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
+github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
 github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
 github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
 github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
+github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
 github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
 github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
 github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
@@ -19,13 +21,16 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= -go.opentelemetry.io/otel v1.23.1 h1:Za4UzOqJYS+MUczKI320AtqZHZb7EqxO00jAHE0jmQY= -go.opentelemetry.io/otel v1.23.1/go.mod h1:Td0134eafDLcTS4y+zQ26GE8u3dEuRBiBCTUIRHaikA= -go.opentelemetry.io/otel/metric v1.23.1 h1:PQJmqJ9u2QaJLBOELl1cxIdPcpbwzbkjfEyelTl2rlo= -go.opentelemetry.io/otel/metric v1.23.1/go.mod h1:mpG2QPlAfnK8yNhNJAxDZruU9Y1/HubbC+KyH8FaCWI= -go.opentelemetry.io/otel/trace v1.23.1 h1:4LrmmEd8AU2rFvU1zegmvqW7+kWarxtNOPyeL6HmYY8= -go.opentelemetry.io/otel/trace v1.23.1/go.mod h1:4IpnpJFwr1mo/6HL8XIPJaE9y0+u1KcVmuW7dwFSVrI= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY= +go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI= +go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ= +go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE= +go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k= +go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= @@ -35,3 +40,4 @@ golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/meter.go b/meter.go new file mode 100644 index 0000000..c554dc8 --- /dev/null +++ b/meter.go @@ -0,0 +1,233 @@ +package otelpgx + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/jackc/pgx/v5/pgxpool" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + semconv "go.opentelemetry.io/otel/semconv/v1.27.0" +) + +const ( + // defaultMinimumReadDBStatsInterval is the default minimum interval between calls to db.Stats(). 
+ defaultMinimumReadDBStatsInterval = time.Second +) + +var ( + pgxPoolAcquireCount = "pgxpool.acquires" + pgxPoolAcquireDuration = "pgxpool.acquire_duration" + pgxPoolAcquiredConnections = "pgxpool.acquired_connections" + pgxPoolCancelledAcquires = "pgxpool.canceled_acquires" + pgxPoolConstructingConnections = "pgxpool.constructing_connections" + pgxPoolEmptyAcquire = "pgxpool.empty_acquire" + pgxPoolIdleConnections = "pgxpool.idle_connections" + pgxPoolMaxConnections = "pgxpool.max_connections" + pgxPoolMaxIdleDestroyCount = "pgxpool.max_idle_destroys" + pgxPoolMaxLifetimeDestroyCount = "pgxpool.max_lifetime_destroys" + pgxPoolNewConnectionsCount = "pgxpool.new_connections" + pgxPoolTotalConnections = "pgxpool.total_connections" +) + +// RecordStats records database statistics for provided pgxpool.Pool at a default 1 second interval +// unless otherwise specified by the WithMinimumReadDBStatsInterval StatsOption. +func RecordStats(db *pgxpool.Pool, opts ...StatsOption) error { + o := statsOptions{ + meterProvider: otel.GetMeterProvider(), + minimumReadDBStatsInterval: defaultMinimumReadDBStatsInterval, + defaultAttributes: []attribute.KeyValue{ + semconv.DBSystemPostgreSQL, + }, + } + + for _, opt := range opts { + opt.applyStatsOptions(&o) + } + + meter := o.meterProvider.Meter(meterName, metric.WithInstrumentationVersion(findOwnImportedVersion())) + + return recordStats(meter, db, o.minimumReadDBStatsInterval, o.defaultAttributes...) +} + +func recordStats( + meter metric.Meter, + db *pgxpool.Pool, + minimumReadDBStatsInterval time.Duration, + attrs ...attribute.KeyValue, +) error { + var ( + err error + + // Asynchronous Observable Metrics + acquireCount metric.Int64ObservableCounter + acquireDuration metric.Int64ObservableCounter + acquiredConns metric.Int64ObservableUpDownCounter + cancelledAcquires metric.Int64ObservableCounter + constructingConns metric.Int64ObservableUpDownCounter + emptyAcquires metric.Int64ObservableCounter + idleConns metric.Int64ObservableUpDownCounter + maxConns metric.Int64ObservableGauge + maxIdleDestroyCount metric.Int64ObservableCounter + maxLifetimeDestroyCount metric.Int64ObservableCounter + newConnsCount metric.Int64ObservableCounter + totalConns metric.Int64ObservableUpDownCounter + + observeOptions []metric.ObserveOption + + dbStats *pgxpool.Stat + lastDBStats time.Time + + // lock prevents a race between batch observer and instrument registration. 
+		lock sync.Mutex
+	)
+
+	serverAddress := semconv.ServerAddress(db.Config().ConnConfig.Host)
+	serverPort := semconv.ServerPort(int(db.Config().ConnConfig.Port))
+	dbNamespace := semconv.DBNamespace(db.Config().ConnConfig.Database)
+	poolName := fmt.Sprintf("%s:%d/%s", serverAddress.Value.AsString(), serverPort.Value.AsInt64(), dbNamespace.Value.AsString())
+	dbClientConnectionPoolName := semconv.DBClientConnectionPoolName(poolName)
+
+	lock.Lock()
+	defer lock.Unlock()
+
+	if acquireCount, err = meter.Int64ObservableCounter(
+		pgxPoolAcquireCount,
+		metric.WithDescription("Cumulative count of successful acquires from the pool."),
+	); err != nil {
+		return fmt.Errorf("failed to create asynchronous metric: %s with error: %w", pgxPoolAcquireCount, err)
+	}
+
+	if acquireDuration, err = meter.Int64ObservableCounter(
+		pgxPoolAcquireDuration,
+		metric.WithDescription("Total duration of all successful acquires from the pool in nanoseconds."),
+		metric.WithUnit("ns"),
+	); err != nil {
+		return fmt.Errorf("failed to create asynchronous metric: %s with error: %w", pgxPoolAcquireDuration, err)
+	}
+
+	if acquiredConns, err = meter.Int64ObservableUpDownCounter(
+		pgxPoolAcquiredConnections,
+		metric.WithDescription("Number of currently acquired connections in the pool."),
+	); err != nil {
+		return fmt.Errorf("failed to create asynchronous metric: %s with error: %w", pgxPoolAcquiredConnections, err)
+	}
+
+	if cancelledAcquires, err = meter.Int64ObservableCounter(
+		pgxPoolCancelledAcquires,
+		metric.WithDescription("Cumulative count of acquires from the pool that were canceled by a context."),
+	); err != nil {
+		return fmt.Errorf("failed to create asynchronous metric: %s with error: %w", pgxPoolCancelledAcquires, err)
+	}
+
+	if constructingConns, err = meter.Int64ObservableUpDownCounter(
+		pgxPoolConstructingConnections,
+		metric.WithDescription("Number of connections with construction in progress in the pool."),
+	); err != nil {
+		return fmt.Errorf("failed to create asynchronous metric: %s with error: %w", pgxPoolConstructingConnections, err)
+	}
+
+	if emptyAcquires, err = meter.Int64ObservableCounter(
+		pgxPoolEmptyAcquire,
+		metric.WithDescription("Cumulative count of successful acquires from the pool that waited for a resource to be released or constructed because the pool was empty."),
+	); err != nil {
+		return fmt.Errorf("failed to create asynchronous metric: %s with error: %w", pgxPoolEmptyAcquire, err)
+	}
+
+	if idleConns, err = meter.Int64ObservableUpDownCounter(
+		pgxPoolIdleConnections,
+		metric.WithDescription("Number of currently idle connections in the pool."),
+	); err != nil {
+		return fmt.Errorf("failed to create asynchronous metric: %s with error: %w", pgxPoolIdleConnections, err)
+	}
+
+	if maxConns, err = meter.Int64ObservableGauge(
+		pgxPoolMaxConnections,
+		metric.WithDescription("Maximum size of the pool."),
+	); err != nil {
+		return fmt.Errorf("failed to create asynchronous metric: %s with error: %w", pgxPoolMaxConnections, err)
+	}
+
+	if maxIdleDestroyCount, err = meter.Int64ObservableCounter(
+		pgxPoolMaxIdleDestroyCount,
+		metric.WithDescription("Cumulative count of connections destroyed because they exceeded MaxConnectionsIdleTime."),
+	); err != nil {
+		return fmt.Errorf("failed to create asynchronous metric: %s with error: %w", pgxPoolMaxIdleDestroyCount, err)
+	}
+
+	if maxLifetimeDestroyCount, err = meter.Int64ObservableCounter(
+		pgxPoolMaxLifetimeDestroyCount,
+		metric.WithDescription("Cumulative count of connections destroyed 
because they exceeded MaxConnectionsLifetime."), + ); err != nil { + return fmt.Errorf("failed to create asynchronous metric: %s with error: %w", pgxPoolMaxLifetimeDestroyCount, err) + } + + if newConnsCount, err = meter.Int64ObservableCounter( + pgxPoolNewConnectionsCount, + metric.WithDescription("Cumulative count of new connections opened."), + ); err != nil { + return fmt.Errorf("failed to create asynchronous metric: %s with error: %w", pgxPoolNewConnectionsCount, err) + } + + if totalConns, err = meter.Int64ObservableUpDownCounter( + pgxPoolTotalConnections, + metric.WithDescription("Total number of resources currently in the pool. The value is the sum of ConstructingConnections, AcquiredConnections, and IdleConnections."), + ); err != nil { + return fmt.Errorf("failed to create asynchronous metric: %s with error: %w", pgxPoolTotalConnections, err) + } + + attrs = append(attrs, []attribute.KeyValue{ + semconv.DBSystemPostgreSQL, + dbClientConnectionPoolName, + }...) + + observeOptions = []metric.ObserveOption{ + metric.WithAttributes(attrs...), + } + + _, err = meter.RegisterCallback( + func(ctx context.Context, o metric.Observer) error { + lock.Lock() + defer lock.Unlock() + + now := time.Now() + if now.Sub(lastDBStats) >= minimumReadDBStatsInterval { + dbStats = db.Stat() + lastDBStats = now + } + + o.ObserveInt64(acquireCount, dbStats.AcquireCount(), observeOptions...) + o.ObserveInt64(acquireDuration, dbStats.AcquireDuration().Nanoseconds(), observeOptions...) + o.ObserveInt64(acquiredConns, int64(dbStats.AcquiredConns()), observeOptions...) + o.ObserveInt64(cancelledAcquires, dbStats.CanceledAcquireCount(), observeOptions...) + o.ObserveInt64(constructingConns, int64(dbStats.ConstructingConns()), observeOptions...) + o.ObserveInt64(emptyAcquires, dbStats.EmptyAcquireCount(), observeOptions...) + o.ObserveInt64(idleConns, int64(dbStats.IdleConns()), observeOptions...) + o.ObserveInt64(maxConns, int64(dbStats.MaxConns()), observeOptions...) + o.ObserveInt64(maxIdleDestroyCount, dbStats.MaxIdleDestroyCount(), observeOptions...) + o.ObserveInt64(maxLifetimeDestroyCount, dbStats.MaxLifetimeDestroyCount(), observeOptions...) + o.ObserveInt64(newConnsCount, dbStats.NewConnsCount(), observeOptions...) + o.ObserveInt64(totalConns, int64(dbStats.TotalConns()), observeOptions...) + + return nil + }, + acquireCount, + acquireDuration, + acquiredConns, + cancelledAcquires, + constructingConns, + emptyAcquires, + idleConns, + maxConns, + maxIdleDestroyCount, + maxLifetimeDestroyCount, + newConnsCount, + totalConns, + ) + + return err +} diff --git a/options.go b/options.go index 8780f31..6f56b15 100644 --- a/options.go +++ b/options.go @@ -1,7 +1,10 @@ package otelpgx import ( + "time" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/trace" ) @@ -21,15 +24,40 @@ func (o optionFunc) apply(c *tracerConfig) { func WithTracerProvider(provider trace.TracerProvider) Option { return optionFunc(func(cfg *tracerConfig) { if provider != nil { - cfg.tp = provider + cfg.tracerProvider = provider } }) } -// WithAttributes specifies additional attributes to be added to the span. +// WithMeterProvider specifies a meter provider to use for creating a meter. +// If none is specified, the global provider is used. +func WithMeterProvider(provider metric.MeterProvider) Option { + return optionFunc(func(cfg *tracerConfig) { + if provider != nil { + cfg.meterProvider = provider + } + }) +} + +// Deprecated: Use WithTracerAttributes. 
+// +// WithAttributes specifies additional attributes to be added to spans. +// This is exactly equivalent to using WithTracerAttributes. func WithAttributes(attrs ...attribute.KeyValue) Option { + return WithTracerAttributes(attrs...) +} + +// WithTracerAttributes specifies additional attributes to be added to spans. +func WithTracerAttributes(attrs ...attribute.KeyValue) Option { + return optionFunc(func(cfg *tracerConfig) { + cfg.tracerAttrs = append(cfg.tracerAttrs, attrs...) + }) +} + +// WithMeterAttributes specifies additional attributes to be added to metrics. +func WithMeterAttributes(attrs ...attribute.KeyValue) Option { return optionFunc(func(cfg *tracerConfig) { - cfg.attrs = append(cfg.attrs, attrs...) + cfg.meterAttrs = append(cfg.meterAttrs, attrs...) }) } @@ -80,3 +108,46 @@ func WithIncludeQueryParameters() Option { cfg.includeParams = true }) } + +// StatsOption allows for managing RecordStats configuration using functional options. +type StatsOption interface { + applyStatsOptions(o *statsOptions) +} + +type statsOptions struct { + // meterProvider sets the metric.MeterProvider. If nil, the global Provider will be used. + meterProvider metric.MeterProvider + + // minimumReadDBStatsInterval sets the minimum interval between calls to db.Stats(). Negative values are ignored. + minimumReadDBStatsInterval time.Duration + + // defaultAttributes will be set to each metrics as default. + defaultAttributes []attribute.KeyValue +} + +type statsOptionFunc func(o *statsOptions) + +func (f statsOptionFunc) applyStatsOptions(o *statsOptions) { + f(o) +} + +// WithStatsMeterProvider sets meter provider to use for pgx stat metric collection. +func WithStatsMeterProvider(provider metric.MeterProvider) StatsOption { + return statsOptionFunc(func(o *statsOptions) { + o.meterProvider = provider + }) +} + +// WithStatsAttributes specifies additional attributes to be added to pgx stat metrics. +func WithStatsAttributes(attrs ...attribute.KeyValue) StatsOption { + return statsOptionFunc(func(o *statsOptions) { + o.defaultAttributes = append(o.defaultAttributes, attrs...) + }) +} + +// WithMinimumReadDBStatsInterval sets the minimum interval between calls to db.Stats(). Negative values are ignored. +func WithMinimumReadDBStatsInterval(interval time.Duration) StatsOption { + return statsOptionFunc(func(o *statsOptions) { + o.minimumReadDBStatsInterval = interval + }) +} diff --git a/tracer.go b/tracer.go index 7110c7d..217a108 100644 --- a/tracer.go +++ b/tracer.go @@ -7,6 +7,7 @@ import ( "fmt" "runtime/debug" "strings" + "time" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" @@ -14,35 +15,56 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" - semconv "go.opentelemetry.io/otel/semconv/v1.24.0" + "go.opentelemetry.io/otel/metric" + semconv "go.opentelemetry.io/otel/semconv/v1.27.0" "go.opentelemetry.io/otel/trace" ) const ( - tracerName = "github.com/exaring/otelpgx" - + tracerName = "github.com/exaring/otelpgx" + meterName = "github.com/exaring/otelpgx" + startTimeCtxKey = "otelpgxStartTime" sqlOperationUnknown = "UNKNOWN" ) +const ( + pgxOperationQuery = "query" + pgxOperationCopy = "copy" + pgxOperationBatch = "batch" + pgxOperationConnect = "connect" + pgxOperationPrepare = "prepare" + pgxOperationAcquire = "acquire" +) + const ( // RowsAffectedKey represents the number of rows affected. RowsAffectedKey = attribute.Key("pgx.rows_affected") // QueryParametersKey represents the query parameters. 
QueryParametersKey = attribute.Key("pgx.query.parameters") - // BatchSizeKey represents the batch size. - BatchSizeKey = attribute.Key("pgx.batch.size") // PrepareStmtNameKey represents the prepared statement name. PrepareStmtNameKey = attribute.Key("pgx.prepare_stmt.name") // SQLStateKey represents PostgreSQL error code, // see https://www.postgresql.org/docs/current/errcodes-appendix.html. SQLStateKey = attribute.Key("pgx.sql_state") + // PGXOperationTypeKey represents the pgx tracer operation type + PGXOperationTypeKey = attribute.Key("pgx.operation.type") + // DBClientOperationErrorsKey represents the count of operation errors + DBClientOperationErrorsKey = attribute.Key("db.client.operation.errors") ) +var _ pgxpool.AcquireTracer = (*Tracer)(nil) + // Tracer is a wrapper around the pgx tracer interfaces which instrument -// queries. +// queries with both tracing and metrics. type Tracer struct { - tracer trace.Tracer - attrs []attribute.KeyValue + tracer trace.Tracer + meter metric.Meter + tracerAttrs []attribute.KeyValue + meterAttrs []attribute.KeyValue + + operationDuration metric.Int64Histogram + operationErrors metric.Int64Counter + trimQuerySpanName bool spanNameFunc SpanNameFunc prefixQuerySpanName bool @@ -51,8 +73,12 @@ type Tracer struct { } type tracerConfig struct { - tp trace.TracerProvider - attrs []attribute.KeyValue + tracerProvider trace.TracerProvider + meterProvider metric.MeterProvider + + tracerAttrs []attribute.KeyValue + meterAttrs []attribute.KeyValue + trimQuerySpanName bool spanNameFunc SpanNameFunc prefixQuerySpanName bool @@ -60,13 +86,15 @@ type tracerConfig struct { includeParams bool } -var _ pgxpool.AcquireTracer = (*Tracer)(nil) - // NewTracer returns a new Tracer. func NewTracer(opts ...Option) *Tracer { cfg := &tracerConfig{ - tp: otel.GetTracerProvider(), - attrs: []attribute.KeyValue{ + tracerProvider: otel.GetTracerProvider(), + meterProvider: otel.GetMeterProvider(), + tracerAttrs: []attribute.KeyValue{ + semconv.DBSystemPostgreSQL, + }, + meterAttrs: []attribute.KeyValue{ semconv.DBSystemPostgreSQL, }, trimQuerySpanName: false, @@ -80,18 +108,50 @@ func NewTracer(opts ...Option) *Tracer { opt.apply(cfg) } - return &Tracer{ - tracer: cfg.tp.Tracer(tracerName, trace.WithInstrumentationVersion(findOwnImportedVersion())), - attrs: cfg.attrs, + tracer := &Tracer{ + tracer: cfg.tracerProvider.Tracer(tracerName, trace.WithInstrumentationVersion(findOwnImportedVersion())), + meter: cfg.meterProvider.Meter(meterName, metric.WithInstrumentationVersion(findOwnImportedVersion())), + tracerAttrs: cfg.tracerAttrs, + meterAttrs: cfg.meterAttrs, trimQuerySpanName: cfg.trimQuerySpanName, spanNameFunc: cfg.spanNameFunc, prefixQuerySpanName: cfg.prefixQuerySpanName, logSQLStatement: cfg.logSQLStatement, includeParams: cfg.includeParams, } + + tracer.createMetrics() + + return tracer +} + +// createMetrics initializes all synchronous metrics tracked by Tracer. +// Any errors encountered upon metric creation will be sent to the globally assigned OpenTelemetry ErrorHandler. 
+func (t *Tracer) createMetrics() { + var err error + + t.operationDuration, err = t.meter.Int64Histogram( + semconv.DBClientOperationDurationName, + metric.WithDescription(semconv.DBClientOperationDurationDescription), + metric.WithUnit("ms"), + ) + if err != nil { + otel.Handle(err) + } + + t.operationErrors, err = t.meter.Int64Counter( + string(DBClientOperationErrorsKey), + metric.WithDescription("The count of database client operation errors"), + ) + if err != nil { + otel.Handle(err) + } } -func recordError(span trace.Span, err error) { +// recordSpanError handles all error handling to be applied on the provided span. +// The provided error must be non-nil and not a sql.ErrNoRows error. +// Otherwise, recordSpanError will be a no-op. +func recordSpanError(span trace.Span, err error) { if err != nil && !errors.Is(err, sql.ErrNoRows) { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) @@ -103,6 +163,23 @@ func recordError(span trace.Span, err error) { } } +// incrementOperationErrorCount will increment the operation error count metric for any provided error +// that is non-nil and not sql.ErrNoRows. Otherwise, incrementOperationErrorCount becomes a no-op. +func (t *Tracer) incrementOperationErrorCount(ctx context.Context, err error, pgxOperation string) { + if err != nil && !errors.Is(err, sql.ErrNoRows) { + t.operationErrors.Add(ctx, 1, metric.WithAttributes(append(t.meterAttrs, PGXOperationTypeKey.String(pgxOperation))...)) + } +} + +// recordOperationDuration will compute and record the time since the start of an operation. +func (t *Tracer) recordOperationDuration(ctx context.Context, pgxOperation string) { + if startTime, ok := ctx.Value(startTimeCtxKey).(time.Time); ok { + t.operationDuration.Record(ctx, time.Since(startTime).Milliseconds(), metric.WithAttributes( + append(t.meterAttrs, PGXOperationTypeKey.String(pgxOperation))..., + )) + } +} + // sqlOperationName attempts to get the first 'word' from a given SQL query, which usually // is the operation name (e.g. 'SELECT'). func (t *Tracer) sqlOperationName(stmt string) string { @@ -129,9 +206,11 @@ func connectionAttributesFromConfig(config *pgx.ConnConfig) []trace.SpanStartOpt if config != nil { return []trace.SpanStartOption{ trace.WithAttributes( - semconv.NetPeerName(config.Host), - semconv.NetPeerPort(int(config.Port)), - semconv.DBUser(config.User), + semconv.DBSystemPostgreSQL, + semconv.ServerAddress(config.Host), + semconv.ServerPort(int(config.Port)), + semconv.UserName(config.User), + semconv.DBNamespace(config.Database), ), } } @@ -141,13 +220,15 @@ func connectionAttributesFromConfig(config *pgx.ConnConfig) []trace.SpanStartOpt // TraceQueryStart is called at the beginning of Query, QueryRow, and Exec calls. // The returned context is used for the rest of the call and will be passed to TraceQueryEnd. 
func (t *Tracer) TraceQueryStart(ctx context.Context, conn *pgx.Conn, data pgx.TraceQueryStartData) context.Context { + ctx = context.WithValue(ctx, startTimeCtxKey, time.Now()) + if !trace.SpanFromContext(ctx).IsRecording() { return ctx } opts := []trace.SpanStartOption{ trace.WithSpanKind(trace.SpanKindClient), - trace.WithAttributes(t.attrs...), + trace.WithAttributes(t.tracerAttrs...), } if conn != nil { @@ -155,7 +236,11 @@ func (t *Tracer) TraceQueryStart(ctx context.Context, conn *pgx.Conn, data pgx.T } if t.logSQLStatement { - opts = append(opts, trace.WithAttributes(semconv.DBStatement(data.SQL))) + opts = append(opts, trace.WithAttributes( + semconv.DBQueryText(data.SQL), + semconv.DBOperationName(t.sqlOperationName(data.SQL)), + )) + if t.includeParams { opts = append(opts, trace.WithAttributes(makeParamsAttribute(data.Args))) } @@ -165,6 +250,7 @@ func (t *Tracer) TraceQueryStart(ctx context.Context, conn *pgx.Conn, data pgx.T if t.trimQuerySpanName { spanName = t.sqlOperationName(data.SQL) } + if t.prefixQuerySpanName { spanName = "query " + spanName } @@ -177,27 +263,32 @@ func (t *Tracer) TraceQueryStart(ctx context.Context, conn *pgx.Conn, data pgx.T // TraceQueryEnd is called at the end of Query, QueryRow, and Exec calls. func (t *Tracer) TraceQueryEnd(ctx context.Context, _ *pgx.Conn, data pgx.TraceQueryEndData) { span := trace.SpanFromContext(ctx) - recordError(span, data.Err) + recordSpanError(span, data.Err) + t.incrementOperationErrorCount(ctx, data.Err, pgxOperationQuery) if data.Err == nil { span.SetAttributes(RowsAffectedKey.Int64(data.CommandTag.RowsAffected())) } span.End() + + t.recordOperationDuration(ctx, pgxOperationQuery) } // TraceCopyFromStart is called at the beginning of CopyFrom calls. The // returned context is used for the rest of the call and will be passed to // TraceCopyFromEnd. func (t *Tracer) TraceCopyFromStart(ctx context.Context, conn *pgx.Conn, data pgx.TraceCopyFromStartData) context.Context { + ctx = context.WithValue(ctx, startTimeCtxKey, time.Now()) + if !trace.SpanFromContext(ctx).IsRecording() { return ctx } opts := []trace.SpanStartOption{ trace.WithSpanKind(trace.SpanKindClient), - trace.WithAttributes(t.attrs...), - trace.WithAttributes(semconv.DBSQLTable(data.TableName.Sanitize())), + trace.WithAttributes(t.tracerAttrs...), + trace.WithAttributes(semconv.DBCollectionName(data.TableName.Sanitize())), } if conn != nil { @@ -212,19 +303,24 @@ func (t *Tracer) TraceCopyFromStart(ctx context.Context, conn *pgx.Conn, data pg // TraceCopyFromEnd is called at the end of CopyFrom calls. func (t *Tracer) TraceCopyFromEnd(ctx context.Context, _ *pgx.Conn, data pgx.TraceCopyFromEndData) { span := trace.SpanFromContext(ctx) - recordError(span, data.Err) + recordSpanError(span, data.Err) + t.incrementOperationErrorCount(ctx, data.Err, pgxOperationCopy) if data.Err == nil { span.SetAttributes(RowsAffectedKey.Int64(data.CommandTag.RowsAffected())) } span.End() + + t.recordOperationDuration(ctx, pgxOperationCopy) } // TraceBatchStart is called at the beginning of SendBatch calls. The returned // context is used for the rest of the call and will be passed to // TraceBatchQuery and TraceBatchEnd. 
func (t *Tracer) TraceBatchStart(ctx context.Context, conn *pgx.Conn, data pgx.TraceBatchStartData) context.Context { + ctx = context.WithValue(ctx, startTimeCtxKey, time.Now()) + if !trace.SpanFromContext(ctx).IsRecording() { return ctx } @@ -236,8 +332,8 @@ func (t *Tracer) TraceBatchStart(ctx context.Context, conn *pgx.Conn, data pgx.T opts := []trace.SpanStartOption{ trace.WithSpanKind(trace.SpanKindClient), - trace.WithAttributes(t.attrs...), - trace.WithAttributes(BatchSizeKey.Int(size)), + trace.WithAttributes(t.tracerAttrs...), + trace.WithAttributes(semconv.DBOperationBatchSize(size)), } if conn != nil { @@ -251,13 +347,15 @@ func (t *Tracer) TraceBatchStart(ctx context.Context, conn *pgx.Conn, data pgx.T // TraceBatchQuery is called at the after each query in a batch. func (t *Tracer) TraceBatchQuery(ctx context.Context, conn *pgx.Conn, data pgx.TraceBatchQueryData) { + t.incrementOperationErrorCount(ctx, data.Err, pgxOperationBatch) + if !trace.SpanFromContext(ctx).IsRecording() { return } opts := []trace.SpanStartOption{ trace.WithSpanKind(trace.SpanKindClient), - trace.WithAttributes(t.attrs...), + trace.WithAttributes(t.tracerAttrs...), } if conn != nil { @@ -265,11 +363,14 @@ func (t *Tracer) TraceBatchQuery(ctx context.Context, conn *pgx.Conn, data pgx.T } if t.logSQLStatement { - opts = append(opts, trace.WithAttributes(semconv.DBStatement(data.SQL))) + opts = append(opts, trace.WithAttributes( + semconv.DBQueryText(data.SQL), + semconv.DBOperationName(t.sqlOperationName(data.SQL)), + )) + if t.includeParams { opts = append(opts, trace.WithAttributes(makeParamsAttribute(data.Args))) } - } var spanName string @@ -286,7 +387,7 @@ func (t *Tracer) TraceBatchQuery(ctx context.Context, conn *pgx.Conn, data pgx.T } _, span := t.tracer.Start(ctx, spanName, opts...) - recordError(span, data.Err) + recordSpanError(span, data.Err) span.End() } @@ -294,22 +395,27 @@ func (t *Tracer) TraceBatchQuery(ctx context.Context, conn *pgx.Conn, data pgx.T // TraceBatchEnd is called at the end of SendBatch calls. func (t *Tracer) TraceBatchEnd(ctx context.Context, _ *pgx.Conn, data pgx.TraceBatchEndData) { span := trace.SpanFromContext(ctx) - recordError(span, data.Err) + recordSpanError(span, data.Err) + t.incrementOperationErrorCount(ctx, data.Err, pgxOperationBatch) span.End() + + t.recordOperationDuration(ctx, pgxOperationBatch) } // TraceConnectStart is called at the beginning of Connect and ConnectConfig // calls. The returned context is used for the rest of the call and will be // passed to TraceConnectEnd. func (t *Tracer) TraceConnectStart(ctx context.Context, data pgx.TraceConnectStartData) context.Context { + ctx = context.WithValue(ctx, startTimeCtxKey, time.Now()) + if !trace.SpanFromContext(ctx).IsRecording() { return ctx } opts := []trace.SpanStartOption{ trace.WithSpanKind(trace.SpanKindClient), - trace.WithAttributes(t.attrs...), + trace.WithAttributes(t.tracerAttrs...), } if data.ConnConfig != nil { @@ -324,22 +430,27 @@ func (t *Tracer) TraceConnectStart(ctx context.Context, data pgx.TraceConnectSta // TraceConnectEnd is called at the end of Connect and ConnectConfig calls. func (t *Tracer) TraceConnectEnd(ctx context.Context, data pgx.TraceConnectEndData) { span := trace.SpanFromContext(ctx) - recordError(span, data.Err) + recordSpanError(span, data.Err) + t.incrementOperationErrorCount(ctx, data.Err, pgxOperationConnect) span.End() + + t.recordOperationDuration(ctx, pgxOperationConnect) } // TracePrepareStart is called at the beginning of Prepare calls. 
The returned // context is used for the rest of the call and will be passed to // TracePrepareEnd. func (t *Tracer) TracePrepareStart(ctx context.Context, conn *pgx.Conn, data pgx.TracePrepareStartData) context.Context { + ctx = context.WithValue(ctx, startTimeCtxKey, time.Now()) + if !trace.SpanFromContext(ctx).IsRecording() { return ctx } opts := []trace.SpanStartOption{ trace.WithSpanKind(trace.SpanKindClient), - trace.WithAttributes(t.attrs...), + trace.WithAttributes(t.tracerAttrs...), } if data.Name != "" { @@ -350,8 +461,10 @@ func (t *Tracer) TracePrepareStart(ctx context.Context, conn *pgx.Conn, data pgx opts = append(opts, connectionAttributesFromConfig(conn.Config())...) } + opts = append(opts, trace.WithAttributes(semconv.DBOperationName(t.sqlOperationName(data.SQL)))) + if t.logSQLStatement { - opts = append(opts, trace.WithAttributes(semconv.DBStatement(data.SQL))) + opts = append(opts, trace.WithAttributes(semconv.DBQueryText(data.SQL))) } spanName := data.SQL @@ -370,21 +483,26 @@ func (t *Tracer) TracePrepareStart(ctx context.Context, conn *pgx.Conn, data pgx // TracePrepareEnd is called at the end of Prepare calls. func (t *Tracer) TracePrepareEnd(ctx context.Context, _ *pgx.Conn, data pgx.TracePrepareEndData) { span := trace.SpanFromContext(ctx) - recordError(span, data.Err) + recordSpanError(span, data.Err) + t.incrementOperationErrorCount(ctx, data.Err, pgxOperationPrepare) span.End() + + t.recordOperationDuration(ctx, pgxOperationPrepare) } // TraceAcquireStart is called at the beginning of Acquire. // The returned context is used for the rest of the call and will be passed to the TraceAcquireEnd. func (t *Tracer) TraceAcquireStart(ctx context.Context, pool *pgxpool.Pool, data pgxpool.TraceAcquireStartData) context.Context { + ctx = context.WithValue(ctx, startTimeCtxKey, time.Now()) + if !trace.SpanFromContext(ctx).IsRecording() { return ctx } opts := []trace.SpanStartOption{ trace.WithSpanKind(trace.SpanKindClient), - trace.WithAttributes(t.attrs...), + trace.WithAttributes(t.tracerAttrs...), } if pool != nil && pool.Config() != nil && pool.Config().ConnConfig != nil { @@ -399,9 +517,12 @@ func (t *Tracer) TraceAcquireStart(ctx context.Context, pool *pgxpool.Pool, data // TraceAcquireEnd is called when a connection has been acquired. func (t *Tracer) TraceAcquireEnd(ctx context.Context, _ *pgxpool.Pool, data pgxpool.TraceAcquireEndData) { span := trace.SpanFromContext(ctx) - recordError(span, data.Err) + recordSpanError(span, data.Err) + t.incrementOperationErrorCount(ctx, data.Err, pgxOperationAcquire) span.End() + + t.recordOperationDuration(ctx, pgxOperationAcquire) } func makeParamsAttribute(args []any) attribute.KeyValue { @@ -409,6 +530,7 @@ func makeParamsAttribute(args []any) attribute.KeyValue { for i := range args { ss[i] = fmt.Sprintf("%+v", args[i]) } + return QueryParametersKey.StringSlice(ss) }
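The patch leaves it to the reader to combine the renamed tracer options with the new stats API. Below is a minimal, hedged sketch of how the pieces are meant to fit together, using only identifiers that appear in this diff plus standard pgxpool calls; the DSN and the attribute values are placeholders, not part of the change:

```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/exaring/otelpgx"
	"github.com/jackc/pgx/v5/pgxpool"
	"go.opentelemetry.io/otel/attribute"
)

// newPool wires the tracer and the pool statistics added in this change.
func newPool(ctx context.Context, dsn string) (*pgxpool.Pool, error) {
	cfg, err := pgxpool.ParseConfig(dsn)
	if err != nil {
		return nil, fmt.Errorf("create connection pool config: %w", err)
	}

	// Span attributes now go through WithTracerAttributes; the attribute value is illustrative.
	cfg.ConnConfig.Tracer = otelpgx.NewTracer(
		otelpgx.WithTracerAttributes(attribute.String("service.component", "db")),
	)

	pool, err := pgxpool.NewWithConfig(ctx, cfg)
	if err != nil {
		return nil, fmt.Errorf("connect to database: %w", err)
	}

	// Register the pgxpool.Stat metrics; the interval and the extra attribute are illustrative.
	if err := otelpgx.RecordStats(pool,
		otelpgx.WithMinimumReadDBStatsInterval(10*time.Second),
		otelpgx.WithStatsAttributes(attribute.String("db.cluster", "primary")),
	); err != nil {
		return nil, fmt.Errorf("record database stats: %w", err)
	}

	return pool, nil
}

func main() {
	ctx := context.Background()
	pool, err := newPool(ctx, "postgres://user:pass@localhost:5432/app") // placeholder DSN
	if err != nil {
		log.Fatal(err)
	}
	defer pool.Close()
}
```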
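Both RecordStats and the Tracer metrics fall back to otel.GetMeterProvider(), so unless an SDK meter provider is registered globally (or passed explicitly via WithStatsMeterProvider / WithMeterProvider), the new instruments are effectively no-ops. A sketch of that registration follows; it assumes the go.opentelemetry.io/otel/sdk/metric and go.opentelemetry.io/otel/exporters/stdout/stdoutmetric modules, which are not part of this diff:

```go
package main

import (
	"context"
	"log"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
)

func main() {
	// A stdout exporter keeps the sketch self-contained; an OTLP exporter is wired the same way.
	exporter, err := stdoutmetric.New()
	if err != nil {
		log.Fatalf("create metric exporter: %v", err)
	}

	provider := sdkmetric.NewMeterProvider(
		sdkmetric.WithReader(sdkmetric.NewPeriodicReader(exporter)),
	)
	defer func() {
		if err := provider.Shutdown(context.Background()); err != nil {
			log.Printf("shutdown meter provider: %v", err)
		}
	}()

	// otelpgx.NewTracer and otelpgx.RecordStats pick this up via otel.GetMeterProvider().
	otel.SetMeterProvider(provider)
}
```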
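Since WithAttributes is kept only as a deprecated alias for WithTracerAttributes, downstream users may want a short before/after; the attribute value is again a placeholder:

```go
package main

import (
	"github.com/exaring/otelpgx"
	"go.opentelemetry.io/otel/attribute"
)

func main() {
	// Before this change: WithAttributes configured span attributes only.
	_ = otelpgx.NewTracer(
		otelpgx.WithAttributes(attribute.String("peer.service", "postgres")), // now deprecated
	)

	// After this change: span and metric attributes are configured separately.
	_ = otelpgx.NewTracer(
		otelpgx.WithTracerAttributes(attribute.String("peer.service", "postgres")),
		otelpgx.WithMeterAttributes(attribute.String("peer.service", "postgres")),
	)
}
```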