diff --git a/control/cmd/control/BUILD.bazel b/control/cmd/control/BUILD.bazel index a37b41e50f..a21fc79314 100644 --- a/control/cmd/control/BUILD.bazel +++ b/control/cmd/control/BUILD.bazel @@ -89,9 +89,9 @@ go_library( "@in_gopkg_yaml_v2//:go_default_library", "@org_go4_netipx//:go_default_library", "@org_golang_google_grpc//:go_default_library", - "@org_golang_google_grpc//health:go_default_library", - "@org_golang_google_grpc//health/grpc_health_v1:go_default_library", "@org_golang_google_grpc//peer:go_default_library", + "@org_golang_x_net//http2:go_default_library", + "@org_golang_x_net//http2/h2c:go_default_library", "@org_golang_x_sync//errgroup:go_default_library", ], ) diff --git a/control/cmd/control/main.go b/control/cmd/control/main.go index ab45373f97..b6a1fb8f99 100644 --- a/control/cmd/control/main.go +++ b/control/cmd/control/main.go @@ -24,11 +24,15 @@ import ( "net" "net/http" _ "net/http/pprof" + "net/netip" "path/filepath" "strings" "sync" "time" + "golang.org/x/net/http2" + "golang.org/x/net/http2/h2c" + "github.com/go-chi/chi/v5" "github.com/go-chi/cors" promgrpc "github.com/grpc-ecosystem/go-grpc-prometheus" @@ -38,8 +42,6 @@ import ( "go4.org/netipx" "golang.org/x/sync/errgroup" "google.golang.org/grpc" - "google.golang.org/grpc/health" - healthpb "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/peer" cpconnect "github.com/scionproto/scion/bufgen/proto/control_plane/v1/control_planeconnect" @@ -115,16 +117,23 @@ import ( trustmetrics "github.com/scionproto/scion/private/trust/metrics" ) -type loggingHandler struct{ next http.Handler } +type loggingHandler struct { + prefix string + next http.Handler +} func (h loggingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - fmt.Println(r.Method) - fmt.Println(r.URL) + fmt.Println(h.prefix, r.Method, r.URL) if addr, ok := r.Context().Value(http3.RemoteAddrContextKey).(net.Addr); ok { log.Info("HTTP3 request", "remote", r.Context().Value(http3.RemoteAddrContextKey)) ctx := peer.NewContext(r.Context(), &peer.Peer{Addr: addr}) r = r.WithContext(ctx) + } else if addrPort, err := netip.ParseAddrPort(r.RemoteAddr); err == nil { + log.Info("HTTP request", "remote", addrPort) + tcpAddr := net.TCPAddrFromAddrPort(addrPort) + ctx := peer.NewContext(r.Context(), &peer.Peer{Addr: tcpAddr}) + r = r.WithContext(ctx) } h.next.ServeHTTP(w, r) } @@ -265,10 +274,6 @@ func realMain(ctx context.Context) error { return serrors.WrapStr("initializing QUIC stack", err) } defer quicStack.RedirectCloser() - tcpStack, err := nc.TCPStack() - if err != nil { - return serrors.WrapStr("initializing TCP stack", err) - } dialer := &libgrpc.QUICDialer{ Rewriter: &onehop.AddressRewriter{ Rewriter: nc.AddressRewriter(nil), @@ -376,11 +381,8 @@ func realMain(ctx context.Context) error { libgrpc.UnaryServerInterceptor(), libgrpc.DefaultMaxConcurrentStreams(), ) - connectMux := http.NewServeMux() - tcpServer := grpc.NewServer( - libgrpc.UnaryServerInterceptor(), - libgrpc.DefaultMaxConcurrentStreams(), - ) + connectInter := http.NewServeMux() + connectIntra := http.NewServeMux() // Register trust material related handlers. trustServer := &cstrustgrpc.MaterialServer{ @@ -388,11 +390,10 @@ func realMain(ctx context.Context) error { IA: topo.IA(), Requests: libmetrics.NewPromCounter(cstrustmetrics.Handler.Requests), } - // TODO needs a wrapper here... - //connectMux.Handle(cpconnect.NewTrustMaterialServiceHandler(nil)) + connectInter.Handle(cpconnect.NewTrustMaterialServiceHandler(cstrustconnect.MaterialServer{MaterialServer: trustServer})) cppb.RegisterTrustMaterialServiceServer(quicServer, trustServer) - cppb.RegisterTrustMaterialServiceServer(tcpServer, trustServer) - connectMux.Handle(cpconnect.NewTrustMaterialServiceHandler(cstrustconnect.MaterialServer{MaterialServer: trustServer})) + connectInter.Handle(cpconnect.NewTrustMaterialServiceHandler(cstrustconnect.MaterialServer{MaterialServer: trustServer})) + connectIntra.Handle(cpconnect.NewTrustMaterialServiceHandler(cstrustconnect.MaterialServer{MaterialServer: trustServer})) // Handle beaconing. segmentCreationServer := &beaconinggrpc.SegmentCreationServer{ @@ -408,7 +409,7 @@ func realMain(ctx context.Context) error { { pattern, handler := cpconnect.NewSegmentCreationServiceHandler(beaconingconnect.SegmentCreationServer{SegmentCreationServer: segmentCreationServer}) fmt.Println(pattern) - connectMux.Handle(pattern, handler) + connectInter.Handle(pattern, handler) } // Handle segment lookup @@ -440,10 +441,10 @@ func realMain(ctx context.Context) error { } // Always register a forwarding lookup for AS internal requests. - cppb.RegisterSegmentLookupServiceServer(tcpServer, forwardingLookupServer) + connectIntra.Handle(cpconnect.NewSegmentLookupServiceHandler(segreqconnect.LookupServer{LookupServer: forwardingLookupServer})) if topo.Core() { cppb.RegisterSegmentLookupServiceServer(quicServer, authLookupServer) - connectMux.Handle(cpconnect.NewSegmentLookupServiceHandler(segreqconnect.LookupServer{LookupServer: authLookupServer})) + connectInter.Handle(cpconnect.NewSegmentLookupServiceHandler(segreqconnect.LookupServer{LookupServer: authLookupServer})) } // Handle segment registration. @@ -462,7 +463,7 @@ func realMain(ctx context.Context) error { Registrations: libmetrics.NewPromCounter(metrics.SegmentRegistrationsTotal), } cppb.RegisterSegmentRegistrationServiceServer(quicServer, registrationServer) - connectMux.Handle(cpconnect.NewSegmentRegistrationServiceHandler(segregconnect.RegistrationServer{RegistrationServer: registrationServer})) + connectInter.Handle(cpconnect.NewSegmentRegistrationServiceHandler(segregconnect.RegistrationServer{RegistrationServer: registrationServer})) } signer := cs.NewSigner(topo.IA(), trustDB, globalCfg.General.ConfigDir) @@ -583,8 +584,8 @@ func realMain(ctx context.Context) error { } cppb.RegisterChainRenewalServiceServer(quicServer, renewalServer) - connectMux.Handle(cpconnect.NewChainRenewalServiceHandler(renewalconnect.RenewalServer{RenewalServer: renewalServer})) - cppb.RegisterChainRenewalServiceServer(tcpServer, renewalServer) + connectInter.Handle(cpconnect.NewChainRenewalServiceHandler(renewalconnect.RenewalServer{RenewalServer: renewalServer})) + connectIntra.Handle(cpconnect.NewChainRenewalServiceHandler(renewalconnect.RenewalServer{RenewalServer: renewalServer})) } // Frequently regenerate signers to catch problems, and update the metrics. @@ -639,18 +640,18 @@ func realMain(ctx context.Context) error { } dpb.RegisterDiscoveryServiceServer(quicServer, ds) - dsHealth := health.NewServer() - dsHealth.SetServingStatus("discovery", healthpb.HealthCheckResponse_SERVING) - healthpb.RegisterHealthServer(tcpServer, dsHealth) + // dsHealth := health.NewServer() + // dsHealth.SetServingStatus("discovery", healthpb.HealthCheckResponse_SERVING) + // healthpb.RegisterHealthServer(tcpServer, dsHealth) hpCfg := cs.HiddenPathConfigurator{ - LocalIA: topo.IA(), - Verifier: verifier, - Signer: signer, - PathDB: pathDB, - Dialer: dialer, - FetcherConfig: fetcherCfg, - IntraASTCPServer: tcpServer, + LocalIA: topo.IA(), + Verifier: verifier, + Signer: signer, + PathDB: pathDB, + Dialer: dialer, + FetcherConfig: fetcherCfg, + //IntraASTCPServer: tcpServer, InterASQUICServer: quicServer, } hpWriterCfg, err := hpCfg.Setup(globalCfg.PS.HiddenPathsCfg) @@ -729,18 +730,19 @@ func realMain(ctx context.Context) error { AllowedSVHostProto: globalCfg.DRKey.Delegation.ToAllowedSet(), } cppb.RegisterDRKeyInterServiceServer(quicServer, drkeyService) - cppb.RegisterDRKeyIntraServiceServer(tcpServer, drkeyService) + //cppb.RegisterDRKeyIntraServiceServer(tcpServer, drkeyService) log.Info("DRKey is enabled") } else { log.Info("DRKey is DISABLED by configuration") } promgrpc.Register(quicServer) - promgrpc.Register(tcpServer) + // TODO prom middleware + //promgrpc.Register(tcpServer) var cleanup app.Cleanup connectServer := http3.Server{ - Handler: loggingHandler{connectMux}, + Handler: loggingHandler{"inter", connectInter}, } grpcConns := make(chan quic.Connection) @@ -785,14 +787,27 @@ func realMain(ctx context.Context) error { }) cleanup.Add(func() error { quicServer.GracefulStop(); return nil }) + intraServer := http.Server{ + Handler: h2c.NewHandler(loggingHandler{"intra", connectIntra}, &http2.Server{}), + } g.Go(func() error { defer log.HandlePanic() - if err := tcpServer.Serve(tcpStack); err != nil { - return serrors.WrapStr("serving gRPC/TCP API", err) + tcpListener, err := nc.TCPStack() + if err != nil { + return serrors.WrapStr("initializing TCP stack", err) } + if err := intraServer.Serve(tcpListener); err != nil { + return serrors.WrapStr("serving connect/TCP API", err) + } + return nil + }) + cleanup.Add(func() error { + // TODO: add cleanup.AddCtx ? + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + intraServer.Shutdown(ctx) return nil }) - cleanup.Add(func() error { tcpServer.GracefulStop(); return nil }) if globalCfg.API.Addr != "" { r := chi.NewRouter()