From c6d889c82aa846c1f98a53c86861e7b63dcc6e56 Mon Sep 17 00:00:00 2001 From: Jon Johnson <113393155+jonathanj-square@users.noreply.github.com> Date: Wed, 14 Aug 2024 20:35:37 -0700 Subject: [PATCH] fix: addresses event completion race condition in pubsub test utility (#2332) fixes: #2266 The race conditions was caused by two issues. 1. The WaitGroup down tick was not synchronized with the completion of the subscriber verb execution. Failure to wait for completion may result in a down time coming before a new event is dispatched (which ticks up the waitgroup) - which may result in the WaitGroup reaching zero too soon. 2. Non-linear pubsub networks are not supported by simply counting up when an event is published and down after it is completed. The up count needs to match the number of live subscriptions (e.g. subscriptions with registered subscribers) This synchronization scheme does not cover asynchronous event dispatch (e.g. events dispatched via go routines within the subscriber verb) such scenarios introduce a race condition that cannot be resolved by black box external synchronization. --- go-runtime/ftl/ftltest/pubsub.go | 71 +++++++++++-------- .../ftl/ftltest/testdata/go/pubsub/pubsub.go | 25 +++++-- .../ftltest/testdata/go/pubsub/pubsub_test.go | 57 +++++++++++++-- .../testdata/go/subscriber/subscriber.go | 2 +- .../testdata/go/subscriber/subscriber_test.go | 13 ++-- 5 files changed, 123 insertions(+), 45 deletions(-) diff --git a/go-runtime/ftl/ftltest/pubsub.go b/go-runtime/ftl/ftltest/pubsub.go index 26800d45cb..3f306747b9 100644 --- a/go-runtime/ftl/ftltest/pubsub.go +++ b/go-runtime/ftl/ftltest/pubsub.go @@ -17,6 +17,13 @@ import ( "github.com/TBD54566975/ftl/internal/slices" ) +type topicState struct { + // events published to this topic + events []any + // tracks the number of live subscriptions for this topic + subscriptionCount int +} + type fakePubSub struct { // all pubsub events are processed through globalTopic globalTopic *pubsub.Topic[pubSubEvent] @@ -25,7 +32,7 @@ type fakePubSub struct { // pubSubLock required to access [topics, subscriptions, subscribers] pubSubLock sync.Mutex - topics map[schema.RefKey][]any + topics map[schema.RefKey]*topicState subscriptions map[string]*subscription subscribers map[string][]subscriber } @@ -33,7 +40,7 @@ type fakePubSub struct { func newFakePubSub(ctx context.Context) *fakePubSub { f := &fakePubSub{ globalTopic: pubsub.New[pubSubEvent](), - topics: map[schema.RefKey][]any{}, + topics: map[schema.RefKey]*topicState{}, subscriptions: map[string]*subscription{}, subscribers: map[string][]subscriber{}, } @@ -42,10 +49,24 @@ func newFakePubSub(ctx context.Context) *fakePubSub { } func (f *fakePubSub) publishEvent(topic *schema.Ref, event any) error { + // tracks event publication to a topic f.publishWaitGroup.Add(1) + // tracks event subscription consumption completion + f.publishWaitGroup.Add(f.fetchTopicState(topic).subscriptionCount) return f.globalTopic.PublishSync(publishEvent{topic: topic, content: event}) } +func (f *fakePubSub) fetchTopicState(topic *schema.Ref) *topicState { + ts, ok := f.topics[topic.ToRefKey()] + if !ok { + ts = &topicState{ + events: []any{}, + } + f.topics[topic.ToRefKey()] = ts + } + return ts +} + // addSubscriber adds a subscriber to the fake FTL instance. Each subscriber included in the test must be manually added func addSubscriber[E any](f *fakePubSub, sub ftl.SubscriptionHandle[E], sink ftl.Sink[E]) { f.pubSubLock.Lock() @@ -57,6 +78,7 @@ func addSubscriber[E any](f *fakePubSub, sub ftl.SubscriptionHandle[E], sink ftl topic: sub.Topic, errors: map[int]error{}, } + f.fetchTopicState(sub.Topic).subscriptionCount++ } f.subscribers[sub.Name] = append(f.subscribers[sub.Name], func(ctx context.Context, event any) error { @@ -77,11 +99,8 @@ func eventsForTopic[E any](ctx context.Context, f *fakePubSub, topic ftl.TopicHa logger := log.FromContext(ctx).Scope("pubsub") var events = []E{} - raw, ok := f.topics[topic.Ref.ToRefKey()] - if !ok { - return events - } - for _, e := range raw { + ts := f.fetchTopicState(topic.Ref) + for _, e := range ts.events { if event, ok := e.(E); ok { events = append(events, event) } else { @@ -103,17 +122,13 @@ func resultsForSubscription[E any](ctx context.Context, f *fakePubSub, handle ft if !ok { return results } - topic, ok := f.topics[handle.Topic.ToRefKey()] - if !ok { - return results - } - + ts := f.fetchTopicState(subscription.topic) count := subscription.cursor.Default(-1) if !subscription.isExecuting { count++ } for i := range count { - e := topic[i] + e := ts.events[i] if event, ok := e.(E); ok { result := SubscriptionResult[E]{ Event: event, @@ -155,11 +170,9 @@ func (f *fakePubSub) handlePubSubEvent(ctx context.Context, e pubSubEvent) { switch event := e.(type) { case publishEvent: logger.Debugf("publishing to %s: %v", event.topic.Name, event.content) - if _, ok := f.topics[event.topic.ToRefKey()]; !ok { - f.topics[event.topic.ToRefKey()] = []any{event.content} - } else { - f.topics[event.topic.ToRefKey()] = append(f.topics[event.topic.ToRefKey()], event.content) - } + ts := f.fetchTopicState(event.topic) + ts.events = append(ts.events, event.content) + // indicate that the event has been published to the topic f.publishWaitGroup.Done() case subscriptionDidConsumeEvent: sub, ok := f.subscriptions[event.subscription] @@ -170,6 +183,8 @@ func (f *fakePubSub) handlePubSubEvent(ctx context.Context, e pubSubEvent) { sub.errors[sub.cursor.MustGet()] = event.err } sub.isExecuting = false + // indicate that the subscription has processed the event + f.publishWaitGroup.Done() } for _, sub := range f.subscriptions { @@ -177,13 +192,13 @@ func (f *fakePubSub) handlePubSubEvent(ctx context.Context, e pubSubEvent) { // already executing continue } - topicEvents, ok := f.topics[sub.topic.ToRefKey()] - if !ok { - // no events publshed yet + ts := f.fetchTopicState(sub.topic) + if len(ts.events) == 0 { + // no events published yet continue } var cursor = sub.cursor.Default(-1) - if len(topicEvents) <= cursor+1 { + if len(ts.events) <= cursor+1 { // no new events continue } @@ -200,7 +215,7 @@ func (f *fakePubSub) handlePubSubEvent(ctx context.Context, e pubSubEvent) { go func(sub string, chosenSubscriber subscriber, event any) { err := chosenSubscriber(ctx, event) f.globalTopic.Publish(subscriptionDidConsumeEvent{subscription: sub, err: err}) - }(sub.name, chosenSubscriber, topicEvents[sub.cursor.MustGet()]) + }(sub.name, chosenSubscriber, ts.events[sub.cursor.MustGet()]) } } @@ -237,13 +252,13 @@ func (f *fakePubSub) checkSubscriptionsAreComplete(ctx context.Context, shouldPr } remaining := []remainingState{} for _, sub := range f.subscriptions { - topicEvents, ok := f.topics[sub.topic.ToRefKey()] - if !ok { - // no events publshed yet + ts := f.fetchTopicState(sub.topic) + if len(ts.events) == 0 { + // no events published yet continue } var cursor = sub.cursor.Default(-1) - if !sub.isExecuting && len(topicEvents) <= cursor+1 { + if !sub.isExecuting && len(ts.events) <= cursor+1 { // all events have been consumed continue } @@ -255,7 +270,7 @@ func (f *fakePubSub) checkSubscriptionsAreComplete(ctx context.Context, shouldPr remaining = append(remaining, remainingState{ name: sub.name, isExecuting: sub.isExecuting, - pendingEvents: len(topicEvents) - cursor - 1, + pendingEvents: len(ts.events) - cursor - 1, }) } if len(remaining) == 0 { diff --git a/go-runtime/ftl/ftltest/testdata/go/pubsub/pubsub.go b/go-runtime/ftl/ftltest/testdata/go/pubsub/pubsub.go index 5acde43143..7a5a535368 100644 --- a/go-runtime/ftl/ftltest/testdata/go/pubsub/pubsub.go +++ b/go-runtime/ftl/ftltest/testdata/go/pubsub/pubsub.go @@ -10,8 +10,15 @@ import ( ) //ftl:export -var Topic = ftl.Topic[Event]("topic") -var subscription = ftl.Subscription(Topic, "subscription") +var Topic1 = ftl.Topic[Event]("topic1") + +//ftl:export +var Topic2 = ftl.Topic[Event]("topic2") + +var subscription1_1 = ftl.Subscription(Topic1, "subscription_1_1") +var subscription1_2 = ftl.Subscription(Topic1, "subscription_1_2") +var subscription2_1 = ftl.Subscription(Topic2, "subscription_2_1") +var subscription2_2 = ftl.Subscription(Topic2, "subscription_2_3") //ftl:data type Event struct { @@ -20,10 +27,20 @@ type Event struct { //ftl:verb func PublishToTopicOne(ctx context.Context, event Event) error { - return Topic.Publish(ctx, event) + return Topic1.Publish(ctx, event) +} + +//ftl:verb +func PropagateToTopic2(ctx context.Context, event Event) error { + return Topic2.Publish(ctx, event) +} + +//ftl:verb +func ConsumeEvent(_ context.Context, _ Event) error { + return nil } -//ftl:subscribe subscription +//ftl:subscribe subscription_1_1 func ErrorsAfterASecond(ctx context.Context, event Event) error { time.Sleep(1 * time.Second) return fmt.Errorf("SubscriberThatFails always fails") diff --git a/go-runtime/ftl/ftltest/testdata/go/pubsub/pubsub_test.go b/go-runtime/ftl/ftltest/testdata/go/pubsub/pubsub_test.go index 78ce174464..d77467cf6a 100644 --- a/go-runtime/ftl/ftltest/testdata/go/pubsub/pubsub_test.go +++ b/go-runtime/ftl/ftltest/testdata/go/pubsub/pubsub_test.go @@ -15,15 +15,58 @@ import ( func TestSubscriberReturningErrors(t *testing.T) { // Test that we can publish multiple events, which will take time to consume, and that we track each error ctx := ftltest.Context( - ftltest.WithSubscriber(subscription, ErrorsAfterASecond), + ftltest.WithSubscriber(subscription1_1, ErrorsAfterASecond), ) count := 5 for i := 0; i < count; i++ { assert.NoError(t, PublishToTopicOne(ctx, Event{Value: strconv.Itoa(i)})) } ftltest.WaitForSubscriptionsToComplete(ctx) - assert.Equal(t, count, len(ftltest.ErrorsForSubscription(ctx, subscription))) - assert.Equal(t, count, len(ftltest.EventsForTopic(ctx, Topic))) + assert.Equal(t, count, len(ftltest.ErrorsForSubscription(ctx, subscription1_1))) + assert.Equal(t, count, len(ftltest.EventsForTopic(ctx, Topic1))) +} + +// establishes a pubsub network that forwards from topic 1 to topic 2 on a single subscription +// and does NOT register any subscribers against topic 2's subscription +func TestForwardedEvent(t *testing.T) { + // Test that we can publish multiple events, which will take time to consume, and that we track each error + ctx := ftltest.Context( + ftltest.WithSubscriber(subscription1_1, PropagateToTopic2), + ) + assert.NoError(t, PublishToTopicOne(ctx, Event{Value: "propagation-test"})) + ftltest.WaitForSubscriptionsToComplete(ctx) + assert.Equal(t, 1, len(ftltest.EventsForTopic(ctx, Topic1))) + assert.Equal(t, 1, len(ftltest.EventsForTopic(ctx, Topic2))) +} + +// establishes a pubsub network that forwards from topic 1 to topic 2 on two subscriptions +// and does NOT register any subscribers against topic 2's subscriptions +func TestPropagatedEvent(t *testing.T) { + // Test that we can publish multiple events, which will take time to consume, and that we track each error + ctx := ftltest.Context( + ftltest.WithSubscriber(subscription1_1, PropagateToTopic2), + ftltest.WithSubscriber(subscription1_2, PropagateToTopic2), + ) + assert.NoError(t, PublishToTopicOne(ctx, Event{Value: "propagation-test"})) + ftltest.WaitForSubscriptionsToComplete(ctx) + assert.Equal(t, 1, len(ftltest.EventsForTopic(ctx, Topic1))) + assert.Equal(t, 2, len(ftltest.EventsForTopic(ctx, Topic2))) +} + +// establishes a pubsub network that forwards from topic 1 to topic 2 on two subscriptions +// and consumes from topic 2 via two subscriptions +func TestPropagationNetwork(t *testing.T) { + // Test that we can publish multiple events, which will take time to consume, and that we track each error + ctx := ftltest.Context( + ftltest.WithSubscriber(subscription1_1, PropagateToTopic2), + ftltest.WithSubscriber(subscription1_2, PropagateToTopic2), + ftltest.WithSubscriber(subscription2_1, ConsumeEvent), + ftltest.WithSubscriber(subscription2_2, ConsumeEvent), + ) + assert.NoError(t, PublishToTopicOne(ctx, Event{Value: "propagation-test"})) + ftltest.WaitForSubscriptionsToComplete(ctx) + assert.Equal(t, 1, len(ftltest.EventsForTopic(ctx, Topic1))) + assert.Equal(t, 2, len(ftltest.EventsForTopic(ctx, Topic2))) } func TestMultipleMultipleFakeSubscribers(t *testing.T) { @@ -32,13 +75,13 @@ func TestMultipleMultipleFakeSubscribers(t *testing.T) { var counter atomic.Value[int] ctx := ftltest.Context( - ftltest.WithSubscriber(subscription, func(ctx context.Context, event Event) error { + ftltest.WithSubscriber(subscription1_1, func(ctx context.Context, event Event) error { ftl.LoggerFromContext(ctx).Infof("Fake Subscriber A") current := counter.Load() counter.Store(current + 1) return nil }), - ftltest.WithSubscriber(subscription, func(ctx context.Context, event Event) error { + ftltest.WithSubscriber(subscription1_1, func(ctx context.Context, event Event) error { ftl.LoggerFromContext(ctx).Infof("Fake Subscriber B") current := counter.Load() counter.Store(current + 1) @@ -49,7 +92,7 @@ func TestMultipleMultipleFakeSubscribers(t *testing.T) { assert.NoError(t, PublishToTopicOne(ctx, Event{Value: strconv.Itoa(i)})) } ftltest.WaitForSubscriptionsToComplete(ctx) - assert.Equal(t, 0, len(ftltest.ErrorsForSubscription(ctx, subscription))) - assert.Equal(t, count, len(ftltest.EventsForTopic(ctx, Topic))) + assert.Equal(t, 0, len(ftltest.ErrorsForSubscription(ctx, subscription1_1))) + assert.Equal(t, count, len(ftltest.EventsForTopic(ctx, Topic1))) assert.Equal(t, count, counter.Load()) } diff --git a/go-runtime/ftl/ftltest/testdata/go/subscriber/subscriber.go b/go-runtime/ftl/ftltest/testdata/go/subscriber/subscriber.go index 9a7c5fa22e..a034921dca 100644 --- a/go-runtime/ftl/ftltest/testdata/go/subscriber/subscriber.go +++ b/go-runtime/ftl/ftltest/testdata/go/subscriber/subscriber.go @@ -6,4 +6,4 @@ import ( "github.com/TBD54566975/ftl/go-runtime/ftl" // Import the FTL SDK. ) -var _ = ftl.Subscription(pubsub.Topic, "subscription") +var _ = ftl.Subscription(pubsub.Topic1, "subscription1_1") diff --git a/go-runtime/ftl/ftltest/testdata/go/subscriber/subscriber_test.go b/go-runtime/ftl/ftltest/testdata/go/subscriber/subscriber_test.go index e85e6bf7a0..970fdea989 100644 --- a/go-runtime/ftl/ftltest/testdata/go/subscriber/subscriber_test.go +++ b/go-runtime/ftl/ftltest/testdata/go/subscriber/subscriber_test.go @@ -2,17 +2,20 @@ package subscriber_test import ( "ftl/pubsub" - "testing" - "github.com/TBD54566975/ftl/go-runtime/ftl/ftltest" "github.com/alecthomas/assert/v2" + "testing" ) func TestPublishToExternalModule(t *testing.T) { ctx := ftltest.Context() - assert.NoError(t, pubsub.Topic.Publish(ctx, pubsub.Event{Value: "external"})) - assert.Equal(t, 1, len(ftltest.EventsForTopic(ctx, pubsub.Topic))) + + assert.NoError(t, pubsub.Topic1.Publish(ctx, pubsub.Event{Value: "external"})) + + ftltest.WaitForSubscriptionsToComplete(ctx) + + assert.Equal(t, 1, len(ftltest.EventsForTopic(ctx, pubsub.Topic1))) // Make sure we correctly made the right ref for the external module. - assert.Equal(t, "pubsub", pubsub.Topic.Ref.Module) + assert.Equal(t, "pubsub", pubsub.Topic1.Ref.Module) }