diff --git a/.golangci.yaml b/.golangci.yaml index 44003e7..3bd6af0 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -15,7 +15,7 @@ linters: - godot - gofmt - goimports - - golint + - revive - gomnd - gosec - govet diff --git a/README.md b/README.md index 0b1f430..c4e65c3 100644 --- a/README.md +++ b/README.md @@ -3,5 +3,35 @@ Toolkit for telemetry data for [Kong's][kong] Kubernetes products, such as the [Kong Kubernetes Ingress Controller (KIC)][kic]. +## Usage + +### Forwarders + +Forwarders can be used to forward serialized telemetry reports to a particular destination. + +- `TLSForwarder` can be used to forward data to a TLS endpoint +- `LogForwarder` can be used to forward data to a configured logger instance +- `DiscardForwarder` can be used to discard received reports + +### Serializers + +Users can pick the serializer of their choice for data serialization. + +Currently only 1 serializer is supported with more implementations to come as needed. + +#### Semicolon delimited values + +This serializer uses the following predefined keys to express telemetry data: + +- `k8s_arch` - inferred kubernetes cluster architecture +- `k8sv` - inferred kubernetes cluster version +- `k8sv_semver` - inferred kubernetes cluster version in [semver format][semver] +- `k8s_provider` - inferred kubernetes cluster provider +- `k8s_pods_count` - number of pods running in the cluster +- `k8s_services_count` - number of services defined in the cluster +- `hn` - hostname where this telemetry framework is running on +- `feature-` - feature gate (with the boolean state indicated whether enabled or disabled) + [kong]:https://github.com/kong [kic]:https://github.com/kong/kubernetes-ingress-controller +[semver]:https://semver.org/ diff --git a/go.mod b/go.mod index 743db62..1b81907 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,8 @@ require ( sigs.k8s.io/controller-runtime v0.12.3 ) +require github.com/spf13/pflag v1.0.5 // indirect + require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/emicklei/go-restful/v3 v3.8.0 // indirect diff --git a/pkg/forwarders/discardforwarder.go b/pkg/forwarders/discardforwarder.go new file mode 100644 index 0000000..c762015 --- /dev/null +++ b/pkg/forwarders/discardforwarder.go @@ -0,0 +1,17 @@ +package forwarders + +type discardForwarder struct{} + +// NewDiscardForwarder creates a new discardForwarder which discards all received +// data. +func NewDiscardForwarder() *discardForwarder { + return &discardForwarder{} +} + +func (df *discardForwarder) Name() string { + return "DiscardForwarder" +} + +func (df *discardForwarder) Forward(payload []byte) error { + return nil +} diff --git a/pkg/forwarders/logforwarder.go b/pkg/forwarders/logforwarder.go new file mode 100644 index 0000000..3dc8feb --- /dev/null +++ b/pkg/forwarders/logforwarder.go @@ -0,0 +1,26 @@ +package forwarders + +import ( + "github.com/go-logr/logr" +) + +type logForwarder struct { + log logr.Logger +} + +// NewLogForwarder creates new logForwarded which uses the provided logger to +// print all the received data. +func NewLogForwarder(log logr.Logger) *logForwarder { + return &logForwarder{ + log: log, + } +} + +func (lf *logForwarder) Name() string { + return "LogForwarder" +} + +func (lf *logForwarder) Forward(payload []byte) error { + lf.log.Info("got a report", "report", payload) + return nil +} diff --git a/pkg/forwarders/tlsforwarder.go b/pkg/forwarders/tlsforwarder.go new file mode 100644 index 0000000..30f1253 --- /dev/null +++ b/pkg/forwarders/tlsforwarder.go @@ -0,0 +1,83 @@ +package forwarders + +import ( + "crypto/tls" + "fmt" + "net" + "time" + + "github.com/go-logr/logr" + "github.com/sirupsen/logrus" +) + +const ( + defaultTimeout = time.Second * 30 + defaultDeadline = time.Minute +) + +var tlsConf = tls.Config{ + MinVersion: tls.VersionTLS13, + MaxVersion: tls.VersionTLS13, +} + +// TODO: Address logging levels and library to be used. +// See: https://github.com/Kong/kubernetes-ingress-controller/issues/1893 +const ( + logrusrDiff = 4 + + // InfoLevel is the converted logging level from logrus to go-logr for + // information level logging. Note that the logrusr middleware technically + // flattens all levels prior to this level into this level as well. + InfoLevel = int(logrus.InfoLevel) - logrusrDiff + + // DebugLevel is the converted logging level from logrus to go-logr for + // debug level logging. + DebugLevel = int(logrus.DebugLevel) - logrusrDiff + + // WarnLevel is the converted logrus level to go-logr for warnings. + WarnLevel = int(logrus.WarnLevel) - logrusrDiff +) + +type tlsForwarder struct { + log logr.Logger + conn *tls.Conn +} + +// NewTLSForwarder creates a TLS forwarder which forwards received serialized reports +// to a TLS endpoint specified by the provided address. +func NewTLSForwarder(address string, log logr.Logger) *tlsForwarder { + conn, err := tls.DialWithDialer( + &net.Dialer{ + Timeout: defaultTimeout, + }, + "tcp", + address, + &tlsConf, + ) + if err != nil { + log.V(DebugLevel).Info("failed to connect to reporting server", "error", err) + return nil + } + + return &tlsForwarder{ + log: log, + conn: conn, + } +} + +func (tf *tlsForwarder) Name() string { + return "TLSForwarder" +} + +func (tf *tlsForwarder) Forward(payload []byte) error { + err := tf.conn.SetDeadline(time.Now().Add(defaultDeadline)) + if err != nil { + return fmt.Errorf("failed to set report connection deadline: %w", err) + } + + _, err = tf.conn.Write(payload) + if err != nil { + return fmt.Errorf("failed to send report: %w", err) + } + return nil +} diff --git a/pkg/provider/k8scloudproviderprovider.go b/pkg/provider/k8scloudproviderprovider.go index f34d7cf..e5a0aa2 100644 --- a/pkg/provider/k8scloudproviderprovider.go +++ b/pkg/provider/k8scloudproviderprovider.go @@ -11,7 +11,7 @@ import ( const ( // ClusterProviderKey is report key under which the cluster provider will be provided. - ClusterProviderKey = "k8s-provider" + ClusterProviderKey = ReportKey("k8s_provider") // ClusterProviderKind represents cluster provider kind. ClusterProviderKind = Kind(ClusterProviderKey) ) @@ -24,6 +24,8 @@ const ( ClusterProviderGKE = ClusterProvider("GKE") // ClusterProviderAWS identifies Amazon's AWS cluster provider. ClusterProviderAWS = ClusterProvider("AWS") + // ClusterProviderKubernetesInDocker identifies kind (kubernetes in docker) as cluster provider. + ClusterProviderKubernetesInDocker = ClusterProvider("kind") // ClusterProviderUnknown represents an unknown cluster provider. ClusterProviderUnknown = ClusterProvider("UNKNOWN") ) @@ -116,8 +118,9 @@ func getClusterProviderFromNodes(nodeList *corev1.NodeList) (ClusterProvider, bo func getClusterProviderFromNodesProviderID(nodeList *corev1.NodeList) (ClusterProvider, bool) { const ( // Nodes on GKE are provided by GCE (Google Compute Engine) - providerIDPrefixGKE = "gce" - providerIDPrefixAWS = "aws" + providerIDPrefixGKE = "gce" + providerIDPrefixAWS = "aws" + providerIDPrefixKind = "kind" ) d := make(clusterProviderDistribution) for _, n := range nodeList.Items { @@ -129,6 +132,10 @@ func getClusterProviderFromNodesProviderID(nodeList *corev1.NodeList) (ClusterPr d[ClusterProviderAWS]++ continue } + if strings.HasPrefix(n.Spec.ProviderID, providerIDPrefixKind) { + d[ClusterProviderKubernetesInDocker]++ + continue + } } if p, ok := getMostCommonClusterProviderFromDistribution(d); ok { return p, true diff --git a/pkg/provider/k8scloudproviderprovider_test.go b/pkg/provider/k8scloudproviderprovider_test.go index af362a1..abeed8d 100644 --- a/pkg/provider/k8scloudproviderprovider_test.go +++ b/pkg/provider/k8scloudproviderprovider_test.go @@ -153,7 +153,7 @@ func TestClusterProvider(t *testing.T) { return clientgo_fake.NewSimpleClientset( &corev1.Node{ Spec: corev1.NodeSpec{ - ProviderID: "providerID: aws:///eu-west-1b/i-0fa11111111111111", + ProviderID: "aws:///eu-west-1b/i-0fa11111111111111", }, ObjectMeta: metav1.ObjectMeta{ Annotations: map[string]string{ @@ -250,6 +250,31 @@ func TestClusterProvider(t *testing.T) { }, expected: ClusterProviderAWS, }, + // kind + { + name: "kind gets inferred from provider ID prefix", + clientFunc: func() *clientgo_fake.Clientset { + return clientgo_fake.NewSimpleClientset( + &corev1.Node{ + Spec: corev1.NodeSpec{ + ProviderID: "kind://docker/kong/kong-control-plane", + }, + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "beta.kubernetes.io/arch": "arm64", + "beta.kubernetes.io/os": "linux", + "kubernetes.io/arch": "arm64", + "kubernetes.io/hostname": "kong-control-plane", + "kubernetes.io/os": "linux", + "node-role.kubernetes.io/control-plane": "", + "node.kubernetes.io/exclude-from-external-load-balancers": "", + }, + }, + }, + ) + }, + expected: ClusterProviderKubernetesInDocker, + }, } for _, tc := range testcases { diff --git a/pkg/provider/k8sclusterarchprovider.go b/pkg/provider/k8sclusterarchprovider.go index ee2de96..0d310c7 100644 --- a/pkg/provider/k8sclusterarchprovider.go +++ b/pkg/provider/k8sclusterarchprovider.go @@ -10,7 +10,7 @@ import ( const ( // ClusterArchKey is report key under which cluster architecture will be provided. - ClusterArchKey = "k8s-cluster-arch" + ClusterArchKey = ReportKey("k8s_arch") // ClusterArchKind represents cluster arch provider kind. ClusterArchKind = Kind(ClusterArchKey) ) diff --git a/pkg/provider/k8sclusterversionprovider.go b/pkg/provider/k8sclusterversionprovider.go index 41b0fc1..b7c656b 100644 --- a/pkg/provider/k8sclusterversionprovider.go +++ b/pkg/provider/k8sclusterversionprovider.go @@ -13,10 +13,10 @@ import ( const ( // ClusterVersionKey is the report key under which cluster k8s version will // be provided as returned by the /version API. - ClusterVersionKey = "k8s-cluster-version" + ClusterVersionKey = ReportKey("k8sv") // ClusterVersionSemverKey is the report key under which cluster k8s semver // version will be provided. - ClusterVersionSemverKey = "k8s-cluster-version-semver" + ClusterVersionSemverKey = ReportKey("k8sv_semver") // ClusterVersionKind represents cluster version provider kind. ClusterVersionKind = Kind(ClusterVersionKey) ) diff --git a/pkg/provider/k8sobjectcountprovider.go b/pkg/provider/k8sobjectcountprovider.go index a263fdf..cbe9d73 100644 --- a/pkg/provider/k8sobjectcountprovider.go +++ b/pkg/provider/k8sobjectcountprovider.go @@ -4,6 +4,7 @@ import ( "context" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" ) @@ -12,6 +13,7 @@ import ( // // Example: Use {Group: "", Version: "v1", Resource: "pods"} to get a Provider that counts all Pods in the cluster. type k8sObjectCount struct { + gvk schema.GroupVersionResource resource dynamic.NamespaceableResourceInterface base @@ -34,7 +36,7 @@ func (k *k8sObjectCount) Provide(ctx context.Context) (Report, error) { Continue: continueToken, }) if err != nil { - return Report{}, err + return Report{}, k.WrapError(err) } count += len(list.Items) @@ -44,6 +46,6 @@ func (k *k8sObjectCount) Provide(ctx context.Context) (Report, error) { } return Report{ - k.Name(): count, + ReportKey("k8s_" + k.gvk.Resource + "_count"): count, }, nil } diff --git a/pkg/provider/k8spodscountprovider.go b/pkg/provider/k8spodscountprovider.go index c54a654..dc9fd5d 100644 --- a/pkg/provider/k8spodscountprovider.go +++ b/pkg/provider/k8spodscountprovider.go @@ -8,7 +8,7 @@ import ( const ( // PodCountKey is report key under which the number of pods in the cluster // will be provided. - PodCountKey = "k8s-pod-count" + PodCountKey = ReportKey("k8s_pods_count") // PodCountKind represents the pod count provider kind. PodCountKind = Kind(PodCountKey) ) @@ -17,12 +17,18 @@ const ( // configured k8s cluster - using the provided client - to get a pod count from // the cluster. func NewK8sPodCountProvider(name string, d dynamic.Interface) (Provider, error) { + gvk := schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "pods", + } + return &k8sObjectCount{ - resource: d.Resource(schema.GroupVersionResource{ - Group: "", - Version: "v1", - Resource: "pods", - }), - base: base{name: name, kind: PodCountKind}, + resource: d.Resource(gvk), + gvk: gvk, + base: base{ + name: name, + kind: PodCountKind, + }, }, nil } diff --git a/pkg/provider/k8sservicecountprovider.go b/pkg/provider/k8sservicecountprovider.go index 8406278..1e9f95d 100644 --- a/pkg/provider/k8sservicecountprovider.go +++ b/pkg/provider/k8sservicecountprovider.go @@ -8,7 +8,7 @@ import ( const ( // ServiceCountKey is report key under which the number of services in the cluster // will be provided. - ServiceCountKey = "k8s-service-count" + ServiceCountKey = ReportKey("k8s_services_count") // ServiceCountKind represents the service count provider kind. ServiceCountKind = Kind(ServiceCountKey) ) @@ -17,12 +17,18 @@ const ( // configured k8s cluster - using the provided client - to get a service count from // the cluster. func NewK8sServiceCountProvider(name string, d dynamic.Interface) (Provider, error) { + gvk := schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "services", + } + return &k8sObjectCount{ - resource: d.Resource(schema.GroupVersionResource{ - Group: "", - Version: "v1", - Resource: "services", - }), - base: base{name: name, kind: PodCountKind}, + resource: d.Resource(gvk), + gvk: gvk, + base: base{ + name: name, + kind: ServiceCountKind, + }, }, nil } diff --git a/pkg/provider/report.go b/pkg/provider/report.go index 817201c..f0cfe90 100644 --- a/pkg/provider/report.go +++ b/pkg/provider/report.go @@ -1,7 +1,10 @@ package provider +// ReportKey represents a key type for providers' reports. +type ReportKey string + // Report represents a report from a provider. -type Report map[string]any +type Report map[ReportKey]any // Merge merges the report with a different report overriding already existing // entries if there's a collision. diff --git a/pkg/provider/uptimeprovider.go b/pkg/provider/uptimeprovider.go new file mode 100644 index 0000000..8f65eae --- /dev/null +++ b/pkg/provider/uptimeprovider.go @@ -0,0 +1,22 @@ +package provider + +import ( + "time" +) + +// NewUptimeProvider provides new uptime provider which will return uptime counted +// since the provider creation time. +func NewUptimeProvider(name string) (Provider, error) { + start := time.Now() + return &functor{ + f: func() (Report, error) { + return Report{ + "uptime": int(time.Since(start).Truncate(time.Second).Seconds()), + }, nil + }, + base: base{ + name: name, + kind: "uptime", + }, + }, nil +} diff --git a/pkg/serializers/semicolondelimited.go b/pkg/serializers/semicolondelimited.go new file mode 100644 index 0000000..c49eef2 --- /dev/null +++ b/pkg/serializers/semicolondelimited.go @@ -0,0 +1,50 @@ +package serializers + +import ( + "fmt" + "sort" + "strings" + + "github.com/kong/kubernetes-telemetry/pkg/provider" + "github.com/kong/kubernetes-telemetry/pkg/types" +) + +type semicolonDelimited struct { + signal string +} + +// NewSemicolonDelimited creates a new serializer that will serialize telemetry +// reports into a semicolon delimited format. +func NewSemicolonDelimited(signal string) semicolonDelimited { + return semicolonDelimited{ + signal: signal, + } +} + +func (s semicolonDelimited) Serialize(report types.Report) ([]byte, error) { + out := make([]string, 0, len(report)) + for _, v := range report { + out = append(out, serializeReport(v)) + } + + // Should this prefix go to TLSForwarder instead? + prefix := "<14>signal=" + s.signal + ";" + + sort.Strings(out) + return []byte(prefix + strings.Join(out, "") + "\n"), nil +} + +func serializeReport(report provider.Report) string { + var out []string + for k, v := range report { + switch vv := v.(type) { + case provider.Report: + out = append(out, serializeReport(vv)) + default: + out = append(out, fmt.Sprintf("%v=%v;", k, v)) + } + } + + sort.Strings(out) + return strings.Join(out, "") +} diff --git a/pkg/serializers/semicolondelimited_test.go b/pkg/serializers/semicolondelimited_test.go new file mode 100644 index 0000000..5086e9d --- /dev/null +++ b/pkg/serializers/semicolondelimited_test.go @@ -0,0 +1,33 @@ +package serializers + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/kong/kubernetes-telemetry/pkg/provider" + "github.com/kong/kubernetes-telemetry/pkg/types" +) + +func TestSemicolonDelimited(t *testing.T) { + t.Run("basic", func(t *testing.T) { + s := NewSemicolonDelimited("kic-ping") + + out, err := s.Serialize(types.Report{ + "cluster-state": provider.Report{ + "k8s_pods_count": 1, + "k8s_services_count": 2, + }, + "identify-platform": provider.Report{ + "k8s_arch": "linux/arm64", + "k8sv": "v1.2.3-gke-a1fdc32f", + "k8sv_semver": "v1.2.3", + "k8s_provider": provider.ClusterProviderGKE, + }, + }) + + require.NoError(t, err) + assert.EqualValues(t, "<14>signal=kic-ping;k8s_arch=linux/arm64;k8s_provider=GKE;k8sv=v1.2.3-gke-a1fdc32f;k8sv_semver=v1.2.3;k8s_pods_count=1;k8s_services_count=2;\n", string(out)) + }) +} diff --git a/pkg/telemetry/consumer.go b/pkg/telemetry/consumer.go new file mode 100644 index 0000000..bc31d5b --- /dev/null +++ b/pkg/telemetry/consumer.go @@ -0,0 +1,67 @@ +package telemetry + +import ( + "sync" + + "github.com/go-logr/logr" + + "github.com/kong/kubernetes-telemetry/pkg/types" +) + +type consumer struct { + logger logr.Logger + once sync.Once + ch chan types.Report + done chan struct{} +} + +// Forwarder is used to forward telemetry reports to configured destination(s). +type Forwarder interface { + Name() string + Forward([]byte) error +} + +// NewConsumer creates a new consumer which will use the provided serializer to +// serialize the data and then forward it using the provided forwarder. +func NewConsumer(s Serializer, f Forwarder) *consumer { + var ( + ch = make(chan types.Report) + done = make(chan struct{}) + logger = defaultLogger() // TODO: allow configuration + ) + + go func() { + for { + select { + case <-done: + return + case r := <-ch: + b, err := s.Serialize(r) + if err != nil { + logger.Error(err, "failed to serialize report") + continue + } + + if err := f.Forward(b); err != nil { + logger.Error(err, "failed to forward report using forwarder: %s", f.Name()) + } + } + } + }() + + return &consumer{ + logger: logger, + ch: ch, + done: done, + } +} + +func (c *consumer) Intake() chan<- types.Report { + return c.ch +} + +func (c *consumer) Close() { + c.once.Do(func() { + close(c.done) + }) +} diff --git a/pkg/telemetry/manager.go b/pkg/telemetry/manager.go index 62ab656..62b654f 100644 --- a/pkg/telemetry/manager.go +++ b/pkg/telemetry/manager.go @@ -12,6 +12,7 @@ import ( "github.com/puzpuzpuz/xsync" "github.com/kong/kubernetes-telemetry/pkg/provider" + "github.com/kong/kubernetes-telemetry/pkg/types" ) type managerErr string @@ -29,10 +30,6 @@ const ( ErrCantAddConsumersAfterStart = managerErr("can't add consumers after start") ) -// Report represents a report that is returned by executing managers workflows. -// This is also the type that is being sent out to consumers. -type Report map[string]provider.Report - const ( // DefaultWorkflowTickPeriod is the default tick period with which the manager // will trigger configured workflows execution. @@ -49,9 +46,9 @@ type manager struct { // consumers is a slice of channels that will consume reports produced by // execution of workflows. - consumers []chan<- Report + consumers []chan<- types.Report - ch chan Report + ch chan types.Report once sync.Once logger logr.Logger done chan struct{} @@ -76,12 +73,13 @@ type Manager interface { Stop() // AddConsumer adds a consumer of telemetry data provided by configured // workflows' providers. - AddConsumer(ch chan<- Report) error + // AddConsumer(ch chan<- Report) error + AddConsumer(c Consumer) error // AddWorkflow adds a workflow with providers which will provide telemetry data. AddWorkflow(Workflow) // Execute executes all workflows and returns an aggregated report from those // workflows. - Execute(context.Context) (Report, error) + Execute(context.Context) (types.Report, error) } // NewManager creates a new manager configured via the provided options. @@ -89,8 +87,8 @@ func NewManager(opts ...OptManager) (Manager, error) { m := &manager{ workflows: xsync.NewMapOf[Workflow](), period: DefaultWorkflowTickPeriod, - consumers: []chan<- Report{}, - ch: make(chan Report), + consumers: []chan<- types.Report{}, + ch: make(chan types.Report), logger: defaultLogger(), done: make(chan struct{}), } @@ -130,12 +128,20 @@ func (m *manager) Stop() { }) } +// Consumer is an entity that can consume telemetry reports on a channel returned +// by Intake(). +type Consumer interface { + Intake() chan<- types.Report + Close() +} + // AddConsumer adds a consumer. -func (m *manager) AddConsumer(ch chan<- Report) error { +func (m *manager) AddConsumer(c Consumer) error { + // func (m *manager) AddConsumer(ch chan<- Report) error { if atomic.LoadInt32(&m.started) > 0 { return ErrCantAddConsumersAfterStart } - m.consumers = append(m.consumers, ch) + m.consumers = append(m.consumers, c.Intake()) return nil } @@ -174,10 +180,10 @@ func (m *manager) workflowsLoop() { // Execute executes all configures workflows and returns an aggregated report // from all the underying providers. -func (m *manager) Execute(ctx context.Context) (Report, error) { +func (m *manager) Execute(ctx context.Context) (types.Report, error) { var ( err error - report = Report{} + report = types.Report{} ) m.workflows.Range(func(name string, v Workflow) bool { diff --git a/pkg/telemetry/manager_test.go b/pkg/telemetry/manager_test.go index b3bea76..93e1090 100644 --- a/pkg/telemetry/manager_test.go +++ b/pkg/telemetry/manager_test.go @@ -14,7 +14,10 @@ import ( clientgo_fake "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/scheme" + "github.com/kong/kubernetes-telemetry/pkg/forwarders" "github.com/kong/kubernetes-telemetry/pkg/provider" + "github.com/kong/kubernetes-telemetry/pkg/serializers" + "github.com/kong/kubernetes-telemetry/pkg/types" ) func TestManagerStartStopDoesntFail(t *testing.T) { @@ -51,20 +54,21 @@ func TestManagerBasicLogicWorks(t *testing.T) { m.AddWorkflow(w) } - ch := make(chan Report) - require.NoError(t, m.AddConsumer(ch)) + consumer := NewConsumer(serializers.NewSemicolonDelimited("ping"), forwarders.NewDiscardForwarder()) + + require.NoError(t, m.AddConsumer(consumer)) require.NoError(t, m.Start()) require.ErrorIs(t, m.Start(), ErrManagerAlreadyStarted, "subsequent starts of the manager should return an error", ) - require.ErrorIs(t, m.AddConsumer(make(chan<- Report)), + require.ErrorIs(t, m.AddConsumer(consumer), ErrCantAddConsumersAfterStart, "cannot add consumers after start", ) - report := <-ch + report := <-consumer.ch m.Stop() - require.EqualValues(t, Report{ + require.EqualValues(t, types.Report{ "basic1": provider.Report{ "constant1": "value1", "constant2": "value2", @@ -118,20 +122,22 @@ func TestManagerWithMultilpleWorkflows(t *testing.T) { m.AddWorkflow(w) } - ch := make(chan Report) - require.NoError(t, m.AddConsumer(ch)) + consumer := NewConsumer(serializers.NewSemicolonDelimited("ping"), forwarders.NewDiscardForwarder()) + require.NoError(t, m.AddConsumer(consumer)) + require.NoError(t, m.Start()) require.ErrorIs(t, m.Start(), ErrManagerAlreadyStarted, "subsequent starts of the manager should return an error", ) - require.ErrorIs(t, m.AddConsumer(make(chan<- Report)), + require.ErrorIs(t, m.AddConsumer(consumer), ErrCantAddConsumersAfterStart, "cannot add consumers after start", ) + ch := consumer.ch report := <-ch m.Stop() - require.EqualValues(t, Report{ + require.EqualValues(t, types.Report{ "basic1": provider.Report{ "constant1": "value1", "constant2": "value2", @@ -191,23 +197,23 @@ func TestManagerWithCatalogWorkflows(t *testing.T) { m.AddWorkflow(clusterStateWorkflow) m.AddWorkflow(identifyPlatformWorkflow) - ch := make(chan Report) - require.NoError(t, m.AddConsumer(ch)) + consumer := NewConsumer(serializers.NewSemicolonDelimited("ping"), forwarders.NewDiscardForwarder()) + require.NoError(t, m.AddConsumer(consumer)) require.NoError(t, m.Start()) - report := <-ch + report := <-consumer.ch m.Stop() - require.EqualValues(t, Report{ + require.EqualValues(t, types.Report{ "cluster-state": provider.Report{ - "k8s-pod-count": 1, - "k8s-service-count": 2, + "k8s_pods_count": 1, + "k8s_services_count": 2, }, "identify-platform": provider.Report{ - "k8s-cluster-arch": fmt.Sprintf("%s/%s", runtime.GOOS, runtime.GOARCH), - "k8s-cluster-version": "v0.0.0-master+$Format:%H$", - "k8s-cluster-version-semver": "v0.0.0", - "k8s-provider": provider.ClusterProviderUnknown, + "k8s_arch": fmt.Sprintf("%s/%s", runtime.GOOS, runtime.GOARCH), + "k8sv": "v0.0.0-master+$Format:%H$", + "k8sv_semver": "v0.0.0", + "k8s_provider": provider.ClusterProviderUnknown, }, }, report) }) diff --git a/pkg/telemetry/serializer.go b/pkg/telemetry/serializer.go new file mode 100644 index 0000000..424a69e --- /dev/null +++ b/pkg/telemetry/serializer.go @@ -0,0 +1,8 @@ +package telemetry + +import "github.com/kong/kubernetes-telemetry/pkg/types" + +// Serializer serializes telemetry reports into byte slices. +type Serializer interface { + Serialize(report types.Report) ([]byte, error) +} diff --git a/pkg/telemetry/workflowsbase.go b/pkg/telemetry/workflowsbase.go new file mode 100644 index 0000000..49f9254 --- /dev/null +++ b/pkg/telemetry/workflowsbase.go @@ -0,0 +1,25 @@ +package telemetry + +import ( + "github.com/kong/kubernetes-telemetry/pkg/provider" +) + +const ( + // StateWorkflowName is the name assigned to state workflow. + StateWorkflowName = "state" +) + +// NewStateWorkflow creates a new 'state' workflow, based +// on a predefined set of providers that will deliver telemetry date about the +// state of the system. +func NewStateWorkflow() (Workflow, error) { + uptimeProvider, err := provider.NewUptimeProvider("uptime") + if err != nil { + return nil, err + } + + w := NewWorkflow(StateWorkflowName) + w.AddProvider(uptimeProvider) + + return w, nil +} diff --git a/pkg/telemetry/workflowsbase_test.go b/pkg/telemetry/workflowsbase_test.go new file mode 100644 index 0000000..df239f4 --- /dev/null +++ b/pkg/telemetry/workflowsbase_test.go @@ -0,0 +1,21 @@ +package telemetry + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/kong/kubernetes-telemetry/pkg/provider" +) + +func TestWorkflowState(t *testing.T) { + w, err := NewStateWorkflow() + require.NoError(t, err) + + r, err := w.Execute(context.Background()) + require.NoError(t, err) + require.EqualValues(t, provider.Report{ + "uptime": 0, + }, r) +} diff --git a/pkg/telemetry/k8s.go b/pkg/telemetry/workflowsk8s.go similarity index 76% rename from pkg/telemetry/k8s.go rename to pkg/telemetry/workflowsk8s.go index fb84638..d332a80 100644 --- a/pkg/telemetry/k8s.go +++ b/pkg/telemetry/workflowsk8s.go @@ -19,27 +19,27 @@ const ( // Exemplar report produced: // // { -// "k8s-cluster-arch": "linux/amd64", -// "k8s-cluster-version": "v1.24.1-gke.1400", -// "k8s-cluster-version-semver": "v1.24.1", -// "k8s-provider": "GKE" +// "k8sv": "linux/amd64", +// "k8sv": "v1.24.1-gke.1400", +// "k8sv_semver": "v1.24.1", +// "k8s_provider": "GKE" // } func NewIdentifyPlatformWorkflow(kc kubernetes.Interface) (Workflow, error) { if kc == nil { return nil, ErrNilKubernetesInterfaceProvided } - pClusterArch, err := provider.NewK8sClusterArchProvider(provider.ClusterArchKey, kc) + pClusterArch, err := provider.NewK8sClusterArchProvider(string(provider.ClusterArchKey), kc) if err != nil { return nil, err } - pClusterVersion, err := provider.NewK8sClusterVersionProvider(provider.ClusterVersionKey, kc) + pClusterVersion, err := provider.NewK8sClusterVersionProvider(string(provider.ClusterVersionKey), kc) if err != nil { return nil, err } - pClusterProvider, err := provider.NewK8sClusterProviderProvider(provider.ClusterProviderKey, kc) + pClusterProvider, err := provider.NewK8sClusterProviderProvider(string(provider.ClusterProviderKey), kc) if err != nil { return nil, err } @@ -63,19 +63,19 @@ const ( // Exemplar report produced: // // { -// "k8s-pod-count": 21, -// "k8s-service-count": 3 +// "k8s_pods_count": 21, +// "k8s_services_count": 3 // } func NewClusterStateWorkflow(d dynamic.Interface) (Workflow, error) { if d == nil { return nil, ErrNilDynClientProvided } - providerPodCount, err := provider.NewK8sPodCountProvider(provider.PodCountKey, d) + providerPodCount, err := provider.NewK8sPodCountProvider(string(provider.PodCountKey), d) if err != nil { return nil, err } - providerServiceCount, err := provider.NewK8sServiceCountProvider(provider.ServiceCountKey, d) + providerServiceCount, err := provider.NewK8sServiceCountProvider(string(provider.ServiceCountKey), d) if err != nil { return nil, err } diff --git a/pkg/telemetry/k8s_test.go b/pkg/telemetry/workflowsk8s_test.go similarity index 100% rename from pkg/telemetry/k8s_test.go rename to pkg/telemetry/workflowsk8s_test.go diff --git a/pkg/types/types.go b/pkg/types/types.go new file mode 100644 index 0000000..8273c4d --- /dev/null +++ b/pkg/types/types.go @@ -0,0 +1,9 @@ +package types + +import "github.com/kong/kubernetes-telemetry/pkg/provider" + +// This package was needed to resolve circular dependency. + +// Report represents a report that is returned by executing managers workflows. +// This is also the type that is being sent out to consumers. +type Report map[string]provider.Report