From 72d21f763687010c0d8da0d522ee489ea015c701 Mon Sep 17 00:00:00 2001
From: Will Scott <will.scott@protocol.ai>
Date: Mon, 6 Mar 2023 12:34:03 +0100
Subject: [PATCH] add a basic request cache stoker

---
 stoker/loosesync.go |  92 ++++++++++++++++++++++++++
 stoker/stoker.go    | 160 ++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 252 insertions(+)
 create mode 100644 stoker/loosesync.go
 create mode 100644 stoker/stoker.go

diff --git a/stoker/loosesync.go b/stoker/loosesync.go
new file mode 100644
index 0000000..ce51263
--- /dev/null
+++ b/stoker/loosesync.go
@@ -0,0 +1,92 @@
+package stoker
+
+import (
+	"sync/atomic"
+	"time"
+)
+
+type writeAheadSynchronizer struct {
+	max     int
+	min     int
+	current atomic.Int32
+	notify  chan struct{}
+}
+
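+// NewWAS creates a writeAheadSynchronizer: producers block in ReqAdd once
+// max items are outstanding, and resume once consumers have Dec'd the
+// backlog below min.
+//
+// A minimal usage sketch (produce and consume are hypothetical helpers):
+//
+//	was := NewWAS(8, 2)
+//	items := make(chan int)
+//	go func() {
+//		for i := 0; i < 100; i++ {
+//			was.ReqAdd() // blocks once 8 items are outstanding
+//			items <- produce(i)
+//		}
+//		close(items)
+//	}()
+//	for it := range items {
+//		consume(it)
+//		was.Dec() // wakes the producer once the backlog drains below 2
+//	}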
+func NewWAS(max, min int) *writeAheadSynchronizer {
+	return &writeAheadSynchronizer{
+		max:     max,
+		min:     min,
+		current: atomic.Int32{},
+		notify:  make(chan struct{}, 1),
+	}
+}
+
+// ReqAdd requests to add one item to the queue, potentially blocking until there is room.
+func (w *writeAheadSynchronizer) ReqAdd() {
+	n := w.current.Add(1)
+	if n >= int32(w.max) {
+		w.wait()
+	}
+}
+
+func (w *writeAheadSynchronizer) wait() {
+	done := false
+	t := time.NewTimer(15 * time.Millisecond)
+	for {
+		select {
+		case <-t.C:
+		case <-w.notify:
+		}
+
+		curr := w.current.Load()
+		if curr < int32(w.min) {
+			done = true
+		}
+
+		if !t.Stop() {
+			// Exhaust expired timer's chan.
+			select {
+			case <-t.C:
+			default:
+			}
+		}
+		if done {
+			return
+		}
+		t.Reset(15 * time.Millisecond)
+	}
+}
+
+// Dec removes one item from the queue, potentially unblocking writers.
+func (w *writeAheadSynchronizer) Dec() {
+	curr := w.current.Add(-1)
+	if curr < 0 {
+		// TODO: this clamp races with concurrent Adds; undercounting only wakes writers early, which is acceptable.
+		w.current.Store(0)
+	}
+	if curr <= int32(w.min) {
+		select {
+		case w.notify <- struct{}{}:
+		default:
+		}
+	}
+}
diff --git a/stoker/stoker.go b/stoker/stoker.go
new file mode 100644
index 0000000..fc53530
--- /dev/null
+++ b/stoker/stoker.go
@@ -0,0 +1,160 @@
+// Package stoker loads a DAG from a remote host into an in-memory
+// blockstore, with the intention that it is processed in traversal order so
+// that data can be streamed efficiently rather than fully buffered in memory.
+package stoker
+
+import (
+	"context"
+	"io"
+
+	"github.com/filecoin-saturn/caboose"
+	"github.com/ipld/go-car"
+
+	lru "github.com/hashicorp/golang-lru"
+	"github.com/ipfs/go-cid"
+	ipfsblockstore "github.com/ipfs/go-ipfs-blockstore"
+	"github.com/ipfs/go-libipfs/blocks"
+	golog "github.com/ipfs/go-log/v2"
+)
+
+var goLogger = golog.Logger("caboose/stoker")
+
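+// Stoker wraps a Caboose client to provide pre-fetching read sessions.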
+type Stoker struct {
+	*caboose.Caboose
+}
+
+func New(c *caboose.Caboose) *Stoker {
+	return &Stoker{c}
+}
+
+const SessionCacheSize = 1024
+
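+// NewSession creates an in-memory, read-through blockstore for the given
+// path and starts pre-fetching its DAG in the background.
+//
+// An illustrative sketch, assuming an existing *caboose.Caboose client cb
+// (the path is a placeholder):
+//
+//	bs := stoker.New(cb).NewSession(ctx, "/ipfs/<root-cid>")
+//	blk, err := bs.Get(ctx, c) // c is a cid.Cid within the DAG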
+func (s *Stoker) NewSession(ctx context.Context, path string) ipfsblockstore.Blockstore {
+	cache, err := lru.NewARC(SessionCacheSize)
+	if err != nil {
+		goLogger.Warnw("failed to allocate cache for session", "err", err)
+		return nil
+	}
+
+	ss := stokerSession{
+		c:                      s.Caboose,
+		cache:                  cache,
+		writeAheadSynchronizer: NewWAS(SessionCacheSize*3/4, SessionCacheSize*1/4),
+	}
+	go ss.fillCache(ctx, path)
+
+	return &ss
+}
+
+type stokerSession struct {
+	c     *caboose.Caboose
+	cache *lru.ARCCache
+	*writeAheadSynchronizer
+}
+
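+// fillCache streams the CAR representation of path and adds each block to
+// the session cache, pausing via the write-ahead synchronizer whenever it
+// runs too far ahead of the consumer.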
+func (ss *stokerSession) fillCache(ctx context.Context, path string) error {
+	err := ss.c.Fetch(ctx, path, func(resource string, reader io.Reader) error {
+		cr, err := car.NewCarReader(reader)
+		if err != nil {
+			return err
+		}
+
+		n := 0
+		for {
+			ss.writeAheadSynchronizer.ReqAdd()
+			blk, err := cr.Next()
+			if err == nil {
+				n++
+				ss.cache.Add(blk.Cid(), blk)
+			} else {
+				// Release the slot reserved by ReqAdd above; no block was added.
+				ss.writeAheadSynchronizer.Dec()
+				// For now, fall back to per-block fetches on a partial response.
+				if n > 0 {
+					return caboose.ErrPartialResponse{}
+				}
+				return err
+			}
+			if ctx.Err() != nil {
+				return ctx.Err()
+			}
+		}
+	})
+	return err
+}
+
+func (ss *stokerSession) Has(ctx context.Context, it cid.Cid) (bool, error) {
+	if v, ok := ss.cache.Get(it); ok {
+		return v != nil, nil
+	}
+
+	blk, err := ss.getUncached(ctx, it)
+	return blk != nil, err
+}
+
+func (ss *stokerSession) getUncached(ctx context.Context, it cid.Cid) (blocks.Block, error) {
+	blk, err := ss.c.Get(ctx, it)
+	if err != nil {
+		return nil, err
+	}
+	ss.cache.Add(it, blk)
+	return blk, nil
+}
+
+func (ss *stokerSession) Get(ctx context.Context, it cid.Cid) (blocks.Block, error) {
+	if cached, ok := ss.cache.Get(it); ok {
+		// TODO: maybe we should only do this for reads of CIDs that came
+		// from pre-fetching. That said, this will more likely read out the
+		// full pre-fetch stream, which is still a reasonable choice.
+		ss.writeAheadSynchronizer.Dec()
+
+		return cached.(blocks.Block), nil
+	}
+
+	return ss.getUncached(ctx, it)
+}
+
+// GetSize returns the size in bytes of the block mapped to the given CID.
+func (ss *stokerSession) GetSize(ctx context.Context, it cid.Cid) (int, error) {
+	if cached, ok := ss.cache.Get(it); ok {
+		blk := cached.(blocks.Block)
+		return len(blk.RawData()), nil
+	}
+
+	blk, err := ss.getUncached(ctx, it)
+	if err != nil {
+		return 0, err
+	}
+	return len(blk.RawData()), nil
+}
+
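+// HashOnRead is a no-op; blocks served from the session cache are not re-hashed.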
+func (ss *stokerSession) HashOnRead(enabled bool) {
+}
+
+/* Mutable blockstore methods */
+func (ss *stokerSession) Put(context.Context, blocks.Block) error {
+	return caboose.ErrNotImplemented
+}
+
+func (ss *stokerSession) PutMany(context.Context, []blocks.Block) error {
+	return caboose.ErrNotImplemented
+}
+func (ss *stokerSession) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
+	return nil, caboose.ErrNotImplemented
+}
+func (ss *stokerSession) DeleteBlock(context.Context, cid.Cid) error {
+	return caboose.ErrNotImplemented
+}