diff --git a/src/archive.spec.ts b/src/archive.spec.ts index 6290fb9..d8bbba2 100644 --- a/src/archive.spec.ts +++ b/src/archive.spec.ts @@ -5,7 +5,8 @@ import {ok, strictEqual, notStrictEqual} from 'node:assert'; import {mkdir, rm} from 'node:fs/promises'; import {platform as osPlatform} from 'node:os'; import {join as pathJoin} from 'node:path'; -import {Readable} from 'node:stream'; +import {Readable, Writable} from 'node:stream'; +import {pipeline} from 'node:stream/promises'; import {Archive, Entry, IEntryInfo} from './archive'; import {PathType} from './types'; @@ -236,14 +237,6 @@ export function testArchive( const {type, size} = entry; const stream = await entry.stream(); - if (stream) { - strictEqual(stream.listenerCount('data'), 0); - - await new Promise(resolve => - setTimeout(resolve, 100) - ); - } - const buffer = stream ? await streamToBuffer(stream) : null; @@ -415,6 +408,41 @@ export function testArchive( }); }); + void it('pipeline', async () => { + await withSetup(path, async path => { + const archive = new ArchiveConstructor(path); + + await archive.read(async entry => { + const {size} = entry; + const stream = await entry.stream(); + + if (!stream) { + return; + } + + // Ensure stream does not read before we do. + strictEqual(stream.listenerCount('data'), 0); + await new Promise(resolve => + setTimeout(resolve, 100) + ); + + let read = 0; + await pipeline( + stream, + new Writable({ + write: (chunk: Buffer, _encoding, cb) => { + read += chunk.length; + cb(); + } + }) + ); + if (size !== null) { + strictEqual(read, size); + } + }); + }); + }); + if (skippable) { void it('skip', async () => { await withSetup(path, async path => {