Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bitswap: move providing -> Exchange-layer, providerQueryManager -> routing #641

Merged
merged 1 commit into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,52 @@ The following emojis are used to highlight certain changes:

## [Unreleased]

- `bitswap`, `routing`, `exchange` ([#641](https://github.com/ipfs/boxo/pull/641)):
- ✨ Bitswap is no longer in charge of providing blocks to the newtork: providing functionality is now handled by a `exchange/providing.Exchange`, meant to be used with `provider.System` so that all provides follow the same rules (multiple parts of the code where handling provides) before.
- 🛠 `bitswap/client/internal/providerquerymanager` has been moved to `routing/providerquerymanager` where it belongs. In order to keep compatibility, Bitswap now receives a `routing.ContentDiscovery` parameter which implements `FindProvidersAsync(...)` and uses it to create a `providerquerymanager` with the default settings as before. Custom settings can be used by using a custom `providerquerymanager` to manually wrap a `ContentDiscovery` object and pass that in as `ContentDiscovery` on initialization while setting `bitswap.WithDefaultProviderQueryManager(false)` (to avoid re-wrapping it again).
- The renovated `providedQueryManager` will trigger lookups until it manages to connect to `MaxProviders`. Before it would lookup at most `MaxInProcessRequests*MaxProviders` and connection failures may have limited the actual number of providers found.
- 🛠 We have aligned our routing-related interfaces with the libp2p [`routing`](https://pkg.go.dev/github.com/libp2p/go-libp2p/core/routing#ContentRouting) ones, including in the `reprovider.System`.
- In order to obtain exactly the same behaviour as before (i.e. particularly ensuring that new blocks are still provided), what was done like:

```go
bswapnet := network.NewFromIpfsHost(host, contentRouter)
bswap := bitswap.New(p.ctx, bswapnet, blockstore)
bserv = blockservice.New(blockstore, bswap)
```
- becomes:

```go
// Create network: no contentRouter anymore
bswapnet := network.NewFromIpfsHost(host)
// Create Bitswap: a new "discovery" parameter, usually the "contentRouter"
// which does both discovery and providing.
bswap := bitswap.New(p.ctx, bswapnet, discovery, blockstore)
// A provider system that handles concurrent provides etc. "contentProvider"
// is usually the "contentRouter" which does both discovery and providing.
// "contentProvider" could be used directly without wrapping, but it is recommended
// to do so to provide more efficiently.
provider := provider.New(datastore, provider.Online(contentProvider)
// A wrapped providing exchange using the previous exchange and the provider.
exch := providing.New(bswap, provider)

// Finally the blockservice
bserv := blockservice.New(blockstore, exch)
...
```

- The above is only necessary if content routing is needed. Otherwise:

```
// Create network: no contentRouter anymore
bswapnet := network.NewFromIpfsHost(host)
// Create Bitswap: a new "discovery" parameter set to nil (disable content discovery)
bswap := bitswap.New(p.ctx, bswapnet, nil, blockstore)
// Finally the blockservice
bserv := blockservice.New(blockstore, exch)
```



### Added

- `routing/http/server`: added built-in Prometheus instrumentation to http delegated `/routing/v1/` endpoints, with custom buckets for response size and duration to match real world data observed at [the `delegated-ipfs.dev` instance](https://docs.ipfs.tech/concepts/public-utilities/#delegated-routing). [#718](https://github.com/ipfs/boxo/pull/718) [#724](https://github.com/ipfs/boxo/pull/724)
Expand Down Expand Up @@ -117,6 +163,7 @@ The following emojis are used to highlight certain changes:
- `bitswap/client` fix memory leak in BlockPresenceManager due to unlimited map growth. [#636](https://github.com/ipfs/boxo/pull/636)
- `bitswap/network` fixed race condition when a timeout occurred before hole punching completed while establishing a first-time stream to a peer behind a NAT [#651](https://github.com/ipfs/boxo/pull/651)
- `bitswap`: wantlist overflow handling now cancels existing entries to make room for newer entries. This fix prevents the wantlist from filling up with CIDs that the server does not have. [#629](https://github.com/ipfs/boxo/pull/629)
- 🛠 `bitswap` & `bitswap/server` no longer provide to content routers, instead you can use the `provider` package because it uses a datastore queue and batches calls to ProvideMany.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reminder to fix this before merge


## [v0.21.0]

Expand Down
43 changes: 22 additions & 21 deletions bitswap/benchmarks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,16 @@ import (
"testing"
"time"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-test/random"
protocol "github.com/libp2p/go-libp2p/core/protocol"

"github.com/ipfs/boxo/bitswap"
bsnet "github.com/ipfs/boxo/bitswap/network"
testinstance "github.com/ipfs/boxo/bitswap/testinstance"
tn "github.com/ipfs/boxo/bitswap/testnet"
mockrouting "github.com/ipfs/boxo/routing/mock"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
delay "github.com/ipfs/go-ipfs-delay"
"github.com/ipfs/go-test/random"
protocol "github.com/libp2p/go-libp2p/core/protocol"
)

type fetchFunc func(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid)
Expand Down Expand Up @@ -135,24 +134,25 @@ func BenchmarkFetchFromOldBitswap(b *testing.B) {
benchmarkLog = nil
fixedDelay := delay.Fixed(10 * time.Millisecond)
bstoreLatency := time.Duration(0)
router := mockrouting.NewServer()

for _, bch := range mixedBenches {
b.Run(bch.name, func(b *testing.B) {
fetcherCount := bch.fetcherCount
oldSeedCount := bch.oldSeedCount
newSeedCount := bch.nodeCount - (fetcherCount + oldSeedCount)

net := tn.VirtualNetwork(mockrouting.NewServer(), fixedDelay)
net := tn.VirtualNetwork(fixedDelay)

// Simulate an older Bitswap node (old protocol ID) that doesn't
// send DONT_HAVE responses
oldProtocol := []protocol.ID{bsnet.ProtocolBitswapOneOne}
oldNetOpts := []bsnet.NetOpt{bsnet.SupportedProtocols(oldProtocol)}
oldBsOpts := []bitswap.Option{bitswap.SetSendDontHaves(false)}
oldNodeGenerator := testinstance.NewTestInstanceGenerator(net, oldNetOpts, oldBsOpts)
oldNodeGenerator := testinstance.NewTestInstanceGenerator(net, router, oldNetOpts, oldBsOpts)

// Regular new Bitswap node
newNodeGenerator := testinstance.NewTestInstanceGenerator(net, nil, nil)
newNodeGenerator := testinstance.NewTestInstanceGenerator(net, router, nil, nil)
var instances []testinstance.Instance

// Create new nodes (fetchers + seeds)
Expand Down Expand Up @@ -294,9 +294,10 @@ func BenchmarkDatacenterMultiLeechMultiSeed(b *testing.B) {
numblks := 1000

for i := 0; i < b.N; i++ {
net := tn.RateLimitedVirtualNetwork(mockrouting.NewServer(), d, rateLimitGenerator)
net := tn.RateLimitedVirtualNetwork(d, rateLimitGenerator)

ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
router := mockrouting.NewServer()
ig := testinstance.NewTestInstanceGenerator(net, router, nil, nil)
defer ig.Close()

instances := ig.Instances(numnodes)
Expand All @@ -312,9 +313,9 @@ func BenchmarkDatacenterMultiLeechMultiSeed(b *testing.B) {

func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, bstoreLatency time.Duration, df distFunc, ff fetchFunc) {
for i := 0; i < b.N; i++ {
net := tn.VirtualNetwork(mockrouting.NewServer(), d)

ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
net := tn.VirtualNetwork(d)
router := mockrouting.NewServer()
ig := testinstance.NewTestInstanceGenerator(net, router, nil, nil)

instances := ig.Instances(numnodes)
rootBlock := random.BlocksOfSize(1, rootBlockSize)
Expand All @@ -327,9 +328,9 @@ func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, b

func subtestDistributeAndFetchRateLimited(b *testing.B, numnodes, numblks int, d delay.D, rateLimitGenerator tn.RateLimitGenerator, blockSize int64, bstoreLatency time.Duration, df distFunc, ff fetchFunc) {
for i := 0; i < b.N; i++ {
net := tn.RateLimitedVirtualNetwork(mockrouting.NewServer(), d, rateLimitGenerator)

ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
net := tn.RateLimitedVirtualNetwork(d, rateLimitGenerator)
router := mockrouting.NewServer()
ig := testinstance.NewTestInstanceGenerator(net, router, nil, nil)
defer ig.Close()

instances := ig.Instances(numnodes)
Expand Down Expand Up @@ -437,7 +438,7 @@ func runDistribution(b *testing.B, instances []testinstance.Instance, blocks []b

func allToAll(b *testing.B, provs []testinstance.Instance, blocks []blocks.Block) {
for _, p := range provs {
if err := p.Blockstore().PutMany(context.Background(), blocks); err != nil {
if err := p.Blockstore.PutMany(context.Background(), blocks); err != nil {
b.Fatal(err)
}
}
Expand All @@ -452,10 +453,10 @@ func overlap1(b *testing.B, provs []testinstance.Instance, blks []blocks.Block)
bill := provs[0]
jeff := provs[1]

if err := bill.Blockstore().PutMany(context.Background(), blks[:75]); err != nil {
if err := bill.Blockstore.PutMany(context.Background(), blks[:75]); err != nil {
b.Fatal(err)
}
if err := jeff.Blockstore().PutMany(context.Background(), blks[25:]); err != nil {
if err := jeff.Blockstore.PutMany(context.Background(), blks[25:]); err != nil {
b.Fatal(err)
}
}
Expand All @@ -473,12 +474,12 @@ func overlap2(b *testing.B, provs []testinstance.Instance, blks []blocks.Block)
even := i%2 == 0
third := i%3 == 0
if third || even {
if err := bill.Blockstore().Put(context.Background(), blk); err != nil {
if err := bill.Blockstore.Put(context.Background(), blk); err != nil {
b.Fatal(err)
}
}
if third || !even {
if err := jeff.Blockstore().Put(context.Background(), blk); err != nil {
if err := jeff.Blockstore.Put(context.Background(), blk); err != nil {
b.Fatal(err)
}
}
Expand All @@ -490,7 +491,7 @@ func overlap2(b *testing.B, provs []testinstance.Instance, blks []blocks.Block)
// but we're mostly just testing performance of the sync algorithm
func onePeerPerBlock(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) {
for _, blk := range blks {
err := provs[rand.Intn(len(provs))].Blockstore().Put(context.Background(), blk)
err := provs[rand.Intn(len(provs))].Blockstore.Put(context.Background(), blk)
if err != nil {
b.Fatal(err)
}
Expand Down
16 changes: 4 additions & 12 deletions bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"

"github.com/ipfs/boxo/bitswap/client"
"github.com/ipfs/boxo/bitswap/internal/defaults"
"github.com/ipfs/boxo/bitswap/message"
"github.com/ipfs/boxo/bitswap/network"
"github.com/ipfs/boxo/bitswap/server"
Expand Down Expand Up @@ -45,9 +44,8 @@ type bitswap interface {
}

var (
_ exchange.SessionExchange = (*Bitswap)(nil)
_ bitswap = (*Bitswap)(nil)
HasBlockBufferSize = defaults.HasBlockBufferSize
_ exchange.SessionExchange = (*Bitswap)(nil)
_ bitswap = (*Bitswap)(nil)
)

type Bitswap struct {
Expand All @@ -58,7 +56,7 @@ type Bitswap struct {
net network.BitSwapNetwork
}

func New(ctx context.Context, net network.BitSwapNetwork, bstore blockstore.Blockstore, options ...Option) *Bitswap {
func New(ctx context.Context, net network.BitSwapNetwork, providerFinder client.ProviderFinder, bstore blockstore.Blockstore, options ...Option) *Bitswap {
bs := &Bitswap{
net: net,
}
Expand All @@ -85,14 +83,10 @@ func New(ctx context.Context, net network.BitSwapNetwork, bstore blockstore.Bloc
serverOptions = append(serverOptions, server.WithTracer(tracer))
}

if HasBlockBufferSize != defaults.HasBlockBufferSize {
serverOptions = append(serverOptions, server.HasBlockBufferSize(HasBlockBufferSize))
}

ctx = metrics.CtxSubScope(ctx, "bitswap")

bs.Server = server.New(ctx, net, bstore, serverOptions...)
bs.Client = client.New(ctx, net, bstore, append(clientOptions, client.WithBlockReceivedNotifier(bs.Server))...)
bs.Client = client.New(ctx, net, providerFinder, bstore, append(clientOptions, client.WithBlockReceivedNotifier(bs.Server))...)
net.Start(bs) // use the polyfill receiver to log received errors and trace messages only once

return bs
Expand All @@ -115,7 +109,6 @@ type Stat struct {
MessagesReceived uint64
BlocksSent uint64
DataSent uint64
ProvideBufLen int
}

func (bs *Bitswap) Stat() (*Stat, error) {
Expand All @@ -138,7 +131,6 @@ func (bs *Bitswap) Stat() (*Stat, error) {
Peers: ss.Peers,
BlocksSent: ss.BlocksSent,
DataSent: ss.DataSent,
ProvideBufLen: ss.ProvideBufLen,
}, nil
}

Expand Down
Loading