Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: addresses event completion race condition in pubsub test utility #2332

Merged
merged 7 commits into from
Aug 15, 2024
71 changes: 43 additions & 28 deletions go-runtime/ftl/ftltest/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ import (
"github.com/TBD54566975/ftl/internal/slices"
)

type topicState struct {
// events published to this topic
events []any
// tracks the number of live subscriptions for this topic
subscriptionCount int
}

type fakePubSub struct {
// all pubsub events are processed through globalTopic
globalTopic *pubsub.Topic[pubSubEvent]
Expand All @@ -25,15 +32,15 @@ type fakePubSub struct {

// pubSubLock required to access [topics, subscriptions, subscribers]
pubSubLock sync.Mutex
topics map[schema.RefKey][]any
topics map[schema.RefKey]*topicState
subscriptions map[string]*subscription
subscribers map[string][]subscriber
}

func newFakePubSub(ctx context.Context) *fakePubSub {
f := &fakePubSub{
globalTopic: pubsub.New[pubSubEvent](),
topics: map[schema.RefKey][]any{},
topics: map[schema.RefKey]*topicState{},
subscriptions: map[string]*subscription{},
subscribers: map[string][]subscriber{},
}
Expand All @@ -42,10 +49,24 @@ func newFakePubSub(ctx context.Context) *fakePubSub {
}

func (f *fakePubSub) publishEvent(topic *schema.Ref, event any) error {
// tracks event publication to a topic
f.publishWaitGroup.Add(1)
// tracks event subscription consumption completion
f.publishWaitGroup.Add(f.fetchTopicState(topic).subscriptionCount)
return f.globalTopic.PublishSync(publishEvent{topic: topic, content: event})
}

func (f *fakePubSub) fetchTopicState(topic *schema.Ref) *topicState {
ts, ok := f.topics[topic.ToRefKey()]
if !ok {
ts = &topicState{
events: []any{},
}
f.topics[topic.ToRefKey()] = ts
}
return ts
}

// addSubscriber adds a subscriber to the fake FTL instance. Each subscriber included in the test must be manually added
func addSubscriber[E any](f *fakePubSub, sub ftl.SubscriptionHandle[E], sink ftl.Sink[E]) {
f.pubSubLock.Lock()
Expand All @@ -57,6 +78,7 @@ func addSubscriber[E any](f *fakePubSub, sub ftl.SubscriptionHandle[E], sink ftl
topic: sub.Topic,
errors: map[int]error{},
}
f.fetchTopicState(sub.Topic).subscriptionCount++
}

f.subscribers[sub.Name] = append(f.subscribers[sub.Name], func(ctx context.Context, event any) error {
Expand All @@ -77,11 +99,8 @@ func eventsForTopic[E any](ctx context.Context, f *fakePubSub, topic ftl.TopicHa

logger := log.FromContext(ctx).Scope("pubsub")
var events = []E{}
raw, ok := f.topics[topic.Ref.ToRefKey()]
if !ok {
return events
}
for _, e := range raw {
ts := f.fetchTopicState(topic.Ref)
for _, e := range ts.events {
if event, ok := e.(E); ok {
events = append(events, event)
} else {
Expand All @@ -103,17 +122,13 @@ func resultsForSubscription[E any](ctx context.Context, f *fakePubSub, handle ft
if !ok {
return results
}
topic, ok := f.topics[handle.Topic.ToRefKey()]
if !ok {
return results
}

ts := f.fetchTopicState(subscription.topic)
count := subscription.cursor.Default(-1)
if !subscription.isExecuting {
count++
}
for i := range count {
e := topic[i]
e := ts.events[i]
if event, ok := e.(E); ok {
result := SubscriptionResult[E]{
Event: event,
Expand Down Expand Up @@ -155,11 +170,9 @@ func (f *fakePubSub) handlePubSubEvent(ctx context.Context, e pubSubEvent) {
switch event := e.(type) {
case publishEvent:
logger.Debugf("publishing to %s: %v", event.topic.Name, event.content)
if _, ok := f.topics[event.topic.ToRefKey()]; !ok {
f.topics[event.topic.ToRefKey()] = []any{event.content}
} else {
f.topics[event.topic.ToRefKey()] = append(f.topics[event.topic.ToRefKey()], event.content)
}
ts := f.fetchTopicState(event.topic)
ts.events = append(ts.events, event.content)
// indicate that the event has been published to the topic
f.publishWaitGroup.Done()
case subscriptionDidConsumeEvent:
sub, ok := f.subscriptions[event.subscription]
Expand All @@ -170,20 +183,22 @@ func (f *fakePubSub) handlePubSubEvent(ctx context.Context, e pubSubEvent) {
sub.errors[sub.cursor.MustGet()] = event.err
}
sub.isExecuting = false
// indicate that the subscription has processed the event
f.publishWaitGroup.Done()
}

for _, sub := range f.subscriptions {
if sub.isExecuting {
// already executing
continue
}
topicEvents, ok := f.topics[sub.topic.ToRefKey()]
if !ok {
// no events publshed yet
ts := f.fetchTopicState(sub.topic)
if len(ts.events) == 0 {
// no events published yet
continue
}
var cursor = sub.cursor.Default(-1)
if len(topicEvents) <= cursor+1 {
if len(ts.events) <= cursor+1 {
// no new events
continue
}
Expand All @@ -200,7 +215,7 @@ func (f *fakePubSub) handlePubSubEvent(ctx context.Context, e pubSubEvent) {
go func(sub string, chosenSubscriber subscriber, event any) {
err := chosenSubscriber(ctx, event)
f.globalTopic.Publish(subscriptionDidConsumeEvent{subscription: sub, err: err})
}(sub.name, chosenSubscriber, topicEvents[sub.cursor.MustGet()])
}(sub.name, chosenSubscriber, ts.events[sub.cursor.MustGet()])
}
}

Expand Down Expand Up @@ -237,13 +252,13 @@ func (f *fakePubSub) checkSubscriptionsAreComplete(ctx context.Context, shouldPr
}
remaining := []remainingState{}
for _, sub := range f.subscriptions {
topicEvents, ok := f.topics[sub.topic.ToRefKey()]
if !ok {
// no events publshed yet
ts := f.fetchTopicState(sub.topic)
if len(ts.events) == 0 {
// no events published yet
continue
}
var cursor = sub.cursor.Default(-1)
if !sub.isExecuting && len(topicEvents) <= cursor+1 {
if !sub.isExecuting && len(ts.events) <= cursor+1 {
// all events have been consumed
continue
}
Expand All @@ -255,7 +270,7 @@ func (f *fakePubSub) checkSubscriptionsAreComplete(ctx context.Context, shouldPr
remaining = append(remaining, remainingState{
name: sub.name,
isExecuting: sub.isExecuting,
pendingEvents: len(topicEvents) - cursor - 1,
pendingEvents: len(ts.events) - cursor - 1,
})
}
if len(remaining) == 0 {
Expand Down
25 changes: 21 additions & 4 deletions go-runtime/ftl/ftltest/testdata/go/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,15 @@ import (
)

//ftl:export
var Topic = ftl.Topic[Event]("topic")
var subscription = ftl.Subscription(Topic, "subscription")
var Topic1 = ftl.Topic[Event]("topic1")

//ftl:export
var Topic2 = ftl.Topic[Event]("topic2")

var subscription1_1 = ftl.Subscription(Topic1, "subscription_1_1")
var subscription1_2 = ftl.Subscription(Topic1, "subscription_1_2")
var subscription2_1 = ftl.Subscription(Topic2, "subscription_2_1")
var subscription2_2 = ftl.Subscription(Topic2, "subscription_2_3")

//ftl:data
type Event struct {
Expand All @@ -20,10 +27,20 @@ type Event struct {

//ftl:verb
func PublishToTopicOne(ctx context.Context, event Event) error {
return Topic.Publish(ctx, event)
return Topic1.Publish(ctx, event)
}

//ftl:verb
func PropagateToTopic2(ctx context.Context, event Event) error {
return Topic2.Publish(ctx, event)
}

//ftl:verb
func ConsumeEvent(_ context.Context, _ Event) error {
return nil
}

//ftl:subscribe subscription
//ftl:subscribe subscription_1_1
func ErrorsAfterASecond(ctx context.Context, event Event) error {
time.Sleep(1 * time.Second)
return fmt.Errorf("SubscriberThatFails always fails")
Expand Down
57 changes: 50 additions & 7 deletions go-runtime/ftl/ftltest/testdata/go/pubsub/pubsub_test.go
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice

Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,58 @@ import (
func TestSubscriberReturningErrors(t *testing.T) {
// Test that we can publish multiple events, which will take time to consume, and that we track each error
ctx := ftltest.Context(
ftltest.WithSubscriber(subscription, ErrorsAfterASecond),
ftltest.WithSubscriber(subscription1_1, ErrorsAfterASecond),
)
count := 5
for i := 0; i < count; i++ {
assert.NoError(t, PublishToTopicOne(ctx, Event{Value: strconv.Itoa(i)}))
}
ftltest.WaitForSubscriptionsToComplete(ctx)
assert.Equal(t, count, len(ftltest.ErrorsForSubscription(ctx, subscription)))
assert.Equal(t, count, len(ftltest.EventsForTopic(ctx, Topic)))
assert.Equal(t, count, len(ftltest.ErrorsForSubscription(ctx, subscription1_1)))
assert.Equal(t, count, len(ftltest.EventsForTopic(ctx, Topic1)))
}

// establishes a pubsub network that forwards from topic 1 to topic 2 on a single subscription
// and does NOT register any subscribers against topic 2's subscription
func TestForwardedEvent(t *testing.T) {
// Test that we can publish multiple events, which will take time to consume, and that we track each error
ctx := ftltest.Context(
ftltest.WithSubscriber(subscription1_1, PropagateToTopic2),
)
assert.NoError(t, PublishToTopicOne(ctx, Event{Value: "propagation-test"}))
ftltest.WaitForSubscriptionsToComplete(ctx)
assert.Equal(t, 1, len(ftltest.EventsForTopic(ctx, Topic1)))
assert.Equal(t, 1, len(ftltest.EventsForTopic(ctx, Topic2)))
}

// establishes a pubsub network that forwards from topic 1 to topic 2 on two subscriptions
// and does NOT register any subscribers against topic 2's subscriptions
func TestPropagatedEvent(t *testing.T) {
// Test that we can publish multiple events, which will take time to consume, and that we track each error
ctx := ftltest.Context(
ftltest.WithSubscriber(subscription1_1, PropagateToTopic2),
ftltest.WithSubscriber(subscription1_2, PropagateToTopic2),
)
assert.NoError(t, PublishToTopicOne(ctx, Event{Value: "propagation-test"}))
ftltest.WaitForSubscriptionsToComplete(ctx)
assert.Equal(t, 1, len(ftltest.EventsForTopic(ctx, Topic1)))
assert.Equal(t, 2, len(ftltest.EventsForTopic(ctx, Topic2)))
}

// establishes a pubsub network that forwards from topic 1 to topic 2 on two subscriptions
// and consumes from topic 2 via two subscriptions
func TestPropagationNetwork(t *testing.T) {
// Test that we can publish multiple events, which will take time to consume, and that we track each error
ctx := ftltest.Context(
ftltest.WithSubscriber(subscription1_1, PropagateToTopic2),
ftltest.WithSubscriber(subscription1_2, PropagateToTopic2),
ftltest.WithSubscriber(subscription2_1, ConsumeEvent),
ftltest.WithSubscriber(subscription2_2, ConsumeEvent),
)
assert.NoError(t, PublishToTopicOne(ctx, Event{Value: "propagation-test"}))
ftltest.WaitForSubscriptionsToComplete(ctx)
assert.Equal(t, 1, len(ftltest.EventsForTopic(ctx, Topic1)))
assert.Equal(t, 2, len(ftltest.EventsForTopic(ctx, Topic2)))
}

func TestMultipleMultipleFakeSubscribers(t *testing.T) {
Expand All @@ -32,13 +75,13 @@ func TestMultipleMultipleFakeSubscribers(t *testing.T) {
var counter atomic.Value[int]

ctx := ftltest.Context(
ftltest.WithSubscriber(subscription, func(ctx context.Context, event Event) error {
ftltest.WithSubscriber(subscription1_1, func(ctx context.Context, event Event) error {
ftl.LoggerFromContext(ctx).Infof("Fake Subscriber A")
current := counter.Load()
counter.Store(current + 1)
return nil
}),
ftltest.WithSubscriber(subscription, func(ctx context.Context, event Event) error {
ftltest.WithSubscriber(subscription1_1, func(ctx context.Context, event Event) error {
ftl.LoggerFromContext(ctx).Infof("Fake Subscriber B")
current := counter.Load()
counter.Store(current + 1)
Expand All @@ -49,7 +92,7 @@ func TestMultipleMultipleFakeSubscribers(t *testing.T) {
assert.NoError(t, PublishToTopicOne(ctx, Event{Value: strconv.Itoa(i)}))
}
ftltest.WaitForSubscriptionsToComplete(ctx)
assert.Equal(t, 0, len(ftltest.ErrorsForSubscription(ctx, subscription)))
assert.Equal(t, count, len(ftltest.EventsForTopic(ctx, Topic)))
assert.Equal(t, 0, len(ftltest.ErrorsForSubscription(ctx, subscription1_1)))
assert.Equal(t, count, len(ftltest.EventsForTopic(ctx, Topic1)))
assert.Equal(t, count, counter.Load())
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ import (
"github.com/TBD54566975/ftl/go-runtime/ftl" // Import the FTL SDK.
)

var _ = ftl.Subscription(pubsub.Topic, "subscription")
var _ = ftl.Subscription(pubsub.Topic1, "subscription1_1")
13 changes: 8 additions & 5 deletions go-runtime/ftl/ftltest/testdata/go/subscriber/subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,20 @@ package subscriber_test

import (
"ftl/pubsub"
"testing"

"github.com/TBD54566975/ftl/go-runtime/ftl/ftltest"
"github.com/alecthomas/assert/v2"
"testing"
)

func TestPublishToExternalModule(t *testing.T) {
ctx := ftltest.Context()
assert.NoError(t, pubsub.Topic.Publish(ctx, pubsub.Event{Value: "external"}))
assert.Equal(t, 1, len(ftltest.EventsForTopic(ctx, pubsub.Topic)))

assert.NoError(t, pubsub.Topic1.Publish(ctx, pubsub.Event{Value: "external"}))

ftltest.WaitForSubscriptionsToComplete(ctx)

assert.Equal(t, 1, len(ftltest.EventsForTopic(ctx, pubsub.Topic1)))

// Make sure we correctly made the right ref for the external module.
assert.Equal(t, "pubsub", pubsub.Topic.Ref.Module)
assert.Equal(t, "pubsub", pubsub.Topic1.Ref.Module)
}