From fc6817d06b089d2c302b97a103702f1fc723fba9 Mon Sep 17 00:00:00 2001 From: Iceber Gu Date: Fri, 27 Dec 2024 17:28:42 +0800 Subject: [PATCH] apiserver: Adding RemoteProxyREST for pass-through any requests Signed-off-by: Iceber Gu --- pkg/kubeapiserver/apiserver.go | 11 +- pkg/kubeapiserver/resource_handler.go | 1 + .../resourcerest/pod/subresource.go | 131 ------------------ pkg/kubeapiserver/resourcerest/proxy/proxy.go | 93 +++++++++++++ .../resourcerest/proxy/subresource.go | 110 +++++++++++++++ 5 files changed, 210 insertions(+), 136 deletions(-) delete mode 100644 pkg/kubeapiserver/resourcerest/pod/subresource.go create mode 100644 pkg/kubeapiserver/resourcerest/proxy/proxy.go create mode 100644 pkg/kubeapiserver/resourcerest/proxy/subresource.go diff --git a/pkg/kubeapiserver/apiserver.go b/pkg/kubeapiserver/apiserver.go index f621a8982..8e0d8e882 100644 --- a/pkg/kubeapiserver/apiserver.go +++ b/pkg/kubeapiserver/apiserver.go @@ -22,7 +22,7 @@ import ( informers "github.com/clusterpedia-io/clusterpedia/pkg/generated/informers/externalversions" "github.com/clusterpedia-io/clusterpedia/pkg/kubeapiserver/discovery" - podrest "github.com/clusterpedia-io/clusterpedia/pkg/kubeapiserver/resourcerest/pod" + proxyrest "github.com/clusterpedia-io/clusterpedia/pkg/kubeapiserver/resourcerest/proxy" "github.com/clusterpedia-io/clusterpedia/pkg/storage" "github.com/clusterpedia-io/clusterpedia/pkg/utils/filters" ) @@ -140,16 +140,17 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) controller := NewClusterResourceController(restManager, discoveryManager, c.ExtraConfig.InformerFactory.Cluster().V1alpha2().PediaClusters()) - for _, rest := range podrest.GetPodSubresourceRESTs(controller) { + for _, rest := range proxyrest.GetSubresourceRESTs(controller) { restManager.preRegisterSubresource(subresource{ - gr: schema.GroupResource{Group: "", Resource: "pods"}, - kind: "Pod", - namespaced: true, + gr: rest.ParentGroupResource(), + kind: rest.ParentKind(), + namespaced: rest.Namespaced(), name: rest.Subresource(), connecter: rest, }) } + resourceHandler.proxy = proxyrest.NewRemoteProxyREST(c.GenericConfig.Serializer, controller) return genericserver, nil } diff --git a/pkg/kubeapiserver/resource_handler.go b/pkg/kubeapiserver/resource_handler.go index 7a8d290b3..d6533c5de 100644 --- a/pkg/kubeapiserver/resource_handler.go +++ b/pkg/kubeapiserver/resource_handler.go @@ -27,6 +27,7 @@ import ( type ResourceHandler struct { minRequestTimeout time.Duration delegate http.Handler + proxy http.Handler rest *RESTManager discovery *discovery.DiscoveryManager diff --git a/pkg/kubeapiserver/resourcerest/pod/subresource.go b/pkg/kubeapiserver/resourcerest/pod/subresource.go deleted file mode 100644 index 4cf260272..000000000 --- a/pkg/kubeapiserver/resourcerest/pod/subresource.go +++ /dev/null @@ -1,131 +0,0 @@ -package pod - -import ( - "context" - "errors" - "net/http" - "net/url" - - "k8s.io/apimachinery/pkg/runtime" - utilnet "k8s.io/apimachinery/pkg/util/net" - "k8s.io/apimachinery/pkg/util/proxy" - auditinternal "k8s.io/apiserver/pkg/apis/audit" - "k8s.io/apiserver/pkg/audit" - genericrequest "k8s.io/apiserver/pkg/endpoints/request" - "k8s.io/apiserver/pkg/registry/rest" - registryrest "k8s.io/apiserver/pkg/registry/rest" - api "k8s.io/kubernetes/pkg/apis/core" - - "github.com/clusterpedia-io/clusterpedia/pkg/utils/request" -) - -type ClusterConnectionGetter interface { - GetClusterDefaultConnection(ctx context.Context, cluster string) (string, http.RoundTripper, error) - GetClusterConnectionWithTLSConfig(ctx context.Context, cluster string) (string, http.RoundTripper, error) -} - -func GetPodSubresourceRESTs(connGetter ClusterConnectionGetter) []*PodSubresourceRemoteProxyREST { - return []*PodSubresourceRemoteProxyREST{ - { - subresource: "attach", - methods: []string{"GET", "POST"}, - upgradeRequired: true, - options: &api.PodAttachOptions{}, - connGetter: connGetter, - }, - { - subresource: "exec", - methods: []string{"GET", "POST"}, - upgradeRequired: true, - options: &api.PodExecOptions{}, - connGetter: connGetter, - }, - { - subresource: "portforward", - methods: []string{"GET", "POST"}, - upgradeRequired: true, - options: &api.PodPortForwardOptions{}, - connGetter: connGetter, - }, - { - subresource: "log", - methods: []string{"GET"}, - upgradeRequired: false, - options: &api.PodLogOptions{}, - connGetter: connGetter, - }, - } -} - -type PodSubresourceRemoteProxyREST struct { - subresource string - methods []string - options runtime.Object - upgradeRequired bool - - connGetter ClusterConnectionGetter -} - -var _ rest.Storage = &PodSubresourceRemoteProxyREST{} -var _ rest.Connecter = &PodSubresourceRemoteProxyREST{} - -func (r *PodSubresourceRemoteProxyREST) Subresource() string { - return r.subresource -} - -func (r *PodSubresourceRemoteProxyREST) New() runtime.Object { - return r.options.DeepCopyObject() -} - -func (r *PodSubresourceRemoteProxyREST) Destroy() { -} - -func (r *PodSubresourceRemoteProxyREST) NewConnectOptions() (runtime.Object, bool, string) { - return r.options.DeepCopyObject(), false, "" -} - -func (r *PodSubresourceRemoteProxyREST) ConnectMethods() []string { - return r.methods -} - -func (r *PodSubresourceRemoteProxyREST) Connect(ctx context.Context, name string, opts runtime.Object, responder registryrest.Responder) (http.Handler, error) { - clusterName := request.ClusterNameValue(ctx) - if clusterName == "" { - return nil, errors.New("missing cluster") - } - - requestInfo, ok := genericrequest.RequestInfoFrom(ctx) - if !ok { - return nil, errors.New("missing RequestInfo") - } - - // TODO(iceber): need disconnect when the cluster authentication information changes - endpoint, transport, err := r.connGetter.GetClusterDefaultConnection(ctx, clusterName) - if err != nil { - return nil, err - } - - target, err := url.ParseRequestURI(endpoint + requestInfo.Path) - if err != nil { - return nil, err - } - target.RawQuery = request.RequestQueryFrom(ctx).Encode() - - proxy := proxy.NewUpgradeAwareHandler(target, transport, false, r.upgradeRequired, proxy.NewErrorResponder(responder)) - return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { - r := req.WithContext(req.Context()) - r.Header = utilnet.CloneHeader(req.Header) - if auditID, _ := audit.AuditIDFrom(ctx); auditID != "" { - req.Header.Set(auditinternal.HeaderAuditID, string(auditID)) - } - - proxy.ServeHTTP(rw, req) - - // merge headers - for _, header := range []string{"Cache-Control", auditinternal.HeaderAuditID} { - if vs := rw.Header().Values(header); len(vs) > 1 { - rw.Header().Set(header, vs[0]) - } - } - }), nil -} diff --git a/pkg/kubeapiserver/resourcerest/proxy/proxy.go b/pkg/kubeapiserver/resourcerest/proxy/proxy.go new file mode 100644 index 000000000..9dbca8fca --- /dev/null +++ b/pkg/kubeapiserver/resourcerest/proxy/proxy.go @@ -0,0 +1,93 @@ +package proxy + +import ( + "context" + "errors" + "net/http" + "net/url" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + utilnet "k8s.io/apimachinery/pkg/util/net" + "k8s.io/apimachinery/pkg/util/proxy" + auditinternal "k8s.io/apiserver/pkg/apis/audit" + "k8s.io/apiserver/pkg/audit" + "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" + genericrequest "k8s.io/apiserver/pkg/endpoints/request" + + "github.com/clusterpedia-io/clusterpedia/pkg/utils/request" +) + +type ClusterConnectionGetter interface { + GetClusterDefaultConnection(ctx context.Context, cluster string) (string, http.RoundTripper, error) + GetClusterConnectionWithTLSConfig(ctx context.Context, cluster string) (string, http.RoundTripper, error) +} + +type RemoteProxyREST struct { + serializer runtime.NegotiatedSerializer + connGetter ClusterConnectionGetter +} + +func NewRemoteProxyREST(serializer runtime.NegotiatedSerializer, connGetter ClusterConnectionGetter) http.Handler { + return &RemoteProxyREST{serializer: serializer, connGetter: connGetter} +} + +func (r *RemoteProxyREST) ServeHTTP(rw http.ResponseWriter, req *http.Request) { + handler, err := proxyConn(req.Context(), r.connGetter, false, r, nil) + if err != nil { + r.Error(rw, req, err) + } + handler.ServeHTTP(rw, req) +} + +func (r *RemoteProxyREST) Error(w http.ResponseWriter, req *http.Request, err error) { + responsewriters.ErrorNegotiated(err, r.serializer, schema.GroupVersion{}, w, req) +} + +func proxyConn(ctx context.Context, connGetter ClusterConnectionGetter, upgradeRequired bool, responder proxy.ErrorResponder, wrapProxy func(*proxy.UpgradeAwareHandler) http.Handler) (http.Handler, error) { + clusterName := request.ClusterNameValue(ctx) + if clusterName == "" { + return nil, errors.New("missing cluster") + } + + requestInfo, ok := genericrequest.RequestInfoFrom(ctx) + if !ok { + return nil, errors.New("missing RequestInfo") + } + + // TODO(iceber): need disconnect when the cluster authentication information changes + endpoint, transport, err := connGetter.GetClusterDefaultConnection(ctx, clusterName) + if err != nil { + return nil, err + } + + target, err := url.ParseRequestURI(endpoint + requestInfo.Path) + if err != nil { + return nil, err + } + target.RawQuery = request.RequestQueryFrom(ctx).Encode() + + proxy := proxy.NewUpgradeAwareHandler(target, transport, false, upgradeRequired, responder) + proxy.UseLocationHost = true + + var handler http.Handler = proxy + if wrapProxy != nil { + handler = wrapProxy(proxy) + } + return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + r := req.WithContext(req.Context()) + r.Header = utilnet.CloneHeader(req.Header) + if auditID, _ := audit.AuditIDFrom(ctx); auditID != "" { + req.Header.Set(auditinternal.HeaderAuditID, string(auditID)) + } + + handler.ServeHTTP(rw, req) + + // merge headers + for _, header := range []string{"Cache-Control", auditinternal.HeaderAuditID} { + if vs := rw.Header().Values(header); len(vs) > 1 { + rw.Header().Set(header, vs[0]) + } + } + }), nil +} diff --git a/pkg/kubeapiserver/resourcerest/proxy/subresource.go b/pkg/kubeapiserver/resourcerest/proxy/subresource.go new file mode 100644 index 000000000..42097fc9c --- /dev/null +++ b/pkg/kubeapiserver/resourcerest/proxy/subresource.go @@ -0,0 +1,110 @@ +package proxy + +import ( + "context" + "net/http" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/proxy" + "k8s.io/apiserver/pkg/registry/rest" + registryrest "k8s.io/apiserver/pkg/registry/rest" + api "k8s.io/kubernetes/pkg/apis/core" +) + +func GetSubresourceRESTs(connGetter ClusterConnectionGetter) []*PodSubresourceRemoteProxyREST { + return []*PodSubresourceRemoteProxyREST{ + { + parent: schema.GroupResource{Group: "", Resource: "pods"}, + parentKind: "Pod", + namespaced: true, + subresource: "attach", + methods: []string{"GET", "POST"}, + upgradeRequired: true, + options: &api.PodAttachOptions{}, + connGetter: connGetter, + }, + { + parent: schema.GroupResource{Group: "", Resource: "pods"}, + parentKind: "Pod", + namespaced: true, + subresource: "exec", + methods: []string{"GET", "POST"}, + upgradeRequired: true, + options: &api.PodExecOptions{}, + connGetter: connGetter, + }, + { + parent: schema.GroupResource{Group: "", Resource: "pods"}, + parentKind: "Pod", + namespaced: true, + subresource: "portforward", + methods: []string{"GET", "POST"}, + upgradeRequired: true, + options: &api.PodPortForwardOptions{}, + connGetter: connGetter, + }, + { + parent: schema.GroupResource{Group: "", Resource: "pods"}, + parentKind: "Pod", + namespaced: true, + subresource: "log", + methods: []string{"GET"}, + upgradeRequired: false, + options: &api.PodLogOptions{}, + connGetter: connGetter, + }, + } +} + +type PodSubresourceRemoteProxyREST struct { + parent schema.GroupResource + namespaced bool + parentKind string + + subresource string + + methods []string + options runtime.Object + + upgradeRequired bool + connGetter ClusterConnectionGetter +} + +var _ rest.Storage = &PodSubresourceRemoteProxyREST{} +var _ rest.Connecter = &PodSubresourceRemoteProxyREST{} + +func (r *PodSubresourceRemoteProxyREST) ParentGroupResource() schema.GroupResource { + return r.parent +} + +func (r *PodSubresourceRemoteProxyREST) ParentKind() string { + return r.parentKind +} + +func (r *PodSubresourceRemoteProxyREST) Namespaced() bool { + return r.namespaced +} + +func (r *PodSubresourceRemoteProxyREST) Subresource() string { + return r.subresource +} + +func (r *PodSubresourceRemoteProxyREST) New() runtime.Object { + return r.options.DeepCopyObject() +} + +func (r *PodSubresourceRemoteProxyREST) Destroy() { +} + +func (r *PodSubresourceRemoteProxyREST) NewConnectOptions() (runtime.Object, bool, string) { + return r.options.DeepCopyObject(), false, "" +} + +func (r *PodSubresourceRemoteProxyREST) ConnectMethods() []string { + return r.methods +} + +func (r *PodSubresourceRemoteProxyREST) Connect(ctx context.Context, name string, opts runtime.Object, responder registryrest.Responder) (http.Handler, error) { + return proxyConn(ctx, r.connGetter, r.upgradeRequired, proxy.NewErrorResponder(responder), nil) +}