Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

apiserver: Adding RemoteProxyREST for pass-through any requests #716

Merged
merged 1 commit into from
Dec 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions pkg/kubeapiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

informers "github.com/clusterpedia-io/clusterpedia/pkg/generated/informers/externalversions"
"github.com/clusterpedia-io/clusterpedia/pkg/kubeapiserver/discovery"
podrest "github.com/clusterpedia-io/clusterpedia/pkg/kubeapiserver/resourcerest/pod"
proxyrest "github.com/clusterpedia-io/clusterpedia/pkg/kubeapiserver/resourcerest/proxy"
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
"github.com/clusterpedia-io/clusterpedia/pkg/utils/filters"
)
Expand Down Expand Up @@ -140,16 +140,17 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)

controller := NewClusterResourceController(restManager, discoveryManager, c.ExtraConfig.InformerFactory.Cluster().V1alpha2().PediaClusters())

for _, rest := range podrest.GetPodSubresourceRESTs(controller) {
for _, rest := range proxyrest.GetSubresourceRESTs(controller) {
restManager.preRegisterSubresource(subresource{
gr: schema.GroupResource{Group: "", Resource: "pods"},
kind: "Pod",
namespaced: true,
gr: rest.ParentGroupResource(),
kind: rest.ParentKind(),
namespaced: rest.Namespaced(),

name: rest.Subresource(),
connecter: rest,
})
}
resourceHandler.proxy = proxyrest.NewRemoteProxyREST(c.GenericConfig.Serializer, controller)
return genericserver, nil
}

Expand Down
1 change: 1 addition & 0 deletions pkg/kubeapiserver/resource_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
type ResourceHandler struct {
minRequestTimeout time.Duration
delegate http.Handler
proxy http.Handler

rest *RESTManager
discovery *discovery.DiscoveryManager
Expand Down
131 changes: 0 additions & 131 deletions pkg/kubeapiserver/resourcerest/pod/subresource.go

This file was deleted.

93 changes: 93 additions & 0 deletions pkg/kubeapiserver/resourcerest/proxy/proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package proxy

import (
"context"
"errors"
"net/http"
"net/url"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/proxy"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
genericrequest "k8s.io/apiserver/pkg/endpoints/request"

"github.com/clusterpedia-io/clusterpedia/pkg/utils/request"
)

type ClusterConnectionGetter interface {
GetClusterDefaultConnection(ctx context.Context, cluster string) (string, http.RoundTripper, error)
GetClusterConnectionWithTLSConfig(ctx context.Context, cluster string) (string, http.RoundTripper, error)
}

type RemoteProxyREST struct {
serializer runtime.NegotiatedSerializer
connGetter ClusterConnectionGetter
}

func NewRemoteProxyREST(serializer runtime.NegotiatedSerializer, connGetter ClusterConnectionGetter) http.Handler {
return &RemoteProxyREST{serializer: serializer, connGetter: connGetter}
}

func (r *RemoteProxyREST) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
handler, err := proxyConn(req.Context(), r.connGetter, false, r, nil)
if err != nil {
r.Error(rw, req, err)
}
handler.ServeHTTP(rw, req)
}

func (r *RemoteProxyREST) Error(w http.ResponseWriter, req *http.Request, err error) {
responsewriters.ErrorNegotiated(err, r.serializer, schema.GroupVersion{}, w, req)
}

