diff --git a/config_test.go b/config_test.go index 792c0416..1c39fc38 100644 --- a/config_test.go +++ b/config_test.go @@ -6,6 +6,12 @@ import ( "github.com/stretchr/testify/require" ) +func TestChannelNotFound(t *testing.T) { + c := DefaultConfig + _, ok := c.channelOpts("xxx") + require.False(t, ok) +} + func TestConfigValidateDefault(t *testing.T) { err := DefaultConfig.Validate() require.NoError(t, err) @@ -39,14 +45,23 @@ func TestConfigValidateDuplicateNamespaceName(t *testing.T) { require.Error(t, err) } -func TestConfigValidateMalformedReciverTopLevel(t *testing.T) { +func TestConfigValidateNoPersonalNamespace(t *testing.T) { + c := DefaultConfig + c.Namespaces = []ChannelNamespace{} + c.UserSubscribeToPersonal = true + c.UserPersonalChannelNamespace = "name" + err := c.Validate() + require.Error(t, err) +} + +func TestConfigValidateMalformedReceiverTopLevel(t *testing.T) { c := DefaultConfig c.HistoryRecover = true err := c.Validate() require.Error(t, err) } -func TestConfigValidateMalformedReciverInNamespace(t *testing.T) { +func TestConfigValidateMalformedReceiverInNamespace(t *testing.T) { c := DefaultConfig c.Namespaces = []ChannelNamespace{ { diff --git a/doc.go b/doc.go index 230be54a..849dae8f 100644 --- a/doc.go +++ b/doc.go @@ -8,7 +8,7 @@ // expects that code inside callbacks will not block. // // Centrifuge library provides several features on top of plain Websocket -// implementation - see full description in library README on Github – +// implementation - read highlights in library README on Github – // https://github.com/centrifugal/centrifuge. // // Also check out examples in repo to see main library concepts in action. diff --git a/handler_sockjs.go b/handler_sockjs.go index fe650c1b..bce6c30a 100644 --- a/handler_sockjs.go +++ b/handler_sockjs.go @@ -6,6 +6,8 @@ import ( "sync" "time" + "github.com/centrifugal/centrifuge/internal/cancelctx" + "github.com/gorilla/websocket" "github.com/igm/sockjs-go/sockjs" ) @@ -181,7 +183,7 @@ func (s *SockjsHandler) sockJSHandler(sess sockjs.Session) { ctxCh := make(chan struct{}) defer close(ctxCh) - c, err := NewClient(newCustomCancelContext(sess.Request().Context(), ctxCh), s.node, transport) + c, err := NewClient(cancelctx.New(sess.Request().Context(), ctxCh), s.node, transport) if err != nil { s.node.logger.log(newLogEntry(LogLevelError, "error creating client", map[string]interface{}{"transport": transportSockJS})) return diff --git a/handler_websocket.go b/handler_websocket.go index 3cbb8b2c..f15550da 100644 --- a/handler_websocket.go +++ b/handler_websocket.go @@ -6,6 +6,8 @@ import ( "sync" "time" + "github.com/centrifugal/centrifuge/internal/cancelctx" + "github.com/gorilla/websocket" "github.com/centrifugal/centrifuge/internal/timers" @@ -317,7 +319,7 @@ func (s *WebsocketHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { ctxCh := make(chan struct{}) defer close(ctxCh) - c, err := NewClient(newCustomCancelContext(r.Context(), ctxCh), s.node, transport) + c, err := NewClient(cancelctx.New(r.Context(), ctxCh), s.node, transport) if err != nil { s.node.logger.log(newLogEntry(LogLevelError, "error creating client", map[string]interface{}{"transport": transportWebsocket})) return diff --git a/context.go b/internal/cancelctx/context.go similarity index 69% rename from context.go rename to internal/cancelctx/context.go index d8120c7c..7796a67b 100644 --- a/context.go +++ b/internal/cancelctx/context.go @@ -1,4 +1,4 @@ -package centrifuge +package cancelctx import ( "context" @@ -21,7 +21,8 @@ func (c customCancelContext) Err() error { } } -// newCustomCancelContext returns a context that will be canceled on channel close. -func newCustomCancelContext(ctx context.Context, ch <-chan struct{}) context.Context { +// New returns a wrapper context around original context that will +// be canceled on channel close. +func New(ctx context.Context, ch <-chan struct{}) context.Context { return customCancelContext{Context: ctx, ch: ch} } diff --git a/internal/cancelctx/context_test.go b/internal/cancelctx/context_test.go new file mode 100644 index 00000000..2ff64a72 --- /dev/null +++ b/internal/cancelctx/context_test.go @@ -0,0 +1,36 @@ +package cancelctx + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestCustomContext(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + ch := make(chan struct{}) + customCtx := New(ctx, ch) + require.NoError(t, customCtx.Err()) + dlTime, dlSet := customCtx.Deadline() + require.Zero(t, dlTime) + require.False(t, dlSet) + select { + case <-customCtx.Done(): + require.Fail(t, "must not be cancelled") + default: + } + cancel() + select { + case <-customCtx.Done(): + require.Fail(t, "must not be cancelled") + default: + } + close(ch) + select { + case <-customCtx.Done(): + require.Equal(t, context.Canceled, customCtx.Err()) + default: + require.Fail(t, "must be cancelled") + } +} diff --git a/internal/clientproto/protocol_test.go b/internal/clientproto/protocol_test.go new file mode 100644 index 00000000..d2c6338a --- /dev/null +++ b/internal/clientproto/protocol_test.go @@ -0,0 +1,24 @@ +package clientproto + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/centrifugal/protocol" +) + +func TestPushHelpers(t *testing.T) { + msg := NewMessagePush(protocol.Raw("{}")) + require.NotNil(t, msg) + msg = NewJoinPush("test", protocol.Raw("{}")) + require.NotNil(t, msg) + msg = NewLeavePush("test", protocol.Raw("{}")) + require.NotNil(t, msg) + msg = NewPublicationPush("test", protocol.Raw("{}")) + require.NotNil(t, msg) + msg = NewSubPush("test", protocol.Raw("{}")) + require.NotNil(t, msg) + msg = NewUnsubPush("test", protocol.Raw("{}")) + require.NotNil(t, msg) +} diff --git a/internal/controlproto/controlpb_extra_test.go b/internal/controlproto/controlpb_extra_test.go new file mode 100644 index 00000000..227eb60e --- /dev/null +++ b/internal/controlproto/controlpb_extra_test.go @@ -0,0 +1,68 @@ +package controlproto + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestCommandProtoExtra(t *testing.T) { + msg := &Command{ + UID: "test", + Method: MethodTypeDisconnect, + Params: Raw("{}"), + } + + d, b := msg.Method.EnumDescriptor() + require.Equal(t, fileDescriptorControl, d) + require.Equal(t, []int{0}, b) + + require.Equal(t, "test", msg.GetUID()) + require.Equal(t, MethodTypeDisconnect, msg.GetMethod()) + require.NotZero(t, msg.String()) +} + +func TestNodeProtoExtra(t *testing.T) { + msg := &Node{ + UID: "test", + Name: "test name", + Version: "v1.0.0", + NumChannels: 2, + NumClients: 3, + NumUsers: 1, + Uptime: 12, + Metrics: &Metrics{ + Interval: 60, + Items: map[string]float64{ + "item": 1, + }, + }, + } + require.Equal(t, "test", msg.GetUID()) + require.Equal(t, "test name", msg.GetName()) + require.Equal(t, "v1.0.0", msg.GetVersion()) + require.Equal(t, uint32(2), msg.GetNumChannels()) + require.Equal(t, uint32(1), msg.GetNumUsers()) + require.Equal(t, uint32(3), msg.GetNumClients()) + require.Equal(t, uint32(12), msg.GetUptime()) + require.NotNil(t, msg.GetMetrics()) + require.NotZero(t, msg.String()) +} + +func TestDisconnectProtoExtra(t *testing.T) { + msg := &Disconnect{ + User: "test", + } + require.Equal(t, "test", msg.GetUser()) + require.NotZero(t, msg.String()) +} + +func TestUnsubscribeProtoExtra(t *testing.T) { + msg := &Unsubscribe{ + User: "test", + Channel: "test channel", + } + require.Equal(t, "test", msg.GetUser()) + require.Equal(t, "test channel", msg.GetChannel()) + require.NotZero(t, msg.String()) +} diff --git a/internal/dissolve/dissolve_test.go b/internal/dissolve/dissolve_test.go index 80ff1b31..185a2272 100644 --- a/internal/dissolve/dissolve_test.go +++ b/internal/dissolve/dissolve_test.go @@ -2,6 +2,7 @@ package dissolve import ( "errors" + "sync" "testing" "time" ) @@ -10,14 +11,25 @@ func TestDissolver(t *testing.T) { d := New(4) _ = d.Run() defer d.Close() - ch := make(chan struct{}, 1) - err := d.Submit(func() error { - ch <- struct{}{} - return nil - }) - if err != nil { - t.Fatalf("Submit returned error: %v", err) - } + ch := make(chan struct{}) + numJobs := 1024 + var wg sync.WaitGroup + wg.Add(numJobs) + go func() { + for i := 0; i < numJobs; i++ { + err := d.Submit(func() error { + defer wg.Done() + return nil + }) + if err != nil { + t.Fatalf("Submit returned error: %v", err) + } + } + }() + go func() { + wg.Wait() + close(ch) + }() select { case <-ch: case <-time.After(time.Second): diff --git a/internal/dissolve/queue.go b/internal/dissolve/queue.go index 7841ebe9..7f0870fd 100644 --- a/internal/dissolve/queue.go +++ b/internal/dissolve/queue.go @@ -25,10 +25,6 @@ type queue interface { // all goroutines in wait() will return Close() - // CloseRemaining will close the queue and return all entried in the queue. - // All goroutines in wait() will return - CloseRemaining() []Job - // Closed returns true if the queue has been closed // The call cannot guarantee that the queue hasn't been // closed while the function returns, so only "true" has a definite meaning. @@ -40,12 +36,6 @@ type queue interface { // Will return "", false if the queue is closed. // Otherwise the return value of "remove" is returned. Wait() (Job, bool) - - // Cap returns the capacity (without allocations). - Cap() int - - // Len returns the current length of the queue. - Len() int } type queueImpl struct { @@ -121,29 +111,6 @@ func (q *queueImpl) Close() { q.cond.Broadcast() } -// CloseRemaining will close the queue and return all entried in the queue. -// All goroutines in wait() will return. -func (q *queueImpl) CloseRemaining() []Job { - q.mu.Lock() - defer q.mu.Unlock() - if q.closed { - return []Job{} - } - rem := make([]Job, 0, q.cnt) - for q.cnt > 0 { - i := q.nodes[q.head] - q.head = (q.head + 1) % len(q.nodes) - q.cnt-- - rem = append(rem, i) - } - q.closed = true - q.cnt = 0 - q.nodes = nil - q.size = 0 - q.cond.Broadcast() - return rem -} - // Closed returns true if the queue has been closed // The call cannot guarantee that the queue hasn't been // closed while the function returns, so only "true" has a definite meaning. @@ -194,19 +161,3 @@ func (q *queueImpl) Remove() (Job, bool) { q.mu.Unlock() return i, true } - -// Return the capacity (without allocations) -func (q *queueImpl) Cap() int { - q.mu.RLock() - c := cap(q.nodes) - q.mu.RUnlock() - return c -} - -// Return the current length of the queue. -func (q *queueImpl) Len() int { - q.mu.RLock() - l := q.cnt - q.mu.RUnlock() - return l -}