Skip to content

Commit

Permalink
feat: add pubsub tables (#1149)
Browse files Browse the repository at this point in the history
Also generalise PG topic publishing so we can move to an event based
model if required.
  • Loading branch information
alecthomas authored Apr 2, 2024
1 parent c00ce9a commit e7291a7
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 15 deletions.
81 changes: 78 additions & 3 deletions backend/controller/sql/schema/001_init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,17 @@ CREATE
EXTENSION IF NOT EXISTS pgcrypto;

-- Function for deployment notifications.
CREATE OR REPLACE FUNCTION notify_deployment_event() RETURNS TRIGGER AS
CREATE OR REPLACE FUNCTION notify_event() RETURNS TRIGGER AS
$$
DECLARE
topic TEXT;
payload JSONB;
BEGIN
topic := CASE TG_TABLE_NAME
WHEN 'deployments' THEN 'deployments_events'
WHEN 'topics' THEN 'topics_events'
WHEN 'topic_events' THEN 'topic_events_events'
END;
IF TG_OP = 'DELETE'
THEN
payload = jsonb_build_object(
Expand All @@ -22,7 +28,7 @@ BEGIN
'new', new.key
);
END IF;
PERFORM pg_notify('notify_events', payload::text);
PERFORM pg_notify(topic, payload::text);
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
Expand Down Expand Up @@ -64,7 +70,7 @@ CREATE TRIGGER deployments_notify_event
AFTER INSERT OR UPDATE OR DELETE
ON deployments
FOR EACH ROW
EXECUTE PROCEDURE notify_deployment_event();
EXECUTE PROCEDURE notify_event();

CREATE TABLE artefacts
(
Expand Down Expand Up @@ -257,4 +263,73 @@ CREATE INDEX events_custom_key_2_idx ON events (custom_key_2);
CREATE INDEX events_custom_key_3_idx ON events (custom_key_3);
CREATE INDEX events_custom_key_4_idx ON events (custom_key_4);

-- Topics are a way to asynchronously publish data between modules.
CREATE TABLE topics (
id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
created_at TIMESTAMPTZ NOT NULL DEFAULT (NOW() AT TIME ZONE 'utc'),

-- Each topic is associated with an owning module.
module_id BIGINT NOT NULL REFERENCES modules(id),

-- Name of the topic.
name VARCHAR NOT NULL,

-- Data reference to the payload data type in the owning module's schema.
type VARCHAR NOT NULL
);

CREATE UNIQUE INDEX topics_module_name_idx ON topics(module_id, name);

CREATE TRIGGER topics_notify_event
AFTER INSERT OR UPDATE OR DELETE
ON topics
FOR EACH ROW
EXECUTE PROCEDURE notify_event();

-- This table contains the actual topic data.
CREATE TABLE topic_events (
id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
created_at TIMESTAMPTZ NOT NULL DEFAULT (NOW() AT TIME ZONE 'utc'),

topic_id BIGINT NOT NULL REFERENCES topics(id) ON DELETE CASCADE,

payload BYTEA NOT NULL
);

CREATE TRIGGER topic_events_notify_event
AFTER INSERT OR UPDATE OR DELETE
ON topic_events
FOR EACH ROW
EXECUTE PROCEDURE notify_event();

-- A subscription to a topic.
--
-- Multiple subscribers can consume from a single subscription.
CREATE TABLE topic_subscriptions (
id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
created_at TIMESTAMPTZ NOT NULL DEFAULT (NOW() AT TIME ZONE 'utc'),

topic_id BIGINT NOT NULL REFERENCES topics(id) ON DELETE CASCADE,

-- Name of the subscription.
name VARCHAR UNIQUE NOT NULL,

-- Cursor pointing into the topic_events table.
cursor BIGINT NOT NULL REFERENCES topic_events(id) ON DELETE CASCADE
);

-- A subscriber to a topic.
--
-- A subscriber is a 1:1 mapping between a subscription and a sink.
CREATE TABLE topic_subscribers (
id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
created_at TIMESTAMPTZ NOT NULL DEFAULT (NOW() AT TIME ZONE 'utc'),

topic_subscriptions_id BIGINT NOT NULL REFERENCES topic_subscriptions(id),

deployment_id BIGINT NOT NULL REFERENCES deployments(id) ON DELETE CASCADE,
-- Name of the verb to call on the deployment.
verb VARCHAR NOT NULL
);

-- migrate:down
14 changes: 6 additions & 8 deletions cmd/ftl/cmd_dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,15 @@ func (d *devCmd) Run(ctx context.Context, projConfig projectconfig.Config) error
return errors.New(ftlRunningErrorMsg)
}

g.Go(func() error {
return d.ServeCmd.Run(ctx)
})
}

err := d.ServeCmd.pollControllerOnine(ctx, client)
if err != nil {
return err
g.Go(func() error { return d.ServeCmd.Run(ctx) })
}

g.Go(func() error {
err := d.ServeCmd.waitForControllerOnline(ctx, client)
if err != nil {
return err
}

engine, err := buildengine.New(ctx, client, d.Dirs, d.External, buildengine.Parallelism(d.Parallelism))
if err != nil {
return err
Expand Down
8 changes: 4 additions & 4 deletions cmd/ftl/cmd_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (s *serveCmd) Run(ctx context.Context) error {

runInBackground(logger)

err := s.pollControllerOnine(ctx, client)
err := s.waitForControllerOnline(ctx, client)
if err != nil {
return err
}
Expand Down Expand Up @@ -330,7 +330,7 @@ func pollContainerHealth(ctx context.Context, containerName string, timeout time
for {
select {
case <-pollCtx.Done():
return errors.New("timed out waiting for container to be healthy")
return fmt.Errorf("timed out waiting for container to be healthy: %w", pollCtx.Err())

case <-time.After(1 * time.Millisecond):
output, err := exec.Capture(pollCtx, ".", "docker", "inspect", "--format", "{{.State.Health.Status}}", containerName)
Expand All @@ -346,7 +346,8 @@ func pollContainerHealth(ctx context.Context, containerName string, timeout time
}
}

func (s *serveCmd) pollControllerOnine(ctx context.Context, client ftlv1connect.ControllerServiceClient) error {
// waitForControllerOnline polls the controller service until it is online.
func (s *serveCmd) waitForControllerOnline(ctx context.Context, client ftlv1connect.ControllerServiceClient) error {
logger := log.FromContext(ctx)

ctx, cancel := context.WithTimeout(ctx, s.StartupTimeout)
Expand All @@ -372,7 +373,6 @@ func (s *serveCmd) pollControllerOnine(ctx context.Context, client ftlv1connect.
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
logger.Errorf(ctx.Err(), "Timeout reached while polling for controller status")
}

return ctx.Err()
}
}
Expand Down

0 comments on commit e7291a7

Please sign in to comment.