Skip to content

Commit

Permalink
Fix parsing Kubernetes Resources without GroupVersionKind (#38956)
Browse files Browse the repository at this point in the history
* Fix parsing Kubernetes Resources without `GroupVersionKind`

Kubernetes API has a concept of `GroupVersionKind` where each payload
includes the type identification `(group, version, kind)` that uniquely
identifies the resource withing Kubernetes itself.

Kubernetes API and `kubectl` always honor this information but custom
tools and tools that use equivalents of `json.Marshal` without setting
the `GVK` data fail to be decoded when decoding `CREATE` requests
targeted to support kinds.

This PR aims to fix that by providing an hint to the Kubernetes decoder
that aims to be used as fallback GVK when unmarshaling a payload without
the specification.

The hint is provided by the API request path that Teleport extracts from
the request URI.

Fixes #37972

Signed-off-by: Tiago Silva <[email protected]>

* handle review comments

---------

Signed-off-by: Tiago Silva <[email protected]>
  • Loading branch information
tigrato authored Mar 10, 2024
1 parent 13ceef6 commit 1e77bb3
Show file tree
Hide file tree
Showing 10 changed files with 145 additions and 28 deletions.
2 changes: 1 addition & 1 deletion lib/kube/proxy/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ current-context: foo
require.Empty(t, cmp.Diff(fwd.clusterDetails, tt.want,
cmp.AllowUnexported(staticKubeCreds{}),
cmp.AllowUnexported(kubeDetails{}),
cmpopts.IgnoreFields(kubeDetails{}, "rwMu", "kubeCodecs", "wg", "cancelFunc"),
cmpopts.IgnoreFields(kubeDetails{}, "rwMu", "kubeCodecs", "wg", "cancelFunc", "gvkSupportedResources"),
cmp.Comparer(func(a, b *transport.Config) bool { return (a == nil) == (b == nil) }),
cmp.Comparer(func(a, b *tls.Config) bool { return true }),
cmp.Comparer(func(a, b *kubernetes.Clientset) bool { return (a == nil) == (b == nil) }),
Expand Down
42 changes: 32 additions & 10 deletions lib/kube/proxy/cluster_details.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package proxy
import (
"context"
"encoding/base64"
"strings"
"sync"
"time"

Expand All @@ -31,6 +32,7 @@ import (
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
Expand All @@ -52,7 +54,7 @@ type kubeDetails struct {
// kubeCluster is the dynamic kube_cluster or a static generated from kubeconfig and that only has the name populated.
kubeCluster types.KubeCluster

// rwMu is the mutex to protect the kubeCodecs and rbacSupportedTypes.
// rwMu is the mutex to protect the kubeCodecs, gvkSupportedResources, and rbacSupportedTypes.
rwMu sync.RWMutex
// kubeCodecs is the codec factory for the cluster resources.
// The codec factory includes the default resources and the namespaced resources
Expand All @@ -64,6 +66,9 @@ type kubeDetails struct {
// The list is updated periodically to include the latest custom resources
// that are added to the cluster.
rbacSupportedTypes rbacSupportedResources
// gvkSupportedResources is the list of registered API path resources and their
// GVK definition.
gvkSupportedResources gvkSupportedResources
// isClusterOffline is true if the cluster is offline.
// An offline cluster will not be able to serve any requests until it comes back online.
// The cluster is marked as offline if the cluster schema cannot be created
Expand Down Expand Up @@ -121,7 +126,7 @@ func newClusterDetails(ctx context.Context, cfg clusterDetailsConfig) (_ *kubeDe
}
var isClusterOffline bool
// Create the codec factory and the list of supported types for RBAC.
codecFactory, rbacSupportedTypes, err := newClusterSchemaBuilder(cfg.log, creds.getKubeClient())
codecFactory, rbacSupportedTypes, gvkSupportedRes, err := newClusterSchemaBuilder(cfg.log, creds.getKubeClient())
if err != nil {
cfg.log.WithError(err).Warn("Failed to create cluster schema. Possibly the cluster is offline.")
// If the cluster is offline, we will not be able to create the codec factory
Expand All @@ -133,13 +138,14 @@ func newClusterDetails(ctx context.Context, cfg clusterDetailsConfig) (_ *kubeDe

ctx, cancel := context.WithCancel(ctx)
k := &kubeDetails{
kubeCreds: creds,
dynamicLabels: dynLabels,
kubeCluster: cfg.cluster,
kubeCodecs: codecFactory,
rbacSupportedTypes: rbacSupportedTypes,
cancelFunc: cancel,
isClusterOffline: isClusterOffline,
kubeCreds: creds,
dynamicLabels: dynLabels,
kubeCluster: cfg.cluster,
kubeCodecs: codecFactory,
rbacSupportedTypes: rbacSupportedTypes,
cancelFunc: cancel,
isClusterOffline: isClusterOffline,
gvkSupportedResources: gvkSupportedRes,
}

k.wg.Add(1)
Expand All @@ -153,7 +159,7 @@ func newClusterDetails(ctx context.Context, cfg clusterDetailsConfig) (_ *kubeDe
case <-ctx.Done():
return
case <-ticker.Chan():
codecFactory, rbacSupportedTypes, err := newClusterSchemaBuilder(cfg.log, creds.getKubeClient())
codecFactory, rbacSupportedTypes, gvkSupportedResources, err := newClusterSchemaBuilder(cfg.log, creds.getKubeClient())
if err != nil {
cfg.log.WithError(err).Error("Failed to update cluster schema")
continue
Expand All @@ -162,6 +168,7 @@ func newClusterDetails(ctx context.Context, cfg clusterDetailsConfig) (_ *kubeDe
k.rwMu.Lock()
k.kubeCodecs = codecFactory
k.rbacSupportedTypes = rbacSupportedTypes
k.gvkSupportedResources = gvkSupportedResources
k.isClusterOffline = false
k.rwMu.Unlock()
}
Expand Down Expand Up @@ -193,6 +200,21 @@ func (k *kubeDetails) getClusterSupportedResources() (*serializer.CodecFactory,
return &(k.kubeCodecs), k.rbacSupportedTypes, nil
}

// getObjectGVK returns the default GVK (if any) registered for the specified request path.
func (k *kubeDetails) getObjectGVK(resource apiResource) *schema.GroupVersionKind {
k.rwMu.RLock()
defer k.rwMu.RUnlock()
// kube doesn't use core but teleport does.
if resource.apiGroup == "core" {
resource.apiGroup = ""
}
return k.gvkSupportedResources[gvkSupportedResourcesKey{
name: strings.Split(resource.resourceKind, "/")[0],
apiGroup: resource.apiGroup,
version: resource.apiGroupVersion,
}]
}

// getKubeClusterCredentials generates kube credentials for dynamic clusters.
func getKubeClusterCredentials(ctx context.Context, cfg clusterDetailsConfig) (kubeCreds, error) {
dynCredsCfg := dynamicCredsConfig{kubeCluster: cfg.cluster, log: cfg.log, checker: cfg.checker, resourceMatchers: cfg.resourceMatchers, clock: cfg.clock, component: cfg.component}
Expand Down
4 changes: 3 additions & 1 deletion lib/kube/proxy/resource_deletecollection.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,9 @@ func (f *Forwarder) handleDeleteCollectionReq(req *http.Request, sess *clusterSe
req.Body.Close()

// decode memory rw body.
obj, err := decodeAndSetGVK(decoder, memWriter.Buffer().Bytes())
// We are reading an API request and API honors the GVK in the request so we don't
// need to set it.
obj, err := decodeAndSetGVK(decoder, memWriter.Buffer().Bytes(), nil /* defaults GVK */)
if err != nil {
return internalErrStatus, trace.Wrap(err)
}
Expand Down
15 changes: 11 additions & 4 deletions lib/kube/proxy/resource_filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"

"github.com/gravitational/teleport/api/types"
Expand Down Expand Up @@ -602,7 +603,9 @@ func (d *resourceFilterer) decode(buffer []byte) (runtime.Object, []byte, error)
// Logic from: https://github.com/kubernetes/client-go/blob/58ff029093df37cad9fa28778a37f11fa495d9cf/rest/request.go#L1040
return nil, buffer, nil
default:
out, err := decodeAndSetGVK(d.decoder, buffer)
// We are reading an API request and API honors the GVK in the request so we don't
// need to set it.
out, err := decodeAndSetGVK(d.decoder, buffer, nil /* defaults GVK */)
return out, nil, trace.Wrap(err)
}
}
Expand All @@ -616,7 +619,9 @@ func (d *resourceFilterer) decodePartialObjectMetadata(row *metav1.TableRow) err
}
var err error
// decode only if row.Object.Object was not decoded before.
row.Object.Object, err = decodeAndSetGVK(d.decoder, row.Object.Raw)
// We are reading an API request and API honors the GVK in the request so we don't
// need to set it.
row.Object.Object, err = decodeAndSetGVK(d.decoder, row.Object.Raw, nil /* defaults GVK */)
return trace.Wrap(err)
}

Expand Down Expand Up @@ -729,8 +734,10 @@ func newEncoderAndDecoderForContentType(contentType string, negotiator runtime.C

// decodeAndSetGVK decodes the payload into the appropriate type using the decoder
// provider and sets the GVK if available.
func decodeAndSetGVK(decoder runtime.Decoder, payload []byte) (runtime.Object, error) {
obj, gvk, err := decoder.Decode(payload, nil, nil)
// defaults is the fallback GVK used by the decoder if the payload doesn't set their
// own GVK.
func decodeAndSetGVK(decoder runtime.Decoder, payload []byte, defaults *schema.GroupVersionKind) (runtime.Object, error) {
obj, gvk, err := decoder.Decode(payload, defaults, nil)
if err != nil {
return nil, trace.Wrap(err)
}
Expand Down
2 changes: 1 addition & 1 deletion lib/kube/proxy/resource_filters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func Test_filterBuffer(t *testing.T) {
if row.Object.Object == nil {
var err error
// decode only if row.Object.Object was not decoded before.
row.Object.Object, err = decodeAndSetGVK(decoder, row.Object.Raw)
row.Object.Object, err = decodeAndSetGVK(decoder, row.Object.Raw, nil)
require.NoError(t, err)
}

Expand Down
36 changes: 30 additions & 6 deletions lib/kube/proxy/scheme.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,17 +98,28 @@ func newClientNegotiator(codecFactory *serializer.CodecFactory) runtime.ClientNe
)
}

// gvkSupportedResourcesKey is the key used in gvkSupportedResources
// to map from a parsed API path to the corresponding resource GVK.
type gvkSupportedResourcesKey struct {
name string
apiGroup string
version string
}

// gvkSupportedResources maps a parsed API path to the corresponding resource GVK.
type gvkSupportedResources map[gvkSupportedResourcesKey]*schema.GroupVersionKind

// newClusterSchemaBuilder creates a new schema builder for the given cluster.
// This schema includes all well-known Kubernetes types and all namespaced
// custom resources.
// It also returns a map of resources that we support RBAC restrictions for.
func newClusterSchemaBuilder(log logrus.FieldLogger, client kubernetes.Interface) (serializer.CodecFactory, rbacSupportedResources, error) {
func newClusterSchemaBuilder(log logrus.FieldLogger, client kubernetes.Interface) (serializer.CodecFactory, rbacSupportedResources, gvkSupportedResources, error) {
kubeScheme := runtime.NewScheme()
kubeCodecs := serializer.NewCodecFactory(kubeScheme)
supportedResources := maps.Clone(defaultRBACResources)

gvkSupportedRes := make(gvkSupportedResources)
if err := registerDefaultKubeTypes(kubeScheme); err != nil {
return serializer.CodecFactory{}, nil, trace.Wrap(err)
return serializer.CodecFactory{}, nil, nil, trace.Wrap(err)
}
// discoveryErr is returned when the discovery of one or more API groups fails.
var discoveryErr *discovery.ErrGroupDiscoveryFailed
Expand All @@ -126,17 +137,30 @@ func newClusterSchemaBuilder(log logrus.FieldLogger, client kubernetes.Interface
// available in the cluster.
log.WithError(err).Debugf("Failed to discover some API groups: %v", maps.Keys(discoveryErr.Groups))
case err != nil:
return serializer.CodecFactory{}, nil, trace.Wrap(err)
return serializer.CodecFactory{}, nil, nil, trace.Wrap(err)
}

for _, apiGroup := range apiGroups {
group, version := getKubeAPIGroupAndVersion(apiGroup.GroupVersion)

for _, apiResource := range apiGroup.APIResources {
// register all types
gvkSupportedRes[gvkSupportedResourcesKey{
name: apiResource.Name, /* pods, configmaps, ... */
apiGroup: group,
version: version,
}] = &schema.GroupVersionKind{
Group: group,
Version: version,
Kind: apiResource.Kind, /* Pod, ConfigMap ...*/
}
}

// Skip well-known Kubernetes API groups because they are already registered
// in the scheme.
if _, ok := knownKubernetesGroups[group]; ok {
continue
}

groupVersion := schema.GroupVersion{Group: group, Version: version}
for _, apiResource := range apiGroup.APIResources {
// Skip cluster-scoped resources because we don't support RBAC restrictions
Expand Down Expand Up @@ -177,7 +201,7 @@ func newClusterSchemaBuilder(log logrus.FieldLogger, client kubernetes.Interface
}
}

return kubeCodecs, supportedResources, nil
return kubeCodecs, supportedResources, gvkSupportedRes, nil
}

// getKubeAPIGroupAndVersion returns the API group and version from the given
Expand Down
2 changes: 1 addition & 1 deletion lib/kube/proxy/scheme_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
// TestNewClusterSchemaBuilder tests that newClusterSchemaBuilder doesn't panic
// when it's given types already registered in the global scheme.
func Test_newClusterSchemaBuilder(t *testing.T) {
_, _, err := newClusterSchemaBuilder(logrus.StandardLogger(), &clientSet{})
_, _, _, err := newClusterSchemaBuilder(logrus.StandardLogger(), &clientSet{})
require.NoError(t, err)
}

Expand Down
3 changes: 2 additions & 1 deletion lib/kube/proxy/self_subject_reviews.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,8 @@ func parseSelfSubjectAccessReviewRequest(decoder runtime.Decoder, req *http.Requ
req.Body.Close()

req.Body = io.NopCloser(bytes.NewReader(payload))
obj, err := decodeAndSetGVK(decoder, payload)
gvk := authv1.SchemeGroupVersion.WithKind("SelfSubjectAccessReview")
obj, err := decodeAndSetGVK(decoder, payload, &gvk)
if err != nil {
return nil, trace.Wrap(err)
}
Expand Down
11 changes: 8 additions & 3 deletions lib/kube/proxy/url.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"strings"

"github.com/gravitational/trace"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"

"github.com/gravitational/teleport/api/types"
Expand Down Expand Up @@ -238,7 +239,7 @@ func getResourceFromRequest(req *http.Request, kubeDetails *kubeDetails) (*types
case apiResource.resourceName == "" && verb == types.KubeVerbCreate:
// If the request is a create request, extract the resource name from the request body.
var err error
if apiResource.resourceName, err = extractResourceNameFromPostRequest(req, codecFactory); err != nil {
if apiResource.resourceName, err = extractResourceNameFromPostRequest(req, codecFactory, kubeDetails.getObjectGVK(apiResource)); err != nil {
return nil, apiResource, trace.Wrap(err)
}
}
Expand All @@ -256,7 +257,11 @@ func getResourceFromRequest(req *http.Request, kubeDetails *kubeDetails) (*types
// and decodes it into a Kubernetes object. It then extracts the resource name
// from the object.
// The body is then reset to the original request body using a new buffer.
func extractResourceNameFromPostRequest(req *http.Request, codecs *serializer.CodecFactory) (string, error) {
func extractResourceNameFromPostRequest(
req *http.Request,
codecs *serializer.CodecFactory,
defaults *schema.GroupVersionKind,
) (string, error) {
if req.Body == nil {
return "", trace.BadParameter("request body is empty")
}
Expand All @@ -279,7 +284,7 @@ func extractResourceNameFromPostRequest(req *http.Request, codecs *serializer.Co
}
req.Body = io.NopCloser(newBody)
// decode memory rw body.
obj, err := decodeAndSetGVK(decoder, newBody.Bytes())
obj, err := decodeAndSetGVK(decoder, newBody.Bytes(), defaults)
if err != nil {
return "", trace.Wrap(err)
}
Expand Down
Loading

0 comments on commit 1e77bb3

Please sign in to comment.