Skip to content

Commit

Permalink
cmd, downloader: initialize workers in constructor
Browse files Browse the repository at this point in the history
  • Loading branch information
n8maninger committed Mar 3, 2024
1 parent b3a92d6 commit 2ed1429
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 10 deletions.
3 changes: 1 addition & 2 deletions cmd/fsd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,10 @@ func main() {
workerClient := worker.NewClient(cfg.Renterd.WorkerAddress, cfg.Renterd.WorkerPassword)
busClient := bus.NewClient(cfg.Renterd.BusAddress, cfg.Renterd.BusPassword)

bd, err := downloader.NewBlockDownloader(cfg.Renterd.Bucket, cfg.BlockStore.CacheSize, workerClient, log.Named("downloader"))
bd, err := downloader.NewBlockDownloader(cfg.Renterd.Bucket, cfg.BlockStore.CacheSize, cfg.BlockStore.MaxConcurrent, workerClient, log.Named("downloader"))
if err != nil {
log.Fatal("failed to create block downloader", zap.Error(err))
}
bd.StartWorkers(ctx, cfg.BlockStore.MaxConcurrent)

bs, err := renterd.NewBlockStore(
renterd.WithBucket(cfg.Renterd.Bucket),
Expand Down
13 changes: 5 additions & 8 deletions renterd/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,23 +193,17 @@ func (bd *BlockDownloader) Get(ctx context.Context, c cid.Cid) (blocks.Block, er
return bd.getResponse(c, downloadPriorityHigh).block(ctx, c)
}

// StartWorkers starts n workers to download blocks.
func (bd *BlockDownloader) StartWorkers(ctx context.Context, n int) {
for i := 0; i < n; i++ {
go bd.downloadWorker(ctx, i)
}
}

func cidKey(c cid.Cid) string {
return cid.NewCidV1(c.Type(), c.Hash()).String()
}

// NewBlockDownloader creates a new BlockDownloader.
func NewBlockDownloader(bucket string, cacheSize int, workerClient *worker.Client, log *zap.Logger) (*BlockDownloader, error) {
func NewBlockDownloader(bucket string, cacheSize, workers int, workerClient *worker.Client, log *zap.Logger) (*BlockDownloader, error) {
cache, err := lru.New2Q[string, *blockResponse](cacheSize)
if err != nil {
return nil, err
}

bd := &BlockDownloader{
workerClient: workerClient,
log: log,
Expand All @@ -220,5 +214,8 @@ func NewBlockDownloader(bucket string, cacheSize int, workerClient *worker.Clien
bucket: bucket,
}
heap.Init(bd.queue)
for i := 0; i < workers; i++ {
go bd.downloadWorker(context.Background(), i)
}
return bd, nil
}

0 comments on commit 2ed1429

Please sign in to comment.