From 34547836f7bcea978c9d70909465014835864fc4 Mon Sep 17 00:00:00 2001 From: Julian Rojas Date: Tue, 13 Feb 2024 20:29:00 +0100 Subject: [PATCH] Fix logic to handle multiple inputs and mappings asynchronously --- src/rml/rml.ts | 61 ++++++++++++++++------------ test/rml.test.ts | 103 ++++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 129 insertions(+), 35 deletions(-) diff --git a/src/rml/rml.ts b/src/rml/rml.ts index 4ada3c4..ca9664e 100644 --- a/src/rml/rml.ts +++ b/src/rml/rml.ts @@ -106,22 +106,31 @@ export async function rmlMapper( console.error(ex); } }).on("end", async () => { + console.log("[rmlMapper processor]", "Received all mapping rules!"); // We assume mappings to be static and only proceed to execute them once we have them all mappingsReady = true; if (sources) { - for (const source of sources) { - // Process data that has already been pushed to the input stream - if (source.sourceBuffer && source.sourceBuffer.length > 0) { - await handleDataUpdate( - source.sourceBuffer.shift()!, - mappingsReady, - sources, - targets, - mappingLocations, - jarFile, - outputFile, - defaultWriter - ); + if (sources.every(s => s.hasData)) { + // We made sure that all declared logical sources are present + console.log("[rmlMapper processor]", "Start mapping now (on end mappings event)"); + await executeMappings(mappingLocations, jarFile, outputFile, defaultWriter, sources, targets); + // Check for buffered input updates + for (const source of sources) { + if (source.sourceBuffer && source.sourceBuffer.length > 0) { + const update = source.sourceBuffer.shift(); + if (update) { + await handleDataUpdate( + update, + mappingsReady, + sources, + targets, + mappingLocations, + jarFile, + outputFile, + defaultWriter + ); + } + } } } } else { @@ -277,9 +286,9 @@ async function handleDataUpdate( const { source, data } = update; console.log("[rmlMapper processor]", "Got data for", source.location); - if (!mappingsReady) { + if (source.hasData && !mappingsReady) { // Mapping rules are still coming through or we already running a mapping process. - // Store this data update and process when ready. + // Buffer this data update and process it when ready. if (!source.sourceBuffer) { source.sourceBuffer = []; } @@ -289,7 +298,7 @@ async function handleDataUpdate( source.hasData = true; await writeFile(source.newLocation!, data); - if (sources.every((x) => x.hasData)) { + if (mappingsReady && sources.every((x) => x.hasData)) { // We made sure that all declared logical sources are present console.log("[rmlMapper processor]", "Start mapping now"); await executeMappings( @@ -301,19 +310,19 @@ async function handleDataUpdate( targets ); // Process buffered input updates - if (source.sourceBuffer) { - while (source.sourceBuffer.length > 0) { - const update = source.sourceBuffer.shift(); + for (const src of sources) { + if (src.sourceBuffer && src.sourceBuffer.length > 0) { + const update = src.sourceBuffer.shift(); if (update) { console.log("[rmlMapper processor]", "Processing buffered input", update.source.location); await handleDataUpdate( - update, - mappingsReady, - sources, - targets, - mappingLocations, - jarFile, - outputFile, + update, + mappingsReady, + sources, + targets, + mappingLocations, + jarFile, + outputFile, defaultWriter ); } diff --git a/test/rml.test.ts b/test/rml.test.ts index e6544c8..d32c80b 100644 --- a/test/rml.test.ts +++ b/test/rml.test.ts @@ -23,12 +23,13 @@ describe("Functional tests for the rmlMapper Connector Architecture function", ( @prefix ex: . `; - const RML_TM_LOCAL_SOURCE_AND_TARGET = ` + const RML_TM_LOCAL_SOURCE_AND_TARGET = (source?: string) => { + return ` ex:map_test-mapping_000 a rr:TriplesMap ; rdfs:label "test-mapping" ; rml:logicalSource [ a rml:LogicalSource ; - rml:source "dataset/data.xml" ; + rml:source "${source || "dataset/data.xml"}" ; rml:iterator "//data" ; rml:referenceFormulation ql:XPath ] ; @@ -72,7 +73,8 @@ describe("Functional tests for the rmlMapper Connector Architecture function", ( rr:termType rr:Literal ] ] . - `; + `; + } const RML_TM_LOCAL_SOURCE_AND_LDES_TARGET = ` ex:map_test-mapping_000 a rr:TriplesMap ; @@ -173,7 +175,7 @@ describe("Functional tests for the rmlMapper Connector Architecture function", ( rr:termType rr:Literal ] ] . - `; + `; }; const RML_TM_REMOTE_SOURCE_AND_NO_TARGET = ` @@ -243,10 +245,17 @@ describe("Functional tests for the rmlMapper Connector Architecture function", ( `; + const LOCAL_RAW_DATA_YET_ANOTHER_UPDATE = ` + + + + + `; + test("Mapping process with declared logical source and target", async () => { const rmlDoc = ` ${PREFIXES} - ${RML_TM_LOCAL_SOURCE_AND_TARGET} + ${RML_TM_LOCAL_SOURCE_AND_TARGET()} `; // Define function parameters const mappingReader = new SimpleStream(); @@ -366,7 +375,7 @@ describe("Functional tests for the rmlMapper Connector Architecture function", ( test("Mapping process with declared logical source data input arriving before mappings", async () => { const rmlDoc = ` ${PREFIXES} - ${RML_TM_LOCAL_SOURCE_AND_TARGET} + ${RML_TM_LOCAL_SOURCE_AND_TARGET()} `; // Define function parameters const mappingReader = new SimpleStream(); @@ -420,6 +429,84 @@ describe("Functional tests for the rmlMapper Connector Architecture function", ( await mappingReader.end(); }); + test("Mapping process with multiple declared logical sources data input arriving before mappings", async () => { + const rmlDoc1 = ` + ${PREFIXES} + ${RML_TM_LOCAL_SOURCE_AND_TARGET("dataset/data1.xml")} + `; + const rmlDoc2 = ` + ${PREFIXES} + ${RML_TM_LOCAL_SOURCE_AND_TARGET("dataset/data2.xml")} + `; + // Define function parameters + const mappingReader = new SimpleStream(); + const defaultOutputStream = new SimpleStream(); + const sourceInputStream1 = new SimpleStream(); + const sourceInputStream2 = new SimpleStream(); + const targetOutputStream = new SimpleStream(); + const sources: Source[] = [ + { + location: "dataset/data1.xml", + dataInput: sourceInputStream1, + hasData: false, + trigger: true + }, + { + location: "dataset/data2.xml", + dataInput: sourceInputStream2, + hasData: false, + trigger: true + } + ]; + const targets: Target[] = [ + { + location: "file:///results/output.nq", + writer: targetOutputStream, + data: "" + } + ]; + + // Check output + targetOutputStream.data((data: string) => { + const store = new Store(); + store.addQuads(new Parser().parse(data)); + expect(store.getQuads(null, null, null, null).length).toBe(4); + expect(store.getQuads( + "http://example.org/001", + RDF.type, + null, + "http://example.org/myNamedGraph").length + ).toBe(1); + expect(store.getQuads( + "http://example.org/002", + RDFS.label, + null, + "http://example.org/myNamedGraph").length + ).toBe(1); + + }); + + // Execute function + await rmlMapper(mappingReader, defaultOutputStream, sources, targets, "/tmp/rmlMapper.jar"); + + // Push some data asynchronously + await Promise.all([ + sourceInputStream1.push(LOCAL_RAW_DATA), + sourceInputStream2.push(LOCAL_RAW_DATA) + ]); + // Push some mapping + await mappingReader.push(rmlDoc1); + // Push some more data + await sourceInputStream1.push(LOCAL_RAW_DATA_UPDATE); + await sourceInputStream2.push(LOCAL_RAW_DATA_UPDATE); + await sourceInputStream1.push(LOCAL_RAW_DATA_YET_ANOTHER_UPDATE); + await sourceInputStream2.push(LOCAL_RAW_DATA_YET_ANOTHER_UPDATE); + // Finish pushing mappings input data + await mappingReader.push(rmlDoc2); + await mappingReader.end(); + await sleep(3000); + }); + test("Mapping process without any declared logical sources and using default output", async () => { const rmlDoc = ` ${PREFIXES} @@ -491,7 +578,7 @@ describe("Functional tests for the rmlMapper Connector Architecture function", ( test("Mapping process with declared and undeclared logical sources and targets", async () => { const rmlDoc = ` ${PREFIXES} - ${RML_TM_LOCAL_SOURCE_AND_TARGET} + ${RML_TM_LOCAL_SOURCE_AND_TARGET()} ${RML_TM_REMOTE_SOURCE_AND_NO_TARGET} `; @@ -665,10 +752,8 @@ describe("Functional tests for the rmlMapper Connector Architecture function", ( // Asynchronously push data updates sourceInputStream1.push(LOCAL_RAW_DATA); sourceInputStream2.push(LOCAL_RAW_DATA); - await sleep(1000); sourceInputStream1.push(LOCAL_RAW_DATA_UPDATE); await sourceInputStream2.push(LOCAL_RAW_DATA_UPDATE); - await sleep(3000); }); });