Skip to content

Commit

Permalink
partial solution but still not working
Browse files Browse the repository at this point in the history
  • Loading branch information
maartyman committed Oct 25, 2023
1 parent b77d794 commit cdc3779
Showing 1 changed file with 25 additions and 6 deletions.
31 changes: 25 additions & 6 deletions lib/StreamingStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,31 +40,50 @@ implements RDF.Source<Q>, RDF.Sink<RDF.Stream<Q>, EventEmitter> {
}
}

protected importToListeners(stream: RDF.Stream<Q>): void {
stream.on('data', async(quad: Q) => {
protected importToListeners(stream: RDF.Stream<Q>): RDF.Stream<Q> {
const storeImportStream = new PassThrough({ objectMode: true });

let streamEnded = false;
let processing = 0;
stream.on('data', async (quad: Q) => {

Check failure on line 48 in lib/StreamingStore.ts

View workflow job for this annotation

GitHub Actions / lint

Unexpected space before function parentheses
console.log(quad.subject.value, quad.predicate.value, quad.object.value, quad.graph.value);

Check failure on line 49 in lib/StreamingStore.ts

View workflow job for this annotation

GitHub Actions / lint

Unexpected console statement
processing++;
const matchStream = this.store.match(quad.subject, quad.predicate, quad.object, quad.graph);
matchStream.once('data', () => {
console.log('match found');

Check failure on line 53 in lib/StreamingStore.ts

View workflow job for this annotation

GitHub Actions / lint

Unexpected console statement
matchStream.removeAllListeners();
});

matchStream.once('end', () => {
for (const pendingStream of this.pendingStreams.getPendingStreamsForQuad(quad)) {
if (!this.ended) {
console.log('matchSteam ended');

Check failure on line 58 in lib/StreamingStore.ts

View workflow job for this annotation

GitHub Actions / lint

Unexpected console statement
processing--;
if (!this.ended) {
storeImportStream.push(quad);
for (const pendingStream of this.pendingStreams.getPendingStreamsForQuad(quad)) {
pendingStream.push(quad);
pendingStream.emit('quad', quad);
}
}
if (processing === 0 && streamEnded) {
storeImportStream.end();
}
});
});

stream.on('end', () => {
console.log('stream ended');

Check failure on line 74 in lib/StreamingStore.ts

View workflow job for this annotation

GitHub Actions / lint

Unexpected console statement
streamEnded = true;
});

return storeImportStream;
}

public import(stream: RDF.Stream<Q>): EventEmitter {
if (this.ended) {
throw new Error('Attempted to import into an ended StreamingStore');
}

this.importToListeners(stream);
return this.store.import(stream);
return this.store.import(this.importToListeners(stream));
}

public match(
Expand Down

0 comments on commit cdc3779

Please sign in to comment.