From 8287b6d45b34f9d7bb13dc6cc07c86eeae271883 Mon Sep 17 00:00:00 2001 From: Nate Maninger Date: Fri, 6 Oct 2023 11:19:07 -0600 Subject: [PATCH] add blockservice and exchange --- cmd/siapfsd/ipfs.go | 5 +++ ipfs/blockstore/dag.go | 85 +++++++++++++++++++++++++++++++++++++ ipfs/blockstore/exchange.go | 33 ++++++++++++++ 3 files changed, 123 insertions(+) create mode 100644 ipfs/blockstore/dag.go create mode 100644 ipfs/blockstore/exchange.go diff --git a/cmd/siapfsd/ipfs.go b/cmd/siapfsd/ipfs.go index 9a506f3..60ee153 100644 --- a/cmd/siapfsd/ipfs.go +++ b/cmd/siapfsd/ipfs.go @@ -8,6 +8,7 @@ import ( iface "github.com/ipfs/boxo/coreiface" "github.com/ipfs/boxo/coreiface/options" + "github.com/ipfs/boxo/ipld/merkledag" "github.com/ipfs/kubo/config" "github.com/ipfs/kubo/core" "github.com/ipfs/kubo/core/coreapi" @@ -70,7 +71,11 @@ func createNode(ctx context.Context, repoPath string, db *badger.Store, renterd return nil, nil, fmt.Errorf("failed to create node: %w", err) } + bs := blockstore.New(bucket, db, cfg.Renterd) + bserv := blockstore.NewBlockstoreService(bs) node.Blockstore = blockstore.New(bucket, db, cfg.Renterd) + node.DAG = merkledag.NewDAGService(bserv) + coreAPI, err := coreapi.NewCoreAPI(node) if err != nil { return nil, nil, fmt.Errorf("failed to create coreapi: %w", err) diff --git a/ipfs/blockstore/dag.go b/ipfs/blockstore/dag.go new file mode 100644 index 0000000..1fcc081 --- /dev/null +++ b/ipfs/blockstore/dag.go @@ -0,0 +1,85 @@ +package blockstore + +import ( + "context" + "errors" + + "github.com/ipfs/boxo/blockstore" + "github.com/ipfs/boxo/exchange" + blocks "github.com/ipfs/go-block-format" + cid "github.com/ipfs/go-cid" +) + +type BlockService struct { + store blockstore.Blockstore +} + +func (bs *BlockService) Close() error { + return nil +} + +// GetBlock gets the requested block. +func (bs *BlockService) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) { + return bs.store.Get(ctx, c) +} + +// GetBlocks does a batch request for the given cids, returning blocks as +// they are found, in no particular order. +// +// It may not be able to find all requested blocks (or the context may +// be canceled). In that case, it will close the channel early. It is up +// to the consumer to detect this situation and keep track which blocks +// it has received and which it hasn't. +func (bs *BlockService) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block { + out := make(chan blocks.Block) + go func() { + defer close(out) + for _, k := range ks { + b, err := bs.store.Get(ctx, k) + if err != nil { + continue + } + select { + case out <- b: + case <-ctx.Done(): + return + } + } + }() + return out +} + +// Blockstore returns a reference to the underlying blockstore +func (bs *BlockService) Blockstore() blockstore.Blockstore { + return bs.store +} + +// Exchange returns a reference to the underlying exchange (usually bitswap) +func (bs *BlockService) Exchange() exchange.Interface { + return &Exchange{ + bserv: bs, + } +} + +// AddBlock puts a given block to the underlying datastore +func (bs *BlockService) AddBlock(ctx context.Context, o blocks.Block) error { + return errors.New("cannot put blocks") +} + +// AddBlocks adds a slice of blocks at the same time using batching +// capabilities of the underlying datastore whenever possible. +func (bs *BlockService) AddBlocks(ctx context.Context, b []blocks.Block) error { + return errors.New("cannot put blocks") +} + +// DeleteBlock deletes the given block from the blockservice. +func (bs *BlockService) DeleteBlock(ctx context.Context, o cid.Cid) error { + return errors.New("cannot delete blocks") +} + +// NewBlockstoreService returns a new BlockService backed by the given blockstore. +func NewBlockstoreService(bs blockstore.Blockstore) *BlockService { + return &BlockService{ + store: bs, + } +} diff --git a/ipfs/blockstore/exchange.go b/ipfs/blockstore/exchange.go new file mode 100644 index 0000000..dab502d --- /dev/null +++ b/ipfs/blockstore/exchange.go @@ -0,0 +1,33 @@ +package blockstore + +import ( + "context" + + blocks "github.com/ipfs/go-block-format" + cid "github.com/ipfs/go-cid" +) + +type Exchange struct { + bserv *BlockService +} + +func (e *Exchange) Close() error { + return nil +} + +// GetBlock returns the block associated with a given cid. +func (e *Exchange) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) { + return e.bserv.GetBlock(ctx, c) +} + +// GetBlocks returns the blocks associated with the given cids. +// If the requested blocks are not found immediately, this function should hang until +// they are found. If they can't be found later, it's also acceptable to terminate. +func (e *Exchange) GetBlocks(ctx context.Context, cids []cid.Cid) (<-chan blocks.Block, error) { + return e.bserv.GetBlocks(ctx, cids), nil +} + +// NotifyNewBlocks tells the exchange that new blocks are available and can be served. +func (e *Exchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error { + return nil // TODO +}