Skip to content

Commit

Permalink
feat: add topics, subscriptions & subscribers to db on deployment (#1608
Browse files Browse the repository at this point in the history
)

Keys will have the following prefixes:
- topic: `top`
- subscription: `sub`
- subscriber: `subr`
  • Loading branch information
matt2e authored May 31, 2024
1 parent 0381efa commit cc211c1
Show file tree
Hide file tree
Showing 12 changed files with 340 additions and 13 deletions.
59 changes: 59 additions & 0 deletions backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,41 @@ func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchem
return model.DeploymentKey{}, fmt.Errorf("failed to upsert module: %w", translatePGError(err))
}

// upsert topics
for _, decl := range moduleSchema.Decls {
t, ok := decl.(*schema.Topic)
if !ok {
continue
}
err := tx.UpsertTopic(ctx, sql.UpsertTopicParams{
Topic: model.NewTopicKey(moduleSchema.Name, t.Name),
Module: moduleSchema.Name,
Name: t.Name,
EventType: t.Event.String(),
})
if err != nil {
return model.DeploymentKey{}, fmt.Errorf("could not insert topic: %w", translatePGError(err))
}
}

// upsert subscriptions
for _, decl := range moduleSchema.Decls {
s, ok := decl.(*schema.Subscription)
if !ok {
continue
}
err := tx.UpsertSubscription(ctx, sql.UpsertSubscriptionParams{
Key: model.NewSubscriptionKey(moduleSchema.Name, s.Name),
Module: moduleSchema.Name,
TopicModule: s.Topic.Module,
TopicName: s.Topic.Name,
Name: s.Name,
})
if err != nil {
return model.DeploymentKey{}, fmt.Errorf("could not insert subscription: %w", translatePGError(err))
}
}

deploymentKey := model.NewDeploymentKey(moduleSchema.Name)

// Create the deployment
Expand Down Expand Up @@ -550,6 +585,30 @@ func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchem
}
}

// create subscribers
for _, decl := range moduleSchema.Decls {
v, ok := decl.(*schema.Verb)
if !ok {
continue
}
for _, md := range v.Metadata {
s, ok := md.(*schema.MetadataSubscriber)
if !ok {
continue
}
err := tx.InsertSubscriber(ctx, sql.InsertSubscriberParams{
Key: model.NewSubscriberKey(moduleSchema.Name, s.Name, v.Name),
Module: moduleSchema.Name,
SubscriptionName: s.Name,
Deployment: deploymentKey,
Sink: v.Name,
})
if err != nil {
return model.DeploymentKey{}, fmt.Errorf("could not insert subscriber: %w", translatePGError(err))
}
}
}

return deploymentKey, nil
}

Expand Down
2 changes: 2 additions & 0 deletions backend/controller/dal/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ func (d *DAL) publishNotification(ctx context.Context, notification event, logge
logger.Tracef("Deployment notification: %s", deployment)
d.DeploymentChanges.Publish(deployment)

case "topics":
// TODO: handle topics notifications
default:
panic(fmt.Sprintf("unknown table %q in DB notification", notification.Table))
}
Expand Down
11 changes: 6 additions & 5 deletions backend/controller/sql/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions backend/controller/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 46 additions & 0 deletions backend/controller/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,52 @@ WHERE
fsm = @fsm::schema_ref AND key = @key::TEXT
RETURNING true;

-- name: UpsertTopic :exec
INSERT INTO topics (key, module_id, name, type)
VALUES (
sqlc.arg('topic')::topic_key,
(SELECT id FROM modules WHERE name = sqlc.arg('module')::TEXT LIMIT 1),
sqlc.arg('name')::TEXT,
sqlc.arg('event_type')::TEXT
)
ON CONFLICT (name, module_id) DO
UPDATE SET
type = sqlc.arg('event_type')::TEXT
RETURNING id;

