Skip to content

Commit

Permalink
server: use separate goroutines for sotw bidi streams (envoyproxy#530)
Browse files Browse the repository at this point in the history
  • Loading branch information
rueian committed Dec 21, 2021
1 parent 3e0d165 commit 9134e3c
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 115 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/stretchr/testify v1.7.0
go.opentelemetry.io/proto/otlp v0.7.0
golang.org/x/net v0.0.0-20201021035429-f5854403a974 // indirect
golang.org/x/sync v0.0.0-20190423024810-112230192c58
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4 // indirect
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013
google.golang.org/grpc v1.36.0
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4Iltr
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
181 changes: 98 additions & 83 deletions pkg/server/sotw/v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ package sotw
import (
"context"
"errors"
"reflect"
"strconv"
"sync"
"sync/atomic"

"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

Expand Down Expand Up @@ -72,6 +73,24 @@ type lastDiscoveryResponse struct {
resources map[string]struct{}
}

type lastDiscoveryResponses struct {
mu sync.RWMutex
responses map[string]lastDiscoveryResponse
}

func (l *lastDiscoveryResponses) Set(key string, value lastDiscoveryResponse) {
l.mu.Lock()
l.responses[key] = value
l.mu.Unlock()
}

func (l *lastDiscoveryResponses) Get(key string) (value lastDiscoveryResponse, ok bool) {
l.mu.RLock()
value, ok = l.responses[key]
l.mu.RUnlock()
return
}

// process handles a bi-di stream request
func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryRequest, defaultTypeURL string) error {
// increment stream count
Expand All @@ -82,13 +101,12 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq
var streamNonce int64

streamState := stream.NewStreamState(false, map[string]string{})
lastDiscoveryResponses := map[string]lastDiscoveryResponse{}
lastDiscoveryResponses := lastDiscoveryResponses{responses: make(map[string]lastDiscoveryResponse)}

// a collection of stack allocated watches per request type
watches := newWatches()

defer func() {
watches.close()
if s.callbacks != nil {
s.callbacks.OnStreamClosed(streamID)
}
Expand Down Expand Up @@ -116,7 +134,7 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq
for _, r := range resp.GetRequest().ResourceNames {
lastResponse.resources[r] = struct{}{}
}
lastDiscoveryResponses[resp.GetRequest().TypeUrl] = lastResponse
lastDiscoveryResponses.Set(resp.GetRequest().TypeUrl, lastResponse)

if s.callbacks != nil {
s.callbacks.OnStreamResponse(resp.GetContext(), streamID, resp.GetRequest(), out)
Expand All @@ -133,103 +151,100 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq
// node may only be set on the first discovery request
var node = &core.Node{}

// recompute dynamic channels for this stream
watches.recompute(s.ctx, reqCh)

for {
// The list of select cases looks like this:
// 0: <- ctx.Done
// 1: <- reqCh
// 2...: per type watches
index, value, ok := reflect.Select(watches.cases)
switch index {
// ctx.Done() -> if we receive a value here we return as no further computation is needed
case 0:
return nil
// Case 1 handles any request inbound on the stream and handles all initialization as needed
case 1:
// input stream ended or errored out
if !ok {
return nil
}

req := value.Interface().(*discovery.DiscoveryRequest)
if req == nil {
return status.Errorf(codes.Unavailable, "empty request")
}
var resCh = make(chan cache.Response, 1)

// node field in discovery request is delta-compressed
if req.Node != nil {
node = req.Node
} else {
req.Node = node
}
ctx, cancel := context.WithCancel(s.ctx)
eg, ctx := errgroup.WithContext(ctx)

// nonces can be reused across streams; we verify nonce only if nonce is not initialized
nonce := req.GetResponseNonce()
eg.Go(func() error {
defer func() {
watches.close() // this should remove all watches from the cache
close(resCh) // close resCh and let the second eg.Go drain it
}()

// type URL is required for ADS but is implicit for xDS
if defaultTypeURL == resource.AnyType {
if req.TypeUrl == "" {
return status.Errorf(codes.InvalidArgument, "type URL is required for ADS")
for {
select {
case <-ctx.Done():
return nil
case req, more := <-reqCh:
if !more {
return nil
}
if req == nil {
return status.Errorf(codes.Unavailable, "empty request")
}
// node field in discovery request is delta-compressed
if req.Node != nil {
node = req.Node
} else {
req.Node = node
}
} else if req.TypeUrl == "" {
req.TypeUrl = defaultTypeURL
}

if s.callbacks != nil {
if err := s.callbacks.OnStreamRequest(streamID, req); err != nil {
return err
// nonces can be reused across streams; we verify nonce only if nonce is not initialized
nonce := req.GetResponseNonce()

// type URL is required for ADS but is implicit for xDS
if defaultTypeURL == resource.AnyType {
if req.TypeUrl == "" {
return status.Errorf(codes.InvalidArgument, "type URL is required for ADS")
}
} else if req.TypeUrl == "" {
req.TypeUrl = defaultTypeURL
}
}

if lastResponse, ok := lastDiscoveryResponses[req.TypeUrl]; ok {
if lastResponse.nonce == "" || lastResponse.nonce == nonce {
// Let's record Resource names that a client has received.
streamState.SetKnownResourceNames(req.TypeUrl, lastResponse.resources)
if s.callbacks != nil {
if err := s.callbacks.OnStreamRequest(streamID, req); err != nil {
return err
}
}
}

typeURL := req.GetTypeUrl()
responder := make(chan cache.Response, 1)
if w, ok := watches.responders[typeURL]; ok {
// We've found a pre-existing watch, lets check and update if needed.
// If these requirements aren't satisfied, leave an open watch.
if w.nonce == "" || w.nonce == nonce {
w.close()
if lastResponse, ok := lastDiscoveryResponses.Get(req.TypeUrl); ok {
if lastResponse.nonce == "" || lastResponse.nonce == nonce {
// Let's record Resource names that a client has received.
streamState.SetKnownResourceNames(req.TypeUrl, lastResponse.resources)
}
}

typeURL := req.GetTypeUrl()
if w := watches.getWatch(typeURL); w != nil {
// We've found a pre-existing watch, lets check and update if needed.
// If these requirements aren't satisfied, leave an open watch.
if n := w.getNonce(); n == "" || n == nonce {
w.close()

watches.addWatch(typeURL, &watch{
cancel: s.cache.CreateWatch(req, streamState, resCh),
})
}
} else {
// No pre-existing watch exists, let's create one.
// We need to precompute the watches first then open a watch in the cache.
watches.addWatch(typeURL, &watch{
cancel: s.cache.CreateWatch(req, streamState, responder),
response: responder,
cancel: s.cache.CreateWatch(req, streamState, resCh),
})
}
} else {
// No pre-existing watch exists, let's create one.
// We need to precompute the watches first then open a watch in the cache.
watches.addWatch(typeURL, &watch{
cancel: s.cache.CreateWatch(req, streamState, responder),
response: responder,
})
}
}
})

// Recompute the dynamic select cases for this stream.
watches.recompute(s.ctx, reqCh)
default:
// Channel n -> these are the dynamic list of responders that correspond to the stream request typeURL
if !ok {
// Receiver channel was closed. TODO(jpeach): probably cancel the watch or something?
return status.Errorf(codes.Unavailable, "resource watch %d -> failed", index)
eg.Go(func() (err error) {
var nonce string
for res := range resCh {
if res == nil || err != nil {
continue // this loop should not exit until resCh closed
}

res := value.Interface().(cache.Response)
nonce, err := send(res)
if err != nil {
return err
if nonce, err = send(res); err == nil {
if w := watches.getWatch(res.GetRequest().TypeUrl); w != nil {
w.setNonce(nonce)
}
} else {
cancel()
}

watches.responders[res.GetRequest().TypeUrl].nonce = nonce
}
}
return err
})

return eg.Wait()
}

// StreamHandler converts a blocking read call to channels and initiates stream processing
Expand Down
58 changes: 26 additions & 32 deletions pkg/server/sotw/v3/watches.go
Original file line number Diff line number Diff line change
@@ -1,34 +1,37 @@
package sotw

import (
"context"
"reflect"
"sync"

discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
)

// watches for all xDS resource types
type watches struct {
mu sync.RWMutex
responders map[string]*watch

// cases is a dynamic select case for the watched channels.
cases []reflect.SelectCase
}

// newWatches creates and initializes watches.
func newWatches() watches {
return watches{
responders: make(map[string]*watch, int(types.UnknownType)),
cases: make([]reflect.SelectCase, 0),
}
}

// addWatch creates a new watch entry in the watches map.
// Watches are sorted by typeURL.
func (w *watches) addWatch(typeURL string, watch *watch) {
w.mu.Lock()
w.responders[typeURL] = watch
w.mu.Unlock()
}

func (w *watches) getWatch(typeURL string) (watch *watch) {
w.mu.RLock()
watch = w.responders[typeURL]
w.mu.RUnlock()
return
}

// close all open watches
Expand All @@ -38,33 +41,24 @@ func (w *watches) close() {
}
}

// recomputeWatches rebuilds the known list of dynamic channels if needed
func (w *watches) recompute(ctx context.Context, req <-chan *discovery.DiscoveryRequest) {
w.cases = w.cases[:0] // Clear the existing cases while retaining capacity.

w.cases = append(w.cases,
reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ctx.Done()),
}, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(req),
},
)
// watch contains the necessary modifiables for receiving resource responses
type watch struct {
mu sync.RWMutex
cancel func()
nonce string
}

for _, watch := range w.responders {
w.cases = append(w.cases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(watch.response),
})
}
func (w *watch) getNonce() (n string) {
w.mu.RLock()
n = w.nonce
w.mu.RUnlock()
return n
}

// watch contains the necessary modifiables for receiving resource responses
type watch struct {
cancel func()
nonce string
response chan cache.Response
func (w *watch) setNonce(n string) {
w.mu.Lock()
w.nonce = n
w.mu.Unlock()
}

// close cancels an open watch
Expand Down

0 comments on commit 9134e3c

Please sign in to comment.