Skip to content

Commit

Permalink
Merge pull request #178 from grepplabs/kafka-3.8
Browse files Browse the repository at this point in the history
kafka 3.8 / 3.9 support
  • Loading branch information
everesio authored Nov 15, 2024
2 parents 71d0761 + e62408c commit f8b1a88
Show file tree
Hide file tree
Showing 20 changed files with 87 additions and 63 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM --platform=$BUILDPLATFORM golang:1.22-alpine3.19 as builder
FROM --platform=$BUILDPLATFORM golang:1.22-alpine3.19 AS builder
RUN apk add alpine-sdk ca-certificates

ARG TARGETOS
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile.all
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM --platform=$BUILDPLATFORM golang:1.22-alpine3.19 as builder
FROM --platform=$BUILDPLATFORM golang:1.22-alpine3.19 AS builder
RUN apk add alpine-sdk ca-certificates

ARG TARGETOS
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ VERSION ?= $(shell git describe --tags --always --dirty)
GOPKGS = $(shell go list ./... | grep -v /vendor/)
BUILD_FLAGS ?=
LDFLAGS ?= -X github.com/grepplabs/kafka-proxy/config.Version=$(VERSION) -w -s
TAG ?= "v0.3.11"
TAG ?= "v0.3.12"
GOOS ?= $(if $(TARGETOS),$(TARGETOS),linux)
GOARCH ?= $(if $(TARGETARCH),$(TARGETARCH),amd64)
GOARM ?= $(TARGETVARIANT)
Expand Down
9 changes: 5 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,19 @@ As not every Kafka release adds new messages/versions which are relevant to the
| 0.2.9 | to 2.8.0 |
| 0.3.1 | to 3.4.0 |
| 0.3.11 | to 3.7.0 |
| 0.3.12 | to 3.9.0 |

### Install binary release

1. Download the latest release

Linux

curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.3.11/kafka-proxy-v0.3.11-linux-amd64.tar.gz | tar xz
curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.3.12/kafka-proxy-v0.3.12-linux-amd64.tar.gz | tar xz

macOS

curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.3.11/kafka-proxy-v0.3.11-darwin-amd64.tar.gz | tar xz
curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.3.12/kafka-proxy-v0.3.12-darwin-amd64.tar.gz | tar xz

2. Move the binary in to your PATH.

Expand All @@ -69,7 +70,7 @@ Docker images are available on [Docker Hub](https://hub.docker.com/r/grepplabs/k
You can launch a kafka-proxy container for trying it out with
docker run --rm -p 30001-30003:30001-30003 grepplabs/kafka-proxy:0.3.11 \
docker run --rm -p 30001-30003:30001-30003 grepplabs/kafka-proxy:0.3.12 \
server \
--bootstrap-server-mapping "localhost:19092,0.0.0.0:30001" \
--bootstrap-server-mapping "localhost:29092,0.0.0.0:30002" \
Expand All @@ -88,7 +89,7 @@ Docker images with precompiled plugins located in `/opt/kafka-proxy/bin/` are ta
You can launch a kafka-proxy container with auth-ldap plugin for trying it out with
docker run --rm -p 30001-30003:30001-30003 grepplabs/kafka-proxy:0.3.11-all \
docker run --rm -p 30001-30003:30001-30003 grepplabs/kafka-proxy:0.3.12-all \
server \
--bootstrap-server-mapping "localhost:19092,0.0.0.0:30001" \
--bootstrap-server-mapping "localhost:29092,0.0.0.0:30002" \
Expand Down
3 changes: 1 addition & 2 deletions cmd/plugin-auth-ldap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/hashicorp/go-plugin"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"io/ioutil"
"net"
"net/url"
"os"
Expand Down Expand Up @@ -317,7 +316,7 @@ func getTlsConfig(caCertFile string, insecureSkipVerify bool) (*tls.Config, erro
if caCertFile == "" {
return &tls.Config{InsecureSkipVerify: insecureSkipVerify}, nil
} else {
certData, err := ioutil.ReadFile(caCertFile)
certData, err := os.ReadFile(caCertFile)
if err != nil {
return nil, errors.Wrapf(err, "reading certificate file %s", caCertFile)
}
Expand Down
4 changes: 2 additions & 2 deletions config/jaas.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package config
import (
"errors"
"fmt"
"io/ioutil"
"os"
"regexp"
"strings"
)
Expand All @@ -19,7 +19,7 @@ type JaasCredentials struct {
}

func NewJaasCredentialFromFile(filename string) (*JaasCredentials, error) {
bytes, err := ioutil.ReadFile(filename)
bytes, err := os.ReadFile(filename)
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions config/jaas_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package config

import (
"github.com/stretchr/testify/assert"
"io/ioutil"
"os"
"testing"

"github.com/stretchr/testify/assert"
)

func TestExtractsJaasCredentials(t *testing.T) {
Expand All @@ -30,7 +30,7 @@ func TestExtractsJaasCredentialsFromFile(t *testing.T) {
password="veyaiThai5que0ieb5le";
};
`
tmpFile, err := ioutil.TempFile("", "kafka-proxy-jaas-test")
tmpFile, err := os.CreateTemp("", "kafka-proxy-jaas-test")
assert.Nil(t, err)
defer os.Remove(tmpFile.Name())
defer tmpFile.Close()
Expand Down
4 changes: 2 additions & 2 deletions pkg/libs/googleid/certs.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"encoding/json"
"fmt"
"golang.org/x/net/context/ctxhttp"
"io/ioutil"
"io"
"math/big"
"net/http"
"time"
Expand Down Expand Up @@ -59,7 +59,7 @@ func GetCerts(ctx context.Context) (*Certs, error) {
}
defer resp.Body.Close()

body, err := ioutil.ReadAll(resp.Body)
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/libs/googleid/service_account.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ import (
"golang.org/x/oauth2/google"
"golang.org/x/oauth2/jws"
"google.golang.org/api/oauth2/v2"
"io/ioutil"
"io"
"net/http"
"net/url"
"os"
"strings"
"time"
)
Expand All @@ -32,7 +33,7 @@ type ServiceAccountTokenSource struct {
}

func NewServiceAccountTokenSource(credentialsFile string, targetAudience string) (*ServiceAccountTokenSource, error) {
data, err := ioutil.ReadFile(credentialsFile)
data, err := os.ReadFile(credentialsFile)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -130,7 +131,7 @@ func doExchange(ctx context.Context, token string) ([]byte, error) {
}
defer resp.Body.Close()

body, err := ioutil.ReadAll(resp.Body)
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/libs/oidc-provider/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"sync"
"time"
Expand Down Expand Up @@ -210,7 +209,7 @@ func getTokenResponse(token string, status int) (apis.TokenResponse, error) {
}

func getTokenSource(credentialsFilePath string, targetAud string) (idTokenSource, error) {
data, err := ioutil.ReadFile(credentialsFilePath)
data, err := os.ReadFile(credentialsFilePath)

if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions pkg/libs/oidc/password_grant.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"encoding/json"
"errors"
"io/ioutil"
"os"

"github.com/sirupsen/logrus"
"golang.org/x/oauth2"
Expand All @@ -28,7 +28,7 @@ type PasswordGrantTokenSource struct {
}

func NewPasswordGrantTokenSource(credentialsFile string, targetAudience string) (*PasswordGrantTokenSource, error) {
data, err := ioutil.ReadFile(credentialsFile)
data, err := os.ReadFile(credentialsFile)
source := &PasswordGrantTokenSource{}

if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/libs/oidc/service_account.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"encoding/json"
"errors"
"io/ioutil"
"os"

"github.com/sirupsen/logrus"
"golang.org/x/oauth2/clientcredentials"
Expand All @@ -26,7 +26,7 @@ type ServiceAccountTokenSource struct {
}

func NewServiceAccountTokenSource(credentialsFile string, targetAudience string) (*ServiceAccountTokenSource, error) {
data, err := ioutil.ReadFile(credentialsFile)
data, err := os.ReadFile(credentialsFile)
source := &ServiceAccountTokenSource{}

if err != nil {
Expand Down
23 changes: 11 additions & 12 deletions pkg/libs/util/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package util
import (
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"io/ioutil"
"os"
"path/filepath"
"sync/atomic"
Expand All @@ -16,18 +15,18 @@ func TestWatchRegularFileChange(t *testing.T) {

a := assert.New(t)

dirName, err := ioutil.TempDir("", "watcher-test-")
dirName, err := os.MkdirTemp("", "watcher-test-")
a.Nil(err)
defer os.Remove(dirName)

targetSecret, err := ioutil.TempFile(dirName, "secret-")
targetSecret, err := os.CreateTemp(dirName, "secret-")
a.Nil(err)
defer os.Remove(targetSecret.Name())

_, err = targetSecret.WriteString("secret1")
a.Nil(err)

data, err := ioutil.ReadFile(targetSecret.Name())
data, err := os.ReadFile(targetSecret.Name())
a.Nil(err)
a.Equal("secret1", string(data))

Expand Down Expand Up @@ -59,7 +58,7 @@ func TestWatchRegularFileChange(t *testing.T) {
opsFinal := atomic.LoadInt32(&ops)
a.Equal(int32(1), opsFinal)

data, err = ioutil.ReadFile(targetSecret.Name())
data, err = os.ReadFile(targetSecret.Name())
a.Nil(err)
a.Equal("secret1addition", string(data))
}
Expand All @@ -74,23 +73,23 @@ func TestWatchLinkedFileChange(t *testing.T) {
*/
a := assert.New(t)

dirName, err := ioutil.TempDir("", "watcher-test-")
dirName, err := os.MkdirTemp("", "watcher-test-")
a.Nil(err)
defer os.Remove(dirName)

dirTmp1, err := ioutil.TempDir(dirName, "tmp1-")
dirTmp1, err := os.MkdirTemp(dirName, "tmp1-")
a.Nil(err)
defer os.Remove(dirTmp1)

dirTmp2, err := ioutil.TempDir(dirName, "tmp2-")
dirTmp2, err := os.MkdirTemp(dirName, "tmp2-")
a.Nil(err)
defer os.Remove(dirTmp2)

targetSecret1, err := ioutil.TempFile(dirTmp1, "secret-")
targetSecret1, err := os.CreateTemp(dirTmp1, "secret-")
a.Nil(err)
defer os.Remove(targetSecret1.Name())

targetSecret2, err := ioutil.TempFile(dirTmp2, "secret-")
targetSecret2, err := os.CreateTemp(dirTmp2, "secret-")
a.Nil(err)
defer os.Remove(targetSecret2.Name())

Expand All @@ -110,7 +109,7 @@ func TestWatchLinkedFileChange(t *testing.T) {
a.Nil(err)
defer os.Remove(secretLink)

data, err := ioutil.ReadFile(secretLink)
data, err := os.ReadFile(secretLink)
a.Nil(err)
a.Equal("secret1", string(data))

Expand Down Expand Up @@ -145,7 +144,7 @@ func TestWatchLinkedFileChange(t *testing.T) {
opsFinal := atomic.LoadInt32(&ops)
a.Equal(int32(1), opsFinal)

data, err = ioutil.ReadFile(secretLink)
data, err = os.ReadFile(secretLink)
a.Nil(err)
a.Equal("secret2", string(data))
}
2 changes: 1 addition & 1 deletion proxy/processor_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (handler *DefaultRequestHandler) mustReply(requestKeyVersion *protocol.Requ
return false, nil, err
}

case 3, 4, 5, 6, 7, 8, 9, 10:
case 3, 4, 5, 6, 7, 8, 9, 10, 11:
// CorrelationID + ClientID
if err = acksReader.ReadAndDiscardHeaderV1Part(reader); err != nil {
return false, nil, err
Expand Down
26 changes: 26 additions & 0 deletions proxy/protocol/request_key_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,32 @@ func (r *RequestKeyVersion) ResponseHeaderVersion() int16 {
return 1
case 74: // ListClientMetricsResources
return 1
case 75: // DescribeTopicPartitions
return 1
case 76: // ShareGroupHeartbeat
return 1
case 77: // ShareGroupDescribe
return 1
case 78: // ShareFetch
return 1
case 79: // ShareAcknowledge
return 1
case 80: // AddRaftVoter
return 1
case 81: // RemoveRaftVoter
return 1
case 82: // UpdateRaftVoter
return 1
case 83: // InitializeShareGroupState
return 1
case 84: // ReadShareGroupState
return 1
case 85: // WriteShareGroupState
return 1
case 86: // DeleteShareGroupState
return 1
case 87: // ReadShareGroupStateSummary
return 1
default:
// throw new UnsupportedVersionException("Unsupported API key " + apiKey);
return -1
Expand Down
3 changes: 1 addition & 2 deletions proxy/protocol/request_produce_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package protocol
import (
"encoding/binary"
"io"
"io/ioutil"
)

type RequestAcksReader struct {
Expand All @@ -18,7 +17,7 @@ func (r RequestAcksReader) readAndDiscardNullableString(reader io.Reader) (err e
return errInvalidStringLength
}
if length > 0 {
if _, err = io.CopyN(ioutil.Discard, reader, int64(length)); err != nil {
if _, err = io.CopyN(io.Discard, reader, int64(length)); err != nil {
return err
}
}
Expand Down
3 changes: 1 addition & 2 deletions proxy/protocol/response_header.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"github.com/pkg/errors"
"io"
"io/ioutil"
)

type ResponseHeader struct {
Expand Down Expand Up @@ -78,7 +77,7 @@ func (r *ResponseHeaderTaggedFields) MaybeRead(reader io.Reader) ([]byte, error)
} else if size == 0 {
continue
} else {
if _, err := io.CopyN(ioutil.Discard, reader, int64(size)); err != nil {
if _, err := io.CopyN(io.Discard, reader, int64(size)); err != nil {
return nil, errors.Wrap(err, "error while reading tagged field data")
}
}
Expand Down
5 changes: 4 additions & 1 deletion proxy/protocol/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,10 @@ func createFindCoordinatorResponseSchemaVersions() []Schema {
&CompactArray{Name: coordinatorsKeyName, Ty: findCoordinatorCoordinatorsSchema4},
&SchemaTaggedFields{Name: "response_tagged_fields"},
)
return []Schema{findCoordinatorResponseV0, findCoordinatorResponseV1, findCoordinatorResponseV2, findCoordinatorResponseV3, findCoordinatorResponseV4}
findCoordinatorResponseV5 := findCoordinatorResponseV4
findCoordinatorResponseV6 := findCoordinatorResponseV5

return []Schema{findCoordinatorResponseV0, findCoordinatorResponseV1, findCoordinatorResponseV2, findCoordinatorResponseV3, findCoordinatorResponseV4, findCoordinatorResponseV5, findCoordinatorResponseV6}
}

func modifyMetadataResponse(decodedStruct *Struct, fn config.NetAddressMappingFunc) error {
Expand Down
Loading

0 comments on commit f8b1a88

Please sign in to comment.