Skip to content

Commit

Permalink
Add jsdoc comment and inline comments for the importToListeners method
Browse files Browse the repository at this point in the history
  • Loading branch information
maartyman committed Oct 26, 2023
1 parent ff97883 commit d6b67f9
Showing 1 changed file with 15 additions and 0 deletions.
15 changes: 15 additions & 0 deletions lib/StreamingStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,29 @@ implements RDF.Source<Q>, RDF.Sink<RDF.Stream<Q>, EventEmitter> {
}
}

/**
* This function will read the quad stream in an on-demand fashion, and will check if the quads already exist in the
* store. If they don't, they will be pushed into the storeImportStream, and the matching pendingStreams.
* @param stream A quad stream.
* @param storeImportStream A stream to import the quads into the store.
*/
protected importToListeners(stream: RDF.Stream<Q>, storeImportStream: PassThrough): void {
let streamEnded = false;
stream.once('readable', async() => {
let quad: Q | null = stream.read();
while (quad) {
const staticQuad = quad;
await new Promise<void>(resolve => {
// Match the new quad with the store.
const matchStream = this.store.match(
staticQuad.subject,
staticQuad.predicate,
staticQuad.object,
staticQuad.graph,
);

// If the StreamingStore hasn't ended, we add the quad to the storeImportStream and the corresponding
// pendingStreams and resolve to handle the next quad.
const handleEnd = (): void => {
if (!this.ended) {
storeImportStream.push(staticQuad);
Expand All @@ -65,20 +74,26 @@ implements RDF.Source<Q>, RDF.Sink<RDF.Stream<Q>, EventEmitter> {
resolve();
};

// If the matchStream has a result, the quad already exists.
// We remove the 'end' listener and continue to the next quad.
matchStream.once('data', () => {
matchStream.removeListener('end', handleEnd);
resolve();
});

// If the matchStream has ended (and this listener isn't removed), the quad doesn't exist yet.
// So we call the handleEnd function.
matchStream.once('end', handleEnd);
});

quad = stream.read();
}
// If the stream has ended, all quads will be read from the quad stream, so we can end the storeImportStream.
if (streamEnded) {
storeImportStream.end();
return;
}
// If the stream hasn't ended, we recursively call this function to wait for the stream to become readable again.
this.importToListeners(stream, storeImportStream);
});

Expand Down

0 comments on commit d6b67f9

Please sign in to comment.