diff --git a/benchmark/package.json b/benchmark/package.json new file mode 100644 index 0000000..b65e677 --- /dev/null +++ b/benchmark/package.json @@ -0,0 +1,15 @@ +{ + "name": "yamux-benchmarks", + "version": "0.0.1", + "description": "Benchmark for Libp2p's Yamux specification in Js", + "main": "benchmark.js", + "type": "module", + "author": "", + "license": "ISC", + "dependencies": { + "it-drain": "^3.0.3", + "it-pair": "^2.0.6", + "it-pipe": "^3.0.1", + "uint8arraylist": "^2.4.3" + } +} diff --git a/benchmark/stream-transfer.js b/benchmark/stream-transfer.js new file mode 100644 index 0000000..e588836 --- /dev/null +++ b/benchmark/stream-transfer.js @@ -0,0 +1,56 @@ +import drain from 'it-drain' +import { duplexPair } from 'it-pair/duplex' +import { pipe } from 'it-pipe' +import { Uint8ArrayList } from 'uint8arraylist' +import { yamux } from '../dist/src/index.js' + +const DATA_LENGTH = 1024 * 1024 * 1024 * 5 +const CHUNK_SIZE = (1024 * 1024) / 4 +const ITERATIONS = 10 + +const results = [] + +for (let i = 0; i < ITERATIONS; i++) { + const p = duplexPair() + const muxerA = yamux()().createStreamMuxer({ + direction: 'outbound' + }) + const muxerB = yamux()().createStreamMuxer({ + direction: 'inbound', + onIncomingStream: (stream) => { + // echo stream back to itself + pipe(stream, stream) + } + }) + + // pipe data through muxers + pipe(p[0], muxerA, p[0]) + pipe(p[1], muxerB, p[1]) + + const stream = await muxerA.newStream() + + const start = Date.now() + + await pipe( + async function * () { + for (let i = 0; i < DATA_LENGTH; i += CHUNK_SIZE) { + yield * new Uint8ArrayList(new Uint8Array(CHUNK_SIZE)) + } + }, + stream, + (source) => drain(source) + ) + + const finish = Date.now() - start + + muxerA.close() + muxerB.close() + + results.push(finish) +} + +const megs = DATA_LENGTH / (1024 * 1024) +const secs = (results.reduce((acc, curr) => acc + curr, 0) / results.length) / 1000 + +// eslint-disable-next-line no-console +console.info((megs / secs).toFixed(2), 'MB/s') diff --git a/package.json b/package.json index 9a2728c..c916142 100644 --- a/package.json +++ b/package.json @@ -173,7 +173,7 @@ "dependencies": { "@libp2p/interface": "^0.1.0", "@libp2p/logger": "^3.0.0", - "abortable-iterator": "^5.0.1", + "get-iterator": "^2.0.1", "it-foreach": "^2.0.3", "it-pipe": "^3.0.1", "it-pushable": "^3.2.0", diff --git a/src/muxer.ts b/src/muxer.ts index 9f3f6ac..7e71ac1 100644 --- a/src/muxer.ts +++ b/src/muxer.ts @@ -1,7 +1,6 @@ import { CodeError } from '@libp2p/interface/errors' import { logger, type Logger } from '@libp2p/logger' -import { abortableSource } from 'abortable-iterator' -import { pipe } from 'it-pipe' +import { getIterator } from 'get-iterator' import { pushable, type Pushable } from 'it-pushable' import { type Config, defaultConfig, verifyConfig } from './config.js' import { ERR_BOTH_CLIENTS, ERR_INVALID_FRAME, ERR_MAX_OUTBOUND_STREAMS_EXCEEDED, ERR_MUXER_LOCAL_CLOSED, ERR_MUXER_REMOTE_CLOSED, ERR_NOT_MATCHING_PING, ERR_STREAM_ALREADY_EXISTS, ERR_UNREQUESTED_PING, PROTOCOL_ERRORS } from './constants.js' @@ -102,23 +101,33 @@ export class YamuxMuxer implements StreamMuxer { }) this.sink = async (source: Source): Promise => { - source = abortableSource( - source, - this.closeController.signal, - { returnOnAbort: true } - ) + const shutDownListener = (): void => { + const iterator = getIterator(source) + + if (iterator.return != null) { + const res = iterator.return() + + if (isPromise(res)) { + res.catch(err => { + this.log?.('could not cause sink source to return', err) + }) + } + } + } let reason, error try { const decoder = new Decoder(source) - await pipe( - decoder.emitFrames.bind(decoder), - async source => { - for await (const { header, readData } of source) { - await this.handleFrame(header, readData) - } + + try { + this.closeController.signal.addEventListener('abort', shutDownListener) + + for await (const frame of decoder.emitFrames()) { + await this.handleFrame(frame.header, frame.readData) } - ) + } finally { + this.closeController.signal.removeEventListener('abort', shutDownListener) + } reason = GoAwayCode.NormalTermination } catch (err: unknown) { @@ -570,3 +579,7 @@ export class YamuxMuxer implements StreamMuxer { }) } } + +function isPromise (thing: any): thing is Promise { + return thing != null && typeof thing.then === 'function' +}