diff --git a/CHANGELOG.md b/CHANGELOG.md index 4a5c323be1..4b60626be8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,7 +3,34 @@ Changelog All notable changes to this project will be documented in this file. -## Unreleased +## 4.26.0 - 2024-03-18 + +### Added + +- Field `credit` added to the `amqp_1` input to specify the maximum number of unacknowledged messages the sender can transmit. +- Bloblang now supports root-level `if` statements. +- New experimental `sql` cache. +- Fields `batch_size`, `sort` and `limit` added to the `mongodb` input. +- Field `idempotent_write` added to the `kafka` output. + +### Changed + +- The default value of the `amqp_1.credit` input has changed from `1` to `64`. +- The `mongodb` processor and output now support extended JSON in canonical form for document, filter and hint mappings. +- The `open_telemetry_collector` tracer has had the `url` field of gRPC and HTTP collectors deprecated in favour of `address`, which more accurately describes the intended format of endpoints. The old style will continue to work, but eventually will have its default value removed and an explicit value will be required. + +### Fixed + +- Resource config imports containing `%` characters were being incorrectly parsed during unit test execution. This was a regression introduced in v4.25.0. +- Dynamic input and output config updates containing `%` characters were being incorrectly parsed. This was a regression introduced in v4.25.0. + +## 4.25.1 - 2024-03-01 + +### Fixed + +- Fixed a regression in v4.25.0 where [template based components](https://www.benthos.dev/docs/configuration/templating) were not parsing correctly from configs. 
+ +## 4.25.0 - 2024-03-01 ### Added diff --git a/internal/cli/test/processors_provider.go b/internal/cli/test/processors_provider.go index 371b7db2e9..359d57ec3c 100644 --- a/internal/cli/test/processors_provider.go +++ b/internal/cli/test/processors_provider.go @@ -16,7 +16,6 @@ import ( "github.com/benthosdev/benthos/v4/internal/bloblang/parser" "github.com/benthosdev/benthos/v4/internal/bundle" "github.com/benthosdev/benthos/v4/internal/component/processor" - "github.com/benthosdev/benthos/v4/internal/component/testutil" "github.com/benthosdev/benthos/v4/internal/config" "github.com/benthosdev/benthos/v4/internal/docs" "github.com/benthosdev/benthos/v4/internal/filepath/ifs" @@ -345,7 +344,12 @@ func (p *ProcessorsProvider) getConfs(jsonPtr string, environment map[string]str return confs, fmt.Errorf("failed to parse resources config file '%v': %v", path, err) } - extraMgrWrapper, err := testutil.ManagerFromYAML(string(resourceBytes)) + confNode, err := docs.UnmarshalYAML(resourceBytes) + if err != nil { + return confs, fmt.Errorf("failed to parse resources config file '%v': %v", path, err) + } + + extraMgrWrapper, err := manager.FromAny(bundle.GlobalEnvironment, confNode) if err != nil { return confs, fmt.Errorf("failed to parse resources config file '%v': %v", path, err) } diff --git a/internal/impl/aws/cache_s3_integration_test.go b/internal/impl/aws/cache_s3_integration_test.go deleted file mode 100644 index 416e7b761c..0000000000 --- a/internal/impl/aws/cache_s3_integration_test.go +++ /dev/null @@ -1,101 +0,0 @@ -package aws - -import ( - "context" - "fmt" - "testing" - "time" - - "github.com/aws/aws-sdk-go-v2/config" - "github.com/aws/aws-sdk-go-v2/credentials" - "github.com/aws/aws-sdk-go-v2/service/s3" - "github.com/ory/dockertest/v3" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/benthosdev/benthos/v4/internal/integration" -) - -func createBucket(ctx context.Context, s3Port, bucket string) error { - 
endpoint := fmt.Sprintf("http://localhost:%v", s3Port) - - conf, err := config.LoadDefaultConfig(ctx, - config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider("xxxxx", "xxxxx", "xxxxx")), - ) - if err != nil { - return err - } - - conf.BaseEndpoint = &endpoint - - client := s3.NewFromConfig(conf, func(o *s3.Options) { - o.UsePathStyle = true - }) - - _, err = client.CreateBucket(ctx, &s3.CreateBucketInput{ - Bucket: &bucket, - }) - if err != nil { - return err - } - - waiter := s3.NewBucketExistsWaiter(client) - return waiter.Wait(ctx, &s3.HeadBucketInput{ - Bucket: &bucket, - }, time.Minute) -} - -func TestIntegrationS3Cache(t *testing.T) { - integration.CheckSkip(t) - t.Parallel() - - pool, err := dockertest.NewPool("") - require.NoError(t, err) - - pool.MaxWait = time.Second * 30 - - resource, err := pool.RunWithOptions(&dockertest.RunOptions{ - Repository: "localstack/localstack", - ExposedPorts: []string{"4566/tcp"}, - Env: []string{"SERVICES=s3"}, - }) - require.NoError(t, err) - t.Cleanup(func() { - assert.NoError(t, pool.Purge(resource)) - }) - - servicePort := resource.GetPort("4566/tcp") - - _ = resource.Expire(900) - require.NoError(t, pool.Retry(func() error { - return createBucket(context.Background(), servicePort, "probe-bucket") - })) - - template := ` -cache_resources: - - label: testcache - aws_s3: - endpoint: http://localhost:$PORT - region: eu-west-1 - force_path_style_urls: true - bucket: $ID - credentials: - id: xxxxx - secret: xxxxx - token: xxxxx -` - suite := integration.CacheTests( - integration.CacheTestOpenClose(), - integration.CacheTestMissingKey(), - integration.CacheTestDoubleAdd(), - integration.CacheTestDelete(), - integration.CacheTestGetAndSet(1), - ) - suite.Run( - t, template, - integration.CacheTestOptPort(servicePort), - integration.CacheTestOptPreTest(func(t testing.TB, ctx context.Context, testID string, vars *integration.CacheTestConfigVars) { - require.NoError(t, createBucket(ctx, servicePort, testID)) - 
}), - ) -} diff --git a/internal/impl/aws/integration_kinesis_test.go b/internal/impl/aws/integration_kinesis_test.go index 61780c2c8b..1e8d09f0e6 100644 --- a/internal/impl/aws/integration_kinesis_test.go +++ b/internal/impl/aws/integration_kinesis_test.go @@ -10,8 +10,6 @@ import ( "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/kinesis" - "github.com/ory/dockertest/v3" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/benthosdev/benthos/v4/internal/integration" @@ -71,35 +69,7 @@ func createKinesisShards(ctx context.Context, t testing.TB, awsPort, id string, return shards, nil } -func TestIntegrationAWSKinesis(t *testing.T) { - integration.CheckSkip(t) - t.Parallel() - - pool, err := dockertest.NewPool("") - require.NoError(t, err) - - pool.MaxWait = time.Minute * 2 - if dline, ok := t.Deadline(); ok && time.Until(dline) < pool.MaxWait { - pool.MaxWait = time.Until(dline) - } - - resource, err := pool.RunWithOptions(&dockertest.RunOptions{ - Repository: "localstack/localstack", - ExposedPorts: []string{"4566/tcp"}, - Env: []string{"SERVICES=dynamodb,kinesis"}, - }) - require.NoError(t, err) - t.Cleanup(func() { - assert.NoError(t, pool.Purge(resource)) - }) - - _ = resource.Expire(900) - - require.NoError(t, pool.Retry(func() error { - _, err := createKinesisShards(context.Background(), t, resource.GetPort("4566/tcp"), "testtable", 2) - return err - })) - +func kinesisIntegrationSuite(t *testing.T, lsPort string) { template := ` output: aws_kinesis: @@ -146,7 +116,7 @@ input: t, template, integration.StreamTestOptPreTest(func(t testing.TB, ctx context.Context, testID string, vars *integration.StreamTestConfigVars) { streamName := "stream-" + testID - shards, err := createKinesisShards(ctx, t, resource.GetPort("4566/tcp"), testID, 2) + shards, err := createKinesisShards(ctx, t, lsPort, testID, 2) require.NoError(t, err) for i, shard := range shards { @@ 
-157,7 +127,7 @@ input: } } }), - integration.StreamTestOptPort(resource.GetPort("4566/tcp")), + integration.StreamTestOptPort(lsPort), integration.StreamTestOptAllowDupes(), integration.StreamTestOptVarTwo("10"), ) @@ -167,10 +137,10 @@ input: suite.Run( t, template, integration.StreamTestOptPreTest(func(t testing.TB, ctx context.Context, testID string, vars *integration.StreamTestConfigVars) { - _, err := createKinesisShards(ctx, t, resource.GetPort("4566/tcp"), testID, 2) + _, err := createKinesisShards(ctx, t, lsPort, testID, 2) require.NoError(t, err) }), - integration.StreamTestOptPort(resource.GetPort("4566/tcp")), + integration.StreamTestOptPort(lsPort), integration.StreamTestOptAllowDupes(), integration.StreamTestOptVarTwo("10"), ) @@ -182,11 +152,11 @@ input: ).Run( t, template, integration.StreamTestOptPreTest(func(t testing.TB, ctx context.Context, testID string, vars *integration.StreamTestConfigVars) { - shards, err := createKinesisShards(ctx, t, resource.GetPort("4566/tcp"), testID, 1) + shards, err := createKinesisShards(ctx, t, lsPort, testID, 1) require.NoError(t, err) vars.Var1 = ":" + shards[0] }), - integration.StreamTestOptPort(resource.GetPort("4566/tcp")), + integration.StreamTestOptPort(lsPort), integration.StreamTestOptAllowDupes(), integration.StreamTestOptVarTwo("10"), ) diff --git a/internal/impl/aws/integration_aws_test.go b/internal/impl/aws/integration_s3_test.go similarity index 60% rename from internal/impl/aws/integration_aws_test.go rename to internal/impl/aws/integration_s3_test.go index 29e15d1aed..a04e68792c 100644 --- a/internal/impl/aws/integration_aws_test.go +++ b/internal/impl/aws/integration_s3_test.go @@ -2,7 +2,9 @@ package aws import ( "context" + "crypto/rand" "fmt" + "sync" "testing" "time" @@ -13,16 +15,60 @@ import ( s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" "github.com/aws/aws-sdk-go-v2/service/sqs" sqstypes "github.com/aws/aws-sdk-go-v2/service/sqs/types" - "github.com/gofrs/uuid" - 
"github.com/ory/dockertest/v3" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/benthosdev/benthos/v4/internal/bundle" + "github.com/benthosdev/benthos/v4/internal/component/output" "github.com/benthosdev/benthos/v4/internal/integration" + "github.com/benthosdev/benthos/v4/internal/manager" + "github.com/benthosdev/benthos/v4/internal/message" _ "github.com/benthosdev/benthos/v4/internal/impl/pure" ) +func createBucket(ctx context.Context, s3Port, bucket string) error { + endpoint := fmt.Sprintf("http://localhost:%v", s3Port) + + conf, err := config.LoadDefaultConfig(ctx, + config.WithRegion("eu-west-1"), + config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider("xxxxx", "xxxxx", "xxxxx")), + config.WithEndpointResolverWithOptions(aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) { + return aws.Endpoint{ + PartitionID: "aws", + URL: endpoint, + SigningRegion: "eu-west-1", + }, nil + })), + ) + if err != nil { + return err + } + + client := s3.NewFromConfig(conf, func(o *s3.Options) { + o.UsePathStyle = true + }) + + _, err = client.CreateBucket(ctx, &s3.CreateBucketInput{ + Bucket: &bucket, + CreateBucketConfiguration: &s3types.CreateBucketConfiguration{ + Location: &s3types.LocationInfo{ + Name: aws.String("eu-west-1"), + Type: s3types.LocationTypeAvailabilityZone, + }, + LocationConstraint: s3types.BucketLocationConstraintEuWest1, + }, + }) + if err != nil { + return err + } + + waiter := s3.NewBucketExistsWaiter(client) + return waiter.Wait(ctx, &s3.HeadBucketInput{ + Bucket: &bucket, + }, time.Minute) +} + func createBucketQueue(ctx context.Context, s3Port, sqsPort, id string) error { endpoint := fmt.Sprintf("http://localhost:%v", s3Port) bucket := "bucket-" + id @@ -35,12 +81,19 @@ func createBucketQueue(ctx context.Context, s3Port, sqsPort, id string) error { var s3Client *s3.Client if s3Port != "" { conf, err := config.LoadDefaultConfig(ctx, + 
config.WithRegion("eu-west-1"), config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider("xxxxx", "xxxxx", "xxxxx")), + config.WithEndpointResolverWithOptions(aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) { + return aws.Endpoint{ + PartitionID: "aws", + URL: endpoint, + SigningRegion: "eu-west-1", + }, nil + })), ) if err != nil { return err } - conf.BaseEndpoint = &endpoint s3Client = s3.NewFromConfig(conf, func(o *s3.Options) { o.UsePathStyle = true @@ -63,6 +116,13 @@ func createBucketQueue(ctx context.Context, s3Port, sqsPort, id string) error { if s3Client != nil { if _, err := s3Client.CreateBucket(ctx, &s3.CreateBucketInput{ Bucket: &bucket, + CreateBucketConfiguration: &s3types.CreateBucketConfiguration{ + Location: &s3types.LocationInfo{ + Name: aws.String("eu-west-1"), + Type: s3types.LocationTypeAvailabilityZone, + }, + LocationConstraint: s3types.BucketLocationConstraintEuWest1, + }, }); err != nil { return fmt.Errorf("create bucket: %w", err) } @@ -117,37 +177,8 @@ func createBucketQueue(ctx context.Context, s3Port, sqsPort, id string) error { return nil } -func TestIntegrationAWS(t *testing.T) { - integration.CheckSkip(t) - t.Parallel() - - pool, err := dockertest.NewPool("") - require.NoError(t, err) - - pool.MaxWait = time.Minute * 3 - - resource, err := pool.RunWithOptions(&dockertest.RunOptions{ - Repository: "localstack/localstack", - ExposedPorts: []string{"4566/tcp"}, - Env: []string{"SERVICES=s3,sqs"}, - }) - require.NoError(t, err) - t.Cleanup(func() { - assert.NoError(t, pool.Purge(resource)) - }) - - _ = resource.Expire(900) - - servicePort := resource.GetPort("4566/tcp") - - require.NoError(t, pool.Retry(func() (err error) { - u4, err := uuid.NewV4() - require.NoError(t, err) - - return createBucketQueue(context.Background(), servicePort, servicePort, u4.String()) - })) - - t.Run("s3_to_sqs", func(t *testing.T) { +func s3IntegrationSuite(t *testing.T, lsPort 
string) { + t.Run("via_sqs", func(t *testing.T) { template := ` output: aws_s3: @@ -191,14 +222,14 @@ input: ).Run( t, template, integration.StreamTestOptPreTest(func(t testing.TB, ctx context.Context, testID string, vars *integration.StreamTestConfigVars) { - require.NoError(t, createBucketQueue(ctx, servicePort, servicePort, testID)) + require.NoError(t, createBucketQueue(ctx, lsPort, lsPort, testID)) }), - integration.StreamTestOptPort(servicePort), + integration.StreamTestOptPort(lsPort), integration.StreamTestOptAllowDupes(), ) }) - t.Run("s3_to_sqs_lines", func(t *testing.T) { + t.Run("via_sqs_lines", func(t *testing.T) { template := ` output: aws_s3: @@ -246,14 +277,14 @@ input: if vars.OutputBatchCount == 0 { vars.OutputBatchCount = 1 } - require.NoError(t, createBucketQueue(ctx, servicePort, servicePort, testID)) + require.NoError(t, createBucketQueue(ctx, lsPort, lsPort, testID)) }), - integration.StreamTestOptPort(servicePort), + integration.StreamTestOptPort(lsPort), integration.StreamTestOptAllowDupes(), ) }) - t.Run("s3_to_sqs_lines_old_codec", func(t *testing.T) { + t.Run("via_sqs_lines_old_codec", func(t *testing.T) { template := ` output: aws_s3: @@ -301,14 +332,14 @@ input: if vars.OutputBatchCount == 0 { vars.OutputBatchCount = 1 } - require.NoError(t, createBucketQueue(ctx, servicePort, servicePort, testID)) + require.NoError(t, createBucketQueue(ctx, lsPort, lsPort, testID)) }), - integration.StreamTestOptPort(servicePort), + integration.StreamTestOptPort(lsPort), integration.StreamTestOptAllowDupes(), ) }) - t.Run("s3", func(t *testing.T) { + t.Run("batch", func(t *testing.T) { template := ` output: aws_s3: @@ -342,51 +373,113 @@ input: ).Run( t, template, integration.StreamTestOptPreTest(func(t testing.TB, ctx context.Context, testID string, vars *integration.StreamTestConfigVars) { - require.NoError(t, createBucketQueue(ctx, servicePort, "", testID)) + require.NoError(t, createBucketQueue(ctx, lsPort, "", testID)) }), - 
integration.StreamTestOptPort(servicePort), + integration.StreamTestOptPort(lsPort), integration.StreamTestOptVarOne("false"), ) }) - t.Run("sqs", func(t *testing.T) { + t.Run("cache", func(t *testing.T) { template := ` -output: - aws_sqs: - url: http://localhost:$PORT/000000000000/queue-$ID - endpoint: http://localhost:$PORT - region: eu-west-1 - credentials: - id: xxxxx - secret: xxxxx - token: xxxxx - max_in_flight: $MAX_IN_FLIGHT - batching: - count: $OUTPUT_BATCH_COUNT - -input: - aws_sqs: - url: http://localhost:$PORT/000000000000/queue-$ID - endpoint: http://localhost:$PORT - region: eu-west-1 - credentials: - id: xxxxx - secret: xxxxx - token: xxxxx +cache_resources: + - label: testcache + aws_s3: + endpoint: http://localhost:$PORT + region: eu-west-1 + force_path_style_urls: true + bucket: $ID + credentials: + id: xxxxx + secret: xxxxx + token: xxxxx ` - integration.StreamTests( - integration.StreamTestOpenClose(), - integration.StreamTestSendBatch(10), - integration.StreamTestStreamSequential(50), - integration.StreamTestStreamParallel(50), - integration.StreamTestStreamParallelLossy(50), - integration.StreamTestStreamParallelLossyThroughReconnect(50), - ).Run( + suite := integration.CacheTests( + integration.CacheTestOpenClose(), + integration.CacheTestMissingKey(), + integration.CacheTestDoubleAdd(), + integration.CacheTestDelete(), + integration.CacheTestGetAndSet(1), + ) + suite.Run( t, template, - integration.StreamTestOptPreTest(func(t testing.TB, ctx context.Context, testID string, vars *integration.StreamTestConfigVars) { - require.NoError(t, createBucketQueue(ctx, "", servicePort, testID)) + integration.CacheTestOptPort(lsPort), + integration.CacheTestOptPreTest(func(t testing.TB, ctx context.Context, testID string, vars *integration.CacheTestConfigVars) { + require.NoError(t, createBucket(ctx, lsPort, testID)) }), - integration.StreamTestOptPort(servicePort), ) }) } + +func TestS3OutputThroughput(t *testing.T) { + // Skipping as this test was 
only put together out of curiosity + t.Skip() + + servicePort := getLocalStack(t) + + ctx, done := context.WithTimeout(context.Background(), time.Minute) + defer done() + + bucketName := "bench-test-bucket" + require.NoError(t, createBucket(context.Background(), servicePort, bucketName)) + + dataTarget := 1024 * 1024 * 1024 * 1 // 1GB + dataFiles := 1000 + + testData := make([]byte, dataTarget/dataFiles) + _, err := rand.Read(testData) + require.NoError(t, err) + + outConf, err := output.FromAny(bundle.GlobalEnvironment, map[string]any{ + "aws_s3": map[string]any{ + "bucket": bucketName, + "endpoint": fmt.Sprintf("http://localhost:%v", servicePort), + "force_path_style_urls": true, + "region": "eu-west-1", + "path": `${!counter()}.txt`, + "credentials": map[string]any{ + "id": "xxxxx", + "secret": "xxxxx", + "token": "xxxxx", + }, + "batching": map[string]any{ + "count": 1, + }, + }, + }) + require.NoError(t, err) + + mgr, err := manager.New(manager.NewResourceConfig()) + require.NoError(t, err) + + out, err := mgr.NewOutput(outConf) + require.NoError(t, err) + + tStarted := time.Now() + + tChan := make(chan message.Transaction) + require.NoError(t, out.Consume(tChan)) + + var wg sync.WaitGroup + wg.Add(dataFiles) + + for i := 0; i < dataFiles; i++ { + select { + case tChan <- message.NewTransactionFunc(message.Batch{ + message.NewPart(testData), + }, func(ctx context.Context, err error) error { + wg.Done() + assert.NoError(t, err) + return nil + }): + case <-ctx.Done(): + t.Fatal("timed out") + } + } + + wg.Wait() + t.Logf("Delivery took %v total", time.Since(tStarted)) + + out.TriggerCloseNow() + require.NoError(t, out.WaitForClose(ctx)) +} diff --git a/internal/impl/aws/integration_sqs_test.go b/internal/impl/aws/integration_sqs_test.go new file mode 100644 index 0000000000..f2789788d0 --- /dev/null +++ b/internal/impl/aws/integration_sqs_test.go @@ -0,0 +1,53 @@ +package aws + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + 
"github.com/benthosdev/benthos/v4/internal/integration" + + _ "github.com/benthosdev/benthos/v4/internal/impl/pure" +) + +func sqsIntegrationSuite(t *testing.T, lsPort string) { + template := ` +output: + aws_sqs: + url: http://localhost:$PORT/000000000000/queue-$ID + endpoint: http://localhost:$PORT + region: eu-west-1 + credentials: + id: xxxxx + secret: xxxxx + token: xxxxx + max_in_flight: $MAX_IN_FLIGHT + batching: + count: $OUTPUT_BATCH_COUNT + +input: + aws_sqs: + url: http://localhost:$PORT/000000000000/queue-$ID + endpoint: http://localhost:$PORT + region: eu-west-1 + credentials: + id: xxxxx + secret: xxxxx + token: xxxxx +` + integration.StreamTests( + integration.StreamTestOpenClose(), + integration.StreamTestSendBatch(10), + integration.StreamTestStreamSequential(50), + integration.StreamTestStreamParallel(50), + integration.StreamTestStreamParallelLossy(50), + integration.StreamTestStreamParallelLossyThroughReconnect(50), + ).Run( + t, template, + integration.StreamTestOptPreTest(func(t testing.TB, ctx context.Context, testID string, vars *integration.StreamTestConfigVars) { + require.NoError(t, createBucketQueue(ctx, "", lsPort, testID)) + }), + integration.StreamTestOptPort(lsPort), + ) +} diff --git a/internal/impl/aws/integration_test.go b/internal/impl/aws/integration_test.go new file mode 100644 index 0000000000..ad098dfd9a --- /dev/null +++ b/internal/impl/aws/integration_test.go @@ -0,0 +1,78 @@ +package aws + +import ( + "context" + "fmt" + "strconv" + "testing" + "time" + + "github.com/ory/dockertest/v3" + "github.com/ory/dockertest/v3/docker" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/benthosdev/benthos/v4/internal/integration" + + _ "github.com/benthosdev/benthos/v4/internal/impl/pure" +) + +func getLocalStack(t testing.TB) (port string) { + portInt, err := integration.GetFreePort() + require.NoError(t, err) + + port = strconv.Itoa(portInt) + + pool, err := dockertest.NewPool("") + 
require.NoError(t, err) + + pool.MaxWait = time.Minute + + resource, err := pool.RunWithOptions(&dockertest.RunOptions{ + Repository: "localstack/localstack", + ExposedPorts: []string{port + "/tcp"}, + PortBindings: map[docker.Port][]docker.PortBinding{ + docker.Port(port + "/tcp"): { + docker.PortBinding{HostIP: "", HostPort: port}, + }, + }, + Env: []string{ + fmt.Sprintf("GATEWAY_LISTEN=0.0.0.0:%v", port), + }, + }) + require.NoError(t, err) + t.Cleanup(func() { + assert.NoError(t, pool.Purge(resource)) + }) + + _ = resource.Expire(900) + + require.NoError(t, pool.Retry(func() (err error) { + defer func() { + if err != nil { + t.Logf("localstack probe error: %v", err) + } + }() + return createBucket(context.Background(), port, "test-bucket") + })) + return +} + +func TestIntegration(t *testing.T) { + integration.CheckSkip(t) + t.Parallel() + + servicePort := getLocalStack(t) + + t.Run("kinesis", func(t *testing.T) { + kinesisIntegrationSuite(t, servicePort) + }) + + t.Run("s3", func(t *testing.T) { + s3IntegrationSuite(t, servicePort) + }) + + t.Run("sqs", func(t *testing.T) { + sqsIntegrationSuite(t, servicePort) + }) +} diff --git a/internal/impl/aws/resources/docker-compose.yaml b/internal/impl/aws/resources/docker-compose.yaml index 683cd2151b..8ae3acef26 100644 --- a/internal/impl/aws/resources/docker-compose.yaml +++ b/internal/impl/aws/resources/docker-compose.yaml @@ -4,6 +4,9 @@ services: localstack: image: localstack/localstack environment: - SERVICES: dynamodb,kinesis,s3,sqs + DEBUG: 1 + LOCALSTACK_HOST: localhost:4566 ports: - "4566:4566" + # volumes: + # - "/var/run/docker.sock:/var/run/docker.sock" diff --git a/internal/impl/io/input_dynamic.go b/internal/impl/io/input_dynamic.go index 1059817934..349759cfc9 100644 --- a/internal/impl/io/input_dynamic.go +++ b/internal/impl/io/input_dynamic.go @@ -11,7 +11,6 @@ import ( "github.com/benthosdev/benthos/v4/internal/bundle" "github.com/benthosdev/benthos/v4/internal/component/input" 
"github.com/benthosdev/benthos/v4/internal/component/interop" - "github.com/benthosdev/benthos/v4/internal/component/testutil" "github.com/benthosdev/benthos/v4/internal/docs" "github.com/benthosdev/benthos/v4/public/service" ) @@ -143,7 +142,12 @@ func newDynamicInputFromParsed(conf *service.ParsedConfig, res *service.Resource } dynAPI.OnUpdate(func(ctx context.Context, id string, c []byte) error { - newConf, err := testutil.InputFromYAML(string(c)) + confNode, err := docs.UnmarshalYAML(c) + if err != nil { + return err + } + + newConf, err := input.FromAny(bundle.GlobalEnvironment, confNode) if err != nil { return err } diff --git a/internal/impl/io/output_dynamic.go b/internal/impl/io/output_dynamic.go index 28dc5458df..82c34bc680 100644 --- a/internal/impl/io/output_dynamic.go +++ b/internal/impl/io/output_dynamic.go @@ -11,7 +11,6 @@ import ( "github.com/benthosdev/benthos/v4/internal/bundle" "github.com/benthosdev/benthos/v4/internal/component/interop" "github.com/benthosdev/benthos/v4/internal/component/output" - "github.com/benthosdev/benthos/v4/internal/component/testutil" "github.com/benthosdev/benthos/v4/internal/docs" "github.com/benthosdev/benthos/v4/internal/impl/pure" "github.com/benthosdev/benthos/v4/public/service" @@ -136,7 +135,12 @@ func newDynamicOutputFromParsed(conf *service.ParsedConfig, res *service.Resourc } dynAPI.OnUpdate(func(ctx context.Context, id string, c []byte) error { - newConf, err := testutil.OutputFromYAML(string(c)) + confNode, err := docs.UnmarshalYAML(c) + if err != nil { + return err + } + + newConf, err := output.FromAny(bundle.GlobalEnvironment, confNode) if err != nil { return err } diff --git a/internal/impl/kafka/output_sarama_kafka.go b/internal/impl/kafka/output_sarama_kafka.go index e625975792..685c771838 100644 --- a/internal/impl/kafka/output_sarama_kafka.go +++ b/internal/impl/kafka/output_sarama_kafka.go @@ -39,6 +39,7 @@ const ( oskFieldAckReplicas = "ack_replicas" oskFieldMaxMsgBytes = "max_msg_bytes" 
oskFieldTimeout = "timeout" + oskFieldIdempotentWrite = "idempotent_write" oskFieldRetryAsBatch = "retry_as_batch" oskFieldBatching = "batching" oskFieldMaxRetries = "max_retries" @@ -126,6 +127,10 @@ Unfortunately this error message will appear for a wide range of connection prob Description("Specify criteria for which metadata values are sent with messages as headers."), span.InjectTracingSpanMappingDocs(), service.NewOutputMaxInFlightField(), + service.NewBoolField(oskFieldIdempotentWrite). + Description("Enable the idempotent write producer option. This requires the `IDEMPOTENT_WRITE` permission on `CLUSTER` and can be disabled if this permission is not available."). + Default(false). + Advanced(), service.NewBoolField(oskFieldAckReplicas). Description("Ensure that messages have been copied across all replicas before acknowledging receipt."). Advanced().Default(false), @@ -433,6 +438,10 @@ func (k *kafkaWriter) saramaConfigFromParsed(conf *service.ParsedConfig) (*saram return nil, err } + if config.Producer.Idempotent, err = conf.FieldBool(oskFieldIdempotentWrite); err != nil { + return nil, err + } + if ackReplicas { config.Producer.RequiredAcks = sarama.WaitForAll } else { diff --git a/internal/impl/mongodb/common.go b/internal/impl/mongodb/common.go index 5a56c1d0a9..04177f885e 100644 --- a/internal/impl/mongodb/common.go +++ b/internal/impl/mongodb/common.go @@ -7,6 +7,7 @@ import ( "strconv" "time" + "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" "go.mongodb.org/mongo-driver/mongo/writeconcern" @@ -286,19 +287,18 @@ const ( func writeMapsFields() []*service.ConfigField { return []*service.ConfigField{ service.NewBloblangField(commonFieldDocumentMap). - Description("A bloblang map representing the records in the mongo db. Used to generate the document for mongodb by " + - "mapping the fields in the message to the mongodb fields. 
The document map is required for the operations " + + Description("A bloblang map representing a document to store within MongoDB, expressed as [extended JSON in canonical form](https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/). The document map is required for the operations " + "insert-one, replace-one and update-one."). Examples(mapExamples()...). Default(""), service.NewBloblangField(commonFieldFilterMap). - Description("A bloblang map representing the filter for the mongo db command. The filter map is required for all operations except " + + Description("A bloblang map representing a filter for a MongoDB command, expressed as [extended JSON in canonical form](https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/). The filter map is required for all operations except " + "insert-one. It is used to find the document(s) for the operation. For example in a delete-one case, the filter map should " + "have the fields required to locate the document to delete."). Examples(mapExamples()...). Default(""), service.NewBloblangField(commonFieldHintMap). - Description("A bloblang map representing the hint for the mongo db command. This map is optional and is used with all operations " + + Description("A bloblang map representing the hint for the MongoDB command, expressed as [extended JSON in canonical form](https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/). This map is optional and is used with all operations " + "except insert-one. It is used to improve performance of finding the documents in the mongodb."). Examples(mapExamples()...). 
Default(""), @@ -369,58 +369,49 @@ func writeMapsFromParsed(conf *service.ParsedConfig, operation Operation) (maps return } -func (w writeMaps) extractFromMessage(operation Operation, i int, batch service.MessageBatch) ( - docJSON, filterJSON, hintJSON any, err error, -) { - var hintVal, filterVal, documentVal *service.Message - var filterValWanted, documentValWanted bool - - filterValWanted = operation.isFilterAllowed() - documentValWanted = operation.isDocumentAllowed() - - if filterValWanted { - if filterVal, err = batch.BloblangQuery(i, w.filterMap); err != nil { - err = fmt.Errorf("failed to execute filter_map: %v", err) - return - } +func extJSONFromMap(b service.MessageBatch, i int, m *bloblang.Executor) (any, error) { + msg, err := b.BloblangQuery(i, m) + if err != nil { + return nil, err } - - if (filterVal != nil || !filterValWanted) && documentValWanted { - if documentVal, err = batch.BloblangQuery(i, w.documentMap); err != nil { - err = fmt.Errorf("failed to execute document_map: %v", err) - return - } + if msg == nil { + return nil, nil } - if filterVal == nil && filterValWanted { - err = fmt.Errorf("failed to generate filterVal") - return + valBytes, err := msg.AsBytes() + if err != nil { + return nil, err } - if documentVal == nil && documentValWanted { - err = fmt.Errorf("failed to generate documentVal") - return + var ejsonVal any + if err := bson.UnmarshalExtJSON(valBytes, true, &ejsonVal); err != nil { + return nil, err } + return ejsonVal, nil +} - if filterValWanted { - if filterJSON, err = filterVal.AsStructured(); err != nil { +func (w writeMaps) extractFromMessage(operation Operation, i int, batch service.MessageBatch) ( + docJSON, filterJSON, hintJSON any, err error, +) { + filterValWanted := operation.isFilterAllowed() + documentValWanted := operation.isDocumentAllowed() + + if filterValWanted && w.filterMap != nil { + if filterJSON, err = extJSONFromMap(batch, i, w.filterMap); err != nil { + err = fmt.Errorf("failed to execute filter_map: 
%v", err) return } } - if documentValWanted { - if docJSON, err = documentVal.AsStructured(); err != nil { + if documentValWanted && w.documentMap != nil { + if docJSON, err = extJSONFromMap(batch, i, w.documentMap); err != nil { + err = fmt.Errorf("failed to execute document_map: %v", err) return } } if w.hintMap != nil { - hintVal, err = batch.BloblangQuery(i, w.hintMap) - if err != nil { - err = fmt.Errorf("failed to execute hint_map: %v", err) - return - } - if hintJSON, err = hintVal.AsStructured(); err != nil { + if hintJSON, err = extJSONFromMap(batch, i, w.hintMap); err != nil { return } } diff --git a/internal/impl/mongodb/input.go b/internal/impl/mongodb/input.go index 7222ac2b9f..4388dad4a5 100644 --- a/internal/impl/mongodb/input.go +++ b/internal/impl/mongodb/input.go @@ -2,10 +2,12 @@ package mongodb import ( "context" + "errors" "fmt" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" "github.com/benthosdev/benthos/v4/public/service" ) @@ -21,8 +23,8 @@ func mongoConfigSpec() *service.ConfigSpec { // Stable(). TODO Version("3.64.0"). Categories("Services"). - Summary("Executes a find query and creates a message for each row received."). - Description(`Once the rows from the query are exhausted this input shuts down, allowing the pipeline to gracefully terminate (or the next input in a [sequence](/docs/components/inputs/sequence) to execute).`). + Summary("Executes a query and creates a message for each document received."). + Description(`Once the documents from the query are exhausted, this input shuts down, allowing the pipeline to gracefully terminate (or the next input in a [sequence](/docs/components/inputs/sequence) to execute).`). Fields(clientFields()...). Field(service.NewStringField("collection").Description("The collection to select from.")). Field(service.NewStringEnumField("operation", FindInputOperation, AggregateInputOperation). 
@@ -44,26 +46,45 @@ func mongoConfigSpec() *service.ConfigSpec {
 		Example(`
 root.from = {"$lte": timestamp_unix()}
 root.to = {"$gte": timestamp_unix()}
-`))
+`)).
+		Field(service.NewIntField("batch_size").
+			Description("An explicit number of documents to batch up before flushing them for processing. Must be greater than `0`. Operations: `find`, `aggregate`").
+			Optional().
+			Example(1000).
+			Version("4.26.0")).
+		Field(service.NewIntMapField("sort").
+			Description("An object specifying fields to sort by, and the respective sort order (`1` ascending, `-1` descending). Note: The driver currently appears to support only one sorting key. Operations: `find`").
+			Optional().
+			Example(map[string]int{"name": 1}).
+			Example(map[string]int{"age": -1}).
+			Version("4.26.0")).
+		Field(service.NewIntField("limit").
+			Description("An explicit maximum number of documents to return. Operations: `find`").
+			Optional().
+			Version("4.26.0"))
 }
 
 func init() {
-	err := service.RegisterInput(
+	err := service.RegisterBatchInput(
 		"mongodb", mongoConfigSpec(),
-		func(conf *service.ParsedConfig, mgr *service.Resources) (service.Input, error) {
-			return newMongoInput(conf)
+		func(conf *service.ParsedConfig, mgr *service.Resources) (service.BatchInput, error) {
+			return newMongoInput(conf, mgr.Logger())
 		})
 	if err != nil {
 		panic(err)
 	}
 }
 
-func newMongoInput(conf *service.ParsedConfig) (service.Input, error) {
+func newMongoInput(conf *service.ParsedConfig, logger *service.Logger) (service.BatchInput, error) {
+	var (
+		limit, batchSize int
+		sort             map[string]int
+	)
+
 	mClient, database, err := getClient(conf)
 	if err != nil {
 		return nil, err
 	}
-
 	collection, err := conf.FieldString("collection")
 	if err != nil {
 		return nil, err
@@ -84,14 +105,35 @@ func newMongoInput(conf *service.ParsedConfig) (service.Input, error) {
 	if err != nil {
 		return nil, err
 	}
-
-	return service.AutoRetryNacks(&mongoInput{
+	if conf.Contains("batch_size") {
+		if batchSize, err = conf.FieldInt("batch_size"); err != nil {
+
return nil, err + } else if batchSize < 1 { + return nil, errors.New("batch_size must be >0") + } + } + if conf.Contains("sort") { + if sort, err = conf.FieldIntMap("sort"); err != nil { + return nil, err + } + } + if conf.Contains("limit") { + if limit, err = conf.FieldInt("limit"); err != nil { + return nil, err + } + } + return service.AutoRetryNacksBatched(&mongoInput{ query: query, collection: collection, client: mClient, database: database, operation: operation, marshalCanon: marshalMode == string(JSONMarshalModeCanonical), + batchSize: int32(batchSize), + sort: sort, + limit: int64(limit), + count: 0, + logger: logger, }), nil } @@ -103,6 +145,11 @@ type mongoInput struct { cursor *mongo.Cursor operation string marshalCanon bool + batchSize int32 + sort map[string]int + limit int64 + count int + logger *service.Logger } func (m *mongoInput) Connect(ctx context.Context) error { @@ -118,11 +165,21 @@ func (m *mongoInput) Connect(ctx context.Context) error { collection := m.database.Collection(m.collection) switch m.operation { case "find": - m.cursor, err = collection.Find(ctx, m.query) + var findOptions *options.FindOptions + findOptions, err = m.getFindOptions() + if err != nil { + return fmt.Errorf("error parsing 'find' options: %v", err) + } + m.cursor, err = collection.Find(ctx, m.query, findOptions) case "aggregate": - m.cursor, err = collection.Aggregate(ctx, m.query) + var aggregateOptions *options.AggregateOptions + aggregateOptions, err = m.getAggregateOptions() + if err != nil { + return fmt.Errorf("error parsing 'aggregate' options: %v", err) + } + m.cursor, err = collection.Aggregate(ctx, m.query, aggregateOptions) default: - return fmt.Errorf("opertaion %s not supported. the supported values are \"find\" and \"aggregate\"", m.operation) + return fmt.Errorf("operation '%s' not supported. 
the supported values are 'find' and 'aggregate'", m.operation) } if err != nil { _ = m.client.Disconnect(ctx) @@ -131,33 +188,66 @@ func (m *mongoInput) Connect(ctx context.Context) error { return nil } -func (m *mongoInput) Read(ctx context.Context) (*service.Message, service.AckFunc, error) { +func (m *mongoInput) ReadBatch(ctx context.Context) (service.MessageBatch, service.AckFunc, error) { if m.cursor == nil { return nil, nil, service.ErrNotConnected } - if !m.cursor.Next(ctx) { - return nil, nil, service.ErrEndOfInput - } - var decoded any - if err := m.cursor.Decode(&decoded); err != nil { - return nil, nil, err - } - - data, err := bson.MarshalExtJSON(decoded, m.marshalCanon, false) - if err != nil { - return nil, nil, err - } - msg := service.NewMessage(nil) - msg.SetBytes(data) - return msg, func(ctx context.Context, err error) error { - return nil - }, nil + batch := make(service.MessageBatch, 0, m.batchSize) + for m.cursor.Next(ctx) { + msg := service.NewMessage(nil) + msg.MetaSet("mongo_database", m.database.Name()) + msg.MetaSet("mongo_collection", m.collection) + + var decoded any + if err := m.cursor.Decode(&decoded); err != nil { + msg.SetError(err) + } else { + data, err := bson.MarshalExtJSON(decoded, m.marshalCanon, false) + if err != nil { + msg.SetError(err) + } + msg.SetBytes(data) + } + + batch = append(batch, msg) + m.count++ + + if m.batchSize == 0 || m.cursor.RemainingBatchLength() == 0 { + return batch, func(ctx context.Context, err error) error { + return nil + }, nil + } + } + return nil, nil, service.ErrEndOfInput } func (m *mongoInput) Close(ctx context.Context) error { if m.cursor != nil && m.client != nil { + m.logger.Debugf("Got %d documents from '%s' collection", m.count, m.collection) return m.client.Disconnect(ctx) } return nil } + +func (m *mongoInput) getFindOptions() (*options.FindOptions, error) { + findOptions := options.Find() + if m.batchSize > 0 { + findOptions.SetBatchSize(m.batchSize) + } + if m.sort != nil { + 
findOptions.SetSort(m.sort)
+	}
+	if m.limit > 0 {
+		findOptions.SetLimit(m.limit)
+	}
+	return findOptions, nil
+}
+
+func (m *mongoInput) getAggregateOptions() (*options.AggregateOptions, error) {
+	aggregateOptions := options.Aggregate()
+	if m.batchSize > 0 {
+		aggregateOptions.SetBatchSize(m.batchSize)
+	}
+	return aggregateOptions, nil
+}
diff --git a/internal/impl/mongodb/input_test.go b/internal/impl/mongodb/input_test.go
index 2251ac0714..199cfea89d 100644
--- a/internal/impl/mongodb/input_test.go
+++ b/internal/impl/mongodb/input_test.go
@@ -18,7 +18,7 @@ import (
 	"github.com/benthosdev/benthos/v4/public/service"
 )
 
-func TestSQLSelectInputEmptyShutdown(t *testing.T) {
+func TestMongoInputEmptyShutdown(t *testing.T) {
 	conf := `
 url: "mongodb://localhost:27017"
 username: foouser
@@ -32,13 +32,14 @@ query: |
 
 	spec := mongoConfigSpec()
 	env := service.NewEnvironment()
+	resources := service.MockResources()
 
 	mongoConfig, err := spec.ParseYAML(conf, env)
 	require.NoError(t, err)
-	selectInput, err := newMongoInput(mongoConfig)
+	mongoInput, err := newMongoInput(mongoConfig, resources.Logger())
 	require.NoError(t, err)
 
-	require.NoError(t, selectInput.Close(context.Background()))
+	require.NoError(t, mongoInput.Close(context.Background()))
 }
 
 func TestInputIntegration(t *testing.T) {
@@ -128,6 +129,7 @@ func TestInputIntegration(t *testing.T) {
 		placeholderConf string
 		jsonMarshalMode JSONMarshalMode
 	}
+	limit := int64(3)
 	cases := map[string]testCase{
 		"find": {
 			query: func(coll *mongo.Collection) (*mongo.Cursor, error) {
@@ -135,6 +137,11 @@ func TestInputIntegration(t *testing.T) {
 					"age": bson.M{
 						"$gte": 18,
 					},
+				}, &options.FindOptions{
+					Sort: bson.M{
+						"name": 1,
+					},
+					Limit: &limit,
 				})
 			},
 			placeholderConf: `
@@ -146,6 +153,10 @@ collection: "TestCollection"
 json_marshal_mode: relaxed
 query: |
   root.age = {"$gte": 18}
+batch_size: 2
+sort:
+  name: 1
+limit: 3
 `,
 			jsonMarshalMode: JSONMarshalModeRelaxed,
 		},
@@ -160,7 +171,12 @@ query: |
 					},
 				},
 				bson.M{
-					"$limit": 3,
+
"$sort": bson.M{ + "name": 1, + }, + }, + bson.M{ + "$limit": limit, }, }) }, @@ -181,10 +197,16 @@ query: | } } }, + { + "$sort": { + "name": 1 + } + }, { "$limit": 3 } ] +batchSize: 2 `, jsonMarshalMode: JSONMarshalModeCanonical, }, @@ -227,27 +249,40 @@ func testInput( spec := mongoConfigSpec() env := service.NewEnvironment() + resources := service.MockResources() mongoConfig, err := spec.ParseYAML(conf, env) require.NoError(t, err) - selectInput, err := newMongoInput(mongoConfig) + mongoInput, err := newMongoInput(mongoConfig, resources.Logger()) require.NoError(t, err) ctx := context.Background() - err = selectInput.Connect(ctx) + err = mongoInput.Connect(ctx) require.NoError(t, err) - for _, wMsg := range wantMsgs { - msg, ack, err := selectInput.Read(ctx) + + // read all batches + var actualMsgs service.MessageBatch + for { + batch, ack, err := mongoInput.ReadBatch(ctx) + if err == service.ErrEndOfInput { + break + } require.NoError(t, err) + actualMsgs = append(actualMsgs, batch...) 
+ require.NoError(t, ack(ctx, nil)) + } + + // compare to wanted messages + for i, wMsg := range wantMsgs { + msg := actualMsgs[i] msgBytes, err := msg.AsBytes() require.NoError(t, err) assert.JSONEq(t, string(wMsg), string(msgBytes)) - require.NoError(t, ack(ctx, nil)) } - _, ack, err := selectInput.Read(ctx) + _, ack, err := mongoInput.ReadBatch(ctx) assert.Equal(t, service.ErrEndOfInput, err) require.Nil(t, ack) - require.NoError(t, selectInput.Close(context.Background())) + require.NoError(t, mongoInput.Close(context.Background())) } diff --git a/internal/impl/nats/auth.go b/internal/impl/nats/auth.go index c481fad8eb..e2f8da8e11 100644 --- a/internal/impl/nats/auth.go +++ b/internal/impl/nats/auth.go @@ -12,11 +12,61 @@ import ( "github.com/nats-io/nats.go" "github.com/nats-io/nkeys" - "github.com/benthosdev/benthos/v4/internal/impl/nats/auth" + "github.com/benthosdev/benthos/v4/internal/docs" "github.com/benthosdev/benthos/v4/public/service" ) -func authConfToOptions(auth auth.Config, fs *service.FS) []nats.Option { +func authDescription() string { + return `### Authentication + +There are several components within Benthos which utilise NATS services. You will find that each of these components +support optional advanced authentication parameters for [NKeys](https://docs.nats.io/nats-server/configuration/securing_nats/auth_intro/nkey_auth) +and [User Credentials](https://docs.nats.io/developing-with-nats/security/creds). + +An in depth tutorial can be found [here](https://docs.nats.io/developing-with-nats/tutorials/jwt). + +#### NKey file + +The NATS server can use these NKeys in several ways for authentication. The simplest is for the server to be configured +with a list of known public keys and for the clients to respond to the challenge by signing it with its private NKey +configured in the ` + "`nkey_file`" + ` field. + +More details [here](https://docs.nats.io/developing-with-nats/security/nkey). 
+
+#### User Credentials
+
+NATS server supports decentralized authentication based on JSON Web Tokens (JWT). Clients need a [user JWT](https://docs.nats.io/nats-server/configuration/securing_nats/jwt#json-web-tokens)
+and a corresponding [NKey secret](https://docs.nats.io/developing-with-nats/security/nkey) when connecting to a server
+which is configured to use this authentication scheme.
+
+The ` + "`user_credentials_file`" + ` field should point to a file containing both the private key and the JWT and can be
+generated with the [nsc tool](https://docs.nats.io/nats-tools/nsc).
+
+Alternatively, the ` + "`user_jwt`" + ` field can contain a plain text JWT and the ` + "`user_nkey_seed`" + ` can contain
+the plain text NKey Seed.
+
+More details [here](https://docs.nats.io/developing-with-nats/security/creds).`
+}
+
+func authFieldSpec() docs.FieldSpec {
+	return docs.FieldObject("auth", "Optional configuration of NATS authentication parameters.").WithChildren(
+		docs.FieldString("nkey_file", "An optional file containing a NKey seed.", "./seed.nk").Optional(),
+		docs.FieldString("user_credentials_file", "An optional file containing user credentials which consist of a user JWT and corresponding NKey seed.", "./user.creds").Optional(),
+		docs.FieldString("user_jwt", "An optional plain text user JWT (given along with the corresponding user NKey Seed).").Secret().Optional(),
+		docs.FieldString("user_nkey_seed", "An optional plain text user NKey Seed (given along with the corresponding user JWT).").Secret().Optional(),
+	).Advanced()
+}
+
+type authConfig struct {
+	NKeyFile            string
+	UserCredentialsFile string
+	UserJWT             string
+	UserNkeySeed        string
+}
+
+//------------------------------------------------------------------------------
+
+func authConfToOptions(auth authConfig, fs *service.FS) []nats.Option {
 	var opts []nats.Option
 	if auth.NKeyFile != "" {
 		if opt, err := nats.NkeyOptionFromSeed(auth.NKeyFile); err != nil {
@@ -48,8 +98,7 @@ func authConfToOptions(auth
auth.Config, fs *service.FS) []nats.Option { } // AuthFromParsedConfig attempts to extract an auth config from a ParsedConfig. -func AuthFromParsedConfig(p *service.ParsedConfig) (c auth.Config, err error) { - c = auth.New() +func AuthFromParsedConfig(p *service.ParsedConfig) (c authConfig, err error) { if p.Contains("nkey_file") { if c.NKeyFile, err = p.FieldString("nkey_file"); err != nil { return diff --git a/internal/impl/nats/auth/docs.go b/internal/impl/nats/auth/docs.go deleted file mode 100644 index 13e5804468..0000000000 --- a/internal/impl/nats/auth/docs.go +++ /dev/null @@ -1,46 +0,0 @@ -package auth - -import "github.com/benthosdev/benthos/v4/internal/docs" - -// Description returns a markdown version of NATs authentication documentation. -func Description() string { - return `### Authentication - -There are several components within Benthos which utilise NATS services. You will find that each of these components -support optional advanced authentication parameters for [NKeys](https://docs.nats.io/nats-server/configuration/securing_nats/auth_intro/nkey_auth) -and [User Credentials](https://docs.nats.io/developing-with-nats/security/creds). - -An in depth tutorial can be found [here](https://docs.nats.io/developing-with-nats/tutorials/jwt). - -#### NKey file - -The NATS server can use these NKeys in several ways for authentication. The simplest is for the server to be configured -with a list of known public keys and for the clients to respond to the challenge by signing it with its private NKey -configured in the ` + "`nkey_file`" + ` field. - -More details [here](https://docs.nats.io/developing-with-nats/security/nkey). - -#### User Credentials - -NATS server supports decentralized authentication based on JSON Web Tokens (JWT). 
Clients need an [user JWT](https://docs.nats.io/nats-server/configuration/securing_nats/jwt#json-web-tokens) -and a corresponding [NKey secret](https://docs.nats.io/developing-with-nats/security/nkey) when connecting to a server -which is configured to use this authentication scheme. - -The ` + "`user_credentials_file`" + ` field should point to a file containing both the private key and the JWT and can be -generated with the [nsc tool](https://docs.nats.io/nats-tools/nsc). - -Alternatively, the ` + "`user_jwt`" + ` field can contain a plain text JWT and the ` + "`user_nkey_seed`" + `can contain -the plain text NKey Seed. - -More details [here](https://docs.nats.io/developing-with-nats/security/creds).` -} - -// FieldSpec returns documentation authentication specs for NATS components. -func FieldSpec() docs.FieldSpec { - return docs.FieldObject("auth", "Optional configuration of NATS authentication parameters.").WithChildren( - docs.FieldString("nkey_file", "An optional file containing a NKey seed.", "./seed.nk").Optional(), - docs.FieldString("user_credentials_file", "An optional file containing user credentials which consist of an user JWT and corresponding NKey seed.", "./user.creds").Optional(), - docs.FieldString("user_jwt", "An optional plain text user JWT (given along with the corresponding user NKey Seed).").Secret().Optional(), - docs.FieldString("user_nkey_seed", "An optional plain text user NKey Seed (given along with the corresponding user JWT).").Secret().Optional(), - ).Advanced() -} diff --git a/internal/impl/nats/auth/type.go b/internal/impl/nats/auth/type.go deleted file mode 100644 index f11204a82d..0000000000 --- a/internal/impl/nats/auth/type.go +++ /dev/null @@ -1,19 +0,0 @@ -package auth - -// Config contains configuration params for NATS authentication. 
-type Config struct { - NKeyFile string `json:"nkey_file" yaml:"nkey_file"` - UserCredentialsFile string `json:"user_credentials_file" yaml:"user_credentials_file"` - UserJWT string `json:"user_jwt" yaml:"user_jwt"` - UserNkeySeed string `json:"user_nkey_seed" yaml:"user_nkey_seed"` -} - -// New creates a new Config instance. -func New() Config { - return Config{ - NKeyFile: "", - UserCredentialsFile: "", - UserJWT: "", - UserNkeySeed: "", - } -} diff --git a/internal/impl/nats/auth_test.go b/internal/impl/nats/auth_test.go index 50d240622f..48670d64d9 100644 --- a/internal/impl/nats/auth_test.go +++ b/internal/impl/nats/auth_test.go @@ -8,7 +8,6 @@ import ( "github.com/nats-io/nkeys" "github.com/stretchr/testify/assert" - "github.com/benthosdev/benthos/v4/internal/impl/nats/auth" "github.com/benthosdev/benthos/v4/public/service" ) @@ -31,7 +30,7 @@ SUABRFVRZW4YPTRCQOFZKF45ISHYBPRXPUV7NHHZJVF3D3M2HLZLDKIJ2U ) func TestNatsAuthConfToOptions(t *testing.T) { - conf := auth.New() + conf := authConfig{} conf.UserCredentialsFile = "user.creds" fs := fstest.MapFS{ diff --git a/internal/impl/nats/connection.go b/internal/impl/nats/connection.go new file mode 100644 index 0000000000..3d85a784c4 --- /dev/null +++ b/internal/impl/nats/connection.go @@ -0,0 +1,75 @@ +package nats + +import ( + "context" + "crypto/tls" + "strings" + + "github.com/nats-io/nats.go" + + "github.com/benthosdev/benthos/v4/public/service" +) + +// I've split the connection fields into two, which allows us to put tls and +// auth further down the fields stack. This is literally just polish for the +// docs. +func connectionHeadFields() []*service.ConfigField { + return []*service.ConfigField{ + service.NewStringListField("urls"). + Description("A list of URLs to connect to. If an item of the list contains commas it will be expanded into multiple URLs."). + Example([]string{"nats://127.0.0.1:4222"}). 
+ Example([]string{"nats://username:password@127.0.0.1:4222"}), + } +} + +func connectionTailFields() []*service.ConfigField { + return []*service.ConfigField{ + service.NewTLSToggledField("tls"), + service.NewInternalField(authFieldSpec()), + } +} + +type connectionDetails struct { + label string + logger *service.Logger + tlsConf *tls.Config + authConf authConfig + fs *service.FS + urls string +} + +func connectionDetailsFromParsed(conf *service.ParsedConfig, mgr *service.Resources) (c connectionDetails, err error) { + c.label = mgr.Label() + c.fs = mgr.FS() + c.logger = mgr.Logger() + + var urlList []string + if urlList, err = conf.FieldStringList("urls"); err != nil { + return + } + c.urls = strings.Join(urlList, ",") + + var tlsEnabled bool + if c.tlsConf, tlsEnabled, err = conf.FieldTLSToggled("tls"); err != nil { + return + } + if !tlsEnabled { + c.tlsConf = nil + } + + if c.authConf, err = AuthFromParsedConfig(conf.Namespace("auth")); err != nil { + return + } + return +} + +func (c *connectionDetails) get(_ context.Context) (*nats.Conn, error) { + var opts []nats.Option + if c.tlsConf != nil { + opts = append(opts, nats.Secure(c.tlsConf)) + } + opts = append(opts, nats.Name(c.label)) + opts = append(opts, errorHandlerOption(c.logger)) + opts = append(opts, authConfToOptions(c.authConf, c.fs)...) + return nats.Connect(c.urls, opts...) 
+} diff --git a/internal/impl/nats/input.go b/internal/impl/nats/input.go index 4bf18aef88..592893d508 100644 --- a/internal/impl/nats/input.go +++ b/internal/impl/nats/input.go @@ -2,16 +2,13 @@ package nats import ( "context" - "crypto/tls" "errors" - "strings" "sync" "time" "github.com/nats-io/nats.go" "github.com/benthosdev/benthos/v4/internal/component/input/span" - "github.com/benthosdev/benthos/v4/internal/impl/nats/auth" "github.com/benthosdev/benthos/v4/public/service" ) @@ -32,11 +29,8 @@ This input adds the following metadata fields to each message: You can access these metadata fields using [function interpolation](/docs/configuration/interpolation#bloblang-queries). -` + ConnectionNameDescription() + auth.Description()). - Field(service.NewStringListField("urls"). - Description("A list of URLs to connect to. If an item of the list contains commas it will be expanded into multiple URLs."). - Example([]string{"nats://127.0.0.1:4222"}). - Example([]string{"nats://username:password@127.0.0.1:4222"})). +` + ConnectionNameDescription() + authDescription()). + Fields(connectionHeadFields()...). Field(service.NewStringField("subject"). Description("A subject to consume from. Supports wildcards for consuming multiple subjects. Either a subject or stream must be specified."). Example("foo.bar.baz").Example("foo.*.baz").Example("foo.bar.*").Example("foo.>")). @@ -53,8 +47,7 @@ You can access these metadata fields using [function interpolation](/docs/config Advanced(). Default(nats.DefaultSubPendingMsgsLimit). LintRule(`root = if this < 0 { ["prefetch count must be greater than or equal to zero"] }`)). - Field(service.NewTLSToggledField("tls")). - Field(service.NewInternalField(auth.FieldSpec())). + Fields(connectionTailFields()...). 
Field(span.ExtractTracingSpanMappingDocs().Version(tracingVersion)) } @@ -75,17 +68,13 @@ func init() { } type natsReader struct { - label string - urls string + connDetails connectionDetails subject string queue string prefetchCount int nakDelay time.Duration - authConf auth.Config - tlsConf *tls.Config log *service.Logger - fs *service.FS cMut sync.Mutex @@ -98,17 +87,14 @@ type natsReader struct { func newNATSReader(conf *service.ParsedConfig, mgr *service.Resources) (*natsReader, error) { n := natsReader{ - label: mgr.Label(), log: mgr.Logger(), - fs: mgr.FS(), interruptChan: make(chan struct{}), } - urlList, err := conf.FieldStringList("urls") - if err != nil { + var err error + if n.connDetails, err = connectionDetailsFromParsed(conf, mgr); err != nil { return nil, err } - n.urls = strings.Join(urlList, ",") if n.subject, err = conf.FieldString("subject"); err != nil { return nil, err @@ -133,19 +119,6 @@ func newNATSReader(conf *service.ParsedConfig, mgr *service.Resources) (*natsRea return nil, err } } - - tlsConf, tlsEnabled, err := conf.FieldTLSToggled("tls") - if err != nil { - return nil, err - } - if tlsEnabled { - n.tlsConf = tlsConf - } - - if n.authConf, err = AuthFromParsedConfig(conf.Namespace("auth")); err != nil { - return nil, err - } - return &n, nil } @@ -160,19 +133,11 @@ func (n *natsReader) Connect(ctx context.Context) error { var natsConn *nats.Conn var natsSub *nats.Subscription var err error - var opts []nats.Option - if n.tlsConf != nil { - opts = append(opts, nats.Secure(n.tlsConf)) - } - - opts = append(opts, nats.Name(n.label)) - opts = append(opts, authConfToOptions(n.authConf, n.fs)...) 
- opts = append(opts, errorHandlerOption(n.log)) - - if natsConn, err = nats.Connect(n.urls, opts...); err != nil { + if natsConn, err = n.connDetails.get(ctx); err != nil { return err } + natsChan := make(chan *nats.Msg, n.prefetchCount) if len(n.queue) > 0 { diff --git a/internal/impl/nats/input_jetstream.go b/internal/impl/nats/input_jetstream.go index 98ff965392..74d19e4de3 100644 --- a/internal/impl/nats/input_jetstream.go +++ b/internal/impl/nats/input_jetstream.go @@ -2,18 +2,15 @@ package nats import ( "context" - "crypto/tls" "errors" "fmt" "strconv" - "strings" "sync" "time" "github.com/nats-io/nats.go" "github.com/benthosdev/benthos/v4/internal/component/input/span" - "github.com/benthosdev/benthos/v4/internal/impl/nats/auth" "github.com/benthosdev/benthos/v4/internal/shutdown" "github.com/benthosdev/benthos/v4/public/service" ) @@ -46,11 +43,8 @@ This input adds the following metadata fields to each message: You can access these metadata fields using [function interpolation](/docs/configuration/interpolation#bloblang-queries). -` + ConnectionNameDescription() + auth.Description()). - Field(service.NewStringListField("urls"). - Description("A list of URLs to connect to. If an item of the list contains commas it will be expanded into multiple URLs."). - Example([]string{"nats://127.0.0.1:4222"}). - Example([]string{"nats://username:password@127.0.0.1:4222"})). +` + ConnectionNameDescription() + authDescription()). + Fields(connectionHeadFields()...). Field(service.NewStringField("queue"). Description("An optional queue group to consume as."). Optional()). @@ -83,8 +77,7 @@ You can access these metadata fields using Description("The maximum number of outstanding acks to be allowed before consuming is halted."). Advanced(). Default(1024)). - Field(service.NewTLSToggledField("tls")). - Field(service.NewInternalField(auth.FieldSpec())). + Fields(connectionTailFields()...). 
Field(span.ExtractTracingSpanMappingDocs().Version(tracingVersion)) } @@ -106,8 +99,7 @@ func init() { //------------------------------------------------------------------------------ type jetStreamReader struct { - label string - urls string + connDetails connectionDetails deliverOpt nats.SubOpt subject string queue string @@ -117,11 +109,8 @@ type jetStreamReader struct { durable string ackWait time.Duration maxAckPending int - authConf auth.Config - tlsConf *tls.Config log *service.Logger - fs *service.FS connMut sync.Mutex natsConn *nats.Conn @@ -132,17 +121,14 @@ type jetStreamReader struct { func newJetStreamReaderFromConfig(conf *service.ParsedConfig, mgr *service.Resources) (*jetStreamReader, error) { j := jetStreamReader{ - label: mgr.Label(), log: mgr.Logger(), - fs: mgr.FS(), shutSig: shutdown.NewSignaller(), } - urlList, err := conf.FieldStringList("urls") - if err != nil { + var err error + if j.connDetails, err = connectionDetailsFromParsed(conf, mgr); err != nil { return nil, err } - j.urls = strings.Join(urlList, ",") deliver, err := conf.FieldString("deliver") if err != nil { @@ -207,19 +193,6 @@ func newJetStreamReaderFromConfig(conf *service.ParsedConfig, mgr *service.Resou if j.maxAckPending, err = conf.FieldInt("max_ack_pending"); err != nil { return nil, err } - - tlsConf, tlsEnabled, err := conf.FieldTLSToggled("tls") - if err != nil { - return nil, err - } - if tlsEnabled { - j.tlsConf = tlsConf - } - - if j.authConf, err = AuthFromParsedConfig(conf.Namespace("auth")); err != nil { - return nil, err - } - return &j, nil } @@ -247,14 +220,7 @@ func (j *jetStreamReader) Connect(ctx context.Context) (err error) { } }() - var opts []nats.Option - if j.tlsConf != nil { - opts = append(opts, nats.Secure(j.tlsConf)) - } - opts = append(opts, nats.Name(j.label)) - opts = append(opts, authConfToOptions(j.authConf, j.fs)...) 
- opts = append(opts, errorHandlerOption(j.log)) - if natsConn, err = nats.Connect(j.urls, opts...); err != nil { + if natsConn, err = j.connDetails.get(ctx); err != nil { return err } diff --git a/internal/impl/nats/input_jetstream_test.go b/internal/impl/nats/input_jetstream_test.go index 37b9028526..f8c15feb44 100644 --- a/internal/impl/nats/input_jetstream_test.go +++ b/internal/impl/nats/input_jetstream_test.go @@ -30,12 +30,12 @@ auth: e, err := newJetStreamReaderFromConfig(conf, service.MockResources()) require.NoError(t, err) - assert.Equal(t, "url1,url2", e.urls) + assert.Equal(t, "url1,url2", e.connDetails.urls) assert.Equal(t, "testsubject", e.subject) - assert.Equal(t, "test auth n key file", e.authConf.NKeyFile) - assert.Equal(t, "test auth user creds file", e.authConf.UserCredentialsFile) - assert.Equal(t, "test auth inline user JWT", e.authConf.UserJWT) - assert.Equal(t, "test auth inline user NKey Seed", e.authConf.UserNkeySeed) + assert.Equal(t, "test auth n key file", e.connDetails.authConf.NKeyFile) + assert.Equal(t, "test auth user creds file", e.connDetails.authConf.UserCredentialsFile) + assert.Equal(t, "test auth inline user JWT", e.connDetails.authConf.UserJWT) + assert.Equal(t, "test auth inline user NKey Seed", e.connDetails.authConf.UserNkeySeed) }) t.Run("Missing user_nkey_seed", func(t *testing.T) { diff --git a/internal/impl/nats/input_kv.go b/internal/impl/nats/input_kv.go index 67d234a065..4c9a2f9311 100644 --- a/internal/impl/nats/input_kv.go +++ b/internal/impl/nats/input_kv.go @@ -2,13 +2,10 @@ package nats import ( "context" - "crypto/tls" - "strings" "sync" "github.com/nats-io/nats.go" - "github.com/benthosdev/benthos/v4/internal/impl/nats/auth" "github.com/benthosdev/benthos/v4/internal/shutdown" "github.com/benthosdev/benthos/v4/public/service" ) @@ -33,11 +30,8 @@ This input adds the following metadata fields to each message: - nats_kv_created ` + "```" + ` -` + ConnectionNameDescription() + auth.Description()). 
- Field(service.NewStringListField("urls"). - Description("A list of URLs to connect to. If an item of the list contains commas it will be expanded into multiple URLs."). - Example([]string{"nats://127.0.0.1:4222"}). - Example([]string{"nats://username:password@127.0.0.1:4222"})). +` + ConnectionNameDescription() + authDescription()). + Fields(connectionHeadFields()...). Field(service.NewStringField("bucket"). Description("The name of the KV bucket to watch for updates."). Example("my_kv_bucket")). @@ -57,8 +51,7 @@ This input adds the following metadata fields to each message: Description("Retrieve only the metadata of the entry"). Default(false). Advanced()). - Field(service.NewTLSToggledField("tls")). - Field(service.NewInternalField(auth.FieldSpec())) + Fields(connectionTailFields()...) } func init() { @@ -75,18 +68,14 @@ func init() { } type kvReader struct { - label string - urls string + connDetails connectionDetails bucket string key string ignoreDeletes bool includeHistory bool metaOnly bool - authConf auth.Config - tlsConf *tls.Config log *service.Logger - fs *service.FS shutSig *shutdown.Signaller @@ -97,17 +86,14 @@ type kvReader struct { func newKVReader(conf *service.ParsedConfig, mgr *service.Resources) (*kvReader, error) { r := &kvReader{ - label: mgr.Label(), log: mgr.Logger(), - fs: mgr.FS(), shutSig: shutdown.NewSignaller(), } - urlList, err := conf.FieldStringList("urls") - if err != nil { + var err error + if r.connDetails, err = connectionDetailsFromParsed(conf, mgr); err != nil { return nil, err } - r.urls = strings.Join(urlList, ",") if r.bucket, err = conf.FieldString("bucket"); err != nil { return nil, err @@ -128,19 +114,6 @@ func newKVReader(conf *service.ParsedConfig, mgr *service.Resources) (*kvReader, if r.key, err = conf.FieldString("key"); err != nil { return nil, err } - - tlsConf, tlsEnabled, err := conf.FieldTLSToggled("tls") - if err != nil { - return nil, err - } - if tlsEnabled { - r.tlsConf = tlsConf - } - - if r.authConf, 
err = AuthFromParsedConfig(conf.Namespace("auth")); err != nil { - return nil, err - } - return r, nil } @@ -163,13 +136,7 @@ func (r *kvReader) Connect(ctx context.Context) (err error) { } }() - var opts []nats.Option - if r.tlsConf != nil { - opts = append(opts, nats.Secure(r.tlsConf)) - } - opts = append(opts, nats.Name(r.label)) - opts = append(opts, authConfToOptions(r.authConf, r.fs)...) - if r.natsConn, err = nats.Connect(r.urls, opts...); err != nil { + if r.natsConn, err = r.connDetails.get(ctx); err != nil { return err } diff --git a/internal/impl/nats/input_kv_test.go b/internal/impl/nats/input_kv_test.go index f90dd1f8fe..4f1270c0e1 100644 --- a/internal/impl/nats/input_kv_test.go +++ b/internal/impl/nats/input_kv_test.go @@ -34,16 +34,16 @@ auth: e, err := newKVReader(conf, service.MockResources()) require.NoError(t, err) - assert.Equal(t, "url1,url2", e.urls) + assert.Equal(t, "url1,url2", e.connDetails.urls) assert.Equal(t, "testbucket", e.bucket) assert.Equal(t, "testkey", e.key) assert.Equal(t, true, e.ignoreDeletes) assert.Equal(t, true, e.includeHistory) assert.Equal(t, true, e.metaOnly) - assert.Equal(t, "test auth n key file", e.authConf.NKeyFile) - assert.Equal(t, "test auth user creds file", e.authConf.UserCredentialsFile) - assert.Equal(t, "test auth inline user JWT", e.authConf.UserJWT) - assert.Equal(t, "test auth inline user NKey Seed", e.authConf.UserNkeySeed) + assert.Equal(t, "test auth n key file", e.connDetails.authConf.NKeyFile) + assert.Equal(t, "test auth user creds file", e.connDetails.authConf.UserCredentialsFile) + assert.Equal(t, "test auth inline user JWT", e.connDetails.authConf.UserJWT) + assert.Equal(t, "test auth inline user NKey Seed", e.connDetails.authConf.UserNkeySeed) }) t.Run("Missing user_nkey_seed", func(t *testing.T) { diff --git a/internal/impl/nats/input_stream.go b/internal/impl/nats/input_stream.go index 0ebc190cf6..ffc63ee203 100644 --- a/internal/impl/nats/input_stream.go +++ 
b/internal/impl/nats/input_stream.go @@ -2,9 +2,7 @@ package nats import ( "context" - "crypto/tls" "strconv" - "strings" "sync" "time" @@ -14,7 +12,6 @@ import ( "github.com/benthosdev/benthos/v4/internal/component" "github.com/benthosdev/benthos/v4/internal/component/input/span" - "github.com/benthosdev/benthos/v4/internal/impl/nats/auth" "github.com/benthosdev/benthos/v4/public/service" ) @@ -35,7 +32,7 @@ const ( ) type siConfig struct { - URLs []string + connDetails connectionDetails ClusterID string ClientID string QueueID string @@ -45,13 +42,10 @@ type siConfig struct { Subject string MaxInflight int AckWait time.Duration - TLS *tls.Config - TLSEnabled bool - Auth auth.Config } -func siConfigFromParsed(pConf *service.ParsedConfig) (conf siConfig, err error) { - if conf.URLs, err = pConf.FieldStringList(siFieldURLs); err != nil { +func siConfigFromParsed(pConf *service.ParsedConfig, mgr *service.Resources) (conf siConfig, err error) { + if conf.connDetails, err = connectionDetailsFromParsed(pConf, mgr); err != nil { return } if conf.ClusterID, err = pConf.FieldString(siFieldClusterID); err != nil { @@ -81,12 +75,6 @@ func siConfigFromParsed(pConf *service.ParsedConfig) (conf siConfig, err error) if conf.AckWait, err = pConf.FieldDuration(siFieldAckWait); err != nil { return } - if conf.TLS, conf.TLSEnabled, err = pConf.FieldTLSToggled(siFieldTLS); err != nil { - return - } - if conf.Auth, err = AuthFromParsedConfig(pConf.Namespace(siFieldAuth)); err != nil { - return - } return } @@ -115,12 +103,9 @@ This input adds the following metadata fields to each message: You can access these metadata fields using [function interpolation](/docs/configuration/interpolation#bloblang-queries). -`+auth.Description()). +`+authDescription()). + Fields(connectionHeadFields()...). Fields( - service.NewStringListField(siFieldURLs). - Description("A list of URLs to connect to. If an item of the list contains commas it will be expanded into multiple URLs."). 
- Example([]string{"nats://127.0.0.1:4222"}). - Example([]string{"nats://username:password@127.0.0.1:4222"}), service.NewStringField(siFieldClusterID). Description("The ID of the cluster to consume from."), service.NewStringField(siFieldClientID). @@ -150,8 +135,9 @@ You can access these metadata fields using [function interpolation](/docs/config Description("An optional duration to specify at which a message that is yet to be acked will be automatically retried."). Advanced(). Default("30s"), - service.NewTLSToggledField(siFieldTLS), - service.NewInternalField(auth.FieldSpec()), + ). + Fields(connectionTailFields()...). + Fields( span.ExtractTracingSpanMappingDocs().Version(tracingVersion), ) } @@ -160,7 +146,7 @@ func init() { err := service.RegisterInput( "nats_stream", siSpec(), func(conf *service.ParsedConfig, mgr *service.Resources) (service.Input, error) { - pConf, err := siConfigFromParsed(conf) + pConf, err := siConfigFromParsed(conf, mgr) if err != nil { return nil, err } @@ -177,10 +163,7 @@ func init() { type natsStreamReader struct { conf siConfig - urls string - - log *service.Logger - fs *service.FS + log *service.Logger unAckMsgs []*stan.Msg @@ -205,14 +188,12 @@ func newNATSStreamReader(conf siConfig, mgr *service.Resources) (*natsStreamRead n := natsStreamReader{ conf: conf, - fs: mgr.FS(), log: mgr.Logger(), msgChan: make(chan *stan.Msg), interruptChan: make(chan struct{}), } close(n.msgChan) - n.urls = strings.Join(conf.URLs, ",") return &n, nil } @@ -241,15 +222,7 @@ func (n *natsStreamReader) Connect(ctx context.Context) error { return nil } - var opts []nats.Option - if n.conf.TLSEnabled && n.conf.TLS != nil { - opts = append(opts, nats.Secure(n.conf.TLS)) - } - - opts = append(opts, authConfToOptions(n.conf.Auth, n.fs)...) - opts = append(opts, errorHandlerOption(n.log)) - - natsConn, err := nats.Connect(n.urls, opts...) 
+ natsConn, err := n.conf.connDetails.get(ctx) if err != nil { return err } diff --git a/internal/impl/nats/output.go b/internal/impl/nats/output.go index c95a3ac8d4..ec9943f125 100644 --- a/internal/impl/nats/output.go +++ b/internal/impl/nats/output.go @@ -2,16 +2,13 @@ package nats import ( "context" - "crypto/tls" "errors" "fmt" - "strings" "sync" "github.com/nats-io/nats.go" "github.com/benthosdev/benthos/v4/internal/component/output/span" - "github.com/benthosdev/benthos/v4/internal/impl/nats/auth" "github.com/benthosdev/benthos/v4/public/service" ) @@ -22,11 +19,8 @@ func natsOutputConfig() *service.ConfigSpec { Summary("Publish to an NATS subject."). Description(`This output will interpolate functions within the subject field, you can find a list of functions [here](/docs/configuration/interpolation#bloblang-queries). -` + ConnectionNameDescription() + auth.Description()). - Field(service.NewStringListField("urls"). - Description("A list of URLs to connect to. If an item of the list contains commas it will be expanded into multiple URLs."). - Example([]string{"nats://127.0.0.1:4222"}). - Example([]string{"nats://username:password@127.0.0.1:4222"})). +` + ConnectionNameDescription() + authDescription()). + Fields(connectionHeadFields()...). Field(service.NewInterpolatedStringField("subject"). Description("The subject to publish to."). Example("foo.bar.baz")). @@ -43,8 +37,7 @@ func natsOutputConfig() *service.ConfigSpec { Field(service.NewIntField("max_in_flight"). Description("The maximum number of messages to have in flight at a given time. Increase this to improve throughput."). Default(64)). - Field(service.NewTLSToggledField("tls")). - Field(service.NewInternalField(auth.FieldSpec())). + Fields(connectionTailFields()...). 
Field(span.InjectTracingSpanMappingDocs().Version(tracingVersion)) } @@ -70,17 +63,13 @@ func init() { } type natsWriter struct { - label string - urls string + connDetails connectionDetails headers map[string]*service.InterpolatedString metaFilter *service.MetadataFilter subjectStr *service.InterpolatedString subjectStrRaw string - authConf auth.Config - tlsConf *tls.Config log *service.Logger - fs *service.FS natsConn *nats.Conn connMut sync.RWMutex @@ -88,16 +77,14 @@ type natsWriter struct { func newNATSWriter(conf *service.ParsedConfig, mgr *service.Resources) (*natsWriter, error) { n := natsWriter{ - label: mgr.Label(), log: mgr.Logger(), - fs: mgr.FS(), headers: make(map[string]*service.InterpolatedString), } - urlList, err := conf.FieldStringList("urls") - if err != nil { + + var err error + if n.connDetails, err = connectionDetailsFromParsed(conf, mgr); err != nil { return nil, err } - n.urls = strings.Join(urlList, ",") if n.subjectStrRaw, err = conf.FieldString("subject"); err != nil { return nil, err @@ -116,18 +103,6 @@ func newNATSWriter(conf *service.ParsedConfig, mgr *service.Resources) (*natsWri return nil, err } } - - tlsConf, tlsEnabled, err := conf.FieldTLSToggled("tls") - if err != nil { - return nil, err - } - if tlsEnabled { - n.tlsConf = tlsConf - } - - if n.authConf, err = AuthFromParsedConfig(conf.Namespace("auth")); err != nil { - return nil, err - } return &n, nil } @@ -140,23 +115,11 @@ func (n *natsWriter) Connect(ctx context.Context) error { } var err error - var opts []nats.Option - - if n.tlsConf != nil { - opts = append(opts, nats.Secure(n.tlsConf)) - } - - opts = append(opts, nats.Name(n.label)) - opts = append(opts, authConfToOptions(n.authConf, n.fs)...) 
- opts = append(opts, errorHandlerOption(n.log)) - - if n.natsConn, err = nats.Connect(n.urls, opts...); err != nil { + if n.natsConn, err = n.connDetails.get(ctx); err != nil { return err } - if err == nil { - n.log.Infof("Sending NATS messages to subject: %v\n", n.subjectStrRaw) - } + n.log.Infof("Sending NATS messages to subject: %v\n", n.subjectStrRaw) return err } diff --git a/internal/impl/nats/output_jetstream.go b/internal/impl/nats/output_jetstream.go index 8abb26d3c3..1ac87cfb5a 100644 --- a/internal/impl/nats/output_jetstream.go +++ b/internal/impl/nats/output_jetstream.go @@ -2,15 +2,12 @@ package nats import ( "context" - "crypto/tls" "fmt" - "strings" "sync" "github.com/nats-io/nats.go" "github.com/benthosdev/benthos/v4/internal/component/output/span" - "github.com/benthosdev/benthos/v4/internal/impl/nats/auth" "github.com/benthosdev/benthos/v4/internal/shutdown" "github.com/benthosdev/benthos/v4/public/service" ) @@ -21,11 +18,8 @@ func natsJetStreamOutputConfig() *service.ConfigSpec { Categories("Services"). Version("3.46.0"). Summary("Write messages to a NATS JetStream subject."). - Description(ConnectionNameDescription() + auth.Description()). - Field(service.NewStringListField("urls"). - Description("A list of URLs to connect to. If an item of the list contains commas it will be expanded into multiple URLs."). - Example([]string{"nats://127.0.0.1:4222"}). - Example([]string{"nats://username:password@127.0.0.1:4222"})). + Description(ConnectionNameDescription() + authDescription()). + Fields(connectionHeadFields()...). Field(service.NewInterpolatedStringField("subject"). Description("A subject to write to."). Example("foo.bar.baz"). @@ -44,8 +38,7 @@ func natsJetStreamOutputConfig() *service.ConfigSpec { Field(service.NewIntField("max_in_flight"). Description("The maximum number of messages to have in flight at a given time. Increase this to improve throughput."). Default(1024)). - Field(service.NewTLSToggledField("tls")). 
- Field(service.NewInternalField(auth.FieldSpec())). + Fields(connectionTailFields()...). Field(span.InjectTracingSpanMappingDocs().Version(tracingVersion)) } @@ -72,17 +65,13 @@ func init() { //------------------------------------------------------------------------------ type jetStreamOutput struct { - label string - urls string + connDetails connectionDetails subjectStrRaw string subjectStr *service.InterpolatedString headers map[string]*service.InterpolatedString metaFilter *service.MetadataFilter - authConf auth.Config - tlsConf *tls.Config log *service.Logger - fs *service.FS connMut sync.Mutex natsConn *nats.Conn @@ -93,17 +82,14 @@ type jetStreamOutput struct { func newJetStreamWriterFromConfig(conf *service.ParsedConfig, mgr *service.Resources) (*jetStreamOutput, error) { j := jetStreamOutput{ - label: mgr.Label(), log: mgr.Logger(), - fs: mgr.FS(), shutSig: shutdown.NewSignaller(), } - urlList, err := conf.FieldStringList("urls") - if err != nil { + var err error + if j.connDetails, err = connectionDetailsFromParsed(conf, mgr); err != nil { return nil, err } - j.urls = strings.Join(urlList, ",") if j.subjectStrRaw, err = conf.FieldString("subject"); err != nil { return nil, err @@ -122,18 +108,6 @@ func newJetStreamWriterFromConfig(conf *service.ParsedConfig, mgr *service.Resou return nil, err } } - - tlsConf, tlsEnabled, err := conf.FieldTLSToggled("tls") - if err != nil { - return nil, err - } - if tlsEnabled { - j.tlsConf = tlsConf - } - - if j.authConf, err = AuthFromParsedConfig(conf.Namespace("auth")); err != nil { - return nil, err - } return &j, nil } @@ -156,14 +130,7 @@ func (j *jetStreamOutput) Connect(ctx context.Context) (err error) { } }() - var opts []nats.Option - if j.tlsConf != nil { - opts = append(opts, nats.Secure(j.tlsConf)) - } - opts = append(opts, nats.Name(j.label)) - opts = append(opts, authConfToOptions(j.authConf, j.fs)...) 
- opts = append(opts, errorHandlerOption(j.log)) - if natsConn, err = nats.Connect(j.urls, opts...); err != nil { + if natsConn, err = j.connDetails.get(ctx); err != nil { return err } diff --git a/internal/impl/nats/output_jetstream_test.go b/internal/impl/nats/output_jetstream_test.go index 3307763c4d..2f97b261bf 100644 --- a/internal/impl/nats/output_jetstream_test.go +++ b/internal/impl/nats/output_jetstream_test.go @@ -35,7 +35,7 @@ auth: msg := service.NewMessage((nil)) msg.MetaSet("Timestamp", "1651485106") - assert.Equal(t, "url1,url2", e.urls) + assert.Equal(t, "url1,url2", e.connDetails.urls) subject, err := e.subjectStr.TryString(msg) require.NoError(t, err) @@ -49,10 +49,10 @@ auth: require.NoError(t, err) assert.Equal(t, "1651485106", timestamp) - assert.Equal(t, "test auth n key file", e.authConf.NKeyFile) - assert.Equal(t, "test auth user creds file", e.authConf.UserCredentialsFile) - assert.Equal(t, "test auth inline user JWT", e.authConf.UserJWT) - assert.Equal(t, "test auth inline user NKey Seed", e.authConf.UserNkeySeed) + assert.Equal(t, "test auth n key file", e.connDetails.authConf.NKeyFile) + assert.Equal(t, "test auth user creds file", e.connDetails.authConf.UserCredentialsFile) + assert.Equal(t, "test auth inline user JWT", e.connDetails.authConf.UserJWT) + assert.Equal(t, "test auth inline user NKey Seed", e.connDetails.authConf.UserNkeySeed) }) t.Run("Missing user_nkey_seed", func(t *testing.T) { diff --git a/internal/impl/nats/output_kv.go b/internal/impl/nats/output_kv.go index 28ca402849..511f303783 100644 --- a/internal/impl/nats/output_kv.go +++ b/internal/impl/nats/output_kv.go @@ -2,13 +2,10 @@ package nats import ( "context" - "crypto/tls" - "strings" "sync" "github.com/nats-io/nats.go" - "github.com/benthosdev/benthos/v4/internal/impl/nats/auth" "github.com/benthosdev/benthos/v4/internal/shutdown" "github.com/benthosdev/benthos/v4/public/service" ) @@ -24,11 +21,8 @@ The field ` + "`key`" + ` supports [interpolation 
functions](/docs/configuration/interpolation#bloblang-queries), allowing you to create a unique key for each message. -` + ConnectionNameDescription() + auth.Description()). - Field(service.NewStringListField("urls"). - Description("A list of URLs to connect to. If an item of the list contains commas it will be expanded into multiple URLs."). - Example([]string{"nats://127.0.0.1:4222"}). - Example([]string{"nats://username:password@127.0.0.1:4222"})). +` + ConnectionNameDescription() + authDescription()). + Fields(connectionHeadFields()...). Field(service.NewStringField("bucket"). Description("The name of the KV bucket to operate on."). Example("my_kv_bucket")). @@ -40,8 +34,7 @@ you to create a unique key for each message. Field(service.NewIntField("max_in_flight"). Description("The maximum number of messages to have in flight at a given time. Increase this to improve throughput."). Default(1024)). - Field(service.NewTLSToggledField("tls")). - Field(service.NewInternalField(auth.FieldSpec())) + Fields(connectionTailFields()...) 
} func init() { @@ -63,17 +56,12 @@ func init() { //------------------------------------------------------------------------------ type kvOutput struct { - label string - urls string - bucket string - key *service.InterpolatedString - keyRaw string - - authConf auth.Config - tlsConf *tls.Config + connDetails connectionDetails + bucket string + key *service.InterpolatedString + keyRaw string log *service.Logger - fs *service.FS connMut sync.Mutex natsConn *nats.Conn @@ -84,17 +72,14 @@ type kvOutput struct { func newKVOutput(conf *service.ParsedConfig, mgr *service.Resources) (*kvOutput, error) { kv := kvOutput{ - label: mgr.Label(), log: mgr.Logger(), - fs: mgr.FS(), shutSig: shutdown.NewSignaller(), } - urlList, err := conf.FieldStringList("urls") - if err != nil { + var err error + if kv.connDetails, err = connectionDetailsFromParsed(conf, mgr); err != nil { return nil, err } - kv.urls = strings.Join(urlList, ",") if kv.bucket, err = conf.FieldString("bucket"); err != nil { return nil, err @@ -107,18 +92,6 @@ func newKVOutput(conf *service.ParsedConfig, mgr *service.Resources) (*kvOutput, if kv.key, err = conf.FieldInterpolatedString("key"); err != nil { return nil, err } - - tlsConf, tlsEnabled, err := conf.FieldTLSToggled("tls") - if err != nil { - return nil, err - } - if tlsEnabled { - kv.tlsConf = tlsConf - } - - if kv.authConf, err = AuthFromParsedConfig(conf.Namespace("auth")); err != nil { - return nil, err - } return &kv, nil } @@ -140,13 +113,7 @@ func (kv *kvOutput) Connect(ctx context.Context) (err error) { } }() - var opts []nats.Option - if kv.tlsConf != nil { - opts = append(opts, nats.Secure(kv.tlsConf)) - } - opts = append(opts, nats.Name(kv.label)) - opts = append(opts, authConfToOptions(kv.authConf, kv.fs)...) 
- if natsConn, err = nats.Connect(kv.urls, opts...); err != nil { + if natsConn, err = kv.connDetails.get(ctx); err != nil { return err } diff --git a/internal/impl/nats/output_stream.go b/internal/impl/nats/output_stream.go index 5a7c7e3f97..4c40e5d2e6 100644 --- a/internal/impl/nats/output_stream.go +++ b/internal/impl/nats/output_stream.go @@ -2,11 +2,9 @@ package nats import ( "context" - "crypto/tls" "errors" "fmt" "math/rand" - "strings" "sync" "time" @@ -15,7 +13,6 @@ import ( "github.com/benthosdev/benthos/v4/internal/component/output" "github.com/benthosdev/benthos/v4/internal/component/output/span" - "github.com/benthosdev/benthos/v4/internal/impl/nats/auth" "github.com/benthosdev/benthos/v4/public/service" ) @@ -30,17 +27,14 @@ const ( ) type soConfig struct { - URLs []string - ClusterID string - ClientID string - Subject string - TLS *tls.Config - TLSEnabled bool - Auth auth.Config + connDetails connectionDetails + ClusterID string + ClientID string + Subject string } -func soConfigFromParsed(pConf *service.ParsedConfig) (conf soConfig, err error) { - if conf.URLs, err = pConf.FieldStringList(soFieldURLs); err != nil { +func soConfigFromParsed(pConf *service.ParsedConfig, mgr *service.Resources) (conf soConfig, err error) { + if conf.connDetails, err = connectionDetailsFromParsed(pConf, mgr); err != nil { return } if conf.ClusterID, err = pConf.FieldString(soFieldClusterID); err != nil { @@ -52,12 +46,6 @@ func soConfigFromParsed(pConf *service.ParsedConfig) (conf soConfig, err error) if conf.Subject, err = pConf.FieldString(soFieldSubject); err != nil { return } - if conf.TLS, conf.TLSEnabled, err = pConf.FieldTLSToggled(soFieldTLS); err != nil { - return - } - if conf.Auth, err = AuthFromParsedConfig(pConf.Namespace(soFieldAuth)); err != nil { - return - } return } @@ -71,12 +59,9 @@ func soSpec() *service.ConfigSpec { The NATS Streaming Server is being deprecated. Critical bug fixes and security fixes will be applied until June of 2023. 
NATS-enabled applications requiring persistence should use [JetStream](https://docs.nats.io/nats-concepts/jetstream). ::: -`+output.Description(true, false, auth.Description())). +`+output.Description(true, false, authDescription())). + Fields(connectionHeadFields()...). Fields( - service.NewStringListField(soFieldURLs). - Description("A list of URLs to connect to. If an item of the list contains commas it will be expanded into multiple URLs."). - Example([]string{"nats://127.0.0.1:4222"}). - Example([]string{"nats://username:password@127.0.0.1:4222"}), service.NewStringField(soFieldClusterID). Description("The cluster ID to publish to."), service.NewStringField(soFieldSubject). @@ -86,8 +71,9 @@ The NATS Streaming Server is being deprecated. Critical bug fixes and security f Default(""), service.NewOutputMaxInFlightField(). Description("The maximum number of messages to have in flight at a given time. Increase this to improve throughput."), - service.NewTLSToggledField(soFieldTLS), - service.NewInternalField(auth.FieldSpec()), + ). + Fields(connectionTailFields()...). 
+ Fields( span.InjectTracingSpanMappingDocs().Version(tracingVersion), ) } @@ -96,7 +82,7 @@ func init() { err := service.RegisterOutput( "nats_stream", soSpec(), func(conf *service.ParsedConfig, mgr *service.Resources) (service.Output, int, error) { - pConf, err := soConfigFromParsed(conf) + pConf, err := soConfigFromParsed(conf, mgr) if err != nil { return nil, 0, err } @@ -124,7 +110,6 @@ type natsStreamWriter struct { natsConn *nats.Conn connMut sync.RWMutex - urls string conf soConfig } @@ -143,7 +128,6 @@ func newNATSStreamWriter(conf soConfig, mgr *service.Resources) (*natsStreamWrit fs: service.NewFS(mgr.FS()), conf: conf, } - n.urls = strings.Join(conf.URLs, ",") return &n, nil } @@ -155,15 +139,7 @@ func (n *natsStreamWriter) Connect(ctx context.Context) error { return nil } - var opts []nats.Option - if n.conf.TLSEnabled && n.conf.TLS != nil { - opts = append(opts, nats.Secure(n.conf.TLS)) - } - - opts = append(opts, authConfToOptions(n.conf.Auth, n.fs)...) - opts = append(opts, errorHandlerOption(n.log)) - - natsConn, err := nats.Connect(n.urls, opts...) + natsConn, err := n.conf.connDetails.get(ctx) if err != nil { return err } diff --git a/internal/impl/nats/processor_kv.go b/internal/impl/nats/processor_kv.go index 2a7ca5ea0d..c1702670c1 100644 --- a/internal/impl/nats/processor_kv.go +++ b/internal/impl/nats/processor_kv.go @@ -2,15 +2,12 @@ package nats import ( "context" - "crypto/tls" "fmt" "strconv" - "strings" "sync" "github.com/nats-io/nats.go" - "github.com/benthosdev/benthos/v4/internal/impl/nats/auth" "github.com/benthosdev/benthos/v4/internal/shutdown" "github.com/benthosdev/benthos/v4/public/service" ) @@ -77,11 +74,8 @@ This processor adds the following metadata fields to each message, depending on - nats_kv_bucket ` + "```" + ` -` + ConnectionNameDescription() + auth.Description()). - Field(service.NewStringListField("urls"). - Description("A list of URLs to connect to. 
If an item of the list contains commas it will be expanded into multiple URLs."). - Example([]string{"nats://127.0.0.1:4222"}). - Example([]string{"nats://username:password@127.0.0.1:4222"})). +` + ConnectionNameDescription() + authDescription()). + Fields(connectionHeadFields()...). Field(service.NewStringField("bucket"). Description("The name of the KV bucket to watch for updates."). Example("my_kv_bucket")). @@ -100,8 +94,7 @@ This processor adds the following metadata fields to each message, depending on Example(`${! @nats_kv_revision }`). Optional(). Advanced()). - Field(service.NewTLSToggledField("tls")). - Field(service.NewInternalField(auth.FieldSpec())). + Fields(connectionTailFields()...). LintRule(`root = match { ["get_revision", "update"].contains(this.operation) && !this.exists("revision") => [ "'revision' must be set when operation is '" + this.operation + "'" ], !["get_revision", "update"].contains(this.operation) && this.exists("revision") => [ "'revision' cannot be set when operation is '" + this.operation + "'" ], @@ -123,19 +116,15 @@ func init() { } type kvProcessor struct { - label string - urls string + connDetails connectionDetails bucket string operation string key *service.InterpolatedString keyRaw string revision *service.InterpolatedString revisionRaw string - authConf auth.Config - tlsConf *tls.Config log *service.Logger - fs *service.FS shutSig *shutdown.Signaller @@ -146,17 +135,14 @@ type kvProcessor struct { func newKVProcessor(conf *service.ParsedConfig, mgr *service.Resources) (*kvProcessor, error) { p := &kvProcessor{ - label: mgr.Label(), log: mgr.Logger(), - fs: mgr.FS(), shutSig: shutdown.NewSignaller(), } - urlList, err := conf.FieldStringList("urls") - if err != nil { + var err error + if p.connDetails, err = connectionDetailsFromParsed(conf, mgr); err != nil { return nil, err } - p.urls = strings.Join(urlList, ",") if p.bucket, err = conf.FieldString("bucket"); err != nil { return nil, err @@ -184,20 +170,7 @@ func 
newKVProcessor(conf *service.ParsedConfig, mgr *service.Resources) (*kvProc } } - tlsConf, tlsEnabled, err := conf.FieldTLSToggled("tls") - if err != nil { - return nil, err - } - if tlsEnabled { - p.tlsConf = tlsConf - } - - if p.authConf, err = AuthFromParsedConfig(conf.Namespace("auth")); err != nil { - return nil, err - } - err = p.Connect(context.Background()) - return p, err } @@ -363,13 +336,7 @@ func (p *kvProcessor) Connect(ctx context.Context) (err error) { } }() - var opts []nats.Option - if p.tlsConf != nil { - opts = append(opts, nats.Secure(p.tlsConf)) - } - opts = append(opts, nats.Name(p.label)) - opts = append(opts, authConfToOptions(p.authConf, p.fs)...) - if p.natsConn, err = nats.Connect(p.urls, opts...); err != nil { + if p.natsConn, err = p.connDetails.get(ctx); err != nil { return err } diff --git a/internal/impl/otlp/tracer_otlp.go b/internal/impl/otlp/tracer_otlp.go index 460e211dc8..2f369c3340 100644 --- a/internal/impl/otlp/tracer_otlp.go +++ b/internal/impl/otlp/tracer_otlp.go @@ -2,6 +2,7 @@ package otlp import ( "context" + "errors" "time" "go.opentelemetry.io/otel/attribute" @@ -18,20 +19,30 @@ import ( "github.com/benthosdev/benthos/v4/public/service" ) -func init() { - spec := service.NewConfigSpec(). +func oltpSpec() *service.ConfigSpec { + return service.NewConfigSpec(). Summary("Send tracing events to an [Open Telemetry collector](https://opentelemetry.io/docs/collector/)."). Field(service.NewObjectListField("http", - service.NewURLField("url"). + service.NewStringField("address"). + Description("The endpoint of a collector to send tracing events to."). + Optional(). + Example("localhost:4318"), + service.NewStringField("url"). Description("The URL of a collector to send tracing events to."). + Deprecated(). Default("localhost:4318"), service.NewBoolField("secure"). Description("Connect to the collector over HTTPS"). Default(false), ).Description("A list of http collectors.")). 
Field(service.NewObjectListField("grpc", + service.NewURLField("address"). + Description("The endpoint of a collector to send tracing events to."). + Optional(). + Example("localhost:4317"), service.NewURLField("url"). Description("The URL of a collector to send tracing events to."). + Deprecated(). Default("localhost:4317"), service.NewBoolField("secure"). Description("Connect to the collector with client transport security"). @@ -51,12 +62,13 @@ func init() { Optional()). Description("Settings for trace sampling. Sampling is recommended for high-volume production workloads."). Version("4.25.0")) +} +func init() { err := service.RegisterOtelTracerProvider( - "open_telemetry_collector", - spec, + "open_telemetry_collector", oltpSpec(), func(conf *service.ParsedConfig) (trace.TracerProvider, error) { - c, err := newOtlpConfig(conf) + c, err := oltpConfigFromParsed(conf) if err != nil { return nil, err } @@ -68,8 +80,8 @@ func init() { } type collector struct { - url string - secure bool + address string + secure bool } type sampleConfig struct { @@ -84,7 +96,7 @@ type otlp struct { sampling sampleConfig } -func newOtlpConfig(conf *service.ParsedConfig) (*otlp, error) { +func oltpConfigFromParsed(conf *service.ParsedConfig) (*otlp, error) { http, err := collectors(conf, "http") if err != nil { return nil, err @@ -120,9 +132,11 @@ func collectors(conf *service.ParsedConfig, name string) ([]collector, error) { } collectors := make([]collector, 0, len(list)) for _, pc := range list { - u, err := pc.FieldString("url") - if err != nil { - return nil, err + u, _ := pc.FieldString("address") + if u == "" { + if u, _ = pc.FieldString("url"); u == "" { + return nil, errors.New("an address must be specified") + } } secure, err := pc.FieldBool("secure") @@ -131,8 +145,8 @@ func collectors(conf *service.ParsedConfig, name string) ([]collector, error) { } collectors = append(collectors, collector{ - url: u, - secure: secure, + address: u, + secure: secure, }) } return collectors, 
nil @@ -205,7 +219,7 @@ func addGrpcCollectors(ctx context.Context, collectors []collector, opts []trace for _, c := range collectors { clientOpts := []otlptracegrpc.Option{ - otlptracegrpc.WithEndpoint(c.url), + otlptracegrpc.WithEndpoint(c.address), } if !c.secure { @@ -227,7 +241,7 @@ func addHTTPCollectors(ctx context.Context, collectors []collector, opts []trace for _, c := range collectors { clientOpts := []otlptracehttp.Option{ - otlptracehttp.WithEndpoint(c.url), + otlptracehttp.WithEndpoint(c.address), } if !c.secure { diff --git a/internal/impl/otlp/tracer_otlp_test.go b/internal/impl/otlp/tracer_otlp_test.go new file mode 100644 index 0000000000..be392f6299 --- /dev/null +++ b/internal/impl/otlp/tracer_otlp_test.go @@ -0,0 +1,90 @@ +package otlp + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestConfigParsingAddresses(t *testing.T) { + pConf, err := oltpSpec().ParseYAML(` +http: + - address: foo:123 + - address: foo:456 + secure: true + - {} +grpc: + - address: bar:123 + - address: bar:456 + secure: true + - {} +sampling: + enabled: true + ratio: 0.55 +`, nil) + require.NoError(t, err) + + cConf, err := oltpConfigFromParsed(pConf) + require.NoError(t, err) + + assert.Equal(t, true, cConf.sampling.enabled) + assert.Equal(t, 0.55, cConf.sampling.ratio) + + require.Len(t, cConf.http, 3) + assert.Equal(t, "foo:123", cConf.http[0].address) + assert.Equal(t, false, cConf.http[0].secure) + assert.Equal(t, "foo:456", cConf.http[1].address) + assert.Equal(t, true, cConf.http[1].secure) + assert.Equal(t, "localhost:4318", cConf.http[2].address) + assert.Equal(t, false, cConf.http[2].secure) + + require.Len(t, cConf.grpc, 3) + assert.Equal(t, "bar:123", cConf.grpc[0].address) + assert.Equal(t, false, cConf.grpc[0].secure) + assert.Equal(t, "bar:456", cConf.grpc[1].address) + assert.Equal(t, true, cConf.grpc[1].secure) + assert.Equal(t, "localhost:4317", cConf.grpc[2].address) + assert.Equal(t, 
false, cConf.grpc[2].secure) +} + +func TestConfigParsingDeprecated(t *testing.T) { + pConf, err := oltpSpec().ParseYAML(` +http: + - url: foo:123 + - url: foo:456 + secure: true + - {} +grpc: + - url: bar:123 + - url: bar:456 + secure: true + - {} +sampling: + enabled: true + ratio: 0.55 +`, nil) + require.NoError(t, err) + + cConf, err := oltpConfigFromParsed(pConf) + require.NoError(t, err) + + assert.Equal(t, true, cConf.sampling.enabled) + assert.Equal(t, 0.55, cConf.sampling.ratio) + + require.Len(t, cConf.http, 3) + assert.Equal(t, "foo:123", cConf.http[0].address) + assert.Equal(t, false, cConf.http[0].secure) + assert.Equal(t, "foo:456", cConf.http[1].address) + assert.Equal(t, true, cConf.http[1].secure) + assert.Equal(t, "localhost:4318", cConf.http[2].address) + assert.Equal(t, false, cConf.http[2].secure) + + require.Len(t, cConf.grpc, 3) + assert.Equal(t, "bar:123", cConf.grpc[0].address) + assert.Equal(t, false, cConf.grpc[0].secure) + assert.Equal(t, "bar:456", cConf.grpc[1].address) + assert.Equal(t, true, cConf.grpc[1].secure) + assert.Equal(t, "localhost:4317", cConf.grpc[2].address) + assert.Equal(t, false, cConf.grpc[2].secure) +} diff --git a/internal/impl/pure/input_generate.go b/internal/impl/pure/input_generate.go index 95214d5a81..a07ebc6699 100644 --- a/internal/impl/pure/input_generate.go +++ b/internal/impl/pure/input_generate.go @@ -104,24 +104,24 @@ func init() { //------------------------------------------------------------------------------ type generateReader struct { - remaining int - batchSize int - limited bool - firstIsFree bool - exec *mapping.Executor - timer *time.Ticker - schedule *cron.Schedule - location *time.Location + remaining int + batchSize int + limited bool + firstIsFree bool + exec *mapping.Executor + timer *time.Ticker + schedule *cron.Schedule + schedulePrev *time.Time } func newGenerateReaderFromParsed(conf *service.ParsedConfig, mgr bundle.NewManagement) (*generateReader, error) { var ( - duration 
time.Duration - timer *time.Ticker - schedule *cron.Schedule - location *time.Location - err error - firstIsFree = true + duration time.Duration + timer *time.Ticker + schedule *cron.Schedule + schedulePrev *time.Time + err error + firstIsFree = true ) mappingStr, err := conf.FieldString(giFieldMapping) @@ -138,11 +138,16 @@ func newGenerateReaderFromParsed(conf *service.ParsedConfig, mgr bundle.NewManag if duration, err = time.ParseDuration(intervalStr); err != nil { // interval is not a duration so try to parse as a cron expression var cerr error - if schedule, location, cerr = parseCronExpression(intervalStr); cerr != nil { + if schedule, cerr = parseCronExpression(intervalStr); cerr != nil { return nil, fmt.Errorf("failed to parse interval as duration string: %v, or as cron expression: %w", err, cerr) } firstIsFree = false - duration = getDurationTillNextSchedule(time.Now(), *schedule, location) + + tNext := (*schedule).Next(time.Now()) + if duration = time.Until(tNext); duration < 1 { + duration = 1 + } + schedulePrev = &tNext } if duration > 0 { timer = time.NewTicker(duration) @@ -167,47 +172,30 @@ func newGenerateReaderFromParsed(conf *service.ParsedConfig, mgr bundle.NewManag } return &generateReader{ - exec: exec, - remaining: count, - batchSize: batchSize, - limited: count > 0, - timer: timer, - schedule: schedule, - location: location, - firstIsFree: firstIsFree, + exec: exec, + remaining: count, + batchSize: batchSize, + limited: count > 0, + timer: timer, + schedule: schedule, + schedulePrev: schedulePrev, + firstIsFree: firstIsFree, }, nil } -func getDurationTillNextSchedule(previous time.Time, schedule cron.Schedule, location *time.Location) time.Duration { - tUntil := time.Until(schedule.Next(previous)) - if tUntil < 1 { - return 1 - } - return tUntil -} - -func parseCronExpression(cronExpression string) (*cron.Schedule, *time.Location, error) { +func parseCronExpression(cronExpression string) (*cron.Schedule, error) { // If time zone is not 
included, set default to UTC if !strings.HasPrefix(cronExpression, "TZ=") { cronExpression = fmt.Sprintf("TZ=%s %s", "UTC", cronExpression) } - end := strings.Index(cronExpression, " ") - eq := strings.Index(cronExpression, "=") - tz := cronExpression[eq+1 : end] - - loc, err := time.LoadLocation(tz) - if err != nil { - return nil, nil, err - } parser := cron.NewParser(cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor) - cronSchedule, err := parser.Parse(cronExpression) if err != nil { - return nil, nil, err + return nil, err } - return &cronSchedule, loc, nil + return &cronSchedule, nil } // Connect establishes a Bloblang reader. @@ -234,7 +222,19 @@ func (b *generateReader) ReadBatch(ctx context.Context) (message.Batch, input.As return nil, nil, component.ErrTypeClosed } if b.schedule != nil { - b.timer.Reset(getDurationTillNextSchedule(t, *b.schedule, b.location)) + if b.schedulePrev != nil { + t = *b.schedulePrev + } + + tNext := (*b.schedule).Next(t) + tNow := time.Now() + duration := tNext.Sub(tNow) + if duration < 1 { + duration = 1 + } + + b.schedulePrev = &tNext + b.timer.Reset(duration) } case <-ctx.Done(): return nil, nil, component.ErrTimeout diff --git a/internal/impl/pure/input_generate_test.go b/internal/impl/pure/input_generate_test.go index d4940268a7..4478729929 100644 --- a/internal/impl/pure/input_generate_test.go +++ b/internal/impl/pure/input_generate_test.go @@ -72,7 +72,6 @@ interval: '@every 1s' `) assert.NotNil(t, b.schedule) - assert.NotNil(t, b.location) err := b.Connect(ctx) require.NoError(t, err) diff --git a/website/docs/components/inputs/mongodb.md b/website/docs/components/inputs/mongodb.md index 4aaa0bbd4a..bcb4057a9f 100644 --- a/website/docs/components/inputs/mongodb.md +++ b/website/docs/components/inputs/mongodb.md @@ -18,7 +18,7 @@ import TabItem from '@theme/TabItem'; :::caution EXPERIMENTAL This component is experimental and therefore subject to change or removal outside of 
major version releases. ::: -Executes a find query and creates a message for each row received. +Executes a query and creates a message for each document received. Introduced in version 3.64.0. @@ -43,6 +43,9 @@ input: query: |2 # No default (required) root.from = {"$lte": timestamp_unix()} root.to = {"$gte": timestamp_unix()} + batch_size: 1000 # No default (optional) + sort: {} # No default (optional) + limit: 0 # No default (optional) ``` @@ -63,12 +66,15 @@ input: query: |2 # No default (required) root.from = {"$lte": timestamp_unix()} root.to = {"$gte": timestamp_unix()} + batch_size: 1000 # No default (optional) + sort: {} # No default (optional) + limit: 0 # No default (optional) ``` -Once the rows from the query are exhausted this input shuts down, allowing the pipeline to gracefully terminate (or the next input in a [sequence](/docs/components/inputs/sequence) to execute). +Once the documents from the query are exhausted, this input shuts down, allowing the pipeline to gracefully terminate (or the next input in a [sequence](/docs/components/inputs/sequence) to execute). ## Fields @@ -158,4 +164,44 @@ query: |2 root.to = {"$gte": timestamp_unix()} ``` +### `batch_size` + +An explicit number of documents to batch up before flushing them for processing. Must be greater than `0`. Operations: `find`, `aggregate` + + +Type: `int` +Requires version 4.26.0 or newer + +```yml +# Examples + +batch_size: 1000 +``` + +### `sort` + +An object specifying fields to sort by, and the respective sort order (`1` ascending, `-1` descending). Note: The driver currently appears to support only one sorting key. Operations: `find` + + +Type: `object` +Requires version 4.26.0 or newer + +```yml +# Examples + +sort: + name: 1 + +sort: + age: -1 +``` + +### `limit` + +An explicit maximum number of documents to return. 
Operations: `find` + + +Type: `int` +Requires version 4.26.0 or newer + diff --git a/website/docs/components/outputs/kafka.md b/website/docs/components/outputs/kafka.md index af48acd005..8d8a5901e1 100644 --- a/website/docs/components/outputs/kafka.md +++ b/website/docs/components/outputs/kafka.md @@ -87,6 +87,7 @@ output: exclude_prefixes: [] inject_tracing_map: meta = @.merge(this) # No default (optional) max_in_flight: 64 + idempotent_write: false ack_replicas: false max_msg_bytes: 1000000 timeout: 5s @@ -541,6 +542,14 @@ The maximum number of messages to have in flight at a given time. Increase this Type: `int` Default: `64` +### `idempotent_write` + +Enable the idempotent write producer option. This requires the `IDEMPOTENT_WRITE` permission on `CLUSTER` and can be disabled if this permission is not available. + + +Type: `bool` +Default: `false` + ### `ack_replicas` Ensure that messages have been copied across all replicas before acknowledging receipt. diff --git a/website/docs/components/outputs/mongodb.md b/website/docs/components/outputs/mongodb.md index 6ab559bf6c..38c19ced3a 100644 --- a/website/docs/components/outputs/mongodb.md +++ b/website/docs/components/outputs/mongodb.md @@ -192,7 +192,7 @@ Default: `""` ### `document_map` -A bloblang map representing the records in the mongo db. Used to generate the document for mongodb by mapping the fields in the message to the mongodb fields. The document map is required for the operations insert-one, replace-one and update-one. +A bloblang map representing a document to store within MongoDB, expressed as [extended JSON in canonical form](https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/). The document map is required for the operations insert-one, replace-one and update-one. Type: `string` @@ -208,7 +208,7 @@ document_map: |- ### `filter_map` -A bloblang map representing the filter for the mongo db command. The filter map is required for all operations except insert-one. 
It is used to find the document(s) for the operation. For example in a delete-one case, the filter map should have the fields required to locate the document to delete. +A bloblang map representing a filter for a MongoDB command, expressed as [extended JSON in canonical form](https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/). The filter map is required for all operations except insert-one. It is used to find the document(s) for the operation. For example in a delete-one case, the filter map should have the fields required to locate the document to delete. Type: `string` @@ -224,7 +224,7 @@ filter_map: |- ### `hint_map` -A bloblang map representing the hint for the mongo db command. This map is optional and is used with all operations except insert-one. It is used to improve performance of finding the documents in the mongodb. +A bloblang map representing the hint for the MongoDB command, expressed as [extended JSON in canonical form](https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/). This map is optional and is used with all operations except insert-one. It is used to improve performance of finding the documents in the mongodb. Type: `string` diff --git a/website/docs/components/processors/mongodb.md b/website/docs/components/processors/mongodb.md index 00bddcb56a..0d151a6137 100644 --- a/website/docs/components/processors/mongodb.md +++ b/website/docs/components/processors/mongodb.md @@ -167,7 +167,7 @@ Default: `""` ### `document_map` -A bloblang map representing the records in the mongo db. Used to generate the document for mongodb by mapping the fields in the message to the mongodb fields. The document map is required for the operations insert-one, replace-one and update-one. +A bloblang map representing a document to store within MongoDB, expressed as [extended JSON in canonical form](https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/). 
The document map is required for the operations insert-one, replace-one and update-one. Type: `string` @@ -183,7 +183,7 @@ document_map: |- ### `filter_map` -A bloblang map representing the filter for the mongo db command. The filter map is required for all operations except insert-one. It is used to find the document(s) for the operation. For example in a delete-one case, the filter map should have the fields required to locate the document to delete. +A bloblang map representing a filter for a MongoDB command, expressed as [extended JSON in canonical form](https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/). The filter map is required for all operations except insert-one. It is used to find the document(s) for the operation. For example in a delete-one case, the filter map should have the fields required to locate the document to delete. Type: `string` @@ -199,7 +199,7 @@ filter_map: |- ### `hint_map` -A bloblang map representing the hint for the mongo db command. This map is optional and is used with all operations except insert-one. It is used to improve performance of finding the documents in the mongodb. +A bloblang map representing the hint for the MongoDB command, expressed as [extended JSON in canonical form](https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/). This map is optional and is used with all operations except insert-one. It is used to improve performance of finding the documents in the mongodb. Type: `string` diff --git a/website/docs/components/tracers/open_telemetry_collector.md b/website/docs/components/tracers/open_telemetry_collector.md index 2590f6d4a3..2b2e849622 100644 --- a/website/docs/components/tracers/open_telemetry_collector.md +++ b/website/docs/components/tracers/open_telemetry_collector.md @@ -65,13 +65,18 @@ A list of http collectors. Type: `array` -### `http[].url` +### `http[].address` -The URL of a collector to send tracing events to. +The endpoint of a collector to send tracing events to. 
Type: `string` -Default: `"localhost:4318"` + +```yml +# Examples + +address: localhost:4318 +``` ### `http[].secure` @@ -88,13 +93,18 @@ A list of grpc collectors. Type: `array` -### `grpc[].url` +### `grpc[].address` -The URL of a collector to send tracing events to. +The endpoint of a collector to send tracing events to. Type: `string` -Default: `"localhost:4317"` + +```yml +# Examples + +address: localhost:4317 +``` ### `grpc[].secure`