Skip to content

Commit

Permalink
more tests, minor clean ups
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed Apr 29, 2020
1 parent b837d83 commit b500398
Show file tree
Hide file tree
Showing 10 changed files with 176 additions and 65 deletions.
19 changes: 17 additions & 2 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ import (
"github.com/stretchr/testify/require"
)

func TestChannelNotFound(t *testing.T) {
c := DefaultConfig
_, ok := c.channelOpts("xxx")
require.False(t, ok)
}

func TestConfigValidateDefault(t *testing.T) {
err := DefaultConfig.Validate()
require.NoError(t, err)
Expand Down Expand Up @@ -39,14 +45,23 @@ func TestConfigValidateDuplicateNamespaceName(t *testing.T) {
require.Error(t, err)
}

func TestConfigValidateMalformedReciverTopLevel(t *testing.T) {
func TestConfigValidateNoPersonalNamespace(t *testing.T) {
c := DefaultConfig
c.Namespaces = []ChannelNamespace{}
c.UserSubscribeToPersonal = true
c.UserPersonalChannelNamespace = "name"
err := c.Validate()
require.Error(t, err)
}

func TestConfigValidateMalformedReceiverTopLevel(t *testing.T) {
c := DefaultConfig
c.HistoryRecover = true
err := c.Validate()
require.Error(t, err)
}

func TestConfigValidateMalformedReciverInNamespace(t *testing.T) {
func TestConfigValidateMalformedReceiverInNamespace(t *testing.T) {
c := DefaultConfig
c.Namespaces = []ChannelNamespace{
{
Expand Down
2 changes: 1 addition & 1 deletion doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// expects that code inside callbacks will not block.
//
// Centrifuge library provides several features on top of plain Websocket
// implementation - see full description in library README on Github –
// implementation - read highlights in library README on Github –
// https://github.com/centrifugal/centrifuge.
//
// Also check out examples in repo to see main library concepts in action.
Expand Down
4 changes: 3 additions & 1 deletion handler_sockjs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"sync"
"time"

"github.com/centrifugal/centrifuge/internal/cancelctx"

"github.com/gorilla/websocket"
"github.com/igm/sockjs-go/sockjs"
)
Expand Down Expand Up @@ -181,7 +183,7 @@ func (s *SockjsHandler) sockJSHandler(sess sockjs.Session) {

ctxCh := make(chan struct{})
defer close(ctxCh)
c, err := NewClient(newCustomCancelContext(sess.Request().Context(), ctxCh), s.node, transport)
c, err := NewClient(cancelctx.New(sess.Request().Context(), ctxCh), s.node, transport)
if err != nil {
s.node.logger.log(newLogEntry(LogLevelError, "error creating client", map[string]interface{}{"transport": transportSockJS}))
return
Expand Down
4 changes: 3 additions & 1 deletion handler_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"sync"
"time"

"github.com/centrifugal/centrifuge/internal/cancelctx"

"github.com/gorilla/websocket"

"github.com/centrifugal/centrifuge/internal/timers"
Expand Down Expand Up @@ -317,7 +319,7 @@ func (s *WebsocketHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
ctxCh := make(chan struct{})
defer close(ctxCh)

c, err := NewClient(newCustomCancelContext(r.Context(), ctxCh), s.node, transport)
c, err := NewClient(cancelctx.New(r.Context(), ctxCh), s.node, transport)
if err != nil {
s.node.logger.log(newLogEntry(LogLevelError, "error creating client", map[string]interface{}{"transport": transportWebsocket}))
return
Expand Down
7 changes: 4 additions & 3 deletions context.go → internal/cancelctx/context.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package centrifuge
package cancelctx

import (
"context"
Expand All @@ -21,7 +21,8 @@ func (c customCancelContext) Err() error {
}
}

// newCustomCancelContext returns a context that will be canceled on channel close.
func newCustomCancelContext(ctx context.Context, ch <-chan struct{}) context.Context {
// New returns a wrapper context around original context that will
// be canceled on channel close.
func New(ctx context.Context, ch <-chan struct{}) context.Context {
return customCancelContext{Context: ctx, ch: ch}
}
36 changes: 36 additions & 0 deletions internal/cancelctx/context_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package cancelctx

import (
"context"
"testing"

"github.com/stretchr/testify/require"
)

func TestCustomContext(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
ch := make(chan struct{})
customCtx := New(ctx, ch)
require.NoError(t, customCtx.Err())
dlTime, dlSet := customCtx.Deadline()
require.Zero(t, dlTime)
require.False(t, dlSet)
select {
case <-customCtx.Done():
require.Fail(t, "must not be cancelled")
default:
}
cancel()
select {
case <-customCtx.Done():
require.Fail(t, "must not be cancelled")
default:
}
close(ch)
select {
case <-customCtx.Done():
require.Equal(t, context.Canceled, customCtx.Err())
default:
require.Fail(t, "must be cancelled")
}
}
24 changes: 24 additions & 0 deletions internal/clientproto/protocol_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package clientproto

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/centrifugal/protocol"
)

func TestPushHelpers(t *testing.T) {
msg := NewMessagePush(protocol.Raw("{}"))
require.NotNil(t, msg)
msg = NewJoinPush("test", protocol.Raw("{}"))
require.NotNil(t, msg)
msg = NewLeavePush("test", protocol.Raw("{}"))
require.NotNil(t, msg)
msg = NewPublicationPush("test", protocol.Raw("{}"))
require.NotNil(t, msg)
msg = NewSubPush("test", protocol.Raw("{}"))
require.NotNil(t, msg)
msg = NewUnsubPush("test", protocol.Raw("{}"))
require.NotNil(t, msg)
}
68 changes: 68 additions & 0 deletions internal/controlproto/controlpb_extra_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package controlproto

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestCommandProtoExtra(t *testing.T) {
msg := &Command{
UID: "test",
Method: MethodTypeDisconnect,
Params: Raw("{}"),
}

d, b := msg.Method.EnumDescriptor()
require.Equal(t, fileDescriptorControl, d)
require.Equal(t, []int{0}, b)

require.Equal(t, "test", msg.GetUID())
require.Equal(t, MethodTypeDisconnect, msg.GetMethod())
require.NotZero(t, msg.String())
}

func TestNodeProtoExtra(t *testing.T) {
msg := &Node{
UID: "test",
Name: "test name",
Version: "v1.0.0",
NumChannels: 2,
NumClients: 3,
NumUsers: 1,
Uptime: 12,
Metrics: &Metrics{
Interval: 60,
Items: map[string]float64{
"item": 1,
},
},
}
require.Equal(t, "test", msg.GetUID())
require.Equal(t, "test name", msg.GetName())
require.Equal(t, "v1.0.0", msg.GetVersion())
require.Equal(t, uint32(2), msg.GetNumChannels())
require.Equal(t, uint32(1), msg.GetNumUsers())
require.Equal(t, uint32(3), msg.GetNumClients())
require.Equal(t, uint32(12), msg.GetUptime())
require.NotNil(t, msg.GetMetrics())
require.NotZero(t, msg.String())
}

func TestDisconnectProtoExtra(t *testing.T) {
msg := &Disconnect{
User: "test",
}
require.Equal(t, "test", msg.GetUser())
require.NotZero(t, msg.String())
}

func TestUnsubscribeProtoExtra(t *testing.T) {
msg := &Unsubscribe{
User: "test",
Channel: "test channel",
}
require.Equal(t, "test", msg.GetUser())
require.Equal(t, "test channel", msg.GetChannel())
require.NotZero(t, msg.String())
}
28 changes: 20 additions & 8 deletions internal/dissolve/dissolve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dissolve

import (
"errors"
"sync"
"testing"
"time"
)
Expand All @@ -10,14 +11,25 @@ func TestDissolver(t *testing.T) {
d := New(4)
_ = d.Run()
defer d.Close()
ch := make(chan struct{}, 1)
err := d.Submit(func() error {
ch <- struct{}{}
return nil
})
if err != nil {
t.Fatalf("Submit returned error: %v", err)
}
ch := make(chan struct{})
numJobs := 1024
var wg sync.WaitGroup
wg.Add(numJobs)
go func() {
for i := 0; i < numJobs; i++ {
err := d.Submit(func() error {
defer wg.Done()
return nil
})
if err != nil {
t.Fatalf("Submit returned error: %v", err)
}
}
}()
go func() {
wg.Wait()
close(ch)
}()
select {
case <-ch:
case <-time.After(time.Second):
Expand Down
49 changes: 0 additions & 49 deletions internal/dissolve/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,6 @@ type queue interface {
// all goroutines in wait() will return
Close()

// CloseRemaining will close the queue and return all entried in the queue.
// All goroutines in wait() will return
CloseRemaining() []Job

// Closed returns true if the queue has been closed
// The call cannot guarantee that the queue hasn't been
// closed while the function returns, so only "true" has a definite meaning.
Expand All @@ -40,12 +36,6 @@ type queue interface {
// Will return "", false if the queue is closed.
// Otherwise the return value of "remove" is returned.
Wait() (Job, bool)

// Cap returns the capacity (without allocations).
Cap() int

// Len returns the current length of the queue.
Len() int
}

type queueImpl struct {
Expand Down Expand Up @@ -121,29 +111,6 @@ func (q *queueImpl) Close() {
q.cond.Broadcast()
}

// CloseRemaining will close the queue and return all entried in the queue.
// All goroutines in wait() will return.
func (q *queueImpl) CloseRemaining() []Job {
q.mu.Lock()
defer q.mu.Unlock()
if q.closed {
return []Job{}
}
rem := make([]Job, 0, q.cnt)
for q.cnt > 0 {
i := q.nodes[q.head]
q.head = (q.head + 1) % len(q.nodes)
q.cnt--
rem = append(rem, i)
}
q.closed = true
q.cnt = 0
q.nodes = nil
q.size = 0
q.cond.Broadcast()
return rem
}

// Closed returns true if the queue has been closed
// The call cannot guarantee that the queue hasn't been
// closed while the function returns, so only "true" has a definite meaning.
Expand Down Expand Up @@ -194,19 +161,3 @@ func (q *queueImpl) Remove() (Job, bool) {
q.mu.Unlock()
return i, true
}

// Return the capacity (without allocations)
func (q *queueImpl) Cap() int {
q.mu.RLock()
c := cap(q.nodes)
q.mu.RUnlock()
return c
}

// Return the current length of the queue.
func (q *queueImpl) Len() int {
q.mu.RLock()
l := q.cnt
q.mu.RUnlock()
return l
}

0 comments on commit b500398

Please sign in to comment.