Skip to content

Commit

Permalink
internalstorage: add trace
Browse files Browse the repository at this point in the history
Signed-off-by: scyda <[email protected]>
  • Loading branch information
scydas committed Dec 20, 2024
1 parent f730e26 commit 384fc0a
Show file tree
Hide file tree
Showing 5 changed files with 212 additions and 5 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ require (
github.com/spf13/cobra v1.8.1
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/otel v1.28.0
go.opentelemetry.io/otel/trace v1.28.0
go.uber.org/atomic v1.10.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
gorm.io/datatypes v1.0.7
Expand Down Expand Up @@ -126,12 +128,10 @@ require (
go.etcd.io/etcd/client/v3 v3.5.14 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.53.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 // indirect
go.opentelemetry.io/otel v1.28.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.27.0 // indirect
go.opentelemetry.io/otel/metric v1.28.0 // indirect
go.opentelemetry.io/otel/sdk v1.28.0 // indirect
go.opentelemetry.io/otel/trace v1.28.0 // indirect
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.26.0 // indirect
Expand Down
7 changes: 7 additions & 0 deletions pkg/storage/internalstorage/collectionresource_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ import (
"net/url"
"strconv"
"strings"
"time"

"go.opentelemetry.io/otel/attribute"
"gorm.io/gorm"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/component-base/tracing"

internal "github.com/clusterpedia-io/api/clusterpedia"
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
Expand Down Expand Up @@ -98,6 +101,9 @@ func (s *CollectionResourceStorage) query(ctx context.Context, opts *internal.Li
}

func (s *CollectionResourceStorage) Get(ctx context.Context, opts *internal.ListOptions) (*internal.CollectionResource, error) {
ctx, span := tracing.Start(ctx, "GetCollectionResource from internalstorage")
defer span.End(500 * time.Millisecond)

query, list, err := s.query(ctx, opts)
if err != nil {
return nil, err
Expand All @@ -117,6 +123,7 @@ func (s *CollectionResourceStorage) Get(ctx context.Context, opts *internal.List
Items: make([]runtime.Object, 0, len(items)),
}

span.AddEvent("About to convert objects", attribute.Int("count", len(items)))
gvrs := make(map[schema.GroupVersionResource]struct{})
for _, resource := range items {
obj, err := resource.ConvertToUnstructured()
Expand Down
8 changes: 7 additions & 1 deletion pkg/storage/internalstorage/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,13 @@ func NewStorageFactory(configPath string) (storage.StorageFactory, error) {
return nil, err
}

if err := db.Use(NewGormMetrics(MetricsConfig{DBName: cfg.Database})); err != nil {
if !cfg.Metrics.Disable {

Check failure on line 85 in pkg/storage/internalstorage/register.go

View workflow job for this annotation

GitHub Actions / Lint with golangci-lint

cfg.Metrics undefined (type *Config has no field or method Metrics)

Check failure on line 85 in pkg/storage/internalstorage/register.go

View workflow job for this annotation

GitHub Actions / Lint with golangci-lint

cfg.Metrics undefined (type *Config has no field or method Metrics)

Check failure on line 85 in pkg/storage/internalstorage/register.go

View workflow job for this annotation

GitHub Actions / Build

cfg.Metrics undefined (type *Config has no field or method Metrics)
if err := db.Use(NewGormMetrics(cfg.Database, cfg.Metrics.DBStatsRefreshInterval)); err != nil {

Check failure on line 86 in pkg/storage/internalstorage/register.go

View workflow job for this annotation

GitHub Actions / Lint with golangci-lint

too many arguments in call to NewGormMetrics

Check failure on line 86 in pkg/storage/internalstorage/register.go

View workflow job for this annotation

GitHub Actions / Lint with golangci-lint

cfg.Metrics undefined (type *Config has no field or method Metrics) (typecheck)

Check failure on line 86 in pkg/storage/internalstorage/register.go

View workflow job for this annotation

GitHub Actions / Lint with golangci-lint

too many arguments in call to NewGormMetrics

Check failure on line 86 in pkg/storage/internalstorage/register.go

View workflow job for this annotation

GitHub Actions / Lint with golangci-lint

cfg.Metrics undefined (type *Config has no field or method Metrics)) (typecheck)

Check failure on line 86 in pkg/storage/internalstorage/register.go

View workflow job for this annotation

GitHub Actions / Build

too many arguments in call to NewGormMetrics

Check failure on line 86 in pkg/storage/internalstorage/register.go

View workflow job for this annotation

GitHub Actions / Build

cfg.Metrics undefined (type *Config has no field or method Metrics)
return nil, err
}
}

if err := db.Use(NewGormTrace(false)); err != nil {
return nil, err
}

Expand Down
22 changes: 20 additions & 2 deletions pkg/storage/internalstorage/resource_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"fmt"
"reflect"
"strconv"
"time"

"go.opentelemetry.io/otel/attribute"
"gorm.io/datatypes"
"gorm.io/gorm"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -21,6 +23,7 @@ import (
"k8s.io/apimachinery/pkg/watch"
genericstorage "k8s.io/apiserver/pkg/storage"
"k8s.io/client-go/tools/cache"
"k8s.io/component-base/tracing"

internal "github.com/clusterpedia-io/api/clusterpedia"
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
Expand Down Expand Up @@ -172,11 +175,17 @@ func (s *ResourceStorage) genGetObjectQuery(ctx context.Context, cluster, namesp
}

func (s *ResourceStorage) Get(ctx context.Context, cluster, namespace, name string, into runtime.Object) error {
ctx, span := tracing.Start(ctx, "Get from internalstorage",
attribute.String("storage resource", s.config.StorageResource.String()),
attribute.String("target type", fmt.Sprintf("%T", into)),
)

var objects [][]byte
if result := s.genGetObjectQuery(ctx, cluster, namespace, name).First(&objects); result.Error != nil {
return InterpretResourceDBError(cluster, namespace+"/"+name, result.Error)
}

span.AddEvent("About to decode object")
obj, _, err := s.config.Codec.Decode(objects[0], nil, into)
if err != nil {
return err
Expand All @@ -193,12 +202,19 @@ func (s *ResourceStorage) genListObjectsQuery(ctx context.Context, opts *interna
result = &ResourceMetadataList{}
}

query := s.db.WithContext(ctx).Model(&Resource{}).Where(s.gvrKeyMap())
offset, amount, query, err := applyListOptionsToResourceQuery(s.db, query, opts)
db := s.db.WithContext(ctx)
query := db.Model(&Resource{}).Where(s.gvrKeyMap())
offset, amount, query, err := applyListOptionsToResourceQuery(db, query, opts)
return offset, amount, query, result, err
}

func (s *ResourceStorage) List(ctx context.Context, listObject runtime.Object, opts *internal.ListOptions) error {
ctx, span := tracing.Start(ctx, "List from internalstorage",
attribute.String("storage resource", s.config.StorageResource.String()),
attribute.String("target type", fmt.Sprintf("%T", listObject)),
)
defer span.End(500 * time.Millisecond)

offset, amount, query, result, err := s.genListObjectsQuery(ctx, opts)
if err != nil {
return err
Expand Down Expand Up @@ -231,6 +247,8 @@ func (s *ResourceStorage) List(ctx context.Context, listObject runtime.Object, o
return nil
}

span.AddEvent("About to convert objects", attribute.Int("count", len(objects)))

if unstructuredList, ok := listObject.(*unstructured.UnstructuredList); ok {
unstructuredList.Items = make([]unstructured.Unstructured, 0, len(objects))
for _, object := range objects {
Expand Down
176 changes: 176 additions & 0 deletions pkg/storage/internalstorage/tracing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
package internalstorage

/*
Ref from https://github.com/go-gorm/opentelemetry/tree/master/[email protected]
*/

import (
"database/sql"
"database/sql/driver"
"fmt"
"io"
"regexp"
"strings"

"go.opentelemetry.io/otel/attribute"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"go.opentelemetry.io/otel/trace"
"gorm.io/gorm"
)

var (
firstWordRegex = regexp.MustCompile(`^\w+`)
cCommentRegex = regexp.MustCompile(`(?is)/\*.*?\*/`)
lineCommentRegex = regexp.MustCompile(`(?im)(?:--|#).*?$`)
sqlPrefixRegex = regexp.MustCompile(`^[\s;]*`)

dbRowsAffected = attribute.Key("db.rows_affected")
)

type tracePlugin struct {
attrs []attribute.KeyValue
excludeQueryVars bool
}

func NewGormTrace(excludeQueryVars bool) gorm.Plugin {
return &tracePlugin{
excludeQueryVars: excludeQueryVars,
}
}

func (p tracePlugin) Name() string {
return "gorm:oteltracing"
}

type gormHookFunc func(tx *gorm.DB)

type gormRegister interface {
Register(name string, fn func(*gorm.DB)) error
}

func (p tracePlugin) Initialize(db *gorm.DB) (err error) {
cb := db.Callback()
hooks := []struct {
callback gormRegister
hook gormHookFunc
name string
}{
{cb.Query().Before("gorm:query"), p.before("gorm.Query"), "before:select"},
{cb.Query().After("gorm:query"), p.after("gorm.Query"), "after:select"},

/*
{cb.Create().Before("gorm:create"), p.before("gorm.Create"), "before:create"},
{cb.Create().After("gorm:create"), p.after("gorm.Create"), "after:create"},
{cb.Delete().Before("gorm:delete"), p.before("gorm.Delete"), "before:delete"},
{cb.Delete().After("gorm:delete"), p.after("gorm.Delete"), "after:delete"},
{cb.Update().Before("gorm:update"), p.before("gorm.Update"), "before:update"},
{cb.Update().After("gorm:update"), p.after("gorm.Update"), "after:update"},
{cb.Row().Before("gorm:row"), p.before("gorm.Row"), "before:row"},
{cb.Row().After("gorm:row"), p.after("gorm.Row"), "after:row"},
{cb.Raw().Before("gorm:raw"), p.before("gorm.Raw"), "before:raw"},
{cb.Raw().After("gorm:raw"), p.after("gorm.Raw"), "after:raw"},
*/
}

var firstErr error
for _, h := range hooks {
if err := h.callback.Register("otel:"+h.name, h.hook); err != nil && firstErr == nil {
firstErr = fmt.Errorf("callback register %s failed: %w", h.name, err)
}
}

return firstErr
}

func (p *tracePlugin) before(operate string) gormHookFunc {
return func(tx *gorm.DB) {
span := trace.SpanFromContext(tx.Statement.Context)
if !span.IsRecording() {
return
}
span.AddEvent("About " + operate)
}
}

func (p *tracePlugin) after(operate string) gormHookFunc {
return func(tx *gorm.DB) {
span := trace.SpanFromContext(tx.Statement.Context)
if !span.IsRecording() {
return
}

attrs := make([]attribute.KeyValue, 0, len(p.attrs)+4)
attrs = append(attrs, p.attrs...)

if sys := dbSystem(tx); sys.Valid() {
attrs = append(attrs, sys)
}

vars := tx.Statement.Vars

var query string
if p.excludeQueryVars {
query = tx.Statement.SQL.String()
} else {
query = tx.Dialector.Explain(tx.Statement.SQL.String(), vars...)
}

formatQuery := p.formatQuery(query)
attrs = append(attrs, semconv.DBStatementKey.String(formatQuery))
attrs = append(attrs, semconv.DBOperationKey.String(dbOperation(formatQuery)))
if tx.Statement.Table != "" {
attrs = append(attrs, semconv.DBSQLTableKey.String(tx.Statement.Table))
}
if tx.Statement.RowsAffected != -1 {
attrs = append(attrs, dbRowsAffected.Int64(tx.Statement.RowsAffected))
}

span.AddEvent(fmt.Sprintf("%s succeeded", operate), trace.WithAttributes(attrs...))
switch tx.Error {
case nil,
gorm.ErrRecordNotFound,
driver.ErrSkip,
io.EOF, // end of rows iterator
sql.ErrNoRows:
// ignore
default:
span.RecordError(tx.Error)
}
}
}

func (p *tracePlugin) formatQuery(query string) string {
return query
}

func dbSystem(tx *gorm.DB) attribute.KeyValue {
switch tx.Dialector.Name() {
case "mysql":
return semconv.DBSystemMySQL
case "mssql":
return semconv.DBSystemMSSQL
case "postgres", "postgresql":
return semconv.DBSystemPostgreSQL
case "sqlite":
return semconv.DBSystemSqlite
case "sqlserver":
return semconv.DBSystemKey.String("sqlserver")
case "clickhouse":
return semconv.DBSystemKey.String("clickhouse")
case "spanner":
return semconv.DBSystemKey.String("spanner")
default:
return attribute.KeyValue{}
}
}

func dbOperation(query string) string {
s := cCommentRegex.ReplaceAllString(query, "")
s = lineCommentRegex.ReplaceAllString(s, "")
s = sqlPrefixRegex.ReplaceAllString(s, "")
return strings.ToLower(firstWordRegex.FindString(s))
}

0 comments on commit 384fc0a

Please sign in to comment.