Skip to content

Commit

Permalink
Fix race condition when pending stream ends before store stream
Browse files Browse the repository at this point in the history
In very rare cases, this could occur, which would cause a few store
triples to silenty become lost.
  • Loading branch information
rubensworks committed Aug 23, 2024
1 parent 2c829c8 commit de73725
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 11 deletions.
5 changes: 4 additions & 1 deletion .eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,8 @@ module.exports = {
],
rules: {
'no-implicit-coercion': 'off'
}
},
globals: {
'AsyncIterableIterator': true,
},
};
14 changes: 10 additions & 4 deletions lib/StreamingStore.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import type { EventEmitter } from 'events';
import type * as RDF from '@rdfjs/types';
import { Store } from 'n3';
import type { Readable } from 'readable-stream';
import { PassThrough } from 'readable-stream';
import { Readable, PassThrough } from 'readable-stream';
import { PendingStreamsIndex } from './PendingStreamsIndex';

interface ILocalStore<Q extends RDF.BaseQuad> extends RDF.Store<Q> {
Expand Down Expand Up @@ -45,7 +44,6 @@ implements RDF.Source<Q>, RDF.Sink<RDF.Stream<Q>, EventEmitter> {
// Mark all pendingStreams as ended.
for (const pendingStream of this.pendingStreams.allStreams) {
pendingStream.push(null);
(<any> pendingStream)._pipeSource.unpipe();
}
}

Expand All @@ -67,6 +65,14 @@ implements RDF.Source<Q>, RDF.Sink<RDF.Stream<Q>, EventEmitter> {
});
}

protected static async * concatStreams(readables: Readable[]): AsyncIterableIterator<any> {
for (const readable of readables) {
for await (const chunk of readable) {
yield chunk;
}
}
}

public import(stream: RDF.Stream<Q>): EventEmitter {
if (this.ended) {
throw new Error('Attempted to import into an ended StreamingStore');
Expand All @@ -90,7 +96,7 @@ implements RDF.Source<Q>, RDF.Sink<RDF.Stream<Q>, EventEmitter> {
// The new pendingStream remains open, until the store is ended.
const pendingStream = new PassThrough({ objectMode: true });
this.pendingStreams.addPatternListener(pendingStream, subject, predicate, object, graph);
stream = storeResult.pipe(pendingStream, { end: false });
stream = Readable.from(StreamingStore.concatStreams([ storeResult, pendingStream ]));
(<any> stream)._pipeSource = storeResult;

// This is an ugly hack to annotate pendingStream with the isInitialized once the store stream started being read.
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
"readable-stream": "^4.3.0",
"@rdfjs/types": "*",
"@types/n3": "^1.10.4",
"@types/readable-stream": "^2.3.15"
"@types/readable-stream": "^4.0.15"
},
"pre-commit": [
"build",
Expand Down
22 changes: 21 additions & 1 deletion test/StreamingStore-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -515,8 +515,9 @@ describe('StreamingStore', () => {
const listener = jest.fn();
match.on('data', listener);

await new Promise(resolve => importStream.on('end', resolve));
const p = new Promise(resolve => match.on('end', resolve));
store.end();
await p;

expect(listener).toHaveBeenCalledTimes(1);
});
Expand All @@ -534,4 +535,23 @@ describe('StreamingStore', () => {
expect(callback).toHaveBeenCalledWith(error);
store.end();
});

it('handles pending stream ending before store stream', async() => {
const importStream = new Readable({ objectMode: true });
importStream._read = () => {
importStream.push(quad('s1', 'p1', 'o1'));
importStream.push(null);
};
store.import(importStream);
await new Promise(resolve => importStream.on('end', resolve));

const match = store.match();
const listener = jest.fn();
match.on('data', listener);

store.end();
await new Promise(resolve => match.on('end', resolve));

expect(listener).toHaveBeenCalledTimes(1);
});
});
8 changes: 4 additions & 4 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -840,10 +840,10 @@
dependencies:
"@types/node" "*"

"@types/readable-stream@^2.3.15":
version "2.3.15"
resolved "https://registry.yarnpkg.com/@types/readable-stream/-/readable-stream-2.3.15.tgz#3d79c9ceb1b6a57d5f6e6976f489b9b5384321ae"
integrity sha512-oM5JSKQCcICF1wvGgmecmHldZ48OZamtMxcGGVICOJA8o8cahXC1zEVAif8iwoc5j8etxFaRFnf095+CDsuoFQ==
"@types/readable-stream@^4.0.15":
version "4.0.15"
resolved "https://registry.yarnpkg.com/@types/readable-stream/-/readable-stream-4.0.15.tgz#e6ec26fe5b02f578c60baf1fa9452e90957d2bfb"
integrity sha512-oAZ3kw+kJFkEqyh7xORZOku1YAKvsFTogRY8kVl4vHpEKiDkfnSA/My8haRE7fvmix5Zyy+1pwzOi7yycGLBJw==
dependencies:
"@types/node" "*"
safe-buffer "~5.1.1"
Expand Down

0 comments on commit de73725

Please sign in to comment.