Proposal for a 'per-request prefetch' #62
@@ -0,0 +1,73 @@
package stoker

import (
	"sync/atomic"
	"time"
)

// writeAheadSynchronizer bounds how far prefetching may run ahead of reads:
// producers block once `max` unread blocks are outstanding, and are released
// once the count drains back below `min`.
type writeAheadSynchronizer struct {
	max     int
	min     int
	current atomic.Int32
	notify  chan struct{}
}

func NewWAS(max, min int) *writeAheadSynchronizer {
	return &writeAheadSynchronizer{
		max:     max,
		min:     min,
		current: atomic.Int32{},
		notify:  make(chan struct{}, 1),
	}
}

// ReqAdd requests to add 1 to the queue, potentially blocking until that's okay.
func (w *writeAheadSynchronizer) ReqAdd() {
	n := w.current.Add(1)
	if n >= int32(w.max) {
		w.wait()
	}
}

// wait polls until the outstanding count drops below min, waking early when
// Dec signals on the notify channel.
func (w *writeAheadSynchronizer) wait() {
	done := false
	t := time.NewTimer(15 * time.Millisecond)
	for {
		select {
		case <-t.C:
		case <-w.notify:
		}

		curr := w.current.Load()
		if curr < int32(w.min) {
			done = true
		}

		if !t.Stop() {
			// Exhaust expired timer's chan.
			select {
			case <-t.C:
			default:
			}
		}
		if done {
			return
		}
		t.Reset(15 * time.Millisecond)
	}
}

// Dec decrements 1 from the queue, potentially unblocking writers.
func (w *writeAheadSynchronizer) Dec() {
	curr := w.current.Add(-1)
	if curr < 0 {
		// todo: okay that this is not atomic? i think we're okay leaking in this direction
		w.current.Store(0)
	}
	if curr <= int32(w.min) {
		select {
		case w.notify <- struct{}{}:
		default:
		}
	}
}
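
For context, a minimal usage sketch of the synchronizer. Everything here is hypothetical (the pump function, channels, and buffer size are stand-ins, not part of this PR, and it assumes the stoker package): the producer calls ReqAdd before buffering each block, and the consumer calls Dec as blocks are read, so prefetching stalls once `max` unread blocks are outstanding and resumes once the backlog drains below `min`.

// pump is a hypothetical producer/consumer pair around writeAheadSynchronizer.
func pump(in <-chan []byte, out chan<- []byte) {
	was := NewWAS(768, 256) // mirrors SessionCacheSize*3/4 and SessionCacheSize*1/4 below

	buf := make(chan []byte, 1024)

	// Producer: run ahead of the reader, but block once 768 unread
	// blocks are outstanding.
	go func() {
		defer close(buf)
		for blk := range in {
			was.ReqAdd()
			buf <- blk
		}
	}()

	// Consumer: each read releases a slot; once the backlog drains below
	// 256, a blocked producer is woken via the notify channel.
	for blk := range buf {
		was.Dec()
		out <- blk
	}
	close(out)
}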
@@ -0,0 +1,145 @@
// Stoker loads a dag from a remote host into an in-memory blockstore with the
// intention that it will be processed in a traversal-order compatible way such
// that data can be efficiently streamed and not fully buffered in memory.
package stoker

import (
	"context"
	"io"

	"github.com/filecoin-saturn/caboose"
	"github.com/ipld/go-car"

	lru "github.com/hashicorp/golang-lru"
	"github.com/ipfs/go-cid"
	ipfsblockstore "github.com/ipfs/go-ipfs-blockstore"
	"github.com/ipfs/go-libipfs/blocks"
	golog "github.com/ipfs/go-log/v2"
)

var goLogger = golog.Logger("caboose/stoker")

type Stoker struct {
	*caboose.Caboose
}

func New(c *caboose.Caboose) *Stoker {
	return &Stoker{c}
}

const SessionCacheSize = 1024

func (s *Stoker) NewSession(ctx context.Context, path string) ipfsblockstore.Blockstore {

Review comments on NewSession:

- Will bifrost directly use the …
- @lidel Is it easy for Bifrost to pass the …
- When does passing the path to Lassie make more sense than passing the "affinity key" to Lassie?
- Also @lidel -> Would it make sense to share the block cache across requests, given IPFS gateway access patterns?

Reply:

My understanding is we won't be able to use anything other than the blockstore interface until we finish the go-libipfs/gateway refactor. When done, bifrost-gateway may use …

Caboose already reads and uses the requested Content Path as the Affinity key here. This is the parent, normalized content path that the block request comes from (so you get uniform behavior across path, subdomain, and dnslink gateway requests based on …). The problem is, you don't know how much you need to prefetch to satisfy the client's request.

Probably when you know the context (full file, OR only dir enumeration, vs ALWAYS full dag). If you don't know the context, sending the path to Lassie will force a preload of the full dag (too much) or only the root level (too little if the request was for a file), and will most likely impact earnings of L1s.

We already have a global block cache in bifrost-gateway – the 2Q cache is based on frequency+recency to match gateway usage patterns, and has a hit ratio of 40%-50%. If you add any additional cache, add a metric which will let us see if it brings any value (prior art in blockstore_cache.go).
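
Following up on the metrics suggestion above, a minimal sketch of hit/miss counters for the per-session cache, assuming the Prometheus Go client (github.com/prometheus/client_golang/prometheus) is available; the metric names and wiring are hypothetical, not taken from blockstore_cache.go:

// Hypothetical counters: bump sessionCacheHits on a cache hit in Get/GetSize
// and sessionCacheMisses in getUncached, then watch the resulting hit ratio.
var (
	sessionCacheHits = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "stoker_session_cache_hits_total",
		Help: "Block reads served from the per-session prefetch cache.",
	})
	sessionCacheMisses = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "stoker_session_cache_misses_total",
		Help: "Block reads that fell through to an upstream fetch.",
	})
)

func init() {
	prometheus.MustRegister(sessionCacheHits, sessionCacheMisses)
}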

	cache, err := lru.NewARC(SessionCacheSize)
	if err != nil {
		goLogger.Warnw("failed to allocate cache for session", "err", err)
		return nil
	}

	ss := stokerSession{
		c:                      s.Caboose,
		cache:                  cache,
		writeAheadSynchronizer: NewWAS(SessionCacheSize*3/4, SessionCacheSize*1/4),
	}
	go ss.fillCache(ctx, path)

	return &ss
}

type stokerSession struct {
	c     *caboose.Caboose
	cache *lru.ARCCache

	*writeAheadSynchronizer
}

Review comment (on the cache field): nit: avoid using ARC, it comes with uncertainty related to a patent by IBM and someone will ask for removal: ipfs/kubo#6590

func (ss *stokerSession) fillCache(ctx context.Context, path string) error {
	err := ss.c.Fetch(ctx, path, func(resource string, reader io.Reader) error {
		cr, err := car.NewCarReader(reader)
		if err != nil {
			return err
		}

		n := 0
		for {
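			// Apply backpressure: ReqAdd blocks once SessionCacheSize*3/4
			// prefetched blocks sit unread, bounding the session's memory use.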
			ss.writeAheadSynchronizer.ReqAdd()

Review comments: IIUC, this is to limit reading of incoming blocks from the CAR stream to keep memory usage in check, right? Basically, what would happen if we didn't use a …

			blk, err := cr.Next()
			if err == nil {
				n++
				ss.cache.Add(blk.Cid(), blk)
			} else {
				// for now, fall back to blocks on partial
				if n > 0 {
					return caboose.ErrPartialResponse{}
				}
				return err
			}
			if ctx.Err() != nil {
				return ctx.Err()
			}
		}
	})
	return err
}

func (ss *stokerSession) Has(ctx context.Context, it cid.Cid) (bool, error) {
	if v, ok := ss.cache.Get(it); ok {
		return v != nil, nil
	}

	blk, err := ss.getUncached(ctx, it)
	return blk != nil, err
}

func (ss *stokerSession) getUncached(ctx context.Context, it cid.Cid) (blocks.Block, error) {
	blk, err := ss.c.Get(ctx, it)
	if err != nil {
		return nil, err
	}
	ss.cache.Add(it, blk)
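	// Note: the write-ahead counter is presumably left alone here because it
	// only tracks blocks buffered ahead by fillCache, not on-demand fetches.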

Review comment: Why do we NOT need to increment the count on the …

	return blk, nil
}

func (ss *stokerSession) Get(ctx context.Context, it cid.Cid) (blocks.Block, error) {
	if cached, ok := ss.cache.Get(it); ok {
		// todo: maybe we should only do this on reads against the cids coming
		// from pre-fetching. that said, this will more likely read out the
		// full pre-fetch stream, which is still a reasonable choice.
		ss.writeAheadSynchronizer.Dec()

Review comment: What does calling …

		return cached.(blocks.Block), nil
	}

	return ss.getUncached(ctx, it)
}

// GetSize returns the size of the block mapped to the given CID.
func (ss *stokerSession) GetSize(ctx context.Context, it cid.Cid) (int, error) {
	if cached, ok := ss.cache.Get(it); ok {
		blk := cached.(blocks.Block)
		return len(blk.RawData()), nil
	}

	blk, err := ss.getUncached(ctx, it)
	if err != nil {
		return 0, err
	}
	return len(blk.RawData()), nil
}

func (ss *stokerSession) HashOnRead(enabled bool) {
}

/* Mutable blockstore methods */
func (ss *stokerSession) Put(context.Context, blocks.Block) error {
	return caboose.ErrNotImplemented
}

func (ss *stokerSession) PutMany(context.Context, []blocks.Block) error {
	return caboose.ErrNotImplemented
}

func (ss *stokerSession) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
	return nil, caboose.ErrNotImplemented
}

func (ss *stokerSession) DeleteBlock(context.Context, cid.Cid) error {
	return caboose.ErrNotImplemented
}

Review comment (on Dec): This can go below zero because the same cached block can get read multiple times by Bifrost, right? Do we want to decrement the count in this case where the same cached block gets read multiple times?
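
One way to make the decrement robust against repeated reads of the same cached block, sketched under the assumption that clamping at zero is the desired behavior (illustrative only, not part of this PR):

// decClamped is a hypothetical alternative to Dec that uses a
// compare-and-swap loop so the counter can never go below zero, even when
// the same cached block is read multiple times.
func (w *writeAheadSynchronizer) decClamped() {
	for {
		curr := w.current.Load()
		if curr <= 0 {
			return // nothing outstanding; ignore the extra read
		}
		if w.current.CompareAndSwap(curr, curr-1) {
			if curr-1 <= int32(w.min) {
				// Wake a blocked producer, mirroring Dec.
				select {
				case w.notify <- struct{}{}:
				default:
				}
			}
			return
		}
	}
}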