From bc90b4fd58aee1ccd94d4fd61cc48d336e77d772 Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Mon, 25 Nov 2024 09:00:10 +0000 Subject: [PATCH] fix: ensure user dial signals are respected (#2842) Make sure we always respect the users' dial signal over the default dial timeout, even if it is longer than the default. --- .../src/connection-manager/dial-queue.ts | 31 +++++------ .../connection-manager/dial-queue.spec.ts | 52 +++++++++++++++++++ 2 files changed, 64 insertions(+), 19 deletions(-) diff --git a/packages/libp2p/src/connection-manager/dial-queue.ts b/packages/libp2p/src/connection-manager/dial-queue.ts index 2c3c8b5a34..8300905d19 100644 --- a/packages/libp2p/src/connection-manager/dial-queue.ts +++ b/packages/libp2p/src/connection-manager/dial-queue.ts @@ -1,11 +1,11 @@ /* eslint-disable max-depth */ import { TimeoutError, DialError, setMaxListeners, AbortError } from '@libp2p/interface' import { PeerMap } from '@libp2p/peer-collections' -import { PriorityQueue, type PriorityQueueJobOptions } from '@libp2p/utils/priority-queue' -import { type Multiaddr, type Resolver, resolvers, multiaddr } from '@multiformats/multiaddr' +import { PriorityQueue } from '@libp2p/utils/priority-queue' +import { resolvers, multiaddr } from '@multiformats/multiaddr' import { dnsaddrResolver } from '@multiformats/multiaddr/resolvers' import { Circuit } from '@multiformats/multiaddr-matcher' -import { type ClearableSignal, anySignal } from 'any-signal' +import { anySignal } from 'any-signal' import { CustomProgressEvent } from 'progress-events' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import { DialDeniedError, NoValidAddressesError } from '../errors.js' @@ -23,7 +23,9 @@ import { resolveMultiaddrs } from './utils.js' import { DEFAULT_DIAL_PRIORITY } from './index.js' import type { AddressSorter, ComponentLogger, Logger, Connection, ConnectionGater, Metrics, PeerId, Address, PeerStore, PeerRouting, IsDialableOptions, OpenConnectionProgressEvents } from '@libp2p/interface' import type { OpenConnectionOptions, TransportManager } from '@libp2p/interface-internal' +import type { PriorityQueueJobOptions } from '@libp2p/utils/priority-queue' import type { DNS } from '@multiformats/dns' +import type { Multiaddr, Resolver } from '@multiformats/multiaddr' import type { ProgressOptions } from 'progress-events' export interface PendingDialTarget { @@ -204,7 +206,12 @@ export class DialQueue { options?.onProgress?.(new CustomProgressEvent('dial-queue:start-dial')) // create abort conditions - need to do this before `calculateMultiaddrs` as // we may be about to resolve a dns addr which can time out - const signal = this.createDialAbortController(options?.signal) + const signal = anySignal([ + this.shutDownController.signal, + options.signal + ]) + setMaxListeners(Infinity, signal) + let addrsToDial: Address[] try { @@ -299,25 +306,11 @@ export class DialQueue { peerId, priority: options.priority ?? DEFAULT_DIAL_PRIORITY, multiaddrs: new Set(multiaddrs.map(ma => ma.toString())), - signal: options.signal, + signal: options.signal ?? AbortSignal.timeout(this.dialTimeout), onProgress: options.onProgress }) } - private createDialAbortController (userSignal?: AbortSignal): ClearableSignal { - // let any signal abort the dial - const signal = anySignal([ - AbortSignal.timeout(this.dialTimeout), - this.shutDownController.signal, - userSignal - ]) - - // This emitter gets listened to a lot - setMaxListeners(Infinity, signal) - - return signal - } - // eslint-disable-next-line complexity private async calculateMultiaddrs (peerId?: PeerId, multiaddrs: Set = new Set(), options: OpenConnectionOptions = {}): Promise { const addrs: Address[] = [...multiaddrs].map(ma => ({ diff --git a/packages/libp2p/test/connection-manager/dial-queue.spec.ts b/packages/libp2p/test/connection-manager/dial-queue.spec.ts index 1534b5b438..8cdfda32b2 100644 --- a/packages/libp2p/test/connection-manager/dial-queue.spec.ts +++ b/packages/libp2p/test/connection-manager/dial-queue.spec.ts @@ -9,6 +9,7 @@ import { WebRTC } from '@multiformats/multiaddr-matcher' import { expect } from 'aegir/chai' import delay from 'delay' import pDefer from 'p-defer' +import { raceSignal } from 'race-signal' import sinon from 'sinon' import { type StubbedInstance, stubInterface } from 'sinon-ts' import { DialQueue } from '../../src/connection-manager/dial-queue.js' @@ -325,4 +326,55 @@ describe('dial queue', () => { dialer = new DialQueue(components) await expect(dialer.dial(remotePeer)).to.eventually.equal(connection) }) + + it('should respect user dial signal over default timeout if it is passed', async () => { + const dialTimeout = 10 + const userTimeout = 500 + const connection = stubInterface() + + components.transportManager.dialTransportForMultiaddr.returns(stubInterface()) + components.transportManager.dial.callsFake(async (ma, options) => { + await raceSignal(delay(userTimeout / 2), options?.signal) + + return connection + }) + + dialer = new DialQueue(components, { + dialTimeout + }) + + // dial slow peer with much longer timeout than the default + await expect(dialer.dial(multiaddr('/ip4/123.123.123.123/tcp/1234'), { + signal: AbortSignal.timeout(userTimeout) + })) + .to.eventually.equal(connection) + }) + + it('should respect user dial signal during parallel dial of the same peer', async () => { + const dialTimeout = 10 + const userTimeout = 500 + const connection = stubInterface() + + components.transportManager.dialTransportForMultiaddr.returns(stubInterface()) + components.transportManager.dial.callsFake(async (ma, options) => { + await raceSignal(delay(userTimeout / 2), options?.signal) + + return connection + }) + + dialer = new DialQueue(components, { + dialTimeout + }) + + const all = await Promise.allSettled([ + dialer.dial(multiaddr('/ip4/123.123.123.123/tcp/1234/p2p/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb')), + dialer.dial(multiaddr('/ip4/123.123.123.123/tcp/1234/p2p/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb'), { + signal: AbortSignal.timeout(userTimeout) + }) + ]) + + expect(all[0].status).to.equal('rejected', 'did not respect default dial timeout') + expect(all[1].status).to.equal('fulfilled', 'did not respect user dial timeout') + expect(components.transportManager.dial.callCount).to.equal(1, 'should have coalesced multiple dials to same dial') + }) })