Skip to content

Commit

Permalink
fixes #224 Desire FastSendFailNoPeer option
Browse files Browse the repository at this point in the history
  • Loading branch information
gdamore committed Mar 24, 2021
1 parent 233273c commit c7f0d32
Show file tree
Hide file tree
Showing 9 changed files with 237 additions and 37 deletions.
3 changes: 2 additions & 1 deletion errors.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2018 The Mangos Authors
// Copyright 2021 The Mangos Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use file except in compliance with the License.
Expand Down Expand Up @@ -43,4 +43,5 @@ const (
ErrNotRaw = errors.ErrNotRaw
ErrCanceled = errors.ErrCanceled
ErrNoContext = errors.ErrNoContext
ErrNoPeers = errors.ErrNoPeers
)
3 changes: 2 additions & 1 deletion errors/errors.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2018 The Mangos Authors
// Copyright 2021 The Mangos Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use file except in compliance with the License.
Expand Down Expand Up @@ -49,4 +49,5 @@ const (
ErrNotRaw = err("socket not raw")
ErrCanceled = err("operation canceled")
ErrNoContext = err("protocol does not support contexts")
ErrNoPeers = err("no connected peers")
)
11 changes: 11 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,4 +214,15 @@ const (
// Solaris platforms at present, and only when cgo support is enabled.
// The value is an int.
OptionPeerZone = "PEER-ZONE"

// OptionFailNoPeers causes send or receive operations to fail
// immediately rather than waiting for a timeout if there are no
// connected peers. This helps discriminate between cases involving
// flow control, from those where we we have no peers. Use of this
// option may make applications more brittle, as a temporary disconnect
// that may otherwise self-heal quickly will now create an immediate
// failure. Applications using this should be prepared to deal with
// such failures. Note that not all protocols respect this -- best
// effort protocols will particularly not support this.
OptionFailNoPeers = "FAIL-NO-PEERS"
)
4 changes: 3 additions & 1 deletion protocol/protocol.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2018 The Mangos Authors
// Copyright 2021 The Mangos Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use file except in compliance with the License.
Expand Down Expand Up @@ -67,6 +67,7 @@ const (
ErrProtoOp = errors.ErrProtoOp
ErrProtoState = errors.ErrProtoState
ErrCanceled = errors.ErrCanceled
ErrNoPeers = errors.ErrNoPeers
)

// Common option definitions
Expand All @@ -84,6 +85,7 @@ const (
OptionLinger = mangos.OptionLinger // Remove?
OptionTTL = mangos.OptionTTL
OptionBestEffort = mangos.OptionBestEffort
OptionFailNoPeers = mangos.OptionFailNoPeers
)

