Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add sync mode for subscription events #114

Merged
merged 1 commit into from
Dec 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,9 @@ client.
WithExitWhenNoSubscription(false).
// WithRetryStatusCodes allow retry the subscription connection when receiving one of these codes
// the input parameter can be number string or range, e.g 4000-5000
WithRetryStatusCodes("4000", "4000-4050")
WithRetryStatusCodes("4000", "4000-4050").
// WithSyncMode subscription messages are executed in sequence (without goroutine)
WithSyncMode(true)
```

#### Subscription Protocols
Expand Down
18 changes: 16 additions & 2 deletions subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ type SubscriptionClient struct {
onError func(sc *SubscriptionClient, err error) error
errorChan chan error
exitWhenNoSubscription bool
syncMode bool
keepAliveInterval time.Duration
retryDelay time.Duration
mutex sync.Mutex
Expand Down Expand Up @@ -464,6 +465,12 @@ func (sc *SubscriptionClient) WithExitWhenNoSubscription(value bool) *Subscripti
return sc
}

// WithSyncMode subscription messages are executed in sequence (without goroutine)
func (sc *SubscriptionClient) WithSyncMode(value bool) *SubscriptionClient {
sc.syncMode = value
return sc
}

// Keep alive subroutine to send ping on specified interval
func startKeepAlive(ctx context.Context, c WebsocketConn, interval time.Duration) {
ticker := time.NewTicker(interval)
Expand Down Expand Up @@ -806,13 +813,20 @@ func (sc *SubscriptionClient) Run() error {
if sub == nil {
sub = &Subscription{}
}
go func() {

execMessage := func() {
if err := sc.protocol.OnMessage(subContext, *sub, message); err != nil {
sc.errorChan <- err
}

sc.checkSubscriptionStatuses(subContext)
}()
}

if sc.syncMode {
execMessage()
} else {
go execMessage()
}
}
}
}()
Expand Down
12 changes: 11 additions & 1 deletion subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import (
"nhooyr.io/websocket"
)

func TestSubscription_LifeCycleEvents(t *testing.T) {
func testSubscription_LifeCycleEvents(t *testing.T, syncMode bool) {

server := subscription_setupServer(8082)
client, subscriptionClient := subscription_setupClients(8082)
msg := randomID()
Expand Down Expand Up @@ -84,6 +85,7 @@ func TestSubscription_LifeCycleEvents(t *testing.T) {
subscriptionClient = subscriptionClient.
WithExitWhenNoSubscription(false).
WithTimeout(3 * time.Second).
WithSyncMode(syncMode).
OnConnected(func() {
lock.Lock()
defer lock.Unlock()
Expand Down Expand Up @@ -200,6 +202,14 @@ func TestSubscription_LifeCycleEvents(t *testing.T) {
}
}

func TestSubscription_LifeCycleEvents(t *testing.T) {
testSubscription_LifeCycleEvents(t, false)
}

func TestSubscription_WithSyncMode(t *testing.T) {
testSubscription_LifeCycleEvents(t, true)
}

func TestSubscription_WithRetryStatusCodes(t *testing.T) {
stop := make(chan bool)
msg := randomID()
Expand Down