From 0aeca5963d82d63be1c596d210bdc53a44959f27 Mon Sep 17 00:00:00 2001 From: Robert Fratto Date: Wed, 4 Oct 2023 10:45:45 -0400 Subject: [PATCH] Add faro.receiver component (#5314) This commit adds a component for faro.receiver, an equivalent to the app_agent_receiver integration from static mode. This is not a 100% straight port of the existing integration. I have done some minor refactoring to make it possible for this component to be dynamically updated at runtime. However, the implementation remains mostly the same. During the port, I noticed bugs in the integration, particularly around sourcemaps. I have not fixed these, but I have added TODO comments in the code so they can be resolved down the line. Unfortunately, the resulting code is quite complex. Future changes to refactor it for readability are warranted, but they would likely require more code and would drift further from the original integration. In the spirit of this being a "port," such changes are not included here. 
Supersedes grafana/agent#5243 Closes grafana/agent#2320 Co-authored-by: CRA Keishi Kawada Co-authored-by: mattdurham Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com> Co-authored-by: Paschalis Tsilias --- CHANGELOG.md | 19 +- component/all/all.go | 1 + component/faro/receiver/arguments.go | 93 +++ component/faro/receiver/exporters.go | 247 ++++++++ component/faro/receiver/exporters_test.go | 55 ++ component/faro/receiver/handler.go | 134 +++++ component/faro/receiver/handler_test.go | 289 ++++++++++ .../faro/receiver/internal/payload/payload.go | 418 ++++++++++++++ .../receiver/internal/payload/payload_test.go | 141 +++++ .../faro/receiver/internal/payload/utils.go | 73 +++ component/faro/receiver/receiver.go | 232 ++++++++ component/faro/receiver/receiver_test.go | 152 +++++ component/faro/receiver/server.go | 117 ++++ component/faro/receiver/sourcemaps.go | 374 +++++++++++++ component/faro/receiver/sourcemaps_test.go | 528 ++++++++++++++++++ component/faro/receiver/testdata/foo.js | 39 ++ component/faro/receiver/testdata/foo.js.map | 1 + component/faro/receiver/testdata/payload.json | 330 +++++++++++ .../faro/receiver/testdata/payload_2.json | 393 +++++++++++++ .../reference/components/faro.receiver.md | 268 +++++++++ 20 files changed, 3896 insertions(+), 8 deletions(-) create mode 100644 component/faro/receiver/arguments.go create mode 100644 component/faro/receiver/exporters.go create mode 100644 component/faro/receiver/exporters_test.go create mode 100644 component/faro/receiver/handler.go create mode 100644 component/faro/receiver/handler_test.go create mode 100644 component/faro/receiver/internal/payload/payload.go create mode 100644 component/faro/receiver/internal/payload/payload_test.go create mode 100644 component/faro/receiver/internal/payload/utils.go create mode 100644 component/faro/receiver/receiver.go create mode 100644 component/faro/receiver/receiver_test.go create mode 100644 component/faro/receiver/server.go 
create mode 100644 component/faro/receiver/sourcemaps.go create mode 100644 component/faro/receiver/sourcemaps_test.go create mode 100644 component/faro/receiver/testdata/foo.js create mode 100644 component/faro/receiver/testdata/foo.js.map create mode 100644 component/faro/receiver/testdata/payload.json create mode 100644 component/faro/receiver/testdata/payload_2.json create mode 100644 docs/sources/flow/reference/components/faro.receiver.md diff --git a/CHANGELOG.md b/CHANGELOG.md index 74400a75459d..787dd4946f7c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,6 +40,9 @@ Main (unreleased) - `discovery.serverset` discovers Serversets stored in Zookeeper. (@thampiotr) - `discovery.scaleway` discovers scrape targets from Scaleway virtual instances and bare-metal machines. (@rfratto) + - `faro.receiver` accepts Grafana Faro-formatted telemetry data over the + network and forwards it to other components. (@megumish, @rfratto) + - `prometheus.exporter.azure` collects metrics from Azure. (@wildum) - `discovery.dockerswarm` discovers scrape targets from Docker Swarm. (@wildum) - `otelcol.connector.servicegraph` creates service graph metrics from spans. It is the flow mode equivalent to static mode's `service_graphs` processor. (@ptodev) @@ -48,8 +51,8 @@ Main (unreleased) - `otelcol.processor.k8sattributes` adds Kubernetes metadata as resource attributes to spans, logs, and metrics. (@acr92) - `otelcol.processor.probabilistic_sampler` samples logs and traces based on configuration options. (@mar4uk) - - `otelcol.processor.transform` transforms OTLP telemetry data using the - OpenTelemetry Transformation Language (OTTL). It is most commonly used + - `otelcol.processor.transform` transforms OTLP telemetry data using the + OpenTelemetry Transformation Language (OTTL). It is most commonly used for transformations on attributes. 
- `remote.kubernetes.configmap` loads a configmap's data for use in other components (@captncraig) - `remote.kubernetes.secret` loads a secret's data for use in other components (@captncraig) @@ -105,7 +108,7 @@ Main (unreleased) - Flow: add `randomization_factor` and `multiplier` to retry settings in `otelcol` components. (@rfratto) - + - Add support for `windows_certificate_filter` under http tls config block. (@mattdurham) - Add `openstack` config converter to convert OpenStack yaml config (static mode) to river config (flow mode). (@wildum) @@ -133,15 +136,15 @@ Main (unreleased) - Promtail converter will now treat `global positions configuration is not supported` as a Warning instead of Error. (@erikbaranowski) -- Add new `agent_component_dependencies_wait_seconds` histogram metric and a dashboard panel +- Add new `agent_component_dependencies_wait_seconds` histogram metric and a dashboard panel that measures how long components wait to be evaluated after their dependency is updated (@thampiotr) - Add additional endpoint to debug scrape configs generated inside `prometheus.operator.*` components (@captncraig) -- Components evaluation is now performed in parallel, reducing the impact of - slow components potentially blocking the entire telemetry pipeline. - The `agent_component_evaluation_seconds` metric now measures evaluation time - of each node separately, instead of all the directly and indirectly +- Components evaluation is now performed in parallel, reducing the impact of + slow components potentially blocking the entire telemetry pipeline. + The `agent_component_evaluation_seconds` metric now measures evaluation time + of each node separately, instead of all the directly and indirectly dependant nodes. 
(@thampiotr) ### Bugfixes diff --git a/component/all/all.go b/component/all/all.go index 5c222893282f..4d8d0fe8275d 100644 --- a/component/all/all.go +++ b/component/all/all.go @@ -30,6 +30,7 @@ import ( _ "github.com/grafana/agent/component/discovery/serverset" // Import discovery.serverset _ "github.com/grafana/agent/component/discovery/triton" // Import discovery.triton _ "github.com/grafana/agent/component/discovery/uyuni" // Import discovery.uyuni + _ "github.com/grafana/agent/component/faro/receiver" // Import faro.receiver _ "github.com/grafana/agent/component/local/file" // Import local.file _ "github.com/grafana/agent/component/local/file_match" // Import local.file_match _ "github.com/grafana/agent/component/loki/echo" // Import loki.echo diff --git a/component/faro/receiver/arguments.go b/component/faro/receiver/arguments.go new file mode 100644 index 000000000000..65fc6f29fb99 --- /dev/null +++ b/component/faro/receiver/arguments.go @@ -0,0 +1,93 @@ +package receiver + +import ( + "time" + + "github.com/alecthomas/units" + "github.com/grafana/agent/component/common/loki" + "github.com/grafana/agent/component/otelcol" + "github.com/grafana/river" + "github.com/grafana/river/rivertypes" +) + +// Defaults for various arguments. +var ( + DefaultArguments = Arguments{ + Server: DefaultServerArguments, + SourceMaps: DefaultSourceMapsArguments, + } + + DefaultServerArguments = ServerArguments{ + Host: "127.0.0.1", + Port: 12347, + RateLimiting: DefaultRateLimitingArguments, + MaxAllowedPayloadSize: 5 * units.MiB, + } + + DefaultRateLimitingArguments = RateLimitingArguments{ + Enabled: true, + Rate: 50, + BurstSize: 100, + } + + DefaultSourceMapsArguments = SourceMapsArguments{ + Download: true, + DownloadFromOrigins: []string{"*"}, + DownloadTimeout: time.Second, + } +) + +// Arguments configures the app_agent_receiver component. 
+type Arguments struct { + LogLabels map[string]string `river:"extra_log_labels,attr,optional"` + + Server ServerArguments `river:"server,block,optional"` + SourceMaps SourceMapsArguments `river:"sourcemaps,block,optional"` + Output OutputArguments `river:"output,block"` +} + +var _ river.Defaulter = (*Arguments)(nil) + +// SetToDefault applies default settings. +func (args *Arguments) SetToDefault() { *args = DefaultArguments } + +// ServerArguments configures the HTTP server where telemetry information will +// be sent from Faro clients. +type ServerArguments struct { + Host string `river:"listen_address,attr,optional"` + Port int `river:"listen_port,attr,optional"` + CORSAllowedOrigins []string `river:"cors_allowed_origins,attr,optional"` + APIKey rivertypes.Secret `river:"api_key,attr,optional"` + MaxAllowedPayloadSize units.Base2Bytes `river:"max_allowed_payload_size,attr,optional"` + + RateLimiting RateLimitingArguments `river:"rate_limiting,block,optional"` +} + +// RateLimitingArguments configures rate limiting for the HTTP server. +type RateLimitingArguments struct { + Enabled bool `river:"enabled,attr,optional"` + Rate float64 `river:"rate,attr,optional"` + BurstSize float64 `river:"burst_size,attr,optional"` +} + +// SourceMapsArguments configures how app_agent_receiver will retrieve source +// maps for transforming stack traces. +type SourceMapsArguments struct { + Download bool `river:"download,attr,optional"` + DownloadFromOrigins []string `river:"download_from_origins,attr,optional"` + DownloadTimeout time.Duration `river:"download_timeout,attr,optional"` + Locations []LocationArguments `river:"location,block,optional"` +} + +// LocationArguments specifies an individual location where source maps will be loaded. +type LocationArguments struct { + Path string `river:"path,attr"` + MinifiedPathPrefix string `river:"minified_path_prefix,attr"` +} + +// OutputArguments configures where to send emitted logs and traces. 
Metrics +// emitted by app_agent_receiver are exported as targets to be scraped. +type OutputArguments struct { + Logs []loki.LogsReceiver `river:"logs,attr,optional"` + Traces []otelcol.Consumer `river:"traces,attr,optional"` +} diff --git a/component/faro/receiver/exporters.go b/component/faro/receiver/exporters.go new file mode 100644 index 000000000000..731779372dde --- /dev/null +++ b/component/faro/receiver/exporters.go @@ -0,0 +1,247 @@ +package receiver + +import ( + "context" + "errors" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/go-logfmt/logfmt" + "github.com/grafana/agent/component/common/loki" + "github.com/grafana/agent/component/faro/receiver/internal/payload" + "github.com/grafana/agent/component/otelcol" + "github.com/grafana/loki/pkg/logproto" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" +) + +type exporter interface { + Name() string + Export(ctx context.Context, payload payload.Payload) error +} + +// +// Metrics +// + +type metricsExporter struct { + totalLogs prometheus.Counter + totalMeasurements prometheus.Counter + totalExceptions prometheus.Counter + totalEvents prometheus.Counter +} + +var _ exporter = (*metricsExporter)(nil) + +func newMetricsExporter(reg prometheus.Registerer) *metricsExporter { + exp := &metricsExporter{ + totalLogs: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "faro_receiver_logs_total", + Help: "Total number of ingested logs", + }), + totalMeasurements: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "faro_receiver_measurements_total", + Help: "Total number of ingested measurements", + }), + totalExceptions: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "faro_receiver_exceptions_total", + Help: "Total number of ingested exceptions", + }), + totalEvents: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "faro_receiver_events_total", + Help: "Total number of ingested events", + }), + } + + 
reg.MustRegister(exp.totalLogs, exp.totalExceptions, exp.totalMeasurements, exp.totalEvents) + + return exp +} + +func (exp *metricsExporter) Name() string { return "receiver metrics exporter" } + +func (exp *metricsExporter) Export(ctx context.Context, p payload.Payload) error { + exp.totalExceptions.Add(float64(len(p.Exceptions))) + exp.totalLogs.Add(float64(len(p.Logs))) + exp.totalMeasurements.Add(float64(len(p.Measurements))) + exp.totalEvents.Add(float64(len(p.Events))) + return nil +} + +// +// Logs +// + +type logsExporter struct { + log log.Logger + sourceMaps sourceMapsStore + + receiversMut sync.RWMutex + receivers []loki.LogsReceiver + + labelsMut sync.RWMutex + labels model.LabelSet +} + +var _ exporter = (*logsExporter)(nil) + +func newLogsExporter(log log.Logger, sourceMaps sourceMapsStore) *logsExporter { + return &logsExporter{ + log: log, + sourceMaps: sourceMaps, + } +} + +// SetReceivers updates the set of logs receivers which will receive logs +// emitted by the exporter. 
+func (exp *logsExporter) SetReceivers(receivers []loki.LogsReceiver) { + exp.receiversMut.Lock() + defer exp.receiversMut.Unlock() + + exp.receivers = receivers +} + +func (exp *logsExporter) Name() string { return "logs exporter" } + +func (exp *logsExporter) Export(ctx context.Context, p payload.Payload) error { + meta := p.Meta.KeyVal() + + var errs []error + + // log events + for _, logItem := range p.Logs { + kv := logItem.KeyVal() + payload.MergeKeyVal(kv, meta) + errs = append(errs, exp.sendKeyValsToLogsPipeline(ctx, kv)) + } + + // exceptions + for _, exception := range p.Exceptions { + transformedException := transformException(exp.log, exp.sourceMaps, &exception, p.Meta.App.Release) + kv := transformedException.KeyVal() + payload.MergeKeyVal(kv, meta) + errs = append(errs, exp.sendKeyValsToLogsPipeline(ctx, kv)) + } + + // measurements + for _, measurement := range p.Measurements { + kv := measurement.KeyVal() + payload.MergeKeyVal(kv, meta) + errs = append(errs, exp.sendKeyValsToLogsPipeline(ctx, kv)) + } + + // events + for _, event := range p.Events { + kv := event.KeyVal() + payload.MergeKeyVal(kv, meta) + errs = append(errs, exp.sendKeyValsToLogsPipeline(ctx, kv)) + } + + return errors.Join(errs...) +} + +func (exp *logsExporter) sendKeyValsToLogsPipeline(ctx context.Context, kv *payload.KeyVal) error { + // Grab the current value of exp.receivers so sendKeyValsToLogsPipeline + // doesn't block updating receivers. + exp.receiversMut.RLock() + var ( + receivers = exp.receivers + ) + exp.receiversMut.RUnlock() + + line, err := logfmt.MarshalKeyvals(payload.KeyValToInterfaceSlice(kv)...) 
+ if err != nil { + level.Error(exp.log).Log("msg", "failed to logfmt a frontend log event", "err", err) + return err + } + + ent := loki.Entry{ + Labels: exp.labelSet(), + Entry: logproto.Entry{ + Timestamp: time.Now(), + Line: string(line), + }, + } + + ctx, cancel := context.WithTimeout(ctx, 2*time.Second) // TODO(rfratto): potentially make this configurable + defer cancel() + + for _, receiver := range receivers { + select { + case <-ctx.Done(): + return err + case receiver.Chan() <- ent: + continue + } + } + + return nil +} + +func (exp *logsExporter) labelSet() model.LabelSet { + exp.labelsMut.RLock() + defer exp.labelsMut.RUnlock() + return exp.labels +} + +func (exp *logsExporter) SetLabels(newLabels map[string]string) { + exp.labelsMut.Lock() + defer exp.labelsMut.Unlock() + + ls := make(model.LabelSet, len(newLabels)) + for k, v := range newLabels { + ls[model.LabelName(k)] = model.LabelValue(v) + } + exp.labels = ls +} + +// +// Traces +// + +type tracesExporter struct { + log log.Logger + + mut sync.RWMutex + consumers []otelcol.Consumer +} + +var _ exporter = (*tracesExporter)(nil) + +func newTracesExporter(log log.Logger) *tracesExporter { + return &tracesExporter{ + log: log, + } +} + +// SetConsumers updates the set of OTLP consumers which will receive traces +// emitted by the exporter. +func (exp *tracesExporter) SetConsumers(consumers []otelcol.Consumer) { + exp.mut.Lock() + defer exp.mut.Unlock() + + exp.consumers = consumers +} + +func (exp *tracesExporter) Name() string { return "traces exporter" } + +func (exp *tracesExporter) Export(ctx context.Context, p payload.Payload) error { + if p.Traces == nil { + return nil + } + + var errs []error + for _, consumer := range exp.getTracesConsumers() { + errs = append(errs, consumer.ConsumeTraces(ctx, p.Traces.Traces)) + } + return errors.Join(errs...) 
+} + +func (exp *tracesExporter) getTracesConsumers() []otelcol.Consumer { + exp.mut.RLock() + defer exp.mut.RUnlock() + + return exp.consumers +} diff --git a/component/faro/receiver/exporters_test.go b/component/faro/receiver/exporters_test.go new file mode 100644 index 000000000000..84acf4aa27a8 --- /dev/null +++ b/component/faro/receiver/exporters_test.go @@ -0,0 +1,55 @@ +package receiver + +import ( + "context" + "strings" + "testing" + + "github.com/grafana/agent/component/faro/receiver/internal/payload" + "github.com/prometheus/client_golang/prometheus" + promtestutil "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/require" +) + +var metricNames = []string{ + "logs_total", + "measurements_total", + "exceptions_total", + "events_total", +} + +func Test_metricsExporter_Export(t *testing.T) { + var ( + reg = prometheus.NewRegistry() + exp = newMetricsExporter(reg) + ) + + expect := ` + # HELP faro_receiver_logs_total Total number of ingested logs + # TYPE faro_receiver_logs_total counter + faro_receiver_logs_total 2 + + # HELP faro_receiver_measurements_total Total number of ingested measurements + # TYPE faro_receiver_measurements_total counter + faro_receiver_measurements_total 3 + + # HELP faro_receiver_exceptions_total Total number of ingested exceptions + # TYPE faro_receiver_exceptions_total counter + faro_receiver_exceptions_total 4 + + # HELP faro_receiver_events_total Total number of ingested events + # TYPE faro_receiver_events_total counter + faro_receiver_events_total 5 + ` + + p := payload.Payload{ + Logs: make([]payload.Log, 2), + Measurements: make([]payload.Measurement, 3), + Exceptions: make([]payload.Exception, 4), + Events: make([]payload.Event, 5), + } + require.NoError(t, exp.Export(context.Background(), p)) + + err := promtestutil.CollectAndCompare(reg, strings.NewReader(expect), metricNames...) 
+ require.NoError(t, err) +} diff --git a/component/faro/receiver/handler.go b/component/faro/receiver/handler.go new file mode 100644 index 000000000000..40aad51ef179 --- /dev/null +++ b/component/faro/receiver/handler.go @@ -0,0 +1,134 @@ +package receiver + +import ( + "crypto/subtle" + "encoding/json" + "net/http" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/agent/component/faro/receiver/internal/payload" + "github.com/prometheus/client_golang/prometheus" + "github.com/rs/cors" + "golang.org/x/time/rate" +) + +const apiKeyHeader = "x-api-key" + +type handler struct { + log log.Logger + rateLimiter *rate.Limiter + exporters []exporter + errorsTotal *prometheus.CounterVec + + argsMut sync.RWMutex + args ServerArguments + cors *cors.Cors +} + +var _ http.Handler = (*handler)(nil) + +func newHandler(l log.Logger, reg prometheus.Registerer, exporters []exporter) *handler { + errorsTotal := prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "faro_receiver_exporter_errors_total", + Help: "Total number of errors produced by a receiver exporter", + }, []string{"exporter"}) + reg.MustRegister(errorsTotal) + + return &handler{ + log: l, + rateLimiter: rate.NewLimiter(rate.Inf, 0), + exporters: exporters, + errorsTotal: errorsTotal, + } +} + +func (h *handler) Update(args ServerArguments) { + h.argsMut.Lock() + defer h.argsMut.Unlock() + + h.args = args + + if args.RateLimiting.Enabled { + // Updating the rate limit at time.Now() would leave the buckets empty. + // To allow requests to immediately pass through, we backdate the time + // at which the limit/burst are set, so that both the normal rate and + // burst are already filled. + t := time.Now().Add(-time.Duration(float64(time.Second) * args.RateLimiting.Rate * args.RateLimiting.BurstSize)) + + h.rateLimiter.SetLimitAt(t, rate.Limit(args.RateLimiting.Rate)) + h.rateLimiter.SetBurstAt(t, int(args.RateLimiting.BurstSize)) + } else { + // Set to infinite rate limit. 
+ h.rateLimiter.SetLimit(rate.Inf) + h.rateLimiter.SetBurst(0) // 0 burst is ignored when using rate.Inf. + } + + if len(args.CORSAllowedOrigins) > 0 { + h.cors = cors.New(cors.Options{ + AllowedOrigins: args.CORSAllowedOrigins, + AllowedHeaders: []string{apiKeyHeader, "content-type"}, + }) + } else { + h.cors = nil // Disable cors. + } +} + +func (h *handler) ServeHTTP(rw http.ResponseWriter, req *http.Request) { + h.argsMut.RLock() + defer h.argsMut.RUnlock() + + if h.cors != nil { + h.cors.ServeHTTP(rw, req, h.handleRequest) + } else { + h.handleRequest(rw, req) + } +} + +func (h *handler) handleRequest(rw http.ResponseWriter, req *http.Request) { + if !h.rateLimiter.Allow() { + http.Error(rw, http.StatusText(http.StatusTooManyRequests), http.StatusTooManyRequests) + return + } + + // If an API key is configured, ensure the request has a matching key. + if len(h.args.APIKey) > 0 { + apiHeader := req.Header.Get(apiKeyHeader) + + if subtle.ConstantTimeCompare([]byte(apiHeader), []byte(h.args.APIKey)) != 1 { + http.Error(rw, "API key not provided or incorrect", http.StatusUnauthorized) + return + } + } + + // Validate content length. 
+ if h.args.MaxAllowedPayloadSize > 0 && req.ContentLength > int64(h.args.MaxAllowedPayloadSize) { + http.Error(rw, http.StatusText(http.StatusRequestEntityTooLarge), http.StatusRequestEntityTooLarge) + return + } + + var p payload.Payload + if err := json.NewDecoder(req.Body).Decode(&p); err != nil { + http.Error(rw, err.Error(), http.StatusBadRequest) + return + } + + var wg sync.WaitGroup + for _, exp := range h.exporters { + wg.Add(1) + go func(exp exporter) { + defer wg.Done() + + if err := exp.Export(req.Context(), p); err != nil { + level.Error(h.log).Log("msg", "exporter failed with error", "exporter", exp.Name(), "err", err) + h.errorsTotal.WithLabelValues(exp.Name()).Inc() + } + }(exp) + } + wg.Wait() + + rw.WriteHeader(http.StatusAccepted) + _, _ = rw.Write([]byte("ok")) +} diff --git a/component/faro/receiver/handler_test.go b/component/faro/receiver/handler_test.go new file mode 100644 index 000000000000..28bc53159795 --- /dev/null +++ b/component/faro/receiver/handler_test.go @@ -0,0 +1,289 @@ +package receiver + +import ( + "context" + "errors" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/alecthomas/units" + "github.com/grafana/agent/component/faro/receiver/internal/payload" + "github.com/grafana/agent/pkg/util" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const emptyPayload = `{ + "traces": { + "resourceSpans": [] + }, + "logs": [], + "exceptions": [], + "measurements": [], + "meta": {} +}` + +func TestMultipleExportersAllSucceed(t *testing.T) { + var ( + exporter1 = &testExporter{"exporter1", false, nil} + exporter2 = &testExporter{"exporter2", false, nil} + + h = newHandler( + util.TestLogger(t), + prometheus.NewRegistry(), + []exporter{exporter1, exporter2}, + ) + ) + + req, err := http.NewRequest(http.MethodPost, "/collect", strings.NewReader(emptyPayload)) + require.NoError(t, err) + + rr := httptest.NewRecorder() + 
h.ServeHTTP(rr, req) + + require.Equal(t, http.StatusAccepted, rr.Result().StatusCode) + require.Len(t, exporter1.payloads, 1) + require.Len(t, exporter2.payloads, 1) +} + +func TestMultipleExportersOneFails(t *testing.T) { + var ( + exporter1 = &testExporter{"exporter1", true, nil} + exporter2 = &testExporter{"exporter2", false, nil} + + h = newHandler( + util.TestLogger(t), + prometheus.NewRegistry(), + []exporter{exporter1, exporter2}, + ) + ) + + req, err := http.NewRequest(http.MethodPost, "/collect", strings.NewReader(emptyPayload)) + require.NoError(t, err) + + rr := httptest.NewRecorder() + h.ServeHTTP(rr, req) + + require.Equal(t, http.StatusAccepted, rr.Result().StatusCode) + require.Len(t, exporter1.payloads, 0) + require.Len(t, exporter2.payloads, 1) +} + +func TestMultipleExportersAllFail(t *testing.T) { + var ( + exporter1 = &testExporter{"exporter1", true, nil} + exporter2 = &testExporter{"exporter2", true, nil} + + h = newHandler( + util.TestLogger(t), + prometheus.NewRegistry(), + []exporter{exporter1, exporter2}, + ) + ) + + req, err := http.NewRequest(http.MethodPost, "/collect", strings.NewReader(emptyPayload)) + require.NoError(t, err) + + rr := httptest.NewRecorder() + h.ServeHTTP(rr, req) + + require.Equal(t, http.StatusAccepted, rr.Result().StatusCode) + require.Len(t, exporter1.payloads, 0) + require.Len(t, exporter2.payloads, 0) +} + +func TestPayloadWithinLimit(t *testing.T) { + var ( + exporter1 = &testExporter{"exporter1", false, nil} + exporter2 = &testExporter{"exporter2", false, nil} + + h = newHandler( + util.TestLogger(t), + prometheus.NewRegistry(), + []exporter{exporter1, exporter2}, + ) + ) + + h.Update(ServerArguments{ + MaxAllowedPayloadSize: units.Base2Bytes(len(emptyPayload)), + }) + + req, err := http.NewRequest(http.MethodPost, "/collect", strings.NewReader(emptyPayload)) + require.NoError(t, err) + + rr := httptest.NewRecorder() + h.ServeHTTP(rr, req) + require.Equal(t, http.StatusAccepted, rr.Result().StatusCode) + 
require.Len(t, exporter1.payloads, 1) + require.Len(t, exporter2.payloads, 1) +} + +func TestPayloadTooLarge(t *testing.T) { + var ( + exporter1 = &testExporter{"exporter1", false, nil} + exporter2 = &testExporter{"exporter2", false, nil} + + h = newHandler( + util.TestLogger(t), + prometheus.NewRegistry(), + []exporter{exporter1, exporter2}, + ) + ) + + h.Update(ServerArguments{ + MaxAllowedPayloadSize: units.Base2Bytes(len(emptyPayload) - 1), + }) + + req, err := http.NewRequest(http.MethodPost, "/collect", strings.NewReader(emptyPayload)) + require.NoError(t, err) + + rr := httptest.NewRecorder() + h.ServeHTTP(rr, req) + require.Equal(t, http.StatusRequestEntityTooLarge, rr.Result().StatusCode) + require.Len(t, exporter1.payloads, 0) + require.Len(t, exporter2.payloads, 0) +} + +func TestMissingAPIKey(t *testing.T) { + var ( + exporter1 = &testExporter{"exporter1", false, nil} + exporter2 = &testExporter{"exporter2", false, nil} + + h = newHandler( + util.TestLogger(t), + prometheus.NewRegistry(), + []exporter{exporter1, exporter2}, + ) + ) + + h.Update(ServerArguments{ + APIKey: "fakekey", + }) + + req, err := http.NewRequest(http.MethodPost, "/collect", strings.NewReader(emptyPayload)) + require.NoError(t, err) + + rr := httptest.NewRecorder() + h.ServeHTTP(rr, req) + require.Equal(t, http.StatusUnauthorized, rr.Result().StatusCode) + require.Len(t, exporter1.payloads, 0) + require.Len(t, exporter2.payloads, 0) +} + +func TestInvalidAPIKey(t *testing.T) { + var ( + exporter1 = &testExporter{"exporter1", false, nil} + exporter2 = &testExporter{"exporter2", false, nil} + + h = newHandler( + util.TestLogger(t), + prometheus.NewRegistry(), + []exporter{exporter1, exporter2}, + ) + ) + + h.Update(ServerArguments{ + APIKey: "fakekey", + }) + + req, err := http.NewRequest(http.MethodPost, "/collect", strings.NewReader(emptyPayload)) + require.NoError(t, err) + req.Header.Set("x-api-key", "badkey") + + rr := httptest.NewRecorder() + h.ServeHTTP(rr, req) + 
require.Equal(t, http.StatusUnauthorized, rr.Result().StatusCode) + require.Len(t, exporter1.payloads, 0) + require.Len(t, exporter2.payloads, 0) +} + +func TestValidAPIKey(t *testing.T) { + var ( + exporter1 = &testExporter{"exporter1", false, nil} + exporter2 = &testExporter{"exporter2", false, nil} + + h = newHandler( + util.TestLogger(t), + prometheus.NewRegistry(), + []exporter{exporter1, exporter2}, + ) + ) + + h.Update(ServerArguments{ + APIKey: "fakekey", + }) + + req, err := http.NewRequest(http.MethodPost, "/collect", strings.NewReader(emptyPayload)) + require.NoError(t, err) + req.Header.Set("x-api-key", "fakekey") + + rr := httptest.NewRecorder() + h.ServeHTTP(rr, req) + require.Equal(t, http.StatusAccepted, rr.Result().StatusCode) + require.Len(t, exporter1.payloads, 1) + require.Len(t, exporter2.payloads, 1) +} + +func TestRateLimiter(t *testing.T) { + var ( + exporter1 = &testExporter{"exporter1", false, nil} + exporter2 = &testExporter{"exporter2", false, nil} + + h = newHandler( + util.TestLogger(t), + prometheus.NewRegistry(), + []exporter{exporter1, exporter2}, + ) + ) + + h.Update(ServerArguments{ + RateLimiting: RateLimitingArguments{ + Enabled: true, + Rate: 1, + BurstSize: 2, + }, + }) + + doRequest := func() *httptest.ResponseRecorder { + req, err := http.NewRequest(http.MethodPost, "/collect", strings.NewReader(emptyPayload)) + require.NoError(t, err) + + rr := httptest.NewRecorder() + h.ServeHTTP(rr, req) + return rr + } + + reqs := make([]*httptest.ResponseRecorder, 5) + for i := range reqs { + reqs[i] = doRequest() + } + + // Only 1 request is allowed per second, with a burst of 2; meaning the third + // request and beyond should be rejected. 
+ assert.Equal(t, http.StatusAccepted, reqs[0].Result().StatusCode) + assert.Equal(t, http.StatusAccepted, reqs[1].Result().StatusCode) + assert.Equal(t, http.StatusTooManyRequests, reqs[2].Result().StatusCode) + assert.Equal(t, http.StatusTooManyRequests, reqs[3].Result().StatusCode) + assert.Equal(t, http.StatusTooManyRequests, reqs[4].Result().StatusCode) +} + +type testExporter struct { + name string + broken bool + payloads []payload.Payload +} + +func (te *testExporter) Name() string { + return te.name +} + +func (te *testExporter) Export(ctx context.Context, payload payload.Payload) error { + if te.broken { + return errors.New("this exporter is broken") + } + te.payloads = append(te.payloads, payload) + return nil +} diff --git a/component/faro/receiver/internal/payload/payload.go b/component/faro/receiver/internal/payload/payload.go new file mode 100644 index 000000000000..9bc05f8c5bba --- /dev/null +++ b/component/faro/receiver/internal/payload/payload.go @@ -0,0 +1,418 @@ +package payload + +import ( + "fmt" + "sort" + "strconv" + "strings" + "time" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/zeebo/xxh3" +) + +// Payload is the body of the receiver request +type Payload struct { + Exceptions []Exception `json:"exceptions,omitempty"` + Logs []Log `json:"logs,omitempty"` + Measurements []Measurement `json:"measurements,omitempty"` + Events []Event `json:"events,omitempty"` + Meta Meta `json:"meta,omitempty"` + Traces *Traces `json:"traces,omitempty"` +} + +// Frame struct represents a single stacktrace frame +type Frame struct { + Function string `json:"function,omitempty"` + Module string `json:"module,omitempty"` + Filename string `json:"filename,omitempty"` + Lineno int `json:"lineno,omitempty"` + Colno int `json:"colno,omitempty"` +} + +// String function converts a Frame into a human readable string +func (frame Frame) String() string { + module := "" + if len(frame.Module) > 0 { + 
module = frame.Module + "|" + } + return fmt.Sprintf("\n at %s (%s%s:%v:%v)", frame.Function, module, frame.Filename, frame.Lineno, frame.Colno) +} + +// Stacktrace is a collection of Frames +type Stacktrace struct { + Frames []Frame `json:"frames,omitempty"` +} + +// Exception struct controls all the data regarding an exception +type Exception struct { + Type string `json:"type,omitempty"` + Value string `json:"value,omitempty"` + Stacktrace *Stacktrace `json:"stacktrace,omitempty"` + Timestamp time.Time `json:"timestamp"` + Trace TraceContext `json:"trace,omitempty"` + Context ExceptionContext `json:"context,omitempty"` +} + +// Message is the concatenation of the Exception.Type and Exception.Value +func (e Exception) Message() string { + return fmt.Sprintf("%s: %s", e.Type, e.Value) +} + +// String is the string representation of an Exception +func (e Exception) String() string { + var stacktrace = e.Message() + if e.Stacktrace != nil { + for _, frame := range e.Stacktrace.Frames { + stacktrace += frame.String() + } + } + return stacktrace +} + +// KeyVal representation of the exception object +func (e Exception) KeyVal() *KeyVal { + kv := NewKeyVal() + KeyValAdd(kv, "timestamp", e.Timestamp.String()) + KeyValAdd(kv, "kind", "exception") + KeyValAdd(kv, "type", e.Type) + KeyValAdd(kv, "value", e.Value) + KeyValAdd(kv, "stacktrace", e.String()) + KeyValAdd(kv, "hash", strconv.FormatUint(xxh3.HashString(e.Value), 10)) + MergeKeyValWithPrefix(kv, KeyValFromMap(e.Context), "context_") + MergeKeyVal(kv, e.Trace.KeyVal()) + return kv +} + +// ExceptionContext is a string to string map structure that +// represents the context of an exception +type ExceptionContext map[string]string + +// TraceContext holds trace id and span id associated to an entity (log, exception, measurement...). +type TraceContext struct { + TraceID string `json:"trace_id"` + SpanID string `json:"span_id"` +} + +// KeyVal representation of the trace context object. 
+func (tc TraceContext) KeyVal() *KeyVal { + retv := NewKeyVal() + KeyValAdd(retv, "traceID", tc.TraceID) + KeyValAdd(retv, "spanID", tc.SpanID) + return retv +} + +// Traces wraps the otel traces model. +type Traces struct { + ptrace.Traces +} + +// UnmarshalJSON unmarshals Traces model. +func (t *Traces) UnmarshalJSON(b []byte) error { + unmarshaler := &ptrace.JSONUnmarshaler{} + td, err := unmarshaler.UnmarshalTraces(b) + if err != nil { + return err + } + *t = Traces{td} + return nil +} + +// MarshalJSON marshals Traces model to json. +func (t Traces) MarshalJSON() ([]byte, error) { + marshaler := &ptrace.JSONMarshaler{} + return marshaler.MarshalTraces(t.Traces) +} + +// SpanSlice unpacks Traces entity into a slice of Spans. +func (t Traces) SpanSlice() []ptrace.Span { + spans := make([]ptrace.Span, 0) + rss := t.ResourceSpans() + for i := 0; i < rss.Len(); i++ { + rs := rss.At(i) + ilss := rs.ScopeSpans() + for j := 0; j < ilss.Len(); j++ { + s := ilss.At(j).Spans() + for si := 0; si < s.Len(); si++ { + spans = append(spans, s.At(si)) + } + } + } + return spans +} + +// SpanToKeyVal returns KeyVal representation of a Span. 
+func SpanToKeyVal(s ptrace.Span) *KeyVal { + kv := NewKeyVal() + if s.StartTimestamp() > 0 { + KeyValAdd(kv, "timestamp", s.StartTimestamp().AsTime().String()) + } + if s.EndTimestamp() > 0 { + KeyValAdd(kv, "end_timestamp", s.EndTimestamp().AsTime().String()) + } + KeyValAdd(kv, "kind", "span") + KeyValAdd(kv, "traceID", s.TraceID().String()) + KeyValAdd(kv, "spanID", s.SpanID().String()) + KeyValAdd(kv, "span_kind", s.Kind().String()) + KeyValAdd(kv, "name", s.Name()) + KeyValAdd(kv, "parent_spanID", s.ParentSpanID().String()) + s.Attributes().Range(func(k string, v pcommon.Value) bool { + KeyValAdd(kv, "attr_"+k, fmt.Sprintf("%v", v)) + return true + }) + + return kv +} + +// LogLevel is log level enum for incoming app logs +type LogLevel string + +const ( + // LogLevelTrace is "trace" + LogLevelTrace LogLevel = "trace" + // LogLevelDebug is "debug" + LogLevelDebug LogLevel = "debug" + // LogLevelInfo is "info" + LogLevelInfo LogLevel = "info" + // LogLevelWarning is "warning" + LogLevelWarning LogLevel = "warning" + // LogLevelError is "error" + LogLevelError LogLevel = "error" +) + +// LogContext is a string to string map structure that +// represents the context of a log message +type LogContext map[string]string + +// Log struct controls the data that comes into a Log message +type Log struct { + Message string `json:"message,omitempty"` + LogLevel LogLevel `json:"level,omitempty"` + Context LogContext `json:"context,omitempty"` + Timestamp time.Time `json:"timestamp"` + Trace TraceContext `json:"trace,omitempty"` +} + +// KeyVal representation of a Log object +func (l Log) KeyVal() *KeyVal { + kv := NewKeyVal() + KeyValAdd(kv, "timestamp", l.Timestamp.String()) + KeyValAdd(kv, "kind", "log") + KeyValAdd(kv, "message", l.Message) + KeyValAdd(kv, "level", string(l.LogLevel)) + MergeKeyValWithPrefix(kv, KeyValFromMap(l.Context), "context_") + MergeKeyVal(kv, l.Trace.KeyVal()) + return kv +} + +// MeasurementContext is a string to string map structure that 
+// represents the context of a measurement +type MeasurementContext map[string]string + +// Measurement holds the data for user-provided measurements +type Measurement struct { + Values map[string]float64 `json:"values,omitempty"` + Timestamp time.Time `json:"timestamp,omitempty"` + Trace TraceContext `json:"trace,omitempty"` + Context MeasurementContext `json:"context,omitempty"` +} + +// KeyVal representation of the measurement object +func (m Measurement) KeyVal() *KeyVal { + kv := NewKeyVal() + + KeyValAdd(kv, "timestamp", m.Timestamp.String()) + KeyValAdd(kv, "kind", "measurement") + + keys := make([]string, 0, len(m.Values)) + for k := range m.Values { + keys = append(keys, k) + } + sort.Strings(keys) + for _, k := range keys { + KeyValAdd(kv, k, fmt.Sprintf("%f", m.Values[k])) + } + MergeKeyVal(kv, m.Trace.KeyVal()) + MergeKeyValWithPrefix(kv, KeyValFromMap(m.Context), "context_") + return kv +} + +// SDK holds metadata about the app agent that produced the event +type SDK struct { + Name string `json:"name,omitempty"` + Version string `json:"version,omitempty"` + Integrations []SDKIntegration `json:"integrations,omitempty"` +} + +// KeyVal produces key->value representation of SDK metadata +func (sdk SDK) KeyVal() *KeyVal { + kv := NewKeyVal() + KeyValAdd(kv, "name", sdk.Name) + KeyValAdd(kv, "version", sdk.Version) + + if len(sdk.Integrations) > 0 { + integrations := make([]string, len(sdk.Integrations)) + + for i, integration := range sdk.Integrations { + integrations[i] = integration.String() + } + + KeyValAdd(kv, "integrations", strings.Join(integrations, ",")) + } + + return kv +} + +// SDKIntegration holds metadata about a plugin/integration on the app agent that collected and sent the event +type SDKIntegration struct { + Name string `json:"name,omitempty"` + Version string `json:"version,omitempty"` +} + +func (i SDKIntegration) String() string { + return fmt.Sprintf("%s:%s", i.Name, i.Version) +} + +// User holds metadata about the user related to 
an app event +type User struct { + Email string `json:"email,omitempty"` + ID string `json:"id,omitempty"` + Username string `json:"username,omitempty"` + Attributes map[string]string `json:"attributes,omitempty"` +} + +// KeyVal produces a key->value representation of User metadata +func (u User) KeyVal() *KeyVal { + kv := NewKeyVal() + KeyValAdd(kv, "email", u.Email) + KeyValAdd(kv, "id", u.ID) + KeyValAdd(kv, "username", u.Username) + MergeKeyValWithPrefix(kv, KeyValFromMap(u.Attributes), "attr_") + return kv +} + +// Meta holds metadata about an app event +type Meta struct { + SDK SDK `json:"sdk,omitempty"` + App App `json:"app,omitempty"` + User User `json:"user,omitempty"` + Session Session `json:"session,omitempty"` + Page Page `json:"page,omitempty"` + Browser Browser `json:"browser,omitempty"` + View View `json:"view,omitempty"` +} + +// KeyVal produces key->value representation of the app event metadata +func (m Meta) KeyVal() *KeyVal { + kv := NewKeyVal() + MergeKeyValWithPrefix(kv, m.SDK.KeyVal(), "sdk_") + MergeKeyValWithPrefix(kv, m.App.KeyVal(), "app_") + MergeKeyValWithPrefix(kv, m.User.KeyVal(), "user_") + MergeKeyValWithPrefix(kv, m.Session.KeyVal(), "session_") + MergeKeyValWithPrefix(kv, m.Page.KeyVal(), "page_") + MergeKeyValWithPrefix(kv, m.Browser.KeyVal(), "browser_") + MergeKeyValWithPrefix(kv, m.View.KeyVal(), "view_") + return kv +} + +// Session holds metadata about the browser session the event originates from +type Session struct { + ID string `json:"id,omitempty"` + Attributes map[string]string `json:"attributes,omitempty"` +} + +// KeyVal produces key->value representation of the Session metadata +func (s Session) KeyVal() *KeyVal { + kv := NewKeyVal() + KeyValAdd(kv, "id", s.ID) + MergeKeyValWithPrefix(kv, KeyValFromMap(s.Attributes), "attr_") + return kv +} + +// Page holds metadata about the web page the event originates from +type Page struct { + ID string `json:"id,omitempty"` + URL string `json:"url,omitempty"` + Attributes 
map[string]string `json:"attributes,omitempty"` +} + +// KeyVal produces key->value representation of Page metadata +func (p Page) KeyVal() *KeyVal { + kv := NewKeyVal() + KeyValAdd(kv, "id", p.ID) + KeyValAdd(kv, "url", p.URL) + MergeKeyValWithPrefix(kv, KeyValFromMap(p.Attributes), "attr_") + return kv +} + +// App holds metadata about the application the event originates from +type App struct { + Name string `json:"name,omitempty"` + Release string `json:"release,omitempty"` + Version string `json:"version,omitempty"` + Environment string `json:"environment,omitempty"` +} + +// Event holds RUM event data +type Event struct { + Name string `json:"name"` + Domain string `json:"domain,omitempty"` + Attributes map[string]string `json:"attributes,omitempty"` + Timestamp time.Time `json:"timestamp,omitempty"` + Trace TraceContext `json:"trace,omitempty"` +} + +// KeyVal produces key->value representation of Event metadata +func (e Event) KeyVal() *KeyVal { + kv := NewKeyVal() + KeyValAdd(kv, "timestamp", e.Timestamp.String()) + KeyValAdd(kv, "kind", "event") + KeyValAdd(kv, "event_name", e.Name) + KeyValAdd(kv, "event_domain", e.Domain) + if e.Attributes != nil { + MergeKeyValWithPrefix(kv, KeyValFromMap(e.Attributes), "event_data_") + } + MergeKeyVal(kv, e.Trace.KeyVal()) + return kv +} + +// KeyVal produces key->value representation of App metadata +func (a App) KeyVal() *KeyVal { + kv := NewKeyVal() + KeyValAdd(kv, "name", a.Name) + KeyValAdd(kv, "release", a.Release) + KeyValAdd(kv, "version", a.Version) + KeyValAdd(kv, "environment", a.Environment) + return kv +} + +// Browser holds metadata about a client's browser +type Browser struct { + Name string `json:"name,omitempty"` + Version string `json:"version,omitempty"` + OS string `json:"os,omitempty"` + Mobile bool `json:"mobile,omitempty"` +} + +// KeyVal produces key->value representation of the Browser metadata +func (b Browser) KeyVal() *KeyVal { + kv := NewKeyVal() + KeyValAdd(kv, "name", b.Name) + 
KeyValAdd(kv, "version", b.Version) + KeyValAdd(kv, "os", b.OS) + KeyValAdd(kv, "mobile", fmt.Sprintf("%v", b.Mobile)) + return kv +} + +// View holds metadata about a view +type View struct { + Name string `json:"name,omitempty"` +} + +func (v View) KeyVal() *KeyVal { + kv := NewKeyVal() + KeyValAdd(kv, "name", v.Name) + return kv +} diff --git a/component/faro/receiver/internal/payload/payload_test.go b/component/faro/receiver/internal/payload/payload_test.go new file mode 100644 index 000000000000..b219e3d8ba8f --- /dev/null +++ b/component/faro/receiver/internal/payload/payload_test.go @@ -0,0 +1,141 @@ +package payload + +import ( + "encoding/json" + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func loadTestData(t *testing.T, file string) []byte { + t.Helper() + // Safe to disable, this is a test. + // nolint:gosec + content, err := os.ReadFile(filepath.Join("../../testdata", file)) + require.NoError(t, err, "expected to be able to read file") + require.True(t, len(content) > 0) + return content +} + +func TestUnmarshalPayloadJSON(t *testing.T) { + content := loadTestData(t, "payload.json") + var payload Payload + err := json.Unmarshal(content, &payload) + require.NoError(t, err) + + now, err := time.Parse("2006-01-02T15:04:05Z0700", "2021-09-30T10:46:17.680Z") + require.NoError(t, err) + + require.Equal(t, Meta{ + SDK: SDK{ + Name: "grafana-frontend-agent", + Version: "1.0.0", + }, + App: App{ + Name: "testapp", + Release: "0.8.2", + Version: "abcdefg", + Environment: "production", + }, + User: User{ + Username: "domasx2", + ID: "123", + Email: "geralt@kaermorhen.org", + Attributes: map[string]string{"foo": "bar"}, + }, + Session: Session{ + ID: "abcd", + Attributes: map[string]string{"time_elapsed": "100s"}, + }, + Page: Page{ + URL: "https://example.com/page", + }, + Browser: Browser{ + Name: "chrome", + Version: "88.12.1", + OS: "linux", + Mobile: false, + }, + View: View{ + Name: "foobar", + }, + }, 
payload.Meta) + + require.Len(t, payload.Exceptions, 1) + require.Len(t, payload.Exceptions[0].Stacktrace.Frames, 26) + require.Equal(t, "Error", payload.Exceptions[0].Type) + require.Equal(t, "Cannot read property 'find' of undefined", payload.Exceptions[0].Value) + require.EqualValues(t, ExceptionContext{"ReactError": "Annoying Error", "component": "ReactErrorBoundary"}, payload.Exceptions[0].Context) + + require.Equal(t, []Log{ + { + Message: "opened pricing page", + LogLevel: LogLevelInfo, + Context: map[string]string{ + "component": "AppRoot", + "page": "Pricing", + }, + Timestamp: now, + Trace: TraceContext{ + TraceID: "abcd", + SpanID: "def", + }, + }, + { + Message: "loading price list", + LogLevel: LogLevelTrace, + Context: map[string]string{ + "component": "AppRoot", + "page": "Pricing", + }, + Timestamp: now, + Trace: TraceContext{ + TraceID: "abcd", + SpanID: "ghj", + }, + }, + }, payload.Logs) + + require.Equal(t, []Event{ + { + Name: "click_login_button", + Domain: "frontend", + Timestamp: now, + Attributes: map[string]string{ + "foo": "bar", + "one": "two", + }, + Trace: TraceContext{ + TraceID: "abcd", + SpanID: "def", + }, + }, + { + Name: "click_reset_password_button", + Timestamp: now, + }, + }, payload.Events) + + require.Len(t, payload.Measurements, 1) + + require.Equal(t, []Measurement{ + { + Values: map[string]float64{ + "ttfp": 20.12, + "ttfcp": 22.12, + "ttfb": 14, + }, + Timestamp: now, + Trace: TraceContext{ + TraceID: "abcd", + SpanID: "def", + }, + Context: MeasurementContext{ + "hello": "world", + }, + }, + }, payload.Measurements) +} diff --git a/component/faro/receiver/internal/payload/utils.go b/component/faro/receiver/internal/payload/utils.go new file mode 100644 index 000000000000..dc6e58c90760 --- /dev/null +++ b/component/faro/receiver/internal/payload/utils.go @@ -0,0 +1,73 @@ +package payload + +import ( + "fmt" + "sort" + + om "github.com/wk8/go-ordered-map" +) + +// KeyVal is an ordered map of string to interface +type 
KeyVal = om.OrderedMap + +// NewKeyVal creates new empty KeyVal +func NewKeyVal() *KeyVal { + return om.New() +} + +// KeyValFromMap will instantiate KeyVal from a map[string]string +func KeyValFromMap(m map[string]string) *KeyVal { + kv := NewKeyVal() + keys := make([]string, 0, len(m)) + for k := range m { + keys = append(keys, k) + } + sort.Strings(keys) + for _, k := range keys { + KeyValAdd(kv, k, m[k]) + } + return kv +} + +// MergeKeyVal will merge source in target +func MergeKeyVal(target *KeyVal, source *KeyVal) { + for el := source.Oldest(); el != nil; el = el.Next() { + target.Set(el.Key, el.Value) + } +} + +// MergeKeyValWithPrefix will merge source in target, adding a prefix to each key being merged in +func MergeKeyValWithPrefix(target *KeyVal, source *KeyVal, prefix string) { + for el := source.Oldest(); el != nil; el = el.Next() { + target.Set(fmt.Sprintf("%s%s", prefix, el.Key), el.Value) + } +} + +// KeyValAdd adds a key + value string pair to kv +func KeyValAdd(kv *KeyVal, key string, value string) { + if len(value) > 0 { + kv.Set(key, value) + } +} + +// KeyValToInterfaceSlice converts KeyVal to []interface{}, typically used for logging +func KeyValToInterfaceSlice(kv *KeyVal) []interface{} { + slice := make([]interface{}, kv.Len()*2) + idx := 0 + for el := kv.Oldest(); el != nil; el = el.Next() { + slice[idx] = el.Key + idx++ + slice[idx] = el.Value + idx++ + } + return slice +} + +// KeyValToInterfaceMap converts KeyVal to map[string]interface +func KeyValToInterfaceMap(kv *KeyVal) map[string]interface{} { + retv := make(map[string]interface{}) + for el := kv.Oldest(); el != nil; el = el.Next() { + retv[fmt.Sprint(el.Key)] = el.Value + } + return retv +} diff --git a/component/faro/receiver/receiver.go b/component/faro/receiver/receiver.go new file mode 100644 index 000000000000..838d8827c5a8 --- /dev/null +++ b/component/faro/receiver/receiver.go @@ -0,0 +1,232 @@ +package receiver + +import ( + "context" + "fmt" + "sync" + "time" + + 
"github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/go-sourcemap/sourcemap" + "github.com/grafana/agent/component" +) + +func init() { + component.Register(component.Registration{ + Name: "faro.receiver", + Args: Arguments{}, + + Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + return New(opts, args.(Arguments)) + }, + }) +} + +type Component struct { + log log.Logger + handler *handler + lazySourceMaps *varSourceMapsStore + sourceMapsMetrics *sourceMapMetrics + serverMetrics *serverMetrics + + argsMut sync.RWMutex + args Arguments + + metrics *metricsExporter + logs *logsExporter + traces *tracesExporter + + actorCh chan func(context.Context) + + healthMut sync.RWMutex + health component.Health +} + +var _ component.HealthComponent = (*Component)(nil) + +func New(o component.Options, args Arguments) (*Component, error) { + var ( + // The source maps store changes at runtime based on settings, so we create + // a lazy store to pass to the logs exporter. 
+ varStore = &varSourceMapsStore{} + + metrics = newMetricsExporter(o.Registerer) + logs = newLogsExporter(log.With(o.Logger, "exporter", "logs"), varStore) + traces = newTracesExporter(log.With(o.Logger, "exporter", "traces")) + ) + + c := &Component{ + log: o.Logger, + handler: newHandler( + log.With(o.Logger, "subcomponent", "handler"), + o.Registerer, + []exporter{metrics, logs, traces}, + ), + lazySourceMaps: varStore, + sourceMapsMetrics: newSourceMapMetrics(o.Registerer), + serverMetrics: newServerMetrics(o.Registerer), + + metrics: metrics, + logs: logs, + traces: traces, + + actorCh: make(chan func(context.Context), 1), + } + + if err := c.Update(args); err != nil { + return nil, err + } + return c, nil +} + +func (c *Component) Run(ctx context.Context) error { + var wg sync.WaitGroup + defer wg.Wait() + + var ( + cancelCurrentActor context.CancelFunc + ) + defer func() { + if cancelCurrentActor != nil { + cancelCurrentActor() + } + }() + + for { + select { + case <-ctx.Done(): + return nil + + case newActor := <-c.actorCh: + // Terminate old actor (if any), and wait for it to return. + if cancelCurrentActor != nil { + cancelCurrentActor() + wg.Wait() + } + + // Run the new actor. + actorCtx, actorCancel := context.WithCancel(ctx) + cancelCurrentActor = actorCancel + + wg.Add(1) + go func() { + defer wg.Done() + newActor(actorCtx) + }() + } + } +} + +func (c *Component) Update(args component.Arguments) error { + newArgs := args.(Arguments) + + c.argsMut.Lock() + c.args = newArgs + c.argsMut.Unlock() + + c.logs.SetLabels(newArgs.LogLabels) + + c.handler.Update(newArgs.Server) + + c.lazySourceMaps.SetInner(newSourceMapsStore( + log.With(c.log, "subcomponent", "handler"), + newArgs.SourceMaps, + c.sourceMapsMetrics, + nil, // Use default HTTP client. + nil, // Use default FS implementation. + )) + + c.logs.SetReceivers(newArgs.Output.Logs) + c.traces.SetConsumers(newArgs.Output.Traces) + + // Create a new server actor to run. 
+ makeNewServer := func(ctx context.Context) { + // NOTE(rfratto): we don't use newArgs here, since it's not guaranteed that + // our actor runs (we may be skipped for an existing scheduled function). + // Instead, we load the most recent args. + + c.argsMut.RLock() + var ( + args = c.args + ) + c.argsMut.RUnlock() + + srv := newServer( + log.With(c.log, "subcomponent", "server"), + args.Server, + c.serverMetrics, + c.handler, + ) + + // Reset health status. + c.setServerHealth(nil) + + err := srv.Run(ctx) + if err != nil { + level.Error(c.log).Log("msg", "server exited with error", "err", err) + c.setServerHealth(err) + } + } + + select { + case c.actorCh <- makeNewServer: + // Actor has been scheduled to run. + default: + // An actor is already scheduled to run. Don't do anything. + } + + return nil +} + +func (c *Component) setServerHealth(err error) { + c.healthMut.Lock() + defer c.healthMut.Unlock() + + if err == nil { + c.health = component.Health{ + Health: component.HealthTypeHealthy, + Message: "component is ready to receive telemetry over the network", + UpdateTime: time.Now(), + } + } else { + c.health = component.Health{ + Health: component.HealthTypeUnhealthy, + Message: fmt.Sprintf("server has terminated: %s", err), + UpdateTime: time.Now(), + } + } +} + +// CurrentHealth implements component.HealthComponent. It returns an unhealthy +// status if the server has terminated. 
+func (c *Component) CurrentHealth() component.Health { + c.healthMut.RLock() + defer c.healthMut.RUnlock() + return c.health +} + +type varSourceMapsStore struct { + mut sync.RWMutex + inner sourceMapsStore +} + +var _ sourceMapsStore = (*varSourceMapsStore)(nil) + +func (vs *varSourceMapsStore) GetSourceMap(sourceURL string, release string) (*sourcemap.Consumer, error) { + vs.mut.RLock() + defer vs.mut.RUnlock() + + if vs.inner != nil { + return vs.inner.GetSourceMap(sourceURL, release) + } + + return nil, fmt.Errorf("no sourcemap available") +} + +func (vs *varSourceMapsStore) SetInner(inner sourceMapsStore) { + vs.mut.Lock() + defer vs.mut.Unlock() + + vs.inner = inner +} diff --git a/component/faro/receiver/receiver_test.go b/component/faro/receiver/receiver_test.go new file mode 100644 index 000000000000..45ffa25cfa35 --- /dev/null +++ b/component/faro/receiver/receiver_test.go @@ -0,0 +1,152 @@ +package receiver + +import ( + "fmt" + "net/http" + "strings" + "sync" + "testing" + "time" + + "github.com/grafana/agent/component/common/loki" + "github.com/grafana/agent/component/otelcol" + "github.com/grafana/agent/pkg/flow/componenttest" + "github.com/grafana/agent/pkg/util" + "github.com/grafana/loki/pkg/logproto" + "github.com/phayes/freeport" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" +) + +// Test performs an end-to-end test of the component. 
+func Test(t *testing.T) { + ctx := componenttest.TestContext(t) + + ctrl, err := componenttest.NewControllerFromID( + util.TestLogger(t), + "faro.receiver", + ) + require.NoError(t, err) + + freePort, err := freeport.GetFreePort() + require.NoError(t, err) + + lr := newFakeLogsReceiver(t) + + go func() { + err := ctrl.Run(ctx, Arguments{ + LogLabels: map[string]string{ + "foo": "bar", + }, + + Server: ServerArguments{ + Host: "127.0.0.1", + Port: freePort, + }, + + Output: OutputArguments{ + Logs: []loki.LogsReceiver{lr}, + Traces: []otelcol.Consumer{}, + }, + }) + require.NoError(t, err) + }() + + // Wait for the server to be running. + util.Eventually(t, func(t require.TestingT) { + resp, err := http.Get(fmt.Sprintf("http://localhost:%d/-/ready", freePort)) + require.NoError(t, err) + defer resp.Body.Close() + + require.Equal(t, http.StatusOK, resp.StatusCode) + }) + + // Send a sample payload to the server. + resp, err := http.Post( + fmt.Sprintf("http://localhost:%d/collect", freePort), + "application/json", + strings.NewReader(`{ + "traces": { + "resourceSpans": [] + }, + "logs": [{ + "message": "hello, world", + "level": "info", + "context": {"env": "dev"}, + "timestamp": "2021-01-01T00:00:00Z", + "trace": { + "trace_id": "0", + "span_id": "0" + } + }], + "exceptions": [], + "measurements": [], + "meta": {} + }`), + ) + require.NoError(t, err) + defer resp.Body.Close() + + require.Equal(t, http.StatusAccepted, resp.StatusCode) + require.Len(t, lr.GetEntries(), 1) + + expect := loki.Entry{ + Labels: model.LabelSet{ + "foo": model.LabelValue("bar"), + }, + Entry: logproto.Entry{ + Line: `timestamp="2021-01-01 00:00:00 +0000 UTC" kind=log message="hello, world" level=info context_env=dev traceID=0 spanID=0 browser_mobile=false`, + }, + } + require.Equal(t, expect, lr.entries[0]) +} + +type fakeLogsReceiver struct { + ch chan loki.Entry + + entriesMut sync.RWMutex + entries []loki.Entry +} + +var _ loki.LogsReceiver = (*fakeLogsReceiver)(nil) + +func 
newFakeLogsReceiver(t *testing.T) *fakeLogsReceiver { + ctx := componenttest.TestContext(t) + + lr := &fakeLogsReceiver{ + ch: make(chan loki.Entry, 1), + } + + go func() { + defer close(lr.ch) + + for { + select { + case <-ctx.Done(): + return + case ent := <-lr.Chan(): + lr.entriesMut.Lock() + lr.entries = append(lr.entries, loki.Entry{ + Labels: ent.Labels, + Entry: logproto.Entry{ + Timestamp: time.Time{}, // Use consistent time for testing. + Line: ent.Line, + StructuredMetadata: ent.StructuredMetadata, + }, + }) + lr.entriesMut.Unlock() + } + } + }() + + return lr +} + +func (lr *fakeLogsReceiver) Chan() chan loki.Entry { + return lr.ch +} + +func (lr *fakeLogsReceiver) GetEntries() []loki.Entry { + lr.entriesMut.RLock() + defer lr.entriesMut.RUnlock() + return lr.entries +} diff --git a/component/faro/receiver/server.go b/component/faro/receiver/server.go new file mode 100644 index 000000000000..16d756a780bc --- /dev/null +++ b/component/faro/receiver/server.go @@ -0,0 +1,117 @@ +package receiver + +import ( + "context" + "fmt" + "net/http" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/gorilla/mux" + "github.com/grafana/dskit/instrument" + "github.com/grafana/dskit/middleware" + "github.com/prometheus/client_golang/prometheus" +) + +type serverMetrics struct { + requestDuration *prometheus.HistogramVec + rxMessageSize *prometheus.HistogramVec + txMessageSize *prometheus.HistogramVec + inflightRequests *prometheus.GaugeVec +} + +func newServerMetrics(reg prometheus.Registerer) *serverMetrics { + m := &serverMetrics{ + requestDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: "faro_receiver_request_duration_seconds", + Help: "Time (in seconds) spent serving HTTP requests.", + Buckets: instrument.DefBuckets, + }, []string{"method", "route", "status_code", "ws"}), + + rxMessageSize: prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: "faro_receiver_request_message_bytes", + Help: "Size (in bytes) of 
messages received in the request.", + Buckets: middleware.BodySizeBuckets, + }, []string{"method", "route"}), + + txMessageSize: prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: "faro_receiver_response_message_bytes", + Help: "Size (in bytes) of messages sent in response.", + Buckets: middleware.BodySizeBuckets, + }, []string{"method", "route"}), + + inflightRequests: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "faro_receiver_inflight_requests", + Help: "Current number of inflight requests.", + }, []string{"method", "route"}), + } + reg.MustRegister(m.requestDuration, m.rxMessageSize, m.txMessageSize, m.inflightRequests) + + return m +} + +// server represents the HTTP server on which the receiver accepts telemetry. +// server is not dynamically updatable. To update server, shut down the old +// server and start a new one. +type server struct { + log log.Logger + args ServerArguments + handler http.Handler + metrics *serverMetrics +} + +func newServer(l log.Logger, args ServerArguments, metrics *serverMetrics, h http.Handler) *server { + return &server{ + log: l, + args: args, + handler: h, + metrics: metrics, + } +} + +func (s *server) Run(ctx context.Context) error { + r := mux.NewRouter() + r.Handle("/collect", s.handler).Methods(http.MethodPost, http.MethodOptions) + + r.HandleFunc("/-/ready", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("ready")) + }) + + mw := middleware.Instrument{ + RouteMatcher: r, + Duration: s.metrics.requestDuration, + RequestBodySize: s.metrics.rxMessageSize, + ResponseBodySize: s.metrics.txMessageSize, + InflightRequests: s.metrics.inflightRequests, + } + + srv := &http.Server{ + Addr: fmt.Sprintf("%s:%d", s.args.Host, s.args.Port), + Handler: mw.Wrap(r), + } + + errCh := make(chan error, 1) + go func() { + level.Info(s.log).Log("msg", "starting server", "addr", srv.Addr) + errCh <- srv.ListenAndServe() + }() + + select { + case <-ctx.Done(): + ctx, cancel 
:= context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + level.Info(s.log).Log("msg", "terminating server") + + if err := srv.Shutdown(ctx); err != nil { + level.Error(s.log).Log("msg", "failed to gracefully terminate server", "err", err) + } + + case err := <-errCh: + return err + } + + return nil +} diff --git a/component/faro/receiver/sourcemaps.go b/component/faro/receiver/sourcemaps.go new file mode 100644 index 000000000000..49476c7efa4d --- /dev/null +++ b/component/faro/receiver/sourcemaps.go @@ -0,0 +1,374 @@ +package receiver + +import ( + "bytes" + "fmt" + "io" + "io/fs" + "net/http" + "net/url" + "os" + "path/filepath" + "regexp" + "strings" + "sync" + "text/template" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/go-sourcemap/sourcemap" + "github.com/grafana/agent/component/faro/receiver/internal/payload" + "github.com/minio/pkg/wildcard" + "github.com/prometheus/client_golang/prometheus" + "github.com/vincent-petithory/dataurl" +) + +// sourceMapsStore is an interface for a sourcemap service capable of +// transforming minified source locations to the original source location. +type sourceMapsStore interface { + GetSourceMap(sourceURL string, release string) (*sourcemap.Consumer, error) +} + +// Stub interfaces for easier mocking. 
+type ( + httpClient interface { + Get(url string) (*http.Response, error) + } + + fileService interface { + Stat(name string) (fs.FileInfo, error) + ReadFile(name string) ([]byte, error) + } +) + +type osFileService struct{} + +func (fs osFileService) Stat(name string) (fs.FileInfo, error) { return os.Stat(name) } +func (fs osFileService) ReadFile(name string) ([]byte, error) { return os.ReadFile(name) } + +type sourceMapMetrics struct { + cacheSize *prometheus.CounterVec + downloads *prometheus.CounterVec + fileReads *prometheus.CounterVec +} + +func newSourceMapMetrics(reg prometheus.Registerer) *sourceMapMetrics { + m := &sourceMapMetrics{ + cacheSize: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "faro_receiver_sourcemap_cache_size", + Help: "number of items in source map cache, per origin", + }, []string{"origin"}), + downloads: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "faro_receiver_sourcemap_downloads_total", + Help: "downloads by the source map service", + }, []string{"origin", "http_status"}), + fileReads: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "faro_receiver_sourcemap_file_reads_total", + Help: "source map file reads from file system, by origin and status", + }, []string{"origin", "status"}), + } + + reg.MustRegister(m.cacheSize, m.downloads, m.fileReads) + return m +} + +type sourcemapFileLocation struct { + LocationArguments + pathTemplate *template.Template +} + +type sourceMapsStoreImpl struct { + log log.Logger + cli httpClient + fs fileService + args SourceMapsArguments + metrics *sourceMapMetrics + locs []*sourcemapFileLocation + + cacheMut sync.Mutex + cache map[string]*sourcemap.Consumer +} + +// newSourceMapStore creates an implementation of sourceMapsStore. The returned +// implementation is not dynamically updatable; create a new sourceMapsStore +// implementation if arguments change. 
+func newSourceMapsStore(log log.Logger, args SourceMapsArguments, metrics *sourceMapMetrics, cli httpClient, fs fileService) *sourceMapsStoreImpl { + // TODO(rfratto): it would be nice for this to be dynamically updatable, but + // that will require swapping out the http client (when the timeout changes) + // or to find a way to inject a download timeout without modifying the http + // client. + + if cli == nil { + cli = &http.Client{Timeout: args.DownloadTimeout} + } + if fs == nil { + fs = osFileService{} + } + + locs := []*sourcemapFileLocation{} + for _, loc := range args.Locations { + tpl, err := template.New(loc.Path).Parse(loc.Path) + if err != nil { + panic(err) // TODO(rfratto): why is this set to panic? + } + + locs = append(locs, &sourcemapFileLocation{ + LocationArguments: loc, + pathTemplate: tpl, + }) + } + + return &sourceMapsStoreImpl{ + log: log, + cli: cli, + fs: fs, + args: args, + cache: make(map[string]*sourcemap.Consumer), + metrics: metrics, + locs: locs, + } +} + +func (store *sourceMapsStoreImpl) GetSourceMap(sourceURL string, release string) (*sourcemap.Consumer, error) { + // TODO(rfratto): GetSourceMap is weak to transient errors, since it always + // caches the result, even when there's an error. This means that transient + // errors will be cached forever, preventing source maps from being retrieved. 
+ + store.cacheMut.Lock() + defer store.cacheMut.Unlock() + + cacheKey := fmt.Sprintf("%s__%s", sourceURL, release) + if sm, ok := store.cache[cacheKey]; ok { + return sm, nil + } + + content, sourceMapURL, err := store.getSourceMapContent(sourceURL, release) + if err != nil || content == nil { + store.cache[cacheKey] = nil + return nil, err + } + if content != nil { + consumer, err := sourcemap.Parse(sourceMapURL, content) + if err != nil { + store.cache[cacheKey] = nil + level.Debug(store.log).Log("msg", "failed to parse source map", "url", sourceMapURL, "release", release, "err", err) + return nil, err + } + level.Info(store.log).Log("msg", "successfully parsed source map", "url", sourceMapURL, "release", release) + store.cache[cacheKey] = consumer + store.metrics.cacheSize.WithLabelValues(getOrigin(sourceURL)).Inc() + return consumer, nil + } + + return nil, nil +} + +func (store *sourceMapsStoreImpl) getSourceMapContent(sourceURL string, release string) (content []byte, sourceMapURL string, err error) { + // Attempt to find the source map in the filesystem first. + for _, loc := range store.locs { + content, sourceMapURL, err = store.getSourceMapFromFileSystem(sourceURL, release, loc) + if content != nil || err != nil { + return content, sourceMapURL, err + } + } + + // Attempt to download the sourcemap. + // + // TODO(rfratto): check if downloading is enabled. 
+ if strings.HasPrefix(sourceURL, "http") && urlMatchesOrigins(sourceURL, store.args.DownloadFromOrigins) { + return store.downloadSourceMapContent(sourceURL) + } + return nil, "", nil +} + +func (store *sourceMapsStoreImpl) getSourceMapFromFileSystem(sourceURL string, release string, loc *sourcemapFileLocation) (content []byte, sourceMapURL string, err error) { + if len(sourceURL) == 0 || !strings.HasPrefix(sourceURL, loc.MinifiedPathPrefix) || strings.HasSuffix(sourceURL, "/") { + return nil, "", nil + } + + var rootPath bytes.Buffer + + err = loc.pathTemplate.Execute(&rootPath, struct{ Release string }{Release: cleanFilePathPart(release)}) + if err != nil { + return nil, "", err + } + + pathParts := []string{rootPath.String()} + for _, part := range strings.Split(strings.TrimPrefix(strings.Split(sourceURL, "?")[0], loc.MinifiedPathPrefix), "/") { + if len(part) > 0 && part != "." && part != ".." { + pathParts = append(pathParts, part) + } + } + mapFilePath := filepath.Join(pathParts...) 
+ ".map"
+
+ if _, err := store.fs.Stat(mapFilePath); err != nil {
+ store.metrics.fileReads.WithLabelValues(getOrigin(sourceURL), "not_found").Inc()
+ level.Debug(store.log).Log("msg", "source map not found on filesystem", "url", sourceURL, "file_path", mapFilePath)
+ return nil, "", nil
+ }
+ level.Debug(store.log).Log("msg", "source map found on filesystem", "url", sourceURL, "file_path", mapFilePath)
+
+ content, err = store.fs.ReadFile(mapFilePath)
+ if err != nil {
+ store.metrics.fileReads.WithLabelValues(getOrigin(sourceURL), "error").Inc()
+ } else {
+ store.metrics.fileReads.WithLabelValues(getOrigin(sourceURL), "ok").Inc()
+ }
+
+ return content, sourceURL, err
+}
+
+func (store *sourceMapsStoreImpl) downloadSourceMapContent(sourceURL string) (content []byte, resolvedSourceMapURL string, err error) {
+ level.Debug(store.log).Log("msg", "attempting to download source file", "url", sourceURL)
+
+ result, err := store.downloadFileContents(sourceURL)
+ if err != nil {
+ level.Debug(store.log).Log("msg", "failed to download source file", "url", sourceURL, "err", err)
+ return nil, "", err
+ }
+
+ match := reSourceMap.FindAllStringSubmatch(string(result), -1)
+ if len(match) == 0 {
+ level.Debug(store.log).Log("msg", "no source map url found in source", "url", sourceURL)
+ return nil, "", nil
+ }
+ sourceMapURL := match[len(match)-1][2]
+
+ // Inline sourcemap
+ if strings.HasPrefix(sourceMapURL, "data:") {
+ dataURL, err := dataurl.DecodeString(sourceMapURL)
+ if err != nil {
+ level.Debug(store.log).Log("msg", "failed to parse inline source map data url", "url", sourceURL, "err", err)
+ return nil, "", err
+ }
+
+ level.Info(store.log).Log("msg", "successfully parsed inline source map data url", "url", sourceURL)
+ return dataURL.Data, sourceURL + ".map", nil
+ }
+ // Remote sourcemap
+ resolvedSourceMapURL = sourceMapURL
+
+ // If the URL is relative, we need to attempt to resolve the absolute URL.
+ if !strings.HasPrefix(resolvedSourceMapURL, "http") {
+ base, err := url.Parse(sourceURL)
+ if err != nil {
+ level.Debug(store.log).Log("msg", "failed to parse source URL", "url", sourceURL, "err", err)
+ return nil, "", err
+ }
+ relative, err := url.Parse(sourceMapURL)
+ if err != nil {
+ level.Debug(store.log).Log("msg", "failed to parse source map URL", "url", sourceURL, "sourceMapURL", sourceMapURL, "err", err)
+ return nil, "", err
+ }
+
+ resolvedSourceMapURL = base.ResolveReference(relative).String()
+ level.Debug(store.log).Log("msg", "resolved absolute source map URL", "url", sourceURL, "sourceMapURL", resolvedSourceMapURL)
+ }
+
+ level.Debug(store.log).Log("msg", "attempting to download source map file", "url", resolvedSourceMapURL)
+ result, err = store.downloadFileContents(resolvedSourceMapURL)
+ if err != nil {
+ level.Debug(store.log).Log("msg", "failed to download source map file", "url", resolvedSourceMapURL, "err", err)
+ return nil, "", err
+ }
+
+ return result, resolvedSourceMapURL, nil
+}
+
+func (store *sourceMapsStoreImpl) downloadFileContents(url string) ([]byte, error) {
+ resp, err := store.cli.Get(url)
+ if err != nil {
+ store.metrics.downloads.WithLabelValues(getOrigin(url), "?").Inc()
+ return nil, err
+ }
+ defer resp.Body.Close()
+
+ store.metrics.downloads.WithLabelValues(getOrigin(url), fmt.Sprint(resp.StatusCode)).Inc()
+ if resp.StatusCode != http.StatusOK {
+ return nil, fmt.Errorf("unexpected status %v", resp.StatusCode)
+ }
+
+ body, err := io.ReadAll(resp.Body)
+ if err != nil {
+ return nil, err
+ }
+ return body, nil
+}
+
+var reSourceMap = regexp.MustCompile("//[#@]\\s(source(?:Mapping)?URL)=\\s*(?P<url>\\S+)\r?\n?$")
+
+func getOrigin(URL string) string {
+ // TODO(rfratto): why are we parsing this every time? Let's parse it once.
+
+ parsed, err := url.Parse(URL)
+ if err != nil {
+ return "?" // TODO(rfratto): should invalid URLs be permitted?
+ }
+ return fmt.Sprintf("%s://%s", parsed.Scheme, parsed.Host)
+}
+
+// urlMatchesOrigins returns true if URL matches at least one of the origin prefixes. The wildcards '*' and '?' are supported.
+func urlMatchesOrigins(URL string, origins []string) bool {
+ for _, origin := range origins {
+ if origin == "*" || wildcard.Match(origin+"*", URL) {
+ return true
+ }
+ }
+ return false
+}
+
+func cleanFilePathPart(x string) string {
+ return strings.TrimLeft(strings.ReplaceAll(strings.ReplaceAll(x, "\\", ""), "/", ""), ".")
+}
+
+func transformException(log log.Logger, store sourceMapsStore, ex *payload.Exception, release string) *payload.Exception {
+ if ex.Stacktrace == nil {
+ return ex
+ }
+
+ var frames []payload.Frame
+ for _, frame := range ex.Stacktrace.Frames {
+ mappedFrame, err := resolveSourceLocation(store, &frame, release)
+ if err != nil {
+ level.Error(log).Log("msg", "Error resolving stack trace frame source location", "err", err)
+ frames = append(frames, frame)
+ } else if mappedFrame != nil {
+ frames = append(frames, *mappedFrame)
+ } else {
+ frames = append(frames, frame)
+ }
+ }
+
+ return &payload.Exception{
+ Type: ex.Type,
+ Value: ex.Value,
+ Stacktrace: &payload.Stacktrace{Frames: frames},
+ Timestamp: ex.Timestamp,
+ }
+}
+
+func resolveSourceLocation(store sourceMapsStore, frame *payload.Frame, release string) (*payload.Frame, error) {
+ smap, err := store.GetSourceMap(frame.Filename, release)
+ if err != nil {
+ return nil, err
+ }
+ if smap == nil {
+ return nil, nil
+ }
+
+ file, function, line, col, ok := smap.Source(frame.Lineno, frame.Colno)
+ if !ok {
+ return nil, nil
+ }
+ // Unfortunately, in many cases go-sourcemap fails to determine the original function name.
+ // This is not a big issue as long as the file, line, and column are correct.
+ if len(function) == 0 {
+ function = "?"
+ } + return &payload.Frame{ + Filename: file, + Lineno: line, + Colno: col, + Function: function, + }, nil +} diff --git a/component/faro/receiver/sourcemaps_test.go b/component/faro/receiver/sourcemaps_test.go new file mode 100644 index 000000000000..63b1dedee179 --- /dev/null +++ b/component/faro/receiver/sourcemaps_test.go @@ -0,0 +1,528 @@ +package receiver + +import ( + "bytes" + "errors" + "io" + "io/fs" + "net/http" + "os" + "path/filepath" + "testing" + + "github.com/grafana/agent/component/faro/receiver/internal/payload" + "github.com/grafana/agent/pkg/util" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" +) + +func Test_sourceMapsStoreImpl_DownloadSuccess(t *testing.T) { + var ( + logger = util.TestLogger(t) + + httpClient = &mockHTTPClient{ + responses: []struct { + *http.Response + error + }{ + {newResponseFromTestData(t, "foo.js"), nil}, + {newResponseFromTestData(t, "foo.js.map"), nil}, + }, + } + + store = newSourceMapsStore( + logger, + SourceMapsArguments{ + Download: true, + DownloadFromOrigins: []string{"*"}, + }, + newSourceMapMetrics(prometheus.NewRegistry()), + httpClient, + &mockFileService{}, + ) + ) + + expect := &payload.Exception{ + Stacktrace: &payload.Stacktrace{ + Frames: []payload.Frame{ + { + Colno: 37, + Filename: "/__parcel_source_root/demo/src/actions.ts", + Function: "?", + Lineno: 6, + }, + { + Colno: 2, + Filename: "/__parcel_source_root/demo/src/actions.ts", + Function: "?", + Lineno: 7, + }, + }, + }, + } + + actual := transformException(logger, store, mockException(), "123") + require.Equal(t, []string{"http://localhost:1234/foo.js", "http://localhost:1234/foo.js.map"}, httpClient.requests) + require.Equal(t, expect, actual) +} + +func Test_sourceMapsStoreImpl_DownloadError(t *testing.T) { + var ( + logger = util.TestLogger(t) + + httpClient = &mockHTTPClient{ + responses: []struct { + *http.Response + error + }{ + { + &http.Response{StatusCode: 500, Body: 
io.NopCloser(bytes.NewReader(nil))}, + nil, + }, + }, + } + + store = newSourceMapsStore( + logger, + SourceMapsArguments{ + Download: true, + DownloadFromOrigins: []string{"*"}, + }, + newSourceMapMetrics(prometheus.NewRegistry()), + httpClient, + &mockFileService{}, + ) + ) + + expect := mockException() + actual := transformException(logger, store, expect, "123") + require.Equal(t, []string{"http://localhost:1234/foo.js"}, httpClient.requests) + require.Equal(t, expect, actual) +} + +func Test_sourceMapsStoreImpl_DownloadHTTPOriginFiltering(t *testing.T) { + var ( + logger = util.TestLogger(t) + + httpClient = &mockHTTPClient{ + responses: []struct { + *http.Response + error + }{ + {newResponseFromTestData(t, "foo.js"), nil}, + {newResponseFromTestData(t, "foo.js.map"), nil}, + }, + } + + store = newSourceMapsStore( + logger, + SourceMapsArguments{ + Download: true, + DownloadFromOrigins: []string{"http://bar.com/"}, + }, + newSourceMapMetrics(prometheus.NewRegistry()), + httpClient, + &mockFileService{}, + ) + ) + + expect := &payload.Exception{ + Stacktrace: &payload.Stacktrace{ + Frames: []payload.Frame{ + { + Colno: 6, + Filename: "http://foo.com/foo.js", + Function: "eval", + Lineno: 5, + }, + { + Colno: 2, + Filename: "/__parcel_source_root/demo/src/actions.ts", + Function: "?", + Lineno: 7, + }, + }, + }, + } + + actual := transformException(logger, store, &payload.Exception{ + Stacktrace: &payload.Stacktrace{ + Frames: []payload.Frame{ + { + Colno: 6, + Filename: "http://foo.com/foo.js", + Function: "eval", + Lineno: 5, + }, + { + Colno: 5, + Filename: "http://bar.com/foo.js", + Function: "callUndefined", + Lineno: 6, + }, + }, + }, + }, "123") + + require.Equal(t, []string{"http://bar.com/foo.js", "http://bar.com/foo.js.map"}, httpClient.requests) + require.Equal(t, expect, actual) +} + +func Test_sourceMapsStoreImpl_ReadFromFileSystem(t *testing.T) { + var ( + logger = util.TestLogger(t) + + httpClient = &mockHTTPClient{} + + fileService = 
&mockFileService{ + files: map[string][]byte{ + filepath.FromSlash("/var/build/latest/foo.js.map"): loadTestData(t, "foo.js.map"), + filepath.FromSlash("/var/build/123/foo.js.map"): loadTestData(t, "foo.js.map"), + }, + } + + store = newSourceMapsStore( + logger, + SourceMapsArguments{ + Download: false, + Locations: []LocationArguments{ + { + MinifiedPathPrefix: "http://foo.com/", + Path: filepath.FromSlash("/var/build/latest/"), + }, + { + MinifiedPathPrefix: "http://bar.com/", + Path: filepath.FromSlash("/var/build/{{ .Release }}/"), + }, + }, + }, + newSourceMapMetrics(prometheus.NewRegistry()), + httpClient, + fileService, + ) + ) + + expect := &payload.Exception{ + Stacktrace: &payload.Stacktrace{ + Frames: []payload.Frame{ + { + Colno: 37, + Filename: "/__parcel_source_root/demo/src/actions.ts", + Function: "?", + Lineno: 6, + }, + { + Colno: 6, + Filename: "http://foo.com/bar.js", + Function: "eval", + Lineno: 5, + }, + { + Colno: 2, + Filename: "/__parcel_source_root/demo/src/actions.ts", + Function: "?", + Lineno: 7, + }, + { + Colno: 5, + Filename: "http://baz.com/foo.js", + Function: "callUndefined", + Lineno: 6, + }, + }, + }, + } + + actual := transformException(logger, store, &payload.Exception{ + Stacktrace: &payload.Stacktrace{ + Frames: []payload.Frame{ + { + Colno: 6, + Filename: "http://foo.com/foo.js", + Function: "eval", + Lineno: 5, + }, + { + Colno: 6, + Filename: "http://foo.com/bar.js", + Function: "eval", + Lineno: 5, + }, + { + Colno: 5, + Filename: "http://bar.com/foo.js", + Function: "callUndefined", + Lineno: 6, + }, + { + Colno: 5, + Filename: "http://baz.com/foo.js", + Function: "callUndefined", + Lineno: 6, + }, + }, + }, + }, "123") + + require.Equal(t, expect, actual) +} + +func Test_sourceMapsStoreImpl_ReadFromFileSystemAndDownload(t *testing.T) { + var ( + logger = util.TestLogger(t) + + httpClient = &mockHTTPClient{ + responses: []struct { + *http.Response + error + }{ + {newResponseFromTestData(t, "foo.js"), nil}, + 
{newResponseFromTestData(t, "foo.js.map"), nil}, + }, + } + + fileService = &mockFileService{ + files: map[string][]byte{ + filepath.FromSlash("/var/build/latest/foo.js.map"): loadTestData(t, "foo.js.map"), + }, + } + + store = newSourceMapsStore( + logger, + SourceMapsArguments{ + Download: true, + DownloadFromOrigins: []string{"*"}, + Locations: []LocationArguments{ + { + MinifiedPathPrefix: "http://foo.com/", + Path: filepath.FromSlash("/var/build/latest/"), + }, + }, + }, + newSourceMapMetrics(prometheus.NewRegistry()), + httpClient, + fileService, + ) + ) + + expect := &payload.Exception{ + Stacktrace: &payload.Stacktrace{ + Frames: []payload.Frame{ + { + Colno: 37, + Filename: "/__parcel_source_root/demo/src/actions.ts", + Function: "?", + Lineno: 6, + }, + { + Colno: 2, + Filename: "/__parcel_source_root/demo/src/actions.ts", + Function: "?", + Lineno: 7, + }, + }, + }, + } + + actual := transformException(logger, store, &payload.Exception{ + Stacktrace: &payload.Stacktrace{ + Frames: []payload.Frame{ + { + Colno: 6, + Filename: "http://foo.com/foo.js", + Function: "eval", + Lineno: 5, + }, + { + Colno: 5, + Filename: "http://bar.com/foo.js", + Function: "callUndefined", + Lineno: 6, + }, + }, + }, + }, "123") + + require.Equal(t, []string{filepath.FromSlash("/var/build/latest/foo.js.map")}, fileService.stats) + require.Equal(t, []string{filepath.FromSlash("/var/build/latest/foo.js.map")}, fileService.reads) + require.Equal(t, []string{"http://bar.com/foo.js", "http://bar.com/foo.js.map"}, httpClient.requests) + require.Equal(t, expect, actual) +} + +func Test_sourceMapsStoreImpl_FilepathSanitized(t *testing.T) { + var ( + logger = util.TestLogger(t) + + httpClient = &mockHTTPClient{} + fileService = &mockFileService{} + + store = newSourceMapsStore( + logger, + SourceMapsArguments{ + Download: false, + Locations: []LocationArguments{ + { + MinifiedPathPrefix: "http://foo.com/", + Path: filepath.FromSlash("/var/build/latest/"), + }, + }, + }, + 
newSourceMapMetrics(prometheus.NewRegistry()), + httpClient, + fileService, + ) + ) + + input := &payload.Exception{ + Stacktrace: &payload.Stacktrace{ + Frames: []payload.Frame{ + { + Colno: 6, + Filename: "http://foo.com/../../../etc/passwd", + Function: "eval", + Lineno: 5, + }, + }, + }, + } + + actual := transformException(logger, store, input, "123") + require.Equal(t, input, actual) +} + +func Test_urlMatchesOrigins(t *testing.T) { + tt := []struct { + name string + url string + origins []string + shouldMatch bool + }{ + { + name: "wildcard always matches", + url: "https://example.com/static/foo.js", + origins: []string{"https://foo.com/", "*"}, + shouldMatch: true, + }, + { + name: "exact matches", + url: "http://example.com/static/foo.js", + origins: []string{"https://foo.com/", "http://example.com/"}, + shouldMatch: true, + }, + { + name: "matches with subdomain wildcard", + url: "http://foo.bar.com/static/foo.js", + origins: []string{"https://foo.com/", "http://*.bar.com/"}, + shouldMatch: true, + }, + { + name: "no exact match", + url: "http://example.com/static/foo.js", + origins: []string{"https://foo.com/", "http://test.com/"}, + shouldMatch: false, + }, + { + name: "no exact match with subdomain wildcard", + url: "http://foo.bar.com/static/foo.js", + origins: []string{"https://foo.com/", "http://*.baz.com/"}, + shouldMatch: false, + }, + { + name: "matches with wildcard without protocol", + url: "http://foo.bar.com/static/foo.js", + origins: []string{"https://foo.com/", "*.bar.com/"}, + shouldMatch: true, + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + actual := urlMatchesOrigins(tc.url, tc.origins) + + if tc.shouldMatch { + require.True(t, actual, "expected %v to be matched from origin set %v", tc.url, tc.origins) + } else { + require.False(t, actual, "expected %v to not be matched from origin set %v", tc.url, tc.origins) + } + }) + } +} + +type mockHTTPClient struct { + responses []struct { + *http.Response + error + } 
+ requests []string +} + +func (cl *mockHTTPClient) Get(url string) (resp *http.Response, err error) { + if len(cl.responses) > len(cl.requests) { + r := cl.responses[len(cl.requests)] + cl.requests = append(cl.requests, url) + return r.Response, r.error + } + return nil, errors.New("mockHTTPClient got more requests than expected") +} + +type mockFileService struct { + files map[string][]byte + stats []string + reads []string +} + +func (s *mockFileService) Stat(name string) (fs.FileInfo, error) { + s.stats = append(s.stats, name) + _, ok := s.files[name] + if !ok { + return nil, errors.New("file not found") + } + return nil, nil +} + +func (s *mockFileService) ReadFile(name string) ([]byte, error) { + s.reads = append(s.reads, name) + content, ok := s.files[name] + if ok { + return content, nil + } + return nil, errors.New("file not found") +} + +func newResponseFromTestData(t *testing.T, file string) *http.Response { + return &http.Response{ + Body: io.NopCloser(bytes.NewReader(loadTestData(t, file))), + StatusCode: 200, + } +} + +func mockException() *payload.Exception { + return &payload.Exception{ + Stacktrace: &payload.Stacktrace{ + Frames: []payload.Frame{ + { + Colno: 6, + Filename: "http://localhost:1234/foo.js", + Function: "eval", + Lineno: 5, + }, + { + Colno: 5, + Filename: "http://localhost:1234/foo.js", + Function: "callUndefined", + Lineno: 6, + }, + }, + }, + } +} + +func loadTestData(t *testing.T, file string) []byte { + t.Helper() + // Safe to disable, this is a test. 
+ // nolint:gosec + content, err := os.ReadFile(filepath.Join("testdata", file)) + require.NoError(t, err, "expected to be able to read file") + require.True(t, len(content) > 0) + return content +} diff --git a/component/faro/receiver/testdata/foo.js b/component/faro/receiver/testdata/foo.js new file mode 100644 index 000000000000..b38652a4eef6 --- /dev/null +++ b/component/faro/receiver/testdata/foo.js @@ -0,0 +1,39 @@ +function throwError() { + throw new Error('This is a thrown error'); +} +function callUndefined() { + // eslint-disable-next-line no-eval + eval('test();'); +} +function callConsole(method) { + // eslint-disable-next-line no-console + console[method](`This is a console ${method} message`); +} +function fetchError() { + fetch('http://localhost:12345', { + method: 'POST' + }); +} +function promiseReject() { + new Promise((_accept, reject)=>{ + reject('This is a rejected promise'); + }); +} +function fetchSuccess() { + fetch('http://localhost:1234'); +} +function sendCustomMetric() { + window.grafanaJavaScriptAgent.api.pushMeasurement({ + type: 'custom', + values: { + my_custom_metric: Math.random() + } + }); +} +window.addEventListener('load', ()=>{ + window.grafanaJavaScriptAgent.api.pushLog([ + 'Manual event from Home' + ]); +}); + +//# sourceMappingURL=foo.js.map diff --git a/component/faro/receiver/testdata/foo.js.map b/component/faro/receiver/testdata/foo.js.map new file mode 100644 index 000000000000..0cd49989742f --- /dev/null +++ b/component/faro/receiver/testdata/foo.js.map @@ -0,0 +1 @@ 
+{"mappings":"SAAS,UAAU,GAAG,CAAC;IACrB,KAAK,CAAC,GAAG,CAAC,KAAK,CAAC,CAAwB;AAC1C,CAAC;SAEQ,aAAa,GAAG,CAAC;IACxB,EAAmC,AAAnC,iCAAmC;IACnC,IAAI,CAAC,CAAS;AAChB,CAAC;SAEQ,WAAW,CAAC,MAAmD,EAAE,CAAC;IACzE,EAAsC,AAAtC,oCAAsC;IACtC,OAAO,CAAC,MAAM,GAAG,kBAAkB,EAAE,MAAM,CAAC,QAAQ;AACtD,CAAC;SAEQ,UAAU,GAAG,CAAC;IACrB,KAAK,CAAC,CAAwB,yBAAE,CAAC;QAC/B,MAAM,EAAE,CAAM;IAChB,CAAC;AACH,CAAC;SAEQ,aAAa,GAAG,CAAC;IACxB,GAAG,CAAC,OAAO,EAAE,OAAO,EAAE,MAAM,GAAK,CAAC;QAChC,MAAM,CAAC,CAA4B;IACrC,CAAC;AACH,CAAC;SAEQ,YAAY,GAAG,CAAC;IACvB,KAAK,CAAC,CAAuB;AAC/B,CAAC;SAEQ,gBAAgB,GAAG,CAAC;IAC1B,MAAM,CAAS,sBAAsB,CAAC,GAAG,CAAC,eAAe,CAAC,CAAC;QAC1D,IAAI,EAAE,CAAQ;QACd,MAAM,EAAE,CAAC;YACP,gBAAgB,EAAE,IAAI,CAAC,MAAM;QAC/B,CAAC;IACH,CAAC;AACH,CAAC;AAED,MAAM,CAAC,gBAAgB,CAAC,CAAM,WAAQ,CAAC;IACpC,MAAM,CAAS,sBAAsB,CAAC,GAAG,CAAC,OAAO,CAAC,CAAC;QAAA,CAAwB;IAAA,CAAC;AAC/E,CAAC","sources":["demo/src/actions.ts"],"sourcesContent":["function throwError() {\n throw new Error('This is a thrown error');\n}\n\nfunction callUndefined() {\n // eslint-disable-next-line no-eval\n eval('test();');\n}\n\nfunction callConsole(method: 'trace' | 'info' | 'log' | 'warn' | 'error') {\n // eslint-disable-next-line no-console\n console[method](`This is a console ${method} message`);\n}\n\nfunction fetchError() {\n fetch('http://localhost:12345', {\n method: 'POST',\n });\n}\n\nfunction promiseReject() {\n new Promise((_accept, reject) => {\n reject('This is a rejected promise');\n });\n}\n\nfunction fetchSuccess() {\n fetch('http://localhost:1234');\n}\n\nfunction sendCustomMetric() {\n (window as any).grafanaJavaScriptAgent.api.pushMeasurement({\n type: 'custom',\n values: {\n my_custom_metric: Math.random(),\n },\n });\n}\n\nwindow.addEventListener('load', () => {\n (window as any).grafanaJavaScriptAgent.api.pushLog(['Manual event from Home']);\n});\n"],"names":[],"version":3,"file":"index.28a7d598.js.map","sourceRoot":"/__parcel_source_root/"} \ No newline at end of file diff --git 
a/component/faro/receiver/testdata/payload.json b/component/faro/receiver/testdata/payload.json new file mode 100644 index 000000000000..5646a6b05294 --- /dev/null +++ b/component/faro/receiver/testdata/payload.json @@ -0,0 +1,330 @@ +{ + "logs": [ + { + "message": "opened pricing page", + "level": "info", + "context": { + "component": "AppRoot", + "page": "Pricing" + }, + "timestamp": "2021-09-30T10:46:17.680Z", + "trace": { + "trace_id": "abcd", + "span_id": "def" + } + }, + { + "message": "loading price list", + "level": "trace", + "context": { + "component": "AppRoot", + "page": "Pricing" + }, + "timestamp": "2021-09-30T10:46:17.680Z", + "trace": { + "trace_id": "abcd", + "span_id": "ghj" + } + } + ], + "exceptions": [ + { + "type": "Error", + "value": "Cannot read property 'find' of undefined", + "stacktrace": { + "frames": [ + { + "colno": 42, + "filename": "http://fe:3002/static/js/vendors~main.chunk.js", + "function": "?", + "in_app": true, + "lineno": 8639 + }, + { + "colno": 9, + "filename": "http://fe:3002/static/js/vendors~main.chunk.js", + "function": "dispatchAction", + "in_app": true, + "lineno": 268095 + }, + { + "colno": 13, + "filename": "http://fe:3002/static/js/vendors~main.chunk.js", + "function": "scheduleUpdateOnFiber", + "in_app": true, + "lineno": 273726 + }, + { + "colno": 7, + "filename": "http://fe:3002/static/js/vendors~main.chunk.js", + "function": "flushSyncCallbackQueue", + "in_app": true, + "lineno": 263362 + }, + { + "colno": 13, + "filename": "http://fe:3002/static/js/vendors~main.chunk.js", + "function": "flushSyncCallbackQueueImpl", + "in_app": true, + "lineno": 263374 + }, + { + "colno": 14, + "filename": "http://fe:3002/static/js/vendors~main.chunk.js", + "function": "runWithPriority$1", + "lineno": 263325 + }, + { + "colno": 16, + "filename": "http://fe:3002/static/js/vendors~main.chunk.js", + "function": "unstable_runWithPriority", + "lineno": 291265 + }, + { + "colno": 30, + "filename": 
"http://fe:3002/static/js/vendors~main.chunk.js", + "function": "?", + "lineno": 263379 + }, + { + "colno": 22, + "filename": "http://fe:3002/static/js/vendors~main.chunk.js", + "function": "performSyncWorkOnRoot", + "lineno": 274126 + }, + { + "colno": 11, + "filename": "http://fe:3002/static/js/vendors~main.chunk.js", + "function": "renderRootSync", + "lineno": 274509 + }, + { + "colno": 9, + "filename": "http://fe:3002/static/js/vendors~main.chunk.js", + "function": "workLoopSync", + "lineno": 274543 + }, + { + "colno": 16, + "filename": "http://fe:3002/static/js/vendors~main.chunk.js", + "function": "performUnitOfWork", + "lineno": 274606 + }, + { + "colno": 18, + "filename": "http://fe:3002/static/js/vendors~main.chunk.js", + "function": "beginWork$1", + "in_app": true, + "lineno": 275746 + }, + { + "colno": 20, + "filename": "http://fe:3002/static/js/vendors~main.chunk.js", + "function": "beginWork", + "lineno": 270944 + }, + { + "colno": 24, + "filename": "http://fe:3002/static/js/vendors~main.chunk.js", + "function": "updateFunctionComponent", + "lineno": 269291 + }, + { + "colno": 22, + "filename": "http://fe:3002/static/js/vendors~main.chunk.js", + "function": "renderWithHooks", + "lineno": 266969 + }, + { + "colno": 74, + "filename": "http://fe:3002/static/js/main.chunk.js", + "function": "?", + "in_app": true, + "lineno": 2600 + }, + { + "colno": 65, + "filename": "http://fe:3002/static/js/main.chunk.js", + "function": "useGetBooksQuery", + "lineno": 1299 + }, + { + "colno": 85, + "filename": "http://fe:3002/static/js/vendors~main.chunk.js", + "function": "Module.useQuery", + "lineno": 8495 + }, + { + "colno": 83, + "filename": "http://fe:3002/static/js/vendors~main.chunk.js", + "function": "useBaseQuery", + "in_app": true, + "lineno": 8656 + }, + { + "colno": 14, + "filename": "http://fe:3002/static/js/vendors~main.chunk.js", + "function": "useDeepMemo", + "lineno": 8696 + }, + { + "colno": 55, + "filename": 
"http://fe:3002/static/js/vendors~main.chunk.js", + "function": "?", + "lineno": 8657 + }, + { + "colno": 47, + "filename": "http://fe:3002/static/js/vendors~main.chunk.js", + "function": "QueryData.execute", + "in_app": true, + "lineno": 7883 + }, + { + "colno": 23, + "filename": "http://fe:3002/static/js/vendors~main.chunk.js", + "function": "QueryData.getExecuteResult", + "lineno": 7944 + }, + { + "colno": 19, + "filename": "http://fe:3002/static/js/vendors~main.chunk.js", + "function": "QueryData._this.getQueryResult", + "lineno": 7790 + }, + { + "colno": 24, + "filename": "http://fe:3002/static/js/vendors~main.chunk.js", + "function": "new ApolloError", + "in_app": true, + "lineno": 5164 + } + ] + }, + "timestamp": "2021-09-30T10:46:17.680Z", + "trace": { + "trace_id": "abcd", + "span_id": "def" + }, + "context": { + "component": "ReactErrorBoundary", + "ReactError": "Annoying Error" + } + } + ], + "measurements": [ + { + "values": { + "ttfp": 20.12, + "ttfcp": 22.12, + "ttfb": 14 + }, + "type": "page load", + "timestamp": "2021-09-30T10:46:17.680Z", + "trace": { + "trace_id": "abcd", + "span_id": "def" + }, + "context": { + "hello": "world" + } + } + ], + "events": [ + { + "name": "click_login_button", + "domain": "frontend", + "attributes": { + "foo": "bar", + "one": "two" + }, + "timestamp": "2021-09-30T10:46:17.680Z", + "trace": { + "trace_id": "abcd", + "span_id": "def" + } + }, + { + "name": "click_reset_password_button", + "timestamp": "2021-09-30T10:46:17.680Z" + } + ], + "meta": { + "sdk": { + "name": "grafana-frontend-agent", + "version": "1.0.0" + }, + "app": { + "name": "testapp", + "release": "0.8.2", + "version": "abcdefg", + "environment": "production" + }, + "user": { + "username": "domasx2", + "id": "123", + "email": "geralt@kaermorhen.org", + "attributes": { + "foo": "bar" + } + }, + "session": { + "id": "abcd", + "attributes": { + "time_elapsed": "100s" + } + }, + "page": { + "url": "https://example.com/page" + }, + "browser": { + "name": 
"chrome", + "version": "88.12.1", + "os": "linux", + "mobile": false + }, + "view": { + "name": "foobar" + } + }, + "traces": { + "resourceSpans": [ + { + "resource": { + "attributes": [ + { + "key": "host.name", + "value": { + "stringValue": "testHost" + } + } + ] + }, + "instrumentationLibrarySpans": [ + { + "instrumentationLibrary": { + "name": "name", + "version": "version" + }, + "spans": [ + { + "traceId": "", + "spanId": "", + "parentSpanId": "", + "name": "testSpan", + "status": {} + }, + { + "traceId": "", + "spanId": "", + "parentSpanId": "", + "name": "testSpan2", + "status": {} + } + ] + } + ] + } + ] + } +} diff --git a/component/faro/receiver/testdata/payload_2.json b/component/faro/receiver/testdata/payload_2.json new file mode 100644 index 000000000000..eb8b18e56585 --- /dev/null +++ b/component/faro/receiver/testdata/payload_2.json @@ -0,0 +1,393 @@ +{ + "logs": [ + { + "message": "opened pricing page", + "level": "info", + "context": { + "component": "AppRoot", + "page": "Pricing" + }, + "timestamp": "2021-09-30T10:46:17.680Z", + "trace": { + "trace_id": "abcd", + "span_id": "def" + } + }, + { + "message": "loading price list", + "level": "trace", + "context": { + "component": "AppRoot", + "page": "Pricing" + }, + "timestamp": "2021-09-30T10:46:17.680Z", + "trace": { + "trace_id": "abcd", + "span_id": "ghj" + } + } + ], + "exceptions": [ + { + "type": "Error", + "value": "Cannot read property 'find' of undefined", + "stacktrace": { + "frames": [ + { + "colno": 42, + "filename": "http://fe:3002/static/js/vendors~main.chunk.js", + "function": "?", + "in_app": true, + "lineno": 8639 + }, + { + "colno": 9, + "filename": "http://fe:3002/static/js/vendors~main.chunk.js", + "function": "dispatchAction", + "in_app": true, + "lineno": 268095 + }, + { + "colno": 13, + "filename": "http://fe:3002/static/js/vendors~main.chunk.js", + "function": "scheduleUpdateOnFiber", + "in_app": true, + "lineno": 273726 + }, + { + "colno": 7, + "filename": 
"http://fe:3002/static/js/vendors~main.chunk.js", + "function": "flushSyncCallbackQueue", + "in_app": true, + "lineno": 263362 + }, + { + "colno": 13, + "filename": "http://fe:3002/static/js/vendors~main.chunk.js", + "function": "flushSyncCallbackQueueImpl", + "in_app": true, + "lineno": 263374 + }, + { + "colno": 14, + "filename": "http://fe:3002/static/js/vendors~main.chunk.js", + "function": "runWithPriority$1", + "lineno": 263325 + }, + { + "colno": 16, + "filename": "http://fe:3002/static/js/vendors~main.chunk.js", + "function": "unstable_runWithPriority", + "lineno": 291265 + }, + { + "colno": 30, + "filename": "http://fe:3002/static/js/vendors~main.chunk.js", + "function": "?", + "lineno": 263379 + }, + { + "colno": 22, + "filename": "http://fe:3002/static/js/vendors~main.chunk.js", + "function": "performSyncWorkOnRoot", + "lineno": 274126 + }, + { + "colno": 11, + "filename": "http://fe:3002/static/js/vendors~main.chunk.js", + "function": "renderRootSync", + "lineno": 274509 + }, + { + "colno": 9, + "filename": "http://fe:3002/static/js/vendors~main.chunk.js", + "function": "workLoopSync", + "lineno": 274543 + }, + { + "colno": 16, + "filename": "http://fe:3002/static/js/vendors~main.chunk.js", + "function": "performUnitOfWork", + "lineno": 274606 + }, + { + "colno": 18, + "filename": "http://fe:3002/static/js/vendors~main.chunk.js", + "function": "beginWork$1", + "in_app": true, + "lineno": 275746 + }, + { + "colno": 20, + "filename": "http://fe:3002/static/js/vendors~main.chunk.js", + "function": "beginWork", + "lineno": 270944 + }, + { + "colno": 24, + "filename": "http://fe:3002/static/js/vendors~main.chunk.js", + "function": "updateFunctionComponent", + "lineno": 269291 + }, + { + "colno": 22, + "filename": "http://fe:3002/static/js/vendors~main.chunk.js", + "function": "renderWithHooks", + "lineno": 266969 + }, + { + "colno": 74, + "filename": "http://fe:3002/static/js/main.chunk.js", + "function": "?", + "in_app": true, + "lineno": 2600 + }, + { + 
"colno": 65, + "filename": "http://fe:3002/static/js/main.chunk.js", + "function": "useGetBooksQuery", + "lineno": 1299 + }, + { + "colno": 85, + "filename": "http://fe:3002/static/js/vendors~main.chunk.js", + "function": "Module.useQuery", + "lineno": 8495 + }, + { + "colno": 83, + "filename": "http://fe:3002/static/js/vendors~main.chunk.js", + "function": "useBaseQuery", + "in_app": true, + "lineno": 8656 + }, + { + "colno": 14, + "filename": "http://fe:3002/static/js/vendors~main.chunk.js", + "function": "useDeepMemo", + "lineno": 8696 + }, + { + "colno": 55, + "filename": "http://fe:3002/static/js/vendors~main.chunk.js", + "function": "?", + "lineno": 8657 + }, + { + "colno": 47, + "filename": "http://fe:3002/static/js/vendors~main.chunk.js", + "function": "QueryData.execute", + "in_app": true, + "lineno": 7883 + }, + { + "colno": 23, + "filename": "http://fe:3002/static/js/vendors~main.chunk.js", + "function": "QueryData.getExecuteResult", + "lineno": 7944 + }, + { + "colno": 19, + "filename": "http://fe:3002/static/js/vendors~main.chunk.js", + "function": "QueryData._this.getQueryResult", + "lineno": 7790 + }, + { + "colno": 24, + "filename": "http://fe:3002/static/js/vendors~main.chunk.js", + "function": "new ApolloError", + "in_app": true, + "lineno": 5164 + } + ] + }, + "timestamp": "2021-09-30T10:46:17.680Z", + "trace": { + "trace_id": "abcd", + "span_id": "def" + } + } + ], + "measurements": [ + { + "values": { + "ttfp": 20.12, + "ttfcp": 22.12, + "ttfb": 14 + }, + "type": "page load", + "timestamp": "2021-09-30T10:46:17.680Z", + "trace": { + "trace_id": "abcd", + "span_id": "def" + } + } + ], + "meta": { + "sdk": { + "name": "grafana-frontend-agent", + "version": "1.0.0" + }, + "app": { + "name": "testapp", + "release": "0.8.2", + "version": "abcdefg", + "environment": "production" + }, + "user": { + "username": "domasx2", + "attributes": { + "foo": "bar" + } + }, + "session": { + "id": "abcd", + "attributes": { + "time_elapsed": "100s" + } + }, + 
"page": { + "url": "https://example.com/page" + }, + "browser": { + "name": "chrome", + "version": "88.12.1", + "os": "linux", + "mobile": false + }, + "view": { + "name": "foobar" + } + }, + "traces": { + "resourceSpans": [ + { + "resource": { + "attributes": [ + { + "key": "service.name", + "value": { + "stringValue": "unknown_service" + } + }, + { + "key": "telemetry.sdk.language", + "value": { + "stringValue": "webjs" + } + }, + { + "key": "telemetry.sdk.name", + "value": { + "stringValue": "opentelemetry" + } + }, + { + "key": "telemetry.sdk.version", + "value": { + "stringValue": "1.0.1" + } + } + ], + "droppedAttributesCount": 0 + }, + "instrumentationLibrarySpans": [ + { + "spans": [ + { + "traceId": "2d6f18da2663c7e477df23d8a8ad95b7", + "spanId": "50e64e3fac969cbb", + "parentSpanId": "9d9da6529d56706c", + "name": "documentFetch", + "kind": 1, + "startTimeUnixNano": 1646228314336100000, + "endTimeUnixNano": 1646228314351000000, + "attributes": [ + { + "key": "component", + "value": { + "stringValue": "document-load" + } + }, + { + "key": "http.response_content_length", + "value": { + "intValue": 1326 + } + } + ], + "droppedAttributesCount": 0, + "events": [ + { + "timeUnixNano": 1646228314336100000, + "name": "fetchStart", + "attributes": [], + "droppedAttributesCount": 0 + }, + { + "timeUnixNano": 1646228314342000000, + "name": "domainLookupStart", + "attributes": [], + "droppedAttributesCount": 0 + }, + { + "timeUnixNano": 1646228314342000000, + "name": "domainLookupEnd", + "attributes": [], + "droppedAttributesCount": 0 + }, + { + "timeUnixNano": 1646228314342000000, + "name": "connectStart", + "attributes": [], + "droppedAttributesCount": 0 + }, + { + "timeUnixNano": 1646228314330100000, + "name": "secureConnectionStart", + "attributes": [], + "droppedAttributesCount": 0 + }, + { + "timeUnixNano": 1646228314342500000, + "name": "connectEnd", + "attributes": [], + "droppedAttributesCount": 0 + }, + { + "timeUnixNano": 1646228314342700000, + "name": 
"requestStart", + "attributes": [], + "droppedAttributesCount": 0 + }, + { + "timeUnixNano": 1646228314347000000, + "name": "responseStart", + "attributes": [], + "droppedAttributesCount": 0 + }, + { + "timeUnixNano": 1646228314351000000, + "name": "responseEnd", + "attributes": [], + "droppedAttributesCount": 0 + } + ], + "droppedEventsCount": 0, + "status": { + "code": 0 + }, + "links": [], + "droppedLinksCount": 0 + } + ], + "instrumentationLibrary": { + "name": "@opentelemetry/instrumentation-document-load", + "version": "0.27.1" + } + } + ] + } + ] + } +} diff --git a/docs/sources/flow/reference/components/faro.receiver.md b/docs/sources/flow/reference/components/faro.receiver.md new file mode 100644 index 000000000000..2cd7e898b123 --- /dev/null +++ b/docs/sources/flow/reference/components/faro.receiver.md @@ -0,0 +1,268 @@ +--- +aliases: +- /docs/grafana-cloud/agent/flow/reference/components/faro.receiver/ +- /docs/grafana-cloud/monitor-infrastructure/agent/flow/reference/components/faro.receiver/ +- /docs/grafana-cloud/monitor-infrastructure/integrations/agent/flow/reference/components/faro.receiver/ +canonical: https://grafana.com/docs/agent/latest/flow/reference/components/faro.receiver/ +title: faro.receiver +description: Learn about the faro.receiver +--- + +# faro.receiver + +`faro.receiver` accepts web application telemetry data from the [Grafana Faro Web SDK][faro-sdk] +and forwards it to other components for future processing. + +[faro-sdk]: https://github.com/grafana/faro-web-sdk + +## Usage + +```river +faro.receiver "LABEL" { + output { + logs = [LOKI_RECEIVERS] + traces = [OTELCOL_COMPONENTS] + } +} +``` + +## Arguments + +The following arguments are supported: + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`extra_log_labels` | `map(string)` | Extra labels to attach to emitted log lines. 
| `{}` | no + +## Blocks + +The following blocks are supported inside the definition of `faro.receiver`: + +Hierarchy | Block | Description | Required +--------- | ----- | ----------- | -------- +server | [server][] | Configures the HTTP server. | no +server > rate_limiting | [rate_limiting][] | Configures rate limiting for the HTTP server. | no +sourcemaps | [sourcemaps][] | Configures sourcemap retrieval. | no +sourcemaps > location | [location][] | Configures on-disk location for sourcemap retrieval. | no +output | [output][] | Configures where to send collected telemetry data. | yes + +[server]: #server-block +[rate_limiting]: #rate_limiting-block +[sourcemaps]: #sourcemaps-block +[location]: #location-block +[output]: #output-block + +### server block + +The `server` block configures the HTTP server managed by the `faro.receiver` +component. Clients using the [Grafana Faro Web SDK][faro-sdk] forward telemetry +data to this HTTP server for processing. + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`listen_address` | `string` | Address to listen for HTTP traffic on. | `127.0.0.1` | no +`listen_port` | `number` | Port to listen for HTTP traffic on. | `12347` | no +`cors_allowed_origins` | `list(string)` | Origins for which cross-origin requests are permitted. | `[]` | no +`api_key` | `secret` | Optional API key to validate client requests with. | `""` | no +`max_allowed_payload_size` | `string` | Maximum size (in bytes) for client requests. | `"5MiB"` | no + +By default, the server only listens on `127.0.0.1`, so telemetry data is only +accepted from browsers running on the same host as the component. To accept +telemetry data from other clients, set the `listen_address` attribute to the IP +address of the appropriate network interface. + +The `cors_allowed_origins` argument determines which origins browser requests +may come from. The default value, `[]`, disables CORS support.
To support +requests from all origins, set `cors_allowed_origins` to `["*"]`. The `*` +character indicates a wildcard. + +When the `api_key` argument is non-empty, client requests must include an HTTP +header called `X-API-Key` whose value matches the `api_key` argument. +Requests that are missing the header or have the wrong value are rejected with +an `HTTP 401 Unauthorized` status code. If the `api_key` argument is empty, no +authentication checks are performed, and the `X-API-Key` HTTP header is +ignored. + +### rate_limiting block + +The `rate_limiting` block configures rate limiting for client requests. + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`enabled` | `bool` | Whether to enable rate limiting. | `true` | no +`rate` | `number` | Rate of allowed requests per second. | `50` | no +`burst_size` | `number` | Allowed burst size of requests. | `100` | no + +Rate limiting functions as a [token bucket algorithm][token-bucket], where the +bucket has a maximum capacity of `burst_size` requests and refills at a rate of +`rate` requests per second. + +Each HTTP request drains the capacity of the bucket by one. Once the bucket is +empty, HTTP requests are rejected with an `HTTP 429 Too Many Requests` status +code until the bucket has more available capacity. + +The `rate` argument determines how quickly the bucket refills, and the +`burst_size` argument determines how many requests can be received in a burst +before the bucket is empty and requests start being rejected. + +[token-bucket]: https://en.wikipedia.org/wiki/Token_bucket + +### sourcemaps block + +The `sourcemaps` block configures how to retrieve sourcemaps. Sourcemaps are +used to transform file and line information from minified code into file and +line information from the original source code.
+ +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`download` | `bool` | Whether to download sourcemaps. | `true` | no +`download_from_origins` | `list(string)` | Which origins to download sourcemaps from. | `["*"]` | no +`download_timeout` | `duration` | Timeout when downloading sourcemaps. | `"1s"` | no + +When exceptions are sent to the `faro.receiver` component, it can download +sourcemaps from the web application. You can disable this behavior by setting +the `download` argument to `false`. + +The `download_from_origins` argument determines which origins a sourcemap may +be downloaded from. The origin is taken from the URL that a browser sends +telemetry data from. The default value, `["*"]`, permits downloading sourcemaps +from all origins. The `*` character indicates a wildcard. + +By default, sourcemap downloads are subject to a timeout of `"1s"`, specified +by the `download_timeout` argument. Setting `download_timeout` to `"0s"` +disables timeouts. + +To retrieve sourcemaps from disk instead of the network, specify one or more +[`location` blocks][location]. When `location` blocks are provided, they are +checked for sourcemaps first before falling back to downloading. + +### location block + +The `location` block declares a location where sourcemaps are stored on the +filesystem. The `location` block can be specified multiple times to declare +multiple locations where sourcemaps are stored. + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`path` | `string` | The path on disk where sourcemaps are stored. | | yes +`minified_path_prefix` | `string` | The prefix of the minified path sent from browsers. | | yes + +The `minified_path_prefix` argument determines the prefix of paths to +JavaScript files, such as `http://example.com/`. The `path` argument then +determines where to find the sourcemap for the file.
+ +For example, given the following location block: + +```river +location { + path = "/var/my-app/build" + minified_path_prefix = "http://example.com/" +} +``` + +To look up the sourcemaps for a file hosted at `http://example.com/foo.js`, the +`faro.receiver` component will: + +1. Remove the minified path prefix to extract the path to the file (`foo.js`). +2. Search for that file path with a `.map` extension (`foo.js.map`) in `path` + (`/var/my-app/build/foo.js.map`). + +Optionally, the value for the `path` argument may contain `{{ .Release }}` as a +template value, such as `/var/my-app/{{ .Release }}/build`. The template value +is replaced with the release value provided by the [Faro Web SDK][faro-sdk]. + +### output block + +The `output` block specifies where to forward collected logs and traces. + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`logs` | `list(LogsReceiver)` | A list of `loki` components to forward logs to. | `[]` | no +`traces` | `list(otelcol.Consumer)` | A list of `otelcol` components to forward traces to. | `[]` | no + +## Exported fields + +`faro.receiver` does not export any fields. + +## Component health + +`faro.receiver` is reported as unhealthy when the integrated server fails to +start. + +## Debug information + +`faro.receiver` does not expose any component-specific debug information. + +## Debug metrics + +`faro.receiver` exposes the following metrics for monitoring the component: + +* `faro_receiver_logs_total` (counter): Total number of ingested logs. +* `faro_receiver_measurements_total` (counter): Total number of ingested measurements. +* `faro_receiver_exceptions_total` (counter): Total number of ingested exceptions. +* `faro_receiver_events_total` (counter): Total number of ingested events. +* `faro_receiver_exporter_errors_total` (counter): Total number of errors produced by an internal exporter.
+* `faro_receiver_request_duration_seconds` (histogram): Time (in seconds) spent serving HTTP requests. +* `faro_receiver_request_message_bytes` (histogram): Size (in bytes) of HTTP requests received from clients. +* `faro_receiver_response_message_bytes` (histogram): Size (in bytes) of HTTP responses sent to clients. +* `faro_receiver_inflight_requests` (gauge): Current number of inflight requests. +* `faro_receiver_sourcemap_cache_size` (counter): Number of items in sourcemap cache per origin. +* `faro_receiver_sourcemap_downloads_total` (counter): Total number of sourcemap downloads performed per origin and status. +* `faro_receiver_sourcemap_file_reads_total` (counter): Total number of sourcemap retrievals using the filesystem per origin and status. + +## Example + +```river +faro.receiver "default" { + server { + listen_address = "NETWORK_ADDRESS" + } + + sourcemaps { + location { + path = "PATH_TO_SOURCEMAPS" + minified_path_prefix = "WEB_APP_PREFIX" + } + } + + output { + logs = [loki.write.default.receiver] + traces = [otelcol.exporter.otlp.traces.input] + } +} + +loki.write "default" { + endpoint { + url = "https://LOKI_ADDRESS/api/v1/push" + } +} + +otelcol.exporter.otlp "traces" { + client { + endpoint = "OTLP_ADDRESS" + } +} +``` + +Replace the following: + +* `NETWORK_ADDRESS`: IP address of the network interface to listen to traffic + on. This IP address must be reachable by browsers using the web application + to instrument. + +* `PATH_TO_SOURCEMAPS`: Path on disk where sourcemaps are located. + +* `WEB_APP_PREFIX`: Prefix of the web application being instrumented. + +* `LOKI_ADDRESS`: Address of the Loki server to send logs to. + + * If authentication is required to send logs to the Loki server, refer to the + documentation of [loki.write][] for more information. + +* `OTLP_ADDRESS`: The address of the OTLP-compatible server to send traces to. 
+ + * If authentication is required to send traces to the OTLP-compatible server, + refer to the documentation of [otelcol.exporter.otlp][] for more information. + +[loki.write]: {{< relref "./loki.write.md" >}} +[otelcol.exporter.otlp]: {{< relref "./otelcol.exporter.otlp.md" >}}
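+
+As an additional illustrative sketch, the `server`, `rate_limiting`, and
+`sourcemaps` arguments documented above can be combined into one hardened
+configuration. All concrete values below (the origin, API key, paths, and the
+`loki.write` component labeled `default` that the output references) are
+placeholders, not values taken from this document:
+
+```river
+faro.receiver "hardened" {
+  server {
+    // Listen on all interfaces so remote browsers can reach the server.
+    listen_address       = "0.0.0.0"
+    listen_port          = 12347
+
+    // Only allow cross-origin requests from the instrumented app, and
+    // require clients to send a matching X-API-Key header.
+    cors_allowed_origins = ["https://app.example.com"]
+    api_key              = "example-api-key"
+
+    rate_limiting {
+      enabled    = true
+      rate       = 100
+      burst_size = 200
+    }
+  }
+
+  sourcemaps {
+    // Resolve sourcemaps from disk only; never download them.
+    // {{ .Release }} is filled in from the release value reported
+    // by the Faro Web SDK.
+    download = false
+
+    location {
+      path                 = "/var/my-app/{{ .Release }}/build"
+      minified_path_prefix = "https://app.example.com/"
+    }
+  }
+
+  output {
+    logs = [loki.write.default.receiver]
+  }
+}
+```
+
+Because `traces` is omitted from the `output` block, it falls back to its
+default of `[]` and trace data is discarded.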