-- name: UpsertSubscription :exec
INSERT INTO topic_subscriptions (key, topic_id, module_id, name)
VALUES (
sqlc.arg('key')::subscription_key,
(
SELECT topics.id as id
FROM topics
INNER JOIN modules ON topics.module_id = modules.id
WHERE modules.name = sqlc.arg('topic_module')::TEXT
AND topics.name = sqlc.arg('topic_name')::TEXT
),
(SELECT id FROM modules WHERE name = sqlc.arg('module')::TEXT),
sqlc.arg('name')::TEXT
)
ON CONFLICT (name, module_id) DO
UPDATE SET
topic_id = excluded.topic_id
RETURNING id;

-- name: InsertSubscriber :exec
INSERT INTO topic_subscribers (key, topic_subscriptions_id, deployment_id, sink)
VALUES (
sqlc.arg('key')::subscriber_key,
(
SELECT topic_subscriptions.id as id
FROM topic_subscriptions
INNER JOIN modules ON topic_subscriptions.module_id = modules.id
WHERE modules.name = sqlc.arg('module')::TEXT
AND topic_subscriptions.name = sqlc.arg('subscription_name')::TEXT
),
(SELECT id FROM deployments WHERE key = sqlc.arg('deployment')::deployment_key),
sqlc.arg('sink')::TEXT);

-- name: GetModuleConfiguration :one
SELECT value
FROM module_configuration
Expand Down
104 changes: 104 additions & 0 deletions backend/controller/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 7 additions & 2 deletions backend/controller/sql/schema/001_init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -352,13 +352,18 @@ CREATE TABLE topic_subscriptions (

topic_id BIGINT NOT NULL REFERENCES topics(id) ON DELETE CASCADE,

-- Each subscription is associated with an owning module.
module_id BIGINT NOT NULL REFERENCES modules(id),

-- Name of the subscription.
name TEXT UNIQUE NOT NULL,

-- Cursor pointing into the topic_events table.
cursor BIGINT NOT NULL REFERENCES topic_events(id) ON DELETE CASCADE
cursor BIGINT REFERENCES topic_events(id) ON DELETE CASCADE
);

CREATE UNIQUE INDEX topic_subscriptions_module_name_idx ON topic_subscriptions(module_id, name);

CREATE DOMAIN subscriber_key AS TEXT;

-- A subscriber to a topic.
Expand All @@ -373,7 +378,7 @@ CREATE TABLE topic_subscribers (

deployment_id BIGINT NOT NULL REFERENCES deployments(id) ON DELETE CASCADE,
-- Name of the verb to call on the deployment.
verb TEXT NOT NULL
sink TEXT NOT NULL
);

CREATE DOMAIN lease_key AS TEXT;
Expand Down
13 changes: 7 additions & 6 deletions internal/model/cron_job_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,31 @@ package model

import (
"errors"
"strings"
)

type CronJobKey = KeyType[CronJobPayload, *CronJobPayload]

func NewCronJobKey(module, verb string) CronJobKey {
return newKey[CronJobPayload](strings.Join([]string{module, verb}, "-"))
return newKey[CronJobPayload](module, verb)
}

func ParseCronJobKey(key string) (CronJobKey, error) { return parseKey[CronJobPayload](key) }

type CronJobPayload struct {
Ref string
Module string
Verb string
}

var _ KeyPayload = (*CronJobPayload)(nil)

func (d *CronJobPayload) Kind() string { return "crn" }
func (d *CronJobPayload) String() string { return d.Ref }
func (d *CronJobPayload) String() string { return d.Module + "-" + d.Verb }
func (d *CronJobPayload) Parse(parts []string) error {
if len(parts) == 0 {
if len(parts) != 2 {
return errors.New("expected <module>-<verb> but got empty string")
}
d.Ref = strings.Join(parts, "-")
d.Module = parts[0]
d.Verb = parts[1]
return nil
}
func (d *CronJobPayload) RandomBytes() int { return 10 }
Loading

0 comments on commit cc211c1

Please sign in to comment.