Skip to content

Commit

Permalink
Merge branch 'connectrpc' of github.com:scionproto/scion into connectrpc
Browse files Browse the repository at this point in the history
  • Loading branch information
oncilla committed Nov 5, 2023
2 parents 40a8712 + 7d96b05 commit 9b099b6
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 42 deletions.
4 changes: 2 additions & 2 deletions control/cmd/control/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ go_library(
"@in_gopkg_yaml_v2//:go_default_library",
"@org_go4_netipx//:go_default_library",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//health:go_default_library",
"@org_golang_google_grpc//health/grpc_health_v1:go_default_library",
"@org_golang_google_grpc//peer:go_default_library",
"@org_golang_x_net//http2:go_default_library",
"@org_golang_x_net//http2/h2c:go_default_library",
"@org_golang_x_sync//errgroup:go_default_library",
],
)
Expand Down
95 changes: 55 additions & 40 deletions control/cmd/control/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,15 @@ import (
"net"
"net/http"
_ "net/http/pprof"
"net/netip"
"path/filepath"
"strings"
"sync"
"time"

"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"

"github.com/go-chi/chi/v5"
"github.com/go-chi/cors"
promgrpc "github.com/grpc-ecosystem/go-grpc-prometheus"
Expand All @@ -38,8 +42,6 @@ import (
"go4.org/netipx"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/peer"

cpconnect "github.com/scionproto/scion/bufgen/proto/control_plane/v1/control_planeconnect"
Expand Down Expand Up @@ -115,16 +117,23 @@ import (
trustmetrics "github.com/scionproto/scion/private/trust/metrics"
)

type loggingHandler struct{ next http.Handler }
type loggingHandler struct {
prefix string
next http.Handler
}

func (h loggingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
fmt.Println(r.Method)
fmt.Println(r.URL)
fmt.Println(h.prefix, r.Method, r.URL)

if addr, ok := r.Context().Value(http3.RemoteAddrContextKey).(net.Addr); ok {
log.Info("HTTP3 request", "remote", r.Context().Value(http3.RemoteAddrContextKey))
ctx := peer.NewContext(r.Context(), &peer.Peer{Addr: addr})
r = r.WithContext(ctx)
} else if addrPort, err := netip.ParseAddrPort(r.RemoteAddr); err == nil {
log.Info("HTTP request", "remote", addrPort)
tcpAddr := net.TCPAddrFromAddrPort(addrPort)
ctx := peer.NewContext(r.Context(), &peer.Peer{Addr: tcpAddr})
r = r.WithContext(ctx)
}
h.next.ServeHTTP(w, r)
}
Expand Down Expand Up @@ -265,10 +274,6 @@ func realMain(ctx context.Context) error {
return serrors.WrapStr("initializing QUIC stack", err)
}
defer quicStack.RedirectCloser()
tcpStack, err := nc.TCPStack()
if err != nil {
return serrors.WrapStr("initializing TCP stack", err)
}
dialer := &libgrpc.QUICDialer{
Rewriter: &onehop.AddressRewriter{
Rewriter: nc.AddressRewriter(nil),
Expand Down Expand Up @@ -376,23 +381,19 @@ func realMain(ctx context.Context) error {
libgrpc.UnaryServerInterceptor(),
libgrpc.DefaultMaxConcurrentStreams(),
)
connectMux := http.NewServeMux()
tcpServer := grpc.NewServer(
libgrpc.UnaryServerInterceptor(),
libgrpc.DefaultMaxConcurrentStreams(),
)
connectInter := http.NewServeMux()
connectIntra := http.NewServeMux()

// Register trust material related handlers.
trustServer := &cstrustgrpc.MaterialServer{
Provider: provider,
IA: topo.IA(),
Requests: libmetrics.NewPromCounter(cstrustmetrics.Handler.Requests),
}
// TODO needs a wrapper here...
//connectMux.Handle(cpconnect.NewTrustMaterialServiceHandler(nil))
connectInter.Handle(cpconnect.NewTrustMaterialServiceHandler(cstrustconnect.MaterialServer{MaterialServer: trustServer}))
cppb.RegisterTrustMaterialServiceServer(quicServer, trustServer)
cppb.RegisterTrustMaterialServiceServer(tcpServer, trustServer)
connectMux.Handle(cpconnect.NewTrustMaterialServiceHandler(cstrustconnect.MaterialServer{MaterialServer: trustServer}))
connectInter.Handle(cpconnect.NewTrustMaterialServiceHandler(cstrustconnect.MaterialServer{MaterialServer: trustServer}))
connectIntra.Handle(cpconnect.NewTrustMaterialServiceHandler(cstrustconnect.MaterialServer{MaterialServer: trustServer}))

// Handle beaconing.
segmentCreationServer := &beaconinggrpc.SegmentCreationServer{
Expand All @@ -408,7 +409,7 @@ func realMain(ctx context.Context) error {
{
pattern, handler := cpconnect.NewSegmentCreationServiceHandler(beaconingconnect.SegmentCreationServer{SegmentCreationServer: segmentCreationServer})
fmt.Println(pattern)
connectMux.Handle(pattern, handler)
connectInter.Handle(pattern, handler)
}

// Handle segment lookup
Expand Down Expand Up @@ -440,10 +441,10 @@ func realMain(ctx context.Context) error {
}

// Always register a forwarding lookup for AS internal requests.
cppb.RegisterSegmentLookupServiceServer(tcpServer, forwardingLookupServer)
connectIntra.Handle(cpconnect.NewSegmentLookupServiceHandler(segreqconnect.LookupServer{LookupServer: forwardingLookupServer}))
if topo.Core() {
cppb.RegisterSegmentLookupServiceServer(quicServer, authLookupServer)
connectMux.Handle(cpconnect.NewSegmentLookupServiceHandler(segreqconnect.LookupServer{LookupServer: authLookupServer}))
connectInter.Handle(cpconnect.NewSegmentLookupServiceHandler(segreqconnect.LookupServer{LookupServer: authLookupServer}))
}

// Handle segment registration.
Expand All @@ -462,7 +463,7 @@ func realMain(ctx context.Context) error {
Registrations: libmetrics.NewPromCounter(metrics.SegmentRegistrationsTotal),
}
cppb.RegisterSegmentRegistrationServiceServer(quicServer, registrationServer)
connectMux.Handle(cpconnect.NewSegmentRegistrationServiceHandler(segregconnect.RegistrationServer{RegistrationServer: registrationServer}))
connectInter.Handle(cpconnect.NewSegmentRegistrationServiceHandler(segregconnect.RegistrationServer{RegistrationServer: registrationServer}))
}

signer := cs.NewSigner(topo.IA(), trustDB, globalCfg.General.ConfigDir)
Expand Down Expand Up @@ -583,8 +584,8 @@ func realMain(ctx context.Context) error {
}

cppb.RegisterChainRenewalServiceServer(quicServer, renewalServer)
connectMux.Handle(cpconnect.NewChainRenewalServiceHandler(renewalconnect.RenewalServer{RenewalServer: renewalServer}))
cppb.RegisterChainRenewalServiceServer(tcpServer, renewalServer)
connectInter.Handle(cpconnect.NewChainRenewalServiceHandler(renewalconnect.RenewalServer{RenewalServer: renewalServer}))
connectIntra.Handle(cpconnect.NewChainRenewalServiceHandler(renewalconnect.RenewalServer{RenewalServer: renewalServer}))
}

// Frequently regenerate signers to catch problems, and update the metrics.
Expand Down Expand Up @@ -639,18 +640,18 @@ func realMain(ctx context.Context) error {
}
dpb.RegisterDiscoveryServiceServer(quicServer, ds)

dsHealth := health.NewServer()
dsHealth.SetServingStatus("discovery", healthpb.HealthCheckResponse_SERVING)
healthpb.RegisterHealthServer(tcpServer, dsHealth)
// dsHealth := health.NewServer()
// dsHealth.SetServingStatus("discovery", healthpb.HealthCheckResponse_SERVING)
// healthpb.RegisterHealthServer(tcpServer, dsHealth)

hpCfg := cs.HiddenPathConfigurator{
LocalIA: topo.IA(),
Verifier: verifier,
Signer: signer,
PathDB: pathDB,
Dialer: dialer,
FetcherConfig: fetcherCfg,
IntraASTCPServer: tcpServer,
LocalIA: topo.IA(),
Verifier: verifier,
Signer: signer,
PathDB: pathDB,
Dialer: dialer,
FetcherConfig: fetcherCfg,
//IntraASTCPServer: tcpServer,
InterASQUICServer: quicServer,
}
hpWriterCfg, err := hpCfg.Setup(globalCfg.PS.HiddenPathsCfg)
Expand Down Expand Up @@ -729,18 +730,19 @@ func realMain(ctx context.Context) error {
AllowedSVHostProto: globalCfg.DRKey.Delegation.ToAllowedSet(),
}
cppb.RegisterDRKeyInterServiceServer(quicServer, drkeyService)
cppb.RegisterDRKeyIntraServiceServer(tcpServer, drkeyService)
//cppb.RegisterDRKeyIntraServiceServer(tcpServer, drkeyService)
log.Info("DRKey is enabled")
} else {
log.Info("DRKey is DISABLED by configuration")
}

promgrpc.Register(quicServer)
promgrpc.Register(tcpServer)
// TODO prom middleware
//promgrpc.Register(tcpServer)

var cleanup app.Cleanup
connectServer := http3.Server{
Handler: loggingHandler{connectMux},
Handler: loggingHandler{"inter", connectInter},
}

grpcConns := make(chan quic.Connection)
Expand Down Expand Up @@ -785,14 +787,27 @@ func realMain(ctx context.Context) error {
})
cleanup.Add(func() error { quicServer.GracefulStop(); return nil })

intraServer := http.Server{
Handler: h2c.NewHandler(loggingHandler{"intra", connectIntra}, &http2.Server{}),
}
g.Go(func() error {
defer log.HandlePanic()
if err := tcpServer.Serve(tcpStack); err != nil {
return serrors.WrapStr("serving gRPC/TCP API", err)
tcpListener, err := nc.TCPStack()
if err != nil {
return serrors.WrapStr("initializing TCP stack", err)
}
if err := intraServer.Serve(tcpListener); err != nil {
return serrors.WrapStr("serving connect/TCP API", err)
}
return nil
})
cleanup.Add(func() error {
// TODO: add cleanup.AddCtx ?
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
intraServer.Shutdown(ctx)
return nil
})
cleanup.Add(func() error { tcpServer.GracefulStop(); return nil })

if globalCfg.API.Addr != "" {
r := chi.NewRouter()
Expand Down

0 comments on commit 9b099b6

Please sign in to comment.