Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
add topic and subscription to echo for testing

make mergeable

desc

minor refactor

final
  • Loading branch information
deniseli committed Jul 29, 2024
1 parent c419313 commit fbe10c2
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 8 deletions.
25 changes: 25 additions & 0 deletions backend/controller/dal/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"time"

"github.com/TBD54566975/ftl/backend/controller/observability"
"github.com/TBD54566975/ftl/backend/controller/sql"
dalerrs "github.com/TBD54566975/ftl/backend/dal"
"github.com/TBD54566975/ftl/backend/schema"
Expand All @@ -23,6 +24,7 @@ func (d *DAL) PublishEventForTopic(ctx context.Context, module, topic string, pa
if err != nil {
return dalerrs.TranslatePGError(err)
}
observability.PubSub.Published(ctx, module, topic)
return nil
}

Expand Down Expand Up @@ -100,11 +102,34 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t
if err != nil {
return 0, fmt.Errorf("failed to schedule async task for subscription: %w", dalerrs.TranslatePGError(err))
}

err = d.recordSubscriberCalled(ctx, subscription, subscriber.Sink)
if err != nil {
logger.Errorf(err, "failed to record subscriber call")
}

successful++
}
return successful, nil
}

func (d *DAL) recordSubscriberCalled(ctx context.Context, subscription sql.GetSubscriptionsNeedingUpdateRow, sinkRef schema.RefKey) error {
topic, err := d.db.GetTopicByKey(ctx, subscription.Topic)
if err != nil {
return dalerrs.TranslatePGError(err)
}

// GetModulesByID can take multiple IDs, so it returns a list of rows. This call
// should always return exactly one row, since we pass in one ID.
modules, err := d.db.GetModulesByID(ctx, []int64{subscription.ModuleID})
if err != nil {
return dalerrs.TranslatePGError(err)
}

observability.PubSub.SubscriberCalled(ctx, topic.Name, schema.RefKey{Module: modules[0].Name, Name: subscription.Name}, sinkRef)
return nil
}

