diff --git a/conf/config-dev.yaml b/conf/config-dev.yaml index c7c17e6a..3c7f5cc5 100644 --- a/conf/config-dev.yaml +++ b/conf/config-dev.yaml @@ -43,12 +43,14 @@ cache: threshold: 0.2 workqueue: - # the workqueue type available: redis, kafka, database - type: database + # the workqueue type available: redis, kafka, database, inmemory + type: inmemory redis: concurrency: 10 kafka: {} database: {} + inmemory: + concurrency: 1024 locker: # the locker type available: redis, database diff --git a/conf/config-full.yaml b/conf/config-full.yaml index a95eb8ea..ec4bcfab 100644 --- a/conf/config-full.yaml +++ b/conf/config-full.yaml @@ -46,12 +46,14 @@ cache: threshold: 0.2 workqueue: - # the workqueue type available: redis, kafka, database + # the workqueue type available: redis, kafka, database, inmemory type: redis redis: concurrency: 10 kafka: {} database: {} + inmemory: + concurrency: 1024 locker: # the locker type available: redis, database diff --git a/conf/config.yaml b/conf/config.yaml index fcb27448..20edf292 100644 --- a/conf/config.yaml +++ b/conf/config.yaml @@ -23,10 +23,6 @@ database: dbname: sigma sslmode: disable -# deploy available: single, replica -# replica should use external redis -deploy: single - redis: # redis type available: none, external # none: means never use redis @@ -47,12 +43,14 @@ cache: threshold: 0.2 workqueue: - # the workqueue type available: redis, kafka, database - type: redis + # the workqueue type available: redis, kafka, database, inmemory + type: database redis: concurrency: 10 kafka: {} database: {} + inmemory: + concurrency: 1024 locker: # the locker type available: redis, database diff --git a/docs/docs/configuration.mdx b/docs/docs/configuration.mdx index 4c8702fa..f6bad0c5 100644 --- a/docs/docs/configuration.mdx +++ b/docs/docs/configuration.mdx @@ -46,12 +46,14 @@ cache: threshold: 0.2 workqueue: - # the workqueue type available: redis, kafka, database + # the workqueue type available: redis, kafka, database, inmemory type: redis redis: concurrency: 10 kafka: {} database: {} + inmemory: + concurrency: 1024 locker: # the locker type available: redis, database diff --git a/pkg/cmds/server/server.go b/pkg/cmds/server/server.go index 79a8c3eb..427d7b11 100644 --- a/pkg/cmds/server/server.go +++ b/pkg/cmds/server/server.go @@ -126,11 +126,7 @@ func Serve(serverConfig ServerConfig) error { if err != nil { return err } - err = workq.Initialize(configs.Configuration{ - WorkQueue: configs.ConfigurationWorkQueue{ - Type: enums.WorkQueueTypeDatabase, - }, - }) + err = workq.Initialize(config) if err != nil { return err } diff --git a/pkg/cmds/worker/worker.go b/pkg/cmds/worker/worker.go index 9d80bfb9..b4a45c32 100644 --- a/pkg/cmds/worker/worker.go +++ b/pkg/cmds/worker/worker.go @@ -43,11 +43,7 @@ func Worker() error { return err } - err = workq.Initialize(configs.Configuration{ - WorkQueue: configs.ConfigurationWorkQueue{ - Type: enums.WorkQueueTypeDatabase, - }, - }) + err = workq.Initialize(config) if err != nil { return err } diff --git a/pkg/configs/configuration.go b/pkg/configs/configuration.go index 7b2fa773..2f7f4b67 100644 --- a/pkg/configs/configuration.go +++ b/pkg/configs/configuration.go @@ -135,12 +135,17 @@ type ConfigurationWorkQueueDatabase struct { type ConfigurationWorkQueueKafka struct { } +type ConfigurationWorkQueueInmemmory struct { + Concurrency int `yaml:"concurrency"` +} + // ConfigurationWorkQueue ... type ConfigurationWorkQueue struct { - Type enums.WorkQueueType `yaml:"type"` - Redis ConfigurationWorkQueueRedis `yaml:"redis"` - Database ConfigurationWorkQueueDatabase `yaml:"database"` - Kafka ConfigurationWorkQueueKafka `yaml:"kafka"` + Type enums.WorkQueueType `yaml:"type"` + Redis ConfigurationWorkQueueRedis `yaml:"redis"` + Database ConfigurationWorkQueueDatabase `yaml:"database"` + Kafka ConfigurationWorkQueueKafka `yaml:"kafka"` + Inmemory ConfigurationWorkQueueInmemmory `yaml:"inmemory"` } // ConfigurationLocker ... diff --git a/pkg/configs/default.go b/pkg/configs/default.go index d797cabf..dd92fdde 100644 --- a/pkg/configs/default.go +++ b/pkg/configs/default.go @@ -54,6 +54,9 @@ func defaultSettings() { if configuration.Daemon.Builder.Podman.URI == "" { configuration.Daemon.Builder.Podman.URI = "unix:///run/podman/podman.sock" } + if configuration.WorkQueue.Inmemory.Concurrency == 0 { + configuration.WorkQueue.Inmemory.Concurrency = 1024 + } if configuration.Cache.Inmemory.Size == 0 { configuration.Cache.Inmemory.Size = 10240 } diff --git a/pkg/cronjob/builder/builder.go b/pkg/cronjob/builder/builder.go index 8c7791fc..22325e92 100644 --- a/pkg/cronjob/builder/builder.go +++ b/pkg/cronjob/builder/builder.go @@ -122,7 +122,7 @@ func (r builderRunner) runner(ctx context.Context, tw timewheel.TimeWheel) { return err } - err = workq.ProducerClient.Produce(ctx, string(enums.DaemonBuilder), types.DaemonBuilderPayload{ + err = workq.ProducerClient.Produce(ctx, enums.DaemonBuilder, types.DaemonBuilderPayload{ Action: enums.DaemonBuilderActionStart, BuilderID: builderObj.ID, RunnerID: runner.ID, diff --git a/pkg/daemon/builder/builder.go b/pkg/daemon/builder/builder.go index 7e0cdaa1..24bf3cb8 100644 --- a/pkg/daemon/builder/builder.go +++ b/pkg/daemon/builder/builder.go @@ -35,7 +35,7 @@ import ( ) func init() { - workq.TopicHandlers[enums.DaemonBuilder.String()] = definition.Consumer{ + workq.TopicHandlers[enums.DaemonBuilder] = definition.Consumer{ Handler: builderRunner, MaxRetry: 1, Concurrency: 10, diff --git a/pkg/daemon/coderepo/coderepo.go b/pkg/daemon/coderepo/coderepo.go index f9a105d2..22cc7c47 100644 --- a/pkg/daemon/coderepo/coderepo.go +++ b/pkg/daemon/coderepo/coderepo.go @@ -35,7 +35,7 @@ const ( ) func init() { - workq.TopicHandlers[enums.DaemonCodeRepository.String()] = definition.Consumer{ + workq.TopicHandlers[enums.DaemonCodeRepository] = definition.Consumer{ Handler: crRunner, MaxRetry: 6, Concurrency: 10, diff --git a/pkg/daemon/gc/decorator.go b/pkg/daemon/gc/decorator.go index eb1ac7c5..efdabda7 100644 --- a/pkg/daemon/gc/decorator.go +++ b/pkg/daemon/gc/decorator.go @@ -262,7 +262,7 @@ func initGc(ctx context.Context, daemon enums.Daemon, runnerChan chan decoratorS } func triggerWebhook(ctx context.Context, webhook decoratorWebhook, producerClient definition.WorkQueueProducer) error { - err := producerClient.Produce(ctx, enums.DaemonWebhook.String(), types.DaemonWebhookPayload{ + err := producerClient.Produce(ctx, enums.DaemonWebhook, types.DaemonWebhookPayload{ NamespaceID: webhook.NamespaceID, Action: webhook.Meta.Action, Type: enums.WebhookTypeSend, diff --git a/pkg/daemon/gc/gc_artifact.go b/pkg/daemon/gc/gc_artifact.go index 0a245695..aa2cb599 100644 --- a/pkg/daemon/gc/gc_artifact.go +++ b/pkg/daemon/gc/gc_artifact.go @@ -37,7 +37,7 @@ import ( ) func init() { - workq.TopicHandlers[enums.DaemonGcArtifact.String()] = definition.Consumer{ + workq.TopicHandlers[enums.DaemonGcArtifact] = definition.Consumer{ Handler: decorator(enums.DaemonGcArtifact), MaxRetry: 6, Concurrency: 10, diff --git a/pkg/daemon/gc/gc_blob.go b/pkg/daemon/gc/gc_blob.go index bf42fea0..e5a80092 100644 --- a/pkg/daemon/gc/gc_blob.go +++ b/pkg/daemon/gc/gc_blob.go @@ -38,7 +38,7 @@ import ( ) func init() { - workq.TopicHandlers[enums.DaemonGcBlob.String()] = definition.Consumer{ + workq.TopicHandlers[enums.DaemonGcBlob] = definition.Consumer{ Handler: decorator(enums.DaemonGcBlob), MaxRetry: 6, Concurrency: 10, diff --git a/pkg/daemon/gc/gc_repository.go b/pkg/daemon/gc/gc_repository.go index 64b2d03c..4cc5b735 100644 --- a/pkg/daemon/gc/gc_repository.go +++ b/pkg/daemon/gc/gc_repository.go @@ -36,7 +36,7 @@ import ( // deleteRepositoryWithNamespace -> deleteRepositoryCheckEmpty -> deleteRepository -> collectRecord func init() { - workq.TopicHandlers[enums.DaemonGcRepository.String()] = definition.Consumer{ + workq.TopicHandlers[enums.DaemonGcRepository] = definition.Consumer{ Handler: decorator(enums.DaemonGcRepository), MaxRetry: 6, Concurrency: 10, diff --git a/pkg/daemon/gc/gc_tag.go b/pkg/daemon/gc/gc_tag.go index 6730b80b..9e7ceba8 100644 --- a/pkg/daemon/gc/gc_tag.go +++ b/pkg/daemon/gc/gc_tag.go @@ -37,7 +37,7 @@ import ( // deleteTagWithNamespace -> deleteTagWithRepository -> deleteTagCheckPattern -> deleteTag -> collectRecord func init() { - workq.TopicHandlers[enums.DaemonGcTag.String()] = definition.Consumer{ + workq.TopicHandlers[enums.DaemonGcTag] = definition.Consumer{ Handler: decorator(enums.DaemonGcTag), MaxRetry: 6, Concurrency: 10, diff --git a/pkg/daemon/pushed/artifact.go b/pkg/daemon/pushed/artifact.go index d4ef88fe..89747de4 100644 --- a/pkg/daemon/pushed/artifact.go +++ b/pkg/daemon/pushed/artifact.go @@ -31,7 +31,7 @@ import ( ) func init() { - workq.TopicHandlers[enums.DaemonArtifactPushed.String()] = definition.Consumer{ + workq.TopicHandlers[enums.DaemonArtifactPushed] = definition.Consumer{ Handler: func(ctx context.Context, data []byte) error { var payload types.DaemonArtifactPushedPayload err := json.Unmarshal(data, &payload) diff --git a/pkg/daemon/pushed/tag.go b/pkg/daemon/pushed/tag.go index 01344aa8..864e65f2 100644 --- a/pkg/daemon/pushed/tag.go +++ b/pkg/daemon/pushed/tag.go @@ -33,7 +33,7 @@ import ( ) func init() { - workq.TopicHandlers[enums.DaemonTagPushed.String()] = definition.Consumer{ + workq.TopicHandlers[enums.DaemonTagPushed] = definition.Consumer{ Handler: func(ctx context.Context, data []byte) error { var payload types.DaemonTagPushedPayload err := json.Unmarshal(data, &payload) diff --git a/pkg/daemon/scan/sbom.go b/pkg/daemon/scan/sbom.go index e71d3bb1..88d5ebc9 100644 --- a/pkg/daemon/scan/sbom.go +++ b/pkg/daemon/scan/sbom.go @@ -43,7 +43,7 @@ import ( ) func init() { - workq.TopicHandlers[enums.DaemonSbom.String()] = definition.Consumer{ + workq.TopicHandlers[enums.DaemonSbom] = definition.Consumer{ Handler: decorator(runnerSbom), MaxRetry: 1, Concurrency: 10, diff --git a/pkg/daemon/scan/vulnerability.go b/pkg/daemon/scan/vulnerability.go index 9e5cd48b..ae9125c2 100644 --- a/pkg/daemon/scan/vulnerability.go +++ b/pkg/daemon/scan/vulnerability.go @@ -42,7 +42,7 @@ import ( ) func init() { - workq.TopicHandlers[enums.DaemonVulnerability.String()] = definition.Consumer{ + workq.TopicHandlers[enums.DaemonVulnerability] = definition.Consumer{ Handler: decorator(runnerVulnerability), MaxRetry: 1, Concurrency: 1, diff --git a/pkg/daemon/webhook/webhook.go b/pkg/daemon/webhook/webhook.go index 66fb2785..eee5d437 100644 --- a/pkg/daemon/webhook/webhook.go +++ b/pkg/daemon/webhook/webhook.go @@ -43,7 +43,7 @@ import ( ) func init() { - workq.TopicHandlers[enums.DaemonWebhook.String()] = definition.Consumer{ + workq.TopicHandlers[enums.DaemonWebhook] = definition.Consumer{ Handler: webhookRunner, MaxRetry: 6, Concurrency: 10, diff --git a/pkg/dal/dao/repository.go b/pkg/dal/dao/repository.go index 02c2d0a3..cba28a31 100644 --- a/pkg/dal/dao/repository.go +++ b/pkg/dal/dao/repository.go @@ -157,7 +157,7 @@ func (s *repositoryService) Create(ctx context.Context, repositoryObj *models.Re } if autoCreateNamespace.ProducerClient != nil { - err = autoCreateNamespace.ProducerClient.Produce(ctx, enums.DaemonWebhook.String(), types.DaemonWebhookPayload{ + err = autoCreateNamespace.ProducerClient.Produce(ctx, enums.DaemonWebhook, types.DaemonWebhookPayload{ NamespaceID: ptr.Of(namespaceObj.ID), Action: enums.WebhookActionCreate, ResourceType: enums.WebhookResourceTypeNamespace, @@ -192,7 +192,7 @@ func (s *repositoryService) Create(ctx context.Context, repositoryObj *models.Re return err } if autoCreateNamespace.ProducerClient != nil { - err = autoCreateNamespace.ProducerClient.Produce(ctx, enums.DaemonWebhook.String(), types.DaemonWebhookPayload{ + err = autoCreateNamespace.ProducerClient.Produce(ctx, enums.DaemonWebhook, types.DaemonWebhookPayload{ NamespaceID: ptr.Of(namespaceObj.ID), Action: enums.WebhookActionCreate, ResourceType: enums.WebhookResourceTypeRepository, diff --git a/pkg/dal/dao/workq_test.go b/pkg/dal/dao/workq_test.go index fcb2a0bb..5184cdc7 100644 --- a/pkg/dal/dao/workq_test.go +++ b/pkg/dal/dao/workq_test.go @@ -62,7 +62,7 @@ func TestWorkQueueService(t *testing.T) { assert.NotNil(t, workqService) workqObj := &models.WorkQueue{ - Topic: "topic", + Topic: enums.DaemonGc, Payload: []byte("payload"), Version: "version", } @@ -72,7 +72,7 @@ func TestWorkQueueService(t *testing.T) { err = workqService.UpdateStatus(ctx, workqObj.ID, "version", "newVersion", 1, enums.TaskCommonStatusPending) assert.NoError(t, err) - workqNewObj, err := workqService.Get(ctx, "topic") + workqNewObj, err := workqService.Get(ctx, enums.DaemonGc.String()) assert.NoError(t, err) assert.Equal(t, workqObj.ID, workqNewObj.ID) assert.Equal(t, workqObj.Topic, workqNewObj.Topic) diff --git a/pkg/dal/models/workq.go b/pkg/dal/models/workq.go index 4ccd7a4a..f9442a39 100644 --- a/pkg/dal/models/workq.go +++ b/pkg/dal/models/workq.go @@ -27,7 +27,7 @@ type WorkQueue struct { DeletedAt soft_delete.DeletedAt `gorm:"softDelete:milli"` ID int64 `gorm:"primaryKey"` - Topic string + Topic enums.Daemon Payload []byte Times int Version string diff --git a/pkg/handlers/builders/builders_runners_rerun.go b/pkg/handlers/builders/builders_runners_rerun.go index 78de8f13..d60943e2 100644 --- a/pkg/handlers/builders/builders_runners_rerun.go +++ b/pkg/handlers/builders/builders_runners_rerun.go @@ -86,7 +86,7 @@ func (h *handler) GetRunnerRerun(c echo.Context) error { log.Error().Err(err).Msg("Create builder runner failed") return xerrors.HTTPErrCodeInternalError.Detail(fmt.Sprintf("Create builder runner failed: %v", err)) } - err = workq.ProducerClient.Produce(ctx, enums.DaemonBuilder.String(), types.DaemonBuilderPayload{ + err = workq.ProducerClient.Produce(ctx, enums.DaemonBuilder, types.DaemonBuilderPayload{ Action: enums.DaemonBuilderActionStart, RepositoryID: req.RepositoryID, BuilderID: req.BuilderID, diff --git a/pkg/handlers/builders/builders_runners_run.go b/pkg/handlers/builders/builders_runners_run.go index 612584e8..85525097 100644 --- a/pkg/handlers/builders/builders_runners_run.go +++ b/pkg/handlers/builders/builders_runners_run.go @@ -72,7 +72,7 @@ func (h *handler) PostRunnerRun(c echo.Context) error { log.Error().Err(err).Msg("Create builder runner failed") return xerrors.HTTPErrCodeInternalError.Detail(fmt.Sprintf("Create builder runner failed: %v", err)) } - err = workq.ProducerClient.Produce(ctx, enums.DaemonBuilder.String(), types.DaemonBuilderPayload{ + err = workq.ProducerClient.Produce(ctx, enums.DaemonBuilder, types.DaemonBuilderPayload{ Action: enums.DaemonBuilderActionStart, RepositoryID: req.RepositoryID, BuilderID: req.BuilderID, diff --git a/pkg/handlers/builders/builders_runners_stop.go b/pkg/handlers/builders/builders_runners_stop.go index 23894599..5e76bf8b 100644 --- a/pkg/handlers/builders/builders_runners_stop.go +++ b/pkg/handlers/builders/builders_runners_stop.go @@ -78,7 +78,7 @@ func (h *handler) GetRunnerStop(c echo.Context) error { log.Error().Err(err).Msg("Update runner status failed") return xerrors.HTTPErrCodeInternalError.Detail(fmt.Sprintf("Update runner status failed: %v", err)) } - err = workq.ProducerClient.Produce(ctx, enums.DaemonBuilder.String(), types.DaemonBuilderPayload{ + err = workq.ProducerClient.Produce(ctx, enums.DaemonBuilder, types.DaemonBuilderPayload{ Action: enums.DaemonBuilderActionStop, RepositoryID: req.RepositoryID, BuilderID: req.BuilderID, diff --git a/pkg/handlers/coderepos/coderepos_resync.go b/pkg/handlers/coderepos/coderepos_resync.go index a8cac116..10188591 100644 --- a/pkg/handlers/coderepos/coderepos_resync.go +++ b/pkg/handlers/coderepos/coderepos_resync.go @@ -91,7 +91,7 @@ func (h *handler) Resync(c echo.Context) error { if err != nil { return xerrors.HTTPErrCodeInternalError.Detail("Update user status failed") } - err = workq.ProducerClient.Produce(ctx, enums.DaemonCodeRepository.String(), + err = workq.ProducerClient.Produce(ctx, enums.DaemonCodeRepository, types.DaemonCodeRepositoryPayload{User3rdPartyID: user3rdPartyObj.ID}, definition.ProducerOption{Tx: tx}) if err != nil { log.Error().Err(err).Int64("user_id", user3rdPartyObj.UserID).Msg("Publish sync code repository failed") diff --git a/pkg/handlers/daemons/daemons_gc_artifact.go b/pkg/handlers/daemons/daemons_gc_artifact.go index c4d77a01..3c34ae6f 100644 --- a/pkg/handlers/daemons/daemons_gc_artifact.go +++ b/pkg/handlers/daemons/daemons_gc_artifact.go @@ -108,7 +108,7 @@ func (h *handler) UpdateGcArtifactRule(c echo.Context) error { log.Error().Err(err).Msg("Update gc artifact rule failed") return xerrors.HTTPErrCodeInternalError.Detail(fmt.Sprintf("Update gc artifact rule failed: %v", err)) } - err = h.producerClient.Produce(ctx, enums.DaemonWebhook.String(), types.DaemonWebhookPayload{ + err = h.producerClient.Produce(ctx, enums.DaemonWebhook, types.DaemonWebhookPayload{ NamespaceID: namespaceID, Action: enums.WebhookActionUpdate, ResourceType: enums.WebhookResourceTypeDaemonTaskGcArtifactRule, @@ -299,13 +299,13 @@ func (h *handler) CreateGcArtifactRunner(c echo.Context) error { log.Error().Int64("ruleID", ruleObj.ID).Msgf("Create gc artifact runner failed: %v", err) return xerrors.HTTPErrCodeInternalError.Detail(fmt.Sprintf("Create gc artifact runner failed: %v", err)) } - err = workq.ProducerClient.Produce(ctx, enums.DaemonGcArtifact.String(), + err = workq.ProducerClient.Produce(ctx, enums.DaemonGcArtifact, types.DaemonGcPayload{RunnerID: runnerObj.ID}, definition.ProducerOption{Tx: tx}) if err != nil { log.Error().Err(err).Msgf("Send topic %s to work queue failed", enums.DaemonGcArtifact.String()) return xerrors.HTTPErrCodeInternalError.Detail(fmt.Sprintf("Send topic %s to work queue failed", enums.DaemonGcArtifact.String())) } - err = h.producerClient.Produce(ctx, enums.DaemonWebhook.String(), types.DaemonWebhookPayload{ + err = h.producerClient.Produce(ctx, enums.DaemonWebhook, types.DaemonWebhookPayload{ NamespaceID: namespaceID, Action: enums.WebhookActionCreate, ResourceType: enums.WebhookResourceTypeDaemonTaskGcArtifactRunner, diff --git a/pkg/handlers/daemons/daemons_gc_blob.go b/pkg/handlers/daemons/daemons_gc_blob.go index f3d7da4a..6ecf3872 100644 --- a/pkg/handlers/daemons/daemons_gc_blob.go +++ b/pkg/handlers/daemons/daemons_gc_blob.go @@ -109,7 +109,7 @@ func (h *handler) UpdateGcBlobRule(c echo.Context) error { log.Error().Err(err).Msg("Update gc blob rule failed") return xerrors.HTTPErrCodeInternalError.Detail(fmt.Sprintf("Update gc blob rule failed: %v", err)) } - err = h.producerClient.Produce(ctx, enums.DaemonWebhook.String(), types.DaemonWebhookPayload{ + err = h.producerClient.Produce(ctx, enums.DaemonWebhook, types.DaemonWebhookPayload{ Action: enums.WebhookActionUpdate, ResourceType: enums.WebhookResourceTypeDaemonTaskGcBlobRule, Payload: utils.MustMarshal(req), @@ -305,13 +305,13 @@ func (h *handler) CreateGcBlobRunner(c echo.Context) error { log.Error().Int64("RuleID", ruleObj.ID).Msgf("Create gc blob runner failed: %v", err) return xerrors.HTTPErrCodeInternalError.Detail(fmt.Sprintf("Create gc blob runner failed: %v", err)) } - err = workq.ProducerClient.Produce(ctx, enums.DaemonGcBlob.String(), + err = workq.ProducerClient.Produce(ctx, enums.DaemonGcBlob, types.DaemonGcPayload{RunnerID: runnerObj.ID}, definition.ProducerOption{Tx: tx}) if err != nil { log.Error().Err(err).Msgf("Send topic %s to work queue failed", enums.DaemonGcBlob.String()) return xerrors.HTTPErrCodeInternalError.Detail(fmt.Sprintf("Send topic %s to work queue failed", enums.DaemonGcBlob.String())) } - err = h.producerClient.Produce(ctx, enums.DaemonWebhook.String(), types.DaemonWebhookPayload{ + err = h.producerClient.Produce(ctx, enums.DaemonWebhook, types.DaemonWebhookPayload{ Action: enums.WebhookActionCreate, ResourceType: enums.WebhookResourceTypeDaemonTaskGcBlobRunner, Payload: utils.MustMarshal(req), diff --git a/pkg/handlers/daemons/daemons_gc_repository.go b/pkg/handlers/daemons/daemons_gc_repository.go index b20f0277..4d92c3d8 100644 --- a/pkg/handlers/daemons/daemons_gc_repository.go +++ b/pkg/handlers/daemons/daemons_gc_repository.go @@ -108,7 +108,7 @@ func (h *handler) UpdateGcRepositoryRule(c echo.Context) error { log.Error().Err(err).Msg("Update gc repository rule failed") return xerrors.HTTPErrCodeInternalError.Detail(fmt.Sprintf("Update gc repository rule failed: %v", err)) } - err = h.producerClient.Produce(ctx, enums.DaemonWebhook.String(), types.DaemonWebhookPayload{ + err = h.producerClient.Produce(ctx, enums.DaemonWebhook, types.DaemonWebhookPayload{ NamespaceID: namespaceID, Action: enums.WebhookActionUpdate, ResourceType: enums.WebhookResourceTypeDaemonTaskGcRepositoryRule, @@ -299,13 +299,13 @@ func (h *handler) CreateGcRepositoryRunner(c echo.Context) error { log.Error().Int64("ruleID", ruleObj.ID).Msgf("Create gc repository runner failed: %v", err) return xerrors.HTTPErrCodeInternalError.Detail(fmt.Sprintf("Create gc repository runner failed: %v", err)) } - err = workq.ProducerClient.Produce(ctx, enums.DaemonGcRepository.String(), + err = workq.ProducerClient.Produce(ctx, enums.DaemonGcRepository, types.DaemonGcPayload{RunnerID: runnerObj.ID}, definition.ProducerOption{Tx: tx}) if err != nil { log.Error().Err(err).Msgf("Send topic %s to work queue failed", enums.DaemonGcRepository.String()) return xerrors.HTTPErrCodeInternalError.Detail(fmt.Sprintf("Send topic %s to work queue failed", enums.DaemonGcRepository.String())) } - err = h.producerClient.Produce(ctx, enums.DaemonWebhook.String(), types.DaemonWebhookPayload{ + err = h.producerClient.Produce(ctx, enums.DaemonWebhook, types.DaemonWebhookPayload{ NamespaceID: namespaceID, Action: enums.WebhookActionCreate, ResourceType: enums.WebhookResourceTypeDaemonTaskGcRepositoryRunner, diff --git a/pkg/handlers/daemons/daemons_gc_tags.go b/pkg/handlers/daemons/daemons_gc_tags.go index bd2baed9..1db541d7 100644 --- a/pkg/handlers/daemons/daemons_gc_tags.go +++ b/pkg/handlers/daemons/daemons_gc_tags.go @@ -116,7 +116,7 @@ func (h *handler) UpdateGcTagRule(c echo.Context) error { log.Error().Err(err).Msg("Update gc tag rule failed") return xerrors.HTTPErrCodeInternalError.Detail(fmt.Sprintf("Update gc tag rule failed: %v", err)) } - err = h.producerClient.Produce(ctx, enums.DaemonWebhook.String(), types.DaemonWebhookPayload{ + err = h.producerClient.Produce(ctx, enums.DaemonWebhook, types.DaemonWebhookPayload{ NamespaceID: namespaceID, Action: enums.WebhookActionUpdate, ResourceType: enums.WebhookResourceTypeDaemonTaskGcTagRule, @@ -309,13 +309,13 @@ func (h *handler) CreateGcTagRunner(c echo.Context) error { log.Error().Int64("RuleID", ruleObj.ID).Msgf("Create gc tag runner failed: %v", err) return xerrors.HTTPErrCodeInternalError.Detail(fmt.Sprintf("Create gc tag runner failed: %v", err)) } - err = workq.ProducerClient.Produce(ctx, enums.DaemonGcTag.String(), + err = workq.ProducerClient.Produce(ctx, enums.DaemonGcTag, types.DaemonGcPayload{RunnerID: runnerObj.ID}, definition.ProducerOption{Tx: tx}) if err != nil { log.Error().Err(err).Msgf("Send topic %s to work queue failed", enums.DaemonGcTag.String()) return xerrors.HTTPErrCodeInternalError.Detail(fmt.Sprintf("Send topic %s to work queue failed", enums.DaemonGcTag.String())) } - err = h.producerClient.Produce(ctx, enums.DaemonWebhook.String(), types.DaemonWebhookPayload{ + err = h.producerClient.Produce(ctx, enums.DaemonWebhook, types.DaemonWebhookPayload{ NamespaceID: namespaceID, Action: enums.WebhookActionCreate, ResourceType: enums.WebhookResourceTypeDaemonTaskGcTagRule, diff --git a/pkg/handlers/distribution/manifest/manifest_put.go b/pkg/handlers/distribution/manifest/manifest_put.go index de5c0d12..d5b7561f 100644 --- a/pkg/handlers/distribution/manifest/manifest_put.go +++ b/pkg/handlers/distribution/manifest/manifest_put.go @@ -267,7 +267,7 @@ func (h *handler) putManifestManifest(ctx context.Context, user *models.User, di return xerrors.DSErrCodeUnknown } if workq.ProducerClient != nil { // TODO: init in test - err = workq.ProducerClient.Produce(ctx, enums.DaemonTagPushed.String(), types.DaemonTagPushedPayload{ + err = workq.ProducerClient.Produce(ctx, enums.DaemonTagPushed, types.DaemonTagPushedPayload{ RepositoryID: repositoryObj.ID, Tag: refs.Tag, }, definition.ProducerOption{Tx: tx}) @@ -278,7 +278,7 @@ func (h *handler) putManifestManifest(ctx context.Context, user *models.User, di } } if workq.ProducerClient != nil { - err = workq.ProducerClient.Produce(ctx, enums.DaemonArtifactPushed.String(), types.DaemonArtifactPushedPayload{ + err = workq.ProducerClient.Produce(ctx, enums.DaemonArtifactPushed, types.DaemonArtifactPushedPayload{ RepositoryID: repositoryObj.ID, }, definition.ProducerOption{Tx: tx}) if err != nil { @@ -365,7 +365,7 @@ func (h *handler) putManifestIndex(ctx context.Context, user *models.User, diges return xerrors.DSErrCodeUnknown } } - err = workq.ProducerClient.Produce(ctx, enums.DaemonTagPushed.String(), types.DaemonTagPushedPayload{ + err = workq.ProducerClient.Produce(ctx, enums.DaemonTagPushed, types.DaemonTagPushedPayload{ RepositoryID: repositoryObj.ID, Tag: refs.Tag, }, definition.ProducerOption{Tx: tx}) @@ -400,7 +400,7 @@ func (h *handler) putManifestAsyncTaskSbom(ctx context.Context, artifactObj *mod taskSbomPayload := types.TaskSbom{ ArtifactID: artifactObj.ID, } - err = workq.ProducerClient.Produce(ctx, enums.DaemonSbom.String(), taskSbomPayload, definition.ProducerOption{}) + err = workq.ProducerClient.Produce(ctx, enums.DaemonSbom, taskSbomPayload, definition.ProducerOption{}) if err != nil { log.Error().Err(err).Interface("artifactObj", artifactObj).Msg("Enqueue task failed") return @@ -421,7 +421,7 @@ func (h *handler) putManifestAsyncTaskVulnerability(ctx context.Context, artifac taskVulnerabilityPayload := types.TaskVulnerability{ ArtifactID: artifactObj.ID, } - err = workq.ProducerClient.Produce(ctx, enums.DaemonVulnerability.String(), taskVulnerabilityPayload, definition.ProducerOption{}) + err = workq.ProducerClient.Produce(ctx, enums.DaemonVulnerability, taskVulnerabilityPayload, definition.ProducerOption{}) if err != nil { log.Error().Err(err).Interface("artifactObj", artifactObj).Msg("Enqueue task failed") return diff --git a/pkg/handlers/namespaces/namespaces_create.go b/pkg/handlers/namespaces/namespaces_create.go index 8c75c72c..2e0f754a 100644 --- a/pkg/handlers/namespaces/namespaces_create.go +++ b/pkg/handlers/namespaces/namespaces_create.go @@ -120,7 +120,7 @@ func (h *handler) PostNamespace(c echo.Context) error { log.Error().Err(err).Msg("Create audit failed") return xerrors.HTTPErrCodeInternalError.Detail(fmt.Sprintf("Create audit failed: %v", err)) } - err = h.producerClient.Produce(ctx, enums.DaemonWebhook.String(), types.DaemonWebhookPayload{ + err = h.producerClient.Produce(ctx, enums.DaemonWebhook, types.DaemonWebhookPayload{ NamespaceID: ptr.Of(namespaceObj.ID), Action: enums.WebhookActionCreate, ResourceType: enums.WebhookResourceTypeNamespace, diff --git a/pkg/handlers/namespaces/namespaces_create_test.go b/pkg/handlers/namespaces/namespaces_create_test.go index c1b56ec6..677906a1 100644 --- a/pkg/handlers/namespaces/namespaces_create_test.go +++ b/pkg/handlers/namespaces/namespaces_create_test.go @@ -37,6 +37,7 @@ import ( "github.com/go-sigma/sigma/pkg/modules/workq/definition" workqmocks "github.com/go-sigma/sigma/pkg/modules/workq/definition/mocks" "github.com/go-sigma/sigma/pkg/tests" + "github.com/go-sigma/sigma/pkg/types/enums" "github.com/go-sigma/sigma/pkg/utils/ptr" "github.com/go-sigma/sigma/pkg/validators" ) @@ -57,7 +58,7 @@ func TestPostNamespace(t *testing.T) { defer ctrl.Finish() workQueueProducer := workqmocks.NewMockWorkQueueProducer(ctrl) - workQueueProducer.EXPECT().Produce(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, topic string, payload any, option definition.ProducerOption) error { + workQueueProducer.EXPECT().Produce(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, topic enums.Daemon, payload any, option definition.ProducerOption) error { return nil }).Times(3) diff --git a/pkg/handlers/namespaces/namespaces_delete.go b/pkg/handlers/namespaces/namespaces_delete.go index 365d6fa6..270d0c69 100644 --- a/pkg/handlers/namespaces/namespaces_delete.go +++ b/pkg/handlers/namespaces/namespaces_delete.go @@ -110,7 +110,7 @@ func (h *handler) DeleteNamespace(c echo.Context) error { log.Error().Err(err).Msg("Create audit for delete namespace failed") return xerrors.HTTPErrCodeInternalError.Detail(fmt.Sprintf("Create audit for delete namespace failed: %v", err)) } - err = h.producerClient.Produce(ctx, enums.DaemonWebhook.String(), types.DaemonWebhookPayload{ + err = h.producerClient.Produce(ctx, enums.DaemonWebhook, types.DaemonWebhookPayload{ NamespaceID: ptr.Of(namespaceObj.ID), Action: enums.WebhookActionDelete, ResourceType: enums.WebhookResourceTypeNamespace, diff --git a/pkg/handlers/namespaces/namespaces_delete_test.go b/pkg/handlers/namespaces/namespaces_delete_test.go index fd86f1fb..3091bb43 100644 --- a/pkg/handlers/namespaces/namespaces_delete_test.go +++ b/pkg/handlers/namespaces/namespaces_delete_test.go @@ -62,7 +62,7 @@ func TestDeleteNamespace(t *testing.T) { defer ctrl.Finish() workQueueProducer := workqmocks.NewMockWorkQueueProducer(ctrl) - workQueueProducer.EXPECT().Produce(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, topic string, payload any, option definition.ProducerOption) error { + workQueueProducer.EXPECT().Produce(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, topic enums.Daemon, payload any, option definition.ProducerOption) error { return nil }).Times(2) diff --git a/pkg/handlers/namespaces/namespaces_get_test.go b/pkg/handlers/namespaces/namespaces_get_test.go index 24ab0913..219cf7a0 100644 --- a/pkg/handlers/namespaces/namespaces_get_test.go +++ b/pkg/handlers/namespaces/namespaces_get_test.go @@ -38,6 +38,7 @@ import ( "github.com/go-sigma/sigma/pkg/modules/workq/definition" workqmocks "github.com/go-sigma/sigma/pkg/modules/workq/definition/mocks" "github.com/go-sigma/sigma/pkg/tests" + "github.com/go-sigma/sigma/pkg/types/enums" "github.com/go-sigma/sigma/pkg/utils/ptr" "github.com/go-sigma/sigma/pkg/validators" ) @@ -59,7 +60,7 @@ func TestGetNamespace(t *testing.T) { defer ctrl.Finish() workQueueProducer := workqmocks.NewMockWorkQueueProducer(ctrl) - workQueueProducer.EXPECT().Produce(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, topic string, payload any, option definition.ProducerOption) error { + workQueueProducer.EXPECT().Produce(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, topic enums.Daemon, payload any, option definition.ProducerOption) error { return nil }).Times(1) diff --git a/pkg/handlers/namespaces/namespaces_list_test.go b/pkg/handlers/namespaces/namespaces_list_test.go index a8cb14db..3af3eaaa 100644 --- a/pkg/handlers/namespaces/namespaces_list_test.go +++ b/pkg/handlers/namespaces/namespaces_list_test.go @@ -40,6 +40,7 @@ import ( workqmocks "github.com/go-sigma/sigma/pkg/modules/workq/definition/mocks" "github.com/go-sigma/sigma/pkg/tests" "github.com/go-sigma/sigma/pkg/types" + "github.com/go-sigma/sigma/pkg/types/enums" "github.com/go-sigma/sigma/pkg/utils/ptr" "github.com/go-sigma/sigma/pkg/validators" ) @@ -61,7 +62,7 @@ func TestListNamespace(t *testing.T) { defer ctrl.Finish() workQueueProducer := workqmocks.NewMockWorkQueueProducer(ctrl) - workQueueProducer.EXPECT().Produce(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, topic string, payload any, option definition.ProducerOption) error { + workQueueProducer.EXPECT().Produce(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, topic enums.Daemon, payload any, option definition.ProducerOption) error { return nil }).Times(1) diff --git a/pkg/handlers/namespaces/namespaces_update.go b/pkg/handlers/namespaces/namespaces_update.go index f234db73..00297331 100644 --- a/pkg/handlers/namespaces/namespaces_update.go +++ b/pkg/handlers/namespaces/namespaces_update.go @@ -137,7 +137,7 @@ func (h *handler) PutNamespace(c echo.Context) error { log.Error().Err(err).Msg("Create audit for update namespace failed") return xerrors.HTTPErrCodeInternalError.Detail(fmt.Sprintf("Create audit for update namespace failed: %v", err)) } - err = h.producerClient.Produce(ctx, enums.DaemonWebhook.String(), types.DaemonWebhookPayload{ + err = h.producerClient.Produce(ctx, enums.DaemonWebhook, types.DaemonWebhookPayload{ NamespaceID: ptr.Of(namespaceObj.ID), Action: enums.WebhookActionUpdate, ResourceType: enums.WebhookResourceTypeNamespace, diff --git a/pkg/handlers/namespaces/namespaces_update_test.go b/pkg/handlers/namespaces/namespaces_update_test.go index 1ee4d113..a2936615 100644 --- a/pkg/handlers/namespaces/namespaces_update_test.go +++ b/pkg/handlers/namespaces/namespaces_update_test.go @@ -62,7 +62,7 @@ func TestPutNamespace(t *testing.T) { defer ctrl.Finish() workQueueProducer := workqmocks.NewMockWorkQueueProducer(ctrl) - workQueueProducer.EXPECT().Produce(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, topic string, payload any, option definition.ProducerOption) error { + workQueueProducer.EXPECT().Produce(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, topic enums.Daemon, payload any, option definition.ProducerOption) error { return nil }).Times(3) diff --git a/pkg/handlers/oauth2/oauth2_callback.go b/pkg/handlers/oauth2/oauth2_callback.go index eb4cd199..f9b1d091 100644 --- a/pkg/handlers/oauth2/oauth2_callback.go +++ b/pkg/handlers/oauth2/oauth2_callback.go @@ -228,7 +228,7 @@ func (h *handler) Callback(c echo.Context) error { log.Error().Err(err).Msg("Create user failed") return xerrors.HTTPErrCodeInternalError.Detail(fmt.Sprintf("Create user failed: %v", err)) } - err = workq.ProducerClient.Produce(ctx, enums.DaemonCodeRepository.String(), + err = workq.ProducerClient.Produce(ctx, enums.DaemonCodeRepository, types.DaemonCodeRepositoryPayload{User3rdPartyID: user3rdPartyObj.ID}, definition.ProducerOption{Tx: tx}) if err != nil { log.Error().Err(err).Int64("user_id", user3rdPartyObj.UserID).Msg("Publish sync code repository failed") @@ -255,7 +255,7 @@ func (h *handler) Callback(c echo.Context) error { log.Error().Err(err).Msg("Create user failed") return xerrors.HTTPErrCodeInternalError.Detail(fmt.Sprintf("Create user failed: %v", err)) } - err = workq.ProducerClient.Produce(ctx, enums.DaemonCodeRepository.String(), + err = workq.ProducerClient.Produce(ctx, enums.DaemonCodeRepository, types.DaemonCodeRepositoryPayload{User3rdPartyID: user3rdPartyObj.ID}, definition.ProducerOption{Tx: tx}) if err != nil { log.Error().Err(err).Int64("user_id", user3rdPartyObj.UserID).Msg("Publish sync code repository failed") diff --git a/pkg/handlers/webhooks/webhooks_log_resend.go b/pkg/handlers/webhooks/webhooks_log_resend.go index c00cbac6..d277ca57 100644 --- a/pkg/handlers/webhooks/webhooks_log_resend.go +++ b/pkg/handlers/webhooks/webhooks_log_resend.go @@ -102,7 +102,7 @@ func (h *handler) GetWebhookLogResend(c echo.Context) error { } err = query.Q.Transaction(func(tx *query.Query) error { - err := h.producerClient.Produce(ctx, enums.DaemonWebhook.String(), types.DaemonWebhookPayload{ + err := h.producerClient.Produce(ctx, enums.DaemonWebhook, types.DaemonWebhookPayload{ NamespaceID: webhookLogObj.Webhook.NamespaceID, WebhookID: webhookLogObj.Webhook.ID, WebhookLogID: ptr.Of(req.WebhookLogID), diff --git a/pkg/handlers/webhooks/webhooks_ping.go b/pkg/handlers/webhooks/webhooks_ping.go index 46e0dfb2..e38ba23d 100644 --- a/pkg/handlers/webhooks/webhooks_ping.go +++ b/pkg/handlers/webhooks/webhooks_ping.go @@ -101,7 +101,7 @@ func (h *handler) GetWebhookPing(c echo.Context) error { } err = query.Q.Transaction(func(tx *query.Query) error { - err := h.producerClient.Produce(ctx, enums.DaemonWebhook.String(), types.DaemonWebhookPayload{ + err := h.producerClient.Produce(ctx, enums.DaemonWebhook, types.DaemonWebhookPayload{ NamespaceID: webhookObj.NamespaceID, WebhookID: webhookObj.ID, Type: enums.WebhookTypePing, diff --git a/pkg/modules/workq/database/consumer.go b/pkg/modules/workq/database/consumer.go index 59f3c989..eafb94b8 100644 --- a/pkg/modules/workq/database/consumer.go +++ b/pkg/modules/workq/database/consumer.go @@ -30,9 +30,9 @@ import ( ) // NewWorkQueueConsumer ... -func NewWorkQueueConsumer(_ configs.Configuration, topicHandlers map[string]definition.Consumer) error { +func NewWorkQueueConsumer(_ configs.Configuration, topicHandlers map[enums.Daemon]definition.Consumer) error { for topic, c := range topicHandlers { - go func(consumer definition.Consumer, topic string) { + go func(consumer definition.Consumer, topic enums.Daemon) { handler := &consumerHandler{ processingSemaphore: make(chan struct{}, consumer.Concurrency), consumer: consumer, @@ -48,7 +48,7 @@ type consumerHandler struct { consumer definition.Consumer } -func (h *consumerHandler) Consume(topic string) { +func (h *consumerHandler) Consume(topic enums.Daemon) { for { h.processingSemaphore <- struct{}{} go func() { @@ -61,14 +61,14 @@ func (h *consumerHandler) Consume(topic string) { } } -func (h *consumerHandler) consume(topic string) error { +func (h *consumerHandler) consume(topic enums.Daemon) error { defer func() { <-h.processingSemaphore }() workQueueService := dao.NewWorkQueueServiceFactory().New() // daoCtx := log.Logger.WithContext(context.Background()) daoCtx := context.Background() - wq, err := workQueueService.Get(daoCtx, topic) + wq, err := workQueueService.Get(daoCtx, topic.String()) if err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { log.Trace().Err(err).Msgf("None task in topic(%s)", topic) @@ -90,7 +90,7 @@ func (h *consumerHandler) consume(topic string) error { err = h.consumer.Handler(ctx, wq.Payload) wq.Times++ if err != nil { - log.Error().Err(err).Str("Topic", topic).Int64("WorkQueueID", wq.ID).Msg("Daemon task run failed") + log.Error().Err(err).Str("Topic", topic.String()).Int64("WorkQueueID", wq.ID).Msg("Daemon task run failed") if wq.Times < h.consumer.MaxRetry { return workQueueService.UpdateStatus(daoCtx, wq.ID, newVersion, uuid.New().String(), wq.Times, enums.TaskCommonStatusPending) } diff --git a/pkg/modules/workq/database/producer.go b/pkg/modules/workq/database/producer.go index dd8f2670..ef8cbf42 100644 --- a/pkg/modules/workq/database/producer.go +++ b/pkg/modules/workq/database/producer.go @@ -24,6 +24,7 @@ import ( "github.com/go-sigma/sigma/pkg/dal/models" "github.com/go-sigma/sigma/pkg/dal/query" "github.com/go-sigma/sigma/pkg/modules/workq/definition" + "github.com/go-sigma/sigma/pkg/types/enums" "github.com/go-sigma/sigma/pkg/utils" ) @@ -32,7 +33,7 @@ type producer struct { } // NewWorkQueueProducer ... -func NewWorkQueueProducer(_ configs.Configuration, _ map[string]definition.Consumer) (definition.WorkQueueProducer, error) { +func NewWorkQueueProducer(_ configs.Configuration, _ map[enums.Daemon]definition.Consumer) (definition.WorkQueueProducer, error) { p := &producer{ workQueueServiceFactory: dao.NewWorkQueueServiceFactory(), } @@ -40,7 +41,7 @@ func NewWorkQueueProducer(_ configs.Configuration, _ map[string]definition.Consu } // Produce ... -func (p *producer) Produce(ctx context.Context, topic string, payload any, option definition.ProducerOption) error { +func (p *producer) Produce(ctx context.Context, topic enums.Daemon, payload any, option definition.ProducerOption) error { tx := query.Q if option.Tx != nil { tx = option.Tx diff --git a/pkg/modules/workq/definition/definition.go b/pkg/modules/workq/definition/definition.go index 8bf76db7..86ecd39a 100644 --- a/pkg/modules/workq/definition/definition.go +++ b/pkg/modules/workq/definition/definition.go @@ -19,6 +19,7 @@ import ( "time" "github.com/go-sigma/sigma/pkg/dal/query" + "github.com/go-sigma/sigma/pkg/types/enums" ) //go:generate mockgen -destination=mocks/workq.go -package=mocks github.com/go-sigma/sigma/pkg/modules/workq/definition WorkQueueProducer @@ -39,5 +40,5 @@ type ProducerOption struct { // WorkQueueProducer ... type WorkQueueProducer interface { // Produce ... - Produce(ctx context.Context, topic string, payload any, option ProducerOption) error + Produce(ctx context.Context, topic enums.Daemon, payload any, option ProducerOption) error } diff --git a/pkg/modules/workq/definition/mocks/workq.go b/pkg/modules/workq/definition/mocks/workq.go index 2ca95b4f..7ccb37cc 100644 --- a/pkg/modules/workq/definition/mocks/workq.go +++ b/pkg/modules/workq/definition/mocks/workq.go @@ -14,6 +14,7 @@ import ( reflect "reflect" definition "github.com/go-sigma/sigma/pkg/modules/workq/definition" + enums "github.com/go-sigma/sigma/pkg/types/enums" gomock "go.uber.org/mock/gomock" ) @@ -41,7 +42,7 @@ func (m *MockWorkQueueProducer) EXPECT() *MockWorkQueueProducerMockRecorder { } // Produce mocks base method. -func (m *MockWorkQueueProducer) Produce(arg0 context.Context, arg1 string, arg2 any, arg3 definition.ProducerOption) error { +func (m *MockWorkQueueProducer) Produce(arg0 context.Context, arg1 enums.Daemon, arg2 any, arg3 definition.ProducerOption) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Produce", arg0, arg1, arg2, arg3) ret0, _ := ret[0].(error) diff --git a/pkg/modules/workq/inmemory/consumer.go b/pkg/modules/workq/inmemory/consumer.go new file mode 100644 index 00000000..fcc96d85 --- /dev/null +++ b/pkg/modules/workq/inmemory/consumer.go @@ -0,0 +1,86 @@ +// Copyright 2023 sigma +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package inmemory + +import ( + "context" + + "github.com/rs/zerolog/log" + + "github.com/go-sigma/sigma/pkg/configs" + "github.com/go-sigma/sigma/pkg/dal/models" + "github.com/go-sigma/sigma/pkg/modules/workq/definition" + "github.com/go-sigma/sigma/pkg/types/enums" +) + +// This is only for small-scale deployment, a message queue with 1024 messages should suffice, and it can be adjusted appropriately if necessary. +var packs = make(map[enums.Daemon]chan *models.WorkQueue, 10) + +// NewWorkQueueConsumer ... +func NewWorkQueueConsumer(config configs.Configuration, topicHandlers map[enums.Daemon]definition.Consumer) error { + for topic, c := range topicHandlers { + packs[topic] = make(chan *models.WorkQueue, config.WorkQueue.Inmemory.Concurrency) + go func(consumer definition.Consumer, topic enums.Daemon) { + handler := &consumerHandler{ + processingSemaphore: make(chan struct{}, consumer.Concurrency), + consumer: consumer, + } + handler.Consume(topic) + }(c, topic) + } + return nil +} + +type consumerHandler struct { + processingSemaphore chan struct{} + consumer definition.Consumer +} + +func (h *consumerHandler) Consume(topic enums.Daemon) { + for { + h.processingSemaphore <- struct{}{} + go func() { + err := h.consume(topic) + if err != nil { + log.Error().Err(err).Msg("Consume topic failed") + } + }() + } +} + +func (h *consumerHandler) consume(topic enums.Daemon) error { // nolint: unparam + defer func() { + <-h.processingSemaphore + }() + wq := <-packs[topic] + ctx := context.Background() + if h.consumer.Timeout != 0 { + var ctxCancel context.CancelFunc + ctx, ctxCancel = context.WithTimeout(ctx, h.consumer.Timeout) + defer ctxCancel() + } + err := h.consumer.Handler(ctx, wq.Payload) + wq.Times++ + if err != nil { + log.Error().Err(err).Str("Topic", topic.String()).Msg("Daemon task run failed") + if wq.Times < h.consumer.MaxRetry { + packs[topic] <- wq + return nil + } + log.Error().Err(err).Str("Topic", topic.String()).Msg("Daemon task run failed and reach max retry") + return nil + } + return nil +} diff --git a/pkg/modules/workq/inmemory/consumer_test.go b/pkg/modules/workq/inmemory/consumer_test.go new file mode 100644 index 00000000..c0800723 --- /dev/null +++ b/pkg/modules/workq/inmemory/consumer_test.go @@ -0,0 +1,77 @@ +// Copyright 2024 sigma +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package inmemory + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/go-sigma/sigma/pkg/configs" + "github.com/go-sigma/sigma/pkg/dal/models" + "github.com/go-sigma/sigma/pkg/modules/workq/definition" + "github.com/go-sigma/sigma/pkg/types/enums" +) + +func TestConsumer(t *testing.T) { + var times int + var topicHandlers = map[enums.Daemon]definition.Consumer{ + enums.DaemonBuilder: { + Handler: func(ctx context.Context, payload []byte) error { + times++ + return nil + }, + Concurrency: 3, + MaxRetry: 3, + Timeout: time.Second * 3, + }, + } + err := NewWorkQueueConsumer(configs.Configuration{}, topicHandlers) + assert.NoError(t, err) + + packs[enums.DaemonBuilder] <- &models.WorkQueue{Topic: enums.DaemonBuilder, Payload: []byte{}} + <-time.After(time.Second) + assert.Equal(t, 1, times) +} + +func TestConsumerWithTimeout(t *testing.T) { + var times int + var topicHandlers = map[enums.Daemon]definition.Consumer{ + enums.DaemonBuilder: { + Handler: func(ctx context.Context, payload []byte) error { + <-time.After(time.Second * 2) + select { + case <-ctx.Done(): + times++ + return fmt.Errorf("operation timeout") + default: + } + return nil + }, + Concurrency: 3, + MaxRetry: 3, + Timeout: time.Second, + }, + } + err := NewWorkQueueConsumer(configs.Configuration{}, topicHandlers) + assert.NoError(t, err) + + packs[enums.DaemonBuilder] <- &models.WorkQueue{Topic: enums.DaemonBuilder, Payload: []byte{}} + <-time.After(time.Second * 10) + assert.Equal(t, 3, times) +} diff --git a/pkg/modules/workq/inmemory/producer.go b/pkg/modules/workq/inmemory/producer.go new file mode 100644 index 00000000..680a8e29 --- /dev/null +++ b/pkg/modules/workq/inmemory/producer.go @@ -0,0 +1,43 @@ +// Copyright 2024 sigma +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package inmemory + +import ( + "context" + + "github.com/go-sigma/sigma/pkg/configs" + "github.com/go-sigma/sigma/pkg/dal/models" + "github.com/go-sigma/sigma/pkg/modules/workq/definition" + "github.com/go-sigma/sigma/pkg/types/enums" + "github.com/go-sigma/sigma/pkg/utils" +) + +type producer struct{} + +// NewWorkQueueProducer ... +func NewWorkQueueProducer(_ configs.Configuration, _ map[enums.Daemon]definition.Consumer) (definition.WorkQueueProducer, error) { + p := &producer{} + return p, nil +} + +// Produce ... +func (p *producer) Produce(ctx context.Context, topic enums.Daemon, payload any, _ definition.ProducerOption) error { + wq := &models.WorkQueue{ + Topic: topic, + Payload: utils.MustMarshal(payload), + } + packs[topic] <- wq + return nil +} diff --git a/pkg/modules/workq/inmemory/producer_test.go b/pkg/modules/workq/inmemory/producer_test.go new file mode 100644 index 00000000..00c04619 --- /dev/null +++ b/pkg/modules/workq/inmemory/producer_test.go @@ -0,0 +1,37 @@ +// Copyright 2024 sigma +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package inmemory + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/go-sigma/sigma/pkg/configs" + "github.com/go-sigma/sigma/pkg/dal/models" + "github.com/go-sigma/sigma/pkg/modules/workq/definition" + "github.com/go-sigma/sigma/pkg/types/enums" +) + +func TestProducer(t *testing.T) { + producer, err := NewWorkQueueProducer(configs.Configuration{}, nil) + assert.NoError(t, err) + assert.NotNil(t, producer) + + packs[enums.DaemonBuilder] = make(chan *models.WorkQueue, 10) + err = producer.Produce(context.Background(), enums.DaemonBuilder, "test", definition.ProducerOption{}) + assert.NoError(t, err) +} diff --git a/pkg/modules/workq/kafka/consumer.go b/pkg/modules/workq/kafka/consumer.go index 8a254d1c..f11bec22 100644 --- a/pkg/modules/workq/kafka/consumer.go +++ b/pkg/modules/workq/kafka/consumer.go @@ -26,6 +26,7 @@ import ( "github.com/go-sigma/sigma/pkg/configs" "github.com/go-sigma/sigma/pkg/consts" "github.com/go-sigma/sigma/pkg/modules/workq/definition" + "github.com/go-sigma/sigma/pkg/types/enums" "github.com/go-sigma/sigma/pkg/utils" ) @@ -96,7 +97,7 @@ type MessageWrapper struct { } // NewWorkQueueConsumer ... -func NewWorkQueueConsumer(_ configs.Configuration, topicHandlers map[string]definition.Consumer) error { +func NewWorkQueueConsumer(_ configs.Configuration, topicHandlers map[enums.Daemon]definition.Consumer) error { config := sarama.NewConfig() config.Producer.Return.Successes = true config.Producer.RequiredAcks = sarama.WaitForAll @@ -120,16 +121,16 @@ func NewWorkQueueConsumer(_ configs.Configuration, topicHandlers map[string]defi if err != nil { return err } - go func(consumer definition.Consumer, topic string) { + go func(consumer definition.Consumer, topic enums.Daemon) { for { handler := &ConsumerGroupHandler{ processingSemaphore: make(chan struct{}, consumer.Concurrency), consumer: consumer, producer: producer, } - err := consumerGroup.Consume(context.Background(), []string{topic}, handler) + err := consumerGroup.Consume(context.Background(), []string{topic.String()}, handler) if err != nil { - log.Error().Err(err).Str("topic", topic).Msg("Consume topics failed") + log.Error().Err(err).Str("topic", topic.String()).Msg("Consume topics failed") return } } diff --git a/pkg/modules/workq/kafka/producer.go b/pkg/modules/workq/kafka/producer.go index 105abd5e..32a974af 100644 --- a/pkg/modules/workq/kafka/producer.go +++ b/pkg/modules/workq/kafka/producer.go @@ -22,11 +22,12 @@ import ( "github.com/go-sigma/sigma/pkg/configs" "github.com/go-sigma/sigma/pkg/modules/workq/definition" + "github.com/go-sigma/sigma/pkg/types/enums" "github.com/go-sigma/sigma/pkg/utils" ) // NewWorkQueueProducer ... -func NewWorkQueueProducer(_ configs.Configuration, _ map[string]definition.Consumer) (definition.WorkQueueProducer, error) { +func NewWorkQueueProducer(_ configs.Configuration, _ map[enums.Daemon]definition.Consumer) (definition.WorkQueueProducer, error) { config := sarama.NewConfig() config.Producer.Return.Successes = true config.Producer.RequiredAcks = sarama.WaitForAll @@ -50,13 +51,13 @@ type producer struct { producer sarama.SyncProducer } -func (p *producer) Produce(_ context.Context, topic string, payload any, _ definition.ProducerOption) error { +func (p *producer) Produce(_ context.Context, topic enums.Daemon, payload any, _ definition.ProducerOption) error { message := MessageWrapper{ Times: 0, Payload: utils.MustMarshal(payload), } _, _, err := p.producer.SendMessage(&sarama.ProducerMessage{ - Topic: topic, + Topic: topic.String(), Value: sarama.ByteEncoder(utils.MustMarshal(message)), }) return err diff --git a/pkg/modules/workq/redis/consumer.go b/pkg/modules/workq/redis/consumer.go index ade18f77..d11d84f8 100644 --- a/pkg/modules/workq/redis/consumer.go +++ b/pkg/modules/workq/redis/consumer.go @@ -24,10 +24,11 @@ import ( "github.com/go-sigma/sigma/pkg/configs" "github.com/go-sigma/sigma/pkg/logger" "github.com/go-sigma/sigma/pkg/modules/workq/definition" + "github.com/go-sigma/sigma/pkg/types/enums" ) // NewWorkQueueConsumer ... -func NewWorkQueueConsumer(config configs.Configuration, topicHandlers map[string]definition.Consumer) error { +func NewWorkQueueConsumer(config configs.Configuration, topicHandlers map[enums.Daemon]definition.Consumer) error { redisOpt, err := asynq.ParseRedisURI(config.Redis.Url) if err != nil { return fmt.Errorf("asynq.ParseRedisURI error: %v", err) @@ -41,7 +42,7 @@ func NewWorkQueueConsumer(config configs.Configuration, topicHandlers map[string ) mux := asynq.NewServeMux() for topic, handler := range topicHandlers { - mux.HandleFunc(topic, func(consumer definition.Consumer) func(context.Context, *asynq.Task) error { + mux.HandleFunc(topic.String(), func(consumer definition.Consumer) func(context.Context, *asynq.Task) error { return func(ctx context.Context, task *asynq.Task) error { return consumer.Handler(ctx, task.Payload()) } diff --git a/pkg/modules/workq/redis/producer.go b/pkg/modules/workq/redis/producer.go index 6ba2c99f..01a5061a 100644 --- a/pkg/modules/workq/redis/producer.go +++ b/pkg/modules/workq/redis/producer.go @@ -23,16 +23,17 @@ import ( "github.com/go-sigma/sigma/pkg/configs" "github.com/go-sigma/sigma/pkg/modules/workq/definition" + "github.com/go-sigma/sigma/pkg/types/enums" "github.com/go-sigma/sigma/pkg/utils" ) type producer struct { client *asynq.Client - topicHandlers map[string]definition.Consumer + topicHandlers map[enums.Daemon]definition.Consumer } // NewWorkQueueProducer ... -func NewWorkQueueProducer(config configs.Configuration, topicHandlers map[string]definition.Consumer) (definition.WorkQueueProducer, error) { +func NewWorkQueueProducer(config configs.Configuration, topicHandlers map[enums.Daemon]definition.Consumer) (definition.WorkQueueProducer, error) { redisOpt, err := asynq.ParseRedisURI(config.Redis.Url) if err != nil { return nil, fmt.Errorf("asynq.ParseRedisURI error: %v", err) @@ -45,7 +46,7 @@ func NewWorkQueueProducer(config configs.Configuration, topicHandlers map[string } // Produce ... -func (p *producer) Produce(ctx context.Context, topic string, payload any, _ definition.ProducerOption) error { +func (p *producer) Produce(ctx context.Context, topic enums.Daemon, payload any, _ definition.ProducerOption) error { consumer, ok := p.topicHandlers[topic] if !ok { return fmt.Errorf("Topic %s not registered", topic) @@ -61,6 +62,6 @@ func (p *producer) Produce(ctx context.Context, topic string, payload any, _ def } else { opts = append(opts, asynq.Timeout(time.Hour)) } - _, err := p.client.Enqueue(asynq.NewTask(topic, utils.MustMarshal(payload)), opts...) + _, err := p.client.Enqueue(asynq.NewTask(topic.String(), utils.MustMarshal(payload)), opts...) return err } diff --git a/pkg/modules/workq/workq.go b/pkg/modules/workq/workq.go index 61f62759..8795b68c 100644 --- a/pkg/modules/workq/workq.go +++ b/pkg/modules/workq/workq.go @@ -20,6 +20,7 @@ import ( "github.com/go-sigma/sigma/pkg/configs" "github.com/go-sigma/sigma/pkg/modules/workq/database" "github.com/go-sigma/sigma/pkg/modules/workq/definition" + "github.com/go-sigma/sigma/pkg/modules/workq/inmemory" "github.com/go-sigma/sigma/pkg/modules/workq/kafka" "github.com/go-sigma/sigma/pkg/modules/workq/redis" "github.com/go-sigma/sigma/pkg/types/enums" @@ -31,21 +32,24 @@ type Message struct { Payload []byte } -var TopicHandlers = make(map[string]definition.Consumer) +var TopicHandlers = make(map[enums.Daemon]definition.Consumer) // ProducerClient ... var ProducerClient definition.WorkQueueProducer // Initialize ... func Initialize(config configs.Configuration) error { + fmt.Println(42, config.WorkQueue.Type) var err error switch config.WorkQueue.Type { case enums.WorkQueueTypeDatabase: - ProducerClient, err = database.NewWorkQueueProducer(config, TopicHandlers) + err = database.NewWorkQueueConsumer(config, TopicHandlers) case enums.WorkQueueTypeKafka: - ProducerClient, err = kafka.NewWorkQueueProducer(config, TopicHandlers) + err = kafka.NewWorkQueueConsumer(config, TopicHandlers) case enums.WorkQueueTypeRedis: - ProducerClient, err = redis.NewWorkQueueProducer(config, TopicHandlers) + err = redis.NewWorkQueueConsumer(config, TopicHandlers) + case enums.WorkQueueTypeInmemory: + err = inmemory.NewWorkQueueConsumer(config, TopicHandlers) default: return fmt.Errorf("Workq %s not support", config.WorkQueue.Type.String()) } @@ -54,11 +58,13 @@ func Initialize(config configs.Configuration) error { } switch config.WorkQueue.Type { case enums.WorkQueueTypeDatabase: - err = database.NewWorkQueueConsumer(config, TopicHandlers) + ProducerClient, err = database.NewWorkQueueProducer(config, TopicHandlers) case enums.WorkQueueTypeKafka: - err = kafka.NewWorkQueueConsumer(config, TopicHandlers) + ProducerClient, err = kafka.NewWorkQueueProducer(config, TopicHandlers) case enums.WorkQueueTypeRedis: - err = redis.NewWorkQueueConsumer(config, TopicHandlers) + ProducerClient, err = redis.NewWorkQueueProducer(config, TopicHandlers) + case enums.WorkQueueTypeInmemory: + ProducerClient, err = inmemory.NewWorkQueueProducer(config, TopicHandlers) default: return fmt.Errorf("Workq %s not support", config.WorkQueue.Type.String()) }