Skip to content

Commit

Permalink
feat: sync fork with v4.26.0 (#7)
Browse files Browse the repository at this point in the history
* Update README

* fix: always bind a given stream when SubscribeSync()

Signed-off-by: Anton Frank <[email protected]>

* Update docs

* Ensure we set the stream for jetstream

* Use newer scram package

* Fix batch timer panic

* Accept 'null' as 'false' in if statements (until v5)

* Add CRDB changefeed input

* Rework crdb changefeed input

* Update CHANGELOG

* Punt unused field

Also clean up .golangci.yml a bit.

Signed-off-by: Mihai Todor <[email protected]>

* Fix jsonschema root path resolution

* Fix metrics mapping config parsing

* Bump golangci/golangci-lint-action from 3 to 4

Bumps [golangci/golangci-lint-action](https://github.com/golangci/golangci-lint-action) from 3 to 4.
- [Release notes](https://github.com/golangci/golangci-lint-action/releases)
- [Commits](golangci/golangci-lint-action@v3...v4)

---
updated-dependencies:
- dependency-name: golangci/golangci-lint-action
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <[email protected]>

* Bump the production-dependencies group with 24 updates

Bumps the production-dependencies group with 24 updates:

| Package | From | To |
| --- | --- | --- |
| [cloud.google.com/go/bigquery](https://github.com/googleapis/google-cloud-go) | `1.58.0` | `1.59.0` |
| [cloud.google.com/go/pubsub](https://github.com/googleapis/google-cloud-go) | `1.34.0` | `1.36.1` |
| [cloud.google.com/go/storage](https://github.com/googleapis/google-cloud-go) | `1.36.0` | `1.37.0` |
| [github.com/Azure/azure-sdk-for-go/sdk/azcore](https://github.com/Azure/azure-sdk-for-go) | `1.9.1` | `1.9.2` |
| [github.com/ClickHouse/clickhouse-go/v2](https://github.com/ClickHouse/clickhouse-go) | `2.17.1` | `2.18.0` |
| [github.com/IBM/sarama](https://github.com/IBM/sarama) | `1.42.1` | `1.42.2` |
| [github.com/apache/pulsar-client-go](https://github.com/apache/pulsar-client-go) | `0.11.1` | `0.12.0` |
| [github.com/aws/aws-lambda-go](https://github.com/aws/aws-lambda-go) | `1.45.0` | `1.46.0` |
| [github.com/aws/aws-sdk-go](https://github.com/aws/aws-sdk-go) | `1.50.0` | `1.50.15` |
| [github.com/getsentry/sentry-go](https://github.com/getsentry/sentry-go) | `0.26.0` | `0.27.0` |
| [github.com/go-faker/faker/v4](https://github.com/go-faker/faker) | `4.2.0` | `4.3.0` |
| [github.com/jhump/protoreflect](https://github.com/jhump/protoreflect) | `1.15.4` | `1.15.6` |
| [github.com/klauspost/compress](https://github.com/klauspost/compress) | `1.17.4` | `1.17.6` |
| [github.com/sijms/go-ora/v2](https://github.com/sijms/go-ora) | `2.8.6` | `2.8.7` |
| [github.com/twmb/franz-go](https://github.com/twmb/franz-go) | `1.16.0` | `1.16.1` |
| [go.opentelemetry.io/otel](https://github.com/open-telemetry/opentelemetry-go) | `1.22.0` | `1.23.1` |
| [go.opentelemetry.io/otel/exporters/otlp/otlptrace](https://github.com/open-telemetry/opentelemetry-go) | `1.22.0` | `1.23.1` |
| [go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc](https://github.com/open-telemetry/opentelemetry-go) | `1.22.0` | `1.23.1` |
| [go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp](https://github.com/open-telemetry/opentelemetry-go) | `1.22.0` | `1.23.1` |
| [go.opentelemetry.io/otel/sdk](https://github.com/open-telemetry/opentelemetry-go) | `1.22.0` | `1.23.1` |
| [go.opentelemetry.io/otel/trace](https://github.com/open-telemetry/opentelemetry-go) | `1.22.0` | `1.23.1` |
| [golang.org/x/net](https://github.com/golang/net) | `0.20.0` | `0.21.0` |
| [golang.org/x/oauth2](https://github.com/golang/oauth2) | `0.16.0` | `0.17.0` |
| [google.golang.org/api](https://github.com/googleapis/google-api-go-client) | `0.157.0` | `0.162.0` |


Updates `cloud.google.com/go/bigquery` from 1.58.0 to 1.59.0
- [Release notes](https://github.com/googleapis/google-cloud-go/releases)
- [Changelog](https://github.com/googleapis/google-cloud-go/blob/main/CHANGES.md)
- [Commits](googleapis/google-cloud-go@bigquery/v1.58.0...bigquery/v1.59.0)

Updates `cloud.google.com/go/pubsub` from 1.34.0 to 1.36.1
- [Release notes](https://github.com/googleapis/google-cloud-go/releases)
- [Changelog](https://github.com/googleapis/google-cloud-go/blob/main/CHANGES.md)
- [Commits](googleapis/google-cloud-go@pubsub/v1.34.0...pubsub/v1.36.1)

Updates `cloud.google.com/go/storage` from 1.36.0 to 1.37.0
- [Release notes](https://github.com/googleapis/google-cloud-go/releases)
- [Changelog](https://github.com/googleapis/google-cloud-go/blob/main/CHANGES.md)
- [Commits](googleapis/google-cloud-go@pubsub/v1.36.0...spanner/v1.37.0)

Updates `github.com/Azure/azure-sdk-for-go/sdk/azcore` from 1.9.1 to 1.9.2
- [Release notes](https://github.com/Azure/azure-sdk-for-go/releases)
- [Changelog](https://github.com/Azure/azure-sdk-for-go/blob/main/documentation/release.md)
- [Commits](Azure/azure-sdk-for-go@sdk/azcore/v1.9.1...sdk/azcore/v1.9.2)

Updates `github.com/ClickHouse/clickhouse-go/v2` from 2.17.1 to 2.18.0
- [Release notes](https://github.com/ClickHouse/clickhouse-go/releases)
- [Changelog](https://github.com/ClickHouse/clickhouse-go/blob/main/CHANGELOG.md)
- [Commits](ClickHouse/clickhouse-go@v2.17.1...v2.18.0)

Updates `github.com/IBM/sarama` from 1.42.1 to 1.42.2
- [Release notes](https://github.com/IBM/sarama/releases)
- [Changelog](https://github.com/IBM/sarama/blob/main/CHANGELOG.md)
- [Commits](IBM/sarama@v1.42.1...v1.42.2)

Updates `github.com/apache/pulsar-client-go` from 0.11.1 to 0.12.0
- [Release notes](https://github.com/apache/pulsar-client-go/releases)
- [Changelog](https://github.com/apache/pulsar-client-go/blob/master/CHANGELOG.md)
- [Commits](apache/pulsar-client-go@v0.11.1...v0.12.0)

Updates `github.com/aws/aws-lambda-go` from 1.45.0 to 1.46.0
- [Release notes](https://github.com/aws/aws-lambda-go/releases)
- [Commits](aws/aws-lambda-go@v1.45.0...v1.46.0)

Updates `github.com/aws/aws-sdk-go` from 1.50.0 to 1.50.15
- [Release notes](https://github.com/aws/aws-sdk-go/releases)
- [Commits](aws/aws-sdk-go@v1.50.0...v1.50.15)

Updates `github.com/getsentry/sentry-go` from 0.26.0 to 0.27.0
- [Release notes](https://github.com/getsentry/sentry-go/releases)
- [Changelog](https://github.com/getsentry/sentry-go/blob/master/CHANGELOG.md)
- [Commits](getsentry/sentry-go@v0.26.0...v0.27.0)

Updates `github.com/go-faker/faker/v4` from 4.2.0 to 4.3.0
- [Release notes](https://github.com/go-faker/faker/releases)
- [Commits](go-faker/faker@v4.2.0...v4.3.0)

Updates `github.com/jhump/protoreflect` from 1.15.4 to 1.15.6
- [Release notes](https://github.com/jhump/protoreflect/releases)
- [Commits](jhump/protoreflect@v1.15.4...v1.15.6)

Updates `github.com/klauspost/compress` from 1.17.4 to 1.17.6
- [Release notes](https://github.com/klauspost/compress/releases)
- [Changelog](https://github.com/klauspost/compress/blob/master/.goreleaser.yml)
- [Commits](klauspost/compress@v1.17.4...v1.17.6)

Updates `github.com/sijms/go-ora/v2` from 2.8.6 to 2.8.7
- [Release notes](https://github.com/sijms/go-ora/releases)
- [Commits](sijms/go-ora@v2.8.6...v2.8.7)

Updates `github.com/twmb/franz-go` from 1.16.0 to 1.16.1
- [Changelog](https://github.com/twmb/franz-go/blob/master/CHANGELOG.md)
- [Commits](twmb/franz-go@v1.16.0...v1.16.1)

Updates `go.opentelemetry.io/otel` from 1.22.0 to 1.23.1
- [Release notes](https://github.com/open-telemetry/opentelemetry-go/releases)
- [Changelog](https://github.com/open-telemetry/opentelemetry-go/blob/main/CHANGELOG.md)
- [Commits](open-telemetry/opentelemetry-go@v1.22.0...v1.23.1)

Updates `go.opentelemetry.io/otel/exporters/otlp/otlptrace` from 1.22.0 to 1.23.1
- [Release notes](https://github.com/open-telemetry/opentelemetry-go/releases)
- [Changelog](https://github.com/open-telemetry/opentelemetry-go/blob/main/CHANGELOG.md)
- [Commits](open-telemetry/opentelemetry-go@v1.22.0...v1.23.1)

Updates `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc` from 1.22.0 to 1.23.1
- [Release notes](https://github.com/open-telemetry/opentelemetry-go/releases)
- [Changelog](https://github.com/open-telemetry/opentelemetry-go/blob/main/CHANGELOG.md)
- [Commits](open-telemetry/opentelemetry-go@v1.22.0...v1.23.1)

Updates `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp` from 1.22.0 to 1.23.1
- [Release notes](https://github.com/open-telemetry/opentelemetry-go/releases)
- [Changelog](https://github.com/open-telemetry/opentelemetry-go/blob/main/CHANGELOG.md)
- [Commits](open-telemetry/opentelemetry-go@v1.22.0...v1.23.1)

Updates `go.opentelemetry.io/otel/sdk` from 1.22.0 to 1.23.1
- [Release notes](https://github.com/open-telemetry/opentelemetry-go/releases)
- [Changelog](https://github.com/open-telemetry/opentelemetry-go/blob/main/CHANGELOG.md)
- [Commits](open-telemetry/opentelemetry-go@v1.22.0...v1.23.1)

Updates `go.opentelemetry.io/otel/trace` from 1.22.0 to 1.23.1
- [Release notes](https://github.com/open-telemetry/opentelemetry-go/releases)
- [Changelog](https://github.com/open-telemetry/opentelemetry-go/blob/main/CHANGELOG.md)
- [Commits](open-telemetry/opentelemetry-go@v1.22.0...v1.23.1)

Updates `golang.org/x/net` from 0.20.0 to 0.21.0
- [Commits](golang/net@v0.20.0...v0.21.0)

Updates `golang.org/x/oauth2` from 0.16.0 to 0.17.0
- [Commits](golang/oauth2@v0.16.0...v0.17.0)

Updates `google.golang.org/api` from 0.157.0 to 0.162.0
- [Release notes](https://github.com/googleapis/google-api-go-client/releases)
- [Changelog](https://github.com/googleapis/google-api-go-client/blob/main/CHANGES.md)
- [Commits](googleapis/google-api-go-client@v0.157.0...v0.162.0)

---
updated-dependencies:
- dependency-name: cloud.google.com/go/bigquery
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: production-dependencies
- dependency-name: cloud.google.com/go/pubsub
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: production-dependencies
- dependency-name: cloud.google.com/go/storage
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: production-dependencies
- dependency-name: github.com/Azure/azure-sdk-for-go/sdk/azcore
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: production-dependencies
- dependency-name: github.com/ClickHouse/clickhouse-go/v2
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: production-dependencies
- dependency-name: github.com/IBM/sarama
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: production-dependencies
- dependency-name: github.com/apache/pulsar-client-go
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: production-dependencies
- dependency-name: github.com/aws/aws-lambda-go
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: production-dependencies
- dependency-name: github.com/aws/aws-sdk-go
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: production-dependencies
- dependency-name: github.com/getsentry/sentry-go
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: production-dependencies
- dependency-name: github.com/go-faker/faker/v4
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: production-dependencies
- dependency-name: github.com/jhump/protoreflect
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: production-dependencies
- dependency-name: github.com/klauspost/compress
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: production-dependencies
- dependency-name: github.com/sijms/go-ora/v2
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: production-dependencies
- dependency-name: github.com/twmb/franz-go
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: production-dependencies
- dependency-name: go.opentelemetry.io/otel
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: production-dependencies
- dependency-name: go.opentelemetry.io/otel/exporters/otlp/otlptrace
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: production-dependencies
- dependency-name: go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: production-dependencies
- dependency-name: go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: production-dependencies
- dependency-name: go.opentelemetry.io/otel/sdk
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: production-dependencies
- dependency-name: go.opentelemetry.io/otel/trace
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: production-dependencies
- dependency-name: golang.org/x/net
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: production-dependencies
- dependency-name: golang.org/x/oauth2
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: production-dependencies
- dependency-name: google.golang.org/api
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: production-dependencies
...

Signed-off-by: dependabot[bot] <[email protected]>

* Mark version in CHANGELOG for RCs

* Update release notes script with release candidates section

* Migrate all aws components to v2 apis

* Add support for otlp sampling (redpanda-data#2364)

* add otlp sampling option

Signed-off-by: j0shthomas <[email protected]>

* slight docs change

Signed-off-by: j0shthomas <[email protected]>

* gen docs

Signed-off-by: j0shthomas <[email protected]>

* gofmt

Signed-off-by: j0shthomas <[email protected]>

* slight formatting

Signed-off-by: j0shthomas <[email protected]>

---------

Signed-off-by: j0shthomas <[email protected]>
Co-authored-by: joshuathomas97 <[email protected]>

* Tidy up some fields

* Add support for ARNs in kinesis components

* reduce kinesis output wait timer

* Favour faster error return for kinesis output connect

* Fix message structured set to null

* Add CosmosDB components and gocosmos SQL driver

This is a revamp of redpanda-data#1583, where I did the initial exploration to
add these components. Since then, [gocosmos](https://pkg.go.dev/github.com/microsoft/gocosmos)
became an official Microsoft SQL driver for CosmosDB and it is
currently marked as experimental, so I decided to include it as
well because it comes with some extra goodies, such as support for
[hierarchical partition keys](https://learn.microsoft.com/en-us/azure/cosmos-db/hierarchical-partition-keys)
and [cross-partition queries](https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/how-to-query-container#cross-partition-query),
which are not currently supported by the [`azcosmos`](https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos)
driver.

Signed-off-by: Mihai Todor <[email protected]>

* Update some of the docs fields

* Fix output resources blocking unit tests

* WIP: Add opensearch output

* Finish opensearch output, update deps

* Reduce bloblang parser allocations

* Simplify delimited parser

* Update CHANGELOG

* Add explicit slug to add components

* Fix template config parsing

* Fix non-generalised docs defaults

* Migrate parsers to use generics

* Fix diff and patch docs

* Add credit property to amqp1 input (redpanda-data#2357)

* Add support for max_in_flight messages (aka credit) for amqp_1 input

Signed-off-by: Andreas Habel <[email protected]>

* Update CHANGELOG

Signed-off-by: Andreas Habel <[email protected]>

* Changed name of description var to fix linting

Signed-off-by: Andreas Habel <[email protected]>

* Updated CHANGELOG and default value of credit

Signed-off-by: Andreas Habel <[email protected]>

* Updated docs

Signed-off-by: Andreas Habel <[email protected]>

---------

Signed-off-by: Andreas Habel <[email protected]>
Signed-off-by: Andreas Habel <[email protected]>
Co-authored-by: Andreas Habel <[email protected]>

* Prevent redundant bootstrap reattempts with studio sync

* Fix file input race condition on errored close

* Tidy up cockroachdb changefeed shutdown

* Pre-emptively recover sqlx close panics

* Add some examples to http_server input

* Add root level if statements

* Add sql cache

* MongoDB input - add batching

Signed-off-by: Brad Anderson <[email protected]>

* Tidy up docs and fields

* Fix double fire on @hourly generate intervals

* Fix fmt interpretted imports of configs

* Support extended JSON in mongodb mappings

* Rename url field on otel tracer targets to address

* Add idempotent_write field to kafka output

* Refactor and tidy up nats connections

* Tidy up all AWS integration tests

* Update CHANGELOG

---------

Signed-off-by: Anton Frank <[email protected]>
Signed-off-by: Mihai Todor <[email protected]>
Signed-off-by: dependabot[bot] <[email protected]>
Signed-off-by: j0shthomas <[email protected]>
Signed-off-by: Andreas Habel <[email protected]>
Signed-off-by: Andreas Habel <[email protected]>
Signed-off-by: Brad Anderson <[email protected]>
Co-authored-by: Ashley Jeffs <[email protected]>
Co-authored-by: Anton Frank <[email protected]>
Co-authored-by: Dan Goodman <[email protected]>
Co-authored-by: Mihai Todor <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Joshua Thomas <[email protected]>
Co-authored-by: joshuathomas97 <[email protected]>
Co-authored-by: Arunachalam Lakshmanan <[email protected]>
Co-authored-by: andreas-habel <[email protected]>
Co-authored-by: Andreas Habel <[email protected]>
Co-authored-by: Brad Anderson <[email protected]>
  • Loading branch information
12 people authored Apr 18, 2024
1 parent 1b60014 commit b5425ce
Show file tree
Hide file tree
Showing 40 changed files with 1,034 additions and 837 deletions.
29 changes: 28 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,34 @@ Changelog

All notable changes to this project will be documented in this file.

## Unreleased
## 4.26.0 - 2024-03-18

### Added

- Field `credit` added to the `amqp_1` input to specify the maximum number of unacknowledged messages the sender can transmit.
- Bloblang now supports root-level `if` statements.
- New experimental `sql` cache.
- Fields `batch_size`, `sort` and `limit` added to the `mongodb` input.
- Field `idemponent_write` added to the `kafka` output.

### Changed

- The default value of the `amqp_1.credit` input has changed from `1` to `64`.
- The `mongodb` processor and output now support extended JSON in canonical form for document, filter and hint mappings.
- The `open_telemetry_collector` tracer has had the `url` field of gRPC and HTTP collectors deprecated in favour of `address`, which more accurately describes the intended format of endpoints. The old style will continue to work, but eventually will have its default value removed and an explicit value will be required.

### Fixed

- Resource config imports containing `%` characters were being incorrectly parsed during unit test execution. This was a regression introduced in v4.25.0.
- Dynamic input and output config updates containing `%` characters were being incorrectly parsed. This was a regression introduced in v4.25.0.

## 4.25.1 - 2024-03-01

### Fixed

- Fixed a regression in v4.25.0 where [template based components](https://www.benthos.dev/docs/configuration/templating) were not parsing correctly from configs.

## 4.25.0 - 2024-03-01

### Added

Expand Down
8 changes: 6 additions & 2 deletions internal/cli/test/processors_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/benthosdev/benthos/v4/internal/bloblang/parser"
"github.com/benthosdev/benthos/v4/internal/bundle"
"github.com/benthosdev/benthos/v4/internal/component/processor"
"github.com/benthosdev/benthos/v4/internal/component/testutil"
"github.com/benthosdev/benthos/v4/internal/config"
"github.com/benthosdev/benthos/v4/internal/docs"
"github.com/benthosdev/benthos/v4/internal/filepath/ifs"
Expand Down Expand Up @@ -345,7 +344,12 @@ func (p *ProcessorsProvider) getConfs(jsonPtr string, environment map[string]str
return confs, fmt.Errorf("failed to parse resources config file '%v': %v", path, err)
}

extraMgrWrapper, err := testutil.ManagerFromYAML(string(resourceBytes))
confNode, err := docs.UnmarshalYAML(resourceBytes)
if err != nil {
return confs, fmt.Errorf("failed to parse resources config file '%v': %v", path, err)
}

extraMgrWrapper, err := manager.FromAny(bundle.GlobalEnvironment, confNode)
if err != nil {
return confs, fmt.Errorf("failed to parse resources config file '%v': %v", path, err)
}
Expand Down
101 changes: 0 additions & 101 deletions internal/impl/aws/cache_s3_integration_test.go

This file was deleted.

44 changes: 7 additions & 37 deletions internal/impl/aws/integration_kinesis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/kinesis"
"github.com/ory/dockertest/v3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/benthosdev/benthos/v4/internal/integration"
Expand Down Expand Up @@ -71,35 +69,7 @@ func createKinesisShards(ctx context.Context, t testing.TB, awsPort, id string,
return shards, nil
}

func TestIntegrationAWSKinesis(t *testing.T) {
integration.CheckSkip(t)
t.Parallel()

pool, err := dockertest.NewPool("")
require.NoError(t, err)

pool.MaxWait = time.Minute * 2
if dline, ok := t.Deadline(); ok && time.Until(dline) < pool.MaxWait {
pool.MaxWait = time.Until(dline)
}

resource, err := pool.RunWithOptions(&dockertest.RunOptions{
Repository: "localstack/localstack",
ExposedPorts: []string{"4566/tcp"},
Env: []string{"SERVICES=dynamodb,kinesis"},
})
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, pool.Purge(resource))
})

_ = resource.Expire(900)

require.NoError(t, pool.Retry(func() error {
_, err := createKinesisShards(context.Background(), t, resource.GetPort("4566/tcp"), "testtable", 2)
return err
}))

func kinesisIntegrationSuite(t *testing.T, lsPort string) {
template := `
output:
aws_kinesis:
Expand Down Expand Up @@ -146,7 +116,7 @@ input:
t, template,
integration.StreamTestOptPreTest(func(t testing.TB, ctx context.Context, testID string, vars *integration.StreamTestConfigVars) {
streamName := "stream-" + testID
shards, err := createKinesisShards(ctx, t, resource.GetPort("4566/tcp"), testID, 2)
shards, err := createKinesisShards(ctx, t, lsPort, testID, 2)
require.NoError(t, err)

for i, shard := range shards {
Expand All @@ -157,7 +127,7 @@ input:
}
}
}),
integration.StreamTestOptPort(resource.GetPort("4566/tcp")),
integration.StreamTestOptPort(lsPort),
integration.StreamTestOptAllowDupes(),
integration.StreamTestOptVarTwo("10"),
)
Expand All @@ -167,10 +137,10 @@ input:
suite.Run(
t, template,
integration.StreamTestOptPreTest(func(t testing.TB, ctx context.Context, testID string, vars *integration.StreamTestConfigVars) {
_, err := createKinesisShards(ctx, t, resource.GetPort("4566/tcp"), testID, 2)
_, err := createKinesisShards(ctx, t, lsPort, testID, 2)
require.NoError(t, err)
}),
integration.StreamTestOptPort(resource.GetPort("4566/tcp")),
integration.StreamTestOptPort(lsPort),
integration.StreamTestOptAllowDupes(),
integration.StreamTestOptVarTwo("10"),
)
Expand All @@ -182,11 +152,11 @@ input:
).Run(
t, template,
integration.StreamTestOptPreTest(func(t testing.TB, ctx context.Context, testID string, vars *integration.StreamTestConfigVars) {
shards, err := createKinesisShards(ctx, t, resource.GetPort("4566/tcp"), testID, 1)
shards, err := createKinesisShards(ctx, t, lsPort, testID, 1)
require.NoError(t, err)
vars.Var1 = ":" + shards[0]
}),
integration.StreamTestOptPort(resource.GetPort("4566/tcp")),
integration.StreamTestOptPort(lsPort),
integration.StreamTestOptAllowDupes(),
integration.StreamTestOptVarTwo("10"),
)
Expand Down
Loading

0 comments on commit b5425ce

Please sign in to comment.