// MakeSocket creates a Socket on top of a Protocol.
Expand Down
8 changes: 8 additions & 0 deletions protocol/push/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,11 @@ func TestPushOptions(t *testing.T) {
func TestPushNoRecv(t *testing.T) {
CannotRecv(t, NewSocket)
}

func TestPushFastFailNoPeer(t *testing.T) {
VerifyOptionBool(t, NewSocket, mangos.OptionFailNoPeers)

s := GetSocket(t, NewSocket)
MustSucceed(t, s.SetOption(mangos.OptionFailNoPeers, true))
MustBeError(t, s.Send([]byte("junk")), mangos.ErrNoPeers)
}
80 changes: 57 additions & 23 deletions protocol/req/req.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2020 The Mangos Authors
// Copyright 2021 The Mangos Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use file except in compliance with the License.
Expand Down Expand Up @@ -40,23 +40,24 @@ type pipe struct {
}

type context struct {
s *socket
cond *sync.Cond
resendTime time.Duration // tunable resend time
sendExpire time.Duration // how long to wait in send
recvExpire time.Duration // how long to wait in recv
sendTimer *time.Timer // send timer
recvTimer *time.Timer // recv timer
resender *time.Timer // resend timeout
reqMsg *protocol.Message // message for transmit
repMsg *protocol.Message // received reply
sendMsg *protocol.Message // messaging waiting for send
lastPipe *pipe // last pipe used for transmit
reqID uint32 // request ID
recvWait bool // true if a thread is blocked in RecvMsg
bestEffort bool // if true, don't block waiting in send
queued bool // true if we need to send a message
closed bool // true if we are closed
s *socket
cond *sync.Cond
resendTime time.Duration // tunable resend time
sendExpire time.Duration // how long to wait in send
recvExpire time.Duration // how long to wait in recv
sendTimer *time.Timer // send timer
recvTimer *time.Timer // recv timer
resender *time.Timer // resend timeout
reqMsg *protocol.Message // message for transmit
repMsg *protocol.Message // received reply
sendMsg *protocol.Message // messaging waiting for send
lastPipe *pipe // last pipe used for transmit
reqID uint32 // request ID
recvWait bool // true if a thread is blocked in RecvMsg
bestEffort bool // if true, don't block waiting in send
failNoPeers bool // fast fail if no peers present
queued bool // true if we need to send a message
closed bool // true if we are closed
}

type socket struct {
Expand All @@ -68,6 +69,7 @@ type socket struct {
closed bool // true if we are closed
sendq []*context // contexts waiting to send
readyq []*pipe // pipes available for sending
pipes map[uint32]*pipe // all pipes
}

func (s *socket) send() {
Expand Down Expand Up @@ -101,7 +103,7 @@ func (s *socket) send() {
}
}

func (p *pipe) sendCtx(c *context, m *protocol.Message) {
func (p *pipe) sendCtx(_ *context, m *protocol.Message) {
s := p.s

// Send this message. If an error occurs, we examine the
Expand Down Expand Up @@ -245,6 +247,9 @@ func (c *context) SendMsg(m *protocol.Message) error {
return protocol.ErrClosed
}

if c.failNoPeers && len(s.pipes) == 0 {
return protocol.ErrNoPeers
}
c.cancel() // this cancels any pending send or recv calls
c.unscheduleSend()

Expand Down Expand Up @@ -281,7 +286,7 @@ func (c *context) SendMsg(m *protocol.Message) error {
// It is responsible for providing the blocking semantic and
// ultimately back-pressure. Note that we will "continue" if
// the send is canceled by a subsequent send.
for c.sendMsg == m && !expired && !c.closed {
for c.sendMsg == m && !expired && !c.closed && !(c.failNoPeers && len(s.pipes) == 0) {
c.cond.Wait()
}
if c.sendMsg == m {
Expand All @@ -291,6 +296,9 @@ func (c *context) SendMsg(m *protocol.Message) error {
if c.closed {
return protocol.ErrClosed
}
if c.failNoPeers && len(s.pipes) == 0 {
return protocol.ErrNoPeers
}
return protocol.ErrSendTimeout
}
return nil
Expand All @@ -303,6 +311,9 @@ func (c *context) RecvMsg() (*protocol.Message, error) {
if s.closed || c.closed {
return nil, protocol.ErrClosed
}
if c.failNoPeers && len(s.pipes) == 0 {
return nil, protocol.ErrNoPeers
}
if c.recvWait || c.reqID == 0 {
return nil, protocol.ErrProtoState
}
Expand Down Expand Up @@ -332,11 +343,14 @@ func (c *context) RecvMsg() (*protocol.Message, error) {
c.cond.Broadcast()

if m == nil {
if c.closed {
return nil, protocol.ErrClosed
}
if expired {
return nil, protocol.ErrRecvTimeout
}
if c.closed {
return nil, protocol.ErrClosed
if c.failNoPeers && len(s.pipes) == 0 {
return nil, protocol.ErrNoPeers
}
return nil, protocol.ErrCanceled
}
Expand Down Expand Up @@ -380,6 +394,16 @@ func (c *context) SetOption(name string, value interface{}) error {
return nil
}
return protocol.ErrBadValue

case protocol.OptionFailNoPeers:
if v, ok := value.(bool); ok {
c.s.Lock()
c.failNoPeers = v
c.s.Unlock()
return nil
}
return protocol.ErrBadValue

}

return protocol.ErrBadOption
Expand Down Expand Up @@ -407,6 +431,11 @@ func (c *context) GetOption(option string) (interface{}, error) {
v := c.bestEffort
c.s.Unlock()
return v, nil
case protocol.OptionFailNoPeers:
c.s.Lock()
v := c.failNoPeers
c.s.Unlock()
return v, nil
}

return nil, protocol.ErrBadOption
Expand Down Expand Up @@ -493,6 +522,7 @@ func (s *socket) AddPipe(pp protocol.Pipe) error {
}
s.readyq = append(s.readyq, p)
s.send()
s.pipes[pp.ID()] = p
go p.receiver()
return nil
}
Expand All @@ -506,8 +536,11 @@ func (s *socket) RemovePipe(pp protocol.Pipe) {
s.readyq = append(s.readyq[:i], s.readyq[i+1:]...)
}
}
delete(s.pipes, pp.ID())
for c := range s.ctxs {
if c.lastPipe == p && c.reqMsg != nil {
if c.failNoPeers && len(s.pipes) == 0 {
c.cancel()
} else if c.lastPipe == p && c.reqMsg != nil {
// We are closing this pipe, so we need to
// immediately reschedule it.
c.lastPipe = nil
Expand Down Expand Up @@ -539,6 +572,7 @@ func NewProtocol() protocol.Protocol {
nextID: uint32(time.Now().UnixNano()), // quasi-random
ctxs: make(map[*context]struct{}),
ctxByID: make(map[uint32]*context),
pipes: make(map[uint32]*pipe),
}
s.defCtx = &context{
s: s,
Expand Down
87 changes: 87 additions & 0 deletions protocol/req/req_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func TestReqOptions(t *testing.T) {
VerifyOptionDuration(t, NewSocket, mangos.OptionSendDeadline)
VerifyOptionDuration(t, NewSocket, mangos.OptionRetryTime)
VerifyOptionBool(t, NewSocket, mangos.OptionBestEffort)
VerifyOptionBool(t, NewSocket, mangos.OptionFailNoPeers)
}

func TestReqClosed(t *testing.T) {
Expand Down Expand Up @@ -450,3 +451,89 @@ func TestReqMultiContexts(t *testing.T) {
MustBeTrue(t, recv[i] == repeat)
}
}

func TestReqSendNoPeers(t *testing.T) {
s := GetSocket(t, NewSocket)
MustSucceed(t, s.SetOption(mangos.OptionFailNoPeers, true))
MustBeError(t, s.Send([]byte("junk")), mangos.ErrNoPeers)
MustSucceed(t, s.Close())
}

func TestReqSendNoPeerDisconnect(t *testing.T) {
VerifyOptionBool(t, NewSocket, mangos.OptionFailNoPeers)

s := GetSocket(t, NewSocket)
p := GetSocket(t, rep.NewSocket)
c1, err := s.OpenContext()
MustSucceed(t, err)
c2, err := s.OpenContext()
MustSucceed(t, err)
MustSucceed(t, s.SetOption(mangos.OptionSendDeadline, time.Second))

MustSucceed(t, s.SetOption(mangos.OptionFailNoPeers, true))

// Now connect them so they can drain -- we should only have 3 messages
// that arrive at the peer.
ConnectPair(t, s, p)
time.Sleep(time.Millisecond * 20)
// this logic fills the queue
go func() {
_ = c1.Send([]byte("one"))
}()
go func() {
_ = c2.Send([]byte("one"))
}()
time.Sleep(time.Millisecond * 10)
go func() {
time.Sleep(time.Millisecond * 20)
MustSucceed(t, p.Close())
}()
time.Sleep(time.Millisecond * 20)

MustBeError(t, s.Send([]byte("three")), mangos.ErrNoPeers)
MustSucceed(t, s.Close())
}


func TestReqRecvNoPeer(t *testing.T) {
VerifyOptionBool(t, NewSocket, mangos.OptionFailNoPeers)

s := GetSocket(t, NewSocket)
p := GetSocket(t, rep.NewSocket)
MustSucceed(t, s.SetOption(mangos.OptionSendDeadline, time.Second))

MustSucceed(t, s.SetOption(mangos.OptionFailNoPeers, true))

// Now connect them so they can drain -- we should only have 3 messages
// that arrive at the peer.
ConnectPair(t, s, p)
time.Sleep(time.Millisecond * 20)
MustSucceed(t, s.Send([]byte("one"))) // sent by the socket, picked up by rep socket
time.Sleep(time.Millisecond * 20)
MustSucceed(t, p.Close())
time.Sleep(time.Millisecond*20)
MustNotRecv(t, s, mangos.ErrNoPeers)
MustSucceed(t, s.Close())
}

func TestReqRecvNoPeerDisconnect(t *testing.T) {
VerifyOptionBool(t, NewSocket, mangos.OptionFailNoPeers)

s := GetSocket(t, NewSocket)
p := GetSocket(t, rep.NewSocket)
MustSucceed(t, s.SetOption(mangos.OptionSendDeadline, time.Second))

MustSucceed(t, s.SetOption(mangos.OptionFailNoPeers, true))

// Now connect them so they can drain -- we should only have 3 messages
// that arrive at the peer.
ConnectPair(t, s, p)
time.Sleep(time.Millisecond * 20)
MustSucceed(t, s.Send([]byte("one"))) // sent by the socket, picked up by rep socket
go func() {
time.Sleep(time.Millisecond*10)
MustSucceed(t, p.Close())
}()
MustNotRecv(t, s, mangos.ErrNoPeers)
MustSucceed(t, s.Close())
}
Loading

0 comments on commit c7f0d32

Please sign in to comment.