diff --git a/go.mod b/go.mod index c922d62..9e94206 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.21 require ( github.com/IBM/sarama v1.43.2 - github.com/Roshick/go-autumn-kafka v0.6.3 + github.com/Roshick/go-autumn-kafka v0.7.1 github.com/Roshick/go-autumn-synchronisation v0.6.4 github.com/StephanHCB/go-autumn-config-api v0.2.2 github.com/StephanHCB/go-autumn-config-env v0.2.3 diff --git a/go.sum b/go.sum index f50e787..a29a625 100644 --- a/go.sum +++ b/go.sum @@ -9,6 +9,8 @@ github.com/ProtonMail/go-crypto v1.0.0 h1:LRuvITjQWX+WIfr930YHG2HNfjR1uOfyf5vE0k github.com/ProtonMail/go-crypto v1.0.0/go.mod h1:EjAoLdwvbIOoOQr3ihjnSoLZRtE8azugULFRteWMNc0= github.com/Roshick/go-autumn-kafka v0.6.3 h1:BA2vmOZOtlW0Im76VAFxDwsxGRdlj9EZPuKRZX1uIXk= github.com/Roshick/go-autumn-kafka v0.6.3/go.mod h1:Zfx0X2B856i8yr7A856bzGsOWiHDR0CrtG7VghZy/gU= +github.com/Roshick/go-autumn-kafka v0.7.1 h1:bemkEH0NH2Dph/UhWc6euY9fvEUUD48efpps77ROXco= +github.com/Roshick/go-autumn-kafka v0.7.1/go.mod h1:Zfx0X2B856i8yr7A856bzGsOWiHDR0CrtG7VghZy/gU= github.com/Roshick/go-autumn-synchronisation v0.6.4 h1:4OXyqJJrje3YgjvjJpDvooIHPsgrIAnlHODWhVGi6Do= github.com/Roshick/go-autumn-synchronisation v0.6.4/go.mod h1:Whsa6SW9DKh+gdI0LOgp/D4wYQVdQUoOuHqDFle1b1I= github.com/StephanHCB/go-autumn-acorn-registry v0.3.2 h1:eW+KndxdD8p+TdQ6HsqXLdqkowlcWb9qkVeUjdchAfA= diff --git a/internal/acorn/config/customconfigint.go b/internal/acorn/config/customconfigint.go index 675905a..748ef0e 100644 --- a/internal/acorn/config/customconfigint.go +++ b/internal/acorn/config/customconfigint.go @@ -1,10 +1,11 @@ package config import ( - "github.com/Interhyp/metadata-service/internal/types" - "github.com/Roshick/go-autumn-kafka/pkg/aukafka" "regexp" + "github.com/Interhyp/metadata-service/internal/types" + "github.com/Roshick/go-autumn-kafka/pkg/kafka" + librepo "github.com/StephanHCB/go-backend-service-common/acorns/repository" ) @@ -63,7 +64,7 @@ type CustomConfiguration interface { AllowedFileCategories() []string - Kafka() *aukafka.Config + Kafka() *kafka.Config KafkaGroupIdOverride() string RedisUrl() string diff --git a/internal/repository/config/accessors.go b/internal/repository/config/accessors.go index f17a66b..19c1230 100644 --- a/internal/repository/config/accessors.go +++ b/internal/repository/config/accessors.go @@ -1,11 +1,12 @@ package config import ( - "github.com/Interhyp/metadata-service/internal/acorn/config" - "github.com/Roshick/go-autumn-kafka/pkg/aukafka" "os" "regexp" "strings" + + "github.com/Interhyp/metadata-service/internal/acorn/config" + "github.com/Roshick/go-autumn-kafka/pkg/kafka" ) func (c *CustomConfigImpl) BasicAuthUsername() string { @@ -163,7 +164,7 @@ func (c *CustomConfigImpl) AllowedFileCategories() []string { return c.VAllowedFileCategories } -func (c *CustomConfigImpl) Kafka() *aukafka.Config { +func (c *CustomConfigImpl) Kafka() *kafka.Config { return c.VKafkaConfig } diff --git a/internal/repository/config/plumbing.go b/internal/repository/config/plumbing.go index e6fc5ff..12d2971 100644 --- a/internal/repository/config/plumbing.go +++ b/internal/repository/config/plumbing.go @@ -3,17 +3,18 @@ package config import ( "encoding/json" "fmt" + "regexp" + "strconv" + "strings" + "github.com/Interhyp/metadata-service/internal/acorn/config" openapi "github.com/Interhyp/metadata-service/internal/types" - "github.com/Roshick/go-autumn-kafka/pkg/aukafka" + "github.com/Roshick/go-autumn-kafka/pkg/kafka" auconfigapi "github.com/StephanHCB/go-autumn-config-api" auconfigenv "github.com/StephanHCB/go-autumn-config-env" librepo "github.com/StephanHCB/go-backend-service-common/acorns/repository" libconfig "github.com/StephanHCB/go-backend-service-common/repository/config" "github.com/StephanHCB/go-backend-service-common/repository/vault" - "regexp" - "strconv" - "strings" ) const ( @@ -66,13 +67,13 @@ type CustomConfigImpl struct { VPullRequestBuildUrl string VPullRequestBuildKey string - VKafkaConfig *aukafka.Config + VKafkaConfig *kafka.Config BitbucketGitUrlMatcher *regexp.Regexp } func New() (librepo.Configuration, config.CustomConfiguration) { instance := &CustomConfigImpl{ - VKafkaConfig: aukafka.NewConfig(), + VKafkaConfig: kafka.NewConfig(), BitbucketGitUrlMatcher: regexp.MustCompile(`/([^/]+)/([^/]+).git$`), } configItems := make([]auconfigapi.ConfigItem, 0) diff --git a/internal/repository/kafka/kafka.go b/internal/repository/kafka/kafka.go index d7c5638..9e1ae5e 100644 --- a/internal/repository/kafka/kafka.go +++ b/internal/repository/kafka/kafka.go @@ -4,16 +4,17 @@ import ( "context" "errors" "fmt" + "strings" + "time" + "github.com/IBM/sarama" "github.com/Interhyp/metadata-service/internal/acorn/config" "github.com/Interhyp/metadata-service/internal/acorn/repository" - "github.com/Roshick/go-autumn-kafka/pkg/aukafka" + "github.com/Roshick/go-autumn-kafka/pkg/kafka" aulogging "github.com/StephanHCB/go-autumn-logging" auzerolog "github.com/StephanHCB/go-autumn-logging-zerolog" librepo "github.com/StephanHCB/go-backend-service-common/acorns/repository" "github.com/rcrowley/go-metrics" - "strings" - "time" ) import _ "github.com/go-git/go-git/v5" @@ -26,8 +27,8 @@ type Impl struct { HostIP repository.HostIP Callback repository.ReceiverCallback - KafkaProducer *aukafka.SyncProducer[repository.UpdateEvent] - KafkaConsumer *aukafka.Consumer[repository.UpdateEvent] + KafkaProducer *kafka.SyncProducer[repository.UpdateEvent] + KafkaConsumer *kafka.Consumer[repository.UpdateEvent] } func New( @@ -81,10 +82,19 @@ func (r *Impl) Send(ctx context.Context, event repository.UpdateEvent) error { return nil } - return r.KafkaProducer.Produce(ctx, nil, &event) + asyncCtx := context.WithoutCancel(ctx) + asyncCtx, cancel := context.WithTimeout(asyncCtx, 60*time.Second) + go func() { + defer cancel() + + if err := r.KafkaProducer.Produce(asyncCtx, nil, &event); err != nil { + aulogging.Logger.Ctx(asyncCtx).Warn().WithErr(err).Printf("failed to send event") + } + }() + return nil } -func (r *Impl) topicConfig(ctx context.Context) (*aukafka.TopicConfig, error) { +func (r *Impl) topicConfig(ctx context.Context) (*kafka.TopicConfig, error) { if r.CustomConfiguration.Kafka() != nil { if topicConfig, ok := r.CustomConfiguration.Kafka().TopicConfigs()[MetadataChangeEventsTopicKey]; ok { if topicConfig.Password == "" { @@ -143,7 +153,7 @@ func (r *Impl) StartReceiveLoop(ctx context.Context) error { configPreset.MetricRegistry = metrics.NewPrefixedChildRegistry(metrics.DefaultRegistry, "sarama.consumer.") configPreset.Consumer.Offsets.Initial = sarama.OffsetNewest - consumer, err := aukafka.CreateConsumer[repository.UpdateEvent](ctx, *topicConfig, callback, configPreset) + consumer, err := kafka.CreateConsumer[repository.UpdateEvent](ctx, *topicConfig, callback, configPreset) if err != nil { return err } @@ -167,7 +177,7 @@ func (r *Impl) ConnectProducer(ctx context.Context) error { configPreset.Producer.Compression = sarama.CompressionNone configPreset.MetricRegistry = metrics.NewPrefixedChildRegistry(metrics.DefaultRegistry, "sarama.producer.") - producer, err := aukafka.CreateSyncProducer[repository.UpdateEvent](ctx, *topicConfig, configPreset) + producer, err := kafka.CreateSyncProducer[repository.UpdateEvent](ctx, *topicConfig, configPreset) if err != nil { return err } diff --git a/test/mock/configmock/configmock.go b/test/mock/configmock/configmock.go index 59952f3..d684190 100644 --- a/test/mock/configmock/configmock.go +++ b/test/mock/configmock/configmock.go @@ -1,9 +1,10 @@ package configmock import ( - "github.com/Interhyp/metadata-service/internal/acorn/config" - "github.com/Roshick/go-autumn-kafka/pkg/aukafka" "regexp" + + "github.com/Interhyp/metadata-service/internal/acorn/config" + "github.com/Roshick/go-autumn-kafka/pkg/kafka" ) type MockConfig struct { @@ -244,7 +245,7 @@ func (c *MockConfig) AllowedFileCategories() []string { panic("implement me") } -func (c *MockConfig) Kafka() *aukafka.Config { +func (c *MockConfig) Kafka() *kafka.Config { //TODO implement me panic("implement me") }