Skip to content

Commit

Permalink
Merge pull request #321 from Interhyp/async-kafka
Browse files Browse the repository at this point in the history
RELTEC-11165: Produce messages async
  • Loading branch information
Roshick authored Jul 26, 2024
2 parents f5835fb + 0eef0a7 commit d018920
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 25 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.21

require (
github.com/IBM/sarama v1.43.2
github.com/Roshick/go-autumn-kafka v0.6.3
github.com/Roshick/go-autumn-kafka v0.7.1
github.com/Roshick/go-autumn-synchronisation v0.6.4
github.com/StephanHCB/go-autumn-config-api v0.2.2
github.com/StephanHCB/go-autumn-config-env v0.2.3
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ github.com/ProtonMail/go-crypto v1.0.0 h1:LRuvITjQWX+WIfr930YHG2HNfjR1uOfyf5vE0k
github.com/ProtonMail/go-crypto v1.0.0/go.mod h1:EjAoLdwvbIOoOQr3ihjnSoLZRtE8azugULFRteWMNc0=
github.com/Roshick/go-autumn-kafka v0.6.3 h1:BA2vmOZOtlW0Im76VAFxDwsxGRdlj9EZPuKRZX1uIXk=
github.com/Roshick/go-autumn-kafka v0.6.3/go.mod h1:Zfx0X2B856i8yr7A856bzGsOWiHDR0CrtG7VghZy/gU=
github.com/Roshick/go-autumn-kafka v0.7.1 h1:bemkEH0NH2Dph/UhWc6euY9fvEUUD48efpps77ROXco=
github.com/Roshick/go-autumn-kafka v0.7.1/go.mod h1:Zfx0X2B856i8yr7A856bzGsOWiHDR0CrtG7VghZy/gU=
github.com/Roshick/go-autumn-synchronisation v0.6.4 h1:4OXyqJJrje3YgjvjJpDvooIHPsgrIAnlHODWhVGi6Do=
github.com/Roshick/go-autumn-synchronisation v0.6.4/go.mod h1:Whsa6SW9DKh+gdI0LOgp/D4wYQVdQUoOuHqDFle1b1I=
github.com/StephanHCB/go-autumn-acorn-registry v0.3.2 h1:eW+KndxdD8p+TdQ6HsqXLdqkowlcWb9qkVeUjdchAfA=
Expand Down
7 changes: 4 additions & 3 deletions internal/acorn/config/customconfigint.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package config

import (
"github.com/Interhyp/metadata-service/internal/types"
"github.com/Roshick/go-autumn-kafka/pkg/aukafka"
"regexp"

"github.com/Interhyp/metadata-service/internal/types"
"github.com/Roshick/go-autumn-kafka/pkg/kafka"

librepo "github.com/StephanHCB/go-backend-service-common/acorns/repository"
)

Expand Down Expand Up @@ -63,7 +64,7 @@ type CustomConfiguration interface {

AllowedFileCategories() []string

Kafka() *aukafka.Config
Kafka() *kafka.Config
KafkaGroupIdOverride() string

RedisUrl() string
Expand Down
7 changes: 4 additions & 3 deletions internal/repository/config/accessors.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package config

import (
"github.com/Interhyp/metadata-service/internal/acorn/config"
"github.com/Roshick/go-autumn-kafka/pkg/aukafka"
"os"
"regexp"
"strings"

"github.com/Interhyp/metadata-service/internal/acorn/config"
"github.com/Roshick/go-autumn-kafka/pkg/kafka"
)

func (c *CustomConfigImpl) BasicAuthUsername() string {
Expand Down Expand Up @@ -163,7 +164,7 @@ func (c *CustomConfigImpl) AllowedFileCategories() []string {
return c.VAllowedFileCategories
}

func (c *CustomConfigImpl) Kafka() *aukafka.Config {
func (c *CustomConfigImpl) Kafka() *kafka.Config {
return c.VKafkaConfig
}

Expand Down
13 changes: 7 additions & 6 deletions internal/repository/config/plumbing.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@ package config
import (
"encoding/json"
"fmt"
"regexp"
"strconv"
"strings"

"github.com/Interhyp/metadata-service/internal/acorn/config"
openapi "github.com/Interhyp/metadata-service/internal/types"
"github.com/Roshick/go-autumn-kafka/pkg/aukafka"
"github.com/Roshick/go-autumn-kafka/pkg/kafka"
auconfigapi "github.com/StephanHCB/go-autumn-config-api"
auconfigenv "github.com/StephanHCB/go-autumn-config-env"
librepo "github.com/StephanHCB/go-backend-service-common/acorns/repository"
libconfig "github.com/StephanHCB/go-backend-service-common/repository/config"
"github.com/StephanHCB/go-backend-service-common/repository/vault"
"regexp"
"strconv"
"strings"
)

const (
Expand Down Expand Up @@ -66,13 +67,13 @@ type CustomConfigImpl struct {
VPullRequestBuildUrl string
VPullRequestBuildKey string

VKafkaConfig *aukafka.Config
VKafkaConfig *kafka.Config
BitbucketGitUrlMatcher *regexp.Regexp
}

func New() (librepo.Configuration, config.CustomConfiguration) {
instance := &CustomConfigImpl{
VKafkaConfig: aukafka.NewConfig(),
VKafkaConfig: kafka.NewConfig(),
BitbucketGitUrlMatcher: regexp.MustCompile(`/([^/]+)/([^/]+).git$`),
}
configItems := make([]auconfigapi.ConfigItem, 0)
Expand Down
28 changes: 19 additions & 9 deletions internal/repository/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@ import (
"context"
"errors"
"fmt"
"strings"
"time"

"github.com/IBM/sarama"
"github.com/Interhyp/metadata-service/internal/acorn/config"
"github.com/Interhyp/metadata-service/internal/acorn/repository"
"github.com/Roshick/go-autumn-kafka/pkg/aukafka"
"github.com/Roshick/go-autumn-kafka/pkg/kafka"
aulogging "github.com/StephanHCB/go-autumn-logging"
auzerolog "github.com/StephanHCB/go-autumn-logging-zerolog"
librepo "github.com/StephanHCB/go-backend-service-common/acorns/repository"
"github.com/rcrowley/go-metrics"
"strings"
"time"
)
import _ "github.com/go-git/go-git/v5"

Expand All @@ -26,8 +27,8 @@ type Impl struct {
HostIP repository.HostIP

Callback repository.ReceiverCallback
KafkaProducer *aukafka.SyncProducer[repository.UpdateEvent]
KafkaConsumer *aukafka.Consumer[repository.UpdateEvent]
KafkaProducer *kafka.SyncProducer[repository.UpdateEvent]
KafkaConsumer *kafka.Consumer[repository.UpdateEvent]
}

func New(
Expand Down Expand Up @@ -81,10 +82,19 @@ func (r *Impl) Send(ctx context.Context, event repository.UpdateEvent) error {
return nil
}

return r.KafkaProducer.Produce(ctx, nil, &event)
asyncCtx := context.WithoutCancel(ctx)
asyncCtx, cancel := context.WithTimeout(asyncCtx, 60*time.Second)
go func() {
defer cancel()

if err := r.KafkaProducer.Produce(asyncCtx, nil, &event); err != nil {
aulogging.Logger.Ctx(asyncCtx).Warn().WithErr(err).Printf("failed to send event")
}
}()
return nil
}

func (r *Impl) topicConfig(ctx context.Context) (*aukafka.TopicConfig, error) {
func (r *Impl) topicConfig(ctx context.Context) (*kafka.TopicConfig, error) {
if r.CustomConfiguration.Kafka() != nil {
if topicConfig, ok := r.CustomConfiguration.Kafka().TopicConfigs()[MetadataChangeEventsTopicKey]; ok {
if topicConfig.Password == "" {
Expand Down Expand Up @@ -143,7 +153,7 @@ func (r *Impl) StartReceiveLoop(ctx context.Context) error {
configPreset.MetricRegistry = metrics.NewPrefixedChildRegistry(metrics.DefaultRegistry, "sarama.consumer.")
configPreset.Consumer.Offsets.Initial = sarama.OffsetNewest

consumer, err := aukafka.CreateConsumer[repository.UpdateEvent](ctx, *topicConfig, callback, configPreset)
consumer, err := kafka.CreateConsumer[repository.UpdateEvent](ctx, *topicConfig, callback, configPreset)
if err != nil {
return err
}
Expand All @@ -167,7 +177,7 @@ func (r *Impl) ConnectProducer(ctx context.Context) error {
configPreset.Producer.Compression = sarama.CompressionNone
configPreset.MetricRegistry = metrics.NewPrefixedChildRegistry(metrics.DefaultRegistry, "sarama.producer.")

producer, err := aukafka.CreateSyncProducer[repository.UpdateEvent](ctx, *topicConfig, configPreset)
producer, err := kafka.CreateSyncProducer[repository.UpdateEvent](ctx, *topicConfig, configPreset)
if err != nil {
return err
}
Expand Down
7 changes: 4 additions & 3 deletions test/mock/configmock/configmock.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package configmock

import (
"github.com/Interhyp/metadata-service/internal/acorn/config"
"github.com/Roshick/go-autumn-kafka/pkg/aukafka"
"regexp"

"github.com/Interhyp/metadata-service/internal/acorn/config"
"github.com/Roshick/go-autumn-kafka/pkg/kafka"
)

type MockConfig struct {
Expand Down Expand Up @@ -244,7 +245,7 @@ func (c *MockConfig) AllowedFileCategories() []string {
panic("implement me")
}

func (c *MockConfig) Kafka() *aukafka.Config {
func (c *MockConfig) Kafka() *kafka.Config {
//TODO implement me
panic("implement me")
}
Expand Down

0 comments on commit d018920

Please sign in to comment.