func proxyConn(ctx context.Context, connGetter ClusterConnectionGetter, upgradeRequired bool, responder proxy.ErrorResponder, wrapProxy func(*proxy.UpgradeAwareHandler) http.Handler) (http.Handler, error) {
clusterName := request.ClusterNameValue(ctx)
if clusterName == "" {
return nil, errors.New("missing cluster")
}

requestInfo, ok := genericrequest.RequestInfoFrom(ctx)
if !ok {
return nil, errors.New("missing RequestInfo")
}

// TODO(iceber): need disconnect when the cluster authentication information changes
endpoint, transport, err := connGetter.GetClusterDefaultConnection(ctx, clusterName)
if err != nil {
return nil, err
}

target, err := url.ParseRequestURI(endpoint + requestInfo.Path)
if err != nil {
return nil, err
}
target.RawQuery = request.RequestQueryFrom(ctx).Encode()

proxy := proxy.NewUpgradeAwareHandler(target, transport, false, upgradeRequired, responder)
proxy.UseLocationHost = true

var handler http.Handler = proxy
if wrapProxy != nil {
handler = wrapProxy(proxy)
}
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
r := req.WithContext(req.Context())
r.Header = utilnet.CloneHeader(req.Header)
if auditID, _ := audit.AuditIDFrom(ctx); auditID != "" {
req.Header.Set(auditinternal.HeaderAuditID, string(auditID))
}

handler.ServeHTTP(rw, req)

// merge headers
for _, header := range []string{"Cache-Control", auditinternal.HeaderAuditID} {
if vs := rw.Header().Values(header); len(vs) > 1 {
rw.Header().Set(header, vs[0])
}
}
}), nil
}
110 changes: 110 additions & 0 deletions pkg/kubeapiserver/resourcerest/proxy/subresource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package proxy

import (
"context"
"net/http"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/proxy"
"k8s.io/apiserver/pkg/registry/rest"
registryrest "k8s.io/apiserver/pkg/registry/rest"
api "k8s.io/kubernetes/pkg/apis/core"
)

func GetSubresourceRESTs(connGetter ClusterConnectionGetter) []*PodSubresourceRemoteProxyREST {
return []*PodSubresourceRemoteProxyREST{
{
parent: schema.GroupResource{Group: "", Resource: "pods"},
parentKind: "Pod",
namespaced: true,
subresource: "attach",
methods: []string{"GET", "POST"},
upgradeRequired: true,
options: &api.PodAttachOptions{},
connGetter: connGetter,
},
{
parent: schema.GroupResource{Group: "", Resource: "pods"},
parentKind: "Pod",
namespaced: true,
subresource: "exec",
methods: []string{"GET", "POST"},
upgradeRequired: true,
options: &api.PodExecOptions{},
connGetter: connGetter,
},
{
parent: schema.GroupResource{Group: "", Resource: "pods"},
parentKind: "Pod",
namespaced: true,
subresource: "portforward",
methods: []string{"GET", "POST"},
upgradeRequired: true,
options: &api.PodPortForwardOptions{},
connGetter: connGetter,
},
{
parent: schema.GroupResource{Group: "", Resource: "pods"},
parentKind: "Pod",
namespaced: true,
subresource: "log",
methods: []string{"GET"},
upgradeRequired: false,
options: &api.PodLogOptions{},
connGetter: connGetter,
},
}
}

type PodSubresourceRemoteProxyREST struct {
parent schema.GroupResource
namespaced bool
parentKind string

subresource string

methods []string
options runtime.Object

upgradeRequired bool
connGetter ClusterConnectionGetter
}

var _ rest.Storage = &PodSubresourceRemoteProxyREST{}
var _ rest.Connecter = &PodSubresourceRemoteProxyREST{}

func (r *PodSubresourceRemoteProxyREST) ParentGroupResource() schema.GroupResource {
return r.parent
}

func (r *PodSubresourceRemoteProxyREST) ParentKind() string {
return r.parentKind
}

func (r *PodSubresourceRemoteProxyREST) Namespaced() bool {
return r.namespaced
}

func (r *PodSubresourceRemoteProxyREST) Subresource() string {
return r.subresource
}

func (r *PodSubresourceRemoteProxyREST) New() runtime.Object {
return r.options.DeepCopyObject()
}

func (r *PodSubresourceRemoteProxyREST) Destroy() {
}

func (r *PodSubresourceRemoteProxyREST) NewConnectOptions() (runtime.Object, bool, string) {
return r.options.DeepCopyObject(), false, ""
}

func (r *PodSubresourceRemoteProxyREST) ConnectMethods() []string {
return r.methods
}

func (r *PodSubresourceRemoteProxyREST) Connect(ctx context.Context, name string, opts runtime.Object, responder registryrest.Responder) (http.Handler, error) {
return proxyConn(ctx, r.connGetter, r.upgradeRequired, proxy.NewErrorResponder(responder), nil)
}
Loading