Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

database_observability: log instance key across collectors #2408

Merged
merged 2 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ Main (unreleased)

- Improved performance by reducing allocation in Prometheus write pipelines by ~30% (@thampiotr)

- (_Experimental_) Log instance label key in `database_observability.mysql` (@cristiangreco)

- Add json format support for log export via faro receiver (@ravishankar15)

v1.6.0-rc.1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const selectQuerySamples = `

type QuerySampleArguments struct {
DB *sql.DB
InstanceKey string
CollectInterval time.Duration
EntryHandler loki.EntryHandler

Expand All @@ -43,6 +44,7 @@ type QuerySampleArguments struct {

type QuerySample struct {
dbConnection *sql.DB
instanceKey string
collectInterval time.Duration
entryHandler loki.EntryHandler

Expand All @@ -55,6 +57,7 @@ type QuerySample struct {
func NewQuerySample(args QuerySampleArguments) (*QuerySample, error) {
return &QuerySample{
dbConnection: args.DB,
instanceKey: args.InstanceKey,
collectInterval: args.CollectInterval,
entryHandler: args.EntryHandler,
logger: log.With(args.Logger, "collector", "QuerySample"),
Expand Down Expand Up @@ -153,8 +156,8 @@ func (c *QuerySample) fetchQuerySamples(ctx context.Context) error {
Entry: logproto.Entry{
Timestamp: time.Unix(0, time.Now().UnixNano()),
Line: fmt.Sprintf(
`level=info msg="query samples fetched" op="%s" digest="%s" query_type="%s" query_sample_seen="%s" query_sample_timer_wait="%s" query_sample_redacted="%s"`,
OP_QUERY_SAMPLE, digest, c.stmtType(stmt), sampleSeen, sampleTimerWait, sampleRedactedText,
`level=info msg="query samples fetched" op="%s" instance="%s" digest="%s" query_type="%s" query_sample_seen="%s" query_sample_timer_wait="%s" query_sample_redacted="%s"`,
OP_QUERY_SAMPLE, c.instanceKey, digest, c.stmtType(stmt), sampleSeen, sampleTimerWait, sampleRedactedText,
),
},
}
Expand All @@ -166,8 +169,8 @@ func (c *QuerySample) fetchQuerySamples(ctx context.Context) error {
Entry: logproto.Entry{
Timestamp: time.Unix(0, time.Now().UnixNano()),
Line: fmt.Sprintf(
`level=info msg="table name parsed" op="%s" digest="%s" table="%s"`,
OP_QUERY_PARSED_TABLE_NAME, digest, table,
`level=info msg="table name parsed" op="%s" instance="%s" digest="%s" table="%s"`,
OP_QUERY_PARSED_TABLE_NAME, c.instanceKey, digest, table,
),
},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ func TestQuerySample(t *testing.T) {
"1000",
}},
logs: []string{
`level=info msg="query samples fetched" op="query_sample" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from some_table where id = :redacted1"`,
`level=info msg="table name parsed" op="query_parsed_table_name" digest="abc123" table="some_table"`,
`level=info msg="query samples fetched" op="query_sample" instance="mysql-db" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from some_table where id = :redacted1"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" digest="abc123" table="some_table"`,
},
},
{
Expand All @@ -47,8 +47,8 @@ func TestQuerySample(t *testing.T) {
"1000",
}},
logs: []string{
`level=info msg="query samples fetched" op="query_sample" digest="abc123" query_type="insert" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="insert into some_table(id, name) values (:redacted1, :redacted2)"`,
`level=info msg="table name parsed" op="query_parsed_table_name" digest="abc123" table="some_table"`,
`level=info msg="query samples fetched" op="query_sample" instance="mysql-db" digest="abc123" query_type="insert" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="insert into some_table(id, name) values (:redacted1, :redacted2)"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" digest="abc123" table="some_table"`,
},
},
{
Expand All @@ -60,8 +60,8 @@ func TestQuerySample(t *testing.T) {
"1000",
}},
logs: []string{
`level=info msg="query samples fetched" op="query_sample" digest="abc123" query_type="update" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="update some_table set active = false, reason = null where id = :redacted1 and name = :redacted2"`,
`level=info msg="table name parsed" op="query_parsed_table_name" digest="abc123" table="some_table"`,
`level=info msg="query samples fetched" op="query_sample" instance="mysql-db" digest="abc123" query_type="update" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="update some_table set active = false, reason = null where id = :redacted1 and name = :redacted2"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" digest="abc123" table="some_table"`,
},
},
{
Expand All @@ -73,8 +73,8 @@ func TestQuerySample(t *testing.T) {
"1000",
}},
logs: []string{
`level=info msg="query samples fetched" op="query_sample" digest="abc123" query_type="delete" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="delete from some_table where id = :redacted1"`,
`level=info msg="table name parsed" op="query_parsed_table_name" digest="abc123" table="some_table"`,
`level=info msg="query samples fetched" op="query_sample" instance="mysql-db" digest="abc123" query_type="delete" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="delete from some_table where id = :redacted1"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" digest="abc123" table="some_table"`,
},
},
{
Expand All @@ -86,9 +86,9 @@ func TestQuerySample(t *testing.T) {
"1000",
}},
logs: []string{
`level=info msg="query samples fetched" op="query_sample" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select t.id, t.val1, o.val2 from some_table as t join other_table as o on t.id = o.id where o.val2 = :redacted1 order by t.val1 desc"`,
`level=info msg="table name parsed" op="query_parsed_table_name" digest="abc123" table="some_table"`,
`level=info msg="table name parsed" op="query_parsed_table_name" digest="abc123" table="other_table"`,
`level=info msg="query samples fetched" op="query_sample" instance="mysql-db" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select t.id, t.val1, o.val2 from some_table as t join other_table as o on t.id = o.id where o.val2 = :redacted1 order by t.val1 desc"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" digest="abc123" table="some_table"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" digest="abc123" table="other_table"`,
},
},
{
Expand All @@ -103,12 +103,12 @@ func TestQuerySample(t *testing.T) {
"1000",
}},
logs: []string{
`level=info msg="query samples fetched" op="query_sample" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" ` +
`level=info msg="query samples fetched" op="query_sample" instance="mysql-db" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" ` +
`query_sample_redacted="select ifnull(schema_name, :redacted1) as schema_name, digest, count_star from (select * from ` +
`performance_schema.events_statements_summary_by_digest where schema_name not in ::redacted2 ` +
`and last_seen > date_sub(now(), interval :redacted3 second) order by last_seen desc) as q ` +
`group by q.schema_name, q.digest, q.count_star limit :redacted4"`,
`level=info msg="table name parsed" op="query_parsed_table_name" digest="abc123" table="performance_schema.events_statements_summary_by_digest"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" digest="abc123" table="performance_schema.events_statements_summary_by_digest"`,
},
},
{
Expand All @@ -125,8 +125,8 @@ func TestQuerySample(t *testing.T) {
"1000",
}},
logs: []string{
`level=info msg="query samples fetched" op="query_sample" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from some_table where id = :redacted1"`,
`level=info msg="table name parsed" op="query_parsed_table_name" digest="abc123" table="some_table"`,
`level=info msg="query samples fetched" op="query_sample" instance="mysql-db" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from some_table where id = :redacted1"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" digest="abc123" table="some_table"`,
},
},
{
Expand All @@ -138,7 +138,7 @@ func TestQuerySample(t *testing.T) {
"1000",
}},
logs: []string{
`level=info msg="query samples fetched" op="query_sample" digest="abc123" query_type="" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="begin"`,
`level=info msg="query samples fetched" op="query_sample" instance="mysql-db" digest="abc123" query_type="" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="begin"`,
},
},
{
Expand All @@ -150,7 +150,7 @@ func TestQuerySample(t *testing.T) {
"1000",
}},
logs: []string{
`level=info msg="query samples fetched" op="query_sample" digest="abc123" query_type="" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="commit"`,
`level=info msg="query samples fetched" op="query_sample" instance="mysql-db" digest="abc123" query_type="" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="commit"`,
},
},
{
Expand All @@ -162,7 +162,7 @@ func TestQuerySample(t *testing.T) {
"1000",
}},
logs: []string{
`level=info msg="query samples fetched" op="query_sample" digest="abc123" query_type="" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="alter table some_table"`,
`level=info msg="query samples fetched" op="query_sample" instance="mysql-db" digest="abc123" query_type="" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="alter table some_table"`,
},
},
{
Expand All @@ -179,8 +179,8 @@ func TestQuerySample(t *testing.T) {
"1000",
}},
logs: []string{
`level=info msg="query samples fetched" op="query_sample" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from some_table where id = :redacted1"`,
`level=info msg="table name parsed" op="query_parsed_table_name" digest="abc123" table="some_table"`,
`level=info msg="query samples fetched" op="query_sample" instance="mysql-db" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from some_table where id = :redacted1"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" digest="abc123" table="some_table"`,
},
},
}
Expand All @@ -195,6 +195,7 @@ func TestQuerySample(t *testing.T) {

collector, err := NewQuerySample(QuerySampleArguments{
DB: db,
InstanceKey: "mysql-db",
CollectInterval: time.Minute,
EntryHandler: lokiClient,
Logger: log.NewLogfmtLogger(os.Stderr),
Expand Down Expand Up @@ -282,6 +283,7 @@ func TestQuerySampleSQLDriverErrors(t *testing.T) {

collector, err := NewQuerySample(QuerySampleArguments{
DB: db,
InstanceKey: "mysql-db",
CollectInterval: time.Millisecond,
EntryHandler: lokiClient,
Logger: log.NewLogfmtLogger(os.Stderr),
Expand Down Expand Up @@ -328,8 +330,8 @@ func TestQuerySampleSQLDriverErrors(t *testing.T) {
require.Equal(t, model.LabelSet{"job": database_observability.JobName}, entry.Labels)
}

require.Equal(t, `level=info msg="query samples fetched" op="query_sample" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from some_table where id = :redacted1"`, lokiEntries[0].Line)
require.Equal(t, `level=info msg="table name parsed" op="query_parsed_table_name" digest="abc123" table="some_table"`, lokiEntries[1].Line)
require.Equal(t, `level=info msg="query samples fetched" op="query_sample" instance="mysql-db" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from some_table where id = :redacted1"`, lokiEntries[0].Line)
require.Equal(t, `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" digest="abc123" table="some_table"`, lokiEntries[1].Line)

err = mock.ExpectationsWereMet()
require.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ const (

type SchemaTableArguments struct {
DB *sql.DB
InstanceKey string
CollectInterval time.Duration
EntryHandler loki.EntryHandler
CacheTTL time.Duration
Expand All @@ -58,6 +59,7 @@ type SchemaTableArguments struct {

type SchemaTable struct {
dbConnection *sql.DB
instanceKey string
collectInterval time.Duration
entryHandler loki.EntryHandler
// Cache of table definitions. Entries are removed after a configurable TTL.
Expand All @@ -84,6 +86,7 @@ type tableInfo struct {
func NewSchemaTable(args SchemaTableArguments) (*SchemaTable, error) {
return &SchemaTable{
dbConnection: args.DB,
instanceKey: args.InstanceKey,
collectInterval: args.CollectInterval,
entryHandler: args.EntryHandler,
cache: expirable.NewLRU[string, tableInfo](0, nil, args.CacheTTL),
Expand Down Expand Up @@ -166,7 +169,7 @@ func (c *SchemaTable) extractSchema(ctx context.Context) error {
Labels: model.LabelSet{"job": database_observability.JobName},
Entry: logproto.Entry{
Timestamp: time.Unix(0, time.Now().UnixNano()),
Line: fmt.Sprintf(`level=info msg="schema detected" op="%s" schema="%s"`, OP_SCHEMA_DETECTION, schema),
Line: fmt.Sprintf(`level=info msg="schema detected" op="%s" instance="%s" schema="%s"`, OP_SCHEMA_DETECTION, c.instanceKey, schema),
},
}
}
Expand Down Expand Up @@ -204,7 +207,7 @@ func (c *SchemaTable) extractSchema(ctx context.Context) error {
Labels: model.LabelSet{"job": database_observability.JobName},
Entry: logproto.Entry{
Timestamp: time.Unix(0, time.Now().UnixNano()),
Line: fmt.Sprintf(`level=info msg="table detected" op="%s" schema="%s" table="%s"`, OP_TABLE_DETECTION, schema, table),
Line: fmt.Sprintf(`level=info msg="table detected" op="%s" instance="%s" schema="%s" table="%s"`, OP_TABLE_DETECTION, c.instanceKey, schema, table),
},
}
}
Expand Down Expand Up @@ -240,7 +243,7 @@ func (c *SchemaTable) extractSchema(ctx context.Context) error {
Labels: model.LabelSet{"job": database_observability.JobName},
Entry: logproto.Entry{
Timestamp: time.Unix(0, time.Now().UnixNano()),
Line: fmt.Sprintf(`level=info msg="create table" op="%s" schema="%s" table="%s" create_statement="%s"`, OP_CREATE_STATEMENT, table.schema, table.tableName, createStmt),
Line: fmt.Sprintf(`level=info msg="create table" op="%s" instance="%s" schema="%s" table="%s" create_statement="%s"`, OP_CREATE_STATEMENT, c.instanceKey, table.schema, table.tableName, createStmt),
},
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func TestSchemaTable(t *testing.T) {

collector, err := NewSchemaTable(SchemaTableArguments{
DB: db,
InstanceKey: "mysql-db",
CollectInterval: time.Second,
EntryHandler: lokiClient,
CacheTTL: time.Minute,
Expand Down Expand Up @@ -79,9 +80,9 @@ func TestSchemaTable(t *testing.T) {
for _, entry := range lokiEntries {
require.Equal(t, model.LabelSet{"job": database_observability.JobName}, entry.Labels)
}
require.Equal(t, `level=info msg="schema detected" op="schema_detection" schema="some_schema"`, lokiEntries[0].Line)
require.Equal(t, `level=info msg="table detected" op="table_detection" schema="some_schema" table="some_table"`, lokiEntries[1].Line)
require.Equal(t, `level=info msg="create table" op="create_statement" schema="some_schema" table="some_table" create_statement="CREATE TABLE some_table (id INT)"`, lokiEntries[2].Line)
require.Equal(t, `level=info msg="schema detected" op="schema_detection" instance="mysql-db" schema="some_schema"`, lokiEntries[0].Line)
require.Equal(t, `level=info msg="table detected" op="table_detection" instance="mysql-db" schema="some_schema" table="some_table"`, lokiEntries[1].Line)
require.Equal(t, `level=info msg="create table" op="create_statement" instance="mysql-db" schema="some_schema" table="some_table" create_statement="CREATE TABLE some_table (id INT)"`, lokiEntries[2].Line)

err = mock.ExpectationsWereMet()
require.NoError(t, err)
Expand Down
20 changes: 16 additions & 4 deletions internal/component/database_observability/mysql/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ type Component struct {
registry *prometheus.Registry
baseTarget discovery.Target
collectors []Collector
instanceKey string
dbConnection *sql.DB
healthErr *atomic.String
}
Expand All @@ -115,6 +116,12 @@ func New(opts component.Options, args Arguments) (*Component, error) {
healthErr: atomic.NewString(""),
}

instance, err := instanceKey(string(args.DataSourceName))
if err != nil {
return nil, err
}
c.instanceKey = instance

baseTarget, err := c.getBaseTarget()
if err != nil {
return nil, err
Expand Down Expand Up @@ -166,7 +173,7 @@ func (c *Component) getBaseTarget() (discovery.Target, error) {
model.AddressLabel: httpData.MemoryListenAddr,
model.SchemeLabel: "http",
model.MetricsPathLabel: path.Join(httpData.HTTPPathForComponent(c.opts.ID), "metrics"),
"instance": c.instanceKey(),
"instance": c.instanceKey,
"job": database_observability.JobName,
}, nil
}
Expand Down Expand Up @@ -218,6 +225,7 @@ func (c *Component) startCollectors() error {
if c.args.QuerySamplesEnabled {
qsCollector, err := collector.NewQuerySample(collector.QuerySampleArguments{
DB: dbConnection,
InstanceKey: c.instanceKey,
CollectInterval: c.args.CollectInterval,
EntryHandler: entryHandler,
Logger: c.opts.Logger,
Expand All @@ -235,6 +243,7 @@ func (c *Component) startCollectors() error {

stCollector, err := collector.NewSchemaTable(collector.SchemaTableArguments{
DB: dbConnection,
InstanceKey: c.instanceKey,
CollectInterval: c.args.CollectInterval,
EntryHandler: entryHandler,
Logger: c.opts.Logger,
Expand Down Expand Up @@ -306,8 +315,11 @@ func (c *Component) CurrentHealth() component.Health {

// instanceKey returns network(hostname:port)/dbname of the MySQL server.
// This is the same key as used by the mysqld_exporter integration.
func (c *Component) instanceKey() string {
m, _ := mysql.ParseDSN(string(c.args.DataSourceName))
func instanceKey(dsn string) (string, error) {
m, err := mysql.ParseDSN(dsn)
if err != nil {
return "", err
}

if m.Addr == "" {
m.Addr = "localhost:3306"
Expand All @@ -316,7 +328,7 @@ func (c *Component) instanceKey() string {
m.Net = "tcp"
}

return fmt.Sprintf("%s(%s)/%s", m.Net, m.Addr, m.DBName)
return fmt.Sprintf("%s(%s)/%s", m.Net, m.Addr, m.DBName), nil
}

// formatDSN appends the given parameters to the DSN.
Expand Down
Loading