Skip to content

Commit

Permalink
downloader: refactor worker scheduling
Browse files Browse the repository at this point in the history
  • Loading branch information
n8maninger committed Mar 12, 2024
1 parent 95e035e commit c5647d0
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 68 deletions.
4 changes: 1 addition & 3 deletions ipfs/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,6 @@ func (n *Node) Close() error {

// GetBlock fetches a block from the IPFS network by asking the node's DAG
// service for the node identified by c. The lookup runs under a context
// derived from ctx that expires after two minutes.
func (n *Node) GetBlock(ctx context.Context, c cid.Cid) (format.Node, error) {
// cap a single block fetch at two minutes
ctx, cancel := context.WithTimeout(ctx, 2*time.Minute)
defer cancel()
return n.dagService.Get(ctx, c)
}

Expand Down Expand Up @@ -231,6 +228,7 @@ func NewNode(ctx context.Context, privateKey crypto.PrivKey, cfg config.IPFS, ds
bitswap.EngineBlockstoreWorkerCount(600),
bitswap.TaskWorkerCount(600),
bitswap.MaxOutstandingBytesPerPeer(int(5 << 20)),
bitswap.ProvideEnabled(true),
}

bitswapNet := bnetwork.NewFromIpfsHost(host, frt)
Expand Down
124 changes: 59 additions & 65 deletions renterd/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,8 @@ type (

bucket string

ch chan struct{}

mu sync.Mutex // protects the fields below
cond sync.Cond
inflight map[string]*blockResponse
queue *priorityQueue
dataCache *lru.TwoQueueCache[string, []byte]
Expand Down Expand Up @@ -150,67 +149,60 @@ func (bd *BlockDownloader) downloadBlockData(ctx context.Context, c cid.Cid, buc
return blockBuf.Bytes(), nil
}

func (bd *BlockDownloader) doDownloadTask(task *blockResponse, log *zap.Logger) {
start := time.Now()
log = log.Named("doDownloadTask").With(zap.Stringer("cid", task.cid), zap.Stringer("priority", task.priority))
func (bd *BlockDownloader) queueRelated(c cid.Cid) {
log := bd.log.Named("queueRelated").With(zap.Stringer("cid", c))
siblings, err := bd.store.BlockSiblings(c, 10)
if err != nil {
log.Error("failed to get block siblings", zap.Error(err))
return
}

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
// look up the children of c; the error message names this call so a failure
// here is not mistaken for a BlockSiblings failure
children, err := bd.store.BlockChildren(c, 10)
if err != nil {
log.Error("failed to get block children", zap.Error(err))
return
}

go func() {
siblings, err := bd.store.BlockSiblings(task.cid, 10)
bd.mu.Lock()
defer bd.mu.Unlock()

for _, sibling := range siblings {
// check if the block exists in the store
bucket, key, err := bd.store.BlockLocation(sibling)
if err != nil {
log.Error("failed to get block siblings", zap.Error(err))
return
continue
}

for _, sibling := range siblings {
// check if the block exists in the store
bucket, key, err := bd.store.BlockLocation(sibling)
if err != nil {
continue
if !bd.dataCache.Contains(cidKey(sibling)) {
if _, ok := bd.queueBlock(sibling, bucket, key, downloadPriorityLow); ok {
log.Debug("queued sibling", zap.Stringer("sibling", sibling))
}

func() {
bd.mu.Lock()
defer bd.mu.Unlock()

if bd.dataCache.Contains(cidKey(sibling)) {
return
} else if _, ok := bd.queueBlock(sibling, bucket, key, downloadPriorityLow); ok {
bd.log.Debug("queued sibling", zap.Stringer("cid", sibling))
}
}()
}
}

children, err := bd.store.BlockChildren(task.cid, 10)
for _, child := range children {
// check if the block exists in the store
bucket, key, err := bd.store.BlockLocation(child)
if err != nil {
log.Error("failed to get block children", zap.Error(err))
return
continue
}

for _, child := range children {
// check if the block exists in the store
bucket, key, err := bd.store.BlockLocation(child)
if err != nil {
continue
if !bd.dataCache.Contains(cidKey(child)) {
if _, ok := bd.queueBlock(child, bucket, key, downloadPriorityLow); ok {
log.Debug("queued child", zap.Stringer("child", child))
}
}
}
}

func() {
bd.mu.Lock()
defer bd.mu.Unlock()
func (bd *BlockDownloader) doDownloadTask(task *blockResponse, log *zap.Logger) {
start := time.Now()
log = log.Named("doDownloadTask").With(zap.Stringer("cid", task.cid), zap.Stringer("priority", task.priority))

if bd.dataCache.Contains(cidKey(child)) {
return
} else if _, ok := bd.queueBlock(child, bucket, key, downloadPriorityLow); ok {
bd.log.Debug("queued child", zap.Stringer("cid", child))
}
}()
}
}()
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

buf, err := bd.downloadBlockData(ctx, task.cid, task.bucket, task.key)
bd.mu.Lock()
if err != nil {
log.Error("failed to download block", zap.Error(err))
task.err = err
Expand All @@ -220,30 +212,33 @@ func (bd *BlockDownloader) doDownloadTask(task *blockResponse, log *zap.Logger)
bd.dataCache.Add(cidKey(task.cid), buf)
}
close(task.ch)
delete(bd.inflight, cidKey(task.cid))
bd.mu.Unlock()

if task.priority >= downloadPriorityHigh {
go bd.queueRelated(task.cid)
}
}

func (bd *BlockDownloader) downloadWorker(ctx context.Context, n int) {
func (bd *BlockDownloader) downloadWorker(n int) {
log := bd.log.Named("worker").With(zap.Int("id", n))
for {
select {
case <-ctx.Done():
return
case <-bd.ch:
}

bd.mu.Lock()
if bd.queue.Len() == 0 {
bd.mu.Unlock()
continue
for bd.queue.Len() == 0 {
bd.cond.Wait()
}

// pop the highest priority task from the queue
task := heap.Pop(bd.queue).(*blockResponse)
bd.mu.Unlock() // unlock the mutex before doing the download

// download the block
log := log.With(zap.Stringer("cid", task.cid), zap.Stringer("priority", task.priority))
log.Debug("popped task from queue")
bd.mu.Unlock() // unlock prior to downloading to prevent blocking other workers
bd.doDownloadTask(task, log)

// delete the task from the inflight map after it's done
bd.mu.Lock()
delete(bd.inflight, cidKey(task.cid))
bd.mu.Unlock()
}
}

Expand All @@ -269,7 +264,7 @@ func (bd *BlockDownloader) queueBlock(c cid.Cid, bucket, key string, priority do
}
bd.inflight[cidKey(c)] = resp
heap.Push(bd.queue, resp)
bd.ch <- struct{}{}
bd.cond.Signal()
return resp, true
}

Expand Down Expand Up @@ -316,13 +311,12 @@ func NewBlockDownloader(store MetadataStore, bucket string, cacheSize, workers i
inflight: make(map[string]*blockResponse),
queue: &priorityQueue{},
dataCache: cache,

ch: make(chan struct{}, workers),
bucket: bucket,
bucket: bucket,
}
bd.cond.L = &bd.mu
heap.Init(bd.queue)
for i := 0; i < workers; i++ {
go bd.downloadWorker(context.Background(), i)
go bd.downloadWorker(i)
}
return bd, nil
}

0 comments on commit c5647d0

Please sign in to comment.