diff --git a/src/rml/rml.ts b/src/rml/rml.ts index 8ce91ad..7f628d7 100644 --- a/src/rml/rml.ts +++ b/src/rml/rml.ts @@ -1,6 +1,6 @@ import type { Stream, Writer } from "@ajuvercr/js-runner"; import { exec } from "child_process"; -import { access, constants, readFile, writeFile } from "fs/promises"; +import { access, constants, readFile, unlink, writeFile } from "fs/promises"; import { DataFactory, Parser, @@ -99,7 +99,7 @@ export async function rmlMapper( if (executing) { // We are already running a mapping process. - // Store this date update and process when it is finished. + // Store this data update and process when it is finished. console.log("[rmlMapper processor]", "Buffering input until previous mapping is finished"); sourceBuffer.push({ source, data }); } else { @@ -219,7 +219,7 @@ function transformMapping(input: string, sources?: Source[], targets?: Target[], } } } - + // Extract logical Sources from incoming mapping const extractSource = empty() .map(({ id }) => ({ subject: id })) @@ -301,7 +301,7 @@ async function executeMappings( } let out = ""; - if(targets) { + if (targets) { // Initialize data holders of every declared target targets.forEach(t => t.data = ""); } @@ -323,25 +323,38 @@ async function executeMappings( if (targets) { for (let target of targets) { try { + // Temporary logical target files are created by the mapping process where applicable await access(target.newLocation, constants.F_OK); target.data += await readFile(target.newLocation, { encoding: "utf8" }); - } catch (err) { - // Fallback to default output - out += await readFile(outputFile, { encoding: "utf8" }); - } + // Delete the temporary file to prevent it from causing generated data to be sent to a target + // where it does not belong in subsequent mapping executions.
+ await unlink(target.newLocation); + } catch (err) { /* There was no data meant for this target in this mapping */ } } - } else { - out += await readFile(outputFile, { encoding: "utf8" }); } + + try { + await access(outputFile); + out += await readFile(outputFile, { encoding: "utf8" }); + await unlink(outputFile); + } catch (err) { /* There was no data meant for the default output in this mapping */ } + console.log("[rmlMapper processor]", "Done", mappingFile, `in ${new Date().getTime() - t0.getTime()} ms`); } console.log("[rmlMapper processor]", "All done"); if (targets) { targets.forEach(async target => { - await target.writer.push(target.data); + // Account for the possibility that a declared target and the default target are the same. + // This is important to make sure that all produced triples/quads are pushed downstream together. + if (target.writer === defaultWriter) { + out += target.data; + } else { + await target.writer.push(target.data); + } }); - } else if (out !== "") { + } + if (out !== "") { await defaultWriter.push(out); } } diff --git a/test/rml.test.ts b/test/rml.test.ts index e760053..0bf1a19 100644 --- a/test/rml.test.ts +++ b/test/rml.test.ts @@ -487,6 +487,69 @@ describe("Functional tests for the rmlMapper Connector Architecture function", ( await mappingReader.end(); }); + test("Mapping process with declared and undeclared logical sources and targets", async () => { + const rmlDoc = ` + ${PREFIXES} + ${RML_TM_LOCAL_SOURCE_AND_TARGET} + ${RML_TM_REMOTE_SOURCE_AND_NO_TARGET} + `; + + // Define function parameters + const mappingReader = new SimpleStream(); + const sourceInputStream = new SimpleStream(); + const outputStream = new SimpleStream(); + const sources: Source[] = [ + { + location: "dataset/data.xml", + newLocation: "", + dataInput: sourceInputStream, + hasData: false, + trigger: false + } + ]; + + const targets: Target[] = [ + { + location: "file:///results/output.nq", + newLocation: "", + writer: outputStream, // Here we are using 
the same stream as the default output + data: "" + } + ]; + + // Check output + outputStream.data((data: string) => { + const store = new Store(); + store.addQuads(new Parser().parse(data)); + expect(store.getQuads(null, RDF.type, "http://example.org/Entity", null).length).toBeGreaterThan(0); + expect(store.getQuads(null, RDFS.label, null, null).length).toBeGreaterThan(0); + expect(store.getQuads(null, "http://example.org/name", null, null).length).toBeGreaterThan(0); + expect(store.getQuads(null, "http://example.org/availableBikes", null, null).length).toBeGreaterThan(0); + expect(store.getQuads( + "http://example.org/001", + RDF.type, + null, + "http://example.org/myNamedGraph").length + ).toBe(1); + expect(store.getQuads( + "http://example.org/002", + RDFS.label, + null, + "http://example.org/myNamedGraph").length + ).toBe(1); + }); + + // Execute function + await rmlMapper(mappingReader, outputStream, sources, targets, "/tmp/rmlMapper.jar"); + + // Push raw input data first + await sourceInputStream.push(LOCAL_RAW_DATA); + + // Push mappings input data + await mappingReader.push(rmlDoc); + await mappingReader.end(); + }); + test("Mapping process with async input updates", async () => { const rmlDoc = ` ${PREFIXES}