From fa6a3738d07f5210adaffe654f56dfcc8740cd81 Mon Sep 17 00:00:00 2001 From: Ravishankar Date: Thu, 12 Dec 2024 21:40:31 +0530 Subject: [PATCH 1/4] Add json format support for log export via faro receiver Update docs --- CHANGELOG.md | 1 + .../components/faro/faro.receiver.md | 8 + internal/component/faro/receiver/arguments.go | 34 +++ internal/component/faro/receiver/exporters.go | 19 +- .../component/faro/receiver/exporters_test.go | 282 ++++++++++++++++++ internal/component/faro/receiver/receiver.go | 2 +- .../component/faro/receiver/receiver_test.go | 209 ++++++------- .../component/faro/receiver/test_utils.go | 60 ++++ .../testdata-v2/integrations_v2.alloy | 1 + .../testdata-v2/unsupported.alloy | 1 + 10 files changed, 499 insertions(+), 118 deletions(-) create mode 100644 internal/component/faro/receiver/test_utils.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 8279c67197..4b0f986478 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,6 +49,7 @@ Main (unreleased) - Add perf_schema quantile columns to collector - Live Debugging button should appear in UI only for supported components (@ravishankar15) +- Add json format support for log export via faro receiver (@ravishankar15) - Add three new stdlib functions to_base64, from_URLbase64 and to_URLbase64 (@ravishankar15) - Add `ignore_older_than` option for local.file_match (@ravishankar15) - Add livedebugging support for `discover.relabel` (@ravishankar15) diff --git a/docs/sources/reference/components/faro/faro.receiver.md b/docs/sources/reference/components/faro/faro.receiver.md index cdb70f9a7c..52f3b33622 100644 --- a/docs/sources/reference/components/faro/faro.receiver.md +++ b/docs/sources/reference/components/faro/faro.receiver.md @@ -30,6 +30,14 @@ The following arguments are supported: Name | Type | Description | Default | Required -------------------|---------------|----------------------------------------------|---------|--------- `extra_log_labels` | `map(string)` | Extra labels to attach to emitted log lines. | `{}` | no +`log_format` | `string` | Export format for the logs | `logfmt`| no + +### Log format + +The following strings are recognized as valid log line formats: + +* `"logfmt"`: Export logs as [logfmt][] lines. +* `"json"`: Export logs as JSON objects. ## Blocks diff --git a/internal/component/faro/receiver/arguments.go b/internal/component/faro/receiver/arguments.go index 915f472574..232f561a94 100644 --- a/internal/component/faro/receiver/arguments.go +++ b/internal/component/faro/receiver/arguments.go @@ -1,6 +1,8 @@ package receiver import ( + "encoding" + "fmt" "time" "github.com/alecthomas/units" @@ -13,6 +15,7 @@ import ( // Arguments configures the app_agent_receiver component. type Arguments struct { LogLabels map[string]string `alloy:"extra_log_labels,attr,optional"` + LogFormat LogFormat `alloy:"log_format,attr,optional"` Server ServerArguments `alloy:"server,block,optional"` SourceMaps SourceMapsArguments `alloy:"sourcemaps,block,optional"` @@ -23,6 +26,7 @@ var _ syntax.Defaulter = (*Arguments)(nil) // SetToDefault applies default settings. func (args *Arguments) SetToDefault() { + args.LogFormat = FormatDefault args.Server.SetToDefault() args.SourceMaps.SetToDefault() } @@ -93,3 +97,33 @@ type OutputArguments struct { Logs []loki.LogsReceiver `alloy:"logs,attr,optional"` Traces []otelcol.Consumer `alloy:"traces,attr,optional"` } + +type LogFormat string + +const ( + FormatLogfmt LogFormat = "logfmt" + FormatJSON LogFormat = "json" + + FormatDefault = FormatLogfmt +) + +var ( + _ encoding.TextMarshaler = FormatDefault + _ encoding.TextUnmarshaler = (*LogFormat)(nil) +) + +func (ll LogFormat) MarshalText() (text []byte, err error) { + return []byte(ll), nil +} + +func (ll *LogFormat) UnmarshalText(text []byte) error { + switch LogFormat(text) { + case "": + *ll = FormatDefault + case FormatLogfmt, FormatJSON: + *ll = LogFormat(text) + default: + return fmt.Errorf("unrecognized log format %q", string(text)) + } + return nil +} diff --git a/internal/component/faro/receiver/exporters.go b/internal/component/faro/receiver/exporters.go index 1543b126c7..c02b106083 100644 --- a/internal/component/faro/receiver/exporters.go +++ b/internal/component/faro/receiver/exporters.go @@ -2,6 +2,7 @@ package receiver import ( "context" + "encoding/json" "errors" "fmt" "sync" @@ -83,6 +84,7 @@ func (exp *metricsExporter) Export(ctx context.Context, p payload.Payload) error type logsExporter struct { log log.Logger sourceMaps sourceMapsStore + format LogFormat receiversMut sync.RWMutex receivers []loki.LogsReceiver @@ -93,10 +95,11 @@ type logsExporter struct { var _ exporter = (*logsExporter)(nil) -func newLogsExporter(log log.Logger, sourceMaps sourceMapsStore) *logsExporter { +func newLogsExporter(log log.Logger, sourceMaps sourceMapsStore, format LogFormat) *logsExporter { return &logsExporter{ log: log, sourceMaps: sourceMaps, + format: format, } } @@ -157,7 +160,19 @@ func (exp *logsExporter) sendKeyValsToLogsPipeline(ctx context.Context, kv *payl ) exp.receiversMut.RUnlock() - line, err := logfmt.MarshalKeyvals(payload.KeyValToInterfaceSlice(kv)...) + var ( + line []byte + err error + ) + switch exp.format { + case FormatLogfmt: + line, err = logfmt.MarshalKeyvals(payload.KeyValToInterfaceSlice(kv)...) + case FormatJSON: + line, err = json.Marshal(payload.KeyValToInterfaceMap(kv)) + default: + line, err = logfmt.MarshalKeyvals(payload.KeyValToInterfaceSlice(kv)...) + } + if err != nil { level.Error(exp.log).Log("msg", "failed to logfmt a frontend log event", "err", err) return err diff --git a/internal/component/faro/receiver/exporters_test.go b/internal/component/faro/receiver/exporters_test.go index 7f7511827e..8f562f1de7 100644 --- a/internal/component/faro/receiver/exporters_test.go +++ b/internal/component/faro/receiver/exporters_test.go @@ -4,10 +4,16 @@ import ( "context" "strings" "testing" + "time" + "github.com/grafana/alloy/internal/component/common/loki" "github.com/grafana/alloy/internal/component/faro/receiver/internal/payload" + "github.com/grafana/alloy/internal/runtime/componenttest" + "github.com/grafana/alloy/internal/util" + "github.com/grafana/loki/v3/pkg/logproto" "github.com/prometheus/client_golang/prometheus" promtestutil "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/common/model" "github.com/stretchr/testify/require" ) @@ -53,3 +59,279 @@ func Test_metricsExporter_Export(t *testing.T) { err := promtestutil.CollectAndCompare(reg, strings.NewReader(expect), metricNames...) require.NoError(t, err) } + +func Test_LogsExporter_Export(t *testing.T) { + now, err := time.Parse("2006-01-02T15:04:05Z0700", "2021-09-30T10:46:17.680Z") + require.NoError(t, err) + tt := []struct { + desc string + format LogFormat + payload payload.Payload + expect loki.Entry + }{ + { + desc: "export logfmt for log payload", + format: FormatLogfmt, + payload: payload.Payload{ + Logs: []payload.Log{ + { + Message: "React Router Future Flag Warning", + LogLevel: payload.LogLevelInfo, + Timestamp: now, + Trace: payload.TraceContext{ + TraceID: "a363d4f4417aa83158c437febd2d8838", + SpanID: "42876ecc4c1feafa", + }, + }, + }, + }, + expect: loki.Entry{ + Labels: model.LabelSet{ + "foo": model.LabelValue("bar"), + "kind": model.LabelValue("log"), + }, + Entry: logproto.Entry{ + Line: `timestamp="2021-09-30 10:46:17.68 +0000 UTC" kind=log message="React Router Future Flag Warning" level=info traceID=a363d4f4417aa83158c437febd2d8838 spanID=42876ecc4c1feafa browser_mobile=false`, + }, + }, + }, + { + desc: "export logfmt for exception payload", + format: FormatLogfmt, + payload: payload.Payload{ + Exceptions: []payload.Exception{ + { + Type: "Error", + Value: "Cannot read property 'find' of undefined", + Timestamp: now, + Stacktrace: &payload.Stacktrace{ + Frames: []payload.Frame{ + { + Function: "?", + Filename: "http://fe:3002/static/js/vendors~main.chunk.js", + Lineno: 8639, + Colno: 42, + }, + { + Function: "dispatchAction", + Filename: "http://fe:3002/static/js/vendors~main.chunk.js", + Lineno: 268095, + Colno: 9, + }, + }, + }, + }, + }, + }, + expect: loki.Entry{ + Labels: model.LabelSet{ + "foo": model.LabelValue("bar"), + "kind": model.LabelValue("exception"), + }, + Entry: logproto.Entry{ + Line: `timestamp="2021-09-30 10:46:17.68 +0000 UTC" kind=exception type=Error value="Cannot read property 'find' of undefined" stacktrace="Error: Cannot read property 'find' of undefined\n at ? (http://fe:3002/static/js/vendors~main.chunk.js:8639:42)\n at dispatchAction (http://fe:3002/static/js/vendors~main.chunk.js:268095:9)" hash=2735541995122471342 browser_mobile=false`, + }, + }, + }, + { + desc: "export logfmt for measurement payload", + format: FormatLogfmt, + payload: payload.Payload{ + Measurements: []payload.Measurement{ + { + Type: "sum", + Values: map[string]float64{ + "ttfp": 20.12, + "ttfcp": 22.12, + "ttfb": 14, + }, + Timestamp: now, + Trace: payload.TraceContext{ + TraceID: "a363d4f4417aa83158c437febd2d8838", + SpanID: "42876ecc4c1feafa", + }, + Context: payload.MeasurementContext{"host": "localhost"}, + }, + }, + }, + expect: loki.Entry{ + Labels: model.LabelSet{ + "foo": model.LabelValue("bar"), + "kind": model.LabelValue("measurement"), + }, + Entry: logproto.Entry{ + Line: `timestamp="2021-09-30 10:46:17.68 +0000 UTC" kind=measurement type=sum ttfb=14.000000 ttfcp=22.120000 ttfp=20.120000 traceID=a363d4f4417aa83158c437febd2d8838 spanID=42876ecc4c1feafa context_host=localhost value_ttfb=14 value_ttfcp=22.12 value_ttfp=20.12 browser_mobile=false`, + }, + }, + }, + { + desc: "export logfmt for event payload", + format: FormatLogfmt, + payload: payload.Payload{ + Events: []payload.Event{ + { + Name: "click_login_button", + Domain: "frontend", + Attributes: map[string]string{"button_name": "login"}, + Timestamp: now, + Trace: payload.TraceContext{ + TraceID: "a363d4f4417aa83158c437febd2d8838", + SpanID: "42876ecc4c1feafa", + }, + }, + }, + }, + expect: loki.Entry{ + Labels: model.LabelSet{ + "foo": model.LabelValue("bar"), + "kind": model.LabelValue("event"), + }, + Entry: logproto.Entry{ + Line: `timestamp="2021-09-30 10:46:17.68 +0000 UTC" kind=event event_name=click_login_button event_domain=frontend event_data_button_name=login traceID=a363d4f4417aa83158c437febd2d8838 spanID=42876ecc4c1feafa browser_mobile=false`, + }, + }, + }, + { + desc: "export json for log payload", + format: FormatJSON, + payload: payload.Payload{ + Logs: []payload.Log{ + { + Message: "React Router Future Flag Warning", + LogLevel: payload.LogLevelInfo, + Timestamp: now, + Trace: payload.TraceContext{ + TraceID: "a363d4f4417aa83158c437febd2d8838", + SpanID: "42876ecc4c1feafa", + }, + }, + }, + }, + expect: loki.Entry{ + Labels: model.LabelSet{ + "foo": model.LabelValue("bar"), + "kind": model.LabelValue("log"), + }, + Entry: logproto.Entry{ + Line: `{"browser_mobile":"false","kind":"log","level":"info","message":"React Router Future Flag Warning","spanID":"42876ecc4c1feafa","timestamp":"2021-09-30 10:46:17.68 +0000 UTC","traceID":"a363d4f4417aa83158c437febd2d8838"}`, + }, + }, + }, + { + desc: "export json for exception payload", + format: FormatJSON, + payload: payload.Payload{ + Exceptions: []payload.Exception{ + { + Type: "Error", + Value: "Cannot read property 'find' of undefined", + Timestamp: now, + Stacktrace: &payload.Stacktrace{ + Frames: []payload.Frame{ + { + Function: "?", + Filename: "http://fe:3002/static/js/vendors~main.chunk.js", + Lineno: 8639, + Colno: 42, + }, + { + Function: "dispatchAction", + Filename: "http://fe:3002/static/js/vendors~main.chunk.js", + Lineno: 268095, + Colno: 9, + }, + }, + }, + }, + }, + }, + expect: loki.Entry{ + Labels: model.LabelSet{ + "foo": model.LabelValue("bar"), + "kind": model.LabelValue("exception"), + }, + Entry: logproto.Entry{ + Line: `{"browser_mobile":"false","hash":"2735541995122471342","kind":"exception","stacktrace":"Error: Cannot read property 'find' of undefined\n at ? (http://fe:3002/static/js/vendors~main.chunk.js:8639:42)\n at dispatchAction (http://fe:3002/static/js/vendors~main.chunk.js:268095:9)","timestamp":"2021-09-30 10:46:17.68 +0000 UTC","type":"Error","value":"Cannot read property 'find' of undefined"}`, + }, + }, + }, + { + desc: "export json for measurement payload", + format: FormatJSON, + payload: payload.Payload{ + Measurements: []payload.Measurement{ + { + Type: "sum", + Values: map[string]float64{ + "ttfp": 20.12, + "ttfcp": 22.12, + "ttfb": 14, + }, + Timestamp: now, + Trace: payload.TraceContext{ + TraceID: "a363d4f4417aa83158c437febd2d8838", + SpanID: "42876ecc4c1feafa", + }, + Context: payload.MeasurementContext{"host": "localhost"}, + }, + }, + }, + expect: loki.Entry{ + Labels: model.LabelSet{ + "foo": model.LabelValue("bar"), + "kind": model.LabelValue("measurement"), + }, + Entry: logproto.Entry{ + Line: `{"browser_mobile":"false","context_host":"localhost","kind":"measurement","spanID":"42876ecc4c1feafa","timestamp":"2021-09-30 10:46:17.68 +0000 UTC","traceID":"a363d4f4417aa83158c437febd2d8838","ttfb":"14.000000","ttfcp":"22.120000","ttfp":"20.120000","type":"sum","value_ttfb":14,"value_ttfcp":22.12,"value_ttfp":20.12}`, + }, + }, + }, + { + desc: "export json for event payload", + format: FormatJSON, + payload: payload.Payload{ + Events: []payload.Event{ + { + Name: "click_login_button", + Domain: "frontend", + Attributes: map[string]string{"button_name": "login"}, + Timestamp: now, + Trace: payload.TraceContext{ + TraceID: "a363d4f4417aa83158c437febd2d8838", + SpanID: "42876ecc4c1feafa", + }, + }, + }, + }, + expect: loki.Entry{ + Labels: model.LabelSet{ + "foo": model.LabelValue("bar"), + "kind": model.LabelValue("event"), + }, + Entry: logproto.Entry{ + Line: `{"browser_mobile":"false","event_data_button_name":"login","event_domain":"frontend","event_name":"click_login_button","kind":"event","spanID":"42876ecc4c1feafa","timestamp":"2021-09-30 10:46:17.68 +0000 UTC","traceID":"a363d4f4417aa83158c437febd2d8838"}`, + }, + }, + }, + } + for _, tc := range tt { + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + var ( + lr = newFakeLogsReceiver(t) + exp = newLogsExporter(util.TestLogger(t), &varSourceMapsStore{}, tc.format) + ) + exp.SetReceivers([]loki.LogsReceiver{lr}) + exp.SetLabels(map[string]string{ + "foo": "bar", + "kind": "", + }) + ctx := componenttest.TestContext(t) + require.NoError(t, exp.Export(ctx, tc.payload)) + // Sleep for 2ms since fake logger process in separate go routine + time.Sleep(2 * time.Millisecond) + require.Len(t, lr.GetEntries(), 1) + require.Equal(t, tc.expect, lr.entries[0]) + }) + } +} diff --git a/internal/component/faro/receiver/receiver.go b/internal/component/faro/receiver/receiver.go index b5ed5e572b..aebab7ebce 100644 --- a/internal/component/faro/receiver/receiver.go +++ b/internal/component/faro/receiver/receiver.go @@ -54,7 +54,7 @@ func New(o component.Options, args Arguments) (*Component, error) { varStore = &varSourceMapsStore{} metrics = newMetricsExporter(o.Registerer) - logs = newLogsExporter(log.With(o.Logger, "exporter", "logs"), varStore) + logs = newLogsExporter(log.With(o.Logger, "exporter", "logs"), varStore, args.LogFormat) traces = newTracesExporter(log.With(o.Logger, "exporter", "traces")) ) diff --git a/internal/component/faro/receiver/receiver_test.go b/internal/component/faro/receiver/receiver_test.go index 84c35288ec..15c732e3a5 100644 --- a/internal/component/faro/receiver/receiver_test.go +++ b/internal/component/faro/receiver/receiver_test.go @@ -4,9 +4,7 @@ import ( "fmt" "net/http" "strings" - "sync" "testing" - "time" "github.com/grafana/loki/v3/pkg/logproto" "github.com/phayes/freeport" @@ -21,54 +19,91 @@ import ( // Test performs an end-to-end test of the component. func Test(t *testing.T) { - ctx := componenttest.TestContext(t) - - ctrl, err := componenttest.NewControllerFromID( - util.TestLogger(t), - "faro.receiver", - ) - require.NoError(t, err) - - freePort, err := freeport.GetFreePort() - require.NoError(t, err) - - lr := newFakeLogsReceiver(t) - - go func() { - err := ctrl.Run(ctx, Arguments{ - LogLabels: map[string]string{ - "foo": "bar", - "kind": "", + tt := []struct { + desc string + logFormat LogFormat + expect loki.Entry + }{ + { + desc: "format logfmt", + logFormat: FormatLogfmt, + expect: loki.Entry{ + Labels: model.LabelSet{ + "foo": model.LabelValue("bar"), + "kind": model.LabelValue("log"), + }, + Entry: logproto.Entry{ + Line: `timestamp="2021-01-01 00:00:00 +0000 UTC" kind=log message="hello, world" level=info context_env=dev traceID=0 spanID=0 browser_mobile=false`, + }, }, - - Server: ServerArguments{ - Host: "127.0.0.1", - Port: freePort, - IncludeMetadata: true, + }, + { + desc: "format json", + logFormat: FormatJSON, + expect: loki.Entry{ + Labels: model.LabelSet{ + "foo": model.LabelValue("bar"), + "kind": model.LabelValue("log"), + }, + Entry: logproto.Entry{ + Line: `{"browser_mobile":"false","context_env":"dev","kind":"log","level":"info","message":"hello, world","spanID":"0","timestamp":"2021-01-01 00:00:00 +0000 UTC","traceID":"0"}`, + }, }, + }, + } + for _, tc := range tt { + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + + ctx := componenttest.TestContext(t) + + ctrl, err := componenttest.NewControllerFromID( + util.TestLogger(t), + "faro.receiver", + ) + require.NoError(t, err) + + freePort, err := freeport.GetFreePort() + require.NoError(t, err) + + lr := newFakeLogsReceiver(t) + + go func() { + err := ctrl.Run(ctx, Arguments{ + LogLabels: map[string]string{ + "foo": "bar", + "kind": "", + }, + LogFormat: tc.logFormat, + + Server: ServerArguments{ + Host: "127.0.0.1", + Port: freePort, + IncludeMetadata: true, + }, + + Output: OutputArguments{ + Logs: []loki.LogsReceiver{lr}, + Traces: []otelcol.Consumer{}, + }, + }) + require.NoError(t, err) + }() + + // Wait for the server to be running. + util.Eventually(t, func(t require.TestingT) { + resp, err := http.Get(fmt.Sprintf("http://localhost:%d/-/ready", freePort)) + require.NoError(t, err) + defer resp.Body.Close() + + require.Equal(t, http.StatusOK, resp.StatusCode) + }) - Output: OutputArguments{ - Logs: []loki.LogsReceiver{lr}, - Traces: []otelcol.Consumer{}, - }, - }) - require.NoError(t, err) - }() - - // Wait for the server to be running. - util.Eventually(t, func(t require.TestingT) { - resp, err := http.Get(fmt.Sprintf("http://localhost:%d/-/ready", freePort)) - require.NoError(t, err) - defer resp.Body.Close() - - require.Equal(t, http.StatusOK, resp.StatusCode) - }) - - // Send a sample payload to the server. - req, err := http.NewRequest( - "POST", - fmt.Sprintf("http://localhost:%d/collect", freePort), - strings.NewReader(`{ + // Send a sample payload to the server. + req, err := http.NewRequest( + "POST", + fmt.Sprintf("http://localhost:%d/collect", freePort), + strings.NewReader(`{ "traces": { "resourceSpans": [] }, @@ -86,77 +121,21 @@ func Test(t *testing.T) { "measurements": [], "meta": {} }`), - ) - require.NoError(t, err) - - req.Header.Add("Tenant-Id", "TENANTID") - req.Header.Add("Content-Type", "application/json") - - client := &http.Client{} - resp, err := client.Do(req) - require.NoError(t, err) - defer resp.Body.Close() + ) + require.NoError(t, err) - require.Equal(t, http.StatusAccepted, resp.StatusCode) - require.Len(t, lr.GetEntries(), 1) + req.Header.Add("Tenant-Id", "TENANTID") + req.Header.Add("Content-Type", "application/json") - expect := loki.Entry{ - Labels: model.LabelSet{ - "foo": model.LabelValue("bar"), - "kind": model.LabelValue("log"), - }, - Entry: logproto.Entry{ - Line: `timestamp="2021-01-01 00:00:00 +0000 UTC" kind=log message="hello, world" level=info context_env=dev traceID=0 spanID=0 browser_mobile=false`, - }, - } - require.Equal(t, expect, lr.entries[0]) -} - -type fakeLogsReceiver struct { - ch chan loki.Entry + client := &http.Client{} + resp, err := client.Do(req) + require.NoError(t, err) + defer resp.Body.Close() - entriesMut sync.RWMutex - entries []loki.Entry -} - -var _ loki.LogsReceiver = (*fakeLogsReceiver)(nil) - -func newFakeLogsReceiver(t *testing.T) *fakeLogsReceiver { - ctx := componenttest.TestContext(t) + require.Equal(t, http.StatusAccepted, resp.StatusCode) + require.Len(t, lr.GetEntries(), 1) - lr := &fakeLogsReceiver{ - ch: make(chan loki.Entry, 1), + require.Equal(t, tc.expect, lr.entries[0]) + }) } - - go func() { - defer close(lr.ch) - - select { - case <-ctx.Done(): - return - case ent := <-lr.Chan(): - lr.entriesMut.Lock() - lr.entries = append(lr.entries, loki.Entry{ - Labels: ent.Labels, - Entry: logproto.Entry{ - Timestamp: time.Time{}, // Use consistent time for testing. - Line: ent.Line, - StructuredMetadata: ent.StructuredMetadata, - }, - }) - lr.entriesMut.Unlock() - } - }() - - return lr -} - -func (lr *fakeLogsReceiver) Chan() chan loki.Entry { - return lr.ch -} - -func (lr *fakeLogsReceiver) GetEntries() []loki.Entry { - lr.entriesMut.RLock() - defer lr.entriesMut.RUnlock() - return lr.entries } diff --git a/internal/component/faro/receiver/test_utils.go b/internal/component/faro/receiver/test_utils.go new file mode 100644 index 0000000000..cc03767561 --- /dev/null +++ b/internal/component/faro/receiver/test_utils.go @@ -0,0 +1,60 @@ +package receiver + +import ( + "sync" + "testing" + "time" + + "github.com/grafana/alloy/internal/component/common/loki" + "github.com/grafana/alloy/internal/runtime/componenttest" + "github.com/grafana/loki/v3/pkg/logproto" +) + +type fakeLogsReceiver struct { + ch chan loki.Entry + + entriesMut sync.RWMutex + entries []loki.Entry +} + +var _ loki.LogsReceiver = (*fakeLogsReceiver)(nil) + +func newFakeLogsReceiver(t *testing.T) *fakeLogsReceiver { + ctx := componenttest.TestContext(t) + + lr := &fakeLogsReceiver{ + ch: make(chan loki.Entry), + } + + go func() { + defer close(lr.ch) + + select { + case <-ctx.Done(): + return + case ent := <-lr.Chan(): + lr.entriesMut.Lock() + lr.entries = append(lr.entries, loki.Entry{ + Labels: ent.Labels, + Entry: logproto.Entry{ + Timestamp: time.Time{}, // Use consistent time for testing. + Line: ent.Line, + StructuredMetadata: ent.StructuredMetadata, + }, + }) + lr.entriesMut.Unlock() + } + }() + + return lr +} + +func (lr *fakeLogsReceiver) Chan() chan loki.Entry { + return lr.ch +} + +func (lr *fakeLogsReceiver) GetEntries() []loki.Entry { + lr.entriesMut.RLock() + defer lr.entriesMut.RUnlock() + return lr.entries +} diff --git a/internal/converter/internal/staticconvert/testdata-v2/integrations_v2.alloy b/internal/converter/internal/staticconvert/testdata-v2/integrations_v2.alloy index c454dbd84c..712f63ae59 100644 --- a/internal/converter/internal/staticconvert/testdata-v2/integrations_v2.alloy +++ b/internal/converter/internal/staticconvert/testdata-v2/integrations_v2.alloy @@ -23,6 +23,7 @@ logging { faro.receiver "integrations_app_agent_receiver" { extra_log_labels = {} + log_format = "" server { listen_address = "localhost" diff --git a/internal/converter/internal/staticconvert/testdata-v2/unsupported.alloy b/internal/converter/internal/staticconvert/testdata-v2/unsupported.alloy index 9f06a3cdc5..b7b1bf1dc7 100644 --- a/internal/converter/internal/staticconvert/testdata-v2/unsupported.alloy +++ b/internal/converter/internal/staticconvert/testdata-v2/unsupported.alloy @@ -18,6 +18,7 @@ loki.write "logs_log_config" { faro.receiver "integrations_app_agent_receiver" { extra_log_labels = {} + log_format = "" server { listen_address = "localhost" From 45b21db7d08c5689a3704504ecbf1c6bd0336b6f Mon Sep 17 00:00:00 2001 From: Ravishankar Date: Fri, 13 Dec 2024 08:49:30 +0530 Subject: [PATCH 2/4] Fix docs links --- docs/sources/reference/components/faro/faro.receiver.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/sources/reference/components/faro/faro.receiver.md b/docs/sources/reference/components/faro/faro.receiver.md index 52f3b33622..5101de3ebf 100644 --- a/docs/sources/reference/components/faro/faro.receiver.md +++ b/docs/sources/reference/components/faro/faro.receiver.md @@ -30,13 +30,13 @@ The following arguments are supported: Name | Type | Description | Default | Required -------------------|---------------|----------------------------------------------|---------|--------- `extra_log_labels` | `map(string)` | Extra labels to attach to emitted log lines. | `{}` | no -`log_format` | `string` | Export format for the logs | `logfmt`| no +`log_format` | `string` | Export format for the logs. | `logfmt`| no ### Log format The following strings are recognized as valid log line formats: -* `"logfmt"`: Export logs as [logfmt][] lines. +* `"logfmt"`: Export logs as [logfmt](https://brandur.org/logfmt) lines. * `"json"`: Export logs as JSON objects. ## Blocks From 61f5b9778aa5e4f174c79c1812dd1a3801312035 Mon Sep 17 00:00:00 2001 From: Ravishankar Date: Wed, 18 Dec 2024 22:12:24 +0530 Subject: [PATCH 3/4] Fix rebase issue Fix chan size --- .../component/faro/receiver/receiver_test.go | 51 ++++++++++++++++ .../component/faro/receiver/test_utils.go | 60 ------------------- 2 files changed, 51 insertions(+), 60 deletions(-) delete mode 100644 internal/component/faro/receiver/test_utils.go diff --git a/internal/component/faro/receiver/receiver_test.go b/internal/component/faro/receiver/receiver_test.go index 15c732e3a5..1199ee1709 100644 --- a/internal/component/faro/receiver/receiver_test.go +++ b/internal/component/faro/receiver/receiver_test.go @@ -4,7 +4,9 @@ import ( "fmt" "net/http" "strings" + "sync" "testing" + "time" "github.com/grafana/loki/v3/pkg/logproto" "github.com/phayes/freeport" @@ -139,3 +141,52 @@ func Test(t *testing.T) { }) } } + +type fakeLogsReceiver struct { + ch chan loki.Entry + + entriesMut sync.RWMutex + entries []loki.Entry +} + +var _ loki.LogsReceiver = (*fakeLogsReceiver)(nil) + +func newFakeLogsReceiver(t *testing.T) *fakeLogsReceiver { + ctx := componenttest.TestContext(t) + + lr := &fakeLogsReceiver{ + ch: make(chan loki.Entry, 1), + } + + go func() { + defer close(lr.ch) + + select { + case <-ctx.Done(): + return + case ent := <-lr.Chan(): + lr.entriesMut.Lock() + lr.entries = append(lr.entries, loki.Entry{ + Labels: ent.Labels, + Entry: logproto.Entry{ + Timestamp: time.Time{}, // Use consistent time for testing. + Line: ent.Line, + StructuredMetadata: ent.StructuredMetadata, + }, + }) + lr.entriesMut.Unlock() + } + }() + + return lr +} + +func (lr *fakeLogsReceiver) Chan() chan loki.Entry { + return lr.ch +} + +func (lr *fakeLogsReceiver) GetEntries() []loki.Entry { + lr.entriesMut.RLock() + defer lr.entriesMut.RUnlock() + return lr.entries +} diff --git a/internal/component/faro/receiver/test_utils.go b/internal/component/faro/receiver/test_utils.go deleted file mode 100644 index cc03767561..0000000000 --- a/internal/component/faro/receiver/test_utils.go +++ /dev/null @@ -1,60 +0,0 @@ -package receiver - -import ( - "sync" - "testing" - "time" - - "github.com/grafana/alloy/internal/component/common/loki" - "github.com/grafana/alloy/internal/runtime/componenttest" - "github.com/grafana/loki/v3/pkg/logproto" -) - -type fakeLogsReceiver struct { - ch chan loki.Entry - - entriesMut sync.RWMutex - entries []loki.Entry -} - -var _ loki.LogsReceiver = (*fakeLogsReceiver)(nil) - -func newFakeLogsReceiver(t *testing.T) *fakeLogsReceiver { - ctx := componenttest.TestContext(t) - - lr := &fakeLogsReceiver{ - ch: make(chan loki.Entry), - } - - go func() { - defer close(lr.ch) - - select { - case <-ctx.Done(): - return - case ent := <-lr.Chan(): - lr.entriesMut.Lock() - lr.entries = append(lr.entries, loki.Entry{ - Labels: ent.Labels, - Entry: logproto.Entry{ - Timestamp: time.Time{}, // Use consistent time for testing. - Line: ent.Line, - StructuredMetadata: ent.StructuredMetadata, - }, - }) - lr.entriesMut.Unlock() - } - }() - - return lr -} - -func (lr *fakeLogsReceiver) Chan() chan loki.Entry { - return lr.ch -} - -func (lr *fakeLogsReceiver) GetEntries() []loki.Entry { - lr.entriesMut.RLock() - defer lr.entriesMut.RUnlock() - return lr.entries -} From 2167bbff05b62e72f928e4a2957931460e6bfabe Mon Sep 17 00:00:00 2001 From: Ravishankar Date: Thu, 19 Dec 2024 21:49:03 +0530 Subject: [PATCH 4/4] Fix the flaky failure --- internal/component/faro/receiver/receiver_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/component/faro/receiver/receiver_test.go b/internal/component/faro/receiver/receiver_test.go index 1199ee1709..3c987b665b 100644 --- a/internal/component/faro/receiver/receiver_test.go +++ b/internal/component/faro/receiver/receiver_test.go @@ -135,6 +135,7 @@ func Test(t *testing.T) { defer resp.Body.Close() require.Equal(t, http.StatusAccepted, resp.StatusCode) + lr.wg.Wait() // Wait for the fakelogreceiver goroutine to process require.Len(t, lr.GetEntries(), 1) require.Equal(t, tc.expect, lr.entries[0]) @@ -146,6 +147,7 @@ type fakeLogsReceiver struct { ch chan loki.Entry entriesMut sync.RWMutex + wg sync.WaitGroup entries []loki.Entry } @@ -158,6 +160,7 @@ func newFakeLogsReceiver(t *testing.T) *fakeLogsReceiver { ch: make(chan loki.Entry, 1), } + lr.wg.Add(1) go func() { defer close(lr.ch) @@ -175,6 +178,7 @@ func newFakeLogsReceiver(t *testing.T) *fakeLogsReceiver { }, }) lr.entriesMut.Unlock() + lr.wg.Done() } }()