Skip to content
This repository has been archived by the owner on Jun 2, 2023. It is now read-only.

Commit

Permalink
pass log child to analyzers
Browse files Browse the repository at this point in the history
  • Loading branch information
jirfag committed Jul 6, 2019
1 parent 5c20fbd commit 6e6a3cf
Showing 1 changed file with 10 additions and 10 deletions.
20 changes: 10 additions & 10 deletions pkg/worker/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,25 +84,25 @@ func (a *App) buildDeps() {
}
}

func (a App) buildMultiplexer() *consumers.Multiplexer {
func (a App) buildMultiplexer(log logutil.Log) *consumers.Multiplexer {
rpf := processors.NewRepoProcessorFactory(&processors.StaticRepoConfig{})

// it's important to use a.log, not a.trackedLog
repoAnalyzer := analyzesConsumers.NewAnalyzeRepo(rpf, a.log, a.errTracker, a.cfg, a.ec)
// it's important to use log (a.log.Child), not a.trackedLog
repoAnalyzer := analyzesConsumers.NewAnalyzeRepo(rpf, log, a.errTracker, a.cfg, a.ec)
repoAnalyzesRunner := repoanalyzesqueue.NewConsumer(repoAnalyzer)

// it's important to use a.log, not a.trackedLog
pullAnalyzer := analyzesConsumers.NewAnalyzePR(a.ppf, a.log, a.errTracker, a.cfg, a.ec)
// it's important to use log (a.log.Child), not a.trackedLog
pullAnalyzer := analyzesConsumers.NewAnalyzePR(a.ppf, log, a.errTracker, a.cfg, a.ec)
pullAnalyzesRunner := pullanalyzesqueue.NewConsumer(pullAnalyzer)

multiplexer := consumers.NewMultiplexer()
multiplexer.SetResultLogger(func(error) {}) // already logged, no double logging

if err := repoAnalyzesRunner.Register(multiplexer, a.distLockFactory); err != nil {
a.log.Fatalf("Failed to register repo analyzer consumer: %s", err)
log.Fatalf("Failed to register repo analyzer consumer: %s", err)
}
if err := pullAnalyzesRunner.Register(multiplexer, a.distLockFactory); err != nil {
a.log.Fatalf("Failed to register pull analyzer consumer: %s", err)
log.Fatalf("Failed to register pull analyzer consumer: %s", err)
}

return multiplexer
Expand Down Expand Up @@ -144,8 +144,6 @@ func (a App) BuildTestDeps() *TestDeps {
}

func (a App) Run() {
consumerMultiplexer := a.buildMultiplexer()

consumersCount := a.cfg.GetInt("CONSUMERS_COUNT", 1)
a.log.Infof("Starting %d consumers...", consumersCount)

Expand All @@ -155,7 +153,9 @@ func (a App) Run() {
go func(i int) {
defer wg.Done()

trackedLog := a.trackedLog.Child(fmt.Sprintf("consumer #%d", i))
logName := fmt.Sprintf("consumer #%d", i)
consumerMultiplexer := a.buildMultiplexer(a.log.Child(logName))
trackedLog := a.trackedLog.Child(logName)
analyzesSQS := sqs.NewQueue(a.cfg.GetString("SQS_ANALYZES_QUEUE_URL"),
a.awsSess, trackedLog, analyzesqueue.VisibilityTimeoutSec)
consumer := consumer.NewSQS(trackedLog, a.cfg, analyzesSQS,
Expand Down

0 comments on commit 6e6a3cf

Please sign in to comment.