Skip to content

Commit

Permalink
Added config (#26)
Browse files Browse the repository at this point in the history
  • Loading branch information
jerrytfleung authored Jun 20, 2024
1 parent 0c54292 commit dd2e317
Show file tree
Hide file tree
Showing 8 changed files with 222 additions and 12 deletions.
16 changes: 15 additions & 1 deletion collector/receiver/telemetryapireceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,25 @@ Supported events:

## Configuration

There are currently no configuration parameters available for this receiver. It can be enabled via the following configuration:
| Field | Default | Description |
|---------|---------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------|
| `port` | 4235 | HTTP server port to receive Telemetry API data. |
| `types` | ["platform", "function", "extension"] | [Types](https://docs.aws.amazon.com/lambda/latest/dg/telemetry-api-reference.html#telemetry-subscribe-api) of telemetry to subscribe to |


```yaml
receivers:
telemetryapi:
telemetryapi/1:
port: 4326
telemetryapi/2:
port: 4326
types:
- platform
- function
telemetryapi/3:
port: 4326
types: ["platform", "function"]
```
[alpha]: https://github.com/open-telemetry/opentelemetry-collector#alpha
Expand Down
11 changes: 11 additions & 0 deletions collector/receiver/telemetryapireceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,23 @@

package telemetryapireceiver // import "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver"

import (
"fmt"
)

// Config defines the configuration for the various elements of the receiver agent.
type Config struct {
extensionID string
Port int `mapstructure:"port"`
Types []string `mapstructure:"types"`
}

// Validate validates the configuration by checking for missing or invalid fields
func (cfg *Config) Validate() error {
for _, t := range cfg.Types {
if t != platform && t != function && t != extension {
return fmt.Errorf("unknown extension type: %s", t)
}
}
return nil
}
128 changes: 128 additions & 0 deletions collector/receiver/telemetryapireceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,133 @@
package telemetryapireceiver // import "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver"

import (
"fmt"
"path/filepath"
"testing"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap/confmaptest"
)

func TestLoadConfig(t *testing.T) {
t.Parallel()

tests := []struct {
id component.ID
expected component.Config
}{
{
id: component.NewID(component.MustNewType("telemetryapi")),
expected: NewFactory("extensionID").CreateDefaultConfig(),
},
{
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "1"),
expected: &Config{
extensionID: "extensionID",
Port: 12345,
Types: []string{platform, function, extension},
},
},
{
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "2"),
expected: &Config{
extensionID: "extensionID",
Port: 12345,
Types: []string{platform, function, extension},
},
},
{
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "3"),
expected: &Config{
extensionID: "extensionID",
Port: 12345,
Types: []string{platform},
},
},
{
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "4"),
expected: &Config{
extensionID: "extensionID",
Port: 12345,
Types: []string{function},
},
},
{
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "5"),
expected: &Config{
extensionID: "extensionID",
Port: 12345,
Types: []string{extension},
},
},
{
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "6"),
expected: &Config{
extensionID: "extensionID",
Port: 12345,
Types: []string{platform, function},
},
},
{
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "7"),
expected: &Config{
extensionID: "extensionID",
Port: 12345,
Types: []string{platform, extension},
},
},
{
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "8"),
expected: &Config{
extensionID: "extensionID",
Port: 12345,
Types: []string{function, extension},
},
},
{
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "9"),
expected: &Config{
extensionID: "extensionID",
Port: 12345,
Types: []string{},
},
},
{
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "10"),
expected: &Config{
extensionID: "extensionID",
Port: 12345,
Types: []string{function, extension},
},
},
{
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "11"),
expected: &Config{
extensionID: "extensionID",
Port: 12345,
Types: []string{function, extension},
},
},
}
for _, tt := range tests {
t.Run(tt.id.String(), func(t *testing.T) {
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
require.NoError(t, err)
factory := NewFactory("extensionID")
cfg := factory.CreateDefaultConfig()
sub, err := cm.Sub(tt.id.String())
require.NoError(t, err)
require.NoError(t, sub.Unmarshal(cfg))
require.NoError(t, component.ValidateConfig(cfg))
require.Equal(t, tt.expected, cfg)
})
}
}

func TestValidate(t *testing.T) {
t.Parallel()

testCases := []struct {
desc string
cfg *Config
Expand All @@ -31,6 +152,13 @@ func TestValidate(t *testing.T) {
cfg: &Config{},
expectedErr: nil,
},
{
desc: "invalid config",
cfg: &Config{
Types: []string{"invalid"},
},
expectedErr: fmt.Errorf("unknown extension type: invalid"),
},
}

for _, tc := range testCases {
Expand Down
10 changes: 8 additions & 2 deletions collector/receiver/telemetryapireceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,12 @@ import (
)

const (
typeStr = "telemetryapi"
stability = component.StabilityLevelDevelopment
typeStr = "telemetryapi"
stability = component.StabilityLevelDevelopment
defaultPort = 4325
platform = "platform"
function = "function"
extension = "extension"
)

var errConfigNotTelemetryAPI = errors.New("config was not a Telemetry API receiver config")
Expand All @@ -38,6 +42,8 @@ func NewFactory(extensionID string) receiver.Factory {
func() component.Config {
return &Config{
extensionID: extensionID,
Port: defaultPort,
Types: []string{platform, function, extension},
}
},
receiver.WithTraces(createTracesReceiver, stability),
Expand Down
2 changes: 1 addition & 1 deletion collector/receiver/telemetryapireceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestNewFactory(t *testing.T) {
testFunc: func(t *testing.T) {
factory := NewFactory("test")

var expectedCfg component.Config = &Config{extensionID: "test"}
var expectedCfg component.Config = &Config{extensionID: "test", Port: defaultPort, Types: []string{platform, function, extension}}

require.Equal(t, expectedCfg, factory.CreateDefaultConfig())
},
Expand Down
29 changes: 23 additions & 6 deletions collector/receiver/telemetryapireceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"math/rand"
"net/http"
"os"
"strconv"
"strings"
"time"

Expand All @@ -40,7 +41,6 @@ import (
"github.com/open-telemetry/opentelemetry-lambda/collector/internal/telemetryapi"
)

const defaultListenerPort = "4325"
const initialQueueSize = 5
const timeFormatLayout = "2006-01-02T15:04:05.000Z"
const scopeName = "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapi"
Expand All @@ -54,11 +54,13 @@ type telemetryAPIReceiver struct {
lastPlatformStartTime string
lastPlatformEndTime string
extensionID string
port int
types []telemetryapi.EventType
resource pcommon.Resource
}

func (r *telemetryAPIReceiver) Start(ctx context.Context, host component.Host) error {
address := listenOnAddress()
address := listenOnAddress(r.port)
r.logger.Info("Listening for requests", zap.String("address", address))

mux := http.NewServeMux()
Expand All @@ -69,7 +71,7 @@ func (r *telemetryAPIReceiver) Start(ctx context.Context, host component.Host) e
}()

telemetryClient := telemetryapi.NewClient(r.logger)
_, err := telemetryClient.Subscribe(ctx, []telemetryapi.EventType{telemetryapi.Platform, telemetryapi.Function, telemetryapi.Extension}, r.extensionID, fmt.Sprintf("http://%s/", address))
_, err := telemetryClient.Subscribe(ctx, r.types, r.extensionID, fmt.Sprintf("http://%s/", address))
if err != nil {
r.logger.Info("Listening for requests", zap.String("address", address), zap.String("extensionID", r.extensionID))
return err
Expand Down Expand Up @@ -337,21 +339,36 @@ func newTelemetryAPIReceiver(
r.Attributes().PutStr(resourceAttribute, val)
}
}

subscribedTypes := []telemetryapi.EventType{}
for _, val := range cfg.Types {
switch val {
case "platform":
subscribedTypes = append(subscribedTypes, telemetryapi.Platform)
case "function":
subscribedTypes = append(subscribedTypes, telemetryapi.Function)
case "extension":
subscribedTypes = append(subscribedTypes, telemetryapi.Extension)
}
}

return &telemetryAPIReceiver{
logger: set.Logger,
queue: queue.New(initialQueueSize),
extensionID: cfg.extensionID,
port: cfg.Port,
types: subscribedTypes,
resource: r,
}
}

func listenOnAddress() string {
func listenOnAddress(port int) string {
envAwsLocal, ok := os.LookupEnv("AWS_SAM_LOCAL")
var addr string
if ok && envAwsLocal == "true" {
addr = ":" + defaultListenerPort
addr = ":" + strconv.Itoa(port)
} else {
addr = "sandbox.localdomain:" + defaultListenerPort
addr = "sandbox.localdomain:" + strconv.Itoa(port)
}

return addr
Expand Down
4 changes: 2 additions & 2 deletions collector/receiver/telemetryapireceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@ func TestListenOnAddress(t *testing.T) {
{
desc: "listen on address without AWS_SAM_LOCAL env variable",
testFunc: func(t *testing.T) {
addr := listenOnAddress()
addr := listenOnAddress(4325)
require.EqualValues(t, "sandbox.localdomain:4325", addr)
},
},
{
desc: "listen on address with AWS_SAM_LOCAL env variable",
testFunc: func(t *testing.T) {
t.Setenv("AWS_SAM_LOCAL", "true")
addr := listenOnAddress()
addr := listenOnAddress(4325)
require.EqualValues(t, ":4325", addr)
},
},
Expand Down
34 changes: 34 additions & 0 deletions collector/receiver/telemetryapireceiver/testdata/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
telemetryapi:
telemetryapi/1:
port: 12345
telemetryapi/2:
port: "12345"
telemetryapi/3:
port: 12345
types: ["platform"]
telemetryapi/4:
port: 12345
types: ["function"]
telemetryapi/5:
port: 12345
types: ["extension"]
telemetryapi/6:
port: 12345
types: ["platform", "function"]
telemetryapi/7:
port: 12345
types: ["platform", "extension"]
telemetryapi/8:
port: 12345
types: ["function", "extension"]
telemetryapi/9:
port: 12345
types: []
telemetryapi/10:
port: 12345
types:
- function
- extension
telemetryapi/11:
port: 12345
types: [function, extension]

0 comments on commit dd2e317

Please sign in to comment.