diff --git a/p2p/node/api.go b/p2p/node/api.go index 9ae75fed5d..5e2fba16e7 100644 --- a/p2p/node/api.go +++ b/p2p/node/api.go @@ -57,7 +57,12 @@ func (p *P2PNode) Start() error { } func (p *P2PNode) Subscribe(location common.Location, datatype interface{}) error { - return p.pubsub.Subscribe(location, datatype) + err := p.pubsub.Subscribe(location, datatype) + if err != nil { + return err + } + + return p.peerManager.Provide(p.ctx, location, datatype) } func (p *P2PNode) Broadcast(location common.Location, data interface{}) error { diff --git a/p2p/peerManager/peerManager.go b/p2p/peerManager/peerManager.go index 16da9ddf58..2e5ce8e208 100644 --- a/p2p/peerManager/peerManager.go +++ b/p2p/peerManager/peerManager.go @@ -71,6 +71,9 @@ type PeerManager interface { // Sets the DHT provided from the Host interface SetDHT(*dual.DHT) + // Announces to the DHT that we are providing this data + Provide(context.Context, common.Location, interface{}) error + // Manages stream lifecycles streamManager.StreamManager @@ -253,6 +256,10 @@ func (pm *BasicPeerManager) CloseStream(peerID p2p.PeerID) error { return pm.streamManager.CloseStream(peerID) } +func (pm *BasicPeerManager) Provide(ctx context.Context, location common.Location, data interface{}) error { + return pm.dht.Provide(ctx, pubsubManager.LocationToCid(location), true) +} + func (pm *BasicPeerManager) SetP2PBackend(p2pBackend quaiprotocol.QuaiP2PNode) { pm.streamManager.SetP2PBackend(p2pBackend) }