Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Please ignore #1413

Closed
wants to merge 11 commits into from
2 changes: 1 addition & 1 deletion collector/internal/lifecycle/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func NewManager(ctx context.Context, logger *zap.Logger, version string) (contex
}

telemetryClient := telemetryapi.NewClient(logger)
_, err = telemetryClient.Subscribe(ctx, res.ExtensionID, addr)
_, err = telemetryClient.Subscribe(ctx, []telemetryapi.EventType{telemetryapi.Platform}, res.ExtensionID, addr)
if err != nil {
logger.Fatal("Cannot register Telemetry API client", zap.Error(err))
}
Expand Down
8 changes: 1 addition & 7 deletions collector/internal/telemetryapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,7 @@ func NewClient(logger *zap.Logger) *Client {
}
}

func (c *Client) Subscribe(ctx context.Context, extensionID string, listenerURI string) (string, error) {
eventTypes := []EventType{
Platform,
// Function,
// Extension,
}

func (c *Client) Subscribe(ctx context.Context, eventTypes []EventType, extensionID string, listenerURI string) (string, error) {
bufferingConfig := BufferingCfg{
MaxItems: 1000,
MaxBytes: 256 * 1024,
Expand Down
26 changes: 20 additions & 6 deletions collector/receiver/telemetryapireceiver/README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# Telemetry API Receiver

| Status | |
| ------------------------ |-----------------|
| Stability | [alpha] |
| Supported pipeline types | traces |
| Distributions | [extension] |
| Status | |
| ------------------------ |--------------|
| Stability | [alpha] |
| Supported pipeline types | traces, logs |
| Distributions | [extension] |

This receiver generates telemetry in response to events from the [Telemetry API](https://docs.aws.amazon.com/lambda/latest/dg/telemetry-api.html). It does this by setting up an endpoint and registering itself with the Telemetry API on startup.

Expand All @@ -15,11 +15,25 @@ Supported events:

## Configuration

There are currently no configuration parameters available for this receiver. It can be enabled via the following configuration:
| Field | Default | Description |
|---------|---------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------|
| `port` | 4325 | HTTP server port to receive Telemetry API data. |
| `types` | ["platform", "function", "extension"] | [Types](https://docs.aws.amazon.com/lambda/latest/dg/telemetry-api-reference.html#telemetry-subscribe-api) of telemetry to subscribe to |


```yaml
receivers:
telemetryapi:
telemetryapi/1:
port: 4326
telemetryapi/2:
port: 4326
types:
- platform
- function
telemetryapi/3:
port: 4326
types: ["platform", "function"]
```

[alpha]: https://github.com/open-telemetry/opentelemetry-collector#alpha
Expand Down
11 changes: 11 additions & 0 deletions collector/receiver/telemetryapireceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,23 @@

package telemetryapireceiver // import "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver"

import (
"fmt"
)

// Config defines the configuration for the various elements of the receiver agent.
type Config struct {
extensionID string
Port int `mapstructure:"port"`
Types []string `mapstructure:"types"`
}

// Validate validates the configuration by checking for missing or invalid fields
func (cfg *Config) Validate() error {
for _, t := range cfg.Types {
if t != platform && t != function && t != extension {
return fmt.Errorf("unknown extension type: %s", t)
}
}
return nil
}
126 changes: 126 additions & 0 deletions collector/receiver/telemetryapireceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,130 @@
package telemetryapireceiver // import "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver"

import (
"fmt"
"path/filepath"
"testing"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap/confmaptest"
)

func TestLoadConfig(t *testing.T) {
t.Parallel()

tests := []struct {
id component.ID
expected component.Config
}{
{
id: component.NewID(component.MustNewType("telemetryapi")),
expected: NewFactory("extensionID").CreateDefaultConfig(),
},
{
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "1"),
expected: &Config{
extensionID: "extensionID",
Port: 12345,
Types: []string{platform, function, extension},
},
},
{
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "2"),
expected: &Config{
extensionID: "extensionID",
Port: 12345,
Types: []string{platform, function, extension},
},
},
{
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "3"),
expected: &Config{
extensionID: "extensionID",
Port: 12345,
Types: []string{platform},
},
},
{
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "4"),
expected: &Config{
extensionID: "extensionID",
Port: 12345,
Types: []string{function},
},
},
{
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "5"),
expected: &Config{
extensionID: "extensionID",
Port: 12345,
Types: []string{extension},
},
},
{
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "6"),
expected: &Config{
extensionID: "extensionID",
Port: 12345,
Types: []string{platform, function},
},
},
{
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "7"),
expected: &Config{
extensionID: "extensionID",
Port: 12345,
Types: []string{platform, extension},
},
},
{
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "8"),
expected: &Config{
extensionID: "extensionID",
Port: 12345,
Types: []string{function, extension},
},
},
{
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "9"),
expected: &Config{
extensionID: "extensionID",
Port: 12345,
Types: []string{},
},
},
{
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "10"),
expected: &Config{
extensionID: "extensionID",
Port: 12345,
Types: []string{function, extension},
},
},
{
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "11"),
expected: &Config{
extensionID: "extensionID",
Port: 12345,
Types: []string{function, extension},
},
},
}
for _, tt := range tests {
t.Run(tt.id.String(), func(t *testing.T) {
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
require.NoError(t, err)
factory := NewFactory("extensionID")
cfg := factory.CreateDefaultConfig()
sub, err := cm.Sub(tt.id.String())
require.NoError(t, err)
require.NoError(t, sub.Unmarshal(cfg))
require.NoError(t, component.ValidateConfig(cfg))
require.Equal(t, tt.expected, cfg)
})
}
}

