diff --git a/errors.go b/errors.go index e43d0327..04d7446c 100644 --- a/errors.go +++ b/errors.go @@ -1,4 +1,4 @@ -// Copyright 2018 The Mangos Authors +// Copyright 2021 The Mangos Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use file except in compliance with the License. @@ -43,4 +43,5 @@ const ( ErrNotRaw = errors.ErrNotRaw ErrCanceled = errors.ErrCanceled ErrNoContext = errors.ErrNoContext + ErrNoPeers = errors.ErrNoPeers ) diff --git a/errors/errors.go b/errors/errors.go index 660e88e2..7bf27c22 100644 --- a/errors/errors.go +++ b/errors/errors.go @@ -1,4 +1,4 @@ -// Copyright 2018 The Mangos Authors +// Copyright 2021 The Mangos Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use file except in compliance with the License. @@ -49,4 +49,5 @@ const ( ErrNotRaw = err("socket not raw") ErrCanceled = err("operation canceled") ErrNoContext = err("protocol does not support contexts") + ErrNoPeers = err("no connected peers") ) diff --git a/options.go b/options.go index 29643e42..8af3d320 100644 --- a/options.go +++ b/options.go @@ -214,4 +214,15 @@ const ( // Solaris platforms at present, and only when cgo support is enabled. // The value is an int. OptionPeerZone = "PEER-ZONE" + + // OptionFailNoPeers causes send or receive operations to fail + // immediately rather than waiting for a timeout if there are no + // connected peers. This helps discriminate between cases involving + // flow control, from those where we we have no peers. Use of this + // option may make applications more brittle, as a temporary disconnect + // that may otherwise self-heal quickly will now create an immediate + // failure. Applications using this should be prepared to deal with + // such failures. Note that not all protocols respect this -- best + // effort protocols will particularly not support this. + OptionFailNoPeers = "FAIL-NO-PEERS" ) diff --git a/protocol/protocol.go b/protocol/protocol.go index b9c3f6ec..f2cb8fd4 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -1,4 +1,4 @@ -// Copyright 2018 The Mangos Authors +// Copyright 2021 The Mangos Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use file except in compliance with the License. @@ -67,6 +67,7 @@ const ( ErrProtoOp = errors.ErrProtoOp ErrProtoState = errors.ErrProtoState ErrCanceled = errors.ErrCanceled + ErrNoPeers = errors.ErrNoPeers ) // Common option definitions @@ -84,6 +85,7 @@ const ( OptionLinger = mangos.OptionLinger // Remove? OptionTTL = mangos.OptionTTL OptionBestEffort = mangos.OptionBestEffort + OptionFailNoPeers = mangos.OptionFailNoPeers ) // MakeSocket creates a Socket on top of a Protocol. diff --git a/protocol/push/push_test.go b/protocol/push/push_test.go index 8240fe99..5d418c01 100644 --- a/protocol/push/push_test.go +++ b/protocol/push/push_test.go @@ -49,3 +49,11 @@ func TestPushOptions(t *testing.T) { func TestPushNoRecv(t *testing.T) { CannotRecv(t, NewSocket) } + +func TestPushFastFailNoPeer(t *testing.T) { + VerifyOptionBool(t, NewSocket, mangos.OptionFailNoPeers) + + s := GetSocket(t, NewSocket) + MustSucceed(t, s.SetOption(mangos.OptionFailNoPeers, true)) + MustBeError(t, s.Send([]byte("junk")), mangos.ErrNoPeers) +} diff --git a/protocol/req/req.go b/protocol/req/req.go index b0f60ec9..142c9de6 100644 --- a/protocol/req/req.go +++ b/protocol/req/req.go @@ -1,4 +1,4 @@ -// Copyright 2020 The Mangos Authors +// Copyright 2021 The Mangos Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use file except in compliance with the License. @@ -40,23 +40,24 @@ type pipe struct { } type context struct { - s *socket - cond *sync.Cond - resendTime time.Duration // tunable resend time - sendExpire time.Duration // how long to wait in send - recvExpire time.Duration // how long to wait in recv - sendTimer *time.Timer // send timer - recvTimer *time.Timer // recv timer - resender *time.Timer // resend timeout - reqMsg *protocol.Message // message for transmit - repMsg *protocol.Message // received reply - sendMsg *protocol.Message // messaging waiting for send - lastPipe *pipe // last pipe used for transmit - reqID uint32 // request ID - recvWait bool // true if a thread is blocked in RecvMsg - bestEffort bool // if true, don't block waiting in send - queued bool // true if we need to send a message - closed bool // true if we are closed + s *socket + cond *sync.Cond + resendTime time.Duration // tunable resend time + sendExpire time.Duration // how long to wait in send + recvExpire time.Duration // how long to wait in recv + sendTimer *time.Timer // send timer + recvTimer *time.Timer // recv timer + resender *time.Timer // resend timeout + reqMsg *protocol.Message // message for transmit + repMsg *protocol.Message // received reply + sendMsg *protocol.Message // messaging waiting for send + lastPipe *pipe // last pipe used for transmit + reqID uint32 // request ID + recvWait bool // true if a thread is blocked in RecvMsg + bestEffort bool // if true, don't block waiting in send + failNoPeers bool // fast fail if no peers present + queued bool // true if we need to send a message + closed bool // true if we are closed } type socket struct { @@ -68,6 +69,7 @@ type socket struct { closed bool // true if we are closed sendq []*context // contexts waiting to send readyq []*pipe // pipes available for sending + pipes map[uint32]*pipe // all pipes } func (s *socket) send() { @@ -101,7 +103,7 @@ func (s *socket) send() { } } -func (p *pipe) sendCtx(c *context, m *protocol.Message) { +func (p *pipe) sendCtx(_ *context, m *protocol.Message) { s := p.s // Send this message. If an error occurs, we examine the @@ -245,6 +247,9 @@ func (c *context) SendMsg(m *protocol.Message) error { return protocol.ErrClosed } + if c.failNoPeers && len(s.pipes) == 0 { + return protocol.ErrNoPeers + } c.cancel() // this cancels any pending send or recv calls c.unscheduleSend() @@ -281,7 +286,7 @@ func (c *context) SendMsg(m *protocol.Message) error { // It is responsible for providing the blocking semantic and // ultimately back-pressure. Note that we will "continue" if // the send is canceled by a subsequent send. - for c.sendMsg == m && !expired && !c.closed { + for c.sendMsg == m && !expired && !c.closed && !(c.failNoPeers && len(s.pipes) == 0) { c.cond.Wait() } if c.sendMsg == m { @@ -291,6 +296,9 @@ func (c *context) SendMsg(m *protocol.Message) error { if c.closed { return protocol.ErrClosed } + if c.failNoPeers && len(s.pipes) == 0 { + return protocol.ErrNoPeers + } return protocol.ErrSendTimeout } return nil @@ -303,6 +311,9 @@ func (c *context) RecvMsg() (*protocol.Message, error) { if s.closed || c.closed { return nil, protocol.ErrClosed } + if c.failNoPeers && len(s.pipes) == 0 { + return nil, protocol.ErrNoPeers + } if c.recvWait || c.reqID == 0 { return nil, protocol.ErrProtoState } @@ -332,11 +343,14 @@ func (c *context) RecvMsg() (*protocol.Message, error) { c.cond.Broadcast() if m == nil { + if c.closed { + return nil, protocol.ErrClosed + } if expired { return nil, protocol.ErrRecvTimeout } - if c.closed { - return nil, protocol.ErrClosed + if c.failNoPeers && len(s.pipes) == 0 { + return nil, protocol.ErrNoPeers } return nil, protocol.ErrCanceled } @@ -380,6 +394,16 @@ func (c *context) SetOption(name string, value interface{}) error { return nil } return protocol.ErrBadValue + + case protocol.OptionFailNoPeers: + if v, ok := value.(bool); ok { + c.s.Lock() + c.failNoPeers = v + c.s.Unlock() + return nil + } + return protocol.ErrBadValue + } return protocol.ErrBadOption @@ -407,6 +431,11 @@ func (c *context) GetOption(option string) (interface{}, error) { v := c.bestEffort c.s.Unlock() return v, nil + case protocol.OptionFailNoPeers: + c.s.Lock() + v := c.failNoPeers + c.s.Unlock() + return v, nil } return nil, protocol.ErrBadOption @@ -493,6 +522,7 @@ func (s *socket) AddPipe(pp protocol.Pipe) error { } s.readyq = append(s.readyq, p) s.send() + s.pipes[pp.ID()] = p go p.receiver() return nil } @@ -506,8 +536,11 @@ func (s *socket) RemovePipe(pp protocol.Pipe) { s.readyq = append(s.readyq[:i], s.readyq[i+1:]...) } } + delete(s.pipes, pp.ID()) for c := range s.ctxs { - if c.lastPipe == p && c.reqMsg != nil { + if c.failNoPeers && len(s.pipes) == 0 { + c.cancel() + } else if c.lastPipe == p && c.reqMsg != nil { // We are closing this pipe, so we need to // immediately reschedule it. c.lastPipe = nil @@ -539,6 +572,7 @@ func NewProtocol() protocol.Protocol { nextID: uint32(time.Now().UnixNano()), // quasi-random ctxs: make(map[*context]struct{}), ctxByID: make(map[uint32]*context), + pipes: make(map[uint32]*pipe), } s.defCtx = &context{ s: s, diff --git a/protocol/req/req_test.go b/protocol/req/req_test.go index b55542af..747a8ce6 100644 --- a/protocol/req/req_test.go +++ b/protocol/req/req_test.go @@ -45,6 +45,7 @@ func TestReqOptions(t *testing.T) { VerifyOptionDuration(t, NewSocket, mangos.OptionSendDeadline) VerifyOptionDuration(t, NewSocket, mangos.OptionRetryTime) VerifyOptionBool(t, NewSocket, mangos.OptionBestEffort) + VerifyOptionBool(t, NewSocket, mangos.OptionFailNoPeers) } func TestReqClosed(t *testing.T) { @@ -450,3 +451,89 @@ func TestReqMultiContexts(t *testing.T) { MustBeTrue(t, recv[i] == repeat) } } + +func TestReqSendNoPeers(t *testing.T) { + s := GetSocket(t, NewSocket) + MustSucceed(t, s.SetOption(mangos.OptionFailNoPeers, true)) + MustBeError(t, s.Send([]byte("junk")), mangos.ErrNoPeers) + MustSucceed(t, s.Close()) +} + +func TestReqSendNoPeerDisconnect(t *testing.T) { + VerifyOptionBool(t, NewSocket, mangos.OptionFailNoPeers) + + s := GetSocket(t, NewSocket) + p := GetSocket(t, rep.NewSocket) + c1, err := s.OpenContext() + MustSucceed(t, err) + c2, err := s.OpenContext() + MustSucceed(t, err) + MustSucceed(t, s.SetOption(mangos.OptionSendDeadline, time.Second)) + + MustSucceed(t, s.SetOption(mangos.OptionFailNoPeers, true)) + + // Now connect them so they can drain -- we should only have 3 messages + // that arrive at the peer. + ConnectPair(t, s, p) + time.Sleep(time.Millisecond * 20) + // this logic fills the queue + go func() { + _ = c1.Send([]byte("one")) + }() + go func() { + _ = c2.Send([]byte("one")) + }() + time.Sleep(time.Millisecond * 10) + go func() { + time.Sleep(time.Millisecond * 20) + MustSucceed(t, p.Close()) + }() + time.Sleep(time.Millisecond * 20) + + MustBeError(t, s.Send([]byte("three")), mangos.ErrNoPeers) + MustSucceed(t, s.Close()) +} + + +func TestReqRecvNoPeer(t *testing.T) { + VerifyOptionBool(t, NewSocket, mangos.OptionFailNoPeers) + + s := GetSocket(t, NewSocket) + p := GetSocket(t, rep.NewSocket) + MustSucceed(t, s.SetOption(mangos.OptionSendDeadline, time.Second)) + + MustSucceed(t, s.SetOption(mangos.OptionFailNoPeers, true)) + + // Now connect them so they can drain -- we should only have 3 messages + // that arrive at the peer. + ConnectPair(t, s, p) + time.Sleep(time.Millisecond * 20) + MustSucceed(t, s.Send([]byte("one"))) // sent by the socket, picked up by rep socket + time.Sleep(time.Millisecond * 20) + MustSucceed(t, p.Close()) + time.Sleep(time.Millisecond*20) + MustNotRecv(t, s, mangos.ErrNoPeers) + MustSucceed(t, s.Close()) +} + +func TestReqRecvNoPeerDisconnect(t *testing.T) { + VerifyOptionBool(t, NewSocket, mangos.OptionFailNoPeers) + + s := GetSocket(t, NewSocket) + p := GetSocket(t, rep.NewSocket) + MustSucceed(t, s.SetOption(mangos.OptionSendDeadline, time.Second)) + + MustSucceed(t, s.SetOption(mangos.OptionFailNoPeers, true)) + + // Now connect them so they can drain -- we should only have 3 messages + // that arrive at the peer. + ConnectPair(t, s, p) + time.Sleep(time.Millisecond * 20) + MustSucceed(t, s.Send([]byte("one"))) // sent by the socket, picked up by rep socket + go func() { + time.Sleep(time.Millisecond*10) + MustSucceed(t, p.Close()) + }() + MustNotRecv(t, s, mangos.ErrNoPeers) + MustSucceed(t, s.Close()) +} diff --git a/protocol/xpush/xpush.go b/protocol/xpush/xpush.go index c11a9cb3..837e0f4a 100644 --- a/protocol/xpush/xpush.go +++ b/protocol/xpush/xpush.go @@ -1,4 +1,4 @@ -// Copyright 2019 The Mangos Authors +// Copyright 2021 The Mangos Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use file except in compliance with the License. @@ -39,14 +39,17 @@ type pipe struct { } type socket struct { - closed bool - closeQ chan struct{} - sendQ chan *protocol.Message - sendExpire time.Duration - sendQLen int - bestEffort bool - readyQ []*pipe - cv *sync.Cond + closed bool + closeQ chan struct{} + sendQ chan *protocol.Message + noPeerQ chan struct{} + sendExpire time.Duration + sendQLen int + bestEffort bool + failNoPeers bool + readyQ []*pipe + pipes map[uint32]*pipe + cv *sync.Cond sync.Mutex } @@ -79,9 +82,16 @@ func (s *socket) SendMsg(m *protocol.Message) error { s.Unlock() return protocol.ErrClosed } + if s.failNoPeers && len(s.pipes) == 0 { + s.Unlock() + return protocol.ErrNoPeers + } + pq := s.noPeerQ s.Unlock() select { + case <-pq: + return protocol.ErrNoPeers case s.sendQ <- m: case <-s.closeQ: return protocol.ErrClosed @@ -174,6 +184,15 @@ func (s *socket) SetOption(name string, value interface{}) error { } return protocol.ErrBadValue + case protocol.OptionFailNoPeers: + if v, ok := value.(bool); ok { + s.Lock() + s.failNoPeers = v + s.Unlock() + return nil + } + return protocol.ErrBadValue + case protocol.OptionWriteQLen: if v, ok := value.(int); ok && v >= 0 { @@ -225,6 +244,11 @@ func (s *socket) GetOption(option string) (interface{}, error) { v := s.sendQLen s.Unlock() return v, nil + case protocol.OptionFailNoPeers: + s.Lock() + v := s.failNoPeers + s.Unlock() + return v, nil } return nil, protocol.ErrBadOption @@ -256,7 +280,7 @@ func (s *socket) AddPipe(pp protocol.Pipe) error { return protocol.ErrClosed } go p.receiver() - + s.pipes[pp.ID()] = p s.readyQ = append(s.readyQ, p) s.cv.Broadcast() return nil @@ -274,6 +298,11 @@ func (s *socket) RemovePipe(pp protocol.Pipe) { break } } + delete(s.pipes, pp.ID()) + if s.failNoPeers && len(s.pipes) == 0 { + close(s.noPeerQ) + s.noPeerQ = make(chan struct{}) + } s.Unlock() } @@ -294,8 +323,10 @@ func (*socket) Info() protocol.Info { func NewProtocol() protocol.Protocol { s := &socket{ closeQ: make(chan struct{}), + noPeerQ: make(chan struct{}), sendQ: make(chan *protocol.Message, defaultQLen), sendQLen: defaultQLen, + pipes: make(map[uint32]*pipe), } s.cv = sync.NewCond(s) go s.sender() diff --git a/protocol/xpush/xpush_test.go b/protocol/xpush/xpush_test.go index 73a9ee72..ea205232 100644 --- a/protocol/xpush/xpush_test.go +++ b/protocol/xpush/xpush_test.go @@ -1,4 +1,4 @@ -// Copyright 2019 The Mangos Authors +// Copyright 2021 The Mangos Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use file except in compliance with the License. @@ -42,6 +42,7 @@ func TestXPushRaw(t *testing.T) { func TestXPushOptions(t *testing.T) { VerifyInvalidOption(t, NewSocket) VerifyOptionBool(t, NewSocket, mangos.OptionBestEffort) + VerifyOptionBool(t, NewSocket, mangos.OptionFailNoPeers) VerifyOptionDuration(t, NewSocket, mangos.OptionSendDeadline) VerifyOptionQLen(t, NewSocket, mangos.OptionWriteQLen) } @@ -222,3 +223,27 @@ func TestXPushSendFail(t *testing.T) { MustBeTrue(t, nRem == 1) MustBeTrue(t, len(mp.SendQ()) == 0) } + +func TestXPushFastFailNoPeerDisconnect(t *testing.T) { + VerifyOptionBool(t, NewSocket, mangos.OptionFailNoPeers) + + s := GetSocket(t, NewSocket) + p := GetSocket(t, pull.NewSocket) + MustSucceed(t, s.SetOption(mangos.OptionSendDeadline, time.Second)) + + MustSucceed(t, s.SetOption(mangos.OptionFailNoPeers, true)) + MustSucceed(t, s.SetOption(mangos.OptionWriteQLen, 0)) + MustSucceed(t, p.SetOption(mangos.OptionReadQLen, 0)) + + // Now connect them so they can drain -- we should only have 3 messages + // that arrive at the peer. + ConnectPair(t, s, p) + + go func() { + time.Sleep(time.Millisecond*20) + MustSucceed(t, p.Close()) + }() + + MustBeError(t ,s.Send([]byte("three")), mangos.ErrNoPeers) + MustSucceed(t, s.Close()) +} \ No newline at end of file