Skip to content

Commit

Permalink
support grpc auth for grpc server. (#168)
Browse files Browse the repository at this point in the history
* support grpc auth.

Signed-off-by: morvencao <[email protected]>

* address comments.

Signed-off-by: morvencao <[email protected]>

---------

Signed-off-by: morvencao <[email protected]>
  • Loading branch information
morvencao authored Sep 5, 2024
1 parent 681b895 commit 908246a
Show file tree
Hide file tree
Showing 18 changed files with 789 additions and 101 deletions.
29 changes: 29 additions & 0 deletions cmd/maestro/environments/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@ import (
"github.com/getsentry/sentry-go"
"github.com/golang/glog"
"github.com/openshift-online/maestro/pkg/client/cloudevents"
"github.com/openshift-online/maestro/pkg/client/grpcauthorizer"
"github.com/openshift-online/maestro/pkg/client/ocm"
"github.com/openshift-online/maestro/pkg/config"
"github.com/openshift-online/maestro/pkg/errors"
"github.com/spf13/pflag"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"

"open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
)
Expand Down Expand Up @@ -196,6 +200,31 @@ func (e *Env) LoadClients() error {
}
}

// Create GRPC authorizer based on configuration
if e.Config.GRPCServer.EnableGRPCServer {
if e.Config.GRPCServer.GRPCAuthNType == "mock" {
glog.Infof("Using Mock GRPC Authorizer")
e.Clients.GRPCAuthorizer = grpcauthorizer.NewMockGRPCAuthorizer()
} else {
kubeConfig, err := clientcmd.BuildConfigFromFlags("", e.Config.GRPCServer.GRPCAuthorizerConfig)
if err != nil {
glog.Warningf("Unable to create kube client config: %s", err.Error())
// fallback to in-cluster config
kubeConfig, err = rest.InClusterConfig()
if err != nil {
glog.Errorf("Unable to create kube client config: %s", err.Error())
return err
}
}
kubeClient, err := kubernetes.NewForConfig(kubeConfig)
if err != nil {
glog.Errorf("Unable to create kube client: %s", err.Error())
return err
}
e.Clients.GRPCAuthorizer = grpcauthorizer.NewKubeGRPCAuthorizer(kubeClient)
}
}

return nil
}

Expand Down
2 changes: 2 additions & 0 deletions cmd/maestro/environments/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/openshift-online/maestro/pkg/auth"
"github.com/openshift-online/maestro/pkg/client/cloudevents"
"github.com/openshift-online/maestro/pkg/client/grpcauthorizer"
"github.com/openshift-online/maestro/pkg/client/ocm"
"github.com/openshift-online/maestro/pkg/config"
"github.com/openshift-online/maestro/pkg/db"
Expand Down Expand Up @@ -57,6 +58,7 @@ type Services struct {

type Clients struct {
OCM *ocm.Client
GRPCAuthorizer grpcauthorizer.GRPCAuthorizer
CloudEventsSource cloudevents.SourceClient
}

Expand Down
3 changes: 1 addition & 2 deletions cmd/maestro/server/api_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,8 @@ func NewAPIServer(eventBroadcaster *event.EventBroadcaster) Server {
Handler: mainHandler,
}

// TODO: support authn and authz for gRPC
if env().Config.GRPCServer.EnableGRPCServer {
s.grpcServer = NewGRPCServer(env().Services.Resources(), eventBroadcaster, *env().Config.GRPCServer)
s.grpcServer = NewGRPCServer(env().Services.Resources(), eventBroadcaster, *env().Config.GRPCServer, env().Clients.GRPCAuthorizer)
}
return s
}
Expand Down
172 changes: 172 additions & 0 deletions cmd/maestro/server/auth_interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package server

import (
"context"
"fmt"
"strings"

"github.com/golang/glog"
"github.com/openshift-online/maestro/pkg/client/grpcauthorizer"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
)

// Context key type defined to avoid collisions in other pkgs using context
// See https://golang.org/pkg/context/#WithValue
type contextKey string

const (
contextUserKey contextKey = "user"
contextGroupsKey contextKey = "groups"
)

func newContextWithIdentity(ctx context.Context, user string, groups []string) context.Context {
ctx = context.WithValue(ctx, contextUserKey, user)
return context.WithValue(ctx, contextGroupsKey, groups)
}

// identityFromCertificate retrieves the user and groups from the client certificate if they are present.
func identityFromCertificate(ctx context.Context) (string, []string, error) {
p, ok := peer.FromContext(ctx)
if !ok {
return "", nil, status.Error(codes.Unauthenticated, "no peer found")
}

tlsAuth, ok := p.AuthInfo.(credentials.TLSInfo)
if !ok {
return "", nil, status.Error(codes.Unauthenticated, "unexpected peer transport credentials")
}

if len(tlsAuth.State.VerifiedChains) == 0 || len(tlsAuth.State.VerifiedChains[0]) == 0 {
return "", nil, status.Error(codes.Unauthenticated, "could not verify peer certificate")
}

if tlsAuth.State.VerifiedChains[0][0] == nil {
return "", nil, status.Error(codes.Unauthenticated, "could not verify peer certificate")
}

user := tlsAuth.State.VerifiedChains[0][0].Subject.CommonName
groups := tlsAuth.State.VerifiedChains[0][0].Subject.Organization

if user == "" {
return "", nil, status.Error(codes.Unauthenticated, "could not find user in peer certificate")
}

if len(groups) == 0 {
return "", nil, status.Error(codes.Unauthenticated, "could not find group in peer certificate")
}

return user, groups, nil
}

// identityFromToken retrieves the user and groups from the access token if they are present.
func identityFromToken(ctx context.Context, grpcAuthorizer grpcauthorizer.GRPCAuthorizer) (string, []string, error) {
// Extract the metadata from the context
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return "", nil, status.Error(codes.InvalidArgument, "missing metadata")
}

// Extract the access token from the metadata
authorization, ok := md["authorization"]
if !ok || len(authorization) == 0 {
return "", nil, status.Error(codes.Unauthenticated, "invalid token")
}

token := strings.TrimPrefix(authorization[0], "Bearer ")
// Extract the user and groups from the access token
return grpcAuthorizer.TokenReview(ctx, token)
}

// newAuthUnaryInterceptor creates a unary interceptor that retrieves the user and groups
// based on the specified authentication type. It supports retrieving from either the access
// token or the client certificate depending on the provided authNType.
// The interceptor then adds the retrieved identity information (user and groups) to the
// context and invokes the provided handler.
func newAuthUnaryInterceptor(authNType string, authorizer grpcauthorizer.GRPCAuthorizer) grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
var user string
var groups []string
var err error
switch authNType {
case "token":
user, groups, err = identityFromToken(ctx, authorizer)
if err != nil {
glog.Errorf("unable to get user and groups from token: %v", err)
return nil, err
}
case "mtls":
user, groups, err = identityFromCertificate(ctx)
if err != nil {
glog.Errorf("unable to get user and groups from certificate: %v", err)
return nil, err
}
default:
return nil, fmt.Errorf("unsupported authentication type %s", authNType)
}

// call the handler with the new context containing the user and groups
return handler(newContextWithIdentity(ctx, user, groups), req)
}
}

// wrappedStream wraps a grpc.ServerStream associated with an incoming RPC, and
// a custom context containing the user and groups derived from the client certificate
// specified in the incoming RPC metadata
type wrappedStream struct {
grpc.ServerStream
ctx context.Context
}

func (w *wrappedStream) Context() context.Context {
return w.ctx
}

func newWrappedStream(ctx context.Context, s grpc.ServerStream) grpc.ServerStream {
return &wrappedStream{s, ctx}
}

// newAuthStreamInterceptor creates a stream interceptor that retrieves the user and groups
// based on the specified authentication type. It supports retrieving from either the access
// token or the client certificate depending on the provided authNType.
// The interceptor then adds the retrieved identity information (user and groups) to the
// context and invokes the provided handler.
func newAuthStreamInterceptor(authNType string, authorizer grpcauthorizer.GRPCAuthorizer) grpc.StreamServerInterceptor {
return func(
srv interface{},
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
var user string
var groups []string
var err error
switch authNType {
case "token":
user, groups, err = identityFromToken(ss.Context(), authorizer)
if err != nil {
glog.Errorf("unable to get user and groups from token: %v", err)
return err
}
case "mtls":
user, groups, err = identityFromCertificate(ss.Context())
if err != nil {
glog.Errorf("unable to get user and groups from certificate: %v", err)
return err
}
default:
return fmt.Errorf("unsupported authentication Type %s", authNType)
}

return handler(srv, newWrappedStream(newContextWithIdentity(ss.Context(), user, groups), ss))
}
}
27 changes: 4 additions & 23 deletions cmd/maestro/server/grpc_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/golang/glog"
"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/protobuf/types/known/emptypb"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -74,26 +73,7 @@ func NewGRPCBroker(eventBroadcaster *event.EventBroadcaster) EventServer {
MaxConnectionAge: config.MaxConnectionAge,
}))

if config.EnableTLS {
// Check tls cert and key path path
if config.TLSCertFile == "" || config.TLSKeyFile == "" {
check(
fmt.Errorf("unspecified required --grpc-tls-cert-file, --grpc-tls-key-file"),
"Can't start gRPC broker",
)
}

// Serve with TLS
creds, err := credentials.NewServerTLSFromFile(config.TLSCertFile, config.TLSKeyFile)
if err != nil {
glog.Fatalf("Failed to generate credentials %v", err)
}
grpcServerOptions = append(grpcServerOptions, grpc.Creds(creds))
glog.Infof("Serving gRPC broker with TLS at %s", config.BrokerBindPort)
} else {
glog.Infof("Serving gRPC broker without TLS at %s", config.BrokerBindPort)
}

glog.Infof("Serving gRPC broker without TLS at %s", config.BrokerBindPort)
sessionFactory := env().Database.SessionFactory
return &GRPCBroker{
grpcServer: grpc.NewServer(grpcServerOptions...),
Expand All @@ -109,14 +89,15 @@ func NewGRPCBroker(eventBroadcaster *event.EventBroadcaster) EventServer {

// Start starts the gRPC broker
func (bkr *GRPCBroker) Start(ctx context.Context) {
glog.Info("Starting gRPC broker")
lis, err := net.Listen("tcp", bkr.bindAddress)
if err != nil {
glog.Fatalf("failed to listen: %v", err)
check(fmt.Errorf("failed to listen: %v", err), "Can't start gRPC broker")
}
pbv1.RegisterCloudEventServiceServer(bkr.grpcServer, bkr)
go func() {
if err := bkr.grpcServer.Serve(lis); err != nil {
glog.Fatalf("failed to start gRPC broker: %v", err)
check(fmt.Errorf("failed to serve gRPC broker: %v", err), "Can't start gRPC broker")
}
}()

Expand Down
Loading

0 comments on commit 908246a

Please sign in to comment.