Skip to content

Commit

Permalink
register db.sql metrics
Browse files Browse the repository at this point in the history
cleanup

lint

client_test.go

temp

go.mod

cleanup

undo project toml change

integration test go.mod and go.sum testdata

handle unsafe.pointer
  • Loading branch information
deniseli committed Aug 15, 2024
1 parent af62b9e commit e086161
Show file tree
Hide file tree
Showing 14 changed files with 136 additions and 16 deletions.
14 changes: 12 additions & 2 deletions backend/controller/sql/databasetesting/devel.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
"strings"
"time"

"github.com/XSAM/otelsql"
_ "github.com/jackc/pgx/v5/stdlib" // pgx driver
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"

"github.com/TBD54566975/ftl/backend/controller/sql"
"github.com/TBD54566975/ftl/internal/log"
Expand All @@ -29,9 +31,13 @@ func CreateForDevel(ctx context.Context, dsn string, recreate bool) (*stdsql.DB,

var conn *stdsql.DB
for range 10 {
conn, err = stdsql.Open("pgx", noDBDSN.String())
conn, err = otelsql.Open("pgx", noDBDSN.String())
if err == nil {
defer conn.Close()
err = otelsql.RegisterDBStatsMetrics(conn, otelsql.WithAttributes(semconv.DBSystemPostgreSQL))
if err != nil {
panic(err)
}
break
}
logger.Debugf("Waiting for database to be ready: %v", err)
Expand Down Expand Up @@ -72,10 +78,14 @@ func CreateForDevel(ctx context.Context, dsn string, recreate bool) (*stdsql.DB,
return nil, err
}

realConn, err := stdsql.Open("pgx", dsn)
realConn, err := otelsql.Open("pgx", dsn)
if err != nil {
return nil, fmt.Errorf("failed to open database: %w", err)
}
err = otelsql.RegisterDBStatsMetrics(realConn, otelsql.WithAttributes(semconv.DBSystemPostgreSQL))
if err != nil {
return nil, fmt.Errorf("failed to register db metrics: %w", err)
}
// Reset transient state in the database to a clean state for development purposes.
// This includes things like resetting the state of async calls, leases,
// controller/runner registration, etc. but not anything more.
Expand Down
7 changes: 5 additions & 2 deletions cmd/ftl-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@ package main

import (
"context"
"database/sql"
"fmt"
"os"
"strconv"
"time"

"github.com/XSAM/otelsql"
"github.com/alecthomas/kong"
"github.com/alecthomas/types/optional"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/secretsmanager"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"

"github.com/TBD54566975/ftl"
"github.com/TBD54566975/ftl/backend/controller"
Expand Down Expand Up @@ -53,7 +54,9 @@ func main() {
kctx.FatalIfErrorf(err, "failed to initialize observability")

// The FTL controller currently only supports DB as a configuration provider/resolver.
conn, err := sql.Open("pgx", cli.ControllerConfig.DSN)
conn, err := otelsql.Open("pgx", cli.ControllerConfig.DSN)
kctx.FatalIfErrorf(err)
err = otelsql.RegisterDBStatsMetrics(conn, otelsql.WithAttributes(semconv.DBSystemPostgreSQL))
kctx.FatalIfErrorf(err)
dal, err := dal.New(ctx, conn, optional.Some[string](*cli.ControllerConfig.KMSURI))
kctx.FatalIfErrorf(err)
Expand Down
13 changes: 11 additions & 2 deletions cmd/ftl/cmd_box_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ package main

import (
"context"
"database/sql"
"fmt"
"net/url"
"time"

"github.com/XSAM/otelsql"
_ "github.com/jackc/pgx/v5/stdlib" // pgx driver
"github.com/jpillora/backoff"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"golang.org/x/sync/errgroup"

"github.com/TBD54566975/ftl/backend/controller"
Expand Down Expand Up @@ -58,10 +59,18 @@ func (b *boxRunCmd) Run(ctx context.Context, projConfig projectconfig.Config) er
}

// Bring up the DB connection and DAL.
conn, err := sql.Open("pgx", config.DSN)
conn, err := otelsql.Open("pgx", config.DSN)
if err != nil {
return fmt.Errorf("failed to bring up DB connection: %w", err)
}
err = otelsql.RegisterDBStatsMetrics(conn, otelsql.WithAttributes(semconv.DBSystemPostgreSQL))
if err != nil {
return fmt.Errorf("failed to register DB metrics: %w", err)
}
encryptors, err := config.EncryptionKeys.Encryptors(false)
if err != nil {
return fmt.Errorf("failed to create encryptors: %w", err)
}

wg := errgroup.Group{}
wg.Go(func() error {
Expand Down
13 changes: 11 additions & 2 deletions cmd/ftl/cmd_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"context"
"database/sql"
"errors"
"fmt"
"net"
Expand All @@ -15,8 +14,10 @@ import (
"time"

"connectrpc.com/connect"
"github.com/XSAM/otelsql"
"github.com/alecthomas/types/optional"
_ "github.com/jackc/pgx/v5/stdlib" // pgx driver
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"golang.org/x/sync/errgroup"

"github.com/TBD54566975/ftl"
Expand Down Expand Up @@ -148,10 +149,18 @@ func (s *serveCmd) run(ctx context.Context, projConfig projectconfig.Config, ini
controllerCtx = cf.ContextWithSecrets(controllerCtx, sm)

// Bring up the DB connection and DAL.
conn, err := sql.Open("pgx", config.DSN)
conn, err := otelsql.Open("pgx", config.DSN)
if err != nil {
return fmt.Errorf("failed to bring up DB connection: %w", err)
}
err = otelsql.RegisterDBStatsMetrics(conn, otelsql.WithAttributes(semconv.DBSystemPostgreSQL))
if err != nil {
return fmt.Errorf("failed to register DB metrics: %w", err)
}
encryptors, err := config.EncryptionKeys.Encryptors(false)
if err != nil {
return fmt.Errorf("failed to create encryptors: %w", err)
}

wg.Go(func() error {
if err := controller.Start(controllerCtx, config, runnerScaling, conn); err != nil {
Expand Down
1 change: 1 addition & 0 deletions examples/go/echo/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
connectrpc.com/connect v1.16.2 // indirect
connectrpc.com/grpcreflect v1.2.0 // indirect
connectrpc.com/otelconnect v0.7.1 // indirect
github.com/XSAM/otelsql v0.32.0 // indirect
github.com/alecthomas/atomic v0.1.0-alpha2 // indirect
github.com/alecthomas/concurrency v0.0.2 // indirect
github.com/alecthomas/participle/v2 v2.1.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions examples/go/echo/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
connectrpc.com/connect v1.16.2 // indirect
connectrpc.com/grpcreflect v1.2.0 // indirect
connectrpc.com/otelconnect v0.7.1 // indirect
github.com/XSAM/otelsql v0.32.0 // indirect
github.com/alecthomas/atomic v0.1.0-alpha2 // indirect
github.com/alecthomas/concurrency v0.0.2 // indirect
github.com/alecthomas/participle/v2 v2.1.1 // indirect
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/BurntSushi/toml v1.4.0
github.com/TBD54566975/golang-tools v0.2.1
github.com/TBD54566975/scaffolder v1.0.0
github.com/XSAM/otelsql v0.32.0
github.com/alecthomas/assert/v2 v2.10.0
github.com/alecthomas/atomic v0.1.0-alpha2
github.com/alecthomas/chroma/v2 v2.14.0
Expand Down Expand Up @@ -68,6 +69,7 @@ require (
golang.org/x/sync v0.8.0
golang.org/x/term v0.23.0
google.golang.org/protobuf v1.34.2
gotest.tools/v3 v3.5.1
modernc.org/sqlite v1.32.0
)

Expand All @@ -87,6 +89,7 @@ require (
github.com/docker/go-units v0.5.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/gorilla/websocket v1.5.1 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/iancoleman/strcase v0.3.0 // indirect
Expand All @@ -110,7 +113,6 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.26.0 // indirect
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
golang.org/x/tools v0.23.0 // indirect
gotest.tools/v3 v3.5.1 // indirect
modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6 // indirect
)

Expand Down
2 changes: 2 additions & 0 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion internal/modulecontext/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"strconv"

ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/XSAM/otelsql"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
)

// Database represents a database connection based on a DSN
Expand All @@ -19,10 +21,14 @@ type Database struct {

// NewDatabase creates a Database that can be added to ModuleContext
func NewDatabase(dbType DBType, dsn string) (Database, error) {
db, err := sql.Open("pgx", dsn)
db, err := otelsql.Open("pgx", dsn)
if err != nil {
return Database{}, fmt.Errorf("failed to bring up DB connection: %w", err)
}
err = otelsql.RegisterDBStatsMetrics(db, otelsql.WithAttributes(semconv.DBSystemPostgreSQL))
if err != nil {
return Database{}, fmt.Errorf("failed to register db metrics: %w", err)
}
return Database{
DSN: dsn,
DBType: dbType,
Expand Down
2 changes: 1 addition & 1 deletion internal/observability/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ import (

func TestSchemaMismatch(t *testing.T) {
dflt := resource.Default()
assert.Equal(t, dflt.SchemaURL(), schemaURL, `change import in client.go to: semconv "go.opentelemetry.io/otel/semconv/v%s"`, path.Base(dflt.SchemaURL()))
assert.Equal(t, dflt.SchemaURL(), schemaURL, `in every file that imports go.opentelemetry.io/otel/semconv, change the import to: semconv "go.opentelemetry.io/otel/semconv/v%s"`, path.Base(dflt.SchemaURL()))
}
45 changes: 40 additions & 5 deletions internal/reflect/reflect.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package reflect

import (
"container/list"
"fmt"
"reflect"
"strings"
Expand Down Expand Up @@ -131,6 +132,11 @@ func copyAny(src any, ptrs map[uintptr]any, copyConf *copyConfig) (dst any) {
return src
}

// Special case list.List to handle its internal structure
if reflect.TypeOf(src) == reflect.TypeFor[*list.List]() {
return copyList(src.(*list.List), ptrs, copyConf)
}

// Look up the corresponding copy function.
switch v.Kind() {
case reflect.Bool, reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32,
Expand All @@ -139,14 +145,18 @@ func copyAny(src any, ptrs map[uintptr]any, copyConf *copyConfig) (dst any) {
reflect.Complex64, reflect.Complex128, reflect.Func:
dst = copyPremitive(src, ptrs, copyConf)
case reflect.String:
dst = strings.Clone(src.(string))
if v.Type() == reflect.TypeFor[string]() {
dst = strings.Clone(src.(string))
} else {
dst = copyStringAlias(src, ptrs, copyConf)
}
case reflect.Slice:
dst = copySlice(src, ptrs, copyConf)
case reflect.Array:
dst = copyArray(src, ptrs, copyConf)
case reflect.Map:
dst = copyMap(src, ptrs, copyConf)
case reflect.Ptr, reflect.UnsafePointer:
case reflect.Pointer, reflect.UnsafePointer:
dst = copyPointer(src, ptrs, copyConf)
case reflect.Struct:
dst = copyStruct(src, ptrs, copyConf)
Expand All @@ -160,16 +170,35 @@ func copyAny(src any, ptrs map[uintptr]any, copyConf *copyConfig) (dst any) {
return
}

func copyList(src *list.List, ptrs map[uintptr]any, copyConf *copyConfig) *list.List {
if src == nil {
return nil
}
dst := list.New()
for e := src.Front(); e != nil; e = e.Next() {
copiedValue := copyAny(e.Value, ptrs, copyConf)
dst.PushBack(copiedValue)
}
return dst
}

func copyPremitive(src any, ptr map[uintptr]any, copyConf *copyConfig) (dst any) {
kind := reflect.ValueOf(src).Kind()
switch kind {
case reflect.Array, reflect.Chan, reflect.Interface, reflect.Map, reflect.Ptr, reflect.Slice, reflect.Struct, reflect.UnsafePointer:
case reflect.Array, reflect.Chan, reflect.Interface, reflect.Map, reflect.Pointer, reflect.Slice, reflect.Struct, reflect.UnsafePointer:
panic(fmt.Sprintf("reflect: internal error: type %v is not a primitive", kind))
}
dst = src
return
}

func copyStringAlias(src any, ptr map[uintptr]any, copyConf *copyConfig) any {
v := reflect.ValueOf(src)
dc := reflect.New(v.Type()).Elem()
dc.Set(v)
return dc.Interface()
}

func copySlice(x any, ptrs map[uintptr]any, copyConf *copyConfig) any {
v := reflect.ValueOf(x)
kind := v.Kind()
Expand Down Expand Up @@ -224,8 +253,14 @@ func copyPointer(x any, ptrs map[uintptr]any, copyConf *copyConfig) any {
v := reflect.ValueOf(x)
t := reflect.TypeOf(x)

if v.Kind() != reflect.Pointer {
panic(fmt.Errorf("reflect: internal error: must be a Pointer or Ptr; got %v", v.Kind()))
if v.Kind() != reflect.Pointer && v.Kind() != reflect.UnsafePointer {
panic(fmt.Errorf("reflect: internal error: must be a Pointer or UnsafePointer; got %v", v.Kind()))
}

if v.Kind() == reflect.UnsafePointer {
// For unsafe.Pointer, just return a copy of the pointer itself, since it
// has no element type to copy (t.Elem() panics).
return x
}

if v.IsNil() {
Expand Down
Loading

0 comments on commit e086161

Please sign in to comment.