diff --git a/go.mod b/go.mod index e70331b26..b8cbc2dec 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,8 @@ require ( github.com/spf13/cobra v1.8.1 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.9.0 + go.opentelemetry.io/otel v1.28.0 + go.opentelemetry.io/otel/trace v1.28.0 go.uber.org/atomic v1.10.0 gopkg.in/natefinch/lumberjack.v2 v2.2.1 gorm.io/datatypes v1.0.7 @@ -126,12 +128,10 @@ require ( go.etcd.io/etcd/client/v3 v3.5.14 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.53.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 // indirect - go.opentelemetry.io/otel v1.28.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.27.0 // indirect go.opentelemetry.io/otel/metric v1.28.0 // indirect go.opentelemetry.io/otel/sdk v1.28.0 // indirect - go.opentelemetry.io/otel/trace v1.28.0 // indirect go.opentelemetry.io/proto/otlp v1.3.1 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.26.0 // indirect diff --git a/pkg/storage/internalstorage/collectionresource_storage.go b/pkg/storage/internalstorage/collectionresource_storage.go index e922c855e..6a5bd9c29 100644 --- a/pkg/storage/internalstorage/collectionresource_storage.go +++ b/pkg/storage/internalstorage/collectionresource_storage.go @@ -6,11 +6,14 @@ import ( "net/url" "strconv" "strings" + "time" + "go.opentelemetry.io/otel/attribute" "gorm.io/gorm" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/component-base/tracing" internal "github.com/clusterpedia-io/api/clusterpedia" "github.com/clusterpedia-io/clusterpedia/pkg/storage" @@ -98,6 +101,9 @@ func (s *CollectionResourceStorage) query(ctx context.Context, opts *internal.Li } func (s *CollectionResourceStorage) Get(ctx context.Context, opts *internal.ListOptions) (*internal.CollectionResource, error) { + ctx, span := tracing.Start(ctx, "GetCollectionResource from internalstorage") + defer span.End(500 * time.Millisecond) + query, list, err := s.query(ctx, opts) if err != nil { return nil, err @@ -117,6 +123,7 @@ func (s *CollectionResourceStorage) Get(ctx context.Context, opts *internal.List Items: make([]runtime.Object, 0, len(items)), } + span.AddEvent("About to convert objects", attribute.Int("count", len(items))) gvrs := make(map[schema.GroupVersionResource]struct{}) for _, resource := range items { obj, err := resource.ConvertToUnstructured() diff --git a/pkg/storage/internalstorage/register.go b/pkg/storage/internalstorage/register.go index 2dbedffdb..79b50b61f 100644 --- a/pkg/storage/internalstorage/register.go +++ b/pkg/storage/internalstorage/register.go @@ -82,7 +82,13 @@ func NewStorageFactory(configPath string) (storage.StorageFactory, error) { return nil, err } - if err := db.Use(NewGormMetrics(MetricsConfig{DBName: cfg.Database})); err != nil { + if !cfg.Metrics.Disable { + if err := db.Use(NewGormMetrics(cfg.Database, cfg.Metrics.DBStatsRefreshInterval)); err != nil { + return nil, err + } + } + + if err := db.Use(NewGormTrace(false)); err != nil { return nil, err } diff --git a/pkg/storage/internalstorage/resource_storage.go b/pkg/storage/internalstorage/resource_storage.go index b59e18d48..27515e33b 100644 --- a/pkg/storage/internalstorage/resource_storage.go +++ b/pkg/storage/internalstorage/resource_storage.go @@ -7,7 +7,9 @@ import ( "fmt" "reflect" "strconv" + "time" + "go.opentelemetry.io/otel/attribute" "gorm.io/datatypes" "gorm.io/gorm" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -21,6 +23,7 @@ import ( "k8s.io/apimachinery/pkg/watch" genericstorage "k8s.io/apiserver/pkg/storage" "k8s.io/client-go/tools/cache" + "k8s.io/component-base/tracing" internal "github.com/clusterpedia-io/api/clusterpedia" "github.com/clusterpedia-io/clusterpedia/pkg/storage" @@ -172,11 +175,17 @@ func (s *ResourceStorage) genGetObjectQuery(ctx context.Context, cluster, namesp } func (s *ResourceStorage) Get(ctx context.Context, cluster, namespace, name string, into runtime.Object) error { + ctx, span := tracing.Start(ctx, "Get from internalstorage", + attribute.String("storage resource", s.config.StorageResource.String()), + attribute.String("target type", fmt.Sprintf("%T", into)), + ) + var objects [][]byte if result := s.genGetObjectQuery(ctx, cluster, namespace, name).First(&objects); result.Error != nil { return InterpretResourceDBError(cluster, namespace+"/"+name, result.Error) } + span.AddEvent("About to decode object") obj, _, err := s.config.Codec.Decode(objects[0], nil, into) if err != nil { return err @@ -193,12 +202,19 @@ func (s *ResourceStorage) genListObjectsQuery(ctx context.Context, opts *interna result = &ResourceMetadataList{} } - query := s.db.WithContext(ctx).Model(&Resource{}).Where(s.gvrKeyMap()) - offset, amount, query, err := applyListOptionsToResourceQuery(s.db, query, opts) + db := s.db.WithContext(ctx) + query := db.Model(&Resource{}).Where(s.gvrKeyMap()) + offset, amount, query, err := applyListOptionsToResourceQuery(db, query, opts) return offset, amount, query, result, err } func (s *ResourceStorage) List(ctx context.Context, listObject runtime.Object, opts *internal.ListOptions) error { + ctx, span := tracing.Start(ctx, "List from internalstorage", + attribute.String("storage resource", s.config.StorageResource.String()), + attribute.String("target type", fmt.Sprintf("%T", listObject)), + ) + defer span.End(500 * time.Millisecond) + offset, amount, query, result, err := s.genListObjectsQuery(ctx, opts) if err != nil { return err @@ -231,6 +247,8 @@ func (s *ResourceStorage) List(ctx context.Context, listObject runtime.Object, o return nil } + span.AddEvent("About to convert objects", attribute.Int("count", len(objects))) + if unstructuredList, ok := listObject.(*unstructured.UnstructuredList); ok { unstructuredList.Items = make([]unstructured.Unstructured, 0, len(objects)) for _, object := range objects { diff --git a/pkg/storage/internalstorage/tracing.go b/pkg/storage/internalstorage/tracing.go new file mode 100644 index 000000000..967332307 --- /dev/null +++ b/pkg/storage/internalstorage/tracing.go @@ -0,0 +1,176 @@ +package internalstorage + +/* + Ref from https://github.com/go-gorm/opentelemetry/tree/master/tracing@0.1.11 +*/ + +import ( + "database/sql" + "database/sql/driver" + "fmt" + "io" + "regexp" + "strings" + + "go.opentelemetry.io/otel/attribute" + semconv "go.opentelemetry.io/otel/semconv/v1.17.0" + "go.opentelemetry.io/otel/trace" + "gorm.io/gorm" +) + +var ( + firstWordRegex = regexp.MustCompile(`^\w+`) + cCommentRegex = regexp.MustCompile(`(?is)/\*.*?\*/`) + lineCommentRegex = regexp.MustCompile(`(?im)(?:--|#).*?$`) + sqlPrefixRegex = regexp.MustCompile(`^[\s;]*`) + + dbRowsAffected = attribute.Key("db.rows_affected") +) + +type tracePlugin struct { + attrs []attribute.KeyValue + excludeQueryVars bool +} + +func NewGormTrace(excludeQueryVars bool) gorm.Plugin { + return &tracePlugin{ + excludeQueryVars: excludeQueryVars, + } +} + +func (p tracePlugin) Name() string { + return "gorm:oteltracing" +} + +type gormHookFunc func(tx *gorm.DB) + +type gormRegister interface { + Register(name string, fn func(*gorm.DB)) error +} + +func (p tracePlugin) Initialize(db *gorm.DB) (err error) { + cb := db.Callback() + hooks := []struct { + callback gormRegister + hook gormHookFunc + name string + }{ + {cb.Query().Before("gorm:query"), p.before("gorm.Query"), "before:select"}, + {cb.Query().After("gorm:query"), p.after("gorm.Query"), "after:select"}, + + /* + {cb.Create().Before("gorm:create"), p.before("gorm.Create"), "before:create"}, + {cb.Create().After("gorm:create"), p.after("gorm.Create"), "after:create"}, + + {cb.Delete().Before("gorm:delete"), p.before("gorm.Delete"), "before:delete"}, + {cb.Delete().After("gorm:delete"), p.after("gorm.Delete"), "after:delete"}, + + {cb.Update().Before("gorm:update"), p.before("gorm.Update"), "before:update"}, + {cb.Update().After("gorm:update"), p.after("gorm.Update"), "after:update"}, + + {cb.Row().Before("gorm:row"), p.before("gorm.Row"), "before:row"}, + {cb.Row().After("gorm:row"), p.after("gorm.Row"), "after:row"}, + + {cb.Raw().Before("gorm:raw"), p.before("gorm.Raw"), "before:raw"}, + {cb.Raw().After("gorm:raw"), p.after("gorm.Raw"), "after:raw"}, + */ + } + + var firstErr error + for _, h := range hooks { + if err := h.callback.Register("otel:"+h.name, h.hook); err != nil && firstErr == nil { + firstErr = fmt.Errorf("callback register %s failed: %w", h.name, err) + } + } + + return firstErr +} + +func (p *tracePlugin) before(operate string) gormHookFunc { + return func(tx *gorm.DB) { + span := trace.SpanFromContext(tx.Statement.Context) + if !span.IsRecording() { + return + } + span.AddEvent("About " + operate) + } +} + +func (p *tracePlugin) after(operate string) gormHookFunc { + return func(tx *gorm.DB) { + span := trace.SpanFromContext(tx.Statement.Context) + if !span.IsRecording() { + return + } + + attrs := make([]attribute.KeyValue, 0, len(p.attrs)+4) + attrs = append(attrs, p.attrs...) + + if sys := dbSystem(tx); sys.Valid() { + attrs = append(attrs, sys) + } + + vars := tx.Statement.Vars + + var query string + if p.excludeQueryVars { + query = tx.Statement.SQL.String() + } else { + query = tx.Dialector.Explain(tx.Statement.SQL.String(), vars...) + } + + formatQuery := p.formatQuery(query) + attrs = append(attrs, semconv.DBStatementKey.String(formatQuery)) + attrs = append(attrs, semconv.DBOperationKey.String(dbOperation(formatQuery))) + if tx.Statement.Table != "" { + attrs = append(attrs, semconv.DBSQLTableKey.String(tx.Statement.Table)) + } + if tx.Statement.RowsAffected != -1 { + attrs = append(attrs, dbRowsAffected.Int64(tx.Statement.RowsAffected)) + } + + span.AddEvent(fmt.Sprintf("%s succeeded", operate), trace.WithAttributes(attrs...)) + switch tx.Error { + case nil, + gorm.ErrRecordNotFound, + driver.ErrSkip, + io.EOF, // end of rows iterator + sql.ErrNoRows: + // ignore + default: + span.RecordError(tx.Error) + } + } +} + +func (p *tracePlugin) formatQuery(query string) string { + return query +} + +func dbSystem(tx *gorm.DB) attribute.KeyValue { + switch tx.Dialector.Name() { + case "mysql": + return semconv.DBSystemMySQL + case "mssql": + return semconv.DBSystemMSSQL + case "postgres", "postgresql": + return semconv.DBSystemPostgreSQL + case "sqlite": + return semconv.DBSystemSqlite + case "sqlserver": + return semconv.DBSystemKey.String("sqlserver") + case "clickhouse": + return semconv.DBSystemKey.String("clickhouse") + case "spanner": + return semconv.DBSystemKey.String("spanner") + default: + return attribute.KeyValue{} + } +} + +func dbOperation(query string) string { + s := cCommentRegex.ReplaceAllString(query, "") + s = lineCommentRegex.ReplaceAllString(s, "") + s = sqlPrefixRegex.ReplaceAllString(s, "") + return strings.ToLower(firstWordRegex.FindString(s)) +}