func (d *DAL) CompleteEventForSubscription(ctx context.Context, module, name string) error {
err := d.db.CompleteEventForSubscription(ctx, name, module)
if err != nil {
Expand Down
22 changes: 21 additions & 1 deletion backend/controller/observability/observability.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,31 @@
package observability

import "fmt"
import (
"errors"
"fmt"
)

var (
PubSub *PubSubMetrics
)

// To be migrated to init() in https://github.com/TBD54566975/ftl/pull/2177/files
func InitControllerObservability() error {
if err := InitFSMMetrics(); err != nil {
return fmt.Errorf("could not initialize controller metrics: %w", err)
}

return nil
}

func init() {
var errs error
var err error

PubSub, err = initPubSubMetrics()
errs = errors.Join(errs, err)

if errs != nil {
panic(fmt.Errorf("could not initialize controller metrics: %w\n", errs))
}
}
71 changes: 71 additions & 0 deletions backend/controller/observability/pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package observability

import (
"context"
"errors"
"fmt"

"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/internal/observability"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
)

const (
pubsubMeterName = "ftl.pubsub"
pubsubTopicNameAttribute = "ftl.pubsub.topic.name"
pubsubSubscriptionRefAttribute = "ftl.pubsub.subscription.ref"
pubsubSubscriberRefAttribute = "ftl.pubsub.subscriber.sink.ref"
)

type PubSubMetrics struct {
meter metric.Meter
published metric.Int64Counter
subscriberCalled metric.Int64Counter
}

func initPubSubMetrics() (*PubSubMetrics, error) {
result := &PubSubMetrics{}
var errs error
var err error

result.meter = otel.Meter(pubsubMeterName)

counterName := fmt.Sprintf("%s.published", pubsubMeterName)
if result.published, err = result.meter.Int64Counter(
counterName,
metric.WithDescription("the number of times that an event is published to a topic")); err != nil {
result.published, errs = handleInitCounterError(errs, err, counterName)
}

counterName = fmt.Sprintf("%s.subscriber.called", pubsubMeterName)
if result.subscriberCalled, err = result.meter.Int64Counter(
counterName,
metric.WithDescription("the number of times that a pubsub event has been enqueued to asynchronously send to a subscriber")); err != nil {
result.subscriberCalled, errs = handleInitCounterError(errs, err, counterName)
}

return result, nil
}

func handleInitCounterError(errs error, err error, counterName string) (metric.Int64Counter, error) {
return noop.Int64Counter{}, errors.Join(errs, fmt.Errorf("%q counter init failed; falling back to noop: %w", counterName, err))
}

func (m *PubSubMetrics) Published(ctx context.Context, module, topic string) {
m.published.Add(ctx, 1, metric.WithAttributes(
attribute.String(observability.ModuleNameAttribute, module),
attribute.String(pubsubTopicNameAttribute, topic),
))
}

func (m *PubSubMetrics) SubscriberCalled(ctx context.Context, topic string, subscription, sink schema.RefKey) {
m.subscriberCalled.Add(ctx, 1, metric.WithAttributes(
attribute.String(observability.ModuleNameAttribute, sink.Module),
attribute.String(pubsubTopicNameAttribute, topic),
attribute.String(pubsubSubscriptionRefAttribute, subscription.String()),
attribute.String(pubsubSubscriberRefAttribute, sink.String()),
))
}
1 change: 1 addition & 0 deletions backend/controller/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions backend/controller/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,7 @@ SELECT
subs.key::subscription_key as key,
curser.key as cursor,
topics.key::topic_key as topic,
subs.module_id,
subs.name
FROM topic_subscriptions subs
LEFT JOIN topics ON subs.topic_id = topics.id
Expand Down Expand Up @@ -727,3 +728,9 @@ UPDATE topic_subscriptions
SET state = 'idle'
WHERE name = @name::TEXT
AND module_id = (SELECT id FROM module);

-- name: GetTopicByKey :one
SELECT *
FROM topics
WHERE "key" = sqlc.arg('key')::topic_key
LIMIT 1;
33 changes: 29 additions & 4 deletions backend/controller/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 21 additions & 3 deletions examples/go/echo/echo.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,21 @@ import (
"context"
"fmt"

"ftl/builtin"
"ftl/time"

"github.com/TBD54566975/ftl/go-runtime/ftl"
)

var defaultName = ftl.Config[string]("default")

type EchoEvent struct {
Message string
}

var Echotopic = ftl.Topic[EchoEvent]("echotopic")
var sub = ftl.Subscription(Echotopic, "sub")

// An echo request.
type EchoRequest struct {
Name ftl.Option[string] `json:"name"`
Expand All @@ -24,11 +32,21 @@ type EchoResponse struct {
// Echo returns a greeting with the current time.
//
//ftl:verb
func Echo(ctx context.Context, req EchoRequest) (EchoResponse, error) {
//ftl:ingress POST /echotopic
func Echo(ctx context.Context, req builtin.HttpRequest[EchoRequest]) (builtin.HttpResponse[EchoResponse, string], error) {
tresp, err := ftl.Call(ctx, time.Time, time.TimeRequest{})
if err != nil {
return EchoResponse{}, err
return builtin.HttpResponse[EchoResponse, string]{}, err
}
if err := Echotopic.Publish(ctx, EchoEvent{Message: "adf"}); err != nil {
return builtin.HttpResponse[EchoResponse, string]{}, err
}

return EchoResponse{Message: fmt.Sprintf("Hello, %s!!! It is %s!", req.Name.Default(defaultName.Get(ctx)), tresp.Time)}, nil
return builtin.HttpResponse[EchoResponse, string]{Body: ftl.Some(EchoResponse{Message: fmt.Sprintf("Hello, %s!!! It is %s!", req.Body.Name.Default(defaultName.Get(ctx)), tresp.Time)})}, nil
}

//ftl:verb
//ftl:subscribe sub
func EchoSinkOne(ctx context.Context, e EchoEvent) error {
return nil
}

0 comments on commit fbe10c2

Please sign in to comment.