diff --git a/backend/controller/sql/schema/001_init.sql b/backend/controller/sql/schema/001_init.sql index ff5423068f..66238ae7fe 100644 --- a/backend/controller/sql/schema/001_init.sql +++ b/backend/controller/sql/schema/001_init.sql @@ -3,11 +3,17 @@ CREATE EXTENSION IF NOT EXISTS pgcrypto; -- Function for deployment notifications. -CREATE OR REPLACE FUNCTION notify_deployment_event() RETURNS TRIGGER AS +CREATE OR REPLACE FUNCTION notify_event() RETURNS TRIGGER AS $$ DECLARE + topic TEXT; payload JSONB; BEGIN + topic := CASE TG_TABLE_NAME + WHEN 'deployments' THEN 'deployments_events' + WHEN 'topics' THEN 'topics_events' + WHEN 'topic_events' THEN 'topic_events_events' + END; IF TG_OP = 'DELETE' THEN payload = jsonb_build_object( @@ -22,7 +28,7 @@ BEGIN 'new', new.key ); END IF; - PERFORM pg_notify('notify_events', payload::text); + PERFORM pg_notify(topic, payload::text); RETURN NULL; END; $$ LANGUAGE plpgsql; @@ -64,7 +70,7 @@ CREATE TRIGGER deployments_notify_event AFTER INSERT OR UPDATE OR DELETE ON deployments FOR EACH ROW -EXECUTE PROCEDURE notify_deployment_event(); +EXECUTE PROCEDURE notify_event(); CREATE TABLE artefacts ( @@ -257,4 +263,73 @@ CREATE INDEX events_custom_key_2_idx ON events (custom_key_2); CREATE INDEX events_custom_key_3_idx ON events (custom_key_3); CREATE INDEX events_custom_key_4_idx ON events (custom_key_4); +-- Topics are a way to asynchronously publish data between modules. +CREATE TABLE topics ( + id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, + created_at TIMESTAMPTZ NOT NULL DEFAULT (NOW() AT TIME ZONE 'utc'), + + -- Each topic is associated with an owning module. + module_id BIGINT NOT NULL REFERENCES modules(id), + + -- Name of the topic. + name VARCHAR NOT NULL, + + -- Data reference to the payload data type in the owning module's schema. + type VARCHAR NOT NULL +); + +CREATE UNIQUE INDEX topics_module_name_idx ON topics(module_id, name); + +CREATE TRIGGER topics_notify_event + AFTER INSERT OR UPDATE OR DELETE + ON topics + FOR EACH ROW +EXECUTE PROCEDURE notify_event(); + +-- This table contains the actual topic data. +CREATE TABLE topic_events ( + id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, + created_at TIMESTAMPTZ NOT NULL DEFAULT (NOW() AT TIME ZONE 'utc'), + + topic_id BIGINT NOT NULL REFERENCES topics(id) ON DELETE CASCADE, + + payload BYTEA NOT NULL +); + +CREATE TRIGGER topic_events_notify_event + AFTER INSERT OR UPDATE OR DELETE + ON topic_events + FOR EACH ROW +EXECUTE PROCEDURE notify_event(); + +-- A subscription to a topic. +-- +-- Multiple subscribers can consume from a single subscription. +CREATE TABLE topic_subscriptions ( + id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, + created_at TIMESTAMPTZ NOT NULL DEFAULT (NOW() AT TIME ZONE 'utc'), + + topic_id BIGINT NOT NULL REFERENCES topics(id) ON DELETE CASCADE, + + -- Name of the subscription. + name VARCHAR UNIQUE NOT NULL, + + -- Cursor pointing into the topic_events table. + cursor BIGINT NOT NULL REFERENCES topic_events(id) ON DELETE CASCADE +); + +-- A subscriber to a topic. +-- +-- A subscriber is a 1:1 mapping between a subscription and a sink. +CREATE TABLE topic_subscribers ( + id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, + created_at TIMESTAMPTZ NOT NULL DEFAULT (NOW() AT TIME ZONE 'utc'), + + topic_subscriptions_id BIGINT NOT NULL REFERENCES topic_subscriptions(id), + + deployment_id BIGINT NOT NULL REFERENCES deployments(id) ON DELETE CASCADE, + -- Name of the verb to call on the deployment. + verb VARCHAR NOT NULL +); + -- migrate:down \ No newline at end of file diff --git a/cmd/ftl/cmd_dev.go b/cmd/ftl/cmd_dev.go index d7f80351ee..6c59b346eb 100644 --- a/cmd/ftl/cmd_dev.go +++ b/cmd/ftl/cmd_dev.go @@ -47,17 +47,15 @@ func (d *devCmd) Run(ctx context.Context, projConfig projectconfig.Config) error return errors.New(ftlRunningErrorMsg) } - g.Go(func() error { - return d.ServeCmd.Run(ctx) - }) - } - - err := d.ServeCmd.pollControllerOnine(ctx, client) - if err != nil { - return err + g.Go(func() error { return d.ServeCmd.Run(ctx) }) } g.Go(func() error { + err := d.ServeCmd.waitForControllerOnline(ctx, client) + if err != nil { + return err + } + engine, err := buildengine.New(ctx, client, d.Dirs, d.External, buildengine.Parallelism(d.Parallelism)) if err != nil { return err diff --git a/cmd/ftl/cmd_serve.go b/cmd/ftl/cmd_serve.go index 11ee07aefe..e9a12a2fb8 100644 --- a/cmd/ftl/cmd_serve.go +++ b/cmd/ftl/cmd_serve.go @@ -59,7 +59,7 @@ func (s *serveCmd) Run(ctx context.Context) error { runInBackground(logger) - err := s.pollControllerOnine(ctx, client) + err := s.waitForControllerOnline(ctx, client) if err != nil { return err } @@ -330,7 +330,7 @@ func pollContainerHealth(ctx context.Context, containerName string, timeout time for { select { case <-pollCtx.Done(): - return errors.New("timed out waiting for container to be healthy") + return fmt.Errorf("timed out waiting for container to be healthy: %w", pollCtx.Err()) case <-time.After(1 * time.Millisecond): output, err := exec.Capture(pollCtx, ".", "docker", "inspect", "--format", "{{.State.Health.Status}}", containerName) @@ -346,7 +346,8 @@ func pollContainerHealth(ctx context.Context, containerName string, timeout time } } -func (s *serveCmd) pollControllerOnine(ctx context.Context, client ftlv1connect.ControllerServiceClient) error { +// waitForControllerOnline polls the controller service until it is online. +func (s *serveCmd) waitForControllerOnline(ctx context.Context, client ftlv1connect.ControllerServiceClient) error { logger := log.FromContext(ctx) ctx, cancel := context.WithTimeout(ctx, s.StartupTimeout) @@ -372,7 +373,6 @@ func (s *serveCmd) pollControllerOnine(ctx context.Context, client ftlv1connect. if errors.Is(ctx.Err(), context.DeadlineExceeded) { logger.Errorf(ctx.Err(), "Timeout reached while polling for controller status") } - return ctx.Err() } }