func TestValidate(t *testing.T) {
testCases := []struct {
desc string
Expand All @@ -31,6 +150,13 @@ func TestValidate(t *testing.T) {
cfg: &Config{},
expectedErr: nil,
},
{
desc: "invalid config",
cfg: &Config{
Types: []string{"invalid"},
},
expectedErr: fmt.Errorf("unknown extension type: invalid"),
},
}

for _, tc := range testCases {
Expand Down
33 changes: 29 additions & 4 deletions collector/receiver/telemetryapireceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,19 @@ import (
"context"
"errors"

"github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver/internal/sharedcomponent"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
)

const (
typeStr = "telemetryapi"
stability = component.StabilityLevelDevelopment
typeStr = "telemetryapi"
stability = component.StabilityLevelDevelopment
defaultPort = 4325
platform = "platform"
function = "function"
extension = "extension"
)

var errConfigNotTelemetryAPI = errors.New("config was not a Telemetry API receiver config")
Expand All @@ -37,16 +42,36 @@ func NewFactory(extensionID string) receiver.Factory {
func() component.Config {
return &Config{
extensionID: extensionID,
Port: defaultPort,
Types: []string{platform, function, extension},
}
},
receiver.WithTraces(createTracesReceiver, stability))
receiver.WithTraces(createTracesReceiver, stability),
receiver.WithLogs(createLogsReceiver, stability))
}

func createTracesReceiver(ctx context.Context, params receiver.CreateSettings, rConf component.Config, next consumer.Traces) (receiver.Traces, error) {
cfg, ok := rConf.(*Config)
if !ok {
return nil, errConfigNotTelemetryAPI
}
r := receivers.GetOrAdd(cfg, func() component.Component {
return newTelemetryAPIReceiver(cfg, params)
})
r.Unwrap().(*telemetryAPIReceiver).registerTracesConsumer(next)
return r, nil
}

return newTelemetryAPIReceiver(cfg, next, params)
func createLogsReceiver(ctx context.Context, params receiver.CreateSettings, rConf component.Config, next consumer.Logs) (receiver.Logs, error) {
cfg, ok := rConf.(*Config)
if !ok {
return nil, errConfigNotTelemetryAPI
}
r := receivers.GetOrAdd(cfg, func() component.Component {
return newTelemetryAPIReceiver(cfg, params)
})
r.Unwrap().(*telemetryAPIReceiver).registerLogsConsumer(next)
return r, nil
}

var receivers = sharedcomponent.NewSharedComponents()
2 changes: 1 addition & 1 deletion collector/receiver/telemetryapireceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestNewFactory(t *testing.T) {
testFunc: func(t *testing.T) {
factory := NewFactory("test")

var expectedCfg component.Config = &Config{extensionID: "test"}
var expectedCfg component.Config = &Config{extensionID: "test", Port: defaultPort, Types: []string{platform, function, extension}}

require.Equal(t, expectedCfg, factory.CreateDefaultConfig())
},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Package sharedcomponent exposes util functionality for receivers and exporters
// that need to share state between different signal types instances such as net.Listener or os.File.
package sharedcomponent // import "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver/internal/sharedcomponent"

import (
"context"
"sync"

"go.opentelemetry.io/collector/component"
)

// SharedComponents a map that keeps reference of all created instances for a given configuration,
// and ensures that the shared state is started and stopped only once.
type SharedComponents struct {
comps map[any]*SharedComponent
}

// NewSharedComponents returns a new empty SharedComponents.
func NewSharedComponents() *SharedComponents {
return &SharedComponents{
comps: make(map[any]*SharedComponent),
}
}

// GetOrAdd returns the already created instance if exists, otherwise creates a new instance
// and adds it to the map of references.
func (scs *SharedComponents) GetOrAdd(key any, create func() component.Component) *SharedComponent {
if c, ok := scs.comps[key]; ok {
return c
}
newComp := &SharedComponent{
Component: create(),
removeFunc: func() {
delete(scs.comps, key)
},
}
scs.comps[key] = newComp
return newComp
}

// SharedComponent ensures that the wrapped component is started and stopped only once.
// When stopped it is removed from the SharedComponents map.
type SharedComponent struct {
component.Component

startOnce sync.Once
stopOnce sync.Once
removeFunc func()
}

// Unwrap returns the original component.
func (r *SharedComponent) Unwrap() component.Component {
return r.Component
}

// Start implements component.Component.
func (r *SharedComponent) Start(ctx context.Context, host component.Host) error {
var err error
r.startOnce.Do(func() {
err = r.Component.Start(ctx, host)
})
return err
}

// Shutdown implements component.Component.
func (r *SharedComponent) Shutdown(ctx context.Context) error {
var err error
r.stopOnce.Do(func() {
err = r.Component.Shutdown(ctx)
r.removeFunc()
})
return err
}
Loading
Loading