diff --git a/src/index.ts b/src/index.ts index cbcb797..e2f6297 100644 --- a/src/index.ts +++ b/src/index.ts @@ -393,6 +393,10 @@ export class Upload extends Pumpify { this.upstreamEnded = true; }); + this.on('prefinish', () => { + this.upstreamEnded = true; + }); + this.once('writing', () => { // Now that someone is writing to this object, let's attach // some duplexes. These duplexes enable this object to be @@ -517,6 +521,7 @@ export class Upload extends Pumpify { const removeListeners = () => { this.removeListener('wroteToChunkBuffer', wroteToChunkBufferCallback); this.upstream.removeListener('finish', upstreamFinishedCallback); + this.removeListener('prefinish', upstreamFinishedCallback); }; // If there's data recently written it should be digested @@ -524,6 +529,7 @@ export class Upload extends Pumpify { // If the upstream finishes let's see if there's anything to grab this.upstream.once('finish', upstreamFinishedCallback); + this.once('prefinish', upstreamFinishedCallback); }); return willBeMoreChunks; diff --git a/test/test.ts b/test/test.ts index 4c79ef1..0d34edd 100644 --- a/test/test.ts +++ b/test/test.ts @@ -304,6 +304,16 @@ describe('gcs-resumable-upload', () => { assert.strictEqual(up.chunkSize, 123); }); + it('should set `upstreamEnded` to `true` on `prefinish`', () => { + const up = upload({bucket: BUCKET, file: FILE, chunkSize: 123}); + + assert.strictEqual(up.upstreamEnded, false); + + up.emit('prefinish'); + + assert.strictEqual(up.upstreamEnded, true); + }); + describe('on write', () => { let uri = ''; @@ -582,7 +592,7 @@ describe('gcs-resumable-upload', () => { it("should wait for upstream to 'finish' and resolve `true` if data is available", async () => { const result = await new Promise(resolve => { - up.upstream.once('newListener', (event: string) => { + up.upstream.on('newListener', (event: string) => { if (event === 'finish') { // Update the `upstreamChunkBuffer` before emitting 'finish' up.upstreamChunkBuffer = Buffer.from('abc'); @@ -597,12 +607,46 @@ describe('gcs-resumable-upload', () => { assert.equal(result, true); }); + it("should wait for 'prefinish' if !`upstreamChunkBuffer.byteLength` && !`upstreamEnded`", async () => { + await new Promise(resolve => { + up.waitForNextChunk().then(resolve); + up.emit('prefinish'); + }); + }); + + it("should wait for 'prefinish' and resolve `false` if data is not available", async () => { + const result = await new Promise(resolve => { + up.waitForNextChunk().then(resolve); + up.emit('prefinish'); + }); + + assert.equal(result, false); + }); + + it("should wait for 'prefinish' and resolve `true` if data is available", async () => { + const result = await new Promise(resolve => { + up.on('newListener', (event: string) => { + if (event === 'prefinish') { + // Update the `upstreamChunkBuffer` before emitting 'prefinish' + up.upstreamChunkBuffer = Buffer.from('abc'); + + process.nextTick(() => up.emit('prefinish')); + } + }); + + up.waitForNextChunk().then(resolve); + }); + + assert.equal(result, true); + }); + it('should remove listeners after calling back from `wroteToChunkBuffer`', async () => { assert.equal(up.listenerCount('finish'), 0); assert.equal(up.listenerCount('wroteToChunkBuffer'), 0); + assert.equal(up.listenerCount('prefinish'), 1); await new Promise(resolve => { - up.once('newListener', (event: string) => { + up.on('newListener', (event: string) => { if (event === 'wroteToChunkBuffer') { process.nextTick(() => up.emit('wroteToChunkBuffer')); } @@ -613,14 +657,16 @@ describe('gcs-resumable-upload', () => { assert.equal(up.listenerCount('finish'), 0); assert.equal(up.listenerCount('wroteToChunkBuffer'), 0); + assert.equal(up.listenerCount('prefinish'), 1); }); it("should remove listeners after calling back from upstream to 'finish'", async () => { assert.equal(up.listenerCount('finish'), 0); assert.equal(up.listenerCount('wroteToChunkBuffer'), 0); + assert.equal(up.listenerCount('prefinish'), 1); await new Promise(resolve => { - up.upstream.once('newListener', (event: string) => { + up.upstream.on('newListener', (event: string) => { if (event === 'finish') { process.nextTick(() => up.upstream.emit('finish')); } @@ -631,6 +677,27 @@ describe('gcs-resumable-upload', () => { assert.equal(up.listenerCount('finish'), 0); assert.equal(up.listenerCount('wroteToChunkBuffer'), 0); + assert.equal(up.listenerCount('prefinish'), 1); + }); + + it("should remove listeners after calling back from 'prefinish'", async () => { + assert.equal(up.listenerCount('finish'), 0); + assert.equal(up.listenerCount('wroteToChunkBuffer'), 0); + assert.equal(up.listenerCount('prefinish'), 1); + + await new Promise(resolve => { + up.on('newListener', (event: string) => { + if (event === 'prefinish') { + process.nextTick(() => up.emit('prefinish')); + } + }); + + up.waitForNextChunk().then(resolve); + }); + + assert.equal(up.listenerCount('finish'), 0); + assert.equal(up.listenerCount('wroteToChunkBuffer'), 0); + assert.equal(up.listenerCount('prefinish'), 1); }); });