Skip to content

Commit

Permalink
feat: allow configuration of topic partitions
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e committed Jan 6, 2025
1 parent 7eaf2aa commit e353125
Show file tree
Hide file tree
Showing 17 changed files with 1,198 additions and 918 deletions.
14 changes: 13 additions & 1 deletion backend/provisioner/dev_provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,14 @@ func provisionTopic() InMemResourceProvisionerFn {
if len(topicMetas) != 1 {
return nil, fmt.Errorf("expected topic metadata from kafka but received none")
}
partitions := 1
if pm, ok := slices.FindVariant[*schema.MetadataPartitions](topic.Metadata); ok {
partitions = pm.Partitions
}
if topicMetas[0].Err == sarama.ErrUnknownTopicOrPartition {
// No topic exists yet. Create it
err = admin.CreateTopic(topicID, &sarama.TopicDetail{
NumPartitions: 8,
NumPartitions: int32(partitions),
ReplicationFactor: 1,
ReplicaAssignment: nil,
}, false)
Expand All @@ -226,6 +230,14 @@ func provisionTopic() InMemResourceProvisionerFn {
}
} else if topicMetas[0].Err != sarama.ErrNoError {
return nil, fmt.Errorf("failed to describe topic %q: %w", topicID, topicMetas[0].Err)
} else if len(topicMetas[0].Partitions) != partitions {
var plural string
if len(topicMetas[0].Partitions) == 1 {
plural = "partition"
} else {
plural = "partitions"
}
return nil, fmt.Errorf("existing topic %s has %d %s instead of %d", topicID, len(topicMetas[0].Partitions), plural, partitions)
}

return &RuntimeEvent{
Expand Down
2 changes: 1 addition & 1 deletion backend/runner/pubsub/testdata/go/publisher/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (PartitionMapper) PartitionKey(event PubSubEvent) string {
return event.Time.String()
}

//ftl:export
//ftl:topic export partitions=10
type TestTopic = ftl.TopicHandle[PubSubEvent, PartitionMapper]

type LocalTopic = ftl.TopicHandle[PubSubEvent, PartitionMapper]
Expand Down
Loading

0 comments on commit e353125

Please sign in to comment.