diff --git a/collector/receiver/telemetryapireceiver/config.go b/collector/receiver/telemetryapireceiver/config.go index b51ef1ed57..5923d89a37 100644 --- a/collector/receiver/telemetryapireceiver/config.go +++ b/collector/receiver/telemetryapireceiver/config.go @@ -16,13 +16,16 @@ package telemetryapireceiver // import "github.com/open-telemetry/opentelemetry- import ( "fmt" + + "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver/internal/metadata" ) // Config defines the configuration for the various elements of the receiver agent. type Config struct { - extensionID string - Port int `mapstructure:"port"` - Types []string `mapstructure:"types"` + extensionID string + Port int `mapstructure:"port"` + Types []string `mapstructure:"types"` + MetricsBuilderConfig metadata.MetricsBuilderConfig `mapstructure:",squash"` } // Validate validates the configuration by checking for missing or invalid fields diff --git a/collector/receiver/telemetryapireceiver/config_test.go b/collector/receiver/telemetryapireceiver/config_test.go index 025846e8cd..3d11a1696c 100644 --- a/collector/receiver/telemetryapireceiver/config_test.go +++ b/collector/receiver/telemetryapireceiver/config_test.go @@ -19,6 +19,7 @@ import ( "path/filepath" "testing" + "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver/internal/metadata" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/confmap/confmaptest" @@ -30,9 +31,10 @@ func TestLoadConfig(t *testing.T) { // Helper function to create expected Config createExpectedConfig := func(types []string) *Config { return &Config{ - extensionID: "extensionID", - Port: 12345, - Types: types, + extensionID: "extensionID", + Port: 12345, + Types: types, + MetricsBuilderConfig: metadata.DefaultMetricsBuilderConfig(), } } diff --git a/collector/receiver/telemetryapireceiver/doc.go b/collector/receiver/telemetryapireceiver/doc.go index 59bb3fc485..b3063dadcc 100644 --- a/collector/receiver/telemetryapireceiver/doc.go +++ b/collector/receiver/telemetryapireceiver/doc.go @@ -12,5 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:generate mdatagen metadata.yaml + // Package telemetryapireceiver generates telemetry in response to events from the Telemetry API. package telemetryapireceiver // import "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver" diff --git a/collector/receiver/telemetryapireceiver/documentation.md b/collector/receiver/telemetryapireceiver/documentation.md new file mode 100644 index 0000000000..39127fbcd6 --- /dev/null +++ b/collector/receiver/telemetryapireceiver/documentation.md @@ -0,0 +1,45 @@ +[comment]: <> (Code generated by mdatagen. DO NOT EDIT.) + +# telemetryapi + +## Default Metrics + +The following metrics are emitted by default. Each of them can be disabled by applying the following configuration: + +```yaml +metrics: + : + enabled: false +``` + +### faas.coldstarts + +Number of invocation cold starts + +| Unit | Metric Type | Value Type | Aggregation Temporality | Monotonic | +| ---- | ----------- | ---------- | ----------------------- | --------- | +| {coldstart} | Sum | Int | Delta | true | + +### faas.errors + +Number of invocation errors + +| Unit | Metric Type | Value Type | Aggregation Temporality | Monotonic | +| ---- | ----------- | ---------- | ----------------------- | --------- | +| {error} | Sum | Int | Delta | true | + +### faas.invocations + +Number of successful invocations + +| Unit | Metric Type | Value Type | Aggregation Temporality | Monotonic | +| ---- | ----------- | ---------- | ----------------------- | --------- | +| {invocation} | Sum | Int | Delta | true | + +### faas.timeouts + +Number of invocation timeouts + +| Unit | Metric Type | Value Type | Aggregation Temporality | Monotonic | +| ---- | ----------- | ---------- | ----------------------- | --------- | +| {timeout} | Sum | Int | Delta | true | diff --git a/collector/receiver/telemetryapireceiver/factory.go b/collector/receiver/telemetryapireceiver/factory.go index 455d7aa7e7..880e637a15 100644 --- a/collector/receiver/telemetryapireceiver/factory.go +++ b/collector/receiver/telemetryapireceiver/factory.go @@ -18,6 +18,7 @@ import ( "context" "errors" + "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver/internal/metadata" "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver/internal/sharedcomponent" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" @@ -41,9 +42,10 @@ func NewFactory(extensionID string) receiver.Factory { component.MustNewType(typeStr), func() component.Config { return &Config{ - extensionID: extensionID, - Port: defaultPort, - Types: []string{platform, function, extension}, + extensionID: extensionID, + Port: defaultPort, + Types: []string{platform, function, extension}, + MetricsBuilderConfig: metadata.DefaultMetricsBuilderConfig(), } }, receiver.WithTraces(createTracesReceiver, stability), diff --git a/collector/receiver/telemetryapireceiver/factory_test.go b/collector/receiver/telemetryapireceiver/factory_test.go index eff29e92f3..bf3b94a99a 100644 --- a/collector/receiver/telemetryapireceiver/factory_test.go +++ b/collector/receiver/telemetryapireceiver/factory_test.go @@ -18,6 +18,7 @@ import ( "context" "testing" + "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver/internal/metadata" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer/consumertest" @@ -41,7 +42,7 @@ func TestNewFactory(t *testing.T) { testFunc: func(t *testing.T) { factory := NewFactory("test") - var expectedCfg component.Config = &Config{extensionID: "test", Port: defaultPort, Types: []string{platform, function, extension}} + var expectedCfg component.Config = &Config{extensionID: "test", Port: defaultPort, Types: []string{platform, function, extension}, MetricsBuilderConfig: metadata.DefaultMetricsBuilderConfig()} require.Equal(t, expectedCfg, factory.CreateDefaultConfig()) }, diff --git a/collector/receiver/telemetryapireceiver/generated_component_test.go b/collector/receiver/telemetryapireceiver/generated_component_test.go new file mode 100644 index 0000000000..b362d755a3 --- /dev/null +++ b/collector/receiver/telemetryapireceiver/generated_component_test.go @@ -0,0 +1,71 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package telemetryapireceiver + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/confmap/confmaptest" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/receiver/receivertest" +) + +func TestComponentFactoryType(t *testing.T) { + require.Equal(t, "telemetryapi", NewFactory("extensionID").Type().String()) +} + +func TestComponentConfigStruct(t *testing.T) { + require.NoError(t, componenttest.CheckConfigStruct(NewFactory("extensionID").CreateDefaultConfig())) +} + +func TestComponentLifecycle(t *testing.T) { + factory := NewFactory("extensionID") + + tests := []struct { + name string + createFn func(ctx context.Context, set receiver.Settings, cfg component.Config) (component.Component, error) + }{ + + { + name: "logs", + createFn: func(ctx context.Context, set receiver.Settings, cfg component.Config) (component.Component, error) { + return factory.CreateLogsReceiver(ctx, set, cfg, consumertest.NewNop()) + }, + }, + + { + name: "metrics", + createFn: func(ctx context.Context, set receiver.Settings, cfg component.Config) (component.Component, error) { + return factory.CreateMetricsReceiver(ctx, set, cfg, consumertest.NewNop()) + }, + }, + + { + name: "traces", + createFn: func(ctx context.Context, set receiver.Settings, cfg component.Config) (component.Component, error) { + return factory.CreateTracesReceiver(ctx, set, cfg, consumertest.NewNop()) + }, + }, + } + + cm, err := confmaptest.LoadConf("metadata.yaml") + require.NoError(t, err) + cfg := factory.CreateDefaultConfig() + sub, err := cm.Sub("tests::config") + require.NoError(t, err) + require.NoError(t, sub.Unmarshal(&cfg)) + + for _, test := range tests { + t.Run(test.name+"-shutdown", func(t *testing.T) { + c, err := test.createFn(context.Background(), receivertest.NewNopSettings(), cfg) + require.NoError(t, err) + err = c.Shutdown(context.Background()) + require.NoError(t, err) + }) + } +} diff --git a/collector/receiver/telemetryapireceiver/generated_package_test.go b/collector/receiver/telemetryapireceiver/generated_package_test.go new file mode 100644 index 0000000000..817af6e8d5 --- /dev/null +++ b/collector/receiver/telemetryapireceiver/generated_package_test.go @@ -0,0 +1,12 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package telemetryapireceiver + +import ( + "go.uber.org/goleak" + "testing" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} diff --git a/collector/receiver/telemetryapireceiver/go.mod b/collector/receiver/telemetryapireceiver/go.mod index 68d8adc891..6cd30bb002 100644 --- a/collector/receiver/telemetryapireceiver/go.mod +++ b/collector/receiver/telemetryapireceiver/go.mod @@ -8,7 +8,8 @@ replace github.com/open-telemetry/opentelemetry-lambda/collector => ../../ require ( github.com/golang-collections/go-datastructures v0.0.0-20150211160725-59788d5eb259 - github.com/google/uuid v1.6.0 + github.com/google/go-cmp v0.6.0 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.107.0 github.com/open-telemetry/opentelemetry-lambda/collector v0.98.0 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.107.0 @@ -18,6 +19,7 @@ require ( go.opentelemetry.io/collector/pdata v1.13.0 go.opentelemetry.io/collector/receiver v0.107.0 go.opentelemetry.io/collector/semconv v0.107.0 + go.uber.org/goleak v1.3.0 go.uber.org/zap v1.27.0 ) @@ -29,6 +31,7 @@ require ( github.com/go-logr/stdr v1.2.2 // indirect github.com/go-viper/mapstructure/v2 v2.0.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/google/uuid v1.6.0 // indirect github.com/hashicorp/go-version v1.7.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect @@ -39,6 +42,7 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.107.0 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_golang v1.19.1 // indirect github.com/prometheus/client_model v0.6.1 // indirect diff --git a/collector/receiver/telemetryapireceiver/go.sum b/collector/receiver/telemetryapireceiver/go.sum index 4998268d81..89268a2878 100644 --- a/collector/receiver/telemetryapireceiver/go.sum +++ b/collector/receiver/telemetryapireceiver/go.sum @@ -49,6 +49,12 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.107.0 h1:Zo2UJk4AOxU5M6q3LNiFgTZfQicY9zz/BDqJWsmWzY4= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.107.0/go.mod h1:qj9lEtkVjQUzZ7FdJTeDqqTUq9xVU9kE4F8zZnHFB9M= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.107.0 h1:g1pkpFfe+dnhpfvo+f9yFIkbvTdiOvNmFOUFNzVAgvk= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.107.0/go.mod h1:oG/PliNiIOUHVARyDrFdvxFvG8DUPEjMGlmxjEqeoKM= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.107.0 h1:zTeRh4V3rMlXgNvfbDBnET6nvhOeZpYIbKTjVbSl9Ws= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.107.0/go.mod h1:/RtBag3LuHIkqN4bo8Erd3jCzA3gea70l9WyJ9TncXM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= diff --git a/collector/receiver/telemetryapireceiver/internal/metadata/generated_config.go b/collector/receiver/telemetryapireceiver/internal/metadata/generated_config.go new file mode 100644 index 0000000000..85dc9b9510 --- /dev/null +++ b/collector/receiver/telemetryapireceiver/internal/metadata/generated_config.go @@ -0,0 +1,62 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import ( + "go.opentelemetry.io/collector/confmap" +) + +// MetricConfig provides common config for a particular metric. +type MetricConfig struct { + Enabled bool `mapstructure:"enabled"` + + enabledSetByUser bool +} + +func (ms *MetricConfig) Unmarshal(parser *confmap.Conf) error { + if parser == nil { + return nil + } + err := parser.Unmarshal(ms) + if err != nil { + return err + } + ms.enabledSetByUser = parser.IsSet("enabled") + return nil +} + +// MetricsConfig provides config for telemetryapi metrics. +type MetricsConfig struct { + FaasColdstarts MetricConfig `mapstructure:"faas.coldstarts"` + FaasErrors MetricConfig `mapstructure:"faas.errors"` + FaasInvocations MetricConfig `mapstructure:"faas.invocations"` + FaasTimeouts MetricConfig `mapstructure:"faas.timeouts"` +} + +func DefaultMetricsConfig() MetricsConfig { + return MetricsConfig{ + FaasColdstarts: MetricConfig{ + Enabled: true, + }, + FaasErrors: MetricConfig{ + Enabled: true, + }, + FaasInvocations: MetricConfig{ + Enabled: true, + }, + FaasTimeouts: MetricConfig{ + Enabled: true, + }, + } +} + +// MetricsBuilderConfig is a configuration for telemetryapi metrics builder. +type MetricsBuilderConfig struct { + Metrics MetricsConfig `mapstructure:"metrics"` +} + +func DefaultMetricsBuilderConfig() MetricsBuilderConfig { + return MetricsBuilderConfig{ + Metrics: DefaultMetricsConfig(), + } +} diff --git a/collector/receiver/telemetryapireceiver/internal/metadata/generated_config_test.go b/collector/receiver/telemetryapireceiver/internal/metadata/generated_config_test.go new file mode 100644 index 0000000000..51221d325f --- /dev/null +++ b/collector/receiver/telemetryapireceiver/internal/metadata/generated_config_test.go @@ -0,0 +1,65 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import ( + "path/filepath" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/confmap/confmaptest" +) + +func TestMetricsBuilderConfig(t *testing.T) { + tests := []struct { + name string + want MetricsBuilderConfig + }{ + { + name: "default", + want: DefaultMetricsBuilderConfig(), + }, + { + name: "all_set", + want: MetricsBuilderConfig{ + Metrics: MetricsConfig{ + FaasColdstarts: MetricConfig{Enabled: true}, + FaasErrors: MetricConfig{Enabled: true}, + FaasInvocations: MetricConfig{Enabled: true}, + FaasTimeouts: MetricConfig{Enabled: true}, + }, + }, + }, + { + name: "none_set", + want: MetricsBuilderConfig{ + Metrics: MetricsConfig{ + FaasColdstarts: MetricConfig{Enabled: false}, + FaasErrors: MetricConfig{Enabled: false}, + FaasInvocations: MetricConfig{Enabled: false}, + FaasTimeouts: MetricConfig{Enabled: false}, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := loadMetricsBuilderConfig(t, tt.name) + if diff := cmp.Diff(tt.want, cfg, cmpopts.IgnoreUnexported(MetricConfig{})); diff != "" { + t.Errorf("Config mismatch (-expected +actual):\n%s", diff) + } + }) + } +} + +func loadMetricsBuilderConfig(t *testing.T, name string) MetricsBuilderConfig { + cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml")) + require.NoError(t, err) + sub, err := cm.Sub(name) + require.NoError(t, err) + cfg := DefaultMetricsBuilderConfig() + require.NoError(t, sub.Unmarshal(&cfg)) + return cfg +} diff --git a/collector/receiver/telemetryapireceiver/internal/metadata/generated_metrics.go b/collector/receiver/telemetryapireceiver/internal/metadata/generated_metrics.go new file mode 100644 index 0000000000..0a0033ed86 --- /dev/null +++ b/collector/receiver/telemetryapireceiver/internal/metadata/generated_metrics.go @@ -0,0 +1,361 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import ( + "time" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/receiver" +) + +type metricFaasColdstarts struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills faas.coldstarts metric with initial data. +func (m *metricFaasColdstarts) init() { + m.data.SetName("faas.coldstarts") + m.data.SetDescription("Number of invocation cold starts") + m.data.SetUnit("{coldstart}") + m.data.SetEmptySum() + m.data.Sum().SetIsMonotonic(true) + m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) +} + +func (m *metricFaasColdstarts) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { + if !m.config.Enabled { + return + } + dp := m.data.Sum().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricFaasColdstarts) updateCapacity() { + if m.data.Sum().DataPoints().Len() > m.capacity { + m.capacity = m.data.Sum().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricFaasColdstarts) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricFaasColdstarts(cfg MetricConfig) metricFaasColdstarts { + m := metricFaasColdstarts{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricFaasErrors struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills faas.errors metric with initial data. +func (m *metricFaasErrors) init() { + m.data.SetName("faas.errors") + m.data.SetDescription("Number of invocation errors") + m.data.SetUnit("{error}") + m.data.SetEmptySum() + m.data.Sum().SetIsMonotonic(true) + m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) +} + +func (m *metricFaasErrors) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { + if !m.config.Enabled { + return + } + dp := m.data.Sum().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricFaasErrors) updateCapacity() { + if m.data.Sum().DataPoints().Len() > m.capacity { + m.capacity = m.data.Sum().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricFaasErrors) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricFaasErrors(cfg MetricConfig) metricFaasErrors { + m := metricFaasErrors{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricFaasInvocations struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills faas.invocations metric with initial data. +func (m *metricFaasInvocations) init() { + m.data.SetName("faas.invocations") + m.data.SetDescription("Number of successful invocations") + m.data.SetUnit("{invocation}") + m.data.SetEmptySum() + m.data.Sum().SetIsMonotonic(true) + m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) +} + +func (m *metricFaasInvocations) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { + if !m.config.Enabled { + return + } + dp := m.data.Sum().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricFaasInvocations) updateCapacity() { + if m.data.Sum().DataPoints().Len() > m.capacity { + m.capacity = m.data.Sum().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricFaasInvocations) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricFaasInvocations(cfg MetricConfig) metricFaasInvocations { + m := metricFaasInvocations{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricFaasTimeouts struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills faas.timeouts metric with initial data. +func (m *metricFaasTimeouts) init() { + m.data.SetName("faas.timeouts") + m.data.SetDescription("Number of invocation timeouts") + m.data.SetUnit("{timeout}") + m.data.SetEmptySum() + m.data.Sum().SetIsMonotonic(true) + m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) +} + +func (m *metricFaasTimeouts) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { + if !m.config.Enabled { + return + } + dp := m.data.Sum().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricFaasTimeouts) updateCapacity() { + if m.data.Sum().DataPoints().Len() > m.capacity { + m.capacity = m.data.Sum().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricFaasTimeouts) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricFaasTimeouts(cfg MetricConfig) metricFaasTimeouts { + m := metricFaasTimeouts{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +// MetricsBuilder provides an interface for scrapers to report metrics while taking care of all the transformations +// required to produce metric representation defined in metadata and user config. +type MetricsBuilder struct { + config MetricsBuilderConfig // config of the metrics builder. + startTime pcommon.Timestamp // start time that will be applied to all recorded data points. + metricsCapacity int // maximum observed number of metrics per resource. + metricsBuffer pmetric.Metrics // accumulates metrics data before emitting. + buildInfo component.BuildInfo // contains version information. + metricFaasColdstarts metricFaasColdstarts + metricFaasErrors metricFaasErrors + metricFaasInvocations metricFaasInvocations + metricFaasTimeouts metricFaasTimeouts +} + +// metricBuilderOption applies changes to default metrics builder. +type metricBuilderOption func(*MetricsBuilder) + +// WithStartTime sets startTime on the metrics builder. +func WithStartTime(startTime pcommon.Timestamp) metricBuilderOption { + return func(mb *MetricsBuilder) { + mb.startTime = startTime + } +} + +func NewMetricsBuilder(mbc MetricsBuilderConfig, settings receiver.Settings, options ...metricBuilderOption) *MetricsBuilder { + mb := &MetricsBuilder{ + config: mbc, + startTime: pcommon.NewTimestampFromTime(time.Now()), + metricsBuffer: pmetric.NewMetrics(), + buildInfo: settings.BuildInfo, + metricFaasColdstarts: newMetricFaasColdstarts(mbc.Metrics.FaasColdstarts), + metricFaasErrors: newMetricFaasErrors(mbc.Metrics.FaasErrors), + metricFaasInvocations: newMetricFaasInvocations(mbc.Metrics.FaasInvocations), + metricFaasTimeouts: newMetricFaasTimeouts(mbc.Metrics.FaasTimeouts), + } + + for _, op := range options { + op(mb) + } + return mb +} + +// updateCapacity updates max length of metrics and resource attributes that will be used for the slice capacity. +func (mb *MetricsBuilder) updateCapacity(rm pmetric.ResourceMetrics) { + if mb.metricsCapacity < rm.ScopeMetrics().At(0).Metrics().Len() { + mb.metricsCapacity = rm.ScopeMetrics().At(0).Metrics().Len() + } +} + +// ResourceMetricsOption applies changes to provided resource metrics. +type ResourceMetricsOption func(pmetric.ResourceMetrics) + +// WithResource sets the provided resource on the emitted ResourceMetrics. +// It's recommended to use ResourceBuilder to create the resource. +func WithResource(res pcommon.Resource) ResourceMetricsOption { + return func(rm pmetric.ResourceMetrics) { + res.CopyTo(rm.Resource()) + } +} + +// WithStartTimeOverride overrides start time for all the resource metrics data points. +// This option should be only used if different start time has to be set on metrics coming from different resources. +func WithStartTimeOverride(start pcommon.Timestamp) ResourceMetricsOption { + return func(rm pmetric.ResourceMetrics) { + var dps pmetric.NumberDataPointSlice + metrics := rm.ScopeMetrics().At(0).Metrics() + for i := 0; i < metrics.Len(); i++ { + switch metrics.At(i).Type() { + case pmetric.MetricTypeGauge: + dps = metrics.At(i).Gauge().DataPoints() + case pmetric.MetricTypeSum: + dps = metrics.At(i).Sum().DataPoints() + } + for j := 0; j < dps.Len(); j++ { + dps.At(j).SetStartTimestamp(start) + } + } + } +} + +// EmitForResource saves all the generated metrics under a new resource and updates the internal state to be ready for +// recording another set of data points as part of another resource. This function can be helpful when one scraper +// needs to emit metrics from several resources. Otherwise calling this function is not required, +// just `Emit` function can be called instead. +// Resource attributes should be provided as ResourceMetricsOption arguments. +func (mb *MetricsBuilder) EmitForResource(rmo ...ResourceMetricsOption) { + rm := pmetric.NewResourceMetrics() + ils := rm.ScopeMetrics().AppendEmpty() + ils.Scope().SetName("github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver") + ils.Scope().SetVersion(mb.buildInfo.Version) + ils.Metrics().EnsureCapacity(mb.metricsCapacity) + mb.metricFaasColdstarts.emit(ils.Metrics()) + mb.metricFaasErrors.emit(ils.Metrics()) + mb.metricFaasInvocations.emit(ils.Metrics()) + mb.metricFaasTimeouts.emit(ils.Metrics()) + + for _, op := range rmo { + op(rm) + } + + if ils.Metrics().Len() > 0 { + mb.updateCapacity(rm) + rm.MoveTo(mb.metricsBuffer.ResourceMetrics().AppendEmpty()) + } +} + +// Emit returns all the metrics accumulated by the metrics builder and updates the internal state to be ready for +// recording another set of metrics. This function will be responsible for applying all the transformations required to +// produce metric representation defined in metadata and user config, e.g. delta or cumulative. +func (mb *MetricsBuilder) Emit(rmo ...ResourceMetricsOption) pmetric.Metrics { + mb.EmitForResource(rmo...) + metrics := mb.metricsBuffer + mb.metricsBuffer = pmetric.NewMetrics() + return metrics +} + +// RecordFaasColdstartsDataPoint adds a data point to faas.coldstarts metric. +func (mb *MetricsBuilder) RecordFaasColdstartsDataPoint(ts pcommon.Timestamp, val int64) { + mb.metricFaasColdstarts.recordDataPoint(mb.startTime, ts, val) +} + +// RecordFaasErrorsDataPoint adds a data point to faas.errors metric. +func (mb *MetricsBuilder) RecordFaasErrorsDataPoint(ts pcommon.Timestamp, val int64) { + mb.metricFaasErrors.recordDataPoint(mb.startTime, ts, val) +} + +// RecordFaasInvocationsDataPoint adds a data point to faas.invocations metric. +func (mb *MetricsBuilder) RecordFaasInvocationsDataPoint(ts pcommon.Timestamp, val int64) { + mb.metricFaasInvocations.recordDataPoint(mb.startTime, ts, val) +} + +// RecordFaasTimeoutsDataPoint adds a data point to faas.timeouts metric. +func (mb *MetricsBuilder) RecordFaasTimeoutsDataPoint(ts pcommon.Timestamp, val int64) { + mb.metricFaasTimeouts.recordDataPoint(mb.startTime, ts, val) +} + +// Reset resets metrics builder to its initial state. It should be used when external metrics source is restarted, +// and metrics builder should update its startTime and reset it's internal state accordingly. +func (mb *MetricsBuilder) Reset(options ...metricBuilderOption) { + mb.startTime = pcommon.NewTimestampFromTime(time.Now()) + for _, op := range options { + op(mb) + } +} diff --git a/collector/receiver/telemetryapireceiver/internal/metadata/generated_metrics_test.go b/collector/receiver/telemetryapireceiver/internal/metadata/generated_metrics_test.go new file mode 100644 index 0000000000..cfc3d144e9 --- /dev/null +++ b/collector/receiver/telemetryapireceiver/internal/metadata/generated_metrics_test.go @@ -0,0 +1,160 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/receiver/receivertest" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" +) + +type testDataSet int + +const ( + testDataSetDefault testDataSet = iota + testDataSetAll + testDataSetNone +) + +func TestMetricsBuilder(t *testing.T) { + tests := []struct { + name string + metricsSet testDataSet + resAttrsSet testDataSet + expectEmpty bool + }{ + { + name: "default", + }, + { + name: "all_set", + metricsSet: testDataSetAll, + resAttrsSet: testDataSetAll, + }, + { + name: "none_set", + metricsSet: testDataSetNone, + resAttrsSet: testDataSetNone, + expectEmpty: true, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + start := pcommon.Timestamp(1_000_000_000) + ts := pcommon.Timestamp(1_000_001_000) + observedZapCore, observedLogs := observer.New(zap.WarnLevel) + settings := receivertest.NewNopSettings() + settings.Logger = zap.New(observedZapCore) + mb := NewMetricsBuilder(loadMetricsBuilderConfig(t, test.name), settings, WithStartTime(start)) + + expectedWarnings := 0 + + assert.Equal(t, expectedWarnings, observedLogs.Len()) + + defaultMetricsCount := 0 + allMetricsCount := 0 + + defaultMetricsCount++ + allMetricsCount++ + mb.RecordFaasColdstartsDataPoint(ts, 1) + + defaultMetricsCount++ + allMetricsCount++ + mb.RecordFaasErrorsDataPoint(ts, 1) + + defaultMetricsCount++ + allMetricsCount++ + mb.RecordFaasInvocationsDataPoint(ts, 1) + + defaultMetricsCount++ + allMetricsCount++ + mb.RecordFaasTimeoutsDataPoint(ts, 1) + + res := pcommon.NewResource() + metrics := mb.Emit(WithResource(res)) + + if test.expectEmpty { + assert.Equal(t, 0, metrics.ResourceMetrics().Len()) + return + } + + assert.Equal(t, 1, metrics.ResourceMetrics().Len()) + rm := metrics.ResourceMetrics().At(0) + assert.Equal(t, res, rm.Resource()) + assert.Equal(t, 1, rm.ScopeMetrics().Len()) + ms := rm.ScopeMetrics().At(0).Metrics() + if test.metricsSet == testDataSetDefault { + assert.Equal(t, defaultMetricsCount, ms.Len()) + } + if test.metricsSet == testDataSetAll { + assert.Equal(t, allMetricsCount, ms.Len()) + } + validatedMetrics := make(map[string]bool) + for i := 0; i < ms.Len(); i++ { + switch ms.At(i).Name() { + case "faas.coldstarts": + assert.False(t, validatedMetrics["faas.coldstarts"], "Found a duplicate in the metrics slice: faas.coldstarts") + validatedMetrics["faas.coldstarts"] = true + assert.Equal(t, pmetric.MetricTypeSum, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Sum().DataPoints().Len()) + assert.Equal(t, "Number of invocation cold starts", ms.At(i).Description()) + assert.Equal(t, "{coldstart}", ms.At(i).Unit()) + assert.Equal(t, true, ms.At(i).Sum().IsMonotonic()) + assert.Equal(t, pmetric.AggregationTemporalityDelta, ms.At(i).Sum().AggregationTemporality()) + dp := ms.At(i).Sum().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) + assert.Equal(t, int64(1), dp.IntValue()) + case "faas.errors": + assert.False(t, validatedMetrics["faas.errors"], "Found a duplicate in the metrics slice: faas.errors") + validatedMetrics["faas.errors"] = true + assert.Equal(t, pmetric.MetricTypeSum, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Sum().DataPoints().Len()) + assert.Equal(t, "Number of invocation errors", ms.At(i).Description()) + assert.Equal(t, "{error}", ms.At(i).Unit()) + assert.Equal(t, true, ms.At(i).Sum().IsMonotonic()) + assert.Equal(t, pmetric.AggregationTemporalityDelta, ms.At(i).Sum().AggregationTemporality()) + dp := ms.At(i).Sum().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) + assert.Equal(t, int64(1), dp.IntValue()) + case "faas.invocations": + assert.False(t, validatedMetrics["faas.invocations"], "Found a duplicate in the metrics slice: faas.invocations") + validatedMetrics["faas.invocations"] = true + assert.Equal(t, pmetric.MetricTypeSum, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Sum().DataPoints().Len()) + assert.Equal(t, "Number of successful invocations", ms.At(i).Description()) + assert.Equal(t, "{invocation}", ms.At(i).Unit()) + assert.Equal(t, true, ms.At(i).Sum().IsMonotonic()) + assert.Equal(t, pmetric.AggregationTemporalityDelta, ms.At(i).Sum().AggregationTemporality()) + dp := ms.At(i).Sum().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) + assert.Equal(t, int64(1), dp.IntValue()) + case "faas.timeouts": + assert.False(t, validatedMetrics["faas.timeouts"], "Found a duplicate in the metrics slice: faas.timeouts") + validatedMetrics["faas.timeouts"] = true + assert.Equal(t, pmetric.MetricTypeSum, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Sum().DataPoints().Len()) + assert.Equal(t, "Number of invocation timeouts", ms.At(i).Description()) + assert.Equal(t, "{timeout}", ms.At(i).Unit()) + assert.Equal(t, true, ms.At(i).Sum().IsMonotonic()) + assert.Equal(t, pmetric.AggregationTemporalityDelta, ms.At(i).Sum().AggregationTemporality()) + dp := ms.At(i).Sum().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) + assert.Equal(t, int64(1), dp.IntValue()) + } + } + }) + } +} diff --git a/collector/receiver/telemetryapireceiver/internal/metadata/generated_status.go b/collector/receiver/telemetryapireceiver/internal/metadata/generated_status.go new file mode 100644 index 0000000000..6c4730a6d3 --- /dev/null +++ b/collector/receiver/telemetryapireceiver/internal/metadata/generated_status.go @@ -0,0 +1,18 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import ( + "go.opentelemetry.io/collector/component" +) + +var ( + Type = component.MustNewType("telemetryapi") + ScopeName = "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver" +) + +const ( + TracesStability = component.StabilityLevelAlpha + MetricsStability = component.StabilityLevelAlpha + LogsStability = component.StabilityLevelAlpha +) diff --git a/collector/receiver/telemetryapireceiver/internal/metadata/testdata/config.yaml b/collector/receiver/telemetryapireceiver/internal/metadata/testdata/config.yaml new file mode 100644 index 0000000000..5c2da193b8 --- /dev/null +++ b/collector/receiver/telemetryapireceiver/internal/metadata/testdata/config.yaml @@ -0,0 +1,21 @@ +default: +all_set: + metrics: + faas.coldstarts: + enabled: true + faas.errors: + enabled: true + faas.invocations: + enabled: true + faas.timeouts: + enabled: true +none_set: + metrics: + faas.coldstarts: + enabled: false + faas.errors: + enabled: false + faas.invocations: + enabled: false + faas.timeouts: + enabled: false diff --git a/collector/receiver/telemetryapireceiver/metadata.yaml b/collector/receiver/telemetryapireceiver/metadata.yaml new file mode 100644 index 0000000000..e0dce4628d --- /dev/null +++ b/collector/receiver/telemetryapireceiver/metadata.yaml @@ -0,0 +1,41 @@ +type: telemetryapi + +status: + class: receiver + stability: + alpha: [traces, metrics, logs] + distributions: [] + +metrics: + faas.coldstarts: + enabled: true + description: Number of invocation cold starts + unit: '{coldstart}' + sum: + value_type: int + monotonic: true + aggregation_temporality: delta + faas.errors: + enabled: true + description: Number of invocation errors + unit: '{error}' + sum: + value_type: int + monotonic: true + aggregation_temporality: delta + faas.invocations: + enabled: true + description: Number of successful invocations + unit: '{invocation}' + sum: + value_type: int + monotonic: true + aggregation_temporality: delta + faas.timeouts: + enabled: true + description: Number of invocation timeouts + unit: '{timeout}' + sum: + value_type: int + monotonic: true + aggregation_temporality: delta diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index a5a18ee98a..283af29e8a 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -30,7 +30,7 @@ import ( "time" "github.com/golang-collections/go-datastructures/queue" - "github.com/google/uuid" + "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver/internal/metadata" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pcommon" @@ -60,11 +60,7 @@ type telemetryAPIReceiver struct { port int types []telemetryapi.EventType resource pcommon.Resource - metricsStartTime time.Time - coldStartCounter int64 - errorsCounter int64 - invocationsCounter int64 - timeoutsCounter int64 + metricsBuilder *metadata.MetricsBuilder } func (r *telemetryAPIReceiver) Start(ctx context.Context, host component.Host) error { @@ -145,7 +141,7 @@ func (r *telemetryAPIReceiver) httpHandler(w http.ResponseWriter, req *http.Requ // metrics if r.nextMetrics != nil { if metrics, err := r.createMetrics(slice); err == nil { - if metrics.DataPointCount() > 0 { + if metrics.ResourceMetrics().Len() > 0 { err := r.nextMetrics.ConsumeMetrics(context.Background(), metrics) if err != nil { r.logger.Error("error receiving metrics", zap.Error(err)) @@ -216,21 +212,8 @@ func (r *telemetryAPIReceiver) createTraces(slice []event) (ptrace.Traces, error } func (r *telemetryAPIReceiver) createMetrics(slice []event) (pmetric.Metrics, error) { - metrics := pmetric.NewMetrics() - resourceMetric := metrics.ResourceMetrics().AppendEmpty() - r.resource.CopyTo(resourceMetric.Resource()) - scopeMetric := resourceMetric.ScopeMetrics().AppendEmpty() - scopeMetric.Scope().SetName(scopeName) for _, el := range slice { r.logger.Debug(fmt.Sprintf("Event: %s", el.Type), zap.Any("event", el)) - if r.metricsStartTime.IsZero() { - if t, err := time.Parse(time.RFC3339, el.Time); err == nil { - r.metricsStartTime = t - } else { - return pmetric.Metrics{}, err - } - - } switch el.Type { case string(telemetryapi.PlatformInitReport): jsonStr, err := json.Marshal(el.Record) @@ -242,21 +225,11 @@ func (r *telemetryAPIReceiver) createMetrics(slice []event) (pmetric.Metrics, er return pmetric.Metrics{}, err } else { if report.Phase == initPhaseInit { - r.coldStartCounter++ - metrics := scopeMetric.Metrics().AppendEmpty() - metrics.Metadata().PutStr("type", el.Type) - metrics.SetName(semconv.AttributeFaaSColdstart) - sum := metrics.SetEmptySum() - sumHelper(sum, r.coldStartCounter, r.metricsStartTime) + r.metricsBuilder.RecordFaasColdstartsDataPoint(pcommon.NewTimestampFromTime(time.Now()), 1) } } case string(telemetryapi.PlatformReport): - r.invocationsCounter++ - metrics := scopeMetric.Metrics().AppendEmpty() - metrics.Metadata().PutStr("type", el.Type) - metrics.SetName("faas.invocations") - sum := metrics.SetEmptySum() - sumHelper(sum, r.invocationsCounter, r.metricsStartTime) + r.metricsBuilder.RecordFaasInvocationsDataPoint(pcommon.NewTimestampFromTime(time.Now()), 1) jsonStr, err := json.Marshal(el.Record) if err != nil { return pmetric.Metrics{}, err @@ -266,25 +239,15 @@ func (r *telemetryAPIReceiver) createMetrics(slice []event) (pmetric.Metrics, er return pmetric.Metrics{}, err } else { if report.Status != statusSuccess { - r.errorsCounter++ - metrics := scopeMetric.Metrics().AppendEmpty() - metrics.Metadata().PutStr("type", el.Type) - metrics.SetName("faas.errors") - sum := metrics.SetEmptySum() - sumHelper(sum, r.errorsCounter, r.metricsStartTime) + r.metricsBuilder.RecordFaasErrorsDataPoint(pcommon.NewTimestampFromTime(time.Now()), 1) } if report.Status == statusTimeout { - r.timeoutsCounter++ - metrics := scopeMetric.Metrics().AppendEmpty() - metrics.Metadata().PutStr("type", el.Type) - metrics.SetName("faas.timeouts") - sum := metrics.SetEmptySum() - sumHelper(sum, r.timeoutsCounter, r.metricsStartTime) + r.metricsBuilder.RecordFaasTimeoutsDataPoint(pcommon.NewTimestampFromTime(time.Now()), 1) } } } } - + metrics := r.metricsBuilder.Emit(metadata.WithResource(r.resource)) return metrics, nil } @@ -337,16 +300,6 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { return logs, nil } -func sumHelper(sum pmetric.Sum, count int64, metricsStartTime time.Time) { - sum.SetIsMonotonic(true) - sum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) - dp := sum.DataPoints().AppendEmpty() - dp.SetIntValue(count) - dp.SetStartTimestamp(pcommon.NewTimestampFromTime(metricsStartTime)) - dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) - dp.Attributes().PutStr(semconv.AttributeFaaSTrigger, semconv.AttributeFaaSTriggerOther) -} - func severityTextToNumber(severityText string) plog.SeverityNumber { mapping := map[string]plog.SeverityNumber{ "TRACE": plog.SeverityNumberTrace, @@ -424,7 +377,7 @@ func (r *telemetryAPIReceiver) createPlatformInitSpan(start, end string) (ptrace func newTelemetryAPIReceiver( cfg *Config, - set receiver.Settings, + settings receiver.Settings, ) (*telemetryAPIReceiver, error) { envResourceMap := map[string]string{ "AWS_LAMBDA_FUNCTION_VERSION": semconv.AttributeFaaSVersion, @@ -447,8 +400,6 @@ func newTelemetryAPIReceiver( r.Attributes().PutInt(semconv.AttributeFaaSMaxMemory, int64(mb)*1024*1024) } } - // https://opentelemetry.io/docs/specs/otel/metrics/data-model/#single-writer - r.Attributes().PutStr(semconv.AttributeServiceInstanceID, uuid.New().String()) for env, resourceAttribute := range envResourceMap { if val, ok := os.LookupEnv(env); ok { @@ -469,16 +420,13 @@ func newTelemetryAPIReceiver( } return &telemetryAPIReceiver{ - logger: set.Logger, - queue: queue.New(initialQueueSize), - extensionID: cfg.extensionID, - port: cfg.Port, - types: subscribedTypes, - resource: r, - coldStartCounter: 0, - errorsCounter: 0, - invocationsCounter: 0, - timeoutsCounter: 0, + logger: settings.Logger, + queue: queue.New(initialQueueSize), + extensionID: cfg.extensionID, + port: cfg.Port, + types: subscribedTypes, + resource: r, + metricsBuilder: metadata.NewMetricsBuilder(cfg.MetricsBuilderConfig, settings), }, nil } diff --git a/collector/receiver/telemetryapireceiver/receiver_test.go b/collector/receiver/telemetryapireceiver/receiver_test.go index c19562734f..7517f2bb6a 100644 --- a/collector/receiver/telemetryapireceiver/receiver_test.go +++ b/collector/receiver/telemetryapireceiver/receiver_test.go @@ -21,11 +21,12 @@ import ( "testing" "time" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" + "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver/internal/metadata" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" - "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/receiver/receivertest" semconv "go.opentelemetry.io/collector/semconv/v1.25.0" @@ -176,15 +177,16 @@ func TestCreateMetrics(t *testing.T) { t.Parallel() testCases := []struct { - desc string - slice []event - expectedType string - expectedMetrics []map[string]any - expectError bool + desc string + slice []event + expectedResourceMetrics int + expectedMetrics map[string]int + expectError bool }{ { - desc: "no slice", - expectError: false, + desc: "no slice", + expectError: false, + expectedResourceMetrics: 0, }, { desc: "platform.initReport", @@ -209,14 +211,11 @@ func TestCreateMetrics(t *testing.T) { }, }, }, - expectedMetrics: []map[string]any{ - { - "Name": semconv.AttributeFaaSColdstart, - "Value": int64(1), - }, + expectedResourceMetrics: 1, + expectedMetrics: map[string]int{ + "faas.coldstarts": 1, }, - expectedType: "platform.initReport", - expectError: false, + expectError: false, }, { desc: "platform.Report success", @@ -244,12 +243,9 @@ func TestCreateMetrics(t *testing.T) { }, }, }, - expectedType: "platform.report", - expectedMetrics: []map[string]any{ - { - "Name": "faas.invocations", - "Value": int64(1), - }, + expectedResourceMetrics: 1, + expectedMetrics: map[string]int{ + "faas.invocations": 1, }, expectError: false, }, @@ -274,16 +270,10 @@ func TestCreateMetrics(t *testing.T) { }, }, }, - expectedType: "platform.report", - expectedMetrics: []map[string]any{ - { - "Name": "faas.invocations", - "Value": int64(1), - }, - { - "Name": "faas.errors", - "Value": int64(1), - }, + expectedResourceMetrics: 1, + expectedMetrics: map[string]int{ + "faas.errors": 1, + "faas.invocations": 1, }, expectError: false, }, @@ -308,16 +298,10 @@ func TestCreateMetrics(t *testing.T) { }, }, }, - expectedType: "platform.report", - expectedMetrics: []map[string]any{ - { - "Name": "faas.invocations", - "Value": int64(1), - }, - { - "Name": "faas.errors", - "Value": int64(1), - }, + expectedResourceMetrics: 1, + expectedMetrics: map[string]int{ + "faas.errors": 1, + "faas.invocations": 1, }, expectError: false, }, @@ -341,28 +325,22 @@ func TestCreateMetrics(t *testing.T) { }, }, }, - expectedType: "platform.report", - expectedMetrics: []map[string]any{ - { - "Name": "faas.invocations", - "Value": int64(1), - }, - { - "Name": "faas.errors", - "Value": int64(1), - }, - { - "Name": "faas.timeouts", - "Value": int64(1), - }, + expectedResourceMetrics: 1, + expectedMetrics: map[string]int{ + "faas.errors": 1, + "faas.invocations": 1, + "faas.timeouts": 1, }, expectError: false, }, } + for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { r, err := newTelemetryAPIReceiver( - &Config{}, + &Config{ + MetricsBuilderConfig: metadata.DefaultMetricsBuilderConfig(), + }, receivertest.NewNopSettings(), ) require.NoError(t, err) @@ -370,23 +348,33 @@ func TestCreateMetrics(t *testing.T) { if tc.expectError { require.Error(t, err) } else { - require.Equal(t, 1, metrics.ResourceMetrics().Len()) - resourceMetric := metrics.ResourceMetrics().At(0) - require.Equal(t, 1, resourceMetric.ScopeMetrics().Len()) - scopeMetric := resourceMetric.ScopeMetrics().At(0) - require.Equal(t, scopeName, scopeMetric.Scope().Name()) - require.Equal(t, len(tc.expectedMetrics), scopeMetric.Metrics().Len()) - for idx, m := range tc.expectedMetrics { - metric := scopeMetric.Metrics().At(idx) - attr, ok := metric.Metadata().Get("type") - require.True(t, ok) - require.Equal(t, tc.expectedType, attr.Str()) - require.Equal(t, m["Name"], metric.Name()) - require.True(t, metric.Sum().IsMonotonic()) - require.Equal(t, pmetric.AggregationTemporalityCumulative, metric.Sum().AggregationTemporality()) - require.Equal(t, 1, metric.Sum().DataPoints().Len()) - require.Equal(t, m["Value"], metric.Sum().DataPoints().At(0).IntValue()) + now := pcommon.NewTimestampFromTime(time.Now().UTC()) + expectedMB := metadata.NewMetricsBuilder(metadata.DefaultMetricsBuilderConfig(), receivertest.NewNopSettings()) + for k, v := range tc.expectedMetrics { + switch k { + case "faas.coldstarts": + for _ = range v { + expectedMB.RecordFaasColdstartsDataPoint(now, 1) + } + case "faas.errors": + for _ = range v { + expectedMB.RecordFaasErrorsDataPoint(now, 1) + } + case "faas.invocations": + for _ = range v { + expectedMB.RecordFaasInvocationsDataPoint(now, 1) + } + case "faas.timeouts": + for _ = range v { + expectedMB.RecordFaasTimeoutsDataPoint(now, 1) + } + default: + + } } + expectedMB.EmitForResource(metadata.WithResource(r.resource)) + expectedMetrics := expectedMB.Emit() + require.NoError(t, pmetrictest.CompareMetrics(expectedMetrics, metrics, pmetrictest.IgnoreResourceMetricsOrder(), pmetrictest.IgnoreMetricDataPointsOrder(), pmetrictest.IgnoreStartTimestamp(), pmetrictest.IgnoreTimestamp())) } }) } @@ -613,34 +601,3 @@ func TestSeverityTextToNumber(t *testing.T) { require.Equal(t, plog.SeverityNumberUnspecified, severityTextToNumber(level)) } } - -func TestSumHelper(t *testing.T) { - t.Parallel() - testCases := []struct { - count int64 - time time.Time - }{ - { - count: 12345, - time: time.Date(2024, time.July, 5, 21, 12, 37, 0, time.UTC), - }, - { - count: 678910, - time: time.Date(2024, time.July, 9, 10, 53, 34, 689*1000*1000, time.UTC), - }, - } - for _, tc := range testCases { - sum := pmetric.NewSum() - sumHelper(sum, tc.count, tc.time) - require.True(t, sum.IsMonotonic()) - require.Equal(t, pmetric.AggregationTemporalityCumulative, sum.AggregationTemporality()) - require.Equal(t, 1, sum.DataPoints().Len()) - dp := sum.DataPoints().At(0) - require.Equal(t, tc.count, dp.IntValue()) - require.Equal(t, tc.time, dp.StartTimestamp().AsTime()) - require.Equal(t, 1, dp.Attributes().Len()) - trigger, ok := dp.Attributes().Get(semconv.AttributeFaaSTrigger) - require.True(t, ok) - require.Equal(t, semconv.AttributeFaaSTriggerOther, trigger.AsString()) - } -}