Skip to content

Commit

Permalink
hackathon: CS intra AS RPC connect server
Browse files Browse the repository at this point in the history
  • Loading branch information
matzf committed Nov 5, 2023
1 parent debd8aa commit 7d96b05
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 42 deletions.
5 changes: 3 additions & 2 deletions control/cmd/control/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ go_library(
"//control/segreq/connect:go_default_library",
"//control/segreq/grpc:go_default_library",
"//control/trust:go_default_library",
"//control/trust/connect:go_default_library",
"//control/trust/grpc:go_default_library",
"//control/trust/metrics:go_default_library",
"//pkg/addr:go_default_library",
Expand Down Expand Up @@ -84,9 +85,9 @@ go_library(
"@in_gopkg_yaml_v2//:go_default_library",
"@org_go4_netipx//:go_default_library",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//health:go_default_library",
"@org_golang_google_grpc//health/grpc_health_v1:go_default_library",
"@org_golang_google_grpc//peer:go_default_library",
"@org_golang_x_net//http2:go_default_library",
"@org_golang_x_net//http2/h2c:go_default_library",
"@org_golang_x_sync//errgroup:go_default_library",
],
)
Expand Down
94 changes: 55 additions & 39 deletions control/cmd/control/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,15 @@ import (
"net"
"net/http"
_ "net/http/pprof"
"net/netip"
"path/filepath"
"strings"
"sync"
"time"

"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"

"github.com/go-chi/chi/v5"
"github.com/go-chi/cors"
promgrpc "github.com/grpc-ecosystem/go-grpc-prometheus"
Expand All @@ -38,8 +42,6 @@ import (
"go4.org/netipx"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/peer"

cpconnect "github.com/scionproto/scion/bufgen/proto/control_plane/v1/control_planeconnect"
Expand All @@ -62,6 +64,7 @@ import (
segreqconnect "github.com/scionproto/scion/control/segreq/connect"
segreqgrpc "github.com/scionproto/scion/control/segreq/grpc"
cstrust "github.com/scionproto/scion/control/trust"
cstrustconnect "github.com/scionproto/scion/control/trust/connect"
cstrustgrpc "github.com/scionproto/scion/control/trust/grpc"
cstrustmetrics "github.com/scionproto/scion/control/trust/metrics"
"github.com/scionproto/scion/pkg/addr"
Expand Down Expand Up @@ -110,16 +113,23 @@ import (
trustmetrics "github.com/scionproto/scion/private/trust/metrics"
)

type loggingHandler struct{ next http.Handler }
type loggingHandler struct {
prefix string
next http.Handler
}

func (h loggingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
fmt.Println(r.Method)
fmt.Println(r.URL)
fmt.Println(h.prefix, r.Method, r.URL)

if addr, ok := r.Context().Value(http3.RemoteAddrContextKey).(net.Addr); ok {
log.Info("HTTP3 request", "remote", r.Context().Value(http3.RemoteAddrContextKey))
ctx := peer.NewContext(r.Context(), &peer.Peer{Addr: addr})
r = r.WithContext(ctx)
} else if addrPort, err := netip.ParseAddrPort(r.RemoteAddr); err == nil {
log.Info("HTTP request", "remote", addrPort)
tcpAddr := net.TCPAddrFromAddrPort(addrPort)
ctx := peer.NewContext(r.Context(), &peer.Peer{Addr: tcpAddr})
r = r.WithContext(ctx)
}
h.next.ServeHTTP(w, r)
}
Expand Down Expand Up @@ -260,10 +270,6 @@ func realMain(ctx context.Context) error {
return serrors.WrapStr("initializing QUIC stack", err)
}
defer quicStack.RedirectCloser()
tcpStack, err := nc.TCPStack()
if err != nil {
return serrors.WrapStr("initializing TCP stack", err)
}
dialer := &libgrpc.QUICDialer{
Rewriter: &onehop.AddressRewriter{
Rewriter: nc.AddressRewriter(nil),
Expand Down Expand Up @@ -344,22 +350,18 @@ func realMain(ctx context.Context) error {
libgrpc.UnaryServerInterceptor(),
libgrpc.DefaultMaxConcurrentStreams(),
)
connectMux := http.NewServeMux()
tcpServer := grpc.NewServer(
libgrpc.UnaryServerInterceptor(),
libgrpc.DefaultMaxConcurrentStreams(),
)
connectInter := http.NewServeMux()
connectIntra := http.NewServeMux()

// Register trust material related handlers.
trustServer := &cstrustgrpc.MaterialServer{
Provider: provider,
IA: topo.IA(),
Requests: libmetrics.NewPromCounter(cstrustmetrics.Handler.Requests),
}
// TODO needs a wrapper here...
//connectMux.Handle(cpconnect.NewTrustMaterialServiceHandler(nil))
connectInter.Handle(cpconnect.NewTrustMaterialServiceHandler(cstrustconnect.MaterialServer{MaterialServer: trustServer}))
cppb.RegisterTrustMaterialServiceServer(quicServer, trustServer)
cppb.RegisterTrustMaterialServiceServer(tcpServer, trustServer)
connectIntra.Handle(cpconnect.NewTrustMaterialServiceHandler(cstrustconnect.MaterialServer{MaterialServer: trustServer}))

// Handle beaconing.
segmentCreationServer := &beaconinggrpc.SegmentCreationServer{
Expand All @@ -375,7 +377,7 @@ func realMain(ctx context.Context) error {
{
pattern, handler := cpconnect.NewSegmentCreationServiceHandler(beaconingconnect.SegmentCreationServer{SegmentCreationServer: segmentCreationServer})
fmt.Println(pattern)
connectMux.Handle(pattern, handler)
connectInter.Handle(pattern, handler)
}

// Handle segment lookup
Expand Down Expand Up @@ -407,10 +409,10 @@ func realMain(ctx context.Context) error {
}

// Always register a forwarding lookup for AS internal requests.
cppb.RegisterSegmentLookupServiceServer(tcpServer, forwardingLookupServer)
connectIntra.Handle(cpconnect.NewSegmentLookupServiceHandler(segreqconnect.LookupServer{LookupServer: forwardingLookupServer}))
if topo.Core() {
cppb.RegisterSegmentLookupServiceServer(quicServer, authLookupServer)
connectMux.Handle(cpconnect.NewSegmentLookupServiceHandler(segreqconnect.LookupServer{LookupServer: authLookupServer}))
connectInter.Handle(cpconnect.NewSegmentLookupServiceHandler(segreqconnect.LookupServer{LookupServer: authLookupServer}))
}

// Handle segment registration.
Expand All @@ -429,7 +431,7 @@ func realMain(ctx context.Context) error {
Registrations: libmetrics.NewPromCounter(metrics.SegmentRegistrationsTotal),
}
cppb.RegisterSegmentRegistrationServiceServer(quicServer, registrationServer)
connectMux.Handle(cpconnect.NewSegmentRegistrationServiceHandler(segregconnect.RegistrationServer{RegistrationServer: registrationServer}))
connectInter.Handle(cpconnect.NewSegmentRegistrationServiceHandler(segregconnect.RegistrationServer{RegistrationServer: registrationServer}))
}

signer := cs.NewSigner(topo.IA(), trustDB, globalCfg.General.ConfigDir)
Expand Down Expand Up @@ -550,8 +552,8 @@ func realMain(ctx context.Context) error {
}

cppb.RegisterChainRenewalServiceServer(quicServer, renewalServer)
connectMux.Handle(cpconnect.NewChainRenewalServiceHandler(renewalconnect.RenewalServer{RenewalServer: renewalServer}))
cppb.RegisterChainRenewalServiceServer(tcpServer, renewalServer)
connectInter.Handle(cpconnect.NewChainRenewalServiceHandler(renewalconnect.RenewalServer{RenewalServer: renewalServer}))
connectIntra.Handle(cpconnect.NewChainRenewalServiceHandler(renewalconnect.RenewalServer{RenewalServer: renewalServer}))
}

// Frequently regenerate signers to catch problems, and update the metrics.
Expand Down Expand Up @@ -606,18 +608,18 @@ func realMain(ctx context.Context) error {
}
dpb.RegisterDiscoveryServiceServer(quicServer, ds)

dsHealth := health.NewServer()
dsHealth.SetServingStatus("discovery", healthpb.HealthCheckResponse_SERVING)
healthpb.RegisterHealthServer(tcpServer, dsHealth)
// dsHealth := health.NewServer()
// dsHealth.SetServingStatus("discovery", healthpb.HealthCheckResponse_SERVING)
// healthpb.RegisterHealthServer(tcpServer, dsHealth)

hpCfg := cs.HiddenPathConfigurator{
LocalIA: topo.IA(),
Verifier: verifier,
Signer: signer,
PathDB: pathDB,
Dialer: dialer,
FetcherConfig: fetcherCfg,
IntraASTCPServer: tcpServer,
LocalIA: topo.IA(),
Verifier: verifier,
Signer: signer,
PathDB: pathDB,
Dialer: dialer,
FetcherConfig: fetcherCfg,
//IntraASTCPServer: tcpServer,
InterASQUICServer: quicServer,
}
hpWriterCfg, err := hpCfg.Setup(globalCfg.PS.HiddenPathsCfg)
Expand Down Expand Up @@ -696,18 +698,19 @@ func realMain(ctx context.Context) error {
AllowedSVHostProto: globalCfg.DRKey.Delegation.ToAllowedSet(),
}
cppb.RegisterDRKeyInterServiceServer(quicServer, drkeyService)
cppb.RegisterDRKeyIntraServiceServer(tcpServer, drkeyService)
//cppb.RegisterDRKeyIntraServiceServer(tcpServer, drkeyService)
log.Info("DRKey is enabled")
} else {
log.Info("DRKey is DISABLED by configuration")
}

promgrpc.Register(quicServer)
promgrpc.Register(tcpServer)
// TODO prom middleware
//promgrpc.Register(tcpServer)

var cleanup app.Cleanup
connectServer := http3.Server{
Handler: loggingHandler{connectMux},
Handler: loggingHandler{"inter", connectInter},
}

grpcConns := make(chan quic.Connection)
Expand Down Expand Up @@ -752,14 +755,27 @@ func realMain(ctx context.Context) error {
})
cleanup.Add(func() error { quicServer.GracefulStop(); return nil })

intraServer := http.Server{
Handler: h2c.NewHandler(loggingHandler{"intra", connectIntra}, &http2.Server{}),
}
g.Go(func() error {
defer log.HandlePanic()
if err := tcpServer.Serve(tcpStack); err != nil {
return serrors.WrapStr("serving gRPC/TCP API", err)
tcpListener, err := nc.TCPStack()
if err != nil {
return serrors.WrapStr("initializing TCP stack", err)
}
if err := intraServer.Serve(tcpListener); err != nil {
return serrors.WrapStr("serving connect/TCP API", err)
}
return nil
})
cleanup.Add(func() error {
// TODO: add cleanup.AddCtx ?
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
intraServer.Shutdown(ctx)
return nil
})
cleanup.Add(func() error { tcpServer.GracefulStop(); return nil })

if globalCfg.API.Addr != "" {
r := chi.NewRouter()
Expand Down
2 changes: 1 addition & 1 deletion control/trust/connect/material.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
var _ control_planeconnect.TrustMaterialServiceHandler = MaterialServer{}

type MaterialServer struct {
grpc.MaterialServer
*grpc.MaterialServer
}

func (m MaterialServer) Chains(ctx context.Context, req *connect.Request[control_plane.ChainsRequest]) (*connect.Response[control_plane.ChainsResponse], error) {
Expand Down

0 comments on commit 7d96b05

Please sign in to comment.