diff --git a/CHANGELOG.md b/CHANGELOG.md index 685f799212..31a33ba9f1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ Main (unreleased) ### Features +- Add `otelcol.receiver.syslog` component to receive otel logs in syslog format (@dehaansa) + - Add support for metrics in `otelcol.exporter.loadbalancing` (@madaraszg-tulip) - Add `add_cloudwatch_timestamp` to `prometheus.exporter.cloudwatch` metrics. (@captncraig) diff --git a/docs/sources/reference/compatibility/_index.md b/docs/sources/reference/compatibility/_index.md index da5471fbfb..aa9a3b043c 100644 --- a/docs/sources/reference/compatibility/_index.md +++ b/docs/sources/reference/compatibility/_index.md @@ -370,6 +370,7 @@ The following components, grouped by namespace, _consume_ OpenTelemetry `otelcol - [otelcol.receiver.otlp](../components/otelcol/otelcol.receiver.otlp) - [otelcol.receiver.prometheus](../components/otelcol/otelcol.receiver.prometheus) - [otelcol.receiver.solace](../components/otelcol/otelcol.receiver.solace) +- [otelcol.receiver.syslog](../components/otelcol/otelcol.receiver.syslog) - [otelcol.receiver.vcenter](../components/otelcol/otelcol.receiver.vcenter) - [otelcol.receiver.zipkin](../components/otelcol/otelcol.receiver.zipkin) {{< /collapse >}} diff --git a/docs/sources/reference/components/otelcol/otelcol.receiver.syslog.md b/docs/sources/reference/components/otelcol/otelcol.receiver.syslog.md new file mode 100644 index 0000000000..f803480fc8 --- /dev/null +++ b/docs/sources/reference/components/otelcol/otelcol.receiver.syslog.md @@ -0,0 +1,278 @@ +--- +canonical: https://grafana.com/docs/alloy/latest/reference/components/otelcol/otelcol.receiver.syslog/ +description: Learn about otelcol.receiver.syslog +title: otelcol.receiver.syslog +--- + +Public preview + +# otelcol.receiver.syslog + +{{< docs/shared lookup="stability/public_preview.md" source="alloy" version="" >}} + +`otelcol.receiver.syslog` accepts syslog messages over the network and forwards them as logs to other `otelcol.*` components. +It supports syslog protocols [RFC5424][] and [RFC3164][] and can receive data over `TCP` or `UDP`. + +{{< admonition type="note" >}} +`otelcol.receiver.syslog` is a wrapper over the upstream OpenTelemetry Collector `syslog` receiver. +Bug reports or feature requests will be redirected to the upstream repository, if necessary. +{{< /admonition >}} + +You can specify multiple `otelcol.receiver.syslog` components by giving them different labels. + +[RFC5424]: https://www.rfc-editor.org/rfc/rfc5424 +[RFC3164]: https://www.rfc-editor.org/rfc/rfc3164 + +## Usage + +```alloy +otelcol.receiver.syslog "LABEL" { + tcp { ... } + udp { ... } + + output { + logs = [...] + } +} +``` + +## Arguments + +The following arguments are supported: + +| Name | Type | Description | Default | Required | +|-----------------------------------|----------|--------------------------------------------------------------------|-----------|----------| +| `protocol` | `string` | The syslog protocol that the syslog server supports. | `rfc5424` | no | +| `location` | `string` | The geographic time zone to use when parsing an RFC3164 timestamp. | `UTC` | no | +| `enable_octet_counting` | `bool` | Whether to enable RFC6587 octet counting. | `false` | no | +| `max_octets` | `int` | The maximum octets for messages when octet counting is enabled. | `8192` | no | +| `allow_skip_pri_header` | `bool` | Allow parsing records without a priority header. | `false` | no | +| `non_transparent_framing_trailer` | `string` | The framing trailer when using RFC6587 Non-Transparent-Framing. | `nil` | no | + +The `protocol` argument specifies the syslog format supported by the receiver. +`protocol` must be one of `rfc5424` or `rfc3164` + +The `location` argument specifies a Time Zone identifier. The available locations depend on the local IANA Time Zone database. +Refer to the [list of tz database time zones][tz-wiki] in Wikipedia for a non-comprehensive list. + +The `non_transparent_framing_trailer` and `enable_octet_counting` arguments specify TCP syslog behavior as defined in [RFC6587]. +These arguments are mutually exclusive. +They can't be used with a UDP syslog listener configured. +If configured, the `non_transparent_framing_trailer` argument must be one of `LF`, `NUL`. + + +[RFC6587]: https://datatracker.ietf.org/doc/html/rfc6587 +[tz-wiki]: https://en.wikipedia.org/wiki/List_of_tz_database_time_zones + +## Blocks + +The following blocks are supported inside the definition of +`otelcol.receiver.syslog`: + + +| Hierarchy | Block | Description | Required | +|------------------|----------------------|-------------------------------------------------------------------------------------------------|----------| +| udp | [udp][] | Configures a UDP syslog server to receive syslog messages. | no* | +| udp > multiline | [multiline][] | Configures rules for multiline parsing of incoming messages. | no | +| udp > async | [async][] | Configures rules for asynchronous parsing of incoming messages. | no | +| tcp | [tcp][] | Configures a TCP syslog server to receive syslog messages. | no* | +| tcp > multiline | [multiline][] | Configures rules for multiline parsing of incoming messages | no | +| tcp > tls | [tls][] | Configures TLS for the TCP syslog server. | no | +| retry_on_failure | [retry_on_failure][] | Configures the retry behavior when the receiver encounters an error downstream in the pipeline. | no | +| debug_metrics | [debug_metrics][] | Configures the metrics that this component generates to monitor its state. | no | +| output | [output][] | Configures where to send received telemetry data. | yes | + +A syslog receiver must have either a `udp` or `tcp` block configured. + +The `>` symbol indicates deeper levels of nesting. For example, `tcp > tls` +refers to a `tls` block defined inside a `tcp` block. + +[tls]: #tls-block +[udp]: #udp-block +[tcp]: #tcp-block +[multiline]: #multiline-block +[async]: #async-block +[retry_on_failure]: #retry-on-failure-block +[debug_metrics]: #debug_metrics-block +[output]: #output-block + +### udp block + +The `udp` block configures a UDP syslog server. +The following arguments are supported: + +| Name | Type | Description | Default | Required | +|---------------------------------|----------|--------------------------------------------------------------------------------------------------------------|---------|----------| +| `listen_address` | `string` | The `` address to listen to for syslog messages. | | yes | +| `one_log_per_packet` | `bool` | Skip log tokenization, improving performance when messages always contain one log and multiline is not used. | `false` | no | +| `add_attributes` | `bool` | Add net.* attributes to log messages according to OpenTelemetry semantic conventions. | `false` | no | +| `encoding` | `string` | The encoding of the syslog messages. | `utf-8` | no | +| `preserve_leading_whitespaces` | `bool` | Preserves leading whitespace in messages when set to `true`. | `false` | no | +| `preserve_trailing_whitespaces` | `bool` | Preserves trailing whitespace in messages when set to `true`. | `false` | no | + +The `encoding` argument specifies the encoding of the incoming syslog messages. +`encoding` must be one of `utf-8`, `utf-16le`, `utf-16be`, `ascii`, `big5`, or `nop`. +Refer to the upstream receiver [documentation][encoding-documentation] for more details. + +### multiline block + +The `multiline` block configures logic for splitting incoming log entries. +The following arguments are supported: + +| Name | Type | Description | Default | Required | +|----------------------|----------|-----------------------------------------------------------------|---------|----------| +| `line_start_pattern` | `string` | A regular expression that matches the beginning of a log entry. | | no | +| `line_end_pattern` | `string` | A regular expression that matches the end of a log entry. | | no | +| `omit_pattern` | `bool` | Omit the start/end pattern from the split log entries. | `false` | no | + +A `multiline` block must contain either `line_start_pattern` or `line_end_pattern`. + +If a `multiline` block is not set, log entries will not be split. + +### async block + +The `async` block configures concurrent asynchronous readers for a UDP syslog server. +The following arguments are supported: + +| Name | Type | Description | Default | Required | +|--------------------|-------|----------------------------------------------------------------------------------|---------|----------| +| `readers` | `int` | The number of goroutines to concurrently read from the UDP syslog server. | `1` | no | +| `processors` | `int` | The number of goroutines to concurrently process logs before sending downstream. | `1` | no | +| `max_queue_length` | `int` | The maximum number of messages to wait for an available processor. | `100` | no | + +If `async` is not set, a single goroutine will read and process messages synchronously. + +### tcp block + +The `tcp` block configures a TCP syslog server. +The following arguments are supported: + +| Name | Type | Description | Default | Required | +|---------------------------------|----------|--------------------------------------------------------------------------------------------------------------|---------|----------| +| `listen_address` | `string` | The `` address to listen to for syslog messages. | | yes | +| `max_log_size` | `string` | The maximum size of a log entry to read before failing. | `1MiB` | no | +| `one_log_per_packet` | `bool` | Skip log tokenization, improving performance when messages always contain one log and multiline is not used. | `false` | no | +| `add_attributes` | `bool` | Add net.* attributes to log messages according to OpenTelemetry semantic conventions. | `false` | no | +| `encoding` | `string` | The encoding of the syslog messages. | `utf-8` | no | +| `preserve_leading_whitespaces` | `bool` | Preserves leading whitespace in messages when set to `true`. | `false` | no | +| `preserve_trailing_whitespaces` | `bool` | Preserves trailing whitespace in messages when set to `true`. | `false` | no | + +The `encoding` argument specifies the encoding of the incoming syslog messages. +`encoding` must be one of `utf-8`, `utf-16le`, `utf-16be`, `ascii`, `big5`, `nop`. +See the upstream receiver [documentation][encoding-documentation] for more details. + +The `max_log_size` argument has a minimum value of `64KiB` + +### tls block + +The `tls` block configures TLS settings used for a server. If the `tls` block +isn't provided, TLS won't be used for connections to the server. + +{{< docs/shared lookup="reference/components/otelcol-tls-server-block.md" source="alloy" version="" >}} + +### retry on failure block + +The `retry_on_failure` block configures the retry behavior when the receiver encounters an error downstream in the pipeline. +A backoff algorithm is used to delay the retry upon subsequent failures. +The following arguments are supported: + +| Name | Type | Description | Default | Required | +|--------------------|------------|-----------------------------------------------------------------------------------------------------------|--------------|----------| +| `enabled` | `bool` | If true, the receiver will pause reading a file and attempt to resend the current batch of logs on error. | `false` | no | +| `initial_interval` | `duration` | The time to wait after first failure to retry. | `1s` | no | +| `max_interval` | `duration` | The maximum time to wait after applying backoff logic. | `30s` | no | +| `max_elapsed_time` | `duration` | The maximum age of a message before the data is discarded. | `5m` | no | + +If `max_elapsed_time` is set to `0` data will never be discarded. + +### debug_metrics block + +{{< docs/shared lookup="reference/components/otelcol-debug-metrics-block.md" source="alloy" version="" >}} + +### output block + +{{< docs/shared lookup="reference/components/output-block.md" source="alloy" version="" >}} + +## Exported fields + +`otelcol.receiver.syslog` does not export any fields. + +## Component health + +`otelcol.receiver.syslog` is only reported as unhealthy if given an invalid +configuration. + +## Debug information + +`otelcol.receiver.syslog` does not expose any component-specific debug +information. + +## Debug metrics + +`otelcol.receiver.syslog` does not expose any component-specific debug metrics. + +## Example + +This example proxies syslog messages from the `otelcol.receiver.syslog` receiver to the +`otelcol.exporter.syslog` component, and then sends them on to a `loki.source.syslog` component +before being logged by a `loki.echo` component. This shows how the `otelcol` syslog components +can be used to proxy syslog messages before sending them to another destination. + +Using the `otelcol` syslog components in this way results in the messages being forwarded as sent, +attempting to use the `loki.source.syslog` component for a similar proxy use case requires +careful mapping of any structured data fields through the `otelcol.processor.transform` component. A +very simple example of that can be found in the [`otelcol.exporter.syslog`][exporter-examples] documentation. + +```alloy +otelcol.receiver.syslog "default" { + protocol = "rfc5424" + tcp { + listen_address = "localhost:1515" + } + output { + logs = [otelcol.exporter.syslog.default.input] + } +} + +otelcol.exporter.syslog "default" { + endpoint = "localhost" + network = "tcp" + port = 1514 + protocol = "rfc5424" + enable_octet_counting = false + tls { + insecure = true + } +} + +loki.source.syslog "default" { + listener { + address = "localhost:1514" + protocol = "tcp" + syslog_format = "rfc5424" + label_structured_data = true + use_rfc5424_message = true + } + forward_to = [loki.echo.default.receiver] +} + +loki.echo "default" {} +``` + +[exporter-examples]: ../otelcol.exporter.syslog/#use-the-otelcolprocessortransform-component-to-format-logs-from-lokisourcesyslog +[encoding-documentation]: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/receiver/syslogreceiver/README.md#supported-encodings + + +## Compatible components + +`otelcol.receiver.syslog` can accept arguments from the following components: + +- Components that export [OpenTelemetry `otelcol.Consumer`](../../../compatibility/#opentelemetry-otelcolconsumer-exporters) + + +{{< admonition type="note" >}} +Connecting some components may not be sensible or components may require further configuration to make the connection work correctly. +Refer to the linked documentation for more details. +{{< /admonition >}} + + diff --git a/go.mod b/go.mod index 4dbf6351b1..0a241163aa 100644 --- a/go.mod +++ b/go.mod @@ -125,6 +125,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.112.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.112.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.112.0 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.112.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/loki v0.112.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus v0.112.0 github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor v0.112.0 @@ -143,6 +144,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver v0.112.0 github.com/open-telemetry/opentelemetry-collector-contrib/receiver/opencensusreceiver v0.112.0 github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver v0.112.0 + github.com/open-telemetry/opentelemetry-collector-contrib/receiver/syslogreceiver v0.112.0 github.com/open-telemetry/opentelemetry-collector-contrib/receiver/vcenterreceiver v0.112.0 github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zipkinreceiver v0.112.0 github.com/ory/dockertest/v3 v3.8.1 @@ -845,6 +847,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic v0.112.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/receiver/influxdbreceiver v0.112.0 github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect + github.com/valyala/fastjson v1.6.4 // indirect go.opentelemetry.io/collector/connector/connectorprofiles v0.112.0 // indirect go.opentelemetry.io/collector/consumer/consumererror v0.112.0 // indirect go.opentelemetry.io/collector/consumer/consumererror/consumererrorprofiles v0.112.0 // indirect diff --git a/go.sum b/go.sum index 6076b1a759..38763b2b12 100644 --- a/go.sum +++ b/go.sum @@ -2612,6 +2612,8 @@ github.com/open-telemetry/opentelemetry-collector-contrib/extension/oauth2client github.com/open-telemetry/opentelemetry-collector-contrib/extension/oauth2clientauthextension v0.112.0/go.mod h1:F7NU4rHqbTrkfOFH6ZtbSHoD+vLNgfu0MrqDw5n1I8Y= github.com/open-telemetry/opentelemetry-collector-contrib/extension/sigv4authextension v0.112.0 h1:XYTTlyT5xjZKcUaQT05ffltMahw2S2wU7qIWtjLp7XE= github.com/open-telemetry/opentelemetry-collector-contrib/extension/sigv4authextension v0.112.0/go.mod h1:wlnJiEwFYq3920DXOgDby5w6ctv57GJUTkvH4gHoBVA= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.112.0 h1:OtZFEz8PEAcGJFGAI3m1bu8gC3rS8IbtnB5mO8B9AAU= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.112.0/go.mod h1:ykfaFUQlOIuWrWSwc4wtY0TDwWjjGHF/8jNm3lFH0cM= github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/ecsutil v0.112.0 h1:3zGjQ0pRszCibVGvjqTWDVEDT0D+d5pY8JRy8rV8X9k= github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/ecsutil v0.112.0/go.mod h1:0vf3+lFg/yOaXwQ17MAF2JmBkTGeq09qR+ftaJQqN08= github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.112.0 h1:PVgAm7sIQUOS8TtX5ANV+hHn67vW6cW6uVy3qifccKc= @@ -2716,6 +2718,8 @@ github.com/open-telemetry/opentelemetry-collector-contrib/receiver/opencensusrec github.com/open-telemetry/opentelemetry-collector-contrib/receiver/opencensusreceiver v0.112.0/go.mod h1:q2lFBHfnG+ar2DJJlIU6RviOFXDeFur9vJ083NvOMQs= github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver v0.112.0 h1:cHk8vS/D1pjeZ0o4LJJAENP847HHWjTXFe4y1RJYlfo= github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver v0.112.0/go.mod h1:2CK7Hh6UGLnBSGW7Y0nopvEhoo25D6t/395jFEephEs= +github.com/open-telemetry/opentelemetry-collector-contrib/receiver/syslogreceiver v0.112.0 h1:GNWvYGjT08ByMbKuvY/uB57TQYrPJc/aF+nnpraELgU= +github.com/open-telemetry/opentelemetry-collector-contrib/receiver/syslogreceiver v0.112.0/go.mod h1:U0XNYcs+DJTwElKNKXADGBpQLIFrrEKAI78PzqOVl/E= github.com/open-telemetry/opentelemetry-collector-contrib/receiver/vcenterreceiver v0.112.0 h1:Vv1FDwd7pykzj8Wmuc7yj7bcN0qUv1mGBb/dcTMPfNE= github.com/open-telemetry/opentelemetry-collector-contrib/receiver/vcenterreceiver v0.112.0/go.mod h1:lklLK8ELD2Wk5z7ywjaf6XEbbViDtf7uK8jAExjRlls= github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zipkinreceiver v0.112.0 h1:XhKHjEpQJQMaUuWVhWS1FEuaY4LJDwBgsGXE166j9SY= diff --git a/internal/component/all/all.go b/internal/component/all/all.go index 65d48a019e..d82838a2ab 100644 --- a/internal/component/all/all.go +++ b/internal/component/all/all.go @@ -96,8 +96,8 @@ import ( _ "github.com/grafana/alloy/internal/component/otelcol/processor/tail_sampling" // Import otelcol.processor.tail_sampling _ "github.com/grafana/alloy/internal/component/otelcol/processor/transform" // Import otelcol.processor.transform _ "github.com/grafana/alloy/internal/component/otelcol/receiver/datadog" // Import otelcol.receiver.datadog - _ "github.com/grafana/alloy/internal/component/otelcol/receiver/influxdb" // Import otelcol.receiver.influxdb _ "github.com/grafana/alloy/internal/component/otelcol/receiver/file_stats" // Import otelcol.receiver.file_stats + _ "github.com/grafana/alloy/internal/component/otelcol/receiver/influxdb" // Import otelcol.receiver.influxdb _ "github.com/grafana/alloy/internal/component/otelcol/receiver/jaeger" // Import otelcol.receiver.jaeger _ "github.com/grafana/alloy/internal/component/otelcol/receiver/kafka" // Import otelcol.receiver.kafka _ "github.com/grafana/alloy/internal/component/otelcol/receiver/loki" // Import otelcol.receiver.loki @@ -105,6 +105,7 @@ import ( _ "github.com/grafana/alloy/internal/component/otelcol/receiver/otlp" // Import otelcol.receiver.otlp _ "github.com/grafana/alloy/internal/component/otelcol/receiver/prometheus" // Import otelcol.receiver.prometheus _ "github.com/grafana/alloy/internal/component/otelcol/receiver/solace" // Import otelcol.receiver.solace + _ "github.com/grafana/alloy/internal/component/otelcol/receiver/syslog" // Import otelcol.receiver.syslog _ "github.com/grafana/alloy/internal/component/otelcol/receiver/vcenter" // Import otelcol.receiver.vcenter _ "github.com/grafana/alloy/internal/component/otelcol/receiver/zipkin" // Import otelcol.receiver.zipkin _ "github.com/grafana/alloy/internal/component/prometheus/exporter/apache" // Import prometheus.exporter.apache diff --git a/internal/component/otelcol/config_consumer_retry.go b/internal/component/otelcol/config_consumer_retry.go new file mode 100644 index 0000000000..d687788606 --- /dev/null +++ b/internal/component/otelcol/config_consumer_retry.go @@ -0,0 +1,30 @@ +package otelcol + +import ( + "time" + + "github.com/grafana/alloy/syntax" +) + +// ConsumerRetryArguments holds shared settings for stanza receivers which can retry +// requests. There is no Convert functionality as the consumerretry package is stanza internal +type ConsumerRetryArguments struct { + Enabled bool `alloy:"enabled,attr,optional"` + InitialInterval time.Duration `alloy:"initial_interval,attr,optional"` + MaxInterval time.Duration `alloy:"max_interval,attr,optional"` + MaxElapsedTime time.Duration `alloy:"max_elapsed_time,attr,optional"` +} + +var ( + _ syntax.Defaulter = (*ConsumerRetryArguments)(nil) +) + +// SetToDefault implements syntax.Defaulter. +func (args *ConsumerRetryArguments) SetToDefault() { + *args = ConsumerRetryArguments{ + Enabled: false, + InitialInterval: 1 * time.Second, + MaxInterval: 30 * time.Second, + MaxElapsedTime: 5 * time.Minute, + } +} diff --git a/internal/component/otelcol/receiver/syslog/syslog.go b/internal/component/otelcol/receiver/syslog/syslog.go new file mode 100644 index 0000000000..842228be86 --- /dev/null +++ b/internal/component/otelcol/receiver/syslog/syslog.go @@ -0,0 +1,325 @@ +// Package syslog provides an otelcol.receiver.syslog component. +package syslog + +import ( + "fmt" + "net" + + "github.com/alecthomas/units" + "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/component/common/config" + "github.com/grafana/alloy/internal/component/otelcol" + otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config" + "github.com/grafana/alloy/internal/component/otelcol/receiver" + "github.com/grafana/alloy/internal/featuregate" + "github.com/hashicorp/go-multierror" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" + stanzainputsyslog "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/syslog" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/tcp" + stanzainputtcp "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/tcp" + stanzainputudp "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/udp" + stanzaparsersyslog "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/syslog" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/split" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/syslogreceiver" + otelcomponent "go.opentelemetry.io/collector/component" + otelextension "go.opentelemetry.io/collector/extension" + "go.opentelemetry.io/collector/pipeline" +) + +func init() { + component.Register(component.Registration{ + Name: "otelcol.receiver.syslog", + Stability: featuregate.StabilityPublicPreview, + Args: Arguments{}, + + Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + fact := syslogreceiver.NewFactory() + return receiver.New(opts, fact, args.(Arguments)) + }, + }) +} + +// Arguments configures the otelcol.receiver.syslog component. +type Arguments struct { + Protocol config.SysLogFormat `alloy:"protocol,attr,optional"` + Location string `alloy:"location,attr,optional"` + EnableOctetCounting bool `alloy:"enable_octet_counting,attr,optional"` + MaxOctets int `alloy:"max_octets,attr,optional"` + AllowSkipPriHeader bool `alloy:"allow_skip_pri_header,attr,optional"` + NonTransparentFramingTrailer *FramingTrailer `alloy:"non_transparent_framing_trailer,attr,optional"` + + ConsumerRetry otelcol.ConsumerRetryArguments `alloy:"retry_on_failure,block,optional"` + TCP *TCP `alloy:"tcp,block,optional"` + UDP *UDP `alloy:"udp,block,optional"` + + // DebugMetrics configures component internal metrics. Optional. + DebugMetrics otelcolCfg.DebugMetricsArguments `alloy:"debug_metrics,block,optional"` + + // Output configures where to send received data. Required. + Output *otelcol.ConsumerArguments `alloy:"output,block"` +} + +type FramingTrailer string + +var NULTrailer FramingTrailer = "NUL" +var LFTrailer FramingTrailer = "LF" + +// MarshalText implements encoding.TextMarshaler +func (s FramingTrailer) MarshalText() (text []byte, err error) { + return []byte(s), nil +} + +// UnmarshalText implements encoding.TextUnmarshaler +func (s *FramingTrailer) UnmarshalText(text []byte) error { + str := string(text) + switch str { + case "NUL": + *s = NULTrailer + case "LF": + *s = LFTrailer + default: + return fmt.Errorf("unknown syslog format: %s", str) + } + + return nil +} + +// Values taken from tcp input Build function +const tcpDefaultMaxLogSize = helper.ByteSize(tcp.DefaultMaxLogSize) +const minMaxLogSize = helper.ByteSize(64 * 1024) + +type TCP struct { + MaxLogSize units.Base2Bytes `alloy:"max_log_size,attr,optional"` + ListenAddress string `alloy:"listen_address,attr,optional"` + TLS *otelcol.TLSServerArguments `alloy:"tls,block,optional"` + AddAttributes bool `alloy:"add_attributes,attr,optional"` + OneLogPerPacket bool `alloy:"one_log_per_packet,attr,optional"` + Encoding string `alloy:"encoding,attr,optional"` + MultilineConfig *MultilineConfig `alloy:"multiline,block,optional"` + TrimConfig *TrimConfig `alloy:",squash"` +} + +type UDP struct { + ListenAddress string `alloy:"listen_address,attr,optional"` + OneLogPerPacket bool `alloy:"one_log_per_packet,attr,optional"` + AddAttributes bool `alloy:"add_attributes,attr,optional"` + Encoding string `alloy:"encoding,attr,optional"` + MultilineConfig *MultilineConfig `alloy:"multiline,block,optional"` + TrimConfig *TrimConfig `alloy:",squash"` + Async *AsyncConfig `alloy:"async,block,optional"` +} + +type TrimConfig struct { + PreserveLeadingWhitespace bool `alloy:"preserve_leading_whitespaces,attr,optional"` + PreserveTrailingWhitespace bool `alloy:"preserve_trailing_whitespaces,attr,optional"` +} + +func (c *TrimConfig) Convert() *trim.Config { + if c == nil { + return nil + } + + return &trim.Config{ + PreserveLeading: c.PreserveLeadingWhitespace, + PreserveTrailing: c.PreserveTrailingWhitespace, + } +} + +type MultilineConfig struct { + LineStartPattern string `alloy:"line_start_pattern,attr,optional"` + LineEndPattern string `alloy:"line_end_pattern,attr,optional"` + OmitPattern bool `alloy:"omit_pattern,attr,optional"` +} + +func (c *MultilineConfig) Convert() *split.Config { + if c == nil { + return nil + } + + return &split.Config{ + LineStartPattern: c.LineStartPattern, + LineEndPattern: c.LineEndPattern, + OmitPattern: c.OmitPattern, + } +} + +type AsyncConfig struct { + Readers int `alloy:"readers,attr,optional"` + Processors int `alloy:"processors,attr,optional"` + MaxQueueLength int `alloy:"max_queue_length,attr,optional"` +} + +func (c *AsyncConfig) Convert() *stanzainputudp.AsyncConfig { + if c == nil { + return nil + } + + return &stanzainputudp.AsyncConfig{ + Readers: c.Readers, + Processors: c.Processors, + MaxQueueLength: c.MaxQueueLength, + } +} + +var _ receiver.Arguments = Arguments{} + +// SetToDefault implements syntax.Defaulter. +func (args *Arguments) SetToDefault() { + *args = Arguments{ + Location: "UTC", + Protocol: config.SyslogFormatRFC5424, + Output: &otelcol.ConsumerArguments{}, + } + args.DebugMetrics.SetToDefault() + args.ConsumerRetry.SetToDefault() +} + +// Convert implements receiver.Arguments. +func (args Arguments) Convert() (otelcomponent.Config, error) { + + c := stanzainputsyslog.NewConfig() + c.BaseConfig = stanzaparsersyslog.BaseConfig{ + Protocol: string(args.Protocol), + Location: args.Location, + EnableOctetCounting: args.EnableOctetCounting, + MaxOctets: args.MaxOctets, + AllowSkipPriHeader: args.AllowSkipPriHeader, + } + + if args.NonTransparentFramingTrailer != nil { + s := string(*args.NonTransparentFramingTrailer) + c.BaseConfig.NonTransparentFramingTrailer = &s + } + + if args.TCP != nil { + c.TCP = &stanzainputtcp.BaseConfig{ + MaxLogSize: helper.ByteSize(args.TCP.MaxLogSize), + ListenAddress: args.TCP.ListenAddress, + TLS: args.TCP.TLS.Convert(), + AddAttributes: args.TCP.AddAttributes, + OneLogPerPacket: args.TCP.OneLogPerPacket, + Encoding: args.TCP.Encoding, + } + if c.TCP.MaxLogSize == 0 { + c.TCP.MaxLogSize = tcpDefaultMaxLogSize + } + split := args.TCP.MultilineConfig.Convert() + if split != nil { + c.TCP.SplitConfig = *split + } + trim := args.TCP.TrimConfig.Convert() + if trim != nil { + c.TCP.TrimConfig = *trim + } + } + + if args.UDP != nil { + c.UDP = &stanzainputudp.BaseConfig{ + ListenAddress: args.UDP.ListenAddress, + OneLogPerPacket: args.UDP.OneLogPerPacket, + AddAttributes: args.UDP.AddAttributes, + Encoding: args.UDP.Encoding, + } + split := args.UDP.MultilineConfig.Convert() + if split != nil { + c.UDP.SplitConfig = *split + } + trim := args.UDP.TrimConfig.Convert() + if trim != nil { + c.UDP.TrimConfig = *trim + } + async := args.UDP.Async.Convert() + if async != nil { + c.UDP.AsyncConfig = async + } + } + + def := syslogreceiver.ReceiverType{}.CreateDefaultConfig() + cfg := def.(*syslogreceiver.SysLogConfig) + cfg.InputConfig = *c + + // consumerretry package is stanza internal so we can't just Convert + cfg.RetryOnFailure.Enabled = args.ConsumerRetry.Enabled + cfg.RetryOnFailure.InitialInterval = args.ConsumerRetry.InitialInterval + cfg.RetryOnFailure.MaxInterval = args.ConsumerRetry.MaxInterval + cfg.RetryOnFailure.MaxElapsedTime = args.ConsumerRetry.MaxElapsedTime + + return cfg, nil +} + +// Extensions implements receiver.Arguments. +func (args Arguments) Extensions() map[otelcomponent.ID]otelextension.Extension { + return nil +} + +// Exporters implements receiver.Arguments. +func (args Arguments) Exporters() map[pipeline.Signal]map[otelcomponent.ID]otelcomponent.Component { + return nil +} + +// NextConsumers implements receiver.Arguments. +func (args Arguments) NextConsumers() *otelcol.ConsumerArguments { + return args.Output +} + +// Validate implements syntax.Validator. +func (args *Arguments) Validate() error { + var errs error + if args.TCP == nil && args.UDP == nil { + errs = multierror.Append(errs, fmt.Errorf("at least one of 'tcp' or 'udp' must be configured")) + } + + if args.Protocol != config.SyslogFormatRFC3164 && args.Protocol != config.SyslogFormatRFC5424 { + errs = multierror.Append(errs, fmt.Errorf("invalid protocol, must be one of 'rfc3164', 'rfc5424': %s", args.Protocol)) + } + + if args.TCP != nil { + if err := validateListenAddress(args.TCP.ListenAddress, "tcp.listen_address"); err != nil { + errs = multierror.Append(errs, err) + } + + if args.NonTransparentFramingTrailer != nil && *args.NonTransparentFramingTrailer != LFTrailer && *args.NonTransparentFramingTrailer != NULTrailer { + errs = multierror.Append(errs, fmt.Errorf("invalid non_transparent_framing_trailer, must be one of 'LF', 'NUL': %s", *args.NonTransparentFramingTrailer)) + } + + _, err := decode.LookupEncoding(args.TCP.Encoding) + if err != nil { + errs = multierror.Append(errs, fmt.Errorf("invalid tcp.encoding: %w", err)) + } + + if args.TCP.MaxLogSize != 0 && (int64(args.TCP.MaxLogSize) < int64(minMaxLogSize)) { + errs = multierror.Append(errs, fmt.Errorf("invalid value %d for parameter 'tcp.max_log_size', must be equal to or greater than %d bytes", args.TCP.MaxLogSize, minMaxLogSize)) + } + } + + if args.UDP != nil { + if err := validateListenAddress(args.UDP.ListenAddress, "udp.listen_address"); err != nil { + errs = multierror.Append(errs, err) + } + + _, err := decode.LookupEncoding(args.UDP.Encoding) + if err != nil { + errs = multierror.Append(errs, fmt.Errorf("invalid udp.encoding: %w", err)) + } + } + + return errs +} + +func validateListenAddress(url string, urlName string) error { + if url == "" { + return fmt.Errorf("%s cannot be empty", urlName) + } + + if _, _, err := net.SplitHostPort(url); err != nil { + return fmt.Errorf("invalid %s: %w", urlName, err) + } + return nil +} + +// DebugMetricsConfig implements receiver.Arguments. +func (args Arguments) DebugMetricsConfig() otelcolCfg.DebugMetricsArguments { + return args.DebugMetrics +} diff --git a/internal/component/otelcol/receiver/syslog/syslog_test.go b/internal/component/otelcol/receiver/syslog/syslog_test.go new file mode 100644 index 0000000000..c9ea563a26 --- /dev/null +++ b/internal/component/otelcol/receiver/syslog/syslog_test.go @@ -0,0 +1,181 @@ +package syslog_test + +import ( + "context" + "fmt" + "net" + "testing" + "time" + + "github.com/grafana/alloy/internal/component/otelcol" + "github.com/grafana/alloy/internal/component/otelcol/internal/fakeconsumer" + "github.com/grafana/alloy/internal/component/otelcol/receiver/syslog" + "github.com/grafana/alloy/internal/runtime/componenttest" + "github.com/grafana/alloy/internal/runtime/logging/level" + "github.com/grafana/alloy/internal/util" + "github.com/grafana/alloy/syntax" + "github.com/grafana/dskit/backoff" + "github.com/phayes/freeport" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/plog" +) + +// Test performs a basic integration test which runs the otelcol.receiver.syslog +// component and ensures that it can receive and forward data. +func Test(t *testing.T) { + tcp := getFreeAddr(t) + + ctx := componenttest.TestContext(t) + l := util.TestLogger(t) + + ctrl, err := componenttest.NewControllerFromID(l, "otelcol.receiver.syslog") + require.NoError(t, err) + + cfg := fmt.Sprintf(` + protocol = "rfc5424" + tcp { + listen_address = "%s" + } + + output { + // no-op: will be overridden by test code. + } + `, tcp) + + require.NoError(t, err) + + var args syslog.Arguments + require.NoError(t, syntax.Unmarshal([]byte(cfg), &args)) + + // Override our settings so logs get forwarded to logsCh. + logCh := make(chan plog.Logs) + args.Output = makeLogsOutput(logCh) + + go func() { + err := ctrl.Run(ctx, args) + require.NoError(t, err) + }() + + require.NoError(t, ctrl.WaitRunning(3*time.Second)) + // TODO(@dehaansa) - test if this is removeable after https://github.com/grafana/alloy/pull/2262 + time.Sleep(1 * time.Second) + + // Send traces in the background to our receiver. + go func() { + request := func() error { + conn, err := net.Dial("tcp", tcp) + require.NoError(t, err) + defer conn.Close() + + _, err = fmt.Fprint(conn, "<165>1 2018-10-11T22:14:15.003Z host5 e - id1 [custom@32473 exkey=\"1\"] An application event log entry...\n") + return err + } + + bo := backoff.New(ctx, backoff.Config{ + MinBackoff: 10 * time.Millisecond, + MaxBackoff: 100 * time.Millisecond, + }) + for bo.Ongoing() { + if err := request(); err != nil { + level.Error(l).Log("msg", "failed to send logs", "err", err) + bo.Wait() + continue + } + + return + } + }() + + // Wait for our client to get a span. + select { + case <-time.After(time.Second): + require.FailNow(t, "failed waiting for logs") + case log := <-logCh: + require.Equal(t, 1, log.LogRecordCount()) + } +} + +// makeLogsOutput returns ConsumerArguments which will forward logs to the +// provided channel. +func makeLogsOutput(ch chan plog.Logs) *otelcol.ConsumerArguments { + logsConsumer := fakeconsumer.Consumer{ + ConsumeLogsFunc: func(ctx context.Context, l plog.Logs) error { + select { + case <-ctx.Done(): + return ctx.Err() + case ch <- l: + return nil + } + }, + } + + return &otelcol.ConsumerArguments{ + Logs: []otelcol.Consumer{&logsConsumer}, + } +} + +func getFreeAddr(t *testing.T) string { + t.Helper() + + portNumber, err := freeport.GetFreePort() + require.NoError(t, err) + + return fmt.Sprintf("127.0.0.1:%d", portNumber) +} + +func TestUnmarshal(t *testing.T) { + alloyCfg := ` + protocol = "rfc5424" + location = "UTC" + enable_octet_counting = true + max_octets = 16000 + allow_skip_pri_header = true + non_transparent_framing_trailer = "NUL" + + tcp { + listen_address = "localhost:1514" + max_log_size = "2MiB" + one_log_per_packet = true + add_attributes = true + encoding = "utf-16be" + preserve_leading_whitespaces = true + preserve_trailing_whitespaces = true + tls { + include_system_ca_certs_pool = true + reload_interval = "1m" + } + } + + udp { + listen_address = "localhost:1515" + one_log_per_packet = false + add_attributes = false + encoding = "utf-16le" + preserve_leading_whitespaces = false + preserve_trailing_whitespaces = false + async { + readers = 2 + processors = 4 + max_queue_length = 1000 + } + multiline { + line_end_pattern = "logend" + omit_pattern = true + } + + } + + retry_on_failure { + enabled = true + initial_interval = "10s" + max_interval = "1m" + max_elapsed_time = "10m" + } + + output { + } + ` + var args syslog.Arguments + err := syntax.Unmarshal([]byte(alloyCfg), &args) + require.NoError(t, err) +} diff --git a/internal/converter/internal/otelcolconvert/converter_syslogreceiver.go b/internal/converter/internal/otelcolconvert/converter_syslogreceiver.go new file mode 100644 index 0000000000..014abc0395 --- /dev/null +++ b/internal/converter/internal/otelcolconvert/converter_syslogreceiver.go @@ -0,0 +1,128 @@ +package otelcolconvert + +import ( + "fmt" + + "github.com/alecthomas/units" + "github.com/grafana/alloy/internal/component/common/config" + "github.com/grafana/alloy/internal/component/otelcol" + "github.com/grafana/alloy/internal/component/otelcol/receiver/syslog" + "github.com/grafana/alloy/internal/converter/diag" + "github.com/grafana/alloy/internal/converter/internal/common" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/udp" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/split" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/syslogreceiver" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componentstatus" +) + +func init() { + converters = append(converters, syslogReceiverConverter{}) +} + +type syslogReceiverConverter struct{} + +func (syslogReceiverConverter) Factory() component.Factory { + return syslogreceiver.NewFactory() +} + +func (syslogReceiverConverter) InputComponentName() string { + return "otelcol.receiver.syslog" +} + +func (syslogReceiverConverter) ConvertAndAppend(state *State, id componentstatus.InstanceID, cfg component.Config) diag.Diagnostics { + var diags diag.Diagnostics + + label := state.AlloyComponentLabel() + + args := toOtelcolReceiversyslog(cfg.(*syslogreceiver.SysLogConfig)) + block := common.NewBlockWithOverride([]string{"otelcol", "receiver", "syslog"}, label, args) + + diags.Add( + diag.SeverityLevelInfo, + fmt.Sprintf("Converted %s into %s", StringifyInstanceID(id), StringifyBlock(block)), + ) + + state.Body().AppendBlock(block) + return diags +} + +func toOtelcolReceiversyslog(cfg *syslogreceiver.SysLogConfig) *syslog.Arguments { + args := &syslog.Arguments{ + Protocol: config.SysLogFormat(cfg.InputConfig.Protocol), + Location: cfg.InputConfig.Location, + EnableOctetCounting: cfg.InputConfig.EnableOctetCounting, + AllowSkipPriHeader: cfg.InputConfig.AllowSkipPriHeader, + MaxOctets: cfg.InputConfig.MaxOctets, + DebugMetrics: common.DefaultValue[syslog.Arguments]().DebugMetrics, + } + + if cfg.InputConfig.NonTransparentFramingTrailer != nil { + trailer := syslog.FramingTrailer(*cfg.InputConfig.NonTransparentFramingTrailer) + args.NonTransparentFramingTrailer = &trailer + } + + if cfg.InputConfig.TCP != nil { + args.TCP = &syslog.TCP{ + MaxLogSize: units.Base2Bytes(cfg.InputConfig.TCP.MaxLogSize), + ListenAddress: cfg.InputConfig.TCP.ListenAddress, + TLS: toTLSServerArguments(cfg.InputConfig.TCP.TLS), + AddAttributes: cfg.InputConfig.TCP.AddAttributes, + OneLogPerPacket: cfg.InputConfig.TCP.OneLogPerPacket, + Encoding: cfg.InputConfig.TCP.Encoding, + MultilineConfig: toSyslogMultilineConfig(cfg.InputConfig.TCP.SplitConfig), + TrimConfig: toSyslogTrimConfig(cfg.InputConfig.TCP.TrimConfig), + } + } + + if cfg.InputConfig.UDP != nil { + args.UDP = &syslog.UDP{ + ListenAddress: cfg.InputConfig.UDP.ListenAddress, + OneLogPerPacket: cfg.InputConfig.UDP.OneLogPerPacket, + AddAttributes: cfg.InputConfig.UDP.AddAttributes, + Encoding: cfg.InputConfig.UDP.Encoding, + MultilineConfig: toSyslogMultilineConfig(cfg.InputConfig.UDP.SplitConfig), + TrimConfig: toSyslogTrimConfig(cfg.InputConfig.UDP.TrimConfig), + Async: toSyslogAsyncConfig(cfg.InputConfig.UDP.AsyncConfig), + } + } + + // This isn't done in a function because the type is not exported + args.ConsumerRetry = otelcol.ConsumerRetryArguments{ + Enabled: cfg.RetryOnFailure.Enabled, + InitialInterval: cfg.RetryOnFailure.InitialInterval, + MaxInterval: cfg.RetryOnFailure.MaxInterval, + MaxElapsedTime: cfg.RetryOnFailure.MaxElapsedTime, + } + + return args + +} + +func toSyslogMultilineConfig(cfg split.Config) *syslog.MultilineConfig { + return &syslog.MultilineConfig{ + LineStartPattern: cfg.LineStartPattern, + LineEndPattern: cfg.LineEndPattern, + OmitPattern: cfg.OmitPattern, + } +} + +func toSyslogTrimConfig(cfg trim.Config) *syslog.TrimConfig { + return &syslog.TrimConfig{ + PreserveLeadingWhitespace: cfg.PreserveLeading, + PreserveTrailingWhitespace: cfg.PreserveTrailing, + } +} + +func toSyslogAsyncConfig(cfg *udp.AsyncConfig) *syslog.AsyncConfig { + if cfg == nil { + return nil + } + + return &syslog.AsyncConfig{ + Readers: cfg.Readers, + Processors: cfg.Processors, + MaxQueueLength: cfg.MaxQueueLength, + } +} diff --git a/internal/converter/internal/otelcolconvert/testdata/syslog.alloy b/internal/converter/internal/otelcolconvert/testdata/syslog.alloy new file mode 100644 index 0000000000..ff0f9af32b --- /dev/null +++ b/internal/converter/internal/otelcolconvert/testdata/syslog.alloy @@ -0,0 +1,55 @@ +otelcol.receiver.syslog "default" { + enable_octet_counting = true + max_octets = 16000 + allow_skip_pri_header = true + non_transparent_framing_trailer = "NUL" + + retry_on_failure { + enabled = true + initial_interval = "10s" + max_interval = "1m0s" + max_elapsed_time = "10m0s" + } + + tcp { + max_log_size = "2MiB" + listen_address = "localhost:1514" + + tls { + reload_interval = "1m0s" + include_system_ca_certs_pool = true + } + add_attributes = true + one_log_per_packet = true + encoding = "utf-16be" + + multiline { } + preserve_leading_whitespaces = true + preserve_trailing_whitespaces = true + } + + udp { + listen_address = "localhost:1515" + encoding = "utf-16le" + + multiline { + line_end_pattern = "logend" + omit_pattern = true + } + + async { + readers = 2 + processors = 4 + max_queue_length = 1000 + } + } +} + +otelcol.exporter.syslog "default" { + tls { + insecure_skip_verify = true + } + endpoint = "localhost" + port = 1514 + enable_octet_counting = true +} diff --git a/internal/converter/internal/otelcolconvert/testdata/syslog.yaml b/internal/converter/internal/otelcolconvert/testdata/syslog.yaml new file mode 100644 index 0000000000..f6a44160a6 --- /dev/null +++ b/internal/converter/internal/otelcolconvert/testdata/syslog.yaml @@ -0,0 +1,57 @@ +receivers: + syslog: + location: "UTC" + protocol: "rfc5424" + enable_octet_counting: true + max_octets: 16000 + allow_skip_pri_header: true + non_transparent_framing_trailer: "NUL" + tcp: + listen_address: "localhost:1514" + max_log_size: "2MiB" + one_log_per_packet: true + add_attributes: true + encoding: "utf-16be" + preserve_leading_whitespaces: true + preserve_trailing_whitespaces: true + tls: + include_system_ca_certs_pool: true + reload_interval: "1m" + udp: + listen_address: "localhost:1515" + one_log_per_packet: false + add_attributes: false + encoding: "utf-16le" + preserve_leading_whitespaces: false + preserve_trailing_whitespaces: false + async: + readers: 2 + processors: 4 + max_queue_length: 1000 + multiline: + line_end_pattern: "logend" + omit_pattern: true + retry_on_failure: + enabled: true + initial_interval: "10s" + max_interval: "1m" + max_elapsed_time: "10m" + + +exporters: + syslog: + endpoint: localhost + port: 1514 + protocol: "rfc5424" + network: "tcp" + enable_octet_counting: true + tls: + insecure: false + insecure_skip_verify: true + +service: + pipelines: + logs: + receivers: [syslog] + processors: [] + exporters: [syslog]