Skip to content

Commit

Permalink
fix: ensure user dial signals are respected
Browse files Browse the repository at this point in the history
Make sure we always respect the users' dial signal over the default
dial timeout, even if it is longer than the default.
  • Loading branch information
achingbrain committed Nov 25, 2024
1 parent 4a85eb0 commit a649f74
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 19 deletions.
31 changes: 12 additions & 19 deletions packages/libp2p/src/connection-manager/dial-queue.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
/* eslint-disable max-depth */
import { TimeoutError, DialError, setMaxListeners, AbortError } from '@libp2p/interface'
import { PeerMap } from '@libp2p/peer-collections'
import { PriorityQueue, type PriorityQueueJobOptions } from '@libp2p/utils/priority-queue'
import { type Multiaddr, type Resolver, resolvers, multiaddr } from '@multiformats/multiaddr'
import { PriorityQueue } from '@libp2p/utils/priority-queue'
import { resolvers, multiaddr } from '@multiformats/multiaddr'
import { dnsaddrResolver } from '@multiformats/multiaddr/resolvers'
import { Circuit } from '@multiformats/multiaddr-matcher'
import { type ClearableSignal, anySignal } from 'any-signal'
import { anySignal } from 'any-signal'
import { CustomProgressEvent } from 'progress-events'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { DialDeniedError, NoValidAddressesError } from '../errors.js'
Expand All @@ -23,7 +23,9 @@ import { resolveMultiaddrs } from './utils.js'
import { DEFAULT_DIAL_PRIORITY } from './index.js'
import type { AddressSorter, ComponentLogger, Logger, Connection, ConnectionGater, Metrics, PeerId, Address, PeerStore, PeerRouting, IsDialableOptions, OpenConnectionProgressEvents } from '@libp2p/interface'
import type { OpenConnectionOptions, TransportManager } from '@libp2p/interface-internal'
import type { PriorityQueueJobOptions } from '@libp2p/utils/priority-queue'
import type { DNS } from '@multiformats/dns'
import type { Multiaddr, Resolver } from '@multiformats/multiaddr'
import type { ProgressOptions } from 'progress-events'

export interface PendingDialTarget {
Expand Down Expand Up @@ -204,7 +206,12 @@ export class DialQueue {
options?.onProgress?.(new CustomProgressEvent('dial-queue:start-dial'))
// create abort conditions - need to do this before `calculateMultiaddrs` as
// we may be about to resolve a dns addr which can time out
const signal = this.createDialAbortController(options?.signal)
const signal = anySignal([
this.shutDownController.signal,
options.signal
])
setMaxListeners(Infinity, signal)

let addrsToDial: Address[]

try {
Expand Down Expand Up @@ -299,25 +306,11 @@ export class DialQueue {
peerId,
priority: options.priority ?? DEFAULT_DIAL_PRIORITY,
multiaddrs: new Set(multiaddrs.map(ma => ma.toString())),
signal: options.signal,
signal: options.signal ?? AbortSignal.timeout(this.dialTimeout),
onProgress: options.onProgress
})
}

private createDialAbortController (userSignal?: AbortSignal): ClearableSignal {
// let any signal abort the dial
const signal = anySignal([
AbortSignal.timeout(this.dialTimeout),
this.shutDownController.signal,
userSignal
])

// This emitter gets listened to a lot
setMaxListeners(Infinity, signal)

return signal
}

// eslint-disable-next-line complexity
private async calculateMultiaddrs (peerId?: PeerId, multiaddrs: Set<string> = new Set<string>(), options: OpenConnectionOptions = {}): Promise<Address[]> {
const addrs: Address[] = [...multiaddrs].map(ma => ({
Expand Down
52 changes: 52 additions & 0 deletions packages/libp2p/test/connection-manager/dial-queue.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { WebRTC } from '@multiformats/multiaddr-matcher'
import { expect } from 'aegir/chai'
import delay from 'delay'
import pDefer from 'p-defer'
import { raceSignal } from 'race-signal'
import sinon from 'sinon'
import { type StubbedInstance, stubInterface } from 'sinon-ts'
import { DialQueue } from '../../src/connection-manager/dial-queue.js'
Expand Down Expand Up @@ -325,4 +326,55 @@ describe('dial queue', () => {
dialer = new DialQueue(components)
await expect(dialer.dial(remotePeer)).to.eventually.equal(connection)
})

it('should respect user dial signal over default timeout if it is passed', async () => {
const dialTimeout = 10
const userTimeout = 500
const connection = stubInterface<Connection>()

components.transportManager.dialTransportForMultiaddr.returns(stubInterface<Transport>())
components.transportManager.dial.callsFake(async (ma, options) => {
await raceSignal(delay(userTimeout / 2), options?.signal)

return connection
})

dialer = new DialQueue(components, {
dialTimeout
})

// dial slow peer with much longer timeout than the default
await expect(dialer.dial(multiaddr('/ip4/123.123.123.123/tcp/1234'), {
signal: AbortSignal.timeout(userTimeout)
}))
.to.eventually.equal(connection)
})

it('should respect user dial signal during parallel dial of the same peer', async () => {
const dialTimeout = 10
const userTimeout = 500
const connection = stubInterface<Connection>()

components.transportManager.dialTransportForMultiaddr.returns(stubInterface<Transport>())
components.transportManager.dial.callsFake(async (ma, options) => {
await raceSignal(delay(userTimeout / 2), options?.signal)

return connection
})

dialer = new DialQueue(components, {
dialTimeout
})

const all = await Promise.allSettled([
dialer.dial(multiaddr('/ip4/123.123.123.123/tcp/1234/p2p/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb')),
dialer.dial(multiaddr('/ip4/123.123.123.123/tcp/1234/p2p/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb'), {
signal: AbortSignal.timeout(userTimeout)
})
])

expect(all[0].status).to.equal('rejected', 'did not respect default dial timeout')
expect(all[1].status).to.equal('fulfilled', 'did not respect user dial timeout')
expect(components.transportManager.dial.callCount).to.equal(1, 'should have coalesced multiple dials to same dial')
})
})

0 comments on commit a649f74

Please sign in to comment.