Skip to content

Commit

Permalink
[receiver/postgresql] Client Refactor (open-telemetry#13084)
Browse files Browse the repository at this point in the history
Refactors the postgresql client
  • Loading branch information
schmikei authored Aug 9, 2022
1 parent f5c8156 commit f46d7ad
Show file tree
Hide file tree
Showing 5 changed files with 344 additions and 334 deletions.
246 changes: 156 additions & 90 deletions receiver/postgresqlreceiver/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,31 @@ package postgresqlreceiver // import "github.com/open-telemetry/opentelemetry-co
import (
"context"
"database/sql"
"errors"
"fmt"
"net"
"strings"

"github.com/lib/pq"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/receiver/scrapererror"
"go.uber.org/multierr"
)

// databaseName is a name that refers to a database so that it can be uniquely referred to later
// i.e. database1
type databaseName string

// tableIdentifier is an identifier that contains both the database and table separated by a "|"
// i.e. database1|table2
type tableIdentifier string

type client interface {
Close() error
getCommitsAndRollbacks(ctx context.Context, databases []string) ([]MetricStat, error)
getBackends(ctx context.Context, databases []string) ([]MetricStat, error)
getDatabaseSize(ctx context.Context, databases []string) ([]MetricStat, error)
getDatabaseTableMetrics(ctx context.Context) ([]MetricStat, error)
getBlocksReadByTable(ctx context.Context) ([]MetricStat, error)
getDatabaseStats(ctx context.Context, databases []string) (map[databaseName]databaseStats, error)
getBackends(ctx context.Context, databases []string) (map[databaseName]int64, error)
getDatabaseSize(ctx context.Context, databases []string) (map[databaseName]int64, error)
getDatabaseTableMetrics(ctx context.Context, db string) (map[tableIdentifier]tableStats, error)
getBlocksReadByTable(ctx context.Context, db string) (map[tableIdentifier]tableIOStats, error)
listDatabases(ctx context.Context) ([]string, error)
}

Expand Down Expand Up @@ -116,31 +123,99 @@ func (c *postgreSQLClient) Close() error {
return c.client.Close()
}

type MetricStat struct {
database string
table string
stats map[string]string
type databaseStats struct {
transactionCommitted int64
transactionRollback int64
}

func (c *postgreSQLClient) getCommitsAndRollbacks(ctx context.Context, databases []string) ([]MetricStat, error) {
func (c *postgreSQLClient) getDatabaseStats(ctx context.Context, databases []string) (map[databaseName]databaseStats, error) {
query := filterQueryByDatabases("SELECT datname, xact_commit, xact_rollback FROM pg_stat_database", databases, false)

return c.collectStatsFromQuery(ctx, query, true, false, "xact_commit", "xact_rollback")
rows, err := c.client.QueryContext(ctx, query)
if err != nil {
return nil, err
}
var errs error
dbStats := map[databaseName]databaseStats{}
for rows.Next() {
var datname string
var transactionCommitted, transactionRollback int64
err = rows.Scan(&datname, &transactionCommitted, &transactionRollback)
if err != nil {
errs = multierr.Append(errs, err)
continue
}
if datname != "" {
dbStats[databaseName(datname)] = databaseStats{
transactionCommitted: transactionCommitted,
transactionRollback: transactionRollback,
}
}
}
return dbStats, errs
}

func (c *postgreSQLClient) getBackends(ctx context.Context, databases []string) ([]MetricStat, error) {
// getBackends returns a map of database names to the number of active connections
func (c *postgreSQLClient) getBackends(ctx context.Context, databases []string) (map[databaseName]int64, error) {
query := filterQueryByDatabases("SELECT datname, count(*) as count from pg_stat_activity", databases, true)

return c.collectStatsFromQuery(ctx, query, true, false, "count")
rows, err := c.client.QueryContext(ctx, query)
if err != nil {
return nil, err
}
defer rows.Close()
ars := map[databaseName]int64{}
var errors error
for rows.Next() {
var datname string
var count int64
err = rows.Scan(&datname, &count)
if err != nil {
errors = multierr.Append(errors, err)
continue
}
if datname != "" {
ars[databaseName(datname)] = count
}
}
return ars, errors
}

func (c *postgreSQLClient) getDatabaseSize(ctx context.Context, databases []string) ([]MetricStat, error) {
func (c *postgreSQLClient) getDatabaseSize(ctx context.Context, databases []string) (map[databaseName]int64, error) {
query := filterQueryByDatabases("SELECT datname, pg_database_size(datname) FROM pg_catalog.pg_database WHERE datistemplate = false", databases, false)
rows, err := c.client.QueryContext(ctx, query)
if err != nil {
return nil, err
}
defer rows.Close()
sizes := map[databaseName]int64{}
var errors error
for rows.Next() {
var datname string
var size int64
err = rows.Scan(&datname, &size)
if err != nil {
errors = multierr.Append(errors, err)
continue
}
if datname != "" {
sizes[databaseName(datname)] = size
}
}
return sizes, errors
}

return c.collectStatsFromQuery(ctx, query, true, false, "db_size")
// tableStats contains a result for a row of the getDatabaseTableMetrics result
type tableStats struct {
database string
table string
live int64
dead int64
inserts int64
upd int64
del int64
hotUpd int64
}

func (c *postgreSQLClient) getDatabaseTableMetrics(ctx context.Context) ([]MetricStat, error) {
func (c *postgreSQLClient) getDatabaseTableMetrics(ctx context.Context, db string) (map[tableIdentifier]tableStats, error) {
query := `SELECT schemaname || '.' || relname AS table,
n_live_tup AS live,
n_dead_tup AS dead,
Expand All @@ -150,10 +225,47 @@ func (c *postgreSQLClient) getDatabaseTableMetrics(ctx context.Context) ([]Metri
n_tup_hot_upd AS hot_upd
FROM pg_stat_user_tables;`

return c.collectStatsFromQuery(ctx, query, false, true, "live", "dead", "ins", "upd", "del", "hot_upd")
ts := map[tableIdentifier]tableStats{}
var errors error
rows, err := c.client.QueryContext(ctx, query)
if err != nil {
return nil, err
}
for rows.Next() {
var table string
var live, dead, ins, upd, del, hotUpd int64
err = rows.Scan(&table, &live, &dead, &ins, &upd, &del, &hotUpd)
if err != nil {
errors = multierr.Append(errors, err)
continue
}
ts[tableKey(db, table)] = tableStats{
database: db,
table: table,
live: live,
inserts: ins,
upd: upd,
del: del,
hotUpd: hotUpd,
}
}
return ts, errors
}

type tableIOStats struct {
database string
table string
heapRead int64
heapHit int64
idxRead int64
idxHit int64
toastRead int64
toastHit int64
tidxRead int64
tidxHit int64
}

func (c *postgreSQLClient) getBlocksReadByTable(ctx context.Context) ([]MetricStat, error) {
func (c *postgreSQLClient) getBlocksReadByTable(ctx context.Context, db string) (map[tableIdentifier]tableIOStats, error) {
query := `SELECT schemaname || '.' || relname AS table,
coalesce(heap_blks_read, 0) AS heap_read,
coalesce(heap_blks_hit, 0) AS heap_hit,
Expand All @@ -165,77 +277,34 @@ func (c *postgreSQLClient) getBlocksReadByTable(ctx context.Context) ([]MetricSt
coalesce(tidx_blks_hit, 0) AS tidx_hit
FROM pg_statio_user_tables;`

return c.collectStatsFromQuery(ctx, query, false, true, "heap_read", "heap_hit", "idx_read", "idx_hit", "toast_read", "toast_hit", "tidx_read", "tidx_hit")
}

func (c *postgreSQLClient) collectStatsFromQuery(ctx context.Context, query string, includeDatabase bool, includeTable bool, orderedFields ...string) ([]MetricStat, error) {
tios := map[tableIdentifier]tableIOStats{}
var errors error
rows, err := c.client.QueryContext(ctx, query)
if err != nil {
return nil, err
}
defer rows.Close()

errors := scrapererror.ScrapeErrors{}
metricStats := []MetricStat{}
for rows.Next() {
rowFields := make([]interface{}, 0)

// Build a list of addresses that rows.Scan will load column data into
if includeDatabase {
var val string
rowFields = append(rowFields, &val)
}
if includeTable {
var val string
rowFields = append(rowFields, &val)
}
for range orderedFields {
var val string
rowFields = append(rowFields, &val)
}

if err := rows.Scan(rowFields...); err != nil {
return nil, err
}

database := c.database
if includeDatabase {
v, err := convertInterfaceToString(rowFields[0])
if err != nil {
errors.AddPartial(0, err)
continue
}
database = v
rowFields = rowFields[1:]
}
table := ""
if includeTable {
v, err := convertInterfaceToString(rowFields[0])
if err != nil {
errors.AddPartial(0, err)
continue
}
table = v
rowFields = rowFields[1:]
var table string
var heapRead, heapHit, idxRead, idxHit, toastRead, toastHit, tidxRead, tidxHit int64
err = rows.Scan(&table, &heapRead, &heapHit, &idxRead, &idxHit, &toastRead, &toastHit, &tidxRead, &tidxHit)
if err != nil {
errors = multierr.Append(errors, err)
continue
}

stats := map[string]string{}
for idx, val := range rowFields {
v, err := convertInterfaceToString(val)
if err != nil {
errors.AddPartial(0, err)
continue
}
stats[orderedFields[idx]] = v
tios[tableKey(db, table)] = tableIOStats{
database: db,
table: table,
heapRead: heapRead,
heapHit: heapHit,
idxRead: idxRead,
idxHit: idxHit,
toastRead: toastRead,
toastHit: toastHit,
tidxRead: tidxRead,
tidxHit: tidxHit,
}

metricStats = append(metricStats, MetricStat{
database: database,
table: table,
stats: stats,
})
}
return metricStats, errors.Combine()
return tios, errors
}

func (c *postgreSQLClient) listDatabases(ctx context.Context) ([]string, error) {
Expand Down Expand Up @@ -278,9 +347,6 @@ func filterQueryByDatabases(baseQuery string, databases []string, groupBy bool)
return baseQuery + ";"
}

func convertInterfaceToString(input interface{}) (string, error) {
if val, ok := input.(*string); ok {
return *val, nil
}
return "", errors.New("issue converting interface into string")
func tableKey(database, table string) tableIdentifier {
return tableIdentifier(fmt.Sprintf("%s|%s", database, table))
}
Binary file removed receiver/postgresqlreceiver/debug.test
Binary file not shown.
Loading

0 comments on commit f46d7ad

Please sign in to comment.