diff --git a/go-runtime/ftl/ftltest/pubsub.go b/go-runtime/ftl/ftltest/pubsub.go index 26800d45cb..3f306747b9 100644 --- a/go-runtime/ftl/ftltest/pubsub.go +++ b/go-runtime/ftl/ftltest/pubsub.go @@ -17,6 +17,13 @@ import ( "github.com/TBD54566975/ftl/internal/slices" ) +type topicState struct { + // events published to this topic + events []any + // tracks the number of live subscriptions for this topic + subscriptionCount int +} + type fakePubSub struct { // all pubsub events are processed through globalTopic globalTopic *pubsub.Topic[pubSubEvent] @@ -25,7 +32,7 @@ type fakePubSub struct { // pubSubLock required to access [topics, subscriptions, subscribers] pubSubLock sync.Mutex - topics map[schema.RefKey][]any + topics map[schema.RefKey]*topicState subscriptions map[string]*subscription subscribers map[string][]subscriber } @@ -33,7 +40,7 @@ type fakePubSub struct { func newFakePubSub(ctx context.Context) *fakePubSub { f := &fakePubSub{ globalTopic: pubsub.New[pubSubEvent](), - topics: map[schema.RefKey][]any{}, + topics: map[schema.RefKey]*topicState{}, subscriptions: map[string]*subscription{}, subscribers: map[string][]subscriber{}, } @@ -42,10 +49,24 @@ func newFakePubSub(ctx context.Context) *fakePubSub { } func (f *fakePubSub) publishEvent(topic *schema.Ref, event any) error { + // tracks event publication to a topic f.publishWaitGroup.Add(1) + // tracks event subscription consumption completion + f.publishWaitGroup.Add(f.fetchTopicState(topic).subscriptionCount) return f.globalTopic.PublishSync(publishEvent{topic: topic, content: event}) } +func (f *fakePubSub) fetchTopicState(topic *schema.Ref) *topicState { + ts, ok := f.topics[topic.ToRefKey()] + if !ok { + ts = &topicState{ + events: []any{}, + } + f.topics[topic.ToRefKey()] = ts + } + return ts +} + // addSubscriber adds a subscriber to the fake FTL instance. Each subscriber included in the test must be manually added func addSubscriber[E any](f *fakePubSub, sub ftl.SubscriptionHandle[E], sink ftl.Sink[E]) { f.pubSubLock.Lock() @@ -57,6 +78,7 @@ func addSubscriber[E any](f *fakePubSub, sub ftl.SubscriptionHandle[E], sink ftl topic: sub.Topic, errors: map[int]error{}, } + f.fetchTopicState(sub.Topic).subscriptionCount++ } f.subscribers[sub.Name] = append(f.subscribers[sub.Name], func(ctx context.Context, event any) error { @@ -77,11 +99,8 @@ func eventsForTopic[E any](ctx context.Context, f *fakePubSub, topic ftl.TopicHa logger := log.FromContext(ctx).Scope("pubsub") var events = []E{} - raw, ok := f.topics[topic.Ref.ToRefKey()] - if !ok { - return events - } - for _, e := range raw { + ts := f.fetchTopicState(topic.Ref) + for _, e := range ts.events { if event, ok := e.(E); ok { events = append(events, event) } else { @@ -103,17 +122,13 @@ func resultsForSubscription[E any](ctx context.Context, f *fakePubSub, handle ft if !ok { return results } - topic, ok := f.topics[handle.Topic.ToRefKey()] - if !ok { - return results - } - + ts := f.fetchTopicState(subscription.topic) count := subscription.cursor.Default(-1) if !subscription.isExecuting { count++ } for i := range count { - e := topic[i] + e := ts.events[i] if event, ok := e.(E); ok { result := SubscriptionResult[E]{ Event: event, @@ -155,11 +170,9 @@ func (f *fakePubSub) handlePubSubEvent(ctx context.Context, e pubSubEvent) { switch event := e.(type) { case publishEvent: logger.Debugf("publishing to %s: %v", event.topic.Name, event.content) - if _, ok := f.topics[event.topic.ToRefKey()]; !ok { - f.topics[event.topic.ToRefKey()] = []any{event.content} - } else { - f.topics[event.topic.ToRefKey()] = append(f.topics[event.topic.ToRefKey()], event.content) - } + ts := f.fetchTopicState(event.topic) + ts.events = append(ts.events, event.content) + // indicate that the event has been published to the topic f.publishWaitGroup.Done() case subscriptionDidConsumeEvent: sub, ok := f.subscriptions[event.subscription] @@ -170,6 +183,8 @@ func (f *fakePubSub) handlePubSubEvent(ctx context.Context, e pubSubEvent) { sub.errors[sub.cursor.MustGet()] = event.err } sub.isExecuting = false + // indicate that the subscription has processed the event + f.publishWaitGroup.Done() } for _, sub := range f.subscriptions { @@ -177,13 +192,13 @@ func (f *fakePubSub) handlePubSubEvent(ctx context.Context, e pubSubEvent) { // already executing continue } - topicEvents, ok := f.topics[sub.topic.ToRefKey()] - if !ok { - // no events publshed yet + ts := f.fetchTopicState(sub.topic) + if len(ts.events) == 0 { + // no events published yet continue } var cursor = sub.cursor.Default(-1) - if len(topicEvents) <= cursor+1 { + if len(ts.events) <= cursor+1 { // no new events continue } @@ -200,7 +215,7 @@ func (f *fakePubSub) handlePubSubEvent(ctx context.Context, e pubSubEvent) { go func(sub string, chosenSubscriber subscriber, event any) { err := chosenSubscriber(ctx, event) f.globalTopic.Publish(subscriptionDidConsumeEvent{subscription: sub, err: err}) - }(sub.name, chosenSubscriber, topicEvents[sub.cursor.MustGet()]) + }(sub.name, chosenSubscriber, ts.events[sub.cursor.MustGet()]) } } @@ -237,13 +252,13 @@ func (f *fakePubSub) checkSubscriptionsAreComplete(ctx context.Context, shouldPr } remaining := []remainingState{} for _, sub := range f.subscriptions { - topicEvents, ok := f.topics[sub.topic.ToRefKey()] - if !ok { - // no events publshed yet + ts := f.fetchTopicState(sub.topic) + if len(ts.events) == 0 { + // no events published yet continue } var cursor = sub.cursor.Default(-1) - if !sub.isExecuting && len(topicEvents) <= cursor+1 { + if !sub.isExecuting && len(ts.events) <= cursor+1 { // all events have been consumed continue } @@ -255,7 +270,7 @@ func (f *fakePubSub) checkSubscriptionsAreComplete(ctx context.Context, shouldPr remaining = append(remaining, remainingState{ name: sub.name, isExecuting: sub.isExecuting, - pendingEvents: len(topicEvents) - cursor - 1, + pendingEvents: len(ts.events) - cursor - 1, }) } if len(remaining) == 0 { diff --git a/go-runtime/ftl/ftltest/testdata/go/pubsub/pubsub.go b/go-runtime/ftl/ftltest/testdata/go/pubsub/pubsub.go index 5acde43143..7a5a535368 100644 --- a/go-runtime/ftl/ftltest/testdata/go/pubsub/pubsub.go +++ b/go-runtime/ftl/ftltest/testdata/go/pubsub/pubsub.go @@ -10,8 +10,15 @@ import ( ) //ftl:export -var Topic = ftl.Topic[Event]("topic") -var subscription = ftl.Subscription(Topic, "subscription") +var Topic1 = ftl.Topic[Event]("topic1") + +//ftl:export +var Topic2 = ftl.Topic[Event]("topic2") + +var subscription1_1 = ftl.Subscription(Topic1, "subscription_1_1") +var subscription1_2 = ftl.Subscription(Topic1, "subscription_1_2") +var subscription2_1 = ftl.Subscription(Topic2, "subscription_2_1") +var subscription2_2 = ftl.Subscription(Topic2, "subscription_2_3") //ftl:data type Event struct { @@ -20,10 +27,20 @@ type Event struct { //ftl:verb func PublishToTopicOne(ctx context.Context, event Event) error { - return Topic.Publish(ctx, event) + return Topic1.Publish(ctx, event) +} + +//ftl:verb +func PropagateToTopic2(ctx context.Context, event Event) error { + return Topic2.Publish(ctx, event) +} + +//ftl:verb +func ConsumeEvent(_ context.Context, _ Event) error { + return nil } -//ftl:subscribe subscription +//ftl:subscribe subscription_1_1 func ErrorsAfterASecond(ctx context.Context, event Event) error { time.Sleep(1 * time.Second) return fmt.Errorf("SubscriberThatFails always fails") diff --git a/go-runtime/ftl/ftltest/testdata/go/pubsub/pubsub_test.go b/go-runtime/ftl/ftltest/testdata/go/pubsub/pubsub_test.go index 78ce174464..d77467cf6a 100644 --- a/go-runtime/ftl/ftltest/testdata/go/pubsub/pubsub_test.go +++ b/go-runtime/ftl/ftltest/testdata/go/pubsub/pubsub_test.go @@ -15,15 +15,58 @@ import ( func TestSubscriberReturningErrors(t *testing.T) { // Test that we can publish multiple events, which will take time to consume, and that we track each error ctx := ftltest.Context( - ftltest.WithSubscriber(subscription, ErrorsAfterASecond), + ftltest.WithSubscriber(subscription1_1, ErrorsAfterASecond), ) count := 5 for i := 0; i < count; i++ { assert.NoError(t, PublishToTopicOne(ctx, Event{Value: strconv.Itoa(i)})) } ftltest.WaitForSubscriptionsToComplete(ctx) - assert.Equal(t, count, len(ftltest.ErrorsForSubscription(ctx, subscription))) - assert.Equal(t, count, len(ftltest.EventsForTopic(ctx, Topic))) + assert.Equal(t, count, len(ftltest.ErrorsForSubscription(ctx, subscription1_1))) + assert.Equal(t, count, len(ftltest.EventsForTopic(ctx, Topic1))) +} + +// establishes a pubsub network that forwards from topic 1 to topic 2 on a single subscription +// and does NOT register any subscribers against topic 2's subscription +func TestForwardedEvent(t *testing.T) { + // Test that we can publish multiple events, which will take time to consume, and that we track each error + ctx := ftltest.Context( + ftltest.WithSubscriber(subscription1_1, PropagateToTopic2), + ) + assert.NoError(t, PublishToTopicOne(ctx, Event{Value: "propagation-test"})) + ftltest.WaitForSubscriptionsToComplete(ctx) + assert.Equal(t, 1, len(ftltest.EventsForTopic(ctx, Topic1))) + assert.Equal(t, 1, len(ftltest.EventsForTopic(ctx, Topic2))) +} + +// establishes a pubsub network that forwards from topic 1 to topic 2 on two subscriptions +// and does NOT register any subscribers against topic 2's subscriptions +func TestPropagatedEvent(t *testing.T) { + // Test that we can publish multiple events, which will take time to consume, and that we track each error + ctx := ftltest.Context( + ftltest.WithSubscriber(subscription1_1, PropagateToTopic2), + ftltest.WithSubscriber(subscription1_2, PropagateToTopic2), + ) + assert.NoError(t, PublishToTopicOne(ctx, Event{Value: "propagation-test"})) + ftltest.WaitForSubscriptionsToComplete(ctx) + assert.Equal(t, 1, len(ftltest.EventsForTopic(ctx, Topic1))) + assert.Equal(t, 2, len(ftltest.EventsForTopic(ctx, Topic2))) +} + +// establishes a pubsub network that forwards from topic 1 to topic 2 on two subscriptions +// and consumes from topic 2 via two subscriptions +func TestPropagationNetwork(t *testing.T) { + // Test that we can publish multiple events, which will take time to consume, and that we track each error + ctx := ftltest.Context( + ftltest.WithSubscriber(subscription1_1, PropagateToTopic2), + ftltest.WithSubscriber(subscription1_2, PropagateToTopic2), + ftltest.WithSubscriber(subscription2_1, ConsumeEvent), + ftltest.WithSubscriber(subscription2_2, ConsumeEvent), + ) + assert.NoError(t, PublishToTopicOne(ctx, Event{Value: "propagation-test"})) + ftltest.WaitForSubscriptionsToComplete(ctx) + assert.Equal(t, 1, len(ftltest.EventsForTopic(ctx, Topic1))) + assert.Equal(t, 2, len(ftltest.EventsForTopic(ctx, Topic2))) } func TestMultipleMultipleFakeSubscribers(t *testing.T) { @@ -32,13 +75,13 @@ func TestMultipleMultipleFakeSubscribers(t *testing.T) { var counter atomic.Value[int] ctx := ftltest.Context( - ftltest.WithSubscriber(subscription, func(ctx context.Context, event Event) error { + ftltest.WithSubscriber(subscription1_1, func(ctx context.Context, event Event) error { ftl.LoggerFromContext(ctx).Infof("Fake Subscriber A") current := counter.Load() counter.Store(current + 1) return nil }), - ftltest.WithSubscriber(subscription, func(ctx context.Context, event Event) error { + ftltest.WithSubscriber(subscription1_1, func(ctx context.Context, event Event) error { ftl.LoggerFromContext(ctx).Infof("Fake Subscriber B") current := counter.Load() counter.Store(current + 1) @@ -49,7 +92,7 @@ func TestMultipleMultipleFakeSubscribers(t *testing.T) { assert.NoError(t, PublishToTopicOne(ctx, Event{Value: strconv.Itoa(i)})) } ftltest.WaitForSubscriptionsToComplete(ctx) - assert.Equal(t, 0, len(ftltest.ErrorsForSubscription(ctx, subscription))) - assert.Equal(t, count, len(ftltest.EventsForTopic(ctx, Topic))) + assert.Equal(t, 0, len(ftltest.ErrorsForSubscription(ctx, subscription1_1))) + assert.Equal(t, count, len(ftltest.EventsForTopic(ctx, Topic1))) assert.Equal(t, count, counter.Load()) } diff --git a/go-runtime/ftl/ftltest/testdata/go/subscriber/subscriber.go b/go-runtime/ftl/ftltest/testdata/go/subscriber/subscriber.go index 9a7c5fa22e..a034921dca 100644 --- a/go-runtime/ftl/ftltest/testdata/go/subscriber/subscriber.go +++ b/go-runtime/ftl/ftltest/testdata/go/subscriber/subscriber.go @@ -6,4 +6,4 @@ import ( "github.com/TBD54566975/ftl/go-runtime/ftl" // Import the FTL SDK. ) -var _ = ftl.Subscription(pubsub.Topic, "subscription") +var _ = ftl.Subscription(pubsub.Topic1, "subscription1_1") diff --git a/go-runtime/ftl/ftltest/testdata/go/subscriber/subscriber_test.go b/go-runtime/ftl/ftltest/testdata/go/subscriber/subscriber_test.go index e85e6bf7a0..970fdea989 100644 --- a/go-runtime/ftl/ftltest/testdata/go/subscriber/subscriber_test.go +++ b/go-runtime/ftl/ftltest/testdata/go/subscriber/subscriber_test.go @@ -2,17 +2,20 @@ package subscriber_test import ( "ftl/pubsub" - "testing" - "github.com/TBD54566975/ftl/go-runtime/ftl/ftltest" "github.com/alecthomas/assert/v2" + "testing" ) func TestPublishToExternalModule(t *testing.T) { ctx := ftltest.Context() - assert.NoError(t, pubsub.Topic.Publish(ctx, pubsub.Event{Value: "external"})) - assert.Equal(t, 1, len(ftltest.EventsForTopic(ctx, pubsub.Topic))) + + assert.NoError(t, pubsub.Topic1.Publish(ctx, pubsub.Event{Value: "external"})) + + ftltest.WaitForSubscriptionsToComplete(ctx) + + assert.Equal(t, 1, len(ftltest.EventsForTopic(ctx, pubsub.Topic1))) // Make sure we correctly made the right ref for the external module. - assert.Equal(t, "pubsub", pubsub.Topic.Ref.Module) + assert.Equal(t, "pubsub", pubsub.Topic1.Ref.Module) }