Skip to content

Commit

Permalink
blockservice: add WithProvider option
Browse files Browse the repository at this point in the history
This allows to recreate the behavior of advertising added blocks the bitswap server used to do.
  • Loading branch information
Jorropo committed Jan 11, 2024
1 parent 76d9292 commit 507356b
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 6 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ The following emojis are used to highlight certain changes:

### Added

* 🛠 `pinning/pinner`: you can now give a custom name when pinning a CID. To reflect this, the `Pinner` has been adjusted.
- `blockservice` now have a `WithProvider` option, this allows to recreate the behavior of advertising added blocks the bitswap server used to do.

### Changed

### Removed
Expand Down
69 changes: 63 additions & 6 deletions blockservice/blockservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/ipfs/boxo/blockstore"
"github.com/ipfs/boxo/exchange"
"github.com/ipfs/boxo/provider"
"github.com/ipfs/boxo/verifcid"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -71,10 +72,19 @@ type BoundedBlockService interface {
Allowlist() verifcid.Allowlist
}

// ProvidingBlockService is a [Blockservice] which announces new cids to a [provider.Provider].
type ProvidingBlockService interface {
BlockService

// Provider can return [nil] if there is no provider used.
Provider() provider.Provider
}

type blockService struct {
allowlist verifcid.Allowlist
blockstore blockstore.Blockstore
exchange exchange.Interface
provider provider.Provider
// If checkFirst is true then first check that a block doesn't
// already exist to avoid republishing the block on the exchange.
checkFirst bool
Expand All @@ -97,6 +107,13 @@ func WithAllowlist(allowlist verifcid.Allowlist) Option {
}
}

// WithProvider allows to advertise anything that is added through the blockservice.
func WithProvider(prov provider.Provider) Option {
return func(bs *blockService) {
bs.provider = prov
}
}

// New creates a BlockService with given datastore instance.
func New(bs blockstore.Blockstore, exchange exchange.Interface, opts ...Option) BlockService {
if exchange == nil {
Expand Down Expand Up @@ -139,6 +156,10 @@ func (s *blockService) Allowlist() verifcid.Allowlist {
return s.allowlist
}

func (s *blockService) Provider() provider.Provider {
return s.provider
}

// NewSession creates a new session that allows for
// controlled exchange of wantlists to decrease the bandwidth overhead.
// If the current exchange is a SessionExchange, a new exchange
Expand All @@ -149,6 +170,12 @@ func NewSession(ctx context.Context, bs BlockService) *Session {
if bbs, ok := bs.(BoundedBlockService); ok {
allowlist = bbs.Allowlist()
}

var prov provider.Provider
if bprov, ok := bs.(ProvidingBlockService); ok {
prov = bprov.Provider()
}

exch := bs.Exchange()
if sessEx, ok := exch.(exchange.SessionExchange); ok {
return &Session{
Expand All @@ -158,6 +185,7 @@ func NewSession(ctx context.Context, bs BlockService) *Session {
sessEx: sessEx,
bs: bs.Blockstore(),
notifier: exch,
provider: prov,
}
}
return &Session{
Expand All @@ -166,6 +194,7 @@ func NewSession(ctx context.Context, bs BlockService) *Session {
sessCtx: ctx,
bs: bs.Blockstore(),
notifier: exch,
provider: prov,
}
}

Expand Down Expand Up @@ -196,6 +225,11 @@ func (s *blockService) AddBlock(ctx context.Context, o blocks.Block) error {
logger.Errorf("NotifyNewBlocks: %s", err.Error())
}
}
if s.provider != nil {
if err := s.provider.Provide(o.Cid()); err != nil {
logger.Errorf("Provide: %s", err.Error())
}
}

return nil
}
Expand Down Expand Up @@ -242,6 +276,14 @@ func (s *blockService) AddBlocks(ctx context.Context, bs []blocks.Block) error {
logger.Errorf("NotifyNewBlocks: %s", err.Error())
}
}
if s.provider != nil {
for _, o := range toput {
if err := s.provider.Provide(o.Cid()); err != nil {
logger.Errorf("Provide: %s", err.Error())
}
}
}

return nil
}

Expand All @@ -256,14 +298,14 @@ func (s *blockService) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, e
f = s.getExchange
}

return getBlock(ctx, c, s.blockstore, s.allowlist, f)
return getBlock(ctx, c, s.blockstore, s.allowlist, f, s.provider)
}

func (s *blockService) getExchange() notifiableFetcher {
return s.exchange
}

func getBlock(ctx context.Context, c cid.Cid, bs blockstore.Blockstore, allowlist verifcid.Allowlist, fget func() notifiableFetcher) (blocks.Block, error) {
func getBlock(ctx context.Context, c cid.Cid, bs blockstore.Blockstore, allowlist verifcid.Allowlist, fget func() notifiableFetcher, prov provider.Provider) (blocks.Block, error) {
err := verifcid.ValidateCid(allowlist, c) // hash security
if err != nil {
return nil, err
Expand Down Expand Up @@ -293,6 +335,12 @@ func getBlock(ctx context.Context, c cid.Cid, bs blockstore.Blockstore, allowlis
if err != nil {
return nil, err
}
if prov != nil {
err = prov.Provide(blk.Cid())
if err != nil {
return nil, err
}
}
logger.Debugf("BlockService.BlockFetched %s", c)
return blk, nil
}
Expand All @@ -313,10 +361,10 @@ func (s *blockService) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan block
f = s.getExchange
}

return getBlocks(ctx, ks, s.blockstore, s.allowlist, f)
return getBlocks(ctx, ks, s.blockstore, s.allowlist, f, s.provider)
}

func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, allowlist verifcid.Allowlist, fget func() notifiableFetcher) <-chan blocks.Block {
func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, allowlist verifcid.Allowlist, fget func() notifiableFetcher, prov provider.Provider) <-chan blocks.Block {
out := make(chan blocks.Block)

go func() {
Expand Down Expand Up @@ -398,6 +446,14 @@ func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, allo
}
cache[0] = nil // early gc

if prov != nil {
err = prov.Provide(b.Cid())
if err != nil {
logger.Errorf("could not tell the provider about new blocks: %s", err)
return
}
}

select {
case out <- b:
case <-ctx.Done():
Expand Down Expand Up @@ -440,6 +496,7 @@ type Session struct {
sessEx exchange.SessionExchange
sessCtx context.Context
notifier notifier
provider provider.Provider
lk sync.Mutex
}

Expand Down Expand Up @@ -483,15 +540,15 @@ func (s *Session) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error)
ctx, span := internal.StartSpan(ctx, "Session.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c)))
defer span.End()

return getBlock(ctx, c, s.bs, s.allowlist, s.getFetcherFactory())
return getBlock(ctx, c, s.bs, s.allowlist, s.getFetcherFactory(), s.provider)
}

// GetBlocks gets blocks in the context of a request session
func (s *Session) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block {
ctx, span := internal.StartSpan(ctx, "Session.GetBlocks")
defer span.End()

return getBlocks(ctx, ks, s.bs, s.allowlist, s.getFetcherFactory())
return getBlocks(ctx, ks, s.bs, s.allowlist, s.getFetcherFactory(), s.provider)
}

var _ BlockGetter = (*Session)(nil)
28 changes: 28 additions & 0 deletions blockservice/blockservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,3 +288,31 @@ func TestAllowlist(t *testing.T) {
check(blockservice.GetBlock)
check(NewSession(ctx, blockservice).GetBlock)
}

type mockProvider []cid.Cid

func (p *mockProvider) Provide(c cid.Cid) error {
*p = append(*p, c)
return nil
}

func TestProviding(t *testing.T) {
t.Parallel()
a := assert.New(t)

bgen := butil.NewBlockGenerator()
blocks := bgen.Blocks(3)

prov := mockProvider{}
blockservice := New(blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())), nil, WithProvider(&prov))
var added []cid.Cid

a.NoError(blockservice.AddBlock(context.Background(), blocks[0]))
added = append(added, blocks[0].Cid())

a.NoError(blockservice.AddBlocks(context.Background(), blocks[1:]))
added = append(added, blocks[1].Cid())
added = append(added, blocks[2].Cid())

a.ElementsMatch(added, []cid.Cid(prov))
}
1 change: 1 addition & 0 deletions examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ require (
github.com/huin/goupnp v1.3.0 // indirect
github.com/ipfs/bbloom v0.0.4 // indirect
github.com/ipfs/go-bitfield v1.1.0 // indirect
github.com/ipfs/go-cidutil v0.1.0 // indirect
github.com/ipfs/go-ipfs-delay v0.0.1 // indirect
github.com/ipfs/go-ipfs-pq v0.0.3 // indirect
github.com/ipfs/go-ipfs-redirects-file v0.1.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions examples/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ github.com/ipfs/go-blockservice v0.5.0 h1:B2mwhhhVQl2ntW2EIpaWPwSCxSuqr5fFA93Ms4
github.com/ipfs/go-cid v0.0.6/go.mod h1:6Ux9z5e+HpkQdckYoX1PG/6xqKspzlEIR5SDmgqgC/I=
github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s=
github.com/ipfs/go-cid v0.4.1/go.mod h1:uQHwDeX4c6CtyrFwdqyhpNcxVewur1M7l7fNU7LKwZk=
github.com/ipfs/go-cidutil v0.1.0 h1:RW5hO7Vcf16dplUU60Hs0AKDkQAVPVplr7lk97CFL+Q=
github.com/ipfs/go-cidutil v0.1.0/go.mod h1:e7OEVBMIv9JaOxt9zaGEmAoSlXW9jdFZ5lP/0PwcfpA=
github.com/ipfs/go-datastore v0.6.0 h1:JKyz+Gvz1QEZw0LsX1IBn+JFCJQH4SJVFtM4uWU0Myk=
github.com/ipfs/go-datastore v0.6.0/go.mod h1:rt5M3nNbSO/8q1t4LNkLyUwRs8HupMeN/8O4Vn9YAT8=
github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk=
Expand Down

0 comments on commit 507356b

Please sign in to comment.