From bd5dd05d4b199c610a5e168a32f5172354ea3a82 Mon Sep 17 00:00:00 2001 From: Jerry Leung Date: Fri, 24 May 2024 16:44:30 -0700 Subject: [PATCH 01/10] Added WithLogs and its handling --- collector/internal/lifecycle/manager.go | 2 +- collector/internal/telemetryapi/client.go | 8 +- .../receiver/telemetryapireceiver/README.md | 10 +- .../receiver/telemetryapireceiver/factory.go | 23 ++- .../sharedcomponent/sharedcomponent.go | 76 ++++++++ .../sharedcomponent/sharedcomponent_test.go | 72 ++++++++ .../receiver/telemetryapireceiver/receiver.go | 166 +++++++++++++++--- .../telemetryapireceiver/receiver_test.go | 164 ++++++++++++++++- .../receiver/telemetryapireceiver/types.go | 6 +- 9 files changed, 483 insertions(+), 44 deletions(-) create mode 100644 collector/receiver/telemetryapireceiver/internal/sharedcomponent/sharedcomponent.go create mode 100644 collector/receiver/telemetryapireceiver/internal/sharedcomponent/sharedcomponent_test.go diff --git a/collector/internal/lifecycle/manager.go b/collector/internal/lifecycle/manager.go index 68f6939190..c6d9eb9305 100644 --- a/collector/internal/lifecycle/manager.go +++ b/collector/internal/lifecycle/manager.go @@ -75,7 +75,7 @@ func NewManager(ctx context.Context, logger *zap.Logger, version string) (contex } telemetryClient := telemetryapi.NewClient(logger) - _, err = telemetryClient.Subscribe(ctx, res.ExtensionID, addr) + _, err = telemetryClient.Subscribe(ctx, []telemetryapi.EventType{telemetryapi.Platform}, res.ExtensionID, addr) if err != nil { logger.Fatal("Cannot register Telemetry API client", zap.Error(err)) } diff --git a/collector/internal/telemetryapi/client.go b/collector/internal/telemetryapi/client.go index db57573fc2..08a0c67f05 100644 --- a/collector/internal/telemetryapi/client.go +++ b/collector/internal/telemetryapi/client.go @@ -46,13 +46,7 @@ func NewClient(logger *zap.Logger) *Client { } } -func (c *Client) Subscribe(ctx context.Context, extensionID string, listenerURI string) (string, error) { - eventTypes := []EventType{ - Platform, - // Function, - // Extension, - } - +func (c *Client) Subscribe(ctx context.Context, eventTypes []EventType, extensionID string, listenerURI string) (string, error) { bufferingConfig := BufferingCfg{ MaxItems: 1000, MaxBytes: 256 * 1024, diff --git a/collector/receiver/telemetryapireceiver/README.md b/collector/receiver/telemetryapireceiver/README.md index fe2c2745f1..e51b420f2d 100644 --- a/collector/receiver/telemetryapireceiver/README.md +++ b/collector/receiver/telemetryapireceiver/README.md @@ -1,10 +1,10 @@ # Telemetry API Receiver -| Status | | -| ------------------------ |-----------------| -| Stability | [alpha] | -| Supported pipeline types | traces | -| Distributions | [extension] | +| Status | | +| ------------------------ |--------------| +| Stability | [alpha] | +| Supported pipeline types | traces, logs | +| Distributions | [extension] | This receiver generates telemetry in response to events from the [Telemetry API](https://docs.aws.amazon.com/lambda/latest/dg/telemetry-api.html). It does this by setting up an endpoint and registering itself with the Telemetry API on startup. diff --git a/collector/receiver/telemetryapireceiver/factory.go b/collector/receiver/telemetryapireceiver/factory.go index c89dcbeac8..63c8b2ce23 100644 --- a/collector/receiver/telemetryapireceiver/factory.go +++ b/collector/receiver/telemetryapireceiver/factory.go @@ -18,6 +18,7 @@ import ( "context" "errors" + "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver/internal/sharedcomponent" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/receiver" @@ -39,7 +40,8 @@ func NewFactory(extensionID string) receiver.Factory { extensionID: extensionID, } }, - receiver.WithTraces(createTracesReceiver, stability)) + receiver.WithTraces(createTracesReceiver, stability), + receiver.WithLogs(createLogsReceiver, stability)) } func createTracesReceiver(ctx context.Context, params receiver.CreateSettings, rConf component.Config, next consumer.Traces) (receiver.Traces, error) { @@ -47,6 +49,23 @@ func createTracesReceiver(ctx context.Context, params receiver.CreateSettings, r if !ok { return nil, errConfigNotTelemetryAPI } + r := receivers.GetOrAdd(cfg, func() component.Component { + return newTelemetryAPIReceiver(cfg, params) + }) + r.Unwrap().(*telemetryAPIReceiver).registerTracesConsumer(next) + return r, nil +} - return newTelemetryAPIReceiver(cfg, next, params) +func createLogsReceiver(ctx context.Context, params receiver.CreateSettings, rConf component.Config, next consumer.Logs) (receiver.Logs, error) { + cfg, ok := rConf.(*Config) + if !ok { + return nil, errConfigNotTelemetryAPI + } + r := receivers.GetOrAdd(cfg, func() component.Component { + return newTelemetryAPIReceiver(cfg, params) + }) + r.Unwrap().(*telemetryAPIReceiver).registerLogsConsumer(next) + return r, nil } + +var receivers = sharedcomponent.NewSharedComponents() diff --git a/collector/receiver/telemetryapireceiver/internal/sharedcomponent/sharedcomponent.go b/collector/receiver/telemetryapireceiver/internal/sharedcomponent/sharedcomponent.go new file mode 100644 index 0000000000..b297d81586 --- /dev/null +++ b/collector/receiver/telemetryapireceiver/internal/sharedcomponent/sharedcomponent.go @@ -0,0 +1,76 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package sharedcomponent exposes util functionality for receivers and exporters +// that need to share state between different signal types instances such as net.Listener or os.File. +package sharedcomponent // import "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver/internal/sharedcomponent" + +import ( + "context" + "sync" + + "go.opentelemetry.io/collector/component" +) + +// SharedComponents a map that keeps reference of all created instances for a given configuration, +// and ensures that the shared state is started and stopped only once. +type SharedComponents struct { + comps map[any]*SharedComponent +} + +// NewSharedComponents returns a new empty SharedComponents. +func NewSharedComponents() *SharedComponents { + return &SharedComponents{ + comps: make(map[any]*SharedComponent), + } +} + +// GetOrAdd returns the already created instance if exists, otherwise creates a new instance +// and adds it to the map of references. +func (scs *SharedComponents) GetOrAdd(key any, create func() component.Component) *SharedComponent { + if c, ok := scs.comps[key]; ok { + return c + } + newComp := &SharedComponent{ + Component: create(), + removeFunc: func() { + delete(scs.comps, key) + }, + } + scs.comps[key] = newComp + return newComp +} + +// SharedComponent ensures that the wrapped component is started and stopped only once. +// When stopped it is removed from the SharedComponents map. +type SharedComponent struct { + component.Component + + startOnce sync.Once + stopOnce sync.Once + removeFunc func() +} + +// Unwrap returns the original component. +func (r *SharedComponent) Unwrap() component.Component { + return r.Component +} + +// Start implements component.Component. +func (r *SharedComponent) Start(ctx context.Context, host component.Host) error { + var err error + r.startOnce.Do(func() { + err = r.Component.Start(ctx, host) + }) + return err +} + +// Shutdown implements component.Component. +func (r *SharedComponent) Shutdown(ctx context.Context) error { + var err error + r.stopOnce.Do(func() { + err = r.Component.Shutdown(ctx) + r.removeFunc() + }) + return err +} diff --git a/collector/receiver/telemetryapireceiver/internal/sharedcomponent/sharedcomponent_test.go b/collector/receiver/telemetryapireceiver/internal/sharedcomponent/sharedcomponent_test.go new file mode 100644 index 0000000000..dad4886c17 --- /dev/null +++ b/collector/receiver/telemetryapireceiver/internal/sharedcomponent/sharedcomponent_test.go @@ -0,0 +1,72 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package sharedcomponent + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" +) + +var id = component.MustNewID("test") + +func TestNewSharedComponents(t *testing.T) { + comps := NewSharedComponents() + assert.Len(t, comps.comps, 0) +} + +type mockComponent struct { + component.StartFunc + component.ShutdownFunc +} + +func TestSharedComponents_GetOrAdd(t *testing.T) { + nop := &mockComponent{} + createNop := func() component.Component { return nop } + + comps := NewSharedComponents() + got := comps.GetOrAdd(id, createNop) + assert.Len(t, comps.comps, 1) + assert.Same(t, nop, got.Unwrap()) + assert.Same(t, got, comps.GetOrAdd(id, createNop)) + + // Shutdown nop will remove + assert.NoError(t, got.Shutdown(context.Background())) + assert.Len(t, comps.comps, 0) + assert.NotSame(t, got, comps.GetOrAdd(id, createNop)) +} + +func TestSharedComponent(t *testing.T) { + wantErr := errors.New("my error") + calledStart := 0 + calledStop := 0 + comp := &mockComponent{ + StartFunc: func(_ context.Context, _ component.Host) error { + calledStart++ + return wantErr + }, + ShutdownFunc: func(_ context.Context) error { + calledStop++ + return wantErr + }, + } + createComp := func() component.Component { return comp } + + comps := NewSharedComponents() + got := comps.GetOrAdd(id, createComp) + assert.Equal(t, wantErr, got.Start(context.Background(), componenttest.NewNopHost())) + assert.Equal(t, 1, calledStart) + // Second time is not called anymore. + assert.NoError(t, got.Start(context.Background(), componenttest.NewNopHost())) + assert.Equal(t, 1, calledStart) + assert.Equal(t, wantErr, got.Shutdown(context.Background())) + assert.Equal(t, 1, calledStop) + // Second time is not called anymore. + assert.NoError(t, got.Shutdown(context.Background())) + assert.Equal(t, 1, calledStop) +} diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index 4df8b7764d..87e5623053 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -24,15 +24,17 @@ import ( "math/rand" "net/http" "os" + "strings" "time" "github.com/golang-collections/go-datastructures/queue" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/receiver" - semconv "go.opentelemetry.io/collector/semconv/v1.5.0" + semconv "go.opentelemetry.io/collector/semconv/v1.25.0" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-lambda/collector/internal/telemetryapi" @@ -40,12 +42,15 @@ import ( const defaultListenerPort = "4325" const initialQueueSize = 5 +const timeFormatLayout = "2006-01-02T15:04:05.000Z" +const scopeName = "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapi" type telemetryAPIReceiver struct { httpServer *http.Server logger *zap.Logger queue *queue.Queue // queue is a synchronous queue and is used to put the received log events to be dispatched later - nextConsumer consumer.Traces + nextTraces consumer.Traces + nextLogs consumer.Logs lastPlatformStartTime string lastPlatformEndTime string extensionID string @@ -64,7 +69,7 @@ func (r *telemetryAPIReceiver) Start(ctx context.Context, host component.Host) e }() telemetryClient := telemetryapi.NewClient(r.logger) - _, err := telemetryClient.Subscribe(ctx, r.extensionID, fmt.Sprintf("http://%s/", address)) + _, err := telemetryClient.Subscribe(ctx, []telemetryapi.EventType{telemetryapi.Platform, telemetryapi.Function}, r.extensionID, fmt.Sprintf("http://%s/", address)) if err != nil { r.logger.Info("Listening for requests", zap.String("address", address), zap.String("extensionID", r.extensionID)) return err @@ -147,12 +152,26 @@ func (r *telemetryAPIReceiver) httpHandler(w http.ResponseWriter, req *http.Requ } if len(r.lastPlatformStartTime) > 0 && len(r.lastPlatformEndTime) > 0 { if td, err := r.createPlatformInitSpan(r.lastPlatformStartTime, r.lastPlatformEndTime); err == nil { - err := r.nextConsumer.ConsumeTraces(context.Background(), td) - if err == nil { - r.lastPlatformEndTime = "" - r.lastPlatformStartTime = "" - } else { - r.logger.Error("error receiving traces", zap.Error(err)) + if r.nextTraces != nil { + err := r.nextTraces.ConsumeTraces(context.Background(), td) + if err == nil { + r.lastPlatformEndTime = "" + r.lastPlatformStartTime = "" + } else { + r.logger.Error("error receiving traces", zap.Error(err)) + } + } + } + } + + // Logs + if r.nextLogs != nil { + if logs, err := r.createLogs(slice); err == nil { + if logs.LogRecordCount() > 0 { + err := r.nextLogs.ConsumeLogs(context.Background(), logs) + if err != nil { + r.logger.Error("error receiving logs", zap.Error(err)) + } } } } @@ -161,26 +180,133 @@ func (r *telemetryAPIReceiver) httpHandler(w http.ResponseWriter, req *http.Requ slice = nil } +func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { + log := plog.NewLogs() + resourceLog := log.ResourceLogs().AppendEmpty() + r.resource.CopyTo(resourceLog.Resource()) + scopeLog := resourceLog.ScopeLogs().AppendEmpty() + scopeLog.Scope().SetName(scopeName) + for _, el := range slice { + r.logger.Debug(fmt.Sprintf("Event: %s", el.Type), zap.Any("event", el)) + logRecord := scopeLog.LogRecords().AppendEmpty() + logRecord.Attributes().PutStr("type", el.Type) + if t, err := time.Parse(timeFormatLayout, el.Time); err == nil { + logRecord.SetTimestamp(pcommon.NewTimestampFromTime(t)) + logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now())) + } else { + r.logger.Error("error parsing time", zap.Error(err)) + return plog.Logs{}, err + } + if el.Type == string(telemetryapi.Function) || el.Type == string(telemetryapi.Extension) { + if record, ok := el.Record.(map[string]interface{}); ok { + // in JSON format https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function + if timestamp, ok := record["timestamp"].(string); ok { + if observedTime, err := time.Parse(timeFormatLayout, timestamp); err == nil { + logRecord.SetTimestamp(pcommon.NewTimestampFromTime(observedTime)) + } else { + r.logger.Error("error parsing time", zap.Error(err)) + return plog.Logs{}, err + } + } + if level, ok := record["level"].(string); ok { + level = strings.ToUpper(level) + logRecord.SetSeverityText(level) + switch level { + case "TRACE": + logRecord.SetSeverityNumber(1) + case "TRACE2": + logRecord.SetSeverityNumber(2) + case "TRACE3": + logRecord.SetSeverityNumber(3) + case "TRACE4": + logRecord.SetSeverityNumber(4) + case "DEBUG": + logRecord.SetSeverityNumber(5) + case "DEBUG2": + logRecord.SetSeverityNumber(6) + case "DEBUG3": + logRecord.SetSeverityNumber(7) + case "DEBUG4": + logRecord.SetSeverityNumber(8) + case "INFO": + logRecord.SetSeverityNumber(9) + case "INFO2": + logRecord.SetSeverityNumber(10) + case "INFO3": + logRecord.SetSeverityNumber(11) + case "INFO4": + logRecord.SetSeverityNumber(12) + case "WARN": + logRecord.SetSeverityNumber(13) + case "WARN2": + logRecord.SetSeverityNumber(14) + case "WARN3": + logRecord.SetSeverityNumber(15) + case "WARN4": + logRecord.SetSeverityNumber(16) + case "ERROR": + logRecord.SetSeverityNumber(17) + case "ERROR2": + logRecord.SetSeverityNumber(18) + case "ERROR3": + logRecord.SetSeverityNumber(19) + case "ERROR4": + logRecord.SetSeverityNumber(20) + case "FATAL": + logRecord.SetSeverityNumber(21) + case "FATAL2": + logRecord.SetSeverityNumber(22) + case "FATAL3": + logRecord.SetSeverityNumber(23) + case "FATAL4": + logRecord.SetSeverityNumber(24) + default: + } + } + if requestId, ok := record["requestId"].(string); ok { + logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, requestId) + } + if line, ok := record["message"].(string); ok { + logRecord.Body().SetStr(line) + } + } else { + // in plain text https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function + if line, ok := el.Record.(string); ok { + logRecord.Body().SetStr(line) + } + } + } + } + return log, nil +} + +func (r *telemetryAPIReceiver) registerTracesConsumer(next consumer.Traces) { + r.nextTraces = next +} + +func (r *telemetryAPIReceiver) registerLogsConsumer(next consumer.Logs) { + r.nextLogs = next +} + func (r *telemetryAPIReceiver) createPlatformInitSpan(start, end string) (ptrace.Traces, error) { traceData := ptrace.NewTraces() rs := traceData.ResourceSpans().AppendEmpty() r.resource.CopyTo(rs.Resource()) ss := rs.ScopeSpans().AppendEmpty() - ss.Scope().SetName("github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapi") + ss.Scope().SetName(scopeName) span := ss.Spans().AppendEmpty() span.SetTraceID(newTraceID()) span.SetSpanID(newSpanID()) span.SetName("platform.initRuntimeDone") span.SetKind(ptrace.SpanKindInternal) span.Attributes().PutBool(semconv.AttributeFaaSColdstart, true) - layout := "2006-01-02T15:04:05.000Z" - startTime, err := time.Parse(layout, start) + startTime, err := time.Parse(timeFormatLayout, start) if err != nil { return ptrace.Traces{}, err } span.SetStartTimestamp(pcommon.NewTimestampFromTime(startTime)) - endTime, err := time.Parse(layout, end) + endTime, err := time.Parse(timeFormatLayout, end) if err != nil { return ptrace.Traces{}, err } @@ -190,9 +316,8 @@ func (r *telemetryAPIReceiver) createPlatformInitSpan(start, end string) (ptrace func newTelemetryAPIReceiver( cfg *Config, - next consumer.Traces, set receiver.CreateSettings, -) (*telemetryAPIReceiver, error) { +) *telemetryAPIReceiver { envResourceMap := map[string]string{ "AWS_LAMBDA_FUNCTION_MEMORY_SIZE": semconv.AttributeFaaSMaxMemory, "AWS_LAMBDA_FUNCTION_VERSION": semconv.AttributeFaaSVersion, @@ -213,12 +338,11 @@ func newTelemetryAPIReceiver( } } return &telemetryAPIReceiver{ - logger: set.Logger, - queue: queue.New(initialQueueSize), - nextConsumer: next, - extensionID: cfg.extensionID, - resource: r, - }, nil + logger: set.Logger, + queue: queue.New(initialQueueSize), + extensionID: cfg.extensionID, + resource: r, + } } func listenOnAddress() string { diff --git a/collector/receiver/telemetryapireceiver/receiver_test.go b/collector/receiver/telemetryapireceiver/receiver_test.go index 7f7b46572f..c47ed95459 100644 --- a/collector/receiver/telemetryapireceiver/receiver_test.go +++ b/collector/receiver/telemetryapireceiver/receiver_test.go @@ -19,9 +19,12 @@ import ( "net/http/httptest" "strings" "testing" + "time" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/receiver/receivertest" ) @@ -99,12 +102,11 @@ func TestHandler(t *testing.T) { for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { consumer := mockConsumer{} - r, err := newTelemetryAPIReceiver( + r := newTelemetryAPIReceiver( &Config{}, - &consumer, receivertest.NewNopCreateSettings(), ) - require.NoError(t, err) + r.registerTracesConsumer(consumer) req := httptest.NewRequest("POST", "http://localhost:53612/someevent", strings.NewReader(tc.body)) rec := httptest.NewRecorder() @@ -146,9 +148,8 @@ func TestCreatePlatformInitSpan(t *testing.T) { } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - r, err := newTelemetryAPIReceiver( + r := newTelemetryAPIReceiver( &Config{}, - nil, receivertest.NewNopCreateSettings(), ) require.NoError(t, err) @@ -161,3 +162,156 @@ func TestCreatePlatformInitSpan(t *testing.T) { }) } } + +func TestCreateLogs(t *testing.T) { + testCases := []struct { + desc string + slice []event + expectedLogRecords int + expectedType string + expectedTimestamp string + expectedBody string + expectedSeverityText string + expectedContainsRequestId bool + expectedRequestId string + expectedSeverityNumber plog.SeverityNumber + expectError bool + }{ + { + desc: "no slice", + expectedLogRecords: 0, + expectError: false, + }, + { + desc: "Invalid Timestamp", + slice: []event{ + { + Time: "invalid", + Type: "function", + Record: "[INFO] Hello world, I am an extension!", + }, + }, + expectError: true, + }, + { + desc: "function text", + slice: []event{ + { + Time: "2022-10-12T00:03:50.000Z", + Type: "function", + Record: "[INFO] Hello world, I am an extension!", + }, + }, + expectedLogRecords: 1, + expectedType: "function", + expectedTimestamp: "2022-10-12T00:03:50.000Z", + expectedBody: "[INFO] Hello world, I am an extension!", + expectedContainsRequestId: false, + expectedSeverityText: "", + expectedSeverityNumber: plog.SeverityNumberUnspecified, + expectError: false, + }, + { + desc: "function json", + slice: []event{ + { + Time: "2022-10-12T00:03:50.000Z", + Type: "function", + Record: map[string]any{ + "timestamp": "2022-10-12T00:03:50.000Z", + "level": "INFO", + "requestId": "79b4f56e-95b1-4643-9700-2807f4e68189", + "message": "Hello world, I am a function!", + }, + }, + }, + expectedLogRecords: 1, + expectedType: "function", + expectedTimestamp: "2022-10-12T00:03:50.000Z", + expectedBody: "Hello world, I am a function!", + expectedContainsRequestId: true, + expectedRequestId: "79b4f56e-95b1-4643-9700-2807f4e68189", + expectedSeverityText: "INFO", + expectedSeverityNumber: plog.SeverityNumberInfo, + expectError: false, + }, + { + desc: "extension text", + slice: []event{ + { + Time: "2022-10-12T00:03:50.000Z", + Type: "extension", + Record: "[INFO] Hello world, I am an extension!", + }, + }, + expectedLogRecords: 1, + expectedType: "extension", + expectedTimestamp: "2022-10-12T00:03:50.000Z", + expectedBody: "[INFO] Hello world, I am an extension!", + expectedContainsRequestId: false, + expectedSeverityText: "", + expectedSeverityNumber: plog.SeverityNumberUnspecified, + expectError: false, + }, + { + desc: "extension json", + slice: []event{ + { + Time: "2022-10-12T00:03:50.000Z", + Type: "extension", + Record: map[string]any{ + "timestamp": "2022-10-12T00:03:50.000Z", + "level": "INFO", + "requestId": "79b4f56e-95b1-4643-9700-2807f4e68689", + "message": "Hello world, I am an extension!", + }, + }, + }, + expectedLogRecords: 1, + expectedType: "extension", + expectedTimestamp: "2022-10-12T00:03:50.000Z", + expectedBody: "Hello world, I am an extension!", + expectedContainsRequestId: true, + expectedRequestId: "79b4f56e-95b1-4643-9700-2807f4e68689", + expectedSeverityText: "INFO", + expectedSeverityNumber: plog.SeverityNumberInfo, + expectError: false, + }, + } + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + r := newTelemetryAPIReceiver( + &Config{}, + receivertest.NewNopCreateSettings(), + ) + log, err := r.createLogs(tc.slice) + if tc.expectError { + require.Error(t, err) + } else { + require.Equal(t, 1, log.ResourceLogs().Len()) + resourceLog := log.ResourceLogs().At(0) + require.Equal(t, 1, resourceLog.ScopeLogs().Len()) + scopeLog := resourceLog.ScopeLogs().At(0) + require.Equal(t, scopeName, scopeLog.Scope().Name()) + require.Equal(t, tc.expectedLogRecords, scopeLog.LogRecords().Len()) + if scopeLog.LogRecords().Len() > 0 { + logRecord := scopeLog.LogRecords().At(0) + attr, ok := logRecord.Attributes().Get("type") + require.True(t, ok) + require.Equal(t, tc.expectedType, attr.Str()) + expectedTime, err := time.Parse(timeFormatLayout, tc.expectedTimestamp) + require.NoError(t, err) + require.Equal(t, pcommon.NewTimestampFromTime(expectedTime), logRecord.Timestamp()) + requestId, ok := logRecord.Attributes().Get(semconv.AttributeFaaSInvocationID) + require.Equal(t, tc.expectedContainsRequestId, ok) + if ok { + require.Equal(t, tc.expectedRequestId, requestId.Str()) + } + require.Equal(t, tc.expectedSeverityText, logRecord.SeverityText()) + require.Equal(t, tc.expectedSeverityNumber, logRecord.SeverityNumber()) + require.Equal(t, tc.expectedBody, logRecord.Body().Str()) + } + } + }) + } +} diff --git a/collector/receiver/telemetryapireceiver/types.go b/collector/receiver/telemetryapireceiver/types.go index 40bbc6ff94..fdcb4e3f28 100644 --- a/collector/receiver/telemetryapireceiver/types.go +++ b/collector/receiver/telemetryapireceiver/types.go @@ -15,7 +15,7 @@ package telemetryapireceiver // import "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver" type event struct { - Time string `json:"time"` - Type string `json:"type"` - Record map[string]any `json:"record"` + Time string `json:"time"` + Type string `json:"type"` + Record any `json:"record"` } From 63558a84ad65c52701a5c7af8b45c5d58da542df Mon Sep 17 00:00:00 2001 From: Jerry Leung Date: Fri, 24 May 2024 17:47:47 -0700 Subject: [PATCH 02/10] nits --- collector/receiver/telemetryapireceiver/receiver.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index 87e5623053..00b9cb050b 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -201,8 +201,8 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { if record, ok := el.Record.(map[string]interface{}); ok { // in JSON format https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function if timestamp, ok := record["timestamp"].(string); ok { - if observedTime, err := time.Parse(timeFormatLayout, timestamp); err == nil { - logRecord.SetTimestamp(pcommon.NewTimestampFromTime(observedTime)) + if t, err := time.Parse(timeFormatLayout, timestamp); err == nil { + logRecord.SetTimestamp(pcommon.NewTimestampFromTime(t)) } else { r.logger.Error("error parsing time", zap.Error(err)) return plog.Logs{}, err From 1ae4fddcc9f5fe1a15af9f88f9fd95ae86d9de2b Mon Sep 17 00:00:00 2001 From: Jerry Leung Date: Fri, 24 May 2024 19:26:15 -0700 Subject: [PATCH 03/10] Added extensions --- collector/receiver/telemetryapireceiver/receiver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index 00b9cb050b..4b8d3a97c2 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -69,7 +69,7 @@ func (r *telemetryAPIReceiver) Start(ctx context.Context, host component.Host) e }() telemetryClient := telemetryapi.NewClient(r.logger) - _, err := telemetryClient.Subscribe(ctx, []telemetryapi.EventType{telemetryapi.Platform, telemetryapi.Function}, r.extensionID, fmt.Sprintf("http://%s/", address)) + _, err := telemetryClient.Subscribe(ctx, []telemetryapi.EventType{telemetryapi.Platform, telemetryapi.Function, telemetryapi.Extension}, r.extensionID, fmt.Sprintf("http://%s/", address)) if err != nil { r.logger.Info("Listening for requests", zap.String("address", address), zap.String("extensionID", r.extensionID)) return err From 0f74cbe0934b4bf1b42a6c59a85d9b97728ddf11 Mon Sep 17 00:00:00 2001 From: Jerry Leung Date: Mon, 27 May 2024 17:54:18 -0700 Subject: [PATCH 04/10] Fixed unit tests --- collector/receiver/telemetryapireceiver/receiver.go | 2 +- .../receiver/telemetryapireceiver/receiver_test.go | 12 ++++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index 4b8d3a97c2..2d791ddba3 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -210,7 +210,6 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { } if level, ok := record["level"].(string); ok { level = strings.ToUpper(level) - logRecord.SetSeverityText(level) switch level { case "TRACE": logRecord.SetSeverityNumber(1) @@ -262,6 +261,7 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { logRecord.SetSeverityNumber(24) default: } + logRecord.SetSeverityText(logRecord.SeverityNumber().String()) } if requestId, ok := record["requestId"].(string); ok { logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, requestId) diff --git a/collector/receiver/telemetryapireceiver/receiver_test.go b/collector/receiver/telemetryapireceiver/receiver_test.go index c47ed95459..6fb953565c 100644 --- a/collector/receiver/telemetryapireceiver/receiver_test.go +++ b/collector/receiver/telemetryapireceiver/receiver_test.go @@ -27,6 +27,7 @@ import ( "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/receiver/receivertest" + semconv "go.opentelemetry.io/collector/semconv/v1.25.0" ) func TestListenOnAddress(t *testing.T) { @@ -64,6 +65,10 @@ func (c *mockConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) erro return nil } +func (c *mockConsumer) ConsumeLogs(ctx context.Context, td plog.Logs) error { + return nil +} + func (c *mockConsumer) Capabilities() consumer.Capabilities { return consumer.Capabilities{MutatesData: true} } @@ -106,7 +111,7 @@ func TestHandler(t *testing.T) { &Config{}, receivertest.NewNopCreateSettings(), ) - r.registerTracesConsumer(consumer) + r.registerTracesConsumer(&consumer) req := httptest.NewRequest("POST", "http://localhost:53612/someevent", strings.NewReader(tc.body)) rec := httptest.NewRecorder() @@ -152,7 +157,6 @@ func TestCreatePlatformInitSpan(t *testing.T) { &Config{}, receivertest.NewNopCreateSettings(), ) - require.NoError(t, err) td, err := r.createPlatformInitSpan(tc.start, tc.end) if tc.expectError { require.Error(t, err) @@ -231,7 +235,7 @@ func TestCreateLogs(t *testing.T) { expectedBody: "Hello world, I am a function!", expectedContainsRequestId: true, expectedRequestId: "79b4f56e-95b1-4643-9700-2807f4e68189", - expectedSeverityText: "INFO", + expectedSeverityText: "Info", expectedSeverityNumber: plog.SeverityNumberInfo, expectError: false, }, @@ -273,7 +277,7 @@ func TestCreateLogs(t *testing.T) { expectedBody: "Hello world, I am an extension!", expectedContainsRequestId: true, expectedRequestId: "79b4f56e-95b1-4643-9700-2807f4e68689", - expectedSeverityText: "INFO", + expectedSeverityText: "Info", expectedSeverityNumber: plog.SeverityNumberInfo, expectError: false, }, From 0c5429261c72340f85f5744c31ee1986e0a2ac09 Mon Sep 17 00:00:00 2001 From: Jerry Leung Date: Mon, 27 May 2024 17:56:05 -0700 Subject: [PATCH 05/10] Added unit test cases --- .../telemetryapireceiver/receiver_test.go | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/collector/receiver/telemetryapireceiver/receiver_test.go b/collector/receiver/telemetryapireceiver/receiver_test.go index 6fb953565c..d43d17dfa9 100644 --- a/collector/receiver/telemetryapireceiver/receiver_test.go +++ b/collector/receiver/telemetryapireceiver/receiver_test.go @@ -281,6 +281,30 @@ func TestCreateLogs(t *testing.T) { expectedSeverityNumber: plog.SeverityNumberInfo, expectError: false, }, + { + desc: "extension json anything", + slice: []event{ + { + Time: "2022-10-12T00:03:50.000Z", + Type: "extension", + Record: map[string]any{ + "timestamp": "2022-10-12T00:03:50.000Z", + "level": "anything", + "requestId": "79b4f56e-95b1-4643-9700-2807f4e68689", + "message": "Hello world, I am an extension!", + }, + }, + }, + expectedLogRecords: 1, + expectedType: "extension", + expectedTimestamp: "2022-10-12T00:03:50.000Z", + expectedBody: "Hello world, I am an extension!", + expectedContainsRequestId: true, + expectedRequestId: "79b4f56e-95b1-4643-9700-2807f4e68689", + expectedSeverityText: "Unspecified", + expectedSeverityNumber: plog.SeverityNumberUnspecified, + expectError: false, + }, } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { From dd2e317eb5ad6c391b6a06bd20889266bab5eff1 Mon Sep 17 00:00:00 2001 From: Jerry Leung Date: Thu, 20 Jun 2024 14:18:44 -0700 Subject: [PATCH 06/10] Added config (#26) --- .../receiver/telemetryapireceiver/README.md | 16 ++- .../receiver/telemetryapireceiver/config.go | 11 ++ .../telemetryapireceiver/config_test.go | 128 ++++++++++++++++++ .../receiver/telemetryapireceiver/factory.go | 10 +- .../telemetryapireceiver/factory_test.go | 2 +- .../receiver/telemetryapireceiver/receiver.go | 29 +++- .../telemetryapireceiver/receiver_test.go | 4 +- .../telemetryapireceiver/testdata/config.yaml | 34 +++++ 8 files changed, 222 insertions(+), 12 deletions(-) create mode 100644 collector/receiver/telemetryapireceiver/testdata/config.yaml diff --git a/collector/receiver/telemetryapireceiver/README.md b/collector/receiver/telemetryapireceiver/README.md index e51b420f2d..9a94a96244 100644 --- a/collector/receiver/telemetryapireceiver/README.md +++ b/collector/receiver/telemetryapireceiver/README.md @@ -15,11 +15,25 @@ Supported events: ## Configuration -There are currently no configuration parameters available for this receiver. It can be enabled via the following configuration: +| Field | Default | Description | +|---------|---------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------| +| `port` | 4235 | HTTP server port to receive Telemetry API data. | +| `types` | ["platform", "function", "extension"] | [Types](https://docs.aws.amazon.com/lambda/latest/dg/telemetry-api-reference.html#telemetry-subscribe-api) of telemetry to subscribe to | + ```yaml receivers: telemetryapi: + telemetryapi/1: + port: 4326 + telemetryapi/2: + port: 4326 + types: + - platform + - function + telemetryapi/3: + port: 4326 + types: ["platform", "function"] ``` [alpha]: https://github.com/open-telemetry/opentelemetry-collector#alpha diff --git a/collector/receiver/telemetryapireceiver/config.go b/collector/receiver/telemetryapireceiver/config.go index 86b5250196..b51ef1ed57 100644 --- a/collector/receiver/telemetryapireceiver/config.go +++ b/collector/receiver/telemetryapireceiver/config.go @@ -14,12 +14,23 @@ package telemetryapireceiver // import "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver" +import ( + "fmt" +) + // Config defines the configuration for the various elements of the receiver agent. type Config struct { extensionID string + Port int `mapstructure:"port"` + Types []string `mapstructure:"types"` } // Validate validates the configuration by checking for missing or invalid fields func (cfg *Config) Validate() error { + for _, t := range cfg.Types { + if t != platform && t != function && t != extension { + return fmt.Errorf("unknown extension type: %s", t) + } + } return nil } diff --git a/collector/receiver/telemetryapireceiver/config_test.go b/collector/receiver/telemetryapireceiver/config_test.go index 7f3969dd7c..fbee9ed924 100644 --- a/collector/receiver/telemetryapireceiver/config_test.go +++ b/collector/receiver/telemetryapireceiver/config_test.go @@ -15,12 +15,133 @@ package telemetryapireceiver // import "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver" import ( + "fmt" + "path/filepath" "testing" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap/confmaptest" ) +func TestLoadConfig(t *testing.T) { + t.Parallel() + + tests := []struct { + id component.ID + expected component.Config + }{ + { + id: component.NewID(component.MustNewType("telemetryapi")), + expected: NewFactory("extensionID").CreateDefaultConfig(), + }, + { + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "1"), + expected: &Config{ + extensionID: "extensionID", + Port: 12345, + Types: []string{platform, function, extension}, + }, + }, + { + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "2"), + expected: &Config{ + extensionID: "extensionID", + Port: 12345, + Types: []string{platform, function, extension}, + }, + }, + { + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "3"), + expected: &Config{ + extensionID: "extensionID", + Port: 12345, + Types: []string{platform}, + }, + }, + { + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "4"), + expected: &Config{ + extensionID: "extensionID", + Port: 12345, + Types: []string{function}, + }, + }, + { + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "5"), + expected: &Config{ + extensionID: "extensionID", + Port: 12345, + Types: []string{extension}, + }, + }, + { + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "6"), + expected: &Config{ + extensionID: "extensionID", + Port: 12345, + Types: []string{platform, function}, + }, + }, + { + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "7"), + expected: &Config{ + extensionID: "extensionID", + Port: 12345, + Types: []string{platform, extension}, + }, + }, + { + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "8"), + expected: &Config{ + extensionID: "extensionID", + Port: 12345, + Types: []string{function, extension}, + }, + }, + { + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "9"), + expected: &Config{ + extensionID: "extensionID", + Port: 12345, + Types: []string{}, + }, + }, + { + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "10"), + expected: &Config{ + extensionID: "extensionID", + Port: 12345, + Types: []string{function, extension}, + }, + }, + { + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "11"), + expected: &Config{ + extensionID: "extensionID", + Port: 12345, + Types: []string{function, extension}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.id.String(), func(t *testing.T) { + cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml")) + require.NoError(t, err) + factory := NewFactory("extensionID") + cfg := factory.CreateDefaultConfig() + sub, err := cm.Sub(tt.id.String()) + require.NoError(t, err) + require.NoError(t, sub.Unmarshal(cfg)) + require.NoError(t, component.ValidateConfig(cfg)) + require.Equal(t, tt.expected, cfg) + }) + } +} + func TestValidate(t *testing.T) { + t.Parallel() + testCases := []struct { desc string cfg *Config @@ -31,6 +152,13 @@ func TestValidate(t *testing.T) { cfg: &Config{}, expectedErr: nil, }, + { + desc: "invalid config", + cfg: &Config{ + Types: []string{"invalid"}, + }, + expectedErr: fmt.Errorf("unknown extension type: invalid"), + }, } for _, tc := range testCases { diff --git a/collector/receiver/telemetryapireceiver/factory.go b/collector/receiver/telemetryapireceiver/factory.go index 63c8b2ce23..ced897d9f9 100644 --- a/collector/receiver/telemetryapireceiver/factory.go +++ b/collector/receiver/telemetryapireceiver/factory.go @@ -25,8 +25,12 @@ import ( ) const ( - typeStr = "telemetryapi" - stability = component.StabilityLevelDevelopment + typeStr = "telemetryapi" + stability = component.StabilityLevelDevelopment + defaultPort = 4325 + platform = "platform" + function = "function" + extension = "extension" ) var errConfigNotTelemetryAPI = errors.New("config was not a Telemetry API receiver config") @@ -38,6 +42,8 @@ func NewFactory(extensionID string) receiver.Factory { func() component.Config { return &Config{ extensionID: extensionID, + Port: defaultPort, + Types: []string{platform, function, extension}, } }, receiver.WithTraces(createTracesReceiver, stability), diff --git a/collector/receiver/telemetryapireceiver/factory_test.go b/collector/receiver/telemetryapireceiver/factory_test.go index 97961ea437..e0cfcd25eb 100644 --- a/collector/receiver/telemetryapireceiver/factory_test.go +++ b/collector/receiver/telemetryapireceiver/factory_test.go @@ -41,7 +41,7 @@ func TestNewFactory(t *testing.T) { testFunc: func(t *testing.T) { factory := NewFactory("test") - var expectedCfg component.Config = &Config{extensionID: "test"} + var expectedCfg component.Config = &Config{extensionID: "test", Port: defaultPort, Types: []string{platform, function, extension}} require.Equal(t, expectedCfg, factory.CreateDefaultConfig()) }, diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index 2d791ddba3..237a5b2922 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -24,6 +24,7 @@ import ( "math/rand" "net/http" "os" + "strconv" "strings" "time" @@ -40,7 +41,6 @@ import ( "github.com/open-telemetry/opentelemetry-lambda/collector/internal/telemetryapi" ) -const defaultListenerPort = "4325" const initialQueueSize = 5 const timeFormatLayout = "2006-01-02T15:04:05.000Z" const scopeName = "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapi" @@ -54,11 +54,13 @@ type telemetryAPIReceiver struct { lastPlatformStartTime string lastPlatformEndTime string extensionID string + port int + types []telemetryapi.EventType resource pcommon.Resource } func (r *telemetryAPIReceiver) Start(ctx context.Context, host component.Host) error { - address := listenOnAddress() + address := listenOnAddress(r.port) r.logger.Info("Listening for requests", zap.String("address", address)) mux := http.NewServeMux() @@ -69,7 +71,7 @@ func (r *telemetryAPIReceiver) Start(ctx context.Context, host component.Host) e }() telemetryClient := telemetryapi.NewClient(r.logger) - _, err := telemetryClient.Subscribe(ctx, []telemetryapi.EventType{telemetryapi.Platform, telemetryapi.Function, telemetryapi.Extension}, r.extensionID, fmt.Sprintf("http://%s/", address)) + _, err := telemetryClient.Subscribe(ctx, r.types, r.extensionID, fmt.Sprintf("http://%s/", address)) if err != nil { r.logger.Info("Listening for requests", zap.String("address", address), zap.String("extensionID", r.extensionID)) return err @@ -337,21 +339,36 @@ func newTelemetryAPIReceiver( r.Attributes().PutStr(resourceAttribute, val) } } + + subscribedTypes := []telemetryapi.EventType{} + for _, val := range cfg.Types { + switch val { + case "platform": + subscribedTypes = append(subscribedTypes, telemetryapi.Platform) + case "function": + subscribedTypes = append(subscribedTypes, telemetryapi.Function) + case "extension": + subscribedTypes = append(subscribedTypes, telemetryapi.Extension) + } + } + return &telemetryAPIReceiver{ logger: set.Logger, queue: queue.New(initialQueueSize), extensionID: cfg.extensionID, + port: cfg.Port, + types: subscribedTypes, resource: r, } } -func listenOnAddress() string { +func listenOnAddress(port int) string { envAwsLocal, ok := os.LookupEnv("AWS_SAM_LOCAL") var addr string if ok && envAwsLocal == "true" { - addr = ":" + defaultListenerPort + addr = ":" + strconv.Itoa(port) } else { - addr = "sandbox.localdomain:" + defaultListenerPort + addr = "sandbox.localdomain:" + strconv.Itoa(port) } return addr diff --git a/collector/receiver/telemetryapireceiver/receiver_test.go b/collector/receiver/telemetryapireceiver/receiver_test.go index d43d17dfa9..fd1bb5e2a3 100644 --- a/collector/receiver/telemetryapireceiver/receiver_test.go +++ b/collector/receiver/telemetryapireceiver/receiver_test.go @@ -38,7 +38,7 @@ func TestListenOnAddress(t *testing.T) { { desc: "listen on address without AWS_SAM_LOCAL env variable", testFunc: func(t *testing.T) { - addr := listenOnAddress() + addr := listenOnAddress(4325) require.EqualValues(t, "sandbox.localdomain:4325", addr) }, }, @@ -46,7 +46,7 @@ func TestListenOnAddress(t *testing.T) { desc: "listen on address with AWS_SAM_LOCAL env variable", testFunc: func(t *testing.T) { t.Setenv("AWS_SAM_LOCAL", "true") - addr := listenOnAddress() + addr := listenOnAddress(4325) require.EqualValues(t, ":4325", addr) }, }, diff --git a/collector/receiver/telemetryapireceiver/testdata/config.yaml b/collector/receiver/telemetryapireceiver/testdata/config.yaml new file mode 100644 index 0000000000..b0935f779c --- /dev/null +++ b/collector/receiver/telemetryapireceiver/testdata/config.yaml @@ -0,0 +1,34 @@ +telemetryapi: +telemetryapi/1: + port: 12345 +telemetryapi/2: + port: "12345" +telemetryapi/3: + port: 12345 + types: ["platform"] +telemetryapi/4: + port: 12345 + types: ["function"] +telemetryapi/5: + port: 12345 + types: ["extension"] +telemetryapi/6: + port: 12345 + types: ["platform", "function"] +telemetryapi/7: + port: 12345 + types: ["platform", "extension"] +telemetryapi/8: + port: 12345 + types: ["function", "extension"] +telemetryapi/9: + port: 12345 + types: [] +telemetryapi/10: + port: 12345 + types: + - function + - extension +telemetryapi/11: + port: 12345 + types: [function, extension] From 4ff5876659f5234562bb8ec10d4d2e43cb891001 Mon Sep 17 00:00:00 2001 From: Jerry Leung Date: Thu, 20 Jun 2024 16:26:51 -0700 Subject: [PATCH 07/10] Added severityTextToNumber function --- .../telemetryapireceiver/config_test.go | 2 - .../receiver/telemetryapireceiver/receiver.go | 87 ++++++------- .../telemetryapireceiver/receiver_test.go | 114 ++++++++++++++++++ 3 files changed, 149 insertions(+), 54 deletions(-) diff --git a/collector/receiver/telemetryapireceiver/config_test.go b/collector/receiver/telemetryapireceiver/config_test.go index fbee9ed924..f6500bebde 100644 --- a/collector/receiver/telemetryapireceiver/config_test.go +++ b/collector/receiver/telemetryapireceiver/config_test.go @@ -140,8 +140,6 @@ func TestLoadConfig(t *testing.T) { } func TestValidate(t *testing.T) { - t.Parallel() - testCases := []struct { desc string cfg *Config diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index 237a5b2922..b1c9eca5b9 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -211,58 +211,7 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { } } if level, ok := record["level"].(string); ok { - level = strings.ToUpper(level) - switch level { - case "TRACE": - logRecord.SetSeverityNumber(1) - case "TRACE2": - logRecord.SetSeverityNumber(2) - case "TRACE3": - logRecord.SetSeverityNumber(3) - case "TRACE4": - logRecord.SetSeverityNumber(4) - case "DEBUG": - logRecord.SetSeverityNumber(5) - case "DEBUG2": - logRecord.SetSeverityNumber(6) - case "DEBUG3": - logRecord.SetSeverityNumber(7) - case "DEBUG4": - logRecord.SetSeverityNumber(8) - case "INFO": - logRecord.SetSeverityNumber(9) - case "INFO2": - logRecord.SetSeverityNumber(10) - case "INFO3": - logRecord.SetSeverityNumber(11) - case "INFO4": - logRecord.SetSeverityNumber(12) - case "WARN": - logRecord.SetSeverityNumber(13) - case "WARN2": - logRecord.SetSeverityNumber(14) - case "WARN3": - logRecord.SetSeverityNumber(15) - case "WARN4": - logRecord.SetSeverityNumber(16) - case "ERROR": - logRecord.SetSeverityNumber(17) - case "ERROR2": - logRecord.SetSeverityNumber(18) - case "ERROR3": - logRecord.SetSeverityNumber(19) - case "ERROR4": - logRecord.SetSeverityNumber(20) - case "FATAL": - logRecord.SetSeverityNumber(21) - case "FATAL2": - logRecord.SetSeverityNumber(22) - case "FATAL3": - logRecord.SetSeverityNumber(23) - case "FATAL4": - logRecord.SetSeverityNumber(24) - default: - } + logRecord.SetSeverityNumber(severityTextToNumber(level)) logRecord.SetSeverityText(logRecord.SeverityNumber().String()) } if requestId, ok := record["requestId"].(string); ok { @@ -282,6 +231,40 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { return log, nil } +func severityTextToNumber(severityText string) plog.SeverityNumber { + mapping := map[string]plog.SeverityNumber{ + "TRACE": plog.SeverityNumberTrace, + "TRACE2": plog.SeverityNumberTrace2, + "TRACE3": plog.SeverityNumberTrace3, + "TRACE4": plog.SeverityNumberTrace4, + "DEBUG": plog.SeverityNumberDebug, + "DEBUG2": plog.SeverityNumberDebug2, + "DEBUG3": plog.SeverityNumberDebug3, + "DEBUG4": plog.SeverityNumberDebug4, + "INFO": plog.SeverityNumberInfo, + "INFO2": plog.SeverityNumberInfo2, + "INFO3": plog.SeverityNumberInfo3, + "INFO4": plog.SeverityNumberInfo4, + "WARN": plog.SeverityNumberWarn, + "WARN2": plog.SeverityNumberWarn2, + "WARN3": plog.SeverityNumberWarn3, + "WARN4": plog.SeverityNumberWarn4, + "ERROR": plog.SeverityNumberError, + "ERROR2": plog.SeverityNumberError2, + "ERROR3": plog.SeverityNumberError3, + "ERROR4": plog.SeverityNumberError4, + "FATAL": plog.SeverityNumberFatal, + "FATAL2": plog.SeverityNumberFatal2, + "FATAL3": plog.SeverityNumberFatal3, + "FATAL4": plog.SeverityNumberFatal4, + } + if ans, ok := mapping[strings.ToUpper(severityText)]; ok { + return ans + } else { + return plog.SeverityNumberUnspecified + } +} + func (r *telemetryAPIReceiver) registerTracesConsumer(next consumer.Traces) { r.nextTraces = next } diff --git a/collector/receiver/telemetryapireceiver/receiver_test.go b/collector/receiver/telemetryapireceiver/receiver_test.go index fd1bb5e2a3..e6ffe4571c 100644 --- a/collector/receiver/telemetryapireceiver/receiver_test.go +++ b/collector/receiver/telemetryapireceiver/receiver_test.go @@ -74,6 +74,8 @@ func (c *mockConsumer) Capabilities() consumer.Capabilities { } func TestHandler(t *testing.T) { + t.Parallel() + testCases := []struct { desc string body string @@ -168,6 +170,8 @@ func TestCreatePlatformInitSpan(t *testing.T) { } func TestCreateLogs(t *testing.T) { + t.Parallel() + testCases := []struct { desc string slice []event @@ -343,3 +347,113 @@ func TestCreateLogs(t *testing.T) { }) } } + +func TestSeverityTextToNumber(t *testing.T) { + t.Parallel() + + testCases := []struct { + level string + number plog.SeverityNumber + }{ + { + level: "TRACE", + number: plog.SeverityNumberTrace, + }, + { + level: "TRACE2", + number: plog.SeverityNumberTrace2, + }, + { + level: "TRACE3", + number: plog.SeverityNumberTrace3, + }, + { + level: "TRACE4", + number: plog.SeverityNumberTrace4, + }, + { + level: "DEBUG2", + number: plog.SeverityNumberDebug2, + }, + { + level: "DEBUG3", + number: plog.SeverityNumberDebug3, + }, + { + level: "DEBUG4", + number: plog.SeverityNumberDebug4, + }, + { + level: "INFO", + number: plog.SeverityNumberInfo, + }, + { + level: "INFO2", + number: plog.SeverityNumberInfo2, + }, + { + level: "INFO3", + number: plog.SeverityNumberInfo3, + }, + { + level: "INFO4", + number: plog.SeverityNumberInfo4, + }, + { + level: "WARN", + number: plog.SeverityNumberWarn, + }, + { + level: "WARN2", + number: plog.SeverityNumberWarn2, + }, + { + level: "WARN3", + number: plog.SeverityNumberWarn3, + }, + { + level: "WARN4", + number: plog.SeverityNumberWarn4, + }, + { + level: "ERROR", + number: plog.SeverityNumberError, + }, + { + level: "ERROR2", + number: plog.SeverityNumberError2, + }, + { + level: "ERROR3", + number: plog.SeverityNumberError3, + }, + { + level: "ERROR4", + number: plog.SeverityNumberError4, + }, + { + level: "FATAL", + number: plog.SeverityNumberFatal, + }, + { + level: "FATAL2", + number: plog.SeverityNumberFatal2, + }, + { + level: "FATAL3", + number: plog.SeverityNumberFatal3, + }, + { + level: "FATAL4", + number: plog.SeverityNumberFatal4, + }, + { + level: "UNKNOWN", + number: plog.SeverityNumberUnspecified, + }, + } + for _, tc := range testCases { + require.Equal(t, tc.number, severityTextToNumber(tc.level)) + + } +} From 730cfc92f33b3f3f55eec8f4dcbf94028aeabd16 Mon Sep 17 00:00:00 2001 From: Jerry Leung Date: Thu, 27 Jun 2024 15:29:09 -0700 Subject: [PATCH 08/10] Corrected README.md --- collector/receiver/telemetryapireceiver/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/collector/receiver/telemetryapireceiver/README.md b/collector/receiver/telemetryapireceiver/README.md index 9a94a96244..ebd7b9b4c0 100644 --- a/collector/receiver/telemetryapireceiver/README.md +++ b/collector/receiver/telemetryapireceiver/README.md @@ -17,7 +17,7 @@ Supported events: | Field | Default | Description | |---------|---------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------| -| `port` | 4235 | HTTP server port to receive Telemetry API data. | +| `port` | 4325 | HTTP server port to receive Telemetry API data. | | `types` | ["platform", "function", "extension"] | [Types](https://docs.aws.amazon.com/lambda/latest/dg/telemetry-api-reference.html#telemetry-subscribe-api) of telemetry to subscribe to | From 999a7cea6db1af6a25919f7d6fef5a81b15173af Mon Sep 17 00:00:00 2001 From: Jerry Leung Date: Thu, 27 Jun 2024 16:08:48 -0700 Subject: [PATCH 09/10] Handled empty types array --- collector/receiver/telemetryapireceiver/receiver.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index b1c9eca5b9..c09f77798d 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -71,10 +71,12 @@ func (r *telemetryAPIReceiver) Start(ctx context.Context, host component.Host) e }() telemetryClient := telemetryapi.NewClient(r.logger) - _, err := telemetryClient.Subscribe(ctx, r.types, r.extensionID, fmt.Sprintf("http://%s/", address)) - if err != nil { - r.logger.Info("Listening for requests", zap.String("address", address), zap.String("extensionID", r.extensionID)) - return err + if len(r.types) > 0 { + _, err := telemetryClient.Subscribe(ctx, r.types, r.extensionID, fmt.Sprintf("http://%s/", address)) + if err != nil { + r.logger.Info("Listening for requests", zap.String("address", address), zap.String("extensionID", r.extensionID)) + return err + } } return nil } From 1190cb5f015506f9a9bdd75a4a5a9c76339ad2de Mon Sep 17 00:00:00 2001 From: Jerry Leung Date: Thu, 27 Jun 2024 16:13:40 -0700 Subject: [PATCH 10/10] Added CRITICAL & ALL --- .../receiver/telemetryapireceiver/receiver.go | 50 ++++++++++--------- .../telemetryapireceiver/receiver_test.go | 8 +++ 2 files changed, 34 insertions(+), 24 deletions(-) diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index c09f77798d..9bc30b3393 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -235,30 +235,32 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { func severityTextToNumber(severityText string) plog.SeverityNumber { mapping := map[string]plog.SeverityNumber{ - "TRACE": plog.SeverityNumberTrace, - "TRACE2": plog.SeverityNumberTrace2, - "TRACE3": plog.SeverityNumberTrace3, - "TRACE4": plog.SeverityNumberTrace4, - "DEBUG": plog.SeverityNumberDebug, - "DEBUG2": plog.SeverityNumberDebug2, - "DEBUG3": plog.SeverityNumberDebug3, - "DEBUG4": plog.SeverityNumberDebug4, - "INFO": plog.SeverityNumberInfo, - "INFO2": plog.SeverityNumberInfo2, - "INFO3": plog.SeverityNumberInfo3, - "INFO4": plog.SeverityNumberInfo4, - "WARN": plog.SeverityNumberWarn, - "WARN2": plog.SeverityNumberWarn2, - "WARN3": plog.SeverityNumberWarn3, - "WARN4": plog.SeverityNumberWarn4, - "ERROR": plog.SeverityNumberError, - "ERROR2": plog.SeverityNumberError2, - "ERROR3": plog.SeverityNumberError3, - "ERROR4": plog.SeverityNumberError4, - "FATAL": plog.SeverityNumberFatal, - "FATAL2": plog.SeverityNumberFatal2, - "FATAL3": plog.SeverityNumberFatal3, - "FATAL4": plog.SeverityNumberFatal4, + "TRACE": plog.SeverityNumberTrace, + "TRACE2": plog.SeverityNumberTrace2, + "TRACE3": plog.SeverityNumberTrace3, + "TRACE4": plog.SeverityNumberTrace4, + "DEBUG": plog.SeverityNumberDebug, + "DEBUG2": plog.SeverityNumberDebug2, + "DEBUG3": plog.SeverityNumberDebug3, + "DEBUG4": plog.SeverityNumberDebug4, + "INFO": plog.SeverityNumberInfo, + "INFO2": plog.SeverityNumberInfo2, + "INFO3": plog.SeverityNumberInfo3, + "INFO4": plog.SeverityNumberInfo4, + "WARN": plog.SeverityNumberWarn, + "WARN2": plog.SeverityNumberWarn2, + "WARN3": plog.SeverityNumberWarn3, + "WARN4": plog.SeverityNumberWarn4, + "ERROR": plog.SeverityNumberError, + "ERROR2": plog.SeverityNumberError2, + "ERROR3": plog.SeverityNumberError3, + "ERROR4": plog.SeverityNumberError4, + "FATAL": plog.SeverityNumberFatal, + "FATAL2": plog.SeverityNumberFatal2, + "FATAL3": plog.SeverityNumberFatal3, + "FATAL4": plog.SeverityNumberFatal4, + "CRITICAL": plog.SeverityNumberFatal, + "ALL": plog.SeverityNumberTrace, } if ans, ok := mapping[strings.ToUpper(severityText)]; ok { return ans diff --git a/collector/receiver/telemetryapireceiver/receiver_test.go b/collector/receiver/telemetryapireceiver/receiver_test.go index e6ffe4571c..6c4df40b68 100644 --- a/collector/receiver/telemetryapireceiver/receiver_test.go +++ b/collector/receiver/telemetryapireceiver/receiver_test.go @@ -447,6 +447,14 @@ func TestSeverityTextToNumber(t *testing.T) { level: "FATAL4", number: plog.SeverityNumberFatal4, }, + { + level: "CRITICAL", + number: plog.SeverityNumberFatal, + }, + { + level: "ALL", + number: plog.SeverityNumberTrace, + }, { level: "UNKNOWN", number: plog.SeverityNumberUnspecified,