Skip to content

Commit

Permalink
database_observability: log instance key across collectors
Browse files Browse the repository at this point in the history
Add instance label to SchemaTable and QuerySample collectors.
  • Loading branch information
cristiangreco committed Jan 15, 2025
1 parent 9661f51 commit 37cc19d
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 36 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ Main (unreleased)

- Improved performance by reducing allocation in Prometheus write pipelines by ~30% (@thampiotr)

- (_Experimental_) Log instance label key in `database_observability.mysql` (@cristiangreco)

v1.6.0-rc.1
-----------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const selectQuerySamples = `

type QuerySampleArguments struct {
DB *sql.DB
InstanceKey string
CollectInterval time.Duration
EntryHandler loki.EntryHandler

Expand All @@ -43,6 +44,7 @@ type QuerySampleArguments struct {

type QuerySample struct {
dbConnection *sql.DB
instanceKey string
collectInterval time.Duration
entryHandler loki.EntryHandler

Expand All @@ -55,6 +57,7 @@ type QuerySample struct {
func NewQuerySample(args QuerySampleArguments) (*QuerySample, error) {
return &QuerySample{
dbConnection: args.DB,
instanceKey: args.InstanceKey,
collectInterval: args.CollectInterval,
entryHandler: args.EntryHandler,
logger: log.With(args.Logger, "collector", "QuerySample"),
Expand Down Expand Up @@ -153,8 +156,8 @@ func (c *QuerySample) fetchQuerySamples(ctx context.Context) error {
Entry: logproto.Entry{
Timestamp: time.Unix(0, time.Now().UnixNano()),
Line: fmt.Sprintf(
`level=info msg="query samples fetched" op="%s" digest="%s" query_type="%s" query_sample_seen="%s" query_sample_timer_wait="%s" query_sample_redacted="%s"`,
OP_QUERY_SAMPLE, digest, c.stmtType(stmt), sampleSeen, sampleTimerWait, sampleRedactedText,
`level=info msg="query samples fetched" op="%s" instance="%s" digest="%s" query_type="%s" query_sample_seen="%s" query_sample_timer_wait="%s" query_sample_redacted="%s"`,
OP_QUERY_SAMPLE, c.instanceKey, digest, c.stmtType(stmt), sampleSeen, sampleTimerWait, sampleRedactedText,
),
},
}
Expand All @@ -166,8 +169,8 @@ func (c *QuerySample) fetchQuerySamples(ctx context.Context) error {
Entry: logproto.Entry{
Timestamp: time.Unix(0, time.Now().UnixNano()),
Line: fmt.Sprintf(
`level=info msg="table name parsed" op="%s" digest="%s" table="%s"`,
OP_QUERY_PARSED_TABLE_NAME, digest, table,
`level=info msg="table name parsed" op="%s" instance="%s" digest="%s" table="%s"`,
OP_QUERY_PARSED_TABLE_NAME, c.instanceKey, digest, table,
),
},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ func TestQuerySample(t *testing.T) {
"1000",
}},
logs: []string{
`level=info msg="query samples fetched" op="query_sample" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from some_table where id = :redacted1"`,
`level=info msg="table name parsed" op="query_parsed_table_name" digest="abc123" table="some_table"`,
`level=info msg="query samples fetched" op="query_sample" instance="mysql-db" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from some_table where id = :redacted1"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" digest="abc123" table="some_table"`,
},
},
{
Expand All @@ -47,8 +47,8 @@ func TestQuerySample(t *testing.T) {
"1000",
}},
logs: []string{
`level=info msg="query samples fetched" op="query_sample" digest="abc123" query_type="insert" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="insert into some_table(id, name) values (:redacted1, :redacted2)"`,
`level=info msg="table name parsed" op="query_parsed_table_name" digest="abc123" table="some_table"`,
`level=info msg="query samples fetched" op="query_sample" instance="mysql-db" digest="abc123" query_type="insert" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="insert into some_table(id, name) values (:redacted1, :redacted2)"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" digest="abc123" table="some_table"`,
},
},
{
Expand All @@ -60,8 +60,8 @@ func TestQuerySample(t *testing.T) {
"1000",
}},
logs: []string{
`level=info msg="query samples fetched" op="query_sample" digest="abc123" query_type="update" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="update some_table set active = false, reason = null where id = :redacted1 and name = :redacted2"`,
`level=info msg="table name parsed" op="query_parsed_table_name" digest="abc123" table="some_table"`,
`level=info msg="query samples fetched" op="query_sample" instance="mysql-db" digest="abc123" query_type="update" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="update some_table set active = false, reason = null where id = :redacted1 and name = :redacted2"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" digest="abc123" table="some_table"`,
},
},
{
Expand All @@ -73,8 +73,8 @@ func TestQuerySample(t *testing.T) {
"1000",
}},
logs: []string{
`level=info msg="query samples fetched" op="query_sample" digest="abc123" query_type="delete" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="delete from some_table where id = :redacted1"`,
`level=info msg="table name parsed" op="query_parsed_table_name" digest="abc123" table="some_table"`,
`level=info msg="query samples fetched" op="query_sample" instance="mysql-db" digest="abc123" query_type="delete" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="delete from some_table where id = :redacted1"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" digest="abc123" table="some_table"`,
},
},
{
Expand All @@ -86,9 +86,9 @@ func TestQuerySample(t *testing.T) {
"1000",
}},
logs: []string{
`level=info msg="query samples fetched" op="query_sample" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select t.id, t.val1, o.val2 from some_table as t join other_table as o on t.id = o.id where o.val2 = :redacted1 order by t.val1 desc"`,
`level=info msg="table name parsed" op="query_parsed_table_name" digest="abc123" table="some_table"`,
`level=info msg="table name parsed" op="query_parsed_table_name" digest="abc123" table="other_table"`,
`level=info msg="query samples fetched" op="query_sample" instance="mysql-db" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select t.id, t.val1, o.val2 from some_table as t join other_table as o on t.id = o.id where o.val2 = :redacted1 order by t.val1 desc"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" digest="abc123" table="some_table"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" digest="abc123" table="other_table"`,
},
},
{
Expand All @@ -103,12 +103,12 @@ func TestQuerySample(t *testing.T) {
"1000",
}},
logs: []string{
`level=info msg="query samples fetched" op="query_sample" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" ` +
`level=info msg="query samples fetched" op="query_sample" instance="mysql-db" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" ` +
`query_sample_redacted="select ifnull(schema_name, :redacted1) as schema_name, digest, count_star from (select * from ` +
`performance_schema.events_statements_summary_by_digest where schema_name not in ::redacted2 ` +
`and last_seen > date_sub(now(), interval :redacted3 second) order by last_seen desc) as q ` +
`group by q.schema_name, q.digest, q.count_star limit :redacted4"`,
`level=info msg="table name parsed" op="query_parsed_table_name" digest="abc123" table="performance_schema.events_statements_summary_by_digest"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" digest="abc123" table="performance_schema.events_statements_summary_by_digest"`,
},
},
{
Expand All @@ -125,8 +125,8 @@ func TestQuerySample(t *testing.T) {
"1000",
}},
logs: []string{
`level=info msg="query samples fetched" op="query_sample" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from some_table where id = :redacted1"`,
`level=info msg="table name parsed" op="query_parsed_table_name" digest="abc123" table="some_table"`,
`level=info msg="query samples fetched" op="query_sample" instance="mysql-db" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from some_table where id = :redacted1"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" digest="abc123" table="some_table"`,
},
},
{
Expand All @@ -138,7 +138,7 @@ func TestQuerySample(t *testing.T) {
"1000",
}},
logs: []string{
`level=info msg="query samples fetched" op="query_sample" digest="abc123" query_type="" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="begin"`,
`level=info msg="query samples fetched" op="query_sample" instance="mysql-db" digest="abc123" query_type="" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="begin"`,
},
},
{
Expand All @@ -150,7 +150,7 @@ func TestQuerySample(t *testing.T) {
"1000",
}},
logs: []string{
`level=info msg="query samples fetched" op="query_sample" digest="abc123" query_type="" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="commit"`,
`level=info msg="query samples fetched" op="query_sample" instance="mysql-db" digest="abc123" query_type="" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="commit"`,
},
},
{
Expand All @@ -162,7 +162,7 @@ func TestQuerySample(t *testing.T) {
"1000",
}},
logs: []string{
`level=info msg="query samples fetched" op="query_sample" digest="abc123" query_type="" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="alter table some_table"`,
`level=info msg="query samples fetched" op="query_sample" instance="mysql-db" digest="abc123" query_type="" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="alter table some_table"`,
},
},
{
Expand All @@ -179,8 +179,8 @@ func TestQuerySample(t *testing.T) {
"1000",
}},
logs: []string{
`level=info msg="query samples fetched" op="query_sample" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from some_table where id = :redacted1"`,
`level=info msg="table name parsed" op="query_parsed_table_name" digest="abc123" table="some_table"`,
`level=info msg="query samples fetched" op="query_sample" instance="mysql-db" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from some_table where id = :redacted1"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" digest="abc123" table="some_table"`,
},
},
}
Expand All @@ -195,6 +195,7 @@ func TestQuerySample(t *testing.T) {

collector, err := NewQuerySample(QuerySampleArguments{
DB: db,
InstanceKey: "mysql-db",
CollectInterval: time.Minute,
EntryHandler: lokiClient,
Logger: log.NewLogfmtLogger(os.Stderr),
Expand Down Expand Up @@ -282,6 +283,7 @@ func TestQuerySampleSQLDriverErrors(t *testing.T) {

collector, err := NewQuerySample(QuerySampleArguments{
DB: db,
InstanceKey: "mysql-db",
CollectInterval: time.Millisecond,
EntryHandler: lokiClient,
Logger: log.NewLogfmtLogger(os.Stderr),
Expand Down Expand Up @@ -328,8 +330,8 @@ func TestQuerySampleSQLDriverErrors(t *testing.T) {
require.Equal(t, model.LabelSet{"job": database_observability.JobName}, entry.Labels)
}

require.Equal(t, `level=info msg="query samples fetched" op="query_sample" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from some_table where id = :redacted1"`, lokiEntries[0].Line)
require.Equal(t, `level=info msg="table name parsed" op="query_parsed_table_name" digest="abc123" table="some_table"`, lokiEntries[1].Line)
require.Equal(t, `level=info msg="query samples fetched" op="query_sample" instance="mysql-db" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from some_table where id = :redacted1"`, lokiEntries[0].Line)
require.Equal(t, `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" digest="abc123" table="some_table"`, lokiEntries[1].Line)

err = mock.ExpectationsWereMet()
require.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ const (

type SchemaTableArguments struct {
DB *sql.DB
InstanceKey string
CollectInterval time.Duration
EntryHandler loki.EntryHandler
CacheTTL time.Duration
Expand All @@ -58,6 +59,7 @@ type SchemaTableArguments struct {

type SchemaTable struct {
dbConnection *sql.DB
instanceKey string
collectInterval time.Duration
entryHandler loki.EntryHandler
// Cache of table definitions. Entries are removed after a configurable TTL.
Expand All @@ -84,6 +86,7 @@ type tableInfo struct {
func NewSchemaTable(args SchemaTableArguments) (*SchemaTable, error) {
return &SchemaTable{
dbConnection: args.DB,
instanceKey: args.InstanceKey,
collectInterval: args.CollectInterval,
entryHandler: args.EntryHandler,
cache: expirable.NewLRU[string, tableInfo](0, nil, args.CacheTTL),
Expand Down Expand Up @@ -166,7 +169,7 @@ func (c *SchemaTable) extractSchema(ctx context.Context) error {
Labels: model.LabelSet{"job": database_observability.JobName},
Entry: logproto.Entry{
Timestamp: time.Unix(0, time.Now().UnixNano()),
Line: fmt.Sprintf(`level=info msg="schema detected" op="%s" schema="%s"`, OP_SCHEMA_DETECTION, schema),
Line: fmt.Sprintf(`level=info msg="schema detected" op="%s" instance="%s" schema="%s"`, OP_SCHEMA_DETECTION, c.instanceKey, schema),
},
}
}
Expand Down Expand Up @@ -204,7 +207,7 @@ func (c *SchemaTable) extractSchema(ctx context.Context) error {
Labels: model.LabelSet{"job": database_observability.JobName},
Entry: logproto.Entry{
Timestamp: time.Unix(0, time.Now().UnixNano()),
Line: fmt.Sprintf(`level=info msg="table detected" op="%s" schema="%s" table="%s"`, OP_TABLE_DETECTION, schema, table),
Line: fmt.Sprintf(`level=info msg="table detected" op="%s" instance="%s" schema="%s" table="%s"`, OP_TABLE_DETECTION, c.instanceKey, schema, table),
},
}
}
Expand Down Expand Up @@ -240,7 +243,7 @@ func (c *SchemaTable) extractSchema(ctx context.Context) error {
Labels: model.LabelSet{"job": database_observability.JobName},
Entry: logproto.Entry{
Timestamp: time.Unix(0, time.Now().UnixNano()),
Line: fmt.Sprintf(`level=info msg="create table" op="%s" schema="%s" table="%s" create_statement="%s"`, OP_CREATE_STATEMENT, table.schema, table.tableName, createStmt),
Line: fmt.Sprintf(`level=info msg="create table" op="%s" instance="%s" schema="%s" table="%s" create_statement="%s"`, OP_CREATE_STATEMENT, c.instanceKey, table.schema, table.tableName, createStmt),
},
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func TestSchemaTable(t *testing.T) {

collector, err := NewSchemaTable(SchemaTableArguments{
DB: db,
InstanceKey: "mysql-db",
CollectInterval: time.Second,
EntryHandler: lokiClient,
CacheTTL: time.Minute,
Expand Down Expand Up @@ -79,9 +80,9 @@ func TestSchemaTable(t *testing.T) {
for _, entry := range lokiEntries {
require.Equal(t, model.LabelSet{"job": database_observability.JobName}, entry.Labels)
}
require.Equal(t, `level=info msg="schema detected" op="schema_detection" schema="some_schema"`, lokiEntries[0].Line)
require.Equal(t, `level=info msg="table detected" op="table_detection" schema="some_schema" table="some_table"`, lokiEntries[1].Line)
require.Equal(t, `level=info msg="create table" op="create_statement" schema="some_schema" table="some_table" create_statement="CREATE TABLE some_table (id INT)"`, lokiEntries[2].Line)
require.Equal(t, `level=info msg="schema detected" op="schema_detection" instance="mysql-db" schema="some_schema"`, lokiEntries[0].Line)
require.Equal(t, `level=info msg="table detected" op="table_detection" instance="mysql-db" schema="some_schema" table="some_table"`, lokiEntries[1].Line)
require.Equal(t, `level=info msg="create table" op="create_statement" instance="mysql-db" schema="some_schema" table="some_table" create_statement="CREATE TABLE some_table (id INT)"`, lokiEntries[2].Line)

err = mock.ExpectationsWereMet()
require.NoError(t, err)
Expand Down
20 changes: 16 additions & 4 deletions internal/component/database_observability/mysql/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ type Component struct {
registry *prometheus.Registry
baseTarget discovery.Target
collectors []Collector
instanceKey string
dbConnection *sql.DB
healthErr *atomic.String
}
Expand All @@ -115,6 +116,12 @@ func New(opts component.Options, args Arguments) (*Component, error) {
healthErr: atomic.NewString(""),
}

instance, err := instanceKey(string(args.DataSourceName))
if err != nil {
return nil, err
}
c.instanceKey = instance

baseTarget, err := c.getBaseTarget()
if err != nil {
return nil, err
Expand Down Expand Up @@ -166,7 +173,7 @@ func (c *Component) getBaseTarget() (discovery.Target, error) {
model.AddressLabel: httpData.MemoryListenAddr,
model.SchemeLabel: "http",
model.MetricsPathLabel: path.Join(httpData.HTTPPathForComponent(c.opts.ID), "metrics"),
"instance": c.instanceKey(),
"instance": c.instanceKey,
"job": database_observability.JobName,
}, nil
}
Expand Down Expand Up @@ -218,6 +225,7 @@ func (c *Component) startCollectors() error {
if c.args.QuerySamplesEnabled {
qsCollector, err := collector.NewQuerySample(collector.QuerySampleArguments{
DB: dbConnection,
InstanceKey: c.instanceKey,
CollectInterval: c.args.CollectInterval,
EntryHandler: entryHandler,
Logger: c.opts.Logger,
Expand All @@ -235,6 +243,7 @@ func (c *Component) startCollectors() error {

stCollector, err := collector.NewSchemaTable(collector.SchemaTableArguments{
DB: dbConnection,
InstanceKey: c.instanceKey,
CollectInterval: c.args.CollectInterval,
EntryHandler: entryHandler,
Logger: c.opts.Logger,
Expand Down Expand Up @@ -306,8 +315,11 @@ func (c *Component) CurrentHealth() component.Health {

// instanceKey returns network(hostname:port)/dbname of the MySQL server.
// This is the same key as used by the mysqld_exporter integration.
func (c *Component) instanceKey() string {
m, _ := mysql.ParseDSN(string(c.args.DataSourceName))
func instanceKey(dsn string) (string, error) {
m, err := mysql.ParseDSN(dsn)
if err != nil {
return "", err
}

if m.Addr == "" {
m.Addr = "localhost:3306"
Expand All @@ -316,7 +328,7 @@ func (c *Component) instanceKey() string {
m.Net = "tcp"
}

return fmt.Sprintf("%s(%s)/%s", m.Net, m.Addr, m.DBName)
return fmt.Sprintf("%s(%s)/%s", m.Net, m.Addr, m.DBName), nil
}

// formatDSN appends the given parameters to the DSN.
Expand Down

0 comments on commit 37cc19d

Please sign in to comment.