Skip to content

Commit

Permalink
Added kafka connect exponential backoff
Browse files Browse the repository at this point in the history
  • Loading branch information
Wil Simpson committed Mar 6, 2024
1 parent e6bf94c commit 6e10ba9
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 13 deletions.
31 changes: 20 additions & 11 deletions coverage.out
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ github.com/ShatteredRealms/go-backend/pkg/helpers/config.go:45.9,45.23 1 12
github.com/ShatteredRealms/go-backend/pkg/helpers/config.go:45.23,47.4 1 6
github.com/ShatteredRealms/go-backend/pkg/helpers/config.go:49.3,49.54 1 12
github.com/ShatteredRealms/go-backend/pkg/helpers/conversions.go:3.48,5.28 2 1
github.com/ShatteredRealms/go-backend/pkg/helpers/conversions.go:5.28,7.3 1 44
github.com/ShatteredRealms/go-backend/pkg/helpers/conversions.go:5.28,7.3 1 49
github.com/ShatteredRealms/go-backend/pkg/helpers/conversions.go:9.2,9.13 1 1
github.com/ShatteredRealms/go-backend/pkg/helpers/errors.go:8.63,9.16 1 4
github.com/ShatteredRealms/go-backend/pkg/helpers/errors.go:9.16,11.3 1 0
Expand Down Expand Up @@ -94,9 +94,9 @@ github.com/ShatteredRealms/go-backend/pkg/helpers/srv.go:144.2,144.18 1 2
github.com/ShatteredRealms/go-backend/pkg/helpers/srv.go:144.18,146.3 1 1
github.com/ShatteredRealms/go-backend/pkg/helpers/srv.go:148.2,148.28 1 1
github.com/ShatteredRealms/go-backend/pkg/helpers/uuid.go:9.59,11.39 2 3
github.com/ShatteredRealms/go-backend/pkg/helpers/uuid.go:11.39,13.17 2 28
github.com/ShatteredRealms/go-backend/pkg/helpers/uuid.go:11.39,13.17 2 30
github.com/ShatteredRealms/go-backend/pkg/helpers/uuid.go:13.17,15.4 1 2
github.com/ShatteredRealms/go-backend/pkg/helpers/uuid.go:16.3,16.17 1 26
github.com/ShatteredRealms/go-backend/pkg/helpers/uuid.go:16.3,16.17 1 28
github.com/ShatteredRealms/go-backend/pkg/helpers/uuid.go:19.2,19.17 1 1
github.com/ShatteredRealms/go-backend/pkg/model/character.go:49.38,50.43 1 10
github.com/ShatteredRealms/go-backend/pkg/model/character.go:50.43,52.3 1 1
Expand Down Expand Up @@ -320,14 +320,23 @@ github.com/ShatteredRealms/go-backend/pkg/repository/inventory_r.go:30.16,32.3 1
github.com/ShatteredRealms/go-backend/pkg/repository/inventory_r.go:34.2,34.23 1 4
github.com/ShatteredRealms/go-backend/pkg/repository/inventory_r.go:38.111,41.2 2 5
github.com/ShatteredRealms/go-backend/pkg/repository/inventory_r.go:43.71,45.2 1 11
github.com/ShatteredRealms/go-backend/pkg/repository/kafka.go:17.70,18.24 1 1
github.com/ShatteredRealms/go-backend/pkg/repository/kafka.go:18.24,20.3 1 0
github.com/ShatteredRealms/go-backend/pkg/repository/kafka.go:21.2,21.27 1 1
github.com/ShatteredRealms/go-backend/pkg/repository/kafka.go:21.27,23.3 1 0
github.com/ShatteredRealms/go-backend/pkg/repository/kafka.go:24.2,27.16 3 1
github.com/ShatteredRealms/go-backend/pkg/repository/kafka.go:27.16,29.3 1 0
github.com/ShatteredRealms/go-backend/pkg/repository/kafka.go:31.2,32.16 2 1
github.com/ShatteredRealms/go-backend/pkg/repository/kafka.go:19.70,20.24 1 1
github.com/ShatteredRealms/go-backend/pkg/repository/kafka.go:20.24,22.3 1 0
github.com/ShatteredRealms/go-backend/pkg/repository/kafka.go:23.2,23.27 1 1
github.com/ShatteredRealms/go-backend/pkg/repository/kafka.go:23.27,25.3 1 0
github.com/ShatteredRealms/go-backend/pkg/repository/kafka.go:26.2,28.27 2 1
github.com/ShatteredRealms/go-backend/pkg/repository/kafka.go:28.27,31.3 2 1
github.com/ShatteredRealms/go-backend/pkg/repository/kafka.go:32.2,32.16 1 1
github.com/ShatteredRealms/go-backend/pkg/repository/kafka.go:32.16,34.3 1 0
github.com/ShatteredRealms/go-backend/pkg/repository/kafka.go:36.2,37.16 2 1
github.com/ShatteredRealms/go-backend/pkg/repository/kafka.go:37.16,39.3 1 0
github.com/ShatteredRealms/go-backend/pkg/repository/kafka.go:41.2,41.28 1 1
github.com/ShatteredRealms/go-backend/pkg/repository/kafka.go:41.2,41.27 1 1
github.com/ShatteredRealms/go-backend/pkg/repository/kafka.go:41.27,44.3 2 1
github.com/ShatteredRealms/go-backend/pkg/repository/kafka.go:45.2,45.16 1 1
github.com/ShatteredRealms/go-backend/pkg/repository/kafka.go:45.16,47.3 1 0
github.com/ShatteredRealms/go-backend/pkg/repository/kafka.go:49.2,49.28 1 1
github.com/ShatteredRealms/go-backend/pkg/repository/kafka.go:52.35,56.46 4 2
github.com/ShatteredRealms/go-backend/pkg/repository/kafka.go:56.46,57.39 1 0
github.com/ShatteredRealms/go-backend/pkg/repository/kafka.go:57.39,59.4 1 0
github.com/ShatteredRealms/go-backend/pkg/repository/kafka.go:61.3,61.13 1 0
github.com/ShatteredRealms/go-backend/pkg/repository/kafka.go:64.2,64.12 1 2
27 changes: 25 additions & 2 deletions pkg/repository/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"fmt"
"net"
"strconv"
"time"

"github.com/ShatteredRealms/go-backend/pkg/config"
"github.com/cenkalti/backoff/v4"
"github.com/segmentio/kafka-go"
)

Expand All @@ -23,7 +25,10 @@ func ConnectKafka(address config.ServerAddress) (*kafka.Conn, error) {
}
var err error

currentConn, err = kafka.Dial("tcp", address.Address())
err = retry(func() error {
currentConn, err = kafka.Dial("tcp", address.Address())
return err
})
if err != nil {
return nil, fmt.Errorf("kafka connect: %v", err)
}
Expand All @@ -33,10 +38,28 @@ func ConnectKafka(address config.ServerAddress) (*kafka.Conn, error) {
return nil, fmt.Errorf("controller: %v", err)
}

controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
err = retry(func() error {
controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
return err
})
if err != nil {
return nil, fmt.Errorf("kafka controller connection: %v", err)
}

return controllerConn, nil
}

func retry(op func() error) error {
bo := backoff.NewExponentialBackOff()
bo.MaxInterval = time.Second * 5
bo.MaxElapsedTime = time.Minute
if err := backoff.Retry(op, bo); err != nil {
if bo.NextBackOff() == backoff.Stop {
return fmt.Errorf("reached retry deadline")
}

return err
}

return nil
}

0 comments on commit 6e10ba9

Please sign in to comment.