-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.go
102 lines (90 loc) · 3.06 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package main
import (
"context"
"fmt"
"io"
"strings"
pbdevproxy "github.com/streamingfast/sf-saas-priv/pb/dfuse/devproxy/v1"
proxy "github.com/mwitkow/grpc-proxy/proxy"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
pbreflect "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
)
type ReflectServer struct {
conf *config
}
func (s *ReflectServer) Director(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) {
md, metadataExists := metadata.FromIncomingContext(ctx)
zlog.Info("reflect server director request", zap.String("method_name", fullMethodName), zap.Reflect("metadata", md))
var endpoint string
// check in the headers if `server` is specified, in which case, forward to it directly
if metadataExists && len(md.Get("server")) != 0 {
endpoint = md.Get("server")[0]
} else {
parts := strings.Split(fullMethodName, "/")
endpoint = s.conf.serviceToEndpoint[parts[1]]
}
if endpoint == "" {
return nil, nil, grpc.Errorf(codes.Unimplemented, "Unknown method or endpoint to reach")
}
opts := dialOptions(endpoint)
opts = append(opts, grpc.WithCodec(proxy.Codec()))
conn, err := grpc.DialContext(ctx, endpoint, opts...)
return ctx, conn, err
}
func (s *ReflectServer) ListServers(ctx context.Context, req *pbdevproxy.ListRequest) (*pbdevproxy.ListResponse, error) {
return &pbdevproxy.ListResponse{
Servers: s.conf.allServices,
}, nil
}
func (s *ReflectServer) ServerReflectionInfo(stream pbreflect.ServerReflection_ServerReflectionInfoServer) error {
for {
msg, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
zlog.Error("error reading message", zap.Error(err))
break
}
switch req := msg.MessageRequest.(type) {
case *pbreflect.ServerReflectionRequest_FileByFilename:
err = stream.Send(s.conf.filesByFilename[req.FileByFilename])
case *pbreflect.ServerReflectionRequest_FileContainingSymbol:
err = stream.Send(s.conf.fileContainingSymbol[req.FileContainingSymbol])
case *pbreflect.ServerReflectionRequest_FileContainingExtension:
err = fmt.Errorf("unimplemented")
case *pbreflect.ServerReflectionRequest_AllExtensionNumbersOfType:
err = stream.Send(s.conf.extensionNumbers[req.AllExtensionNumbersOfType])
case *pbreflect.ServerReflectionRequest_ListServices:
var services []*pbreflect.ServiceResponse
seen := map[string]bool{}
for _, svc := range s.conf.allServices {
if seen[svc] {
continue
}
seen[svc] = true
services = append(services, &pbreflect.ServiceResponse{
Name: svc,
})
}
err = stream.Send(&pbreflect.ServerReflectionResponse{
ValidHost: msg.Host, //// wuut anyway?
OriginalRequest: msg,
MessageResponse: &pbreflect.ServerReflectionResponse_ListServicesResponse{
ListServicesResponse: &pbreflect.ListServiceResponse{
Service: services,
},
},
})
}
if err != nil {
zlog.Error("error sending", zap.Error(err))
break
}
}
// Take in the command, fetch the right responses from the last global `config`
return nil
}