diff --git a/collector/receiver/telemetryapireceiver/README.md b/collector/receiver/telemetryapireceiver/README.md index e51b420f2d..9a94a96244 100644 --- a/collector/receiver/telemetryapireceiver/README.md +++ b/collector/receiver/telemetryapireceiver/README.md @@ -15,11 +15,25 @@ Supported events: ## Configuration -There are currently no configuration parameters available for this receiver. It can be enabled via the following configuration: +| Field | Default | Description | +|---------|---------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------| +| `port` | 4235 | HTTP server port to receive Telemetry API data. | +| `types` | ["platform", "function", "extension"] | [Types](https://docs.aws.amazon.com/lambda/latest/dg/telemetry-api-reference.html#telemetry-subscribe-api) of telemetry to subscribe to | + ```yaml receivers: telemetryapi: + telemetryapi/1: + port: 4326 + telemetryapi/2: + port: 4326 + types: + - platform + - function + telemetryapi/3: + port: 4326 + types: ["platform", "function"] ``` [alpha]: https://github.com/open-telemetry/opentelemetry-collector#alpha diff --git a/collector/receiver/telemetryapireceiver/config.go b/collector/receiver/telemetryapireceiver/config.go index 86b5250196..b51ef1ed57 100644 --- a/collector/receiver/telemetryapireceiver/config.go +++ b/collector/receiver/telemetryapireceiver/config.go @@ -14,12 +14,23 @@ package telemetryapireceiver // import "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver" +import ( + "fmt" +) + // Config defines the configuration for the various elements of the receiver agent. type Config struct { extensionID string + Port int `mapstructure:"port"` + Types []string `mapstructure:"types"` } // Validate validates the configuration by checking for missing or invalid fields func (cfg *Config) Validate() error { + for _, t := range cfg.Types { + if t != platform && t != function && t != extension { + return fmt.Errorf("unknown extension type: %s", t) + } + } return nil } diff --git a/collector/receiver/telemetryapireceiver/config_test.go b/collector/receiver/telemetryapireceiver/config_test.go index 7f3969dd7c..fbee9ed924 100644 --- a/collector/receiver/telemetryapireceiver/config_test.go +++ b/collector/receiver/telemetryapireceiver/config_test.go @@ -15,12 +15,133 @@ package telemetryapireceiver // import "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver" import ( + "fmt" + "path/filepath" "testing" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap/confmaptest" ) +func TestLoadConfig(t *testing.T) { + t.Parallel() + + tests := []struct { + id component.ID + expected component.Config + }{ + { + id: component.NewID(component.MustNewType("telemetryapi")), + expected: NewFactory("extensionID").CreateDefaultConfig(), + }, + { + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "1"), + expected: &Config{ + extensionID: "extensionID", + Port: 12345, + Types: []string{platform, function, extension}, + }, + }, + { + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "2"), + expected: &Config{ + extensionID: "extensionID", + Port: 12345, + Types: []string{platform, function, extension}, + }, + }, + { + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "3"), + expected: &Config{ + extensionID: "extensionID", + Port: 12345, + Types: []string{platform}, + }, + }, + { + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "4"), + expected: &Config{ + extensionID: "extensionID", + Port: 12345, + Types: []string{function}, + }, + }, + { + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "5"), + expected: &Config{ + extensionID: "extensionID", + Port: 12345, + Types: []string{extension}, + }, + }, + { + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "6"), + expected: &Config{ + extensionID: "extensionID", + Port: 12345, + Types: []string{platform, function}, + }, + }, + { + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "7"), + expected: &Config{ + extensionID: "extensionID", + Port: 12345, + Types: []string{platform, extension}, + }, + }, + { + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "8"), + expected: &Config{ + extensionID: "extensionID", + Port: 12345, + Types: []string{function, extension}, + }, + }, + { + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "9"), + expected: &Config{ + extensionID: "extensionID", + Port: 12345, + Types: []string{}, + }, + }, + { + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "10"), + expected: &Config{ + extensionID: "extensionID", + Port: 12345, + Types: []string{function, extension}, + }, + }, + { + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "11"), + expected: &Config{ + extensionID: "extensionID", + Port: 12345, + Types: []string{function, extension}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.id.String(), func(t *testing.T) { + cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml")) + require.NoError(t, err) + factory := NewFactory("extensionID") + cfg := factory.CreateDefaultConfig() + sub, err := cm.Sub(tt.id.String()) + require.NoError(t, err) + require.NoError(t, sub.Unmarshal(cfg)) + require.NoError(t, component.ValidateConfig(cfg)) + require.Equal(t, tt.expected, cfg) + }) + } +} + func TestValidate(t *testing.T) { + t.Parallel() + testCases := []struct { desc string cfg *Config @@ -31,6 +152,13 @@ func TestValidate(t *testing.T) { cfg: &Config{}, expectedErr: nil, }, + { + desc: "invalid config", + cfg: &Config{ + Types: []string{"invalid"}, + }, + expectedErr: fmt.Errorf("unknown extension type: invalid"), + }, } for _, tc := range testCases { diff --git a/collector/receiver/telemetryapireceiver/factory.go b/collector/receiver/telemetryapireceiver/factory.go index 63c8b2ce23..ced897d9f9 100644 --- a/collector/receiver/telemetryapireceiver/factory.go +++ b/collector/receiver/telemetryapireceiver/factory.go @@ -25,8 +25,12 @@ import ( ) const ( - typeStr = "telemetryapi" - stability = component.StabilityLevelDevelopment + typeStr = "telemetryapi" + stability = component.StabilityLevelDevelopment + defaultPort = 4325 + platform = "platform" + function = "function" + extension = "extension" ) var errConfigNotTelemetryAPI = errors.New("config was not a Telemetry API receiver config") @@ -38,6 +42,8 @@ func NewFactory(extensionID string) receiver.Factory { func() component.Config { return &Config{ extensionID: extensionID, + Port: defaultPort, + Types: []string{platform, function, extension}, } }, receiver.WithTraces(createTracesReceiver, stability), diff --git a/collector/receiver/telemetryapireceiver/factory_test.go b/collector/receiver/telemetryapireceiver/factory_test.go index 97961ea437..e0cfcd25eb 100644 --- a/collector/receiver/telemetryapireceiver/factory_test.go +++ b/collector/receiver/telemetryapireceiver/factory_test.go @@ -41,7 +41,7 @@ func TestNewFactory(t *testing.T) { testFunc: func(t *testing.T) { factory := NewFactory("test") - var expectedCfg component.Config = &Config{extensionID: "test"} + var expectedCfg component.Config = &Config{extensionID: "test", Port: defaultPort, Types: []string{platform, function, extension}} require.Equal(t, expectedCfg, factory.CreateDefaultConfig()) }, diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index 2d791ddba3..237a5b2922 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -24,6 +24,7 @@ import ( "math/rand" "net/http" "os" + "strconv" "strings" "time" @@ -40,7 +41,6 @@ import ( "github.com/open-telemetry/opentelemetry-lambda/collector/internal/telemetryapi" ) -const defaultListenerPort = "4325" const initialQueueSize = 5 const timeFormatLayout = "2006-01-02T15:04:05.000Z" const scopeName = "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapi" @@ -54,11 +54,13 @@ type telemetryAPIReceiver struct { lastPlatformStartTime string lastPlatformEndTime string extensionID string + port int + types []telemetryapi.EventType resource pcommon.Resource } func (r *telemetryAPIReceiver) Start(ctx context.Context, host component.Host) error { - address := listenOnAddress() + address := listenOnAddress(r.port) r.logger.Info("Listening for requests", zap.String("address", address)) mux := http.NewServeMux() @@ -69,7 +71,7 @@ func (r *telemetryAPIReceiver) Start(ctx context.Context, host component.Host) e }() telemetryClient := telemetryapi.NewClient(r.logger) - _, err := telemetryClient.Subscribe(ctx, []telemetryapi.EventType{telemetryapi.Platform, telemetryapi.Function, telemetryapi.Extension}, r.extensionID, fmt.Sprintf("http://%s/", address)) + _, err := telemetryClient.Subscribe(ctx, r.types, r.extensionID, fmt.Sprintf("http://%s/", address)) if err != nil { r.logger.Info("Listening for requests", zap.String("address", address), zap.String("extensionID", r.extensionID)) return err @@ -337,21 +339,36 @@ func newTelemetryAPIReceiver( r.Attributes().PutStr(resourceAttribute, val) } } + + subscribedTypes := []telemetryapi.EventType{} + for _, val := range cfg.Types { + switch val { + case "platform": + subscribedTypes = append(subscribedTypes, telemetryapi.Platform) + case "function": + subscribedTypes = append(subscribedTypes, telemetryapi.Function) + case "extension": + subscribedTypes = append(subscribedTypes, telemetryapi.Extension) + } + } + return &telemetryAPIReceiver{ logger: set.Logger, queue: queue.New(initialQueueSize), extensionID: cfg.extensionID, + port: cfg.Port, + types: subscribedTypes, resource: r, } } -func listenOnAddress() string { +func listenOnAddress(port int) string { envAwsLocal, ok := os.LookupEnv("AWS_SAM_LOCAL") var addr string if ok && envAwsLocal == "true" { - addr = ":" + defaultListenerPort + addr = ":" + strconv.Itoa(port) } else { - addr = "sandbox.localdomain:" + defaultListenerPort + addr = "sandbox.localdomain:" + strconv.Itoa(port) } return addr diff --git a/collector/receiver/telemetryapireceiver/receiver_test.go b/collector/receiver/telemetryapireceiver/receiver_test.go index d43d17dfa9..fd1bb5e2a3 100644 --- a/collector/receiver/telemetryapireceiver/receiver_test.go +++ b/collector/receiver/telemetryapireceiver/receiver_test.go @@ -38,7 +38,7 @@ func TestListenOnAddress(t *testing.T) { { desc: "listen on address without AWS_SAM_LOCAL env variable", testFunc: func(t *testing.T) { - addr := listenOnAddress() + addr := listenOnAddress(4325) require.EqualValues(t, "sandbox.localdomain:4325", addr) }, }, @@ -46,7 +46,7 @@ func TestListenOnAddress(t *testing.T) { desc: "listen on address with AWS_SAM_LOCAL env variable", testFunc: func(t *testing.T) { t.Setenv("AWS_SAM_LOCAL", "true") - addr := listenOnAddress() + addr := listenOnAddress(4325) require.EqualValues(t, ":4325", addr) }, }, diff --git a/collector/receiver/telemetryapireceiver/testdata/config.yaml b/collector/receiver/telemetryapireceiver/testdata/config.yaml new file mode 100644 index 0000000000..b0935f779c --- /dev/null +++ b/collector/receiver/telemetryapireceiver/testdata/config.yaml @@ -0,0 +1,34 @@ +telemetryapi: +telemetryapi/1: + port: 12345 +telemetryapi/2: + port: "12345" +telemetryapi/3: + port: 12345 + types: ["platform"] +telemetryapi/4: + port: 12345 + types: ["function"] +telemetryapi/5: + port: 12345 + types: ["extension"] +telemetryapi/6: + port: 12345 + types: ["platform", "function"] +telemetryapi/7: + port: 12345 + types: ["platform", "extension"] +telemetryapi/8: + port: 12345 + types: ["function", "extension"] +telemetryapi/9: + port: 12345 + types: [] +telemetryapi/10: + port: 12345 + types: + - function + - extension +telemetryapi/11: + port: 12345 + types: [function, extension]