diff --git a/packages/kad-dht/package.json b/packages/kad-dht/package.json index fb7053b9ab..e54b2cf7ab 100644 --- a/packages/kad-dht/package.json +++ b/packages/kad-dht/package.json @@ -67,6 +67,7 @@ "@multiformats/multiaddr": "^12.2.3", "any-signal": "^4.1.1", "interface-datastore": "^8.3.0", + "it-all": "^3.0.6", "it-drain": "^3.0.7", "it-length": "^3.0.6", "it-length-prefixed": "^9.0.4", @@ -99,7 +100,6 @@ "datastore-core": "^10.0.0", "delay": "^6.0.0", "execa": "^9.1.0", - "it-all": "^3.0.6", "it-filter": "^3.1.0", "it-last": "^3.0.6", "it-pair": "^2.0.6", diff --git a/packages/kad-dht/src/content-fetching/index.ts b/packages/kad-dht/src/content-fetching/index.ts index adba1ef749..4e9d5cd0a7 100644 --- a/packages/kad-dht/src/content-fetching/index.ts +++ b/packages/kad-dht/src/content-fetching/index.ts @@ -31,6 +31,7 @@ export interface ContentFetchingInit { queryManager: QueryManager network: Network logPrefix: string + datastorePrefix: string } export class ContentFetching { @@ -48,7 +49,7 @@ export class ContentFetching { this.components = components this.log = components.logger.forComponent(`${logPrefix}:content-fetching`) - this.datastorePrefix = `/${init.logPrefix.replaceAll(':', '/')}/record` + this.datastorePrefix = `${init.datastorePrefix}/record` this.validators = validators this.selectors = selectors this.peerRouting = peerRouting diff --git a/packages/kad-dht/src/kad-dht.ts b/packages/kad-dht/src/kad-dht.ts index 51a691e8a8..a024d3cad2 100644 --- a/packages/kad-dht/src/kad-dht.ts +++ b/packages/kad-dht/src/kad-dht.ts @@ -240,7 +240,8 @@ export class KadDHT extends TypedEventEmitter implements Ka peerRouting: this.peerRouting, queryManager: this.queryManager, network: this.network, - logPrefix + logPrefix, + datastorePrefix }) this.contentRouting = new KADDHTContentRouting(components, { network: this.network, @@ -262,6 +263,7 @@ export class KadDHT extends TypedEventEmitter implements Ka validators: this.validators, logPrefix, metricsPrefix, + datastorePrefix, peerInfoMapper: this.peerInfoMapper }) this.topologyListener = new TopologyListener(components, { @@ -315,7 +317,7 @@ export class KadDHT extends TypedEventEmitter implements Ka await this.onPeerConnect(peerData) }).catch(err => { - this.log.error('could not add %p to routing table', peerId, err) + this.log.error('could not add %p to routing table - %e', peerId, err) }) }) diff --git a/packages/kad-dht/src/peer-routing/index.ts b/packages/kad-dht/src/peer-routing/index.ts index 8c47152069..56ff87918c 100644 --- a/packages/kad-dht/src/peer-routing/index.ts +++ b/packages/kad-dht/src/peer-routing/index.ts @@ -316,14 +316,10 @@ export class PeerRouting { const requesterXor = uint8ArrayXor(closerThanKadId, keyKadId) for (const peerId of ids) { - if (peerId.equals(closerThan)) { - continue - } - const peerKadId = await convertPeerId(peerId) const peerXor = uint8ArrayXor(peerKadId, keyKadId) - // only include if peer isy closer than requester + // only include if peer is closer than requester if (uint8ArrayXorCompare(peerXor, requesterXor) !== -1) { continue } diff --git a/packages/kad-dht/src/providers.ts b/packages/kad-dht/src/providers.ts index 83890b0ec2..895a12e7a4 100644 --- a/packages/kad-dht/src/providers.ts +++ b/packages/kad-dht/src/providers.ts @@ -29,7 +29,7 @@ export class Providers { constructor (components: ProvidersComponents, init: ProvidersInit) { this.log = components.logger.forComponent(`${init.logPrefix}:providers`) - this.datastorePrefix = `/${init.datastorePrefix}/provider` + this.datastorePrefix = `${init.datastorePrefix}/provider` this.datastore = components.datastore this.lock = init.lock } @@ -70,8 +70,9 @@ export class Providers { const release = await this.lock.readLock() try { - this.log('get providers for %s', cid) + this.log('get providers for %c', cid) const provs = await this.loadProviders(cid) + this.log('got %d providers for %c', provs.size, cid) return [...provs.keys()] } finally { @@ -94,8 +95,9 @@ export class Providers { */ private async loadProviders (cid: CID): Promise> { const providers = new PeerMap() + const key = toProviderKey(this.datastorePrefix, cid) - for await (const entry of this.datastore.query({ prefix: toProviderKey(this.datastorePrefix, cid).toString() })) { + for await (const entry of this.datastore.query({ prefix: key.toString() })) { const { peerId } = parseProviderKey(entry.key) providers.set(peerId, readProviderTime(entry.value)) } diff --git a/packages/kad-dht/src/reprovider.ts b/packages/kad-dht/src/reprovider.ts index d29381977d..625bf03d04 100644 --- a/packages/kad-dht/src/reprovider.ts +++ b/packages/kad-dht/src/reprovider.ts @@ -80,7 +80,7 @@ export class Reprovider extends TypedEventEmitter { }) this.datastore = components.datastore this.addressManager = components.addressManager - this.datastorePrefix = `/${init.datastorePrefix}/provider` + this.datastorePrefix = `${init.datastorePrefix}/provider` this.reprovideThreshold = init.threshold ?? REPROVIDE_THRESHOLD this.maxQueueSize = init.maxQueueSize ?? REPROVIDE_MAX_QUEUE_SIZE this.validity = init.validity ?? PROVIDERS_VALIDITY diff --git a/packages/kad-dht/src/rpc/handlers/add-provider.ts b/packages/kad-dht/src/rpc/handlers/add-provider.ts index d2d4349ca7..da5a16c845 100644 --- a/packages/kad-dht/src/rpc/handlers/add-provider.ts +++ b/packages/kad-dht/src/rpc/handlers/add-provider.ts @@ -6,9 +6,11 @@ import * as Digest from 'multiformats/hashes/digest' import type { Message } from '../../message/dht.js' import type { Providers } from '../../providers' import type { DHTMessageHandler } from '../index.js' -import type { ComponentLogger, Logger, PeerId } from '@libp2p/interface' +import type { ComponentLogger, Logger, PeerId, PeerStore } from '@libp2p/interface' export interface AddProviderComponents { + peerId: PeerId + peerStore: PeerStore logger: ComponentLogger } @@ -18,12 +20,16 @@ export interface AddProviderHandlerInit { } export class AddProviderHandler implements DHTMessageHandler { + private readonly peerId: PeerId private readonly providers: Providers + private readonly peerStore: PeerStore private readonly log: Logger constructor (components: AddProviderComponents, init: AddProviderHandlerInit) { this.log = components.logger.forComponent(`${init.logPrefix}:rpc:handlers:add-provider`) + this.peerId = components.peerId this.providers = init.providers + this.peerStore = components.peerStore } async handle (peerId: PeerId, msg: Message): Promise { @@ -43,12 +49,16 @@ export class AddProviderHandler implements DHTMessageHandler { this.log.error('no providers found in message') } - this.log('%p asked us to store provider record for for %c', peerId, cid) + this.log('%p asked us, %p to store provider record for for %c', peerId, this.peerId, cid) await Promise.all( msg.providers.map(async (pi) => { + const digest = Digest.decode(pi.id) + const providerId = peerIdFromMultihash(digest) + const providerMultiaddrs = pi.multiaddrs.map(buf => multiaddr(buf)) + // Ignore providers not from the originator - if (!peerId.equals(pi.id)) { + if (!peerId.equals(providerId)) { this.log('invalid provider peer %p from %p', pi.id, peerId) return } @@ -58,11 +68,12 @@ export class AddProviderHandler implements DHTMessageHandler { return } - this.log.trace('received provider %p for %s (addrs %s)', peerId, cid, pi.multiaddrs.map((m) => multiaddr(m).toString())) - - const multihash = Digest.decode(pi.id) + this.log.trace('received provider %p for %s (addrs %s)', peerId, cid, providerMultiaddrs) - await this.providers.addProvider(cid, peerIdFromMultihash(multihash)) + await this.providers.addProvider(cid, providerId) + await this.peerStore.merge(providerId, { + multiaddrs: providerMultiaddrs + }) }) ) diff --git a/packages/kad-dht/src/rpc/handlers/get-providers.ts b/packages/kad-dht/src/rpc/handlers/get-providers.ts index 71b7af1d65..438acefce2 100644 --- a/packages/kad-dht/src/rpc/handlers/get-providers.ts +++ b/packages/kad-dht/src/rpc/handlers/get-providers.ts @@ -1,4 +1,6 @@ import { InvalidMessageError } from '@libp2p/interface' +import all from 'it-all' +import map from 'it-map' import { CID } from 'multiformats/cid' import { MessageType } from '../../message/dht.js' import type { PeerInfoMapper } from '../../index.js' @@ -17,11 +19,13 @@ export interface GetProvidersHandlerInit { } export interface GetProvidersHandlerComponents { + peerId: PeerId peerStore: PeerStore logger: ComponentLogger } export class GetProvidersHandler implements DHTMessageHandler { + private readonly peerId: PeerId private readonly peerRouting: PeerRouting private readonly providers: Providers private readonly peerStore: PeerStore @@ -32,6 +36,7 @@ export class GetProvidersHandler implements DHTMessageHandler { const { peerRouting, providers, logPrefix } = init this.log = components.logger.forComponent(`${logPrefix}:rpc:handlers:get-providers`) + this.peerId = components.peerId this.peerStore = components.peerStore this.peerRouting = peerRouting this.providers = providers @@ -52,27 +57,33 @@ export class GetProvidersHandler implements DHTMessageHandler { this.log('%p asking for providers for %s', peerId, cid) - const [peers, closer] = await Promise.all([ - this.providers.getProviders(cid), - this.peerRouting.getCloserPeersOffline(msg.key, peerId) + const [providerPeers, closerPeers] = await Promise.all([ + all(map(await this.providers.getProviders(cid), async (peerId) => { + const peer = await this.peerStore.get(peerId) + const info: PeerInfo = { + id: peer.id, + multiaddrs: peer.addresses.map(({ multiaddr }) => multiaddr) + } + + return info + })), + this.peerRouting.getCloserPeersOffline(msg.key, this.peerId) ]) - const providerPeers = await this._getPeers(peers) - const closerPeers = await this._getPeers(closer.map(({ id }) => id)) const response: Message = { type: MessageType.GET_PROVIDERS, key: msg.key, clusterLevel: msg.clusterLevel, closer: closerPeers .map(this.peerInfoMapper) - .filter(({ multiaddrs }) => multiaddrs.length) + .filter(({ id, multiaddrs }) => multiaddrs.length > 0) .map(peerInfo => ({ id: peerInfo.id.toMultihash().bytes, multiaddrs: peerInfo.multiaddrs.map(ma => ma.bytes) })), providers: providerPeers .map(this.peerInfoMapper) - .filter(({ multiaddrs }) => multiaddrs.length) + .filter(({ id, multiaddrs }) => multiaddrs.length > 0) .map(peerInfo => ({ id: peerInfo.id.toMultihash().bytes, multiaddrs: peerInfo.multiaddrs.map(ma => ma.bytes) @@ -87,29 +98,4 @@ export class GetProvidersHandler implements DHTMessageHandler { async _getAddresses (peerId: PeerId): Promise { return [] } - - async _getPeers (peerIds: PeerId[]): Promise { - const output: PeerInfo[] = [] - - for (const peerId of peerIds) { - try { - const peer = await this.peerStore.get(peerId) - - const peerAfterFilter = this.peerInfoMapper({ - id: peerId, - multiaddrs: peer.addresses.map(({ multiaddr }) => multiaddr) - }) - - if (peerAfterFilter.multiaddrs.length > 0) { - output.push(peerAfterFilter) - } - } catch (err: any) { - if (err.name !== 'NotFoundError') { - throw err - } - } - } - - return output - } } diff --git a/packages/kad-dht/src/rpc/handlers/get-value.ts b/packages/kad-dht/src/rpc/handlers/get-value.ts index 9ce13fddbb..b0c64826b1 100644 --- a/packages/kad-dht/src/rpc/handlers/get-value.ts +++ b/packages/kad-dht/src/rpc/handlers/get-value.ts @@ -15,6 +15,7 @@ import type { Datastore } from 'interface-datastore' export interface GetValueHandlerInit { peerRouting: PeerRouting logPrefix: string + datastorePrefix: string } export interface GetValueHandlerComponents { @@ -32,7 +33,7 @@ export class GetValueHandler implements DHTMessageHandler { constructor (components: GetValueHandlerComponents, init: GetValueHandlerInit) { this.log = components.logger.forComponent(`${init.logPrefix}:rpc:handlers:get-value`) - this.datastorePrefix = `/${init.logPrefix.replaceAll(':', '/')}/record` + this.datastorePrefix = `${init.datastorePrefix}/record` this.peerStore = components.peerStore this.datastore = components.datastore this.peerRouting = init.peerRouting diff --git a/packages/kad-dht/src/rpc/handlers/put-value.ts b/packages/kad-dht/src/rpc/handlers/put-value.ts index 1747aec23f..156e65da49 100644 --- a/packages/kad-dht/src/rpc/handlers/put-value.ts +++ b/packages/kad-dht/src/rpc/handlers/put-value.ts @@ -11,6 +11,7 @@ import type { Datastore } from 'interface-datastore' export interface PutValueHandlerInit { validators: Validators logPrefix: string + datastorePrefix: string } export interface PutValueHandlerComponents { @@ -29,7 +30,7 @@ export class PutValueHandler implements DHTMessageHandler { this.components = components this.log = components.logger.forComponent(`${init.logPrefix}:rpc:handlers:put-value`) - this.datastorePrefix = `/${init.logPrefix.replaceAll(':', '/')}/record` + this.datastorePrefix = `${init.datastorePrefix}/record` this.validators = validators } diff --git a/packages/kad-dht/src/rpc/index.ts b/packages/kad-dht/src/rpc/index.ts index 36551c6bfa..bfd88659bf 100644 --- a/packages/kad-dht/src/rpc/index.ts +++ b/packages/kad-dht/src/rpc/index.ts @@ -25,6 +25,7 @@ export interface RPCInit { validators: Validators logPrefix: string metricsPrefix: string + datastorePrefix: string peerInfoMapper: PeerInfoMapper } diff --git a/packages/kad-dht/test/content-routing.spec.ts b/packages/kad-dht/test/content-routing.spec.ts new file mode 100644 index 0000000000..a59706387f --- /dev/null +++ b/packages/kad-dht/test/content-routing.spec.ts @@ -0,0 +1,256 @@ +/* eslint-env mocha */ +/* eslint max-nested-callbacks: ["error", 8] */ + +import { expect } from 'aegir/chai' +import delay from 'delay' +import all from 'it-all' +import drain from 'it-drain' +import sinon from 'sinon' +import { MessageType } from '../src/index.js' +import * as kadUtils from '../src/utils.js' +import { createValues } from './utils/create-values.js' +import { sortDHTs } from './utils/sort-closest-peers.js' +import { TestDHT } from './utils/test-dht.js' +import type { PeerId } from '@libp2p/interface' +import type { CID } from 'multiformats/cid' + +describe('content routing', () => { + let cid: CID + let tdht: TestDHT + + beforeEach(() => { + tdht = new TestDHT() + }) + + afterEach(async () => { + await tdht.teardown() + }) + + before(async function () { + this.timeout(10 * 1000) + + cid = (await createValues(1))[0].cid + }) + + afterEach(() => { + sinon.restore() + }) + + it('provides', async function () { + this.timeout(20 * 1000) + + const dhts = await sortDHTs(await Promise.all([ + tdht.spawn(), + tdht.spawn(), + tdht.spawn(), + tdht.spawn() + ]), await kadUtils.convertBuffer(cid.multihash.bytes)) + + sinon.spy(dhts[3].network, 'sendMessage') + + // connect peers + await Promise.all([ + tdht.connect(dhts[0], dhts[1]), + tdht.connect(dhts[1], dhts[2]), + tdht.connect(dhts[2], dhts[3]) + ]) + + // run provide operation + await drain(dhts[3].provide(cid)) + + // check network messages + // @ts-expect-error fn is a spy + const calls = dhts[3].network.sendMessage.getCalls().map(c => c.args) + + const peersSentMessage = new Set() + + for (const [peerId, msg] of calls) { + peersSentMessage.add(peerId.toString()) + + expect(msg.type).equals(MessageType.ADD_PROVIDER) + expect(msg.key).equalBytes(cid.multihash.bytes) + expect(msg.providers.length).equals(1) + expect(msg.providers[0].id).to.equalBytes(dhts[3].components.peerId.toMultihash().bytes) + } + + // expect an ADD_PROVIDER message to be sent to each peer for each value + expect([...peersSentMessage].sort()).to.deep.equal([ + dhts[0].components.peerId.toString(), + dhts[1].components.peerId.toString(), + dhts[2].components.peerId.toString() + ].sort(), 'did not send ADD_PROVIDER to network peers') + + // Expect each DHT to find the provider of each value + for (const d of dhts) { + const events = await all(d.findProviders(cid)) + const provs = Object.values(events.reduce>((acc, curr) => { + if (curr.name === 'PEER_RESPONSE') { + curr.providers.forEach(peer => { + acc[peer.id.toString()] = peer.id + }) + } + + return acc + }, {})) + + expect(provs).to.have.length(1) + expect(provs[0].toString()).to.equal(dhts[3].components.peerId.toString()) + } + }) + + it('provides if in server mode', async function () { + const dhts = await sortDHTs(await Promise.all([ + tdht.spawn(), + tdht.spawn(), + tdht.spawn(), + tdht.spawn() + ]), await kadUtils.convertBuffer(cid.multihash.bytes)) + + // connect peers + await Promise.all([ + tdht.connect(dhts[0], dhts[1]), + tdht.connect(dhts[1], dhts[2]), + tdht.connect(dhts[2], dhts[3]) + ]) + + const sendMessageSpy = sinon.spy(dhts[0].network, 'sendMessage') + + await dhts[0].setMode('server') + + await drain(dhts[0].provide(cid)) + + expect(sendMessageSpy.called).to.be.true() + }) + + it('find providers', async function () { + this.timeout(20 * 1000) + + const dhts = await sortDHTs(await Promise.all([ + tdht.spawn(), + tdht.spawn(), + tdht.spawn() + ]), await kadUtils.convertBuffer(cid.multihash.bytes)) + + // Connect + await Promise.all([ + tdht.connect(dhts[0], dhts[1]), + tdht.connect(dhts[1], dhts[2]) + ]) + + await Promise.all(dhts.map(async (dht) => { await drain(dht.provide(cid)) })) + + const events = await all(dhts[0].findProviders(cid)) + + // find providers find all the 3 providers + const provs = Object.values(events.reduce>((acc, curr) => { + if (curr.name === 'PEER_RESPONSE') { + curr.providers.forEach(peer => { + acc[peer.id.toString()] = peer.id + }) + } + + return acc + }, {})) + expect(provs).to.have.length(3) + }) + + it('find providers from client', async function () { + this.timeout(20 * 1000) + + const dhts = await sortDHTs(await Promise.all([ + tdht.spawn(), + tdht.spawn(), + tdht.spawn() + ]), await kadUtils.convertBuffer(cid.multihash.bytes)) + const clientDHT = await tdht.spawn({ clientMode: true }) + + // Connect + await Promise.all([ + tdht.connect(clientDHT, dhts[0]), + tdht.connect(dhts[0], dhts[1]), + tdht.connect(dhts[1], dhts[2]) + ]) + + await drain(dhts[2].provide(cid)) + + // wait for messages to be handled + await delay(1000) + + const events = await all(clientDHT.findProviders(cid)) + + // find providers find all the 3 providers + const provs = Object.values(events.reduce>((acc, curr) => { + if (curr.name === 'PEER_RESPONSE') { + curr.providers.forEach(peer => { + acc[peer.id.toString()] = peer.id + }) + } + + return acc + }, {})) + expect(provs).to.have.length(1) + }) + + it('find provider published by client', async function () { + this.timeout(20 * 1000) + + const dhts = await sortDHTs(await Promise.all([ + tdht.spawn(), + tdht.spawn() + ]), await kadUtils.convertBuffer(cid.multihash.bytes)) + const clientDHT = await tdht.spawn({ clientMode: true }) + + // Connect + await Promise.all([ + tdht.connect(clientDHT, dhts[0]), + tdht.connect(dhts[0], dhts[1]) + ]) + + await drain(clientDHT.provide(cid)) + + await delay(1e3) + + const events = await all(dhts[1].findProviders(cid)) + + // find providers find the client provider + const provs = Object.values(events.reduce>((acc, curr) => { + if (curr.name === 'PEER_RESPONSE') { + curr.providers.forEach(peer => { + acc[peer.id.toString()] = peer.id + }) + } + + return acc + }, {})) + expect(provs).to.have.length(1) + }) + + it('find one provider locally', async function () { + this.timeout(20 * 1000) + + const dht = await tdht.spawn() + + sinon.stub(dht.components.peerStore, 'get').withArgs(dht.components.peerId) + .resolves({ + id: dht.components.peerId, + addresses: [], + protocols: [], + tags: new Map(), + metadata: new Map() + }) + sinon.stub(dht.providers, 'getProviders').resolves([dht.components.peerId]) + + // Find provider + const events = await all(dht.findProviders(cid)) + const provs = Object.values(events.reduce>((acc, curr) => { + if (curr.name === 'PEER_RESPONSE') { + curr.providers.forEach(peer => { + acc[peer.id.toString()] = peer.id + }) + } + + return acc + }, {})) + expect(provs).to.have.length(1) + }) +}) diff --git a/packages/kad-dht/test/kad-dht.spec.ts b/packages/kad-dht/test/kad-dht.spec.ts index ed4db55f6d..1795fcb721 100644 --- a/packages/kad-dht/test/kad-dht.spec.ts +++ b/packages/kad-dht/test/kad-dht.spec.ts @@ -1,34 +1,22 @@ /* eslint-env mocha */ /* eslint max-nested-callbacks: ["error", 8] */ -import { peerIdFromMultihash } from '@libp2p/peer-id' import { Libp2pRecord } from '@libp2p/record' import { expect } from 'aegir/chai' -import delay from 'delay' import all from 'it-all' import drain from 'it-drain' import filter from 'it-filter' import last from 'it-last' -import map from 'it-map' -import { pipe } from 'it-pipe' -import * as Digest from 'multiformats/hashes/digest' import sinon from 'sinon' -import { equals as uint8ArrayEquals } from 'uint8arrays/equals' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' -import * as c from '../src/constants.js' -import { EventTypes, MessageType } from '../src/index.js' +import { MessageType } from '../src/index.js' import { peerResponseEvent } from '../src/query/events.js' import * as kadUtils from '../src/utils.js' import { createPeerIdsWithPrivateKey } from './utils/create-peer-id.js' -import { createValues } from './utils/create-values.js' -import { countDiffPeers } from './utils/index.js' -import { sortClosestPeers, sortDHTs } from './utils/sort-closest-peers.js' +import { sortDHTs } from './utils/sort-closest-peers.js' import { TestDHT } from './utils/test-dht.js' import type { PeerIdWithPrivateKey } from './utils/create-peer-id.js' import type { FinalPeerEvent, QueryEvent, ValueEvent } from '../src/index.js' -import type { KadDHT } from '../src/kad-dht.js' -import type { PeerId } from '@libp2p/interface' -import type { CID } from 'multiformats/cid' async function findEvent (events: AsyncIterable, name: 'FINAL_PEER'): Promise async function findEvent (events: AsyncIterable, name: 'VALUE'): Promise @@ -51,7 +39,6 @@ async function findEvent (events: AsyncIterable, name: string): Prom describe('KadDHT', () => { let peerIds: PeerIdWithPrivateKey[] - let values: Array<{ cid: CID, value: Uint8Array }> let tdht: TestDHT beforeEach(() => { @@ -64,14 +51,7 @@ describe('KadDHT', () => { before(async function () { this.timeout(10 * 1000) - - const res = await Promise.all([ - createPeerIdsWithPrivateKey(3), - createValues(20) - ]) - - peerIds = res[0] - values = res[1] + peerIds = await createPeerIdsWithPrivateKey(3) }) afterEach(() => { @@ -414,227 +394,6 @@ describe('KadDHT', () => { }) }) - describe('content routing', () => { - it('provides', async function () { - this.timeout(20 * 1000) - - const dhts = await Promise.all([ - tdht.spawn(), - tdht.spawn(), - tdht.spawn(), - tdht.spawn() - ]) - - const ids = dhts.map((d) => d.components.peerId) - const idsB58 = ids.map(id => id.toString()) - sinon.spy(dhts[3].network, 'sendMessage') - - // connect peers - await Promise.all([ - tdht.connect(dhts[0], dhts[1]), - tdht.connect(dhts[1], dhts[2]), - tdht.connect(dhts[2], dhts[3]) - ]) - - // provide values - await Promise.all(values.map(async (value) => { await drain(dhts[3].provide(value.cid)) })) - - // Expect an ADD_PROVIDER message to be sent to each peer for each value - const fn = dhts[3].network.sendMessage - const valuesBuffs = values.map(v => v.cid.multihash.bytes) - // @ts-expect-error fn is a spy - const calls = fn.getCalls().map(c => c.args) - - for (const [peerId, msg] of calls) { - expect(idsB58).includes(peerId.toString()) - expect(msg.type).equals(MessageType.ADD_PROVIDER) - expect(valuesBuffs).includes(msg.key) - expect(msg.providers.length).equals(1) - expect(peerIdFromMultihash(Digest.decode(msg.providers[0].id)).toString()).equals(idsB58[3]) - } - - // Expect each DHT to find the provider of each value - let n = 0 - for (const v of values) { - n = (n + 1) % 3 - - const events = await all(dhts[n].findProviders(v.cid)) - const provs = Object.values(events.reduce>((acc, curr) => { - if (curr.name === 'PEER_RESPONSE') { - curr.providers.forEach(peer => { - acc[peer.id.toString()] = peer.id - }) - } - - return acc - }, {})) - - expect(provs).to.have.length(1) - expect(provs[0].toString()).to.equal(ids[3].toString()) - } - }) - - it('provides if in server mode', async function () { - const dhts = await Promise.all([ - tdht.spawn(), - tdht.spawn(), - tdht.spawn(), - tdht.spawn() - ]) - - // connect peers - await Promise.all([ - tdht.connect(dhts[0], dhts[1]), - tdht.connect(dhts[1], dhts[2]), - tdht.connect(dhts[2], dhts[3]) - ]) - - const sendMessageSpy = sinon.spy(dhts[0].network, 'sendMessage') - - await dhts[0].setMode('server') - - await drain(dhts[0].provide(values[0].cid)) - - expect(sendMessageSpy.called).to.be.true() - }) - - it('find providers', async function () { - this.timeout(20 * 1000) - - const val = values[0] - - const dhts = await Promise.all([ - tdht.spawn(), - tdht.spawn(), - tdht.spawn() - ]) - - // Connect - await Promise.all([ - tdht.connect(dhts[0], dhts[1]), - tdht.connect(dhts[1], dhts[2]) - ]) - - await Promise.all(dhts.map(async (dht) => { await drain(dht.provide(val.cid)) })) - - const events = await all(dhts[0].findProviders(val.cid)) - - // find providers find all the 3 providers - const provs = Object.values(events.reduce>((acc, curr) => { - if (curr.name === 'PEER_RESPONSE') { - curr.providers.forEach(peer => { - acc[peer.id.toString()] = peer.id - }) - } - - return acc - }, {})) - expect(provs).to.have.length(3) - }) - - it('find providers from client', async function () { - this.timeout(20 * 1000) - - const val = values[0] - - const dhts = await Promise.all([ - tdht.spawn(), - tdht.spawn(), - tdht.spawn() - ]) - const clientDHT = await tdht.spawn({ clientMode: true }) - - // Connect - await Promise.all([ - tdht.connect(clientDHT, dhts[0]), - tdht.connect(dhts[0], dhts[1]), - tdht.connect(dhts[1], dhts[2]) - ]) - - await Promise.all(dhts.map(async (dht) => { await drain(dht.provide(val.cid)) })) - - const events = await all(dhts[0].findProviders(val.cid)) - - // find providers find all the 3 providers - const provs = Object.values(events.reduce>((acc, curr) => { - if (curr.name === 'PEER_RESPONSE') { - curr.providers.forEach(peer => { - acc[peer.id.toString()] = peer.id - }) - } - - return acc - }, {})) - expect(provs).to.have.length(3) - }) - - it('find client provider', async function () { - this.timeout(20 * 1000) - - const val = values[0] - - const dhts = await Promise.all([ - tdht.spawn(), - tdht.spawn() - ]) - const clientDHT = await tdht.spawn({ clientMode: true }) - - // Connect - await Promise.all([ - tdht.connect(clientDHT, dhts[0]), - tdht.connect(dhts[0], dhts[1]) - ]) - - await drain(clientDHT.provide(val.cid)) - - await delay(1e3) - - const events = await all(dhts[1].findProviders(val.cid)) - - // find providers find the client provider - const provs = Object.values(events.reduce>((acc, curr) => { - if (curr.name === 'PEER_RESPONSE') { - curr.providers.forEach(peer => { - acc[peer.id.toString()] = peer.id - }) - } - - return acc - }, {})) - expect(provs).to.have.length(1) - }) - - it('find one provider locally', async function () { - this.timeout(20 * 1000) - const val = values[0] - - const dht = await tdht.spawn() - - sinon.stub(dht.components.peerStore, 'get').withArgs(dht.components.peerId) - .resolves({ - id: dht.components.peerId, - addresses: [], - protocols: [], - tags: new Map(), - metadata: new Map() - }) - sinon.stub(dht.providers, 'getProviders').resolves([dht.components.peerId]) - - // Find provider - const events = await all(dht.findProviders(val.cid)) - const provs = Object.values(events.reduce>((acc, curr) => { - if (curr.name === 'PEER_RESPONSE') { - curr.providers.forEach(peer => { - acc[peer.id.toString()] = peer.id - }) - } - - return acc - }, {})) - expect(provs).to.have.length(1) - }) - }) - describe('peer routing', () => { it('findPeer', async function () { this.timeout(240 * 1000) @@ -659,103 +418,6 @@ describe('KadDHT', () => { expect(finalPeer.peer.id.equals(ids[ids.length - 1])).to.eql(true) }) - it('find peer query', async function () { - this.timeout(240 * 1000) - - // Create 101 nodes - const nDHTs = 101 - - const dhts = await Promise.all( - new Array(nDHTs).fill(0).map(async () => tdht.spawn()) - ) - - const dhtsById = new Map(dhts.map((d) => { - const peerId = d.components.peerId as unknown as PeerIdWithPrivateKey - peerId.privateKey = d.components.privateKey - - return [peerId, d] - })) - const ids = [...dhtsById.keys()] - - // The origin node for the FIND_PEER query - const originNode = dhts[0] - - // The key - const val = uint8ArrayFromString('foobar') - - // Hash the key into the DHT's key format - const rtval = await kadUtils.convertBuffer(val) - // Make connections between nodes close to each other - const sorted = await sortClosestPeers(ids, rtval) - - const conns: PeerIdWithPrivateKey[][] = [] - const maxRightIndex = sorted.length - 1 - for (let i = 0; i < sorted.length; i++) { - // Connect to 5 nodes on either side (10 in total) - for (const distance of [1, 3, 11, 31, 63]) { - let rightIndex = i + distance - if (rightIndex > maxRightIndex) { - rightIndex = maxRightIndex * 2 - (rightIndex + 1) - } - let leftIndex = i - distance - if (leftIndex < 0) { - leftIndex = 1 - leftIndex - } - conns.push([sorted[leftIndex], sorted[rightIndex]]) - } - } - - await Promise.all(conns.map(async (conn) => { - const dhtA = dhtsById.get(conn[0]) - const dhtB = dhtsById.get(conn[1]) - - if (dhtA == null || dhtB == null) { - throw new Error('Could not find DHT') - } - - await tdht.connect(dhtA, dhtB) - })) - - // Get the alpha (3) closest peers to the key from the origin's - // routing table - const rtablePeers = originNode.routingTable.closestPeers(rtval, c.ALPHA) - expect(rtablePeers).to.have.length(c.ALPHA) - - // The set of peers used to initiate the query (the closest alpha - // peers to the key that the origin knows about) - const rtableSet: Record = {} - rtablePeers.forEach((p) => { - rtableSet[p.toString()] = true - }) - - const originNodeIndex = ids.findIndex(i => uint8ArrayEquals(i.toMultihash().bytes, originNode.components.peerId.toMultihash().bytes)) - const otherIds = ids.slice(0, originNodeIndex).concat(ids.slice(originNodeIndex + 1)) - - // Make the query - const out = await pipe( - originNode.getClosestPeers(val), - source => filter(source, (event) => event.type === EventTypes.FINAL_PEER), - // @ts-expect-error tsc has problems with filtering - source => map(source, (event) => event.peer.id), - async source => all(source) - ) - - const actualClosest = await sortClosestPeers(otherIds, rtval) - - // Expect that the response includes nodes that are were not - // already in the origin's routing table (ie it went out to - // the network to find closer peers) - expect(out.filter((p) => !rtableSet[p.toString()])) - .to.not.be.empty() - - // The expected closest kValue peers to the key - const exp = actualClosest.slice(0, c.K) - - // Expect the kValue peers found to include the kValue closest connected - // peers to the key - expect(countDiffPeers(out, exp)).to.equal(0) - }) - it('getClosestPeers', async function () { this.timeout(240 * 1000) @@ -777,7 +439,7 @@ describe('KadDHT', () => { expect(res).to.not.be.empty() }) - it.skip('should not include itself in getClosestPeers PEER_RESPONSE', async function () { + it('should not include requester in getClosestPeers PEER_RESPONSE', async function () { this.timeout(240 * 1000) const nDHTs = 30 @@ -804,7 +466,7 @@ describe('KadDHT', () => { } expect(event.closer.map(peer => peer.id.toString())) - .to.not.include(event.from.toString()) + .to.not.include(dhts[1].components.peerId.toString()) } }) }) diff --git a/packages/kad-dht/test/rpc/handlers/add-provider.spec.ts b/packages/kad-dht/test/rpc/handlers/add-provider.spec.ts index 775b166dad..4f074c4556 100644 --- a/packages/kad-dht/test/rpc/handlers/add-provider.spec.ts +++ b/packages/kad-dht/test/rpc/handlers/add-provider.spec.ts @@ -1,6 +1,8 @@ /* eslint-env mocha */ +import { TypedEventEmitter } from '@libp2p/interface' import { defaultLogger } from '@libp2p/logger' +import { persistentPeerStore } from '@libp2p/peer-store' import { multiaddr } from '@multiformats/multiaddr' import { expect } from 'aegir/chai' import { MemoryDatastore } from 'datastore-core' @@ -12,7 +14,7 @@ import { AddProviderHandler } from '../../../src/rpc/handlers/add-provider.js' import { createPeerIds } from '../../utils/create-peer-id.js' import { createValues } from '../../utils/create-values.js' import type { DHTMessageHandler } from '../../../src/rpc/index.js' -import type { PeerId } from '@libp2p/interface' +import type { Libp2pEvents, PeerId, PeerStore } from '@libp2p/interface' import type { CID } from 'multiformats' describe('rpc - handlers - AddProvider', () => { @@ -20,6 +22,7 @@ describe('rpc - handlers - AddProvider', () => { let values: Array<{ cid: CID, value: Uint8Array }> let handler: DHTMessageHandler let providers: Providers + let peerStore: PeerStore before(async () => { [peerIds, values] = await Promise.all([ @@ -30,6 +33,12 @@ describe('rpc - handlers - AddProvider', () => { beforeEach(async () => { const datastore = new MemoryDatastore() + peerStore = persistentPeerStore({ + peerId: peerIds[0], + datastore: new MemoryDatastore(), + events: new TypedEventEmitter(), + logger: defaultLogger() + }) providers = new Providers({ datastore, @@ -41,6 +50,8 @@ describe('rpc - handlers - AddProvider', () => { }) handler = new AddProviderHandler({ + peerId: peerIds[0], + peerStore, logger: defaultLogger() }, { providers, @@ -70,7 +81,7 @@ describe('rpc - handlers - AddProvider', () => { tests.forEach((t) => { it(t.error.toString(), async () => { try { - await handler.handle(peerIds[0], t.message) + await handler.handle(peerIds[1], t.message) } catch (err: any) { expect(err).to.exist() expect(err).to.have.property('name', t.error) @@ -94,17 +105,17 @@ describe('rpc - handlers - AddProvider', () => { const ma2 = multiaddr('/ip4/127.0.0.1/tcp/2345') msg.providers = [{ - id: peerIds[0].toMultihash().bytes, + id: peerIds[1].toMultihash().bytes, multiaddrs: [ma1.bytes] }, { - id: peerIds[1].toMultihash().bytes, + id: peerIds[2].toMultihash().bytes, multiaddrs: [ma2.bytes] }] - await handler.handle(peerIds[0], msg) + await handler.handle(peerIds[1], msg) const provs = await providers.getProviders(cid) expect(provs).to.have.length(1) - expect(provs[0].toString()).to.equal(peerIds[0].toString()) + expect(provs[0].toString()).to.equal(peerIds[1].toString()) }) }) diff --git a/packages/kad-dht/test/rpc/handlers/get-providers.spec.ts b/packages/kad-dht/test/rpc/handlers/get-providers.spec.ts index 4133018c6b..95d1cf50cf 100644 --- a/packages/kad-dht/test/rpc/handlers/get-providers.spec.ts +++ b/packages/kad-dht/test/rpc/handlers/get-providers.spec.ts @@ -49,6 +49,7 @@ describe('rpc - handlers - GetProviders', () => { }) const components: GetProvidersHandlerComponents = { + peerId, peerStore, logger: defaultLogger() } @@ -101,7 +102,7 @@ describe('rpc - handlers - GetProviders', () => { }] providers.getProviders.withArgs(v.cid).resolves([providerPeer]) - peerRouting.getCloserPeersOffline.withArgs(msg.key, sourcePeer).resolves(closer) + peerRouting.getCloserPeersOffline.withArgs(msg.key, peerId).resolves(closer) await peerStore.merge(providerPeer, { multiaddrs: provider[0].multiaddrs diff --git a/packages/kad-dht/test/rpc/handlers/get-value.spec.ts b/packages/kad-dht/test/rpc/handlers/get-value.spec.ts index 1b3c537b99..d362fa46e2 100644 --- a/packages/kad-dht/test/rpc/handlers/get-value.spec.ts +++ b/packages/kad-dht/test/rpc/handlers/get-value.spec.ts @@ -52,7 +52,8 @@ describe('rpc - handlers - GetValue', () => { handler = new GetValueHandler(components, { peerRouting, - logPrefix: 'dht' + logPrefix: 'dht', + datastorePrefix: '/dht' }) }) diff --git a/packages/kad-dht/test/rpc/handlers/put-value.spec.ts b/packages/kad-dht/test/rpc/handlers/put-value.spec.ts index 46ecf678c1..4c06c5f7c3 100644 --- a/packages/kad-dht/test/rpc/handlers/put-value.spec.ts +++ b/packages/kad-dht/test/rpc/handlers/put-value.spec.ts @@ -35,7 +35,8 @@ describe('rpc - handlers - PutValue', () => { handler = new PutValueHandler(components, { validators, - logPrefix: 'dht' + logPrefix: 'dht', + datastorePrefix: '/dht' }) }) diff --git a/packages/kad-dht/test/rpc/index.node.ts b/packages/kad-dht/test/rpc/index.node.ts index 8485d2959c..aa42afa29f 100644 --- a/packages/kad-dht/test/rpc/index.node.ts +++ b/packages/kad-dht/test/rpc/index.node.ts @@ -67,6 +67,7 @@ describe('rpc', () => { validators, logPrefix: '', metricsPrefix: '', + datastorePrefix: '', peerInfoMapper: passthroughMapper }) }) diff --git a/packages/kad-dht/test/utils/index.ts b/packages/kad-dht/test/utils/index.ts deleted file mode 100644 index ba27fe0e69..0000000000 --- a/packages/kad-dht/test/utils/index.ts +++ /dev/null @@ -1,11 +0,0 @@ -import type { PeerId } from '@libp2p/interface' - -/** - * Count how many peers are in b but are not in a - */ -export function countDiffPeers (a: PeerId[], b: PeerId[]): number { - const s = new Set() - a.forEach((p) => s.add(p.toString())) - - return b.filter((p) => !s.has(p.toString())).length -} diff --git a/packages/kad-dht/test/utils/test-dht.ts b/packages/kad-dht/test/utils/test-dht.ts index 793afdecf6..bb1968b383 100644 --- a/packages/kad-dht/test/utils/test-dht.ts +++ b/packages/kad-dht/test/utils/test-dht.ts @@ -64,6 +64,11 @@ export class TestDHT { components.addressManager = addressManager + // ensure the current node is in it's own peer store + await components.peerStore.merge(peerId, { + multiaddrs: addressManager.getAddresses() + }) + const opts: KadDHTInit = { validators: { async v () {