Skip to content

Commit

Permalink
refact/must-gather: isolate and tune leaky bucket processor
Browse files Browse the repository at this point in the history
  • Loading branch information
mtulio committed Jul 30, 2024
1 parent aae828b commit 10ccd49
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 95 deletions.
149 changes: 149 additions & 0 deletions internal/openshift/mustgather/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,45 @@ package mustgather

import (
"bytes"
"os"
"strconv"
"sync"
"time"

"github.com/redhat-openshift-ecosystem/provider-certification-tool/internal/opct/archive"
log "github.com/sirupsen/logrus"
)

var (
// defaultBufferLeakyBucket is the capacity of both the rate limiter and the
// concurrency semaphore channels created by newLeakyBucket.
defaultBufferLeakyBucket = 20
// defaultSizeLeakyBucket is the default bucket size, meant to be given to
// newLeakyBucket as bucketSize, which sizes the buffer of the queue channel
// (the max number of items held in memory waiting to be processed).
defaultSizeLeakyBucket = 50
// defaultRateLimitIntervalLeakyBucket is the default interval of the rate
// limiter ticker. Lower values will increase the rate of processing,
// but it will increase the risk of exhausting resources. It can be overridden
// at startup through the environment variable OPCT_MUSTGATHER_RATELIMIT
// (integer number of milliseconds, from 1 to 100).
defaultRateLimitIntervalLeakyBucket = 10 * time.Millisecond
)

// init applies the optional OPCT_MUSTGATHER_RATELIMIT override to the default
// rate limit interval. The value is read as an integer number of milliseconds
// and must be between 1 and 100; an empty/unset variable keeps the default,
// and an unparsable or out-of-range value is logged and ignored.
func init() {
	raw := os.Getenv("OPCT_MUSTGATHER_RATELIMIT")
	if raw == "" {
		// nothing to override, keep the built-in default.
		return
	}

	millis, err := strconv.Atoi(raw)
	switch {
	case err != nil:
		log.Errorf("error parsing rate limit environment var OPCT_MUSTGATHER_RATELIMIT: %v", err)
	case millis <= 0 || millis > 100:
		log.Errorf("invalid rate limit value, must be between 1 and 100: %d", millis)
	default:
		// a higher interval slows the processing down and consumes less resources.
		defaultRateLimitIntervalLeakyBucket = time.Duration(millis) * time.Millisecond
	}
}

// MustGatherLog hold the must-gather findings in logs.
type MustGatherLog struct {
Path string
Expand All @@ -29,3 +64,117 @@ func (mgl *MustGatherLog) Processed() bool {
}
return false
}

// leakyBucket is a leaky bucket implementation (queue limit) to parallel
// process must-gather items without exhausting resources. Producers are
// throttled by the rateLimiter channel, from which a background ticker frees
// one slot every leakRate: a lower leakRate processes items faster.
// The default interval is 10 (ms); to decrease the CPU usage while
// processing the must-gather logs, increase the value up to 100 (ms) by setting
// the environment variable OPCT_MUSTGATHER_RATELIMIT.
type leakyBucket struct {
// bucketSize is the maximum number of items that can be stored in the bucket;
// it is used as the buffer capacity of the queue channel.
bucketSize int
// leakRate is the interval of the rate limiter ticker: on every tick one
// slot of the rateLimiter channel is freed.
leakRate time.Duration
// bucket is initialized to zero by newLeakyBucket.
// NOTE(review): it looks unused in the visible code - confirm before removing.
bucket int

// queue carries the items waiting to be handled by processor.
queue chan *MustGatherLog
// queueCount is the number of items registered with Incremet and not yet
// processed; Incremet and decrement update it while holding locker.
queueCount int
// rateLimiter throttles AppendQueue: each call sends one token, and the
// ticker started by newLeakyBucket receives one token per leakRate.
rateLimiter chan struct{}
// semaphore bounds how many AppendQueue calls can be sending to queue at
// the same time.
semaphore chan struct{}
// waiter counts the in-flight items: Incremet adds one, decrement marks one done.
waiter sync.WaitGroup
// locker guards the updates of queueCount.
locker sync.Mutex

// activeReading is a flag to indicate if the bucket is being read. It is set
// to true by newLeakyBucket and polled by the queue size monitor goroutine.
activeReading bool

// processor is the function called once for every item received from queue.
processor func(*MustGatherLog)
}

// newLeakyBucket creates a leaky bucket and starts its background goroutines.
//
// bucketSize is the buffer capacity of the queue channel, leakRate is the
// interval between two ticks of the rate limiter, and fn is called once for
// every item received from the queue. A non-positive leakRate falls back to
// defaultRateLimitIntervalLeakyBucket.
//
// Three goroutines are started:
//   - the rate limiter ticker, which frees one rateLimiter slot per tick and
//     returns only when the rateLimiter channel is closed;
//   - the processor, which calls fn for every queued item and returns only
//     when the queue channel is closed;
//   - the queue size monitor, which logs the queue size every 10 seconds
//     while activeReading is true.
func newLeakyBucket(bucketSize int, leakRate time.Duration, fn func(*MustGatherLog)) *leakyBucket {
	// time.NewTicker panics on a non-positive interval, which here would
	// crash the process from inside the ticker goroutine.
	if leakRate <= 0 {
		leakRate = defaultRateLimitIntervalLeakyBucket
	}

	lb := &leakyBucket{
		bucketSize:    bucketSize,
		leakRate:      leakRate,
		queue:         make(chan *MustGatherLog, bucketSize),
		rateLimiter:   make(chan struct{}, defaultBufferLeakyBucket),
		semaphore:     make(chan struct{}, defaultBufferLeakyBucket),
		processor:     fn,
		activeReading: true,
	}

	// Start with a full rate limiter, so producers only advance at the pace
	// the ticker frees the slots.
	for i := 0; i < cap(lb.rateLimiter); i++ {
		lb.rateLimiter <- struct{}{}
	}

	// leaky bucket ticker pausing the rate of processing every leakRate.
	go func() {
		log.Debug("Leaky bucket ticker - starting")
		ticker := time.NewTicker(lb.leakRate)
		defer ticker.Stop()
		for range ticker.C {
			_, ok := <-lb.rateLimiter
			// if this isn't going to run indefinitely, signal
			// this to return by closing the rate channel.
			if !ok {
				log.Debug("Leaky bucket rate limiter - closing")
				return
			}
		}
	}()

	// consume the queued pod logs to be processed/extracted information.
	go func() {
		log.Debug("Leaky bucket processor - starting")
		for data := range lb.queue {
			lb.processor(data)
			lb.decrement()
		}
	}()

	// monitor the queue size
	go func() {
		log.Debug("Leaky bucket monitor - starting")
		// NOTE(review): activeReading is read here without synchronization;
		// the writer seems to be the caller of this bucket - confirm.
		for lb.activeReading {
			// queueCount is updated under locker, read it the same way.
			lb.locker.Lock()
			queued := lb.queueCount
			lb.locker.Unlock()

			log.Debugf("Must-gather processor - queue size monitor: %d", queued)
			time.Sleep(10 * time.Second)
		}
	}()

	return lb
}

// decrement records that one queued item finished processing: it decrements
// the number of items in the queue and then marks the item as done in the
// wait group.
func (lb *leakyBucket) decrement() {
	lb.locker.Lock()
	lb.queueCount--
	lb.locker.Unlock()

	// Signal the wait group only after the counter is updated, so whoever is
	// released by waiter.Wait() observes the final queueCount instead of
	// racing with this update.
	lb.waiter.Done()
}

// Incremet registers a new item to be processed: it adds one to the wait
// group and increments the number of items in the queue while holding the
// lock. The name is a misspelling of "Increment", kept as-is because it is
// the name callers use.
func (lb *leakyBucket) Incremet() {
	lb.waiter.Add(1)

	lb.locker.Lock()
	defer lb.locker.Unlock()
	lb.queueCount++
}

// AppendQueue adds a new item to the queue, blocking until the rate limiter
// accepts a token, a concurrency semaphore slot is available, and the queue
// has room for the item. The semaphore slot is released when it returns.
func (lb *leakyBucket) AppendQueue(mgl *MustGatherLog) {
	// rate limiter: blocks while the channel has no free slot.
	lb.rateLimiter <- struct{}{}

	// concurrency semaphore: held only while handing the item over.
	release := func() { <-lb.semaphore }
	lb.semaphore <- struct{}{}
	defer release()

	// hand the item over to the processor.
	lb.queue <- mgl
}
109 changes: 15 additions & 94 deletions internal/openshift/mustgather/mustgather.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"path/filepath"
"strings"
"sync"
"time"

"github.com/pkg/errors"
"github.com/redhat-openshift-ecosystem/provider-certification-tool/internal/opct/archive"
Expand Down Expand Up @@ -166,101 +165,31 @@ func (mg *MustGather) calculateCountersEtcd() {
// extract read the tarball and extract the required information, scanning all files.
func (mg *MustGather) extract(tarball *tar.Reader) error {

// Create must-gather directory
// Create must-gather directory under the result path.
if _, err := os.Stat(mg.path); err != nil {
if err := os.MkdirAll(mg.path, 0755); err != nil {
return err
}
}

// TODO()#1: create a queue package with a instance of MustGatherLog.
// TODO()#2: increase the parallelism targetting to decrease the total proc time.
// Leaky bucket implementation (queue limit) to parallel process must-gather items
// without exhausting resources.
// Benckmark info: this parallel processing decreased 3 times the total processing time.
// Samples: Serial=~100s, rate(100)=~30s, rate(150)=~25s.
keepReading := true
procQueueSize := 0
var procQueueLocker sync.Mutex
// Creating queue monitor as Waiter group does not provide interface to check the
// queue size.
procQueueInc := func() {
procQueueLocker.Lock()
procQueueSize += 1
procQueueLocker.Unlock()
}
procQueueDec := func() {
procQueueLocker.Lock()
procQueueSize -= 1
procQueueLocker.Unlock()
}
go func() {
for keepReading {
log.Debugf("Must-gather processor - queue size monitor: %d", procQueueSize)
time.Sleep(10 * time.Second)
}
}()

const (
// maxRateItemsToProcessQueue is the max number of items to process in parallel.
maxRateItemsToProcessQueue = 20
// queueMaxSize is the max number of items to be queued in the bucket/memory before
// unblocked by the rate limiter.
queueMaxSize = 50
// rateLimitIntervalMillisec lower values will increase the rate of processing,
// but it will increase the risk of exhausting resources.
rateLimitIntervalMillisec = 10 * time.Millisecond
)

waiterProcNS := &sync.WaitGroup{}
chProcNSErrors := make(chan *MustGatherLog, queueMaxSize)
semaphore := make(chan struct{}, maxRateItemsToProcessQueue)
// have a max rate of N/sec
rate := make(chan struct{}, maxRateItemsToProcessQueue)
for i := 0; i < cap(rate); i++ {
rate <- struct{}{}
}
// leaky bucket ticker pausing the rate of processing every
// rateLimitIntervalMillisec.
go func() {
ticker := time.NewTicker(rateLimitIntervalMillisec)
defer ticker.Stop()
for range ticker.C {
_, ok := <-rate
// if this isn't going to run indefinitely, signal
// this to return by closing the rate channel.
if !ok {
return
}
}
}()

// consume the queued pod logs to be processed/extracted information.
go func() {
for mgLog := range chProcNSErrors {
mg.processNamespaceErrors(mgLog)
waiterProcNS.Done()
procQueueDec()
}
}()
processorBucket := newLeakyBucket(defaultSizeLeakyBucket, defaultRateLimitIntervalLeakyBucket, mg.processNamespaceErrors)

// Walk through files in must-gather tarball file.
for keepReading {
for processorBucket.activeReading {
header, err := tarball.Next()

switch {
// no more files
case err == io.EOF:
log.Debugf("Must-gather processor queued, queue size: %d", procQueueSize)
waiterProcNS.Wait()
keepReading = false
log.Debugf("Must-gather processor finished, queue size: %d", procQueueSize)
log.Debugf("Must-gather processor queued, queue size: %d", processorBucket.queueCount)
processorBucket.waiter.Wait()
processorBucket.activeReading = false
log.Debugf("Must-gather processor finished, queue size: %d", processorBucket.queueCount)
return nil

// return on error
case err != nil:
return errors.Wrapf(err, "error reading tarball")
// return err

// skip it when the headr isn't set (not sure how this happens)
case header == nil:
Expand All @@ -269,7 +198,7 @@ func (mg *MustGather) extract(tarball *tar.Reader) error {

// the target location where the dir/file should be created.
target := filepath.Join(mg.path, header.Name)
ok, typ := mg.matchToExtract(target)
ok, itemType := matchToExtract(target)
if !ok {
continue
}
Expand All @@ -280,9 +209,9 @@ func (mg *MustGather) extract(tarball *tar.Reader) error {
// fi := header.FileInfo()

switch header.Typeflag {

// directories in tarball.
case tar.TypeDir:

// creating subdirectories structures will be ignored and need
// sub-directories under mg.path must be created previously if needed.
/*
Expand All @@ -299,29 +228,21 @@ func (mg *MustGather) extract(tarball *tar.Reader) error {
case tar.TypeReg:
// Save/Process only files matching known types, it will prevent processing && saving
// all the files in must-gather, extracting only information required by OPCT.
switch typ {
switch itemType {
case patternNamePodLogs:
// parallel processing the logs
buf := bytes.Buffer{}
if _, err := io.Copy(&buf, tarball); err != nil {
return err
}
waiterProcNS.Add(1)
procQueueInc()
// waiterProcNS.Add(1)
// procQueueInc()
processorBucket.Incremet()
go func(filename string, buffer *bytes.Buffer) {
// wait for the rate limiter
rate <- struct{}{}

// check the concurrency semaphore
semaphore <- struct{}{}
defer func() {
<-semaphore
}()
// log.Debugf("Producing log processor for file: %s", mgLog.Path)
chProcNSErrors <- &MustGatherLog{
processorBucket.AppendQueue(&MustGatherLog{
Path: filename,
buffer: buffer,
}
})
}(targetAlias, &buf)

case patternNameEvents:
Expand Down
2 changes: 1 addition & 1 deletion internal/openshift/mustgather/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ var (
// matchToExtract define patterns to continue the must-gather processor.
// the pattern must be defined if the must be extracted. It will return
// a boolean with match and the file group (pattern type).
func (mg *MustGather) matchToExtract(path string) (bool, string) {
func matchToExtract(path string) (bool, string) {
for typ, pattern := range mustGatherFilePatterns {
re := regexp.MustCompile(pattern)
if re.MatchString(path) {
Expand Down

0 comments on commit 10ccd49

Please sign in to comment.