Skip to content

Commit

Permalink
improve piece selection
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeff Becker committed Apr 28, 2024
1 parent ecad50c commit d92400f
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 80 deletions.
129 changes: 53 additions & 76 deletions lib/bittorrent/swarm/piece.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,44 @@ type cachedPiece struct {

// should we accept a piece data with offset and length ?
func (p *cachedPiece) accept(offset, length uint32) bool {
return offset+length <= p.length
if offset % BlockSize != 0 {
log.Errorf("Rejecting chunk where piece offset=%d % BlockSize=%d != 0", offset, BlockSize)
return false
}

if offset + length > p.length {
log.Errorf("Rejecting chunk where piece ending offset=%d > piece length=%d", offset + length, p.length)
return false
}

if p.bitfieldIndex(offset) == p.finalChunkBitfieldIndex() {
// last piece
if length != p.finalChunkLen() {
log.Errorf("Rejecting final chunk of piece where length=%d != finalChunkLen=%d", length, p.finalChunkLen())
return false
}
} else {
if length != BlockSize {
log.Errorf("Rejecting non-final chunk of piece where length=%d != BlockSize=%d", length, BlockSize)
return false
}
}

return true
}

func (p *cachedPiece) finalChunkBitfieldIndex() uint32 {
return p.bitfieldIndex(p.length - 1)
}

func (p *cachedPiece) finalChunkLen() uint32 {
rem := p.length % BlockSize

if rem == 0 {
return BlockSize
} else {
return rem
}
}

// is this piece done downloading ?
Expand Down Expand Up @@ -66,29 +103,17 @@ func (p *cachedPiece) nextRequest() (r *common.PieceRequest) {
if p.pending.Has(idx) || p.obtained.Has(idx) {
r.Begin += BlockSize
} else {
break
}
}

if r.Begin+r.Length > l {
// is this probably the last piece ?
if (r.Begin+r.Length)-l >= BlockSize {
// no, let's just say there are no more blocks left
log.Debugf("no next piece request for idx=%d", r.Index)
r = nil
return
} else {
// yes so let's correct the size
if p.pending.Has(p.bitfieldIndex(r.Begin)) {
log.Debugf("no next piece request for idx=%d", r.Index)
r = nil
return
if idx == p.finalChunkBitfieldIndex() {
r.Length = p.finalChunkLen()
}
r.Length = l - r.Begin
log.Debugf("next piece request made: idx=%d offset=%d len=%d total=%d", r.Index, r.Begin, r.Length, l)
p.pending.Set(idx)
return
}
}
log.Debugf("next piece request made: idx=%d offset=%d len=%d total=%d", r.Index, r.Begin, r.Length, l)
p.pending.Set(p.bitfieldIndex(r.Begin))

log.Debugf("no next piece request for idx=%d", r.Index)
r = nil
return
}

Expand All @@ -104,13 +129,6 @@ type pieceTracker struct {
nextPiece PiecePicker
}

// get number of pending pieces we are requesting
func (pt *pieceTracker) NumPending() int {
pt.mtx.Lock()
defer pt.mtx.Unlock()
return len(pt.requests)
}

func (pt *pieceTracker) visitCached(idx uint32, v func(*cachedPiece)) {
pt.mtx.Lock()
_, has := pt.requests[idx]
Expand Down Expand Up @@ -140,7 +158,7 @@ func (pt *pieceTracker) newPiece(piece uint32) bool {

sz := info.LengthOfPiece(piece)
bits := sz / BlockSize
if bits == 0 {
if sz % BlockSize != 0 {
bits++
}
log.Debugf("new piece idx=%d len=%d bits=%d", piece, sz, bits)
Expand All @@ -160,19 +178,6 @@ func (pt *pieceTracker) removePiece(piece uint32) {
pt.mtx.Unlock()
}

func (pt *pieceTracker) pendingPiece(remote *bittorrent.Bitfield) (idx uint32, old bool) {
pt.mtx.Lock()
for k := range pt.requests {
if remote.Has(k) {
idx = k
old = true
break
}
}
pt.mtx.Unlock()
return
}

func (pt *pieceTracker) iterCached(v func(*cachedPiece)) {
pieces := []uint32{}
pt.mtx.Lock()
Expand All @@ -187,14 +192,16 @@ func (pt *pieceTracker) iterCached(v func(*cachedPiece)) {

func (cp *cachedPiece) isExpired() (expired bool) {
now := time.Now()
expired = now.Sub(cp.lastActive) > time.Second*30
expired = now.Sub(cp.lastActive) > time.Minute*5
return
}

func (pt *pieceTracker) PendingPieces() (exclude []uint32) {
pt.mtx.Lock()
for k := range pt.requests {
exclude = append(exclude, k)
for k, cp := range pt.requests {
if cp.pending.CountSet() > 0 {
exclude = append(exclude, k)
}
}
pt.mtx.Unlock()
return
Expand Down Expand Up @@ -224,37 +231,6 @@ func (pt *pieceTracker) NextRequest(remote *bittorrent.Bitfield, lastReq *common
return
}

// deprceated
func (pt *pieceTracker) nextRequestForDownload(remote *bittorrent.Bitfield, req *common.PieceRequest, requestNew bool) bool {
var r *common.PieceRequest
idx, old := pt.pendingPiece(remote)
if old {
pt.visitCached(idx, func(cp *cachedPiece) {
r = cp.nextRequest()
})
}
if r == nil && requestNew {
var exclude []uint32
for k := range pt.requests {
exclude = append(exclude, k)
}
log.Debugf("get next piece excluding %d", exclude)
var has bool
idx, has = pt.nextPiece(remote, exclude)
if has {
pt.visitCached(idx, func(cp *cachedPiece) {
r = cp.nextRequest()
})
}
}
if r != nil && r.Length > 0 {
req.Copy(r)
} else {
return false
}
return true
}

// cancel previously requested piece request
func (pt *pieceTracker) canceledRequest(r *common.PieceRequest) {
if r.Length == 0 {
Expand All @@ -276,6 +252,7 @@ func (pt *pieceTracker) handlePieceData(d *common.PieceData) {
if err == nil {
pc.put(d.Begin)
} else {
pc.cancel(d.Begin)
log.Errorf("failed to put chunk %d: %s", idx, err.Error())
}
if pc.done() {
Expand Down
12 changes: 8 additions & 4 deletions lib/bittorrent/swarm/torrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,10 +807,14 @@ func (t *Torrent) tick() {
// expire and cancel all timed out pieces
t.pt.iterCached(func(cp *cachedPiece) {
if cp.isExpired() {
t.VisitPeers(func(conn *PeerConn) {
conn.cancelPiece(cp.index)
})
t.pt.removePiece(cp.index)
if cp.pending.CountSet() > 0 {
t.VisitPeers(func(conn *PeerConn) {
conn.cancelPiece(cp.index)
})
cp.pending.Zero()
log.Debugf("Expired piece %d with no recent activity for torrent: %s", cp.index, t.Name())
}
cp.lastActive = time.Now()
}
})
t.VisitPeers(func(conn *PeerConn) {
Expand Down

0 comments on commit d92400f

Please sign in to comment.