diff --git a/go.mod b/go.mod index 5c96367b1..05ffb76d1 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ toolchain go1.22.9 require ( cloud.google.com/go/pubsub v1.45.3 github.com/BurntSushi/toml v1.4.0 - github.com/IBM/sarama v1.43.3 + github.com/IBM/sarama v1.44.0 github.com/NYTimes/gziphandler v1.1.1 github.com/OneOfOne/go-utils v0.0.0-20180319162427-6019ff89a94e github.com/dgryski/go-expirecache v0.0.0-20170314133854-743ef98b2adb @@ -71,7 +71,7 @@ require ( github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/onsi/ginkgo v1.14.0 // indirect github.com/onsi/gomega v1.10.1 // indirect - github.com/pierrec/lz4/v4 v4.1.21 // indirect + github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.55.0 // indirect diff --git a/go.sum b/go.sum index 780132e96..fbe2a8ed6 100644 --- a/go.sum +++ b/go.sum @@ -19,8 +19,8 @@ cloud.google.com/go/pubsub v1.45.3/go.mod h1:cGyloK/hXC4at7smAtxFnXprKEFTqmMXNNd github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v1.4.0 h1:kuoIxZQy2WRRk1pttg9asf+WVv6tWQuBNVmK8+nqPr0= github.com/BurntSushi/toml v1.4.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= -github.com/IBM/sarama v1.43.3 h1:Yj6L2IaNvb2mRBop39N7mmJAHBVY3dTPncr3qGVkxPA= -github.com/IBM/sarama v1.43.3/go.mod h1:FVIRaLrhK3Cla/9FfRF5X9Zua2KpS3SYIXxhac1H+FQ= +github.com/IBM/sarama v1.44.0 h1:puNKqcScjSAgVLramjsuovZrS0nJZFVsrvuUymkWqhE= +github.com/IBM/sarama v1.44.0/go.mod h1:MxQ9SvGfvKIorbk077Ff6DUnBlGpidiQOtU2vuBaxVw= github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cqUQ3I= github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c= github.com/OneOfOne/go-utils v0.0.0-20180319162427-6019ff89a94e h1:Kzs/MKSycSiJUW63f+BddSnX+3C5r+7JbHBV0b2wp50= @@ -199,8 +199,8 @@ github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7J github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= -github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= -github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= +github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= diff --git a/vendor/github.com/IBM/sarama/.golangci.yml b/vendor/github.com/IBM/sarama/.golangci.yml index 72e3e4c24..2e029401d 100644 --- a/vendor/github.com/IBM/sarama/.golangci.yml +++ b/vendor/github.com/IBM/sarama/.golangci.yml @@ -1,4 +1,5 @@ run: + go: "1.20" timeout: 5m deadline: 10m @@ -57,7 +58,7 @@ linters: enable: - bodyclose - depguard - - exportloopref + # - copyloopvar - dogsled - errcheck - errorlint @@ -79,6 +80,7 @@ linters: issues: exclude: + - "G115: integer overflow conversion" - "G404: Use of weak random number generator" exclude-rules: # exclude some linters from running on certains files. diff --git a/vendor/github.com/IBM/sarama/.pre-commit-config.yaml b/vendor/github.com/IBM/sarama/.pre-commit-config.yaml index 1869b8160..1e64cc0d8 100644 --- a/vendor/github.com/IBM/sarama/.pre-commit-config.yaml +++ b/vendor/github.com/IBM/sarama/.pre-commit-config.yaml @@ -2,7 +2,7 @@ fail_fast: false default_install_hook_types: [pre-commit, commit-msg] repos: - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v4.4.0 + rev: v5.0.0 hooks: - id: check-merge-conflict - id: check-yaml @@ -32,10 +32,10 @@ repos: files: \.go$ args: [] - repo: https://github.com/gitleaks/gitleaks - rev: v8.16.3 + rev: v8.21.2 hooks: - id: gitleaks - repo: https://github.com/golangci/golangci-lint - rev: v1.52.2 + rev: v1.61.0 hooks: - id: golangci-lint diff --git a/vendor/github.com/IBM/sarama/Dockerfile.kafka b/vendor/github.com/IBM/sarama/Dockerfile.kafka index 40f5f333b..b4d5c6acb 100644 --- a/vendor/github.com/IBM/sarama/Dockerfile.kafka +++ b/vendor/github.com/IBM/sarama/Dockerfile.kafka @@ -1,4 +1,4 @@ -FROM registry.access.redhat.com/ubi8/ubi-minimal:8.10@sha256:de2a0a20c1c3b39c3de829196de9694d09f97cd18fda1004de855ed2b4c841ba +FROM registry.access.redhat.com/ubi8/ubi-minimal:8.10@sha256:cf095e5668919ba1b4ace3888107684ad9d587b1830d3eb56973e6a54f456e67 USER root @@ -17,9 +17,9 @@ RUN cd /etc/java/java-11-openjdk/*/conf/security \ && echo 'networkaddress.cache.negative.ttl=0' >> java.security ARG SCALA_VERSION="2.13" -ARG KAFKA_VERSION="3.6.0" +ARG KAFKA_VERSION="3.6.2" -# https://github.com/apache/kafka/blob/9989b68d0d38c8f1357f78bf9d53a58c1476188d/tests/docker/Dockerfile#L46-L72 +# https://github.com/apache/kafka/blob/2e2b0a58eda3e677763af974a44a6aaa3c280214/tests/docker/Dockerfile#L77-L105 ARG KAFKA_MIRROR="https://s3-us-west-2.amazonaws.com/kafka-packages" SHELL ["/bin/bash", "-o", "pipefail", "-c"] RUN mkdir -p "/opt/kafka-${KAFKA_VERSION}" \ diff --git a/vendor/github.com/IBM/sarama/admin.go b/vendor/github.com/IBM/sarama/admin.go index dcf1d7659..6549c7e6f 100644 --- a/vendor/github.com/IBM/sarama/admin.go +++ b/vendor/github.com/IBM/sarama/admin.go @@ -99,6 +99,9 @@ type ClusterAdmin interface { // This operation is supported by brokers with version 0.11.0.0 or higher. DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error) + // ElectLeaders allows to trigger the election of preferred leaders for a set of partitions. + ElectLeaders(ElectionType, map[string][]int32) (map[string]map[int32]*PartitionResult, error) + // List the consumer groups available in the cluster. ListConsumerGroups() (map[string]string, error) @@ -907,6 +910,39 @@ func (ca *clusterAdmin) DeleteACL(filter AclFilter, validateOnly bool) ([]Matchi return mAcls, nil } +func (ca *clusterAdmin) ElectLeaders(electionType ElectionType, partitions map[string][]int32) (map[string]map[int32]*PartitionResult, error) { + request := &ElectLeadersRequest{ + Type: electionType, + TopicPartitions: partitions, + TimeoutMs: int32(60000), + } + + if ca.conf.Version.IsAtLeast(V2_4_0_0) { + request.Version = 2 + } else if ca.conf.Version.IsAtLeast(V0_11_0_0) { + request.Version = 1 + } + + var res *ElectLeadersResponse + err := ca.retryOnError(isErrNotController, func() error { + b, err := ca.Controller() + if err != nil { + return err + } + _ = b.Open(ca.client.Config()) + + res, err = b.ElectLeaders(request) + if isErrNotController(err) { + _, _ = ca.refreshController() + } + return err + }) + if err != nil { + return nil, err + } + return res.ReplicaElectionResults, nil +} + func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*GroupDescription, err error) { groupsPerBroker := make(map[*Broker][]string) diff --git a/vendor/github.com/IBM/sarama/async_producer.go b/vendor/github.com/IBM/sarama/async_producer.go index a6fa3d4a2..5f257524b 100644 --- a/vendor/github.com/IBM/sarama/async_producer.go +++ b/vendor/github.com/IBM/sarama/async_producer.go @@ -13,6 +13,13 @@ import ( "github.com/rcrowley/go-metrics" ) +// ErrProducerRetryBufferOverflow is returned when the bridging retry buffer is full and OOM prevention needs to be applied. +var ErrProducerRetryBufferOverflow = errors.New("retry buffer full: message discarded to prevent buffer overflow") + +// minFunctionalRetryBufferLength is the lower limit of Producer.Retry.MaxBufferLength for it to function. +// Any non-zero maxBufferLength but less than this lower limit is pushed to the lower limit. +const minFunctionalRetryBufferLength = 4 * 1024 + // AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages // to the correct broker for the provided topic-partition, refreshing metadata as appropriate, // and parses responses for errors. You must read from the Errors() channel or the @@ -1207,6 +1214,11 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) { // effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock // based on https://godoc.org/github.com/eapache/channels#InfiniteChannel func (p *asyncProducer) retryHandler() { + maxBufferSize := p.conf.Producer.Retry.MaxBufferLength + if 0 < maxBufferSize && maxBufferSize < minFunctionalRetryBufferLength { + maxBufferSize = minFunctionalRetryBufferLength + } + var msg *ProducerMessage buf := queue.New() @@ -1227,6 +1239,19 @@ func (p *asyncProducer) retryHandler() { } buf.Add(msg) + + if maxBufferSize > 0 && buf.Length() >= maxBufferSize { + msgToHandle := buf.Peek().(*ProducerMessage) + if msgToHandle.flags == 0 { + select { + case p.input <- msgToHandle: + buf.Remove() + default: + buf.Remove() + p.returnError(msgToHandle, ErrProducerRetryBufferOverflow) + } + } + } } } diff --git a/vendor/github.com/IBM/sarama/broker.go b/vendor/github.com/IBM/sarama/broker.go index d0d5b87b8..c4f1005f5 100644 --- a/vendor/github.com/IBM/sarama/broker.go +++ b/vendor/github.com/IBM/sarama/broker.go @@ -243,9 +243,9 @@ func (b *Broker) Open(conf *Config) error { if b.connErr != nil { err = b.conn.Close() if err == nil { - DebugLogger.Printf("Closed connection to broker %s\n", b.addr) + DebugLogger.Printf("Closed connection to broker %s due to SASL v0 auth error: %s\n", b.addr, b.connErr) } else { - Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err) + Logger.Printf("Error while closing connection to broker %s (due to SASL v0 auth error: %s): %s\n", b.addr, b.connErr, err) } b.conn = nil atomic.StoreInt32(&b.opened, 0) @@ -264,9 +264,9 @@ func (b *Broker) Open(conf *Config) error { <-b.done err = b.conn.Close() if err == nil { - DebugLogger.Printf("Closed connection to broker %s\n", b.addr) + DebugLogger.Printf("Closed connection to broker %s due to SASL v1 auth error: %s\n", b.addr, b.connErr) } else { - Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err) + Logger.Printf("Error while closing connection to broker %s (due to SASL v1 auth error: %s): %s\n", b.addr, b.connErr, err) } b.conn = nil atomic.StoreInt32(&b.opened, 0) @@ -689,6 +689,18 @@ func (b *Broker) ListPartitionReassignments(request *ListPartitionReassignmentsR return response, nil } +// ElectLeaders sends aa elect leaders request and returns list partitions elect result +func (b *Broker) ElectLeaders(request *ElectLeadersRequest) (*ElectLeadersResponse, error) { + response := new(ElectLeadersResponse) + + err := b.sendAndReceive(request, response) + if err != nil { + return nil, err + } + + return response, nil +} + // DeleteRecords send a request to delete records and return delete record // response or error func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) { @@ -1242,12 +1254,12 @@ func (b *Broker) authenticateViaSASLv1() error { handshakeErr := b.sendInternal(handshakeRequest, prom) if handshakeErr != nil { - Logger.Printf("Error while performing SASL handshake %s\n", b.addr) + Logger.Printf("Error while performing SASL handshake %s: %s\n", b.addr, handshakeErr) return handshakeErr } handshakeErr = handleResponsePromise(handshakeRequest, handshakeResponse, prom, metricRegistry) if handshakeErr != nil { - Logger.Printf("Error while performing SASL handshake %s\n", b.addr) + Logger.Printf("Error while handling SASL handshake response %s: %s\n", b.addr, handshakeErr) return handshakeErr } @@ -1267,7 +1279,7 @@ func (b *Broker) authenticateViaSASLv1() error { } authErr = handleResponsePromise(authenticateRequest, authenticateResponse, prom, metricRegistry) if authErr != nil { - Logger.Printf("Error while performing SASL Auth %s\n", b.addr) + Logger.Printf("Error while performing SASL Auth %s: %s\n", b.addr, authErr) return nil, authErr } @@ -1385,7 +1397,7 @@ func (b *Broker) sendAndReceiveSASLPlainAuthV0() error { if b.conf.Net.SASL.Handshake { handshakeErr := b.sendAndReceiveSASLHandshake(SASLTypePlaintext, b.conf.Net.SASL.Version) if handshakeErr != nil { - Logger.Printf("Error while performing SASL handshake %s\n", b.addr) + Logger.Printf("Error while performing SASL handshake %s: %s\n", b.addr, handshakeErr) return handshakeErr } } @@ -1426,9 +1438,6 @@ func (b *Broker) sendAndReceiveSASLPlainAuthV0() error { func (b *Broker) sendAndReceiveSASLPlainAuthV1(authSendReceiver func(authBytes []byte) (*SaslAuthenticateResponse, error)) error { authBytes := []byte(b.conf.Net.SASL.AuthIdentity + "\x00" + b.conf.Net.SASL.User + "\x00" + b.conf.Net.SASL.Password) _, err := authSendReceiver(authBytes) - if err != nil { - return err - } return err } diff --git a/vendor/github.com/IBM/sarama/client.go b/vendor/github.com/IBM/sarama/client.go index 2decba7c5..5c54b4461 100644 --- a/vendor/github.com/IBM/sarama/client.go +++ b/vendor/github.com/IBM/sarama/client.go @@ -363,34 +363,19 @@ func (client *client) MetadataTopics() ([]string, error) { } func (client *client) Partitions(topic string) ([]int32, error) { - if client.Closed() { - return nil, ErrClosedClient - } - - partitions := client.cachedPartitions(topic, allPartitions) - - if len(partitions) == 0 { - err := client.RefreshMetadata(topic) - if err != nil { - return nil, err - } - partitions = client.cachedPartitions(topic, allPartitions) - } - - // no partitions found after refresh metadata - if len(partitions) == 0 { - return nil, ErrUnknownTopicOrPartition - } - - return partitions, nil + return client.getPartitions(topic, allPartitions) } func (client *client) WritablePartitions(topic string) ([]int32, error) { + return client.getPartitions(topic, writablePartitions) +} + +func (client *client) getPartitions(topic string, pt partitionType) ([]int32, error) { if client.Closed() { return nil, ErrClosedClient } - partitions := client.cachedPartitions(topic, writablePartitions) + partitions := client.cachedPartitions(topic, pt) // len==0 catches when it's nil (no such topic) and the odd case when every single // partition is undergoing leader election simultaneously. Callers have to be able to handle @@ -403,7 +388,7 @@ func (client *client) WritablePartitions(topic string) ([]int32, error) { if err != nil { return nil, err } - partitions = client.cachedPartitions(topic, writablePartitions) + partitions = client.cachedPartitions(topic, pt) } if partitions == nil { @@ -414,56 +399,24 @@ func (client *client) WritablePartitions(topic string) ([]int32, error) { } func (client *client) Replicas(topic string, partitionID int32) ([]int32, error) { - if client.Closed() { - return nil, ErrClosedClient - } - - metadata := client.cachedMetadata(topic, partitionID) - - if metadata == nil { - err := client.RefreshMetadata(topic) - if err != nil { - return nil, err - } - metadata = client.cachedMetadata(topic, partitionID) - } - - if metadata == nil { - return nil, ErrUnknownTopicOrPartition - } - - if errors.Is(metadata.Err, ErrReplicaNotAvailable) { - return dupInt32Slice(metadata.Replicas), metadata.Err - } - return dupInt32Slice(metadata.Replicas), nil + return client.getReplicas(topic, partitionID, func(metadata *PartitionMetadata) []int32 { + return metadata.Replicas + }) } func (client *client) InSyncReplicas(topic string, partitionID int32) ([]int32, error) { - if client.Closed() { - return nil, ErrClosedClient - } - - metadata := client.cachedMetadata(topic, partitionID) - - if metadata == nil { - err := client.RefreshMetadata(topic) - if err != nil { - return nil, err - } - metadata = client.cachedMetadata(topic, partitionID) - } - - if metadata == nil { - return nil, ErrUnknownTopicOrPartition - } - - if errors.Is(metadata.Err, ErrReplicaNotAvailable) { - return dupInt32Slice(metadata.Isr), metadata.Err - } - return dupInt32Slice(metadata.Isr), nil + return client.getReplicas(topic, partitionID, func(metadata *PartitionMetadata) []int32 { + return metadata.Isr + }) } func (client *client) OfflineReplicas(topic string, partitionID int32) ([]int32, error) { + return client.getReplicas(topic, partitionID, func(metadata *PartitionMetadata) []int32 { + return metadata.OfflineReplicas + }) +} + +func (client *client) getReplicas(topic string, partitionID int32, extractor func(metadata *PartitionMetadata) []int32) ([]int32, error) { if client.Closed() { return nil, ErrClosedClient } @@ -482,10 +435,11 @@ func (client *client) OfflineReplicas(topic string, partitionID int32) ([]int32, return nil, ErrUnknownTopicOrPartition } + replicas := extractor(metadata) if errors.Is(metadata.Err, ErrReplicaNotAvailable) { - return dupInt32Slice(metadata.OfflineReplicas), metadata.Err + return dupInt32Slice(replicas), metadata.Err } - return dupInt32Slice(metadata.OfflineReplicas), nil + return dupInt32Slice(replicas), nil } func (client *client) Leader(topic string, partitionID int32) (*Broker, error) { diff --git a/vendor/github.com/IBM/sarama/config.go b/vendor/github.com/IBM/sarama/config.go index f2f197887..8c7c4c985 100644 --- a/vendor/github.com/IBM/sarama/config.go +++ b/vendor/github.com/IBM/sarama/config.go @@ -269,6 +269,13 @@ type Config struct { // more sophisticated backoff strategies. This takes precedence over // `Backoff` if set. BackoffFunc func(retries, maxRetries int) time.Duration + // The maximum length of the bridging buffer between `input` and `retries` channels + // in AsyncProducer#retryHandler. + // The limit is to prevent this buffer from overflowing or causing OOM. + // Defaults to 0 for unlimited. + // Any value between 0 and 4096 is pushed to 4096. + // A zero or negative value indicates unlimited. + MaxBufferLength int } // Interceptors to be called when the producer dispatcher reads the diff --git a/vendor/github.com/IBM/sarama/docker-compose.yml b/vendor/github.com/IBM/sarama/docker-compose.yml index 204768e32..a0e3d2e21 100644 --- a/vendor/github.com/IBM/sarama/docker-compose.yml +++ b/vendor/github.com/IBM/sarama/docker-compose.yml @@ -1,7 +1,7 @@ services: zookeeper-1: hostname: 'zookeeper-1' - image: 'docker.io/library/zookeeper:3.6.3' + image: 'docker.io/library/zookeeper:3.7.2' init: true restart: always environment: @@ -14,7 +14,7 @@ services: ZOO_4LW_COMMANDS_WHITELIST: 'mntr,conf,ruok' zookeeper-2: hostname: 'zookeeper-2' - image: 'docker.io/library/zookeeper:3.6.3' + image: 'docker.io/library/zookeeper:3.7.2' init: true restart: always environment: @@ -27,7 +27,7 @@ services: ZOO_4LW_COMMANDS_WHITELIST: 'mntr,conf,ruok' zookeeper-3: hostname: 'zookeeper-3' - image: 'docker.io/library/zookeeper:3.6.3' + image: 'docker.io/library/zookeeper:3.7.2' init: true restart: always environment: @@ -40,19 +40,19 @@ services: ZOO_4LW_COMMANDS_WHITELIST: 'mntr,conf,ruok' kafka-1: hostname: 'kafka-1' - image: 'sarama/fv-kafka-${KAFKA_VERSION:-3.6.0}' + image: 'sarama/fv-kafka-${KAFKA_VERSION:-3.6.2}' init: true build: context: . dockerfile: Dockerfile.kafka args: - KAFKA_VERSION: ${KAFKA_VERSION:-3.6.0} + KAFKA_VERSION: ${KAFKA_VERSION:-3.6.2} SCALA_VERSION: ${SCALA_VERSION:-2.13} healthcheck: test: [ 'CMD', - '/opt/kafka-${KAFKA_VERSION:-3.6.0}/bin/kafka-broker-api-versions.sh', + '/opt/kafka-${KAFKA_VERSION:-3.6.2}/bin/kafka-broker-api-versions.sh', '--bootstrap-server', 'kafka-1:9091', ] @@ -67,7 +67,7 @@ services: - toxiproxy restart: always environment: - KAFKA_VERSION: ${KAFKA_VERSION:-3.6.0} + KAFKA_VERSION: ${KAFKA_VERSION:-3.6.2} KAFKA_CFG_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181' KAFKA_CFG_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29091' KAFKA_CFG_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-1:9091,LISTENER_LOCAL://localhost:29091' @@ -86,19 +86,19 @@ services: KAFKA_JVM_PERFORMANCE_OPTS: "-XX:+IgnoreUnrecognizedVMOptions" kafka-2: hostname: 'kafka-2' - image: 'sarama/fv-kafka-${KAFKA_VERSION:-3.6.0}' + image: 'sarama/fv-kafka-${KAFKA_VERSION:-3.6.2}' init: true build: context: . dockerfile: Dockerfile.kafka args: - KAFKA_VERSION: ${KAFKA_VERSION:-3.6.0} + KAFKA_VERSION: ${KAFKA_VERSION:-3.6.2} SCALA_VERSION: ${SCALA_VERSION:-2.13} healthcheck: test: [ 'CMD', - '/opt/kafka-${KAFKA_VERSION:-3.6.0}/bin/kafka-broker-api-versions.sh', + '/opt/kafka-${KAFKA_VERSION:-3.6.2}/bin/kafka-broker-api-versions.sh', '--bootstrap-server', 'kafka-2:9091', ] @@ -113,7 +113,7 @@ services: - toxiproxy restart: always environment: - KAFKA_VERSION: ${KAFKA_VERSION:-3.6.0} + KAFKA_VERSION: ${KAFKA_VERSION:-3.6.2} KAFKA_CFG_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181' KAFKA_CFG_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29092' KAFKA_CFG_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-2:9091,LISTENER_LOCAL://localhost:29092' @@ -132,19 +132,19 @@ services: KAFKA_JVM_PERFORMANCE_OPTS: "-XX:+IgnoreUnrecognizedVMOptions" kafka-3: hostname: 'kafka-3' - image: 'sarama/fv-kafka-${KAFKA_VERSION:-3.6.0}' + image: 'sarama/fv-kafka-${KAFKA_VERSION:-3.6.2}' init: true build: context: . dockerfile: Dockerfile.kafka args: - KAFKA_VERSION: ${KAFKA_VERSION:-3.6.0} + KAFKA_VERSION: ${KAFKA_VERSION:-3.6.2} SCALA_VERSION: ${SCALA_VERSION:-2.13} healthcheck: test: [ 'CMD', - '/opt/kafka-${KAFKA_VERSION:-3.6.0}/bin/kafka-broker-api-versions.sh', + '/opt/kafka-${KAFKA_VERSION:-3.6.2}/bin/kafka-broker-api-versions.sh', '--bootstrap-server', 'kafka-3:9091', ] @@ -159,7 +159,7 @@ services: - toxiproxy restart: always environment: - KAFKA_VERSION: ${KAFKA_VERSION:-3.6.0} + KAFKA_VERSION: ${KAFKA_VERSION:-3.6.2} KAFKA_CFG_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181' KAFKA_CFG_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29093' KAFKA_CFG_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-3:9091,LISTENER_LOCAL://localhost:29093' @@ -178,19 +178,19 @@ services: KAFKA_JVM_PERFORMANCE_OPTS: "-XX:+IgnoreUnrecognizedVMOptions" kafka-4: hostname: 'kafka-4' - image: 'sarama/fv-kafka-${KAFKA_VERSION:-3.6.0}' + image: 'sarama/fv-kafka-${KAFKA_VERSION:-3.6.2}' init: true build: context: . dockerfile: Dockerfile.kafka args: - KAFKA_VERSION: ${KAFKA_VERSION:-3.6.0} + KAFKA_VERSION: ${KAFKA_VERSION:-3.6.2} SCALA_VERSION: ${SCALA_VERSION:-2.13} healthcheck: test: [ 'CMD', - '/opt/kafka-${KAFKA_VERSION:-3.6.0}/bin/kafka-broker-api-versions.sh', + '/opt/kafka-${KAFKA_VERSION:-3.6.2}/bin/kafka-broker-api-versions.sh', '--bootstrap-server', 'kafka-4:9091', ] @@ -205,7 +205,7 @@ services: - toxiproxy restart: always environment: - KAFKA_VERSION: ${KAFKA_VERSION:-3.6.0} + KAFKA_VERSION: ${KAFKA_VERSION:-3.6.2} KAFKA_CFG_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181' KAFKA_CFG_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29094' KAFKA_CFG_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-4:9091,LISTENER_LOCAL://localhost:29094' @@ -224,19 +224,19 @@ services: KAFKA_JVM_PERFORMANCE_OPTS: "-XX:+IgnoreUnrecognizedVMOptions" kafka-5: hostname: 'kafka-5' - image: 'sarama/fv-kafka-${KAFKA_VERSION:-3.6.0}' + image: 'sarama/fv-kafka-${KAFKA_VERSION:-3.6.2}' init: true build: context: . dockerfile: Dockerfile.kafka args: - KAFKA_VERSION: ${KAFKA_VERSION:-3.6.0} + KAFKA_VERSION: ${KAFKA_VERSION:-3.6.2} SCALA_VERSION: ${SCALA_VERSION:-2.13} healthcheck: test: [ 'CMD', - '/opt/kafka-${KAFKA_VERSION:-3.6.0}/bin/kafka-broker-api-versions.sh', + '/opt/kafka-${KAFKA_VERSION:-3.6.2}/bin/kafka-broker-api-versions.sh', '--bootstrap-server', 'kafka-5:9091', ] @@ -251,7 +251,7 @@ services: - toxiproxy restart: always environment: - KAFKA_VERSION: ${KAFKA_VERSION:-3.6.0} + KAFKA_VERSION: ${KAFKA_VERSION:-3.6.2} KAFKA_CFG_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181' KAFKA_CFG_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29095' KAFKA_CFG_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-5:9091,LISTENER_LOCAL://localhost:29095' diff --git a/vendor/github.com/IBM/sarama/elect_leaders_request.go b/vendor/github.com/IBM/sarama/elect_leaders_request.go new file mode 100644 index 000000000..cd8d6a7f0 --- /dev/null +++ b/vendor/github.com/IBM/sarama/elect_leaders_request.go @@ -0,0 +1,134 @@ +package sarama + +type ElectLeadersRequest struct { + Version int16 + Type ElectionType + TopicPartitions map[string][]int32 + TimeoutMs int32 +} + +func (r *ElectLeadersRequest) encode(pe packetEncoder) error { + if r.Version > 0 { + pe.putInt8(int8(r.Type)) + } + + pe.putCompactArrayLength(len(r.TopicPartitions)) + + for topic, partitions := range r.TopicPartitions { + if r.Version < 2 { + if err := pe.putString(topic); err != nil { + return err + } + } else { + if err := pe.putCompactString(topic); err != nil { + return err + } + } + + if err := pe.putCompactInt32Array(partitions); err != nil { + return err + } + + if r.Version >= 2 { + pe.putEmptyTaggedFieldArray() + } + } + + pe.putInt32(r.TimeoutMs) + + if r.Version >= 2 { + pe.putEmptyTaggedFieldArray() + } + + return nil +} + +func (r *ElectLeadersRequest) decode(pd packetDecoder, version int16) (err error) { + r.Version = version + if r.Version > 0 { + t, err := pd.getInt8() + if err != nil { + return err + } + r.Type = ElectionType(t) + } + + topicCount, err := pd.getCompactArrayLength() + if err != nil { + return err + } + if topicCount > 0 { + r.TopicPartitions = make(map[string][]int32) + for i := 0; i < topicCount; i++ { + var topic string + if r.Version < 2 { + topic, err = pd.getString() + } else { + topic, err = pd.getCompactString() + } + if err != nil { + return err + } + partitionCount, err := pd.getCompactArrayLength() + if err != nil { + return err + } + partitions := make([]int32, partitionCount) + for j := 0; j < partitionCount; j++ { + partition, err := pd.getInt32() + if err != nil { + return err + } + partitions[j] = partition + } + r.TopicPartitions[topic] = partitions + if r.Version >= 2 { + if _, err := pd.getEmptyTaggedFieldArray(); err != nil { + return err + } + } + } + } + + r.TimeoutMs, err = pd.getInt32() + if err != nil { + return err + } + + if r.Version >= 2 { + if _, err := pd.getEmptyTaggedFieldArray(); err != nil { + return err + } + } + + return nil +} + +func (r *ElectLeadersRequest) key() int16 { + return 43 +} + +func (r *ElectLeadersRequest) version() int16 { + return r.Version +} + +func (r *ElectLeadersRequest) headerVersion() int16 { + return 2 +} + +func (r *ElectLeadersRequest) isValidVersion() bool { + return r.Version >= 0 && r.Version <= 2 +} + +func (r *ElectLeadersRequest) requiredVersion() KafkaVersion { + switch r.Version { + case 2: + return V2_4_0_0 + case 1: + return V0_11_0_0 + case 0: + return V0_10_0_0 + default: + return V2_4_0_0 + } +} diff --git a/vendor/github.com/IBM/sarama/elect_leaders_response.go b/vendor/github.com/IBM/sarama/elect_leaders_response.go new file mode 100644 index 000000000..8c85249ac --- /dev/null +++ b/vendor/github.com/IBM/sarama/elect_leaders_response.go @@ -0,0 +1,173 @@ +package sarama + +import "time" + +type PartitionResult struct { + ErrorCode KError + ErrorMessage *string +} + +func (b *PartitionResult) encode(pe packetEncoder, version int16) error { + pe.putInt16(int16(b.ErrorCode)) + if version < 2 { + if err := pe.putNullableString(b.ErrorMessage); err != nil { + return err + } + } else { + if err := pe.putNullableCompactString(b.ErrorMessage); err != nil { + return err + } + } + if version >= 2 { + pe.putEmptyTaggedFieldArray() + } + return nil +} + +func (b *PartitionResult) decode(pd packetDecoder, version int16) (err error) { + kerr, err := pd.getInt16() + if err != nil { + return err + } + b.ErrorCode = KError(kerr) + if version < 2 { + b.ErrorMessage, err = pd.getNullableString() + } else { + b.ErrorMessage, err = pd.getCompactNullableString() + } + if version >= 2 { + _, err = pd.getEmptyTaggedFieldArray() + } + return err +} + +type ElectLeadersResponse struct { + Version int16 + ThrottleTimeMs int32 + ErrorCode KError + ReplicaElectionResults map[string]map[int32]*PartitionResult +} + +func (r *ElectLeadersResponse) encode(pe packetEncoder) error { + pe.putInt32(r.ThrottleTimeMs) + + if r.Version > 0 { + pe.putInt16(int16(r.ErrorCode)) + } + + pe.putCompactArrayLength(len(r.ReplicaElectionResults)) + for topic, partitions := range r.ReplicaElectionResults { + if r.Version < 2 { + if err := pe.putString(topic); err != nil { + return err + } + } else { + if err := pe.putCompactString(topic); err != nil { + return err + } + } + pe.putCompactArrayLength(len(partitions)) + for partition, result := range partitions { + pe.putInt32(partition) + if err := result.encode(pe, r.Version); err != nil { + return err + } + } + pe.putEmptyTaggedFieldArray() + } + + pe.putEmptyTaggedFieldArray() + + return nil +} + +func (r *ElectLeadersResponse) decode(pd packetDecoder, version int16) (err error) { + r.Version = version + if r.ThrottleTimeMs, err = pd.getInt32(); err != nil { + return err + } + if r.Version > 0 { + kerr, err := pd.getInt16() + if err != nil { + return err + } + r.ErrorCode = KError(kerr) + } + + numTopics, err := pd.getCompactArrayLength() + if err != nil { + return err + } + + r.ReplicaElectionResults = make(map[string]map[int32]*PartitionResult, numTopics) + for i := 0; i < numTopics; i++ { + var topic string + if r.Version < 2 { + topic, err = pd.getString() + } else { + topic, err = pd.getCompactString() + } + if err != nil { + return err + } + + numPartitions, err := pd.getCompactArrayLength() + if err != nil { + return err + } + r.ReplicaElectionResults[topic] = make(map[int32]*PartitionResult, numPartitions) + for j := 0; j < numPartitions; j++ { + partition, err := pd.getInt32() + if err != nil { + return err + } + result := new(PartitionResult) + if err := result.decode(pd, r.Version); err != nil { + return err + } + r.ReplicaElectionResults[topic][partition] = result + } + if _, err := pd.getEmptyTaggedFieldArray(); err != nil { + return err + } + } + + if _, err := pd.getEmptyTaggedFieldArray(); err != nil { + return err + } + + return nil +} + +func (r *ElectLeadersResponse) key() int16 { + return 43 +} + +func (r *ElectLeadersResponse) version() int16 { + return r.Version +} + +func (r *ElectLeadersResponse) headerVersion() int16 { + return 1 +} + +func (r *ElectLeadersResponse) isValidVersion() bool { + return r.Version >= 0 && r.Version <= 2 +} + +func (r *ElectLeadersResponse) requiredVersion() KafkaVersion { + switch r.Version { + case 2: + return V2_4_0_0 + case 1: + return V0_11_0_0 + case 0: + return V0_10_0_0 + default: + return V2_4_0_0 + } +} + +func (r *ElectLeadersResponse) throttleTime() time.Duration { + return time.Duration(r.ThrottleTimeMs) * time.Millisecond +} diff --git a/vendor/github.com/IBM/sarama/election_type.go b/vendor/github.com/IBM/sarama/election_type.go new file mode 100644 index 000000000..01f3b65b3 --- /dev/null +++ b/vendor/github.com/IBM/sarama/election_type.go @@ -0,0 +1,10 @@ +package sarama + +type ElectionType int8 + +const ( + // PreferredElection constant type + PreferredElection ElectionType = 0 + // UncleanElection constant type + UncleanElection ElectionType = 1 +) diff --git a/vendor/github.com/IBM/sarama/entrypoint.sh b/vendor/github.com/IBM/sarama/entrypoint.sh index 9fe9a44b1..516a8dc38 100644 --- a/vendor/github.com/IBM/sarama/entrypoint.sh +++ b/vendor/github.com/IBM/sarama/entrypoint.sh @@ -3,7 +3,7 @@ set -eu set -o pipefail -KAFKA_VERSION="${KAFKA_VERSION:-3.6.0}" +KAFKA_VERSION="${KAFKA_VERSION:-3.6.2}" KAFKA_HOME="/opt/kafka-${KAFKA_VERSION}" if [ ! -d "${KAFKA_HOME}" ]; then diff --git a/vendor/github.com/IBM/sarama/mockresponses.go b/vendor/github.com/IBM/sarama/mockresponses.go index d09415b49..2c352797f 100644 --- a/vendor/github.com/IBM/sarama/mockresponses.go +++ b/vendor/github.com/IBM/sarama/mockresponses.go @@ -778,6 +778,28 @@ func (mr *MockListPartitionReassignmentsResponse) For(reqBody versionedDecoder) return res } +type MockElectLeadersResponse struct { + t TestReporter +} + +func NewMockElectLeadersResponse(t TestReporter) *MockElectLeadersResponse { + return &MockElectLeadersResponse{t: t} +} + +func (mr *MockElectLeadersResponse) For(reqBody versionedDecoder) encoderWithHeader { + req := reqBody.(*ElectLeadersRequest) + res := &ElectLeadersResponse{Version: req.version(), ReplicaElectionResults: map[string]map[int32]*PartitionResult{}} + + for topic, partitions := range req.TopicPartitions { + for _, partition := range partitions { + res.ReplicaElectionResults[topic] = map[int32]*PartitionResult{ + partition: {ErrorCode: ErrNoError}, + } + } + } + return res +} + type MockDeleteRecordsResponse struct { t TestReporter } diff --git a/vendor/github.com/IBM/sarama/request.go b/vendor/github.com/IBM/sarama/request.go index e8e74ca34..8f0c2b579 100644 --- a/vendor/github.com/IBM/sarama/request.go +++ b/vendor/github.com/IBM/sarama/request.go @@ -194,7 +194,8 @@ func allocateBody(key, version int16) protocolBody { // 41: DescribeDelegationTokenRequest case 42: return &DeleteGroupsRequest{Version: version} - // 43: ElectLeadersRequest + case 43: + return &ElectLeadersRequest{Version: version} case 44: return &IncrementalAlterConfigsRequest{Version: version} case 45: diff --git a/vendor/github.com/IBM/sarama/utils.go b/vendor/github.com/IBM/sarama/utils.go index feadc0065..d5b77e0d9 100644 --- a/vendor/github.com/IBM/sarama/utils.go +++ b/vendor/github.com/IBM/sarama/utils.go @@ -44,11 +44,10 @@ func withRecover(fn func()) { } func safeAsyncClose(b *Broker) { - tmp := b // local var prevents clobbering in goroutine go withRecover(func() { - if connected, _ := tmp.Connected(); connected { - if err := tmp.Close(); err != nil { - Logger.Println("Error closing broker", tmp.ID(), ":", err) + if connected, _ := b.Connected(); connected { + if err := b.Close(); err != nil { + Logger.Println("Error closing broker", b.ID(), ":", err) } } }) @@ -198,7 +197,15 @@ var ( V3_4_1_0 = newKafkaVersion(3, 4, 1, 0) V3_5_0_0 = newKafkaVersion(3, 5, 0, 0) V3_5_1_0 = newKafkaVersion(3, 5, 1, 0) + V3_5_2_0 = newKafkaVersion(3, 5, 2, 0) V3_6_0_0 = newKafkaVersion(3, 6, 0, 0) + V3_6_1_0 = newKafkaVersion(3, 6, 1, 0) + V3_6_2_0 = newKafkaVersion(3, 6, 2, 0) + V3_7_0_0 = newKafkaVersion(3, 7, 0, 0) + V3_7_1_0 = newKafkaVersion(3, 7, 1, 0) + V3_8_0_0 = newKafkaVersion(3, 8, 0, 0) + V3_8_1_0 = newKafkaVersion(3, 8, 1, 0) + V3_9_0_0 = newKafkaVersion(3, 9, 0, 0) SupportedVersions = []KafkaVersion{ V0_8_2_0, @@ -237,8 +244,10 @@ var ( V2_6_0_0, V2_6_1_0, V2_6_2_0, + V2_6_3_0, V2_7_0_0, V2_7_1_0, + V2_7_2_0, V2_8_0_0, V2_8_1_0, V2_8_2_0, @@ -259,10 +268,18 @@ var ( V3_4_1_0, V3_5_0_0, V3_5_1_0, + V3_5_2_0, V3_6_0_0, + V3_6_1_0, + V3_6_2_0, + V3_7_0_0, + V3_7_1_0, + V3_8_0_0, + V3_8_1_0, + V3_9_0_0, } MinVersion = V0_8_2_0 - MaxVersion = V3_6_0_0 + MaxVersion = V3_9_0_0 DefaultVersion = V2_1_0_0 // reduced set of protocol versions to matrix test @@ -274,11 +291,11 @@ var ( V2_0_1_0, V2_2_2_0, V2_4_1_0, - V2_6_2_0, + V2_6_3_0, V2_8_2_0, V3_1_2_0, V3_3_2_0, - V3_6_0_0, + V3_6_2_0, } ) diff --git a/vendor/github.com/pierrec/lz4/v4/internal/lz4stream/block.go b/vendor/github.com/pierrec/lz4/v4/internal/lz4stream/block.go index e96465460..04aaca848 100644 --- a/vendor/github.com/pierrec/lz4/v4/internal/lz4stream/block.go +++ b/vendor/github.com/pierrec/lz4/v4/internal/lz4stream/block.go @@ -246,7 +246,7 @@ func (b *FrameDataBlock) Compress(f *Frame, src []byte, level lz4block.Compressi b.src = src // keep track of the source for content checksum if f.Descriptor.Flags.BlockChecksum() { - b.Checksum = xxh32.ChecksumZero(src) + b.Checksum = xxh32.ChecksumZero(b.Data) } return b } @@ -328,7 +328,7 @@ func (b *FrameDataBlock) Uncompress(f *Frame, dst, dict []byte, sum bool) ([]byt dst = dst[:n] } if f.Descriptor.Flags.BlockChecksum() { - if c := xxh32.ChecksumZero(dst); c != b.Checksum { + if c := xxh32.ChecksumZero(b.data); c != b.Checksum { err := fmt.Errorf("%w: got %x; expected %x", lz4errors.ErrInvalidBlockChecksum, c, b.Checksum) return nil, err } diff --git a/vendor/modules.txt b/vendor/modules.txt index 019373a3d..ec5ab4578 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -44,8 +44,8 @@ cloud.google.com/go/pubsub/pstest ## explicit; go 1.18 github.com/BurntSushi/toml github.com/BurntSushi/toml/internal -# github.com/IBM/sarama v1.43.3 -## explicit; go 1.19 +# github.com/IBM/sarama v1.44.0 +## explicit; go 1.20 github.com/IBM/sarama # github.com/NYTimes/gziphandler v1.1.1 ## explicit; go 1.11 @@ -254,7 +254,7 @@ github.com/munnerz/goautoneg ## explicit; go 1.13 # github.com/onsi/gomega v1.10.1 ## explicit -# github.com/pierrec/lz4/v4 v4.1.21 +# github.com/pierrec/lz4/v4 v4.1.22 ## explicit; go 1.14 github.com/pierrec/lz4/v4 github.com/pierrec/lz4/v4/internal/lz4block