Skip to content

Commit

Permalink
feat: publish pubsub events (#1613)
Browse files Browse the repository at this point in the history
- pipes module calls for publishing events through to the controller to
the db
- fake implementation for tests not included
- added TopicEventKey. It will be generally useful I think, but also
required for how db events are set up needing a "key" column
  • Loading branch information
matt2e authored Jun 4, 2024
1 parent d4c9753 commit 502cf46
Show file tree
Hide file tree
Showing 22 changed files with 1,163 additions and 751 deletions.
9 changes: 9 additions & 0 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,15 @@ func (s *Service) SendFSMEvent(ctx context.Context, req *connect.Request[ftlv1.S
return connect.NewResponse(&ftlv1.SendFSMEventResponse{}), nil
}

func (s *Service) PublishEvent(ctx context.Context, req *connect.Request[ftlv1.PublishEventRequest]) (*connect.Response[ftlv1.PublishEventResponse], error) {
// Publish the event.
err := s.dal.PublishEventForTopic(ctx, req.Msg.Topic.Module, req.Msg.Topic.Name, req.Msg.Body)
if err != nil {
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("failed to publish a event to topic %s:%s: %w", req.Msg.Topic.Module, req.Msg.Topic.Name, err))
}
return connect.NewResponse(&ftlv1.PublishEventResponse{}), nil
}

func (s *Service) callWithRequest(
ctx context.Context,
req *connect.Request[ftlv1.CallRequest],
Expand Down
2 changes: 2 additions & 0 deletions backend/controller/dal/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ func (d *DAL) publishNotification(ctx context.Context, notification event, logge

case "topics":
// TODO: handle topics notifications
case "topic_events":
// TODO: handle topic events notifications
default:
panic(fmt.Sprintf("unknown table %q in DB notification", notification.Table))
}
Expand Down
21 changes: 21 additions & 0 deletions backend/controller/dal/pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package dal

import (
"context"

"github.com/TBD54566975/ftl/backend/controller/sql"
"github.com/TBD54566975/ftl/internal/model"
)

func (d *DAL) PublishEventForTopic(ctx context.Context, module, topic string, payload []byte) error {
err := d.db.PublishEventForTopic(ctx, sql.PublishEventForTopicParams{
Key: model.NewTopicEventKey(module, topic),
Module: module,
Topic: topic,
Payload: payload,
})
if err != nil {
return translatePGError(err)
}
return nil
}
1 change: 1 addition & 0 deletions backend/controller/sql/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend/controller/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions backend/controller/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,20 @@ VALUES (
(SELECT id FROM deployments WHERE key = sqlc.arg('deployment')::deployment_key),
sqlc.arg('sink')::TEXT);

-- name: PublishEventForTopic :exec
INSERT INTO topic_events ("key", topic_id, payload)
VALUES (
sqlc.arg('key')::topic_event_key,
(
SELECT topics.id
FROM topics
INNER JOIN modules ON topics.module_id = modules.id
WHERE modules.name = sqlc.arg('module')::TEXT
AND topics.name = sqlc.arg('topic')::TEXT
),
sqlc.arg('payload')
);

-- name: GetModuleConfiguration :one
SELECT value
FROM module_configuration
Expand Down
32 changes: 32 additions & 0 deletions backend/controller/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion backend/controller/sql/schema/001_init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -324,13 +324,15 @@ CREATE TRIGGER topics_notify_event
FOR EACH ROW
EXECUTE PROCEDURE notify_event();

CREATE DOMAIN topic_event_key AS TEXT;

-- This table contains the actual topic data.
CREATE TABLE topic_events (
id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
created_at TIMESTAMPTZ NOT NULL DEFAULT (NOW() AT TIME ZONE 'utc'),

"key" topic_event_key UNIQUE NOT NULL,
topic_id BIGINT NOT NULL REFERENCES topics(id) ON DELETE CASCADE,

payload BYTEA NOT NULL
);

Expand Down
Loading

0 comments on commit 502cf46

Please sign in to comment.