Skip to content

Commit

Permalink
feat: add initial implementation of serializers and forwarders (#34)
Browse files Browse the repository at this point in the history
  • Loading branch information
pmalek authored Aug 5, 2022
1 parent 08eee9b commit 6f99136
Show file tree
Hide file tree
Showing 26 changed files with 523 additions and 69 deletions.
2 changes: 1 addition & 1 deletion .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ linters:
- godot
- gofmt
- goimports
- golint
- revive
- gomnd
- gosec
- govet
Expand Down
30 changes: 30 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,35 @@
Toolkit for telemetry data for [Kong's][kong] Kubernetes products, such as the
[Kong Kubernetes Ingress Controller (KIC)][kic].

## Usage

### Forwarders

Forwarders can be used to forward serialized telemetry reports to a particular destination.

- `TLSForwarder` can be used to forward data to a TLS endpoint
- `LogForwarder` can be used to forward data to a configured logger instance
- `DiscardForwarder` can be used to discard received reports

### Serializers

Users can pick the serializer of their choice for data serialization.

Currently only 1 serializer is supported with more implementations to come as needed.

#### Semicolon delimited values

This serializer uses the following predefined keys to express telemetry data:

- `k8s_arch` - inferred kubernetes cluster architecture
- `k8sv` - inferred kubernetes cluster version
- `k8sv_semver` - inferred kubernetes cluster version in [semver format][semver]
- `k8s_provider` - inferred kubernetes cluster provider
- `k8s_pods_count` - number of pods running in the cluster
- `k8s_services_count` - number of services defined in the cluster
- `hn` - hostname where this telemetry framework is running on
- `feature-<NAME>` - feature gate (with the boolean state indicated whether enabled or disabled)

[kong]:https://github.com/kong
[kic]:https://github.com/kong/kubernetes-ingress-controller
[semver]:https://semver.org/
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ require (
sigs.k8s.io/controller-runtime v0.12.3
)

require github.com/spf13/pflag v1.0.5 // indirect

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful/v3 v3.8.0 // indirect
Expand Down
17 changes: 17 additions & 0 deletions pkg/forwarders/discardforwarder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package forwarders

type discardForwarder struct{}

// NewDiscardForwarder creates a new discardForwarder which discards all received
// data.
func NewDiscardForwarder() *discardForwarder {
return &discardForwarder{}
}

func (df *discardForwarder) Name() string {
return "DiscardForwarder"
}

func (df *discardForwarder) Forward(payload []byte) error {
return nil
}
26 changes: 26 additions & 0 deletions pkg/forwarders/logforwarder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package forwarders

import (
"github.com/go-logr/logr"
)

type logForwarder struct {
log logr.Logger
}

// NewLogForwarder creates new logForwarded which uses the provided logger to
// print all the received data.
func NewLogForwarder(log logr.Logger) *logForwarder {
return &logForwarder{
log: log,
}
}

func (lf *logForwarder) Name() string {
return "LogForwarder"
}

func (lf *logForwarder) Forward(payload []byte) error {
lf.log.Info("got a report", "report", payload)
return nil
}
83 changes: 83 additions & 0 deletions pkg/forwarders/tlsforwarder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package forwarders

import (
"crypto/tls"
"fmt"
"net"
"time"

"github.com/go-logr/logr"
"github.com/sirupsen/logrus"
)

const (
defaultTimeout = time.Second * 30
defaultDeadline = time.Minute
)

var tlsConf = tls.Config{
MinVersion: tls.VersionTLS13,
MaxVersion: tls.VersionTLS13,
}

// TODO: Address logging levels and library to be used.
// See: https://github.com/Kong/kubernetes-ingress-controller/issues/1893
const (
logrusrDiff = 4

// InfoLevel is the converted logging level from logrus to go-logr for
// information level logging. Note that the logrusr middleware technically
// flattens all levels prior to this level into this level as well.
InfoLevel = int(logrus.InfoLevel) - logrusrDiff

// DebugLevel is the converted logging level from logrus to go-logr for
// debug level logging.
DebugLevel = int(logrus.DebugLevel) - logrusrDiff

// WarnLevel is the converted logrus level to go-logr for warnings.
WarnLevel = int(logrus.WarnLevel) - logrusrDiff
)

type tlsForwarder struct {
log logr.Logger
conn *tls.Conn
}

// NewTLSForwarder creates a TLS forwarder which forwards received serialized reports
// to a TLS endpoint specified by the provided address.
func NewTLSForwarder(address string, log logr.Logger) *tlsForwarder {
conn, err := tls.DialWithDialer(
&net.Dialer{
Timeout: defaultTimeout,
},
"tcp",
address,
&tlsConf,
)
if err != nil {
log.V(DebugLevel).Info("failed to connect to reporting server", "error", err)
return nil
}

return &tlsForwarder{
log: log,
conn: conn,
}
}

func (tf *tlsForwarder) Name() string {
return "TLSForwarder"
}

func (tf *tlsForwarder) Forward(payload []byte) error {
err := tf.conn.SetDeadline(time.Now().Add(defaultDeadline))
if err != nil {
return fmt.Errorf("failed to set report connection deadline: %w", err)
}

_, err = tf.conn.Write(payload)
if err != nil {
return fmt.Errorf("failed to send report: %w", err)
}
return nil
}
13 changes: 10 additions & 3 deletions pkg/provider/k8scloudproviderprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

const (
// ClusterProviderKey is report key under which the cluster provider will be provided.
ClusterProviderKey = "k8s-provider"
ClusterProviderKey = ReportKey("k8s_provider")
// ClusterProviderKind represents cluster provider kind.
ClusterProviderKind = Kind(ClusterProviderKey)
)
Expand All @@ -24,6 +24,8 @@ const (
ClusterProviderGKE = ClusterProvider("GKE")
// ClusterProviderAWS identifies Amazon's AWS cluster provider.
ClusterProviderAWS = ClusterProvider("AWS")
// ClusterProviderKubernetesInDocker identifies kind (kubernetes in docker) as cluster provider.
ClusterProviderKubernetesInDocker = ClusterProvider("kind")
// ClusterProviderUnknown represents an unknown cluster provider.
ClusterProviderUnknown = ClusterProvider("UNKNOWN")
)
Expand Down Expand Up @@ -116,8 +118,9 @@ func getClusterProviderFromNodes(nodeList *corev1.NodeList) (ClusterProvider, bo
func getClusterProviderFromNodesProviderID(nodeList *corev1.NodeList) (ClusterProvider, bool) {
const (
// Nodes on GKE are provided by GCE (Google Compute Engine)
providerIDPrefixGKE = "gce"
providerIDPrefixAWS = "aws"
providerIDPrefixGKE = "gce"
providerIDPrefixAWS = "aws"
providerIDPrefixKind = "kind"
)
d := make(clusterProviderDistribution)
for _, n := range nodeList.Items {
Expand All @@ -129,6 +132,10 @@ func getClusterProviderFromNodesProviderID(nodeList *corev1.NodeList) (ClusterPr
d[ClusterProviderAWS]++
continue
}
if strings.HasPrefix(n.Spec.ProviderID, providerIDPrefixKind) {
d[ClusterProviderKubernetesInDocker]++
continue
}
}
if p, ok := getMostCommonClusterProviderFromDistribution(d); ok {
return p, true
Expand Down
27 changes: 26 additions & 1 deletion pkg/provider/k8scloudproviderprovider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func TestClusterProvider(t *testing.T) {
return clientgo_fake.NewSimpleClientset(
&corev1.Node{
Spec: corev1.NodeSpec{
ProviderID: "providerID: aws:///eu-west-1b/i-0fa11111111111111",
ProviderID: "aws:///eu-west-1b/i-0fa11111111111111",
},
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
Expand Down Expand Up @@ -250,6 +250,31 @@ func TestClusterProvider(t *testing.T) {
},
expected: ClusterProviderAWS,
},
// kind
{
name: "kind gets inferred from provider ID prefix",
clientFunc: func() *clientgo_fake.Clientset {
return clientgo_fake.NewSimpleClientset(
&corev1.Node{
Spec: corev1.NodeSpec{
ProviderID: "kind://docker/kong/kong-control-plane",
},
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"beta.kubernetes.io/arch": "arm64",
"beta.kubernetes.io/os": "linux",
"kubernetes.io/arch": "arm64",
"kubernetes.io/hostname": "kong-control-plane",
"kubernetes.io/os": "linux",
"node-role.kubernetes.io/control-plane": "",
"node.kubernetes.io/exclude-from-external-load-balancers": "",
},
},
},
)
},
expected: ClusterProviderKubernetesInDocker,
},
}

for _, tc := range testcases {
Expand Down
2 changes: 1 addition & 1 deletion pkg/provider/k8sclusterarchprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

const (
// ClusterArchKey is report key under which cluster architecture will be provided.
ClusterArchKey = "k8s-cluster-arch"
ClusterArchKey = ReportKey("k8s_arch")
// ClusterArchKind represents cluster arch provider kind.
ClusterArchKind = Kind(ClusterArchKey)
)
Expand Down
4 changes: 2 additions & 2 deletions pkg/provider/k8sclusterversionprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ import (
const (
// ClusterVersionKey is the report key under which cluster k8s version will
// be provided as returned by the /version API.
ClusterVersionKey = "k8s-cluster-version"
ClusterVersionKey = ReportKey("k8sv")
// ClusterVersionSemverKey is the report key under which cluster k8s semver
// version will be provided.
ClusterVersionSemverKey = "k8s-cluster-version-semver"
ClusterVersionSemverKey = ReportKey("k8sv_semver")
// ClusterVersionKind represents cluster version provider kind.
ClusterVersionKind = Kind(ClusterVersionKey)
)
Expand Down
6 changes: 4 additions & 2 deletions pkg/provider/k8sobjectcountprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
)

Expand All @@ -12,6 +13,7 @@ import (
//
// Example: Use {Group: "", Version: "v1", Resource: "pods"} to get a Provider that counts all Pods in the cluster.
type k8sObjectCount struct {
gvk schema.GroupVersionResource
resource dynamic.NamespaceableResourceInterface

base
Expand All @@ -34,7 +36,7 @@ func (k *k8sObjectCount) Provide(ctx context.Context) (Report, error) {
Continue: continueToken,
})
if err != nil {
return Report{}, err
return Report{}, k.WrapError(err)
}

count += len(list.Items)
Expand All @@ -44,6 +46,6 @@ func (k *k8sObjectCount) Provide(ctx context.Context) (Report, error) {
}

return Report{
k.Name(): count,
ReportKey("k8s_" + k.gvk.Resource + "_count"): count,
}, nil
}
20 changes: 13 additions & 7 deletions pkg/provider/k8spodscountprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
const (
// PodCountKey is report key under which the number of pods in the cluster
// will be provided.
PodCountKey = "k8s-pod-count"
PodCountKey = ReportKey("k8s_pods_count")
// PodCountKind represents the pod count provider kind.
PodCountKind = Kind(PodCountKey)
)
Expand All @@ -17,12 +17,18 @@ const (
// configured k8s cluster - using the provided client - to get a pod count from
// the cluster.
func NewK8sPodCountProvider(name string, d dynamic.Interface) (Provider, error) {
gvk := schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "pods",
}

return &k8sObjectCount{
resource: d.Resource(schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "pods",
}),
base: base{name: name, kind: PodCountKind},
resource: d.Resource(gvk),
gvk: gvk,
base: base{
name: name,
kind: PodCountKind,
},
}, nil
}
20 changes: 13 additions & 7 deletions pkg/provider/k8sservicecountprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
const (
// ServiceCountKey is report key under which the number of services in the cluster
// will be provided.
ServiceCountKey = "k8s-service-count"
ServiceCountKey = ReportKey("k8s_services_count")
// ServiceCountKind represents the service count provider kind.
ServiceCountKind = Kind(ServiceCountKey)
)
Expand All @@ -17,12 +17,18 @@ const (
// configured k8s cluster - using the provided client - to get a service count from
// the cluster.
func NewK8sServiceCountProvider(name string, d dynamic.Interface) (Provider, error) {
gvk := schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "services",
}

return &k8sObjectCount{
resource: d.Resource(schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "services",
}),
base: base{name: name, kind: PodCountKind},
resource: d.Resource(gvk),
gvk: gvk,
base: base{
name: name,
kind: ServiceCountKind,
},
}, nil
}
5 changes: 4 additions & 1 deletion pkg/provider/report.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package provider

// ReportKey represents a key type for providers' reports.
type ReportKey string

// Report represents a report from a provider.
type Report map[string]any
type Report map[ReportKey]any

// Merge merges the report with a different report overriding already existing
// entries if there's a collision.
Expand Down
Loading

0 comments on commit 6f99136

Please sign in to comment.