Skip to content

Commit

Permalink
Ensure pipeline compatibility and stream does not skip
Browse files Browse the repository at this point in the history
  • Loading branch information
JrMasterModelBuilder committed Oct 2, 2023
1 parent 6e81f14 commit a8832f6
Showing 1 changed file with 37 additions and 9 deletions.
46 changes: 37 additions & 9 deletions src/archive.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import {ok, strictEqual, notStrictEqual} from 'node:assert';
import {mkdir, rm} from 'node:fs/promises';
import {platform as osPlatform} from 'node:os';
import {join as pathJoin} from 'node:path';
import {Readable} from 'node:stream';
import {Readable, Writable} from 'node:stream';
import {pipeline} from 'node:stream/promises';

import {Archive, Entry, IEntryInfo} from './archive';
import {PathType} from './types';
Expand Down Expand Up @@ -236,14 +237,6 @@ export function testArchive(
const {type, size} = entry;
const stream = await entry.stream();

if (stream) {
strictEqual(stream.listenerCount('data'), 0);

await new Promise(resolve =>
setTimeout(resolve, 100)
);
}

const buffer = stream
? await streamToBuffer(stream)
: null;
Expand Down Expand Up @@ -415,6 +408,41 @@ export function testArchive(
});
});

void it('pipeline', async () => {
await withSetup(path, async path => {
const archive = new ArchiveConstructor(path);

await archive.read(async entry => {
const {size} = entry;
const stream = await entry.stream();

if (!stream) {
return;
}

// Ensure stream does not read before we do.
strictEqual(stream.listenerCount('data'), 0);
await new Promise(resolve =>
setTimeout(resolve, 100)
);

let read = 0;
await pipeline(
stream,
new Writable({
write: (chunk: Buffer, _encoding, cb) => {
read += chunk.length;
cb();
}
})
);
if (size !== null) {
strictEqual(read, size);
}
});
});
});

if (skippable) {
void it('skip', async () => {
await withSetup(path, async path => {
Expand Down

0 comments on commit a8832f6

Please sign in to comment.