Skip to content

Commit

Permalink
✨ Add workqueue inmemory implement (#345)
Browse files Browse the repository at this point in the history
  • Loading branch information
tosone authored Mar 20, 2024
1 parent 4de7c8c commit 145a6a4
Show file tree
Hide file tree
Showing 57 changed files with 368 additions and 105 deletions.
6 changes: 4 additions & 2 deletions conf/config-dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,14 @@ cache:
threshold: 0.2

workqueue:
# the workqueue type available: redis, kafka, database
type: database
# the workqueue type available: redis, kafka, database, inmemory
type: inmemory
redis:
concurrency: 10
kafka: {}
database: {}
inmemory:
concurrency: 1024

locker:
# the locker type available: redis, database
Expand Down
4 changes: 3 additions & 1 deletion conf/config-full.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,14 @@ cache:
threshold: 0.2

workqueue:
# the workqueue type available: redis, kafka, database
# the workqueue type available: redis, kafka, database, inmemory
type: redis
redis:
concurrency: 10
kafka: {}
database: {}
inmemory:
concurrency: 1024

locker:
# the locker type available: redis, database
Expand Down
10 changes: 4 additions & 6 deletions conf/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@ database:
dbname: sigma
sslmode: disable

# deploy available: single, replica
# replica should use external redis
deploy: single

redis:
# redis type available: none, external
# none: means never use redis
Expand All @@ -47,12 +43,14 @@ cache:
threshold: 0.2

workqueue:
# the workqueue type available: redis, kafka, database
type: redis
# the workqueue type available: redis, kafka, database, inmemory
type: database
redis:
concurrency: 10
kafka: {}
database: {}
inmemory:
concurrency: 1024

locker:
# the locker type available: redis, database
Expand Down
4 changes: 3 additions & 1 deletion docs/docs/configuration.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,14 @@ cache:
threshold: 0.2

workqueue:
# the workqueue type available: redis, kafka, database
# the workqueue type available: redis, kafka, database, inmemory
type: redis
redis:
concurrency: 10
kafka: {}
database: {}
inmemory:
concurrency: 1024

locker:
# the locker type available: redis, database
Expand Down
6 changes: 1 addition & 5 deletions pkg/cmds/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,7 @@ func Serve(serverConfig ServerConfig) error {
if err != nil {
return err
}
err = workq.Initialize(configs.Configuration{
WorkQueue: configs.ConfigurationWorkQueue{
Type: enums.WorkQueueTypeDatabase,
},
})
err = workq.Initialize(config)
if err != nil {
return err
}
Expand Down
6 changes: 1 addition & 5 deletions pkg/cmds/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,7 @@ func Worker() error {
return err
}

err = workq.Initialize(configs.Configuration{
WorkQueue: configs.ConfigurationWorkQueue{
Type: enums.WorkQueueTypeDatabase,
},
})
err = workq.Initialize(config)
if err != nil {
return err
}
Expand Down
13 changes: 9 additions & 4 deletions pkg/configs/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,17 @@ type ConfigurationWorkQueueDatabase struct {
type ConfigurationWorkQueueKafka struct {
}

type ConfigurationWorkQueueInmemmory struct {
Concurrency int `yaml:"concurrency"`
}

// ConfigurationWorkQueue ...
type ConfigurationWorkQueue struct {
Type enums.WorkQueueType `yaml:"type"`
Redis ConfigurationWorkQueueRedis `yaml:"redis"`
Database ConfigurationWorkQueueDatabase `yaml:"database"`
Kafka ConfigurationWorkQueueKafka `yaml:"kafka"`
Type enums.WorkQueueType `yaml:"type"`
Redis ConfigurationWorkQueueRedis `yaml:"redis"`
Database ConfigurationWorkQueueDatabase `yaml:"database"`
Kafka ConfigurationWorkQueueKafka `yaml:"kafka"`
Inmemory ConfigurationWorkQueueInmemmory `yaml:"inmemory"`
}

// ConfigurationLocker ...
Expand Down
3 changes: 3 additions & 0 deletions pkg/configs/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ func defaultSettings() {
if configuration.Daemon.Builder.Podman.URI == "" {
configuration.Daemon.Builder.Podman.URI = "unix:///run/podman/podman.sock"
}
if configuration.WorkQueue.Inmemory.Concurrency == 0 {
configuration.WorkQueue.Inmemory.Concurrency = 1024
}
if configuration.Cache.Inmemory.Size == 0 {
configuration.Cache.Inmemory.Size = 10240
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cronjob/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (r builderRunner) runner(ctx context.Context, tw timewheel.TimeWheel) {
return err
}

err = workq.ProducerClient.Produce(ctx, string(enums.DaemonBuilder), types.DaemonBuilderPayload{
err = workq.ProducerClient.Produce(ctx, enums.DaemonBuilder, types.DaemonBuilderPayload{
Action: enums.DaemonBuilderActionStart,
BuilderID: builderObj.ID,
RunnerID: runner.ID,
Expand Down
2 changes: 1 addition & 1 deletion pkg/daemon/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
)

func init() {
workq.TopicHandlers[enums.DaemonBuilder.String()] = definition.Consumer{
workq.TopicHandlers[enums.DaemonBuilder] = definition.Consumer{
Handler: builderRunner,
MaxRetry: 1,
Concurrency: 10,
Expand Down
2 changes: 1 addition & 1 deletion pkg/daemon/coderepo/coderepo.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const (
)

func init() {
workq.TopicHandlers[enums.DaemonCodeRepository.String()] = definition.Consumer{
workq.TopicHandlers[enums.DaemonCodeRepository] = definition.Consumer{
Handler: crRunner,
MaxRetry: 6,
Concurrency: 10,
Expand Down
2 changes: 1 addition & 1 deletion pkg/daemon/gc/decorator.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func initGc(ctx context.Context, daemon enums.Daemon, runnerChan chan decoratorS
}

func triggerWebhook(ctx context.Context, webhook decoratorWebhook, producerClient definition.WorkQueueProducer) error {
err := producerClient.Produce(ctx, enums.DaemonWebhook.String(), types.DaemonWebhookPayload{
err := producerClient.Produce(ctx, enums.DaemonWebhook, types.DaemonWebhookPayload{
NamespaceID: webhook.NamespaceID,
Action: webhook.Meta.Action,
Type: enums.WebhookTypeSend,
Expand Down
2 changes: 1 addition & 1 deletion pkg/daemon/gc/gc_artifact.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
)

func init() {
workq.TopicHandlers[enums.DaemonGcArtifact.String()] = definition.Consumer{
workq.TopicHandlers[enums.DaemonGcArtifact] = definition.Consumer{
Handler: decorator(enums.DaemonGcArtifact),
MaxRetry: 6,
Concurrency: 10,
Expand Down
2 changes: 1 addition & 1 deletion pkg/daemon/gc/gc_blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
)

func init() {
workq.TopicHandlers[enums.DaemonGcBlob.String()] = definition.Consumer{
workq.TopicHandlers[enums.DaemonGcBlob] = definition.Consumer{
Handler: decorator(enums.DaemonGcBlob),
MaxRetry: 6,
Concurrency: 10,
Expand Down
2 changes: 1 addition & 1 deletion pkg/daemon/gc/gc_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
// deleteRepositoryWithNamespace -> deleteRepositoryCheckEmpty -> deleteRepository -> collectRecord

func init() {
workq.TopicHandlers[enums.DaemonGcRepository.String()] = definition.Consumer{
workq.TopicHandlers[enums.DaemonGcRepository] = definition.Consumer{
Handler: decorator(enums.DaemonGcRepository),
MaxRetry: 6,
Concurrency: 10,
Expand Down
2 changes: 1 addition & 1 deletion pkg/daemon/gc/gc_tag.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
// deleteTagWithNamespace -> deleteTagWithRepository -> deleteTagCheckPattern -> deleteTag -> collectRecord

func init() {
workq.TopicHandlers[enums.DaemonGcTag.String()] = definition.Consumer{
workq.TopicHandlers[enums.DaemonGcTag] = definition.Consumer{
Handler: decorator(enums.DaemonGcTag),
MaxRetry: 6,
Concurrency: 10,
Expand Down
2 changes: 1 addition & 1 deletion pkg/daemon/pushed/artifact.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
)

func init() {
workq.TopicHandlers[enums.DaemonArtifactPushed.String()] = definition.Consumer{
workq.TopicHandlers[enums.DaemonArtifactPushed] = definition.Consumer{
Handler: func(ctx context.Context, data []byte) error {
var payload types.DaemonArtifactPushedPayload
err := json.Unmarshal(data, &payload)
Expand Down
2 changes: 1 addition & 1 deletion pkg/daemon/pushed/tag.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
)

func init() {
workq.TopicHandlers[enums.DaemonTagPushed.String()] = definition.Consumer{
workq.TopicHandlers[enums.DaemonTagPushed] = definition.Consumer{
Handler: func(ctx context.Context, data []byte) error {
var payload types.DaemonTagPushedPayload
err := json.Unmarshal(data, &payload)
Expand Down
2 changes: 1 addition & 1 deletion pkg/daemon/scan/sbom.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import (
)

func init() {
workq.TopicHandlers[enums.DaemonSbom.String()] = definition.Consumer{
workq.TopicHandlers[enums.DaemonSbom] = definition.Consumer{
Handler: decorator(runnerSbom),
MaxRetry: 1,
Concurrency: 10,
Expand Down
2 changes: 1 addition & 1 deletion pkg/daemon/scan/vulnerability.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import (
)

func init() {
workq.TopicHandlers[enums.DaemonVulnerability.String()] = definition.Consumer{
workq.TopicHandlers[enums.DaemonVulnerability] = definition.Consumer{
Handler: decorator(runnerVulnerability),
MaxRetry: 1,
Concurrency: 1,
Expand Down
2 changes: 1 addition & 1 deletion pkg/daemon/webhook/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import (
)

func init() {
workq.TopicHandlers[enums.DaemonWebhook.String()] = definition.Consumer{
workq.TopicHandlers[enums.DaemonWebhook] = definition.Consumer{
Handler: webhookRunner,
MaxRetry: 6,
Concurrency: 10,
Expand Down
4 changes: 2 additions & 2 deletions pkg/dal/dao/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (s *repositoryService) Create(ctx context.Context, repositoryObj *models.Re
}

if autoCreateNamespace.ProducerClient != nil {
err = autoCreateNamespace.ProducerClient.Produce(ctx, enums.DaemonWebhook.String(), types.DaemonWebhookPayload{
err = autoCreateNamespace.ProducerClient.Produce(ctx, enums.DaemonWebhook, types.DaemonWebhookPayload{
NamespaceID: ptr.Of(namespaceObj.ID),
Action: enums.WebhookActionCreate,
ResourceType: enums.WebhookResourceTypeNamespace,
Expand Down Expand Up @@ -192,7 +192,7 @@ func (s *repositoryService) Create(ctx context.Context, repositoryObj *models.Re
return err
}
if autoCreateNamespace.ProducerClient != nil {
err = autoCreateNamespace.ProducerClient.Produce(ctx, enums.DaemonWebhook.String(), types.DaemonWebhookPayload{
err = autoCreateNamespace.ProducerClient.Produce(ctx, enums.DaemonWebhook, types.DaemonWebhookPayload{
NamespaceID: ptr.Of(namespaceObj.ID),
Action: enums.WebhookActionCreate,
ResourceType: enums.WebhookResourceTypeRepository,
Expand Down
4 changes: 2 additions & 2 deletions pkg/dal/dao/workq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestWorkQueueService(t *testing.T) {
assert.NotNil(t, workqService)

workqObj := &models.WorkQueue{
Topic: "topic",
Topic: enums.DaemonGc,
Payload: []byte("payload"),
Version: "version",
}
Expand All @@ -72,7 +72,7 @@ func TestWorkQueueService(t *testing.T) {
err = workqService.UpdateStatus(ctx, workqObj.ID, "version", "newVersion", 1, enums.TaskCommonStatusPending)
assert.NoError(t, err)

workqNewObj, err := workqService.Get(ctx, "topic")
workqNewObj, err := workqService.Get(ctx, enums.DaemonGc.String())
assert.NoError(t, err)
assert.Equal(t, workqObj.ID, workqNewObj.ID)
assert.Equal(t, workqObj.Topic, workqNewObj.Topic)
Expand Down
2 changes: 1 addition & 1 deletion pkg/dal/models/workq.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type WorkQueue struct {
DeletedAt soft_delete.DeletedAt `gorm:"softDelete:milli"`
ID int64 `gorm:"primaryKey"`

Topic string
Topic enums.Daemon
Payload []byte
Times int
Version string
Expand Down
2 changes: 1 addition & 1 deletion pkg/handlers/builders/builders_runners_rerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (h *handler) GetRunnerRerun(c echo.Context) error {
log.Error().Err(err).Msg("Create builder runner failed")
return xerrors.HTTPErrCodeInternalError.Detail(fmt.Sprintf("Create builder runner failed: %v", err))
}
err = workq.ProducerClient.Produce(ctx, enums.DaemonBuilder.String(), types.DaemonBuilderPayload{
err = workq.ProducerClient.Produce(ctx, enums.DaemonBuilder, types.DaemonBuilderPayload{
Action: enums.DaemonBuilderActionStart,
RepositoryID: req.RepositoryID,
BuilderID: req.BuilderID,
Expand Down
2 changes: 1 addition & 1 deletion pkg/handlers/builders/builders_runners_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (h *handler) PostRunnerRun(c echo.Context) error {
log.Error().Err(err).Msg("Create builder runner failed")
return xerrors.HTTPErrCodeInternalError.Detail(fmt.Sprintf("Create builder runner failed: %v", err))
}
err = workq.ProducerClient.Produce(ctx, enums.DaemonBuilder.String(), types.DaemonBuilderPayload{
err = workq.ProducerClient.Produce(ctx, enums.DaemonBuilder, types.DaemonBuilderPayload{
Action: enums.DaemonBuilderActionStart,
RepositoryID: req.RepositoryID,
BuilderID: req.BuilderID,
Expand Down
2 changes: 1 addition & 1 deletion pkg/handlers/builders/builders_runners_stop.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (h *handler) GetRunnerStop(c echo.Context) error {
log.Error().Err(err).Msg("Update runner status failed")
return xerrors.HTTPErrCodeInternalError.Detail(fmt.Sprintf("Update runner status failed: %v", err))
}
err = workq.ProducerClient.Produce(ctx, enums.DaemonBuilder.String(), types.DaemonBuilderPayload{
err = workq.ProducerClient.Produce(ctx, enums.DaemonBuilder, types.DaemonBuilderPayload{
Action: enums.DaemonBuilderActionStop,
RepositoryID: req.RepositoryID,
BuilderID: req.BuilderID,
Expand Down
2 changes: 1 addition & 1 deletion pkg/handlers/coderepos/coderepos_resync.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (h *handler) Resync(c echo.Context) error {
if err != nil {
return xerrors.HTTPErrCodeInternalError.Detail("Update user status failed")
}
err = workq.ProducerClient.Produce(ctx, enums.DaemonCodeRepository.String(),
err = workq.ProducerClient.Produce(ctx, enums.DaemonCodeRepository,
types.DaemonCodeRepositoryPayload{User3rdPartyID: user3rdPartyObj.ID}, definition.ProducerOption{Tx: tx})
if err != nil {
log.Error().Err(err).Int64("user_id", user3rdPartyObj.UserID).Msg("Publish sync code repository failed")
Expand Down
6 changes: 3 additions & 3 deletions pkg/handlers/daemons/daemons_gc_artifact.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (h *handler) UpdateGcArtifactRule(c echo.Context) error {
log.Error().Err(err).Msg("Update gc artifact rule failed")
return xerrors.HTTPErrCodeInternalError.Detail(fmt.Sprintf("Update gc artifact rule failed: %v", err))
}
err = h.producerClient.Produce(ctx, enums.DaemonWebhook.String(), types.DaemonWebhookPayload{
err = h.producerClient.Produce(ctx, enums.DaemonWebhook, types.DaemonWebhookPayload{
NamespaceID: namespaceID,
Action: enums.WebhookActionUpdate,
ResourceType: enums.WebhookResourceTypeDaemonTaskGcArtifactRule,
Expand Down Expand Up @@ -299,13 +299,13 @@ func (h *handler) CreateGcArtifactRunner(c echo.Context) error {
log.Error().Int64("ruleID", ruleObj.ID).Msgf("Create gc artifact runner failed: %v", err)
return xerrors.HTTPErrCodeInternalError.Detail(fmt.Sprintf("Create gc artifact runner failed: %v", err))
}
err = workq.ProducerClient.Produce(ctx, enums.DaemonGcArtifact.String(),
err = workq.ProducerClient.Produce(ctx, enums.DaemonGcArtifact,
types.DaemonGcPayload{RunnerID: runnerObj.ID}, definition.ProducerOption{Tx: tx})
if err != nil {
log.Error().Err(err).Msgf("Send topic %s to work queue failed", enums.DaemonGcArtifact.String())
return xerrors.HTTPErrCodeInternalError.Detail(fmt.Sprintf("Send topic %s to work queue failed", enums.DaemonGcArtifact.String()))
}
err = h.producerClient.Produce(ctx, enums.DaemonWebhook.String(), types.DaemonWebhookPayload{
err = h.producerClient.Produce(ctx, enums.DaemonWebhook, types.DaemonWebhookPayload{
NamespaceID: namespaceID,
Action: enums.WebhookActionCreate,
ResourceType: enums.WebhookResourceTypeDaemonTaskGcArtifactRunner,
Expand Down
6 changes: 3 additions & 3 deletions pkg/handlers/daemons/daemons_gc_blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (h *handler) UpdateGcBlobRule(c echo.Context) error {
log.Error().Err(err).Msg("Update gc blob rule failed")
return xerrors.HTTPErrCodeInternalError.Detail(fmt.Sprintf("Update gc blob rule failed: %v", err))
}
err = h.producerClient.Produce(ctx, enums.DaemonWebhook.String(), types.DaemonWebhookPayload{
err = h.producerClient.Produce(ctx, enums.DaemonWebhook, types.DaemonWebhookPayload{
Action: enums.WebhookActionUpdate,
ResourceType: enums.WebhookResourceTypeDaemonTaskGcBlobRule,
Payload: utils.MustMarshal(req),
Expand Down Expand Up @@ -305,13 +305,13 @@ func (h *handler) CreateGcBlobRunner(c echo.Context) error {
log.Error().Int64("RuleID", ruleObj.ID).Msgf("Create gc blob runner failed: %v", err)
return xerrors.HTTPErrCodeInternalError.Detail(fmt.Sprintf("Create gc blob runner failed: %v", err))
}
err = workq.ProducerClient.Produce(ctx, enums.DaemonGcBlob.String(),
err = workq.ProducerClient.Produce(ctx, enums.DaemonGcBlob,
types.DaemonGcPayload{RunnerID: runnerObj.ID}, definition.ProducerOption{Tx: tx})
if err != nil {
log.Error().Err(err).Msgf("Send topic %s to work queue failed", enums.DaemonGcBlob.String())
return xerrors.HTTPErrCodeInternalError.Detail(fmt.Sprintf("Send topic %s to work queue failed", enums.DaemonGcBlob.String()))
}
err = h.producerClient.Produce(ctx, enums.DaemonWebhook.String(), types.DaemonWebhookPayload{
err = h.producerClient.Produce(ctx, enums.DaemonWebhook, types.DaemonWebhookPayload{
Action: enums.WebhookActionCreate,
ResourceType: enums.WebhookResourceTypeDaemonTaskGcBlobRunner,
Payload: utils.MustMarshal(req),
Expand Down
Loading

0 comments on commit 145a6a4

Please sign in to comment.