From a8832f6d237eb079d6b416c5ada6be17f7c7a0e1 Mon Sep 17 00:00:00 2001 From: JrMasterModelBuilder Date: Mon, 2 Oct 2023 00:51:29 -0400 Subject: [PATCH] Ensure pipeline compatibility and stream does not skip --- src/archive.spec.ts | 46 ++++++++++++++++++++++++++++++++++++--------- 1 file changed, 37 insertions(+), 9 deletions(-) diff --git a/src/archive.spec.ts b/src/archive.spec.ts index 6290fb9..d8bbba2 100644 --- a/src/archive.spec.ts +++ b/src/archive.spec.ts @@ -5,7 +5,8 @@ import {ok, strictEqual, notStrictEqual} from 'node:assert'; import {mkdir, rm} from 'node:fs/promises'; import {platform as osPlatform} from 'node:os'; import {join as pathJoin} from 'node:path'; -import {Readable} from 'node:stream'; +import {Readable, Writable} from 'node:stream'; +import {pipeline} from 'node:stream/promises'; import {Archive, Entry, IEntryInfo} from './archive'; import {PathType} from './types'; @@ -236,14 +237,6 @@ export function testArchive( const {type, size} = entry; const stream = await entry.stream(); - if (stream) { - strictEqual(stream.listenerCount('data'), 0); - - await new Promise(resolve => - setTimeout(resolve, 100) - ); - } - const buffer = stream ? await streamToBuffer(stream) : null; @@ -415,6 +408,41 @@ export function testArchive( }); }); + void it('pipeline', async () => { + await withSetup(path, async path => { + const archive = new ArchiveConstructor(path); + + await archive.read(async entry => { + const {size} = entry; + const stream = await entry.stream(); + + if (!stream) { + return; + } + + // Ensure stream does not read before we do. + strictEqual(stream.listenerCount('data'), 0); + await new Promise(resolve => + setTimeout(resolve, 100) + ); + + let read = 0; + await pipeline( + stream, + new Writable({ + write: (chunk: Buffer, _encoding, cb) => { + read += chunk.length; + cb(); + } + }) + ); + if (size !== null) { + strictEqual(read, size); + } + }); + }); + }); + if (skippable) { void it('skip', async () => { await withSetup(path, async path => {