Skip to content
This repository has been archived by the owner on Jun 24, 2024. It is now read-only.

Commit

Permalink
fix: Fix support for streams without content-length property (#491)
Browse files Browse the repository at this point in the history
In `pumpify` the `prefinish` event is emitted when an upstream writer has ended
  • Loading branch information
danielbankhead authored Jan 6, 2022
1 parent 985d788 commit ac2f73b
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 3 deletions.
6 changes: 6 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,10 @@ export class Upload extends Pumpify {
this.upstreamEnded = true;
});

this.on('prefinish', () => {
this.upstreamEnded = true;
});

this.once('writing', () => {
// Now that someone is writing to this object, let's attach
// some duplexes. These duplexes enable this object to be
Expand Down Expand Up @@ -517,13 +521,15 @@ export class Upload extends Pumpify {
const removeListeners = () => {
this.removeListener('wroteToChunkBuffer', wroteToChunkBufferCallback);
this.upstream.removeListener('finish', upstreamFinishedCallback);
this.removeListener('prefinish', upstreamFinishedCallback);
};

// If there's data recently written it should be digested
this.once('wroteToChunkBuffer', wroteToChunkBufferCallback);

// If the upstream finishes let's see if there's anything to grab
this.upstream.once('finish', upstreamFinishedCallback);
this.once('prefinish', upstreamFinishedCallback);
});

return willBeMoreChunks;
Expand Down
73 changes: 70 additions & 3 deletions test/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,16 @@ describe('gcs-resumable-upload', () => {
assert.strictEqual(up.chunkSize, 123);
});

it('should set `upstreamEnded` to `true` on `prefinish`', () => {
const up = upload({bucket: BUCKET, file: FILE, chunkSize: 123});

assert.strictEqual(up.upstreamEnded, false);

up.emit('prefinish');

assert.strictEqual(up.upstreamEnded, true);
});

describe('on write', () => {
let uri = '';

Expand Down Expand Up @@ -582,7 +592,7 @@ describe('gcs-resumable-upload', () => {

it("should wait for upstream to 'finish' and resolve `true` if data is available", async () => {
const result = await new Promise(resolve => {
up.upstream.once('newListener', (event: string) => {
up.upstream.on('newListener', (event: string) => {
if (event === 'finish') {
// Update the `upstreamChunkBuffer` before emitting 'finish'
up.upstreamChunkBuffer = Buffer.from('abc');
Expand All @@ -597,12 +607,46 @@ describe('gcs-resumable-upload', () => {
assert.equal(result, true);
});

it("should wait for 'prefinish' if !`upstreamChunkBuffer.byteLength` && !`upstreamEnded`", async () => {
await new Promise(resolve => {
up.waitForNextChunk().then(resolve);
up.emit('prefinish');
});
});

it("should wait for 'prefinish' and resolve `false` if data is not available", async () => {
const result = await new Promise(resolve => {
up.waitForNextChunk().then(resolve);
up.emit('prefinish');
});

assert.equal(result, false);
});

it("should wait for 'prefinish' and resolve `true` if data is available", async () => {
const result = await new Promise(resolve => {
up.on('newListener', (event: string) => {
if (event === 'prefinish') {
// Update the `upstreamChunkBuffer` before emitting 'prefinish'
up.upstreamChunkBuffer = Buffer.from('abc');

process.nextTick(() => up.emit('prefinish'));
}
});

up.waitForNextChunk().then(resolve);
});

assert.equal(result, true);
});

it('should remove listeners after calling back from `wroteToChunkBuffer`', async () => {
assert.equal(up.listenerCount('finish'), 0);
assert.equal(up.listenerCount('wroteToChunkBuffer'), 0);
assert.equal(up.listenerCount('prefinish'), 1);

await new Promise(resolve => {
up.once('newListener', (event: string) => {
up.on('newListener', (event: string) => {
if (event === 'wroteToChunkBuffer') {
process.nextTick(() => up.emit('wroteToChunkBuffer'));
}
Expand All @@ -613,14 +657,16 @@ describe('gcs-resumable-upload', () => {

assert.equal(up.listenerCount('finish'), 0);
assert.equal(up.listenerCount('wroteToChunkBuffer'), 0);
assert.equal(up.listenerCount('prefinish'), 1);
});

it("should remove listeners after calling back from upstream to 'finish'", async () => {
assert.equal(up.listenerCount('finish'), 0);
assert.equal(up.listenerCount('wroteToChunkBuffer'), 0);
assert.equal(up.listenerCount('prefinish'), 1);

await new Promise(resolve => {
up.upstream.once('newListener', (event: string) => {
up.upstream.on('newListener', (event: string) => {
if (event === 'finish') {
process.nextTick(() => up.upstream.emit('finish'));
}
Expand All @@ -631,6 +677,27 @@ describe('gcs-resumable-upload', () => {

assert.equal(up.listenerCount('finish'), 0);
assert.equal(up.listenerCount('wroteToChunkBuffer'), 0);
assert.equal(up.listenerCount('prefinish'), 1);
});

it("should remove listeners after calling back from 'prefinish'", async () => {
assert.equal(up.listenerCount('finish'), 0);
assert.equal(up.listenerCount('wroteToChunkBuffer'), 0);
assert.equal(up.listenerCount('prefinish'), 1);

await new Promise(resolve => {
up.on('newListener', (event: string) => {
if (event === 'prefinish') {
process.nextTick(() => up.emit('prefinish'));
}
});

up.waitForNextChunk().then(resolve);
});

assert.equal(up.listenerCount('finish'), 0);
assert.equal(up.listenerCount('wroteToChunkBuffer'), 0);
assert.equal(up.listenerCount('prefinish'), 1);
});
});

Expand Down

0 comments on commit ac2f73b

Please sign in to comment.