From cdc3779b12efa1149d236e253a6abd3d349efa6d Mon Sep 17 00:00:00 2001 From: maartenvandenbrande Date: Wed, 25 Oct 2023 11:07:17 +0200 Subject: [PATCH] partial solution but still not working --- lib/StreamingStore.ts | 31 +++++++++++++++++++++++++------ 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/lib/StreamingStore.ts b/lib/StreamingStore.ts index fbfd337..7ad0737 100644 --- a/lib/StreamingStore.ts +++ b/lib/StreamingStore.ts @@ -40,22 +40,42 @@ implements RDF.Source, RDF.Sink, EventEmitter> { } } - protected importToListeners(stream: RDF.Stream): void { - stream.on('data', async(quad: Q) => { + protected importToListeners(stream: RDF.Stream): RDF.Stream { + const storeImportStream = new PassThrough({ objectMode: true }); + + let streamEnded = false; + let processing = 0; + stream.on('data', async (quad: Q) => { + console.log(quad.subject.value, quad.predicate.value, quad.object.value, quad.graph.value); + processing++; const matchStream = this.store.match(quad.subject, quad.predicate, quad.object, quad.graph); matchStream.once('data', () => { + console.log('match found'); matchStream.removeAllListeners(); }); matchStream.once('end', () => { - for (const pendingStream of this.pendingStreams.getPendingStreamsForQuad(quad)) { - if (!this.ended) { + console.log('matchSteam ended'); + processing--; + if (!this.ended) { + storeImportStream.push(quad); + for (const pendingStream of this.pendingStreams.getPendingStreamsForQuad(quad)) { pendingStream.push(quad); pendingStream.emit('quad', quad); } } + if (processing === 0 && streamEnded) { + storeImportStream.end(); + } }); }); + + stream.on('end', () => { + console.log('stream ended'); + streamEnded = true; + }); + + return storeImportStream; } public import(stream: RDF.Stream): EventEmitter { @@ -63,8 +83,7 @@ implements RDF.Source, RDF.Sink, EventEmitter> { throw new Error('Attempted to import into an ended StreamingStore'); } - this.importToListeners(stream); - return this.store.import(stream); + return this.store.import(this.importToListeners(stream)); } public match(