Skip to content

Commit

Permalink
Fix: notify closed client
Browse files Browse the repository at this point in the history
  • Loading branch information
aopoltorzhicky committed Oct 31, 2023
1 parent 6ab0cbb commit b7ca47c
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 6 deletions.
10 changes: 10 additions & 0 deletions cmd/api/handler/websocket/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"io"
"net"
"sync/atomic"
"time"

"github.com/dipdup-io/workerpool"
Expand Down Expand Up @@ -43,14 +44,19 @@ type Client struct {
filters *Filters
ch chan any
g workerpool.Group

closed *atomic.Bool
}

func newClient(id uint64, manager *Manager) *Client {
closed := new(atomic.Bool)
closed.Store(false)
return &Client{
id: id,
manager: manager,
ch: make(chan any, 1024),
g: workerpool.NewGroup(),
closed: closed,
}
}

Expand Down Expand Up @@ -102,11 +108,15 @@ func (c *Client) DetachFilters(msg Unsubscribe) error {
}

func (c *Client) Notify(msg any) {
if c.closed.Load() {
return
}
c.ch <- msg
}

func (c *Client) Close() error {
c.g.Wait()
c.closed.Store(true)
close(c.ch)
return nil
}
Expand Down
14 changes: 14 additions & 0 deletions cmd/api/handler/websocket/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package websocket

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestNotifyClosedClient(t *testing.T) {
client := newClient(10, nil)
err := client.Close()
require.NoError(t, err, "closing client")
client.Notify("test")
}
12 changes: 6 additions & 6 deletions cmd/api/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func initHandlers(ctx context.Context, e *echo.Echo, cfg Config, db postgres.Sto

v1.GET("/swagger/*", echoSwagger.WrapHandler)

// initWebsocket(ctx, db, v1)
initWebsocket(ctx, db, v1)

log.Info().Msg("API routes:")
for _, route := range e.Routes() {
Expand All @@ -300,8 +300,8 @@ var (
wsManager *websocket.Manager
)

// func initWebsocket(ctx context.Context, db postgres.Storage, group *echo.Group) {
// wsManager = websocket.NewManager(db, db.Blocks, db.Tx)
// wsManager.Start(ctx)
// group.GET("/ws", wsManager.Handle)
// }
func initWebsocket(ctx context.Context, db postgres.Storage, group *echo.Group) {
wsManager = websocket.NewManager(db, db.Blocks, db.Tx)
wsManager.Start(ctx)
group.GET("/ws", wsManager.Handle)
}

0 comments on commit b7ca47c

Please sign in to comment.