Skip to content

Commit

Permalink
sia: parallel download block pieces
Browse files Browse the repository at this point in the history
  • Loading branch information
n8maninger committed Dec 1, 2023
1 parent 234d65b commit 0dcfdcf
Showing 1 changed file with 38 additions and 9 deletions.
47 changes: 38 additions & 9 deletions sia/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -43,20 +44,48 @@ func (bs *RenterdBlockStore) Has(ctx context.Context, c cid.Cid) (bool, error) {

func (bs *RenterdBlockStore) downloadBlock(ctx context.Context, cm Block) (blocks.Block, error) {
var data, metadata []byte
var err error

ctx, cancel := context.WithCancel(ctx)
defer cancel()

var wg sync.WaitGroup
errCh := make(chan error, 2)
if cm.Data.BlockSize != 0 {
data, err = downloadPartialData(ctx, bs.renterd, cm.Data.Bucket, cm.Data.Key, cm.Data.Offset, cm.Data.BlockSize)
if err != nil {
return nil, fmt.Errorf("failed to download data: %w", err)
}
wg.Add(1)
go func() {
defer wg.Done()

var err error
data, err = downloadPartialData(ctx, bs.renterd, cm.Data.Bucket, cm.Data.Key, cm.Data.Offset, cm.Data.BlockSize)
if err != nil {
bs.log.Error("failed to download block data", zap.Error(err), zap.Stringer("cid", cm.CID))
errCh <- fmt.Errorf("failed to download block data: %w", err)
}
}()
}

if cm.Metadata.Length != 0 {
metadata, err = downloadPartialData(ctx, bs.renterd, cm.Metadata.Bucket, cm.Metadata.Key, cm.Metadata.Offset, cm.Metadata.Length)
if err != nil {
return nil, fmt.Errorf("failed to download object: %w", err)
}
wg.Add(1)
go func() {
defer wg.Done()

var err error
metadata, err = downloadPartialData(ctx, bs.renterd, cm.Metadata.Bucket, cm.Metadata.Key, cm.Metadata.Offset, cm.Metadata.Length)
if err != nil {
bs.log.Error("failed to download block metadata", zap.Error(err), zap.Stringer("cid", cm.CID))
errCh <- fmt.Errorf("failed to download block metadata: %w", err)
}
}()
}

// Wait for all downloads to complete.
go func() {
wg.Wait()
close(errCh)
}()

for err := range errCh {
return nil, err
}

if n := len(data); n != int(cm.Data.BlockSize) {
Expand Down

0 comments on commit 0dcfdcf

Please sign in to comment.