diff --git a/backend/controller/sql/databasetesting/devel.go b/backend/controller/sql/databasetesting/devel.go index a52f3fb4c2..e2a3a06c72 100644 --- a/backend/controller/sql/databasetesting/devel.go +++ b/backend/controller/sql/databasetesting/devel.go @@ -8,7 +8,9 @@ import ( "strings" "time" + "github.com/XSAM/otelsql" _ "github.com/jackc/pgx/v5/stdlib" // pgx driver + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" "github.com/TBD54566975/ftl/backend/controller/sql" "github.com/TBD54566975/ftl/internal/log" @@ -29,9 +31,13 @@ func CreateForDevel(ctx context.Context, dsn string, recreate bool) (*stdsql.DB, var conn *stdsql.DB for range 10 { - conn, err = stdsql.Open("pgx", noDBDSN.String()) + conn, err = otelsql.Open("pgx", noDBDSN.String()) if err == nil { defer conn.Close() + err = otelsql.RegisterDBStatsMetrics(conn, otelsql.WithAttributes(semconv.DBSystemPostgreSQL)) + if err != nil { + panic(err) + } break } logger.Debugf("Waiting for database to be ready: %v", err) @@ -72,10 +78,14 @@ func CreateForDevel(ctx context.Context, dsn string, recreate bool) (*stdsql.DB, return nil, err } - realConn, err := stdsql.Open("pgx", dsn) + realConn, err := otelsql.Open("pgx", dsn) if err != nil { return nil, fmt.Errorf("failed to open database: %w", err) } + err = otelsql.RegisterDBStatsMetrics(realConn, otelsql.WithAttributes(semconv.DBSystemPostgreSQL)) + if err != nil { + return nil, fmt.Errorf("failed to register db metrics: %w", err) + } // Reset transient state in the database to a clean state for development purposes. // This includes things like resetting the state of async calls, leases, // controller/runner registration, etc. but not anything more. diff --git a/cmd/ftl-controller/main.go b/cmd/ftl-controller/main.go index 42b0a54d01..83c2e88a46 100644 --- a/cmd/ftl-controller/main.go +++ b/cmd/ftl-controller/main.go @@ -2,16 +2,17 @@ package main import ( "context" - "database/sql" "fmt" "os" "strconv" "time" + "github.com/XSAM/otelsql" "github.com/alecthomas/kong" "github.com/alecthomas/types/optional" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/secretsmanager" + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" "github.com/TBD54566975/ftl" "github.com/TBD54566975/ftl/backend/controller" @@ -53,7 +54,9 @@ func main() { kctx.FatalIfErrorf(err, "failed to initialize observability") // The FTL controller currently only supports DB as a configuration provider/resolver. - conn, err := sql.Open("pgx", cli.ControllerConfig.DSN) + conn, err := otelsql.Open("pgx", cli.ControllerConfig.DSN) + kctx.FatalIfErrorf(err) + err = otelsql.RegisterDBStatsMetrics(conn, otelsql.WithAttributes(semconv.DBSystemPostgreSQL)) kctx.FatalIfErrorf(err) dal, err := dal.New(ctx, conn, optional.Some[string](*cli.ControllerConfig.KMSURI)) kctx.FatalIfErrorf(err) diff --git a/cmd/ftl/cmd_box_run.go b/cmd/ftl/cmd_box_run.go index 971df8a915..cdbf3f5849 100644 --- a/cmd/ftl/cmd_box_run.go +++ b/cmd/ftl/cmd_box_run.go @@ -2,13 +2,14 @@ package main import ( "context" - "database/sql" "fmt" "net/url" "time" + "github.com/XSAM/otelsql" _ "github.com/jackc/pgx/v5/stdlib" // pgx driver "github.com/jpillora/backoff" + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" "golang.org/x/sync/errgroup" "github.com/TBD54566975/ftl/backend/controller" @@ -58,10 +59,18 @@ func (b *boxRunCmd) Run(ctx context.Context, projConfig projectconfig.Config) er } // Bring up the DB connection and DAL. - conn, err := sql.Open("pgx", config.DSN) + conn, err := otelsql.Open("pgx", config.DSN) if err != nil { return fmt.Errorf("failed to bring up DB connection: %w", err) } + err = otelsql.RegisterDBStatsMetrics(conn, otelsql.WithAttributes(semconv.DBSystemPostgreSQL)) + if err != nil { + return fmt.Errorf("failed to register DB metrics: %w", err) + } + encryptors, err := config.EncryptionKeys.Encryptors(false) + if err != nil { + return fmt.Errorf("failed to create encryptors: %w", err) + } wg := errgroup.Group{} wg.Go(func() error { diff --git a/cmd/ftl/cmd_serve.go b/cmd/ftl/cmd_serve.go index 62bfd843c6..5147ddfcb1 100644 --- a/cmd/ftl/cmd_serve.go +++ b/cmd/ftl/cmd_serve.go @@ -2,7 +2,6 @@ package main import ( "context" - "database/sql" "errors" "fmt" "net" @@ -15,8 +14,10 @@ import ( "time" "connectrpc.com/connect" + "github.com/XSAM/otelsql" "github.com/alecthomas/types/optional" _ "github.com/jackc/pgx/v5/stdlib" // pgx driver + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" "golang.org/x/sync/errgroup" "github.com/TBD54566975/ftl" @@ -148,10 +149,18 @@ func (s *serveCmd) run(ctx context.Context, projConfig projectconfig.Config, ini controllerCtx = cf.ContextWithSecrets(controllerCtx, sm) // Bring up the DB connection and DAL. - conn, err := sql.Open("pgx", config.DSN) + conn, err := otelsql.Open("pgx", config.DSN) if err != nil { return fmt.Errorf("failed to bring up DB connection: %w", err) } + err = otelsql.RegisterDBStatsMetrics(conn, otelsql.WithAttributes(semconv.DBSystemPostgreSQL)) + if err != nil { + return fmt.Errorf("failed to register DB metrics: %w", err) + } + encryptors, err := config.EncryptionKeys.Encryptors(false) + if err != nil { + return fmt.Errorf("failed to create encryptors: %w", err) + } wg.Go(func() error { if err := controller.Start(controllerCtx, config, runnerScaling, conn); err != nil { diff --git a/examples/go/echo/go.mod b/examples/go/echo/go.mod index 272b973492..7769185838 100644 --- a/examples/go/echo/go.mod +++ b/examples/go/echo/go.mod @@ -10,6 +10,7 @@ require ( connectrpc.com/connect v1.16.2 // indirect connectrpc.com/grpcreflect v1.2.0 // indirect connectrpc.com/otelconnect v0.7.1 // indirect + github.com/XSAM/otelsql v0.32.0 // indirect github.com/alecthomas/atomic v0.1.0-alpha2 // indirect github.com/alecthomas/concurrency v0.0.2 // indirect github.com/alecthomas/participle/v2 v2.1.1 // indirect diff --git a/examples/go/echo/go.sum b/examples/go/echo/go.sum index 9fbb9ebc36..9086537f12 100644 --- a/examples/go/echo/go.sum +++ b/examples/go/echo/go.sum @@ -6,6 +6,8 @@ connectrpc.com/otelconnect v0.7.1 h1:scO5pOb0i4yUE66CnNrHeK1x51yq0bE0ehPg6WvzXJY connectrpc.com/otelconnect v0.7.1/go.mod h1:dh3bFgHBTb2bkqGCeVVOtHJreSns7uu9wwL2Tbz17ms= github.com/TBD54566975/scaffolder v1.0.0 h1:QUFSy2wVzumLDg7IHcKC6AP+IYyqWe9Wxiu72nZn5qU= github.com/TBD54566975/scaffolder v1.0.0/go.mod h1:auVpczIbOAdIhYDVSruIw41DanxOKB9bSvjf6MEl7Fs= +github.com/XSAM/otelsql v0.32.0 h1:vDRE4nole0iOOlTaC/Bn6ti7VowzgxK39n3Ll1Kt7i0= +github.com/XSAM/otelsql v0.32.0/go.mod h1:Ary0hlyVBbaSwo8atZB8Aoothg9s/LBJj/N/p5qDmLM= github.com/alecthomas/assert/v2 v2.10.0 h1:jjRCHsj6hBJhkmhznrCzoNpbA3zqy0fYiUcYZP/GkPY= github.com/alecthomas/assert/v2 v2.10.0/go.mod h1:Bze95FyfUr7x34QZrjL+XP+0qgp/zg8yS+TtBj1WA3k= github.com/alecthomas/atomic v0.1.0-alpha2 h1:dqwXmax66gXvHhsOS4pGPZKqYOlTkapELkLb3MNdlH8= diff --git a/go-runtime/ftl/reflection/testdata/go/runtimereflection/go.mod b/go-runtime/ftl/reflection/testdata/go/runtimereflection/go.mod index f190123044..ce3ab84d32 100644 --- a/go-runtime/ftl/reflection/testdata/go/runtimereflection/go.mod +++ b/go-runtime/ftl/reflection/testdata/go/runtimereflection/go.mod @@ -11,6 +11,7 @@ require ( connectrpc.com/connect v1.16.2 // indirect connectrpc.com/grpcreflect v1.2.0 // indirect connectrpc.com/otelconnect v0.7.1 // indirect + github.com/XSAM/otelsql v0.32.0 // indirect github.com/alecthomas/atomic v0.1.0-alpha2 // indirect github.com/alecthomas/concurrency v0.0.2 // indirect github.com/alecthomas/participle/v2 v2.1.1 // indirect diff --git a/go-runtime/ftl/reflection/testdata/go/runtimereflection/go.sum b/go-runtime/ftl/reflection/testdata/go/runtimereflection/go.sum index 9fbb9ebc36..9569a27c55 100644 --- a/go-runtime/ftl/reflection/testdata/go/runtimereflection/go.sum +++ b/go-runtime/ftl/reflection/testdata/go/runtimereflection/go.sum @@ -6,6 +6,8 @@ connectrpc.com/otelconnect v0.7.1 h1:scO5pOb0i4yUE66CnNrHeK1x51yq0bE0ehPg6WvzXJY connectrpc.com/otelconnect v0.7.1/go.mod h1:dh3bFgHBTb2bkqGCeVVOtHJreSns7uu9wwL2Tbz17ms= github.com/TBD54566975/scaffolder v1.0.0 h1:QUFSy2wVzumLDg7IHcKC6AP+IYyqWe9Wxiu72nZn5qU= github.com/TBD54566975/scaffolder v1.0.0/go.mod h1:auVpczIbOAdIhYDVSruIw41DanxOKB9bSvjf6MEl7Fs= +github.com/XSAM/otelsql v0.32.0 h1:vDRE4nole0iOOlTaC/Bn6ti7VowzgxK39n3Ll1Kt7i0= +github.com/XSAM/otelsql v0.32.0/go.mod h1:Ary0hlyVBbaSwo8atZB8Aoothg9s/LBJj/N/p5qDmLM= github.com/alecthomas/assert/v2 v2.10.0 h1:jjRCHsj6hBJhkmhznrCzoNpbA3zqy0fYiUcYZP/GkPY= github.com/alecthomas/assert/v2 v2.10.0/go.mod h1:Bze95FyfUr7x34QZrjL+XP+0qgp/zg8yS+TtBj1WA3k= github.com/alecthomas/atomic v0.1.0-alpha2 h1:dqwXmax66gXvHhsOS4pGPZKqYOlTkapELkLb3MNdlH8= @@ -132,6 +134,8 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU= +gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU= modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6 h1:5D53IMaUuA5InSeMu9eJtlQXS2NxAhyWQvkKEgXZhHI= modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6/go.mod h1:Qz0X07sNOR1jWYCrJMEnbW/X55x206Q7Vt4mz6/wHp4= modernc.org/libc v1.55.3 h1:AzcW1mhlPNrRtjS5sS+eW2ISCgSOLLNyFzRh/V3Qj/U= diff --git a/go.mod b/go.mod index 229522885f..46285e945b 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/BurntSushi/toml v1.4.0 github.com/TBD54566975/golang-tools v0.2.1 github.com/TBD54566975/scaffolder v1.0.0 + github.com/XSAM/otelsql v0.32.0 github.com/alecthomas/assert/v2 v2.10.0 github.com/alecthomas/atomic v0.1.0-alpha2 github.com/alecthomas/chroma/v2 v2.14.0 @@ -68,6 +69,7 @@ require ( golang.org/x/sync v0.8.0 golang.org/x/term v0.23.0 google.golang.org/protobuf v1.34.2 + gotest.tools/v3 v3.5.1 modernc.org/sqlite v1.32.0 ) @@ -87,6 +89,7 @@ require ( github.com/docker/go-units v0.5.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/google/go-cmp v0.6.0 // indirect github.com/gorilla/websocket v1.5.1 // indirect github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/iancoleman/strcase v0.3.0 // indirect @@ -110,7 +113,6 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.26.0 // indirect go.opentelemetry.io/proto/otlp v1.3.1 // indirect golang.org/x/tools v0.23.0 // indirect - gotest.tools/v3 v3.5.1 // indirect modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6 // indirect ) diff --git a/go.sum b/go.sum index 93f56bfac0..d6974f3379 100644 --- a/go.sum +++ b/go.sum @@ -16,6 +16,8 @@ github.com/TBD54566975/golang-tools v0.2.1 h1:jzP27dzvJRb43Z9xTbRCPOT/rZD43FZkqV github.com/TBD54566975/golang-tools v0.2.1/go.mod h1:rEEXIq0/pFgZqi/MTOq4DBmVpLHLgI9WocJWXYhu050= github.com/TBD54566975/scaffolder v1.0.0 h1:QUFSy2wVzumLDg7IHcKC6AP+IYyqWe9Wxiu72nZn5qU= github.com/TBD54566975/scaffolder v1.0.0/go.mod h1:auVpczIbOAdIhYDVSruIw41DanxOKB9bSvjf6MEl7Fs= +github.com/XSAM/otelsql v0.32.0 h1:vDRE4nole0iOOlTaC/Bn6ti7VowzgxK39n3Ll1Kt7i0= +github.com/XSAM/otelsql v0.32.0/go.mod h1:Ary0hlyVBbaSwo8atZB8Aoothg9s/LBJj/N/p5qDmLM= github.com/alecthomas/assert/v2 v2.10.0 h1:jjRCHsj6hBJhkmhznrCzoNpbA3zqy0fYiUcYZP/GkPY= github.com/alecthomas/assert/v2 v2.10.0/go.mod h1:Bze95FyfUr7x34QZrjL+XP+0qgp/zg8yS+TtBj1WA3k= github.com/alecthomas/atomic v0.1.0-alpha2 h1:dqwXmax66gXvHhsOS4pGPZKqYOlTkapELkLb3MNdlH8= diff --git a/internal/modulecontext/database.go b/internal/modulecontext/database.go index fef41e6f54..0551987e39 100644 --- a/internal/modulecontext/database.go +++ b/internal/modulecontext/database.go @@ -6,6 +6,8 @@ import ( "strconv" ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" + "github.com/XSAM/otelsql" + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" ) // Database represents a database connection based on a DSN @@ -19,10 +21,14 @@ type Database struct { // NewDatabase creates a Database that can be added to ModuleContext func NewDatabase(dbType DBType, dsn string) (Database, error) { - db, err := sql.Open("pgx", dsn) + db, err := otelsql.Open("pgx", dsn) if err != nil { return Database{}, fmt.Errorf("failed to bring up DB connection: %w", err) } + err = otelsql.RegisterDBStatsMetrics(db, otelsql.WithAttributes(semconv.DBSystemPostgreSQL)) + if err != nil { + return Database{}, fmt.Errorf("failed to register db metrics: %w", err) + } return Database{ DSN: dsn, DBType: dbType, diff --git a/internal/observability/client_test.go b/internal/observability/client_test.go index 1cb3537ac4..b1582ba137 100644 --- a/internal/observability/client_test.go +++ b/internal/observability/client_test.go @@ -10,5 +10,5 @@ import ( func TestSchemaMismatch(t *testing.T) { dflt := resource.Default() - assert.Equal(t, dflt.SchemaURL(), schemaURL, `change import in client.go to: semconv "go.opentelemetry.io/otel/semconv/v%s"`, path.Base(dflt.SchemaURL())) + assert.Equal(t, dflt.SchemaURL(), schemaURL, `in every file that imports go.opentelemetry.io/otel/semconv, change the import to: semconv "go.opentelemetry.io/otel/semconv/v%s"`, path.Base(dflt.SchemaURL())) } diff --git a/internal/reflect/reflect.go b/internal/reflect/reflect.go index 079c4edb93..6f25cca53b 100644 --- a/internal/reflect/reflect.go +++ b/internal/reflect/reflect.go @@ -13,6 +13,7 @@ package reflect import ( + "container/list" "fmt" "reflect" "strings" @@ -131,6 +132,11 @@ func copyAny(src any, ptrs map[uintptr]any, copyConf *copyConfig) (dst any) { return src } + // Special case list.List to handle its internal structure + if reflect.TypeOf(src) == reflect.TypeFor[*list.List]() { + return copyList(src.(*list.List), ptrs, copyConf) + } + // Look up the corresponding copy function. switch v.Kind() { case reflect.Bool, reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, @@ -139,14 +145,18 @@ func copyAny(src any, ptrs map[uintptr]any, copyConf *copyConfig) (dst any) { reflect.Complex64, reflect.Complex128, reflect.Func: dst = copyPremitive(src, ptrs, copyConf) case reflect.String: - dst = strings.Clone(src.(string)) + if v.Type() == reflect.TypeFor[string]() { + dst = strings.Clone(src.(string)) + } else { + dst = copyStringAlias(src, ptrs, copyConf) + } case reflect.Slice: dst = copySlice(src, ptrs, copyConf) case reflect.Array: dst = copyArray(src, ptrs, copyConf) case reflect.Map: dst = copyMap(src, ptrs, copyConf) - case reflect.Ptr, reflect.UnsafePointer: + case reflect.Pointer, reflect.UnsafePointer: dst = copyPointer(src, ptrs, copyConf) case reflect.Struct: dst = copyStruct(src, ptrs, copyConf) @@ -160,16 +170,35 @@ func copyAny(src any, ptrs map[uintptr]any, copyConf *copyConfig) (dst any) { return } +func copyList(src *list.List, ptrs map[uintptr]any, copyConf *copyConfig) *list.List { + if src == nil { + return nil + } + dst := list.New() + for e := src.Front(); e != nil; e = e.Next() { + copiedValue := copyAny(e.Value, ptrs, copyConf) + dst.PushBack(copiedValue) + } + return dst +} + func copyPremitive(src any, ptr map[uintptr]any, copyConf *copyConfig) (dst any) { kind := reflect.ValueOf(src).Kind() switch kind { - case reflect.Array, reflect.Chan, reflect.Interface, reflect.Map, reflect.Ptr, reflect.Slice, reflect.Struct, reflect.UnsafePointer: + case reflect.Array, reflect.Chan, reflect.Interface, reflect.Map, reflect.Pointer, reflect.Slice, reflect.Struct, reflect.UnsafePointer: panic(fmt.Sprintf("reflect: internal error: type %v is not a primitive", kind)) } dst = src return } +func copyStringAlias(src any, ptr map[uintptr]any, copyConf *copyConfig) any { + v := reflect.ValueOf(src) + dc := reflect.New(v.Type()).Elem() + dc.Set(v) + return dc.Interface() +} + func copySlice(x any, ptrs map[uintptr]any, copyConf *copyConfig) any { v := reflect.ValueOf(x) kind := v.Kind() @@ -224,8 +253,14 @@ func copyPointer(x any, ptrs map[uintptr]any, copyConf *copyConfig) any { v := reflect.ValueOf(x) t := reflect.TypeOf(x) - if v.Kind() != reflect.Pointer { - panic(fmt.Errorf("reflect: internal error: must be a Pointer or Ptr; got %v", v.Kind())) + if v.Kind() != reflect.Pointer && v.Kind() != reflect.UnsafePointer { + panic(fmt.Errorf("reflect: internal error: must be a Pointer or UnsafePointer; got %v", v.Kind())) + } + + if v.Kind() == reflect.UnsafePointer { + // For unsafe.Pointer, just return a copy of the pointer itself, since it + // has no element type to copy (t.Elem() panics). + return x } if v.IsNil() { diff --git a/internal/reflect/reflect_test.go b/internal/reflect/reflect_test.go index a6f138e042..349a98021d 100644 --- a/internal/reflect/reflect_test.go +++ b/internal/reflect/reflect_test.go @@ -1,9 +1,45 @@ package reflect import ( + "container/list" "testing" + "unsafe" + + "gotest.tools/v3/assert" ) +type mystring string +type structWithMystring struct { + str mystring +} + +func TestAliasedString(t *testing.T) { + output := DeepCopy(structWithMystring{"asdf"}) + assert.Equal(t, output, structWithMystring{"asdf"}) +} + +func TestListElements(t *testing.T) { + l := list.New() + l.PushBack("one") + output := DeepCopy(l) + assert.Equal(t, output.Front().Value, l.Front().Value) + assert.Equal(t, output.Len(), l.Len()) +} + +type structWithUnsafePtr struct { + ptr unsafe.Pointer +} + +func TestUnsafePointer(t *testing.T) { + var x int + // #nosec G103 + obj := structWithUnsafePtr{ptr: unsafe.Pointer(&x)} + DeepCopy(obj) // should not panic + x = 3 + intPtr := (*int)(obj.ptr) + assert.Equal(t, 3, *intPtr) +} + type structOfPointers struct { intPtr *int floatPtr *float64