diff --git a/control/cmd/control/BUILD.bazel b/control/cmd/control/BUILD.bazel index 4b0abe0435..65e85d1b88 100644 --- a/control/cmd/control/BUILD.bazel +++ b/control/cmd/control/BUILD.bazel @@ -29,6 +29,7 @@ go_library( "//control/segreq/connect:go_default_library", "//control/segreq/grpc:go_default_library", "//control/trust:go_default_library", + "//control/trust/connect:go_default_library", "//control/trust/grpc:go_default_library", "//control/trust/metrics:go_default_library", "//pkg/addr:go_default_library", @@ -84,9 +85,9 @@ go_library( "@in_gopkg_yaml_v2//:go_default_library", "@org_go4_netipx//:go_default_library", "@org_golang_google_grpc//:go_default_library", - "@org_golang_google_grpc//health:go_default_library", - "@org_golang_google_grpc//health/grpc_health_v1:go_default_library", "@org_golang_google_grpc//peer:go_default_library", + "@org_golang_x_net//http2:go_default_library", + "@org_golang_x_net//http2/h2c:go_default_library", "@org_golang_x_sync//errgroup:go_default_library", ], ) diff --git a/control/cmd/control/main.go b/control/cmd/control/main.go index 20c06e5b6d..0f509bbf4e 100644 --- a/control/cmd/control/main.go +++ b/control/cmd/control/main.go @@ -24,11 +24,15 @@ import ( "net" "net/http" _ "net/http/pprof" + "net/netip" "path/filepath" "strings" "sync" "time" + "golang.org/x/net/http2" + "golang.org/x/net/http2/h2c" + "github.com/go-chi/chi/v5" "github.com/go-chi/cors" promgrpc "github.com/grpc-ecosystem/go-grpc-prometheus" @@ -38,8 +42,6 @@ import ( "go4.org/netipx" "golang.org/x/sync/errgroup" "google.golang.org/grpc" - "google.golang.org/grpc/health" - healthpb "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/peer" cpconnect "github.com/scionproto/scion/bufgen/proto/control_plane/v1/control_planeconnect" @@ -62,6 +64,7 @@ import ( segreqconnect "github.com/scionproto/scion/control/segreq/connect" segreqgrpc "github.com/scionproto/scion/control/segreq/grpc" cstrust "github.com/scionproto/scion/control/trust" + cstrustconnect "github.com/scionproto/scion/control/trust/connect" cstrustgrpc "github.com/scionproto/scion/control/trust/grpc" cstrustmetrics "github.com/scionproto/scion/control/trust/metrics" "github.com/scionproto/scion/pkg/addr" @@ -110,16 +113,23 @@ import ( trustmetrics "github.com/scionproto/scion/private/trust/metrics" ) -type loggingHandler struct{ next http.Handler } +type loggingHandler struct { + prefix string + next http.Handler +} func (h loggingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - fmt.Println(r.Method) - fmt.Println(r.URL) + fmt.Println(h.prefix, r.Method, r.URL) if addr, ok := r.Context().Value(http3.RemoteAddrContextKey).(net.Addr); ok { log.Info("HTTP3 request", "remote", r.Context().Value(http3.RemoteAddrContextKey)) ctx := peer.NewContext(r.Context(), &peer.Peer{Addr: addr}) r = r.WithContext(ctx) + } else if addrPort, err := netip.ParseAddrPort(r.RemoteAddr); err == nil { + log.Info("HTTP request", "remote", addrPort) + tcpAddr := net.TCPAddrFromAddrPort(addrPort) + ctx := peer.NewContext(r.Context(), &peer.Peer{Addr: tcpAddr}) + r = r.WithContext(ctx) } h.next.ServeHTTP(w, r) } @@ -260,10 +270,6 @@ func realMain(ctx context.Context) error { return serrors.WrapStr("initializing QUIC stack", err) } defer quicStack.RedirectCloser() - tcpStack, err := nc.TCPStack() - if err != nil { - return serrors.WrapStr("initializing TCP stack", err) - } dialer := &libgrpc.QUICDialer{ Rewriter: &onehop.AddressRewriter{ Rewriter: nc.AddressRewriter(nil), @@ -344,11 +350,8 @@ func realMain(ctx context.Context) error { libgrpc.UnaryServerInterceptor(), libgrpc.DefaultMaxConcurrentStreams(), ) - connectMux := http.NewServeMux() - tcpServer := grpc.NewServer( - libgrpc.UnaryServerInterceptor(), - libgrpc.DefaultMaxConcurrentStreams(), - ) + connectInter := http.NewServeMux() + connectIntra := http.NewServeMux() // Register trust material related handlers. trustServer := &cstrustgrpc.MaterialServer{ @@ -356,10 +359,9 @@ func realMain(ctx context.Context) error { IA: topo.IA(), Requests: libmetrics.NewPromCounter(cstrustmetrics.Handler.Requests), } - // TODO needs a wrapper here... - //connectMux.Handle(cpconnect.NewTrustMaterialServiceHandler(nil)) + connectInter.Handle(cpconnect.NewTrustMaterialServiceHandler(cstrustconnect.MaterialServer{MaterialServer: trustServer})) cppb.RegisterTrustMaterialServiceServer(quicServer, trustServer) - cppb.RegisterTrustMaterialServiceServer(tcpServer, trustServer) + connectIntra.Handle(cpconnect.NewTrustMaterialServiceHandler(cstrustconnect.MaterialServer{MaterialServer: trustServer})) // Handle beaconing. segmentCreationServer := &beaconinggrpc.SegmentCreationServer{ @@ -375,7 +377,7 @@ func realMain(ctx context.Context) error { { pattern, handler := cpconnect.NewSegmentCreationServiceHandler(beaconingconnect.SegmentCreationServer{SegmentCreationServer: segmentCreationServer}) fmt.Println(pattern) - connectMux.Handle(pattern, handler) + connectInter.Handle(pattern, handler) } // Handle segment lookup @@ -407,10 +409,10 @@ func realMain(ctx context.Context) error { } // Always register a forwarding lookup for AS internal requests. - cppb.RegisterSegmentLookupServiceServer(tcpServer, forwardingLookupServer) + connectIntra.Handle(cpconnect.NewSegmentLookupServiceHandler(segreqconnect.LookupServer{LookupServer: forwardingLookupServer})) if topo.Core() { cppb.RegisterSegmentLookupServiceServer(quicServer, authLookupServer) - connectMux.Handle(cpconnect.NewSegmentLookupServiceHandler(segreqconnect.LookupServer{LookupServer: authLookupServer})) + connectInter.Handle(cpconnect.NewSegmentLookupServiceHandler(segreqconnect.LookupServer{LookupServer: authLookupServer})) } // Handle segment registration. @@ -429,7 +431,7 @@ func realMain(ctx context.Context) error { Registrations: libmetrics.NewPromCounter(metrics.SegmentRegistrationsTotal), } cppb.RegisterSegmentRegistrationServiceServer(quicServer, registrationServer) - connectMux.Handle(cpconnect.NewSegmentRegistrationServiceHandler(segregconnect.RegistrationServer{RegistrationServer: registrationServer})) + connectInter.Handle(cpconnect.NewSegmentRegistrationServiceHandler(segregconnect.RegistrationServer{RegistrationServer: registrationServer})) } signer := cs.NewSigner(topo.IA(), trustDB, globalCfg.General.ConfigDir) @@ -550,8 +552,8 @@ func realMain(ctx context.Context) error { } cppb.RegisterChainRenewalServiceServer(quicServer, renewalServer) - connectMux.Handle(cpconnect.NewChainRenewalServiceHandler(renewalconnect.RenewalServer{RenewalServer: renewalServer})) - cppb.RegisterChainRenewalServiceServer(tcpServer, renewalServer) + connectInter.Handle(cpconnect.NewChainRenewalServiceHandler(renewalconnect.RenewalServer{RenewalServer: renewalServer})) + connectIntra.Handle(cpconnect.NewChainRenewalServiceHandler(renewalconnect.RenewalServer{RenewalServer: renewalServer})) } // Frequently regenerate signers to catch problems, and update the metrics. @@ -606,18 +608,18 @@ func realMain(ctx context.Context) error { } dpb.RegisterDiscoveryServiceServer(quicServer, ds) - dsHealth := health.NewServer() - dsHealth.SetServingStatus("discovery", healthpb.HealthCheckResponse_SERVING) - healthpb.RegisterHealthServer(tcpServer, dsHealth) + // dsHealth := health.NewServer() + // dsHealth.SetServingStatus("discovery", healthpb.HealthCheckResponse_SERVING) + // healthpb.RegisterHealthServer(tcpServer, dsHealth) hpCfg := cs.HiddenPathConfigurator{ - LocalIA: topo.IA(), - Verifier: verifier, - Signer: signer, - PathDB: pathDB, - Dialer: dialer, - FetcherConfig: fetcherCfg, - IntraASTCPServer: tcpServer, + LocalIA: topo.IA(), + Verifier: verifier, + Signer: signer, + PathDB: pathDB, + Dialer: dialer, + FetcherConfig: fetcherCfg, + //IntraASTCPServer: tcpServer, InterASQUICServer: quicServer, } hpWriterCfg, err := hpCfg.Setup(globalCfg.PS.HiddenPathsCfg) @@ -696,18 +698,19 @@ func realMain(ctx context.Context) error { AllowedSVHostProto: globalCfg.DRKey.Delegation.ToAllowedSet(), } cppb.RegisterDRKeyInterServiceServer(quicServer, drkeyService) - cppb.RegisterDRKeyIntraServiceServer(tcpServer, drkeyService) + //cppb.RegisterDRKeyIntraServiceServer(tcpServer, drkeyService) log.Info("DRKey is enabled") } else { log.Info("DRKey is DISABLED by configuration") } promgrpc.Register(quicServer) - promgrpc.Register(tcpServer) + // TODO prom middleware + //promgrpc.Register(tcpServer) var cleanup app.Cleanup connectServer := http3.Server{ - Handler: loggingHandler{connectMux}, + Handler: loggingHandler{"inter", connectInter}, } grpcConns := make(chan quic.Connection) @@ -752,14 +755,27 @@ func realMain(ctx context.Context) error { }) cleanup.Add(func() error { quicServer.GracefulStop(); return nil }) + intraServer := http.Server{ + Handler: h2c.NewHandler(loggingHandler{"intra", connectIntra}, &http2.Server{}), + } g.Go(func() error { defer log.HandlePanic() - if err := tcpServer.Serve(tcpStack); err != nil { - return serrors.WrapStr("serving gRPC/TCP API", err) + tcpListener, err := nc.TCPStack() + if err != nil { + return serrors.WrapStr("initializing TCP stack", err) } + if err := intraServer.Serve(tcpListener); err != nil { + return serrors.WrapStr("serving connect/TCP API", err) + } + return nil + }) + cleanup.Add(func() error { + // TODO: add cleanup.AddCtx ? + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + intraServer.Shutdown(ctx) return nil }) - cleanup.Add(func() error { tcpServer.GracefulStop(); return nil }) if globalCfg.API.Addr != "" { r := chi.NewRouter() diff --git a/control/trust/connect/material.go b/control/trust/connect/material.go index 34b33768db..cb54648196 100644 --- a/control/trust/connect/material.go +++ b/control/trust/connect/material.go @@ -12,7 +12,7 @@ import ( var _ control_planeconnect.TrustMaterialServiceHandler = MaterialServer{} type MaterialServer struct { - grpc.MaterialServer + *grpc.MaterialServer } func (m MaterialServer) Chains(ctx context.Context, req *connect.Request[control_plane.ChainsRequest]) (*connect.Response[control_plane.ChainsResponse], error) {