diff --git a/package.json b/package.json index b572ea8..d6f5ecc 100644 --- a/package.json +++ b/package.json @@ -5,6 +5,7 @@ "type": "module", "scripts": { "build": "tsc", + "prepublish": "tsc", "test": "bun test --timeout 45000", "watch": "tsc -w" }, diff --git a/src/rml/rml.ts b/src/rml/rml.ts index 0a9e402..b8a05b0 100644 --- a/src/rml/rml.ts +++ b/src/rml/rml.ts @@ -10,15 +10,13 @@ import { Store, Writer as N3Writer, } from "n3"; -import { getJarFile } from "../util"; +import { randomUUID, getJarFile } from "../util"; import { Cont, empty, match, pred, subject } from "rdf-lens"; import { RDF } from "@treecg/types"; import { RML, RMLT, VOID } from "../voc"; const { literal } = DataFactory; -// declare all characters -const characters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"; const RML_MAPPER_RELEASE = "https://github.com/RMLio/rmlmapper-java/releases/download/v6.2.2/rmlmapper-6.2.2-r371-all.jar"; @@ -69,26 +67,26 @@ export async function rmlMapper( // Read mapping input stream mappingReader.data(async (input) => { - console.log("Got mapping input!"); + console.log("[rmlMapper processor] ", "Got mapping input!"); try { const newMapping = transformMapping(input, sources, targets); const newLocation = `/tmp/rml-${uid}-mapping-${mappingLocations.length}.ttl`; await writeFile(newLocation, newMapping, { encoding: "utf8" }); mappingLocations.push(newLocation); - console.log("Add new mapping file", newLocation); + console.log("[rmlMapper processor] ", "Add new mapping file", newLocation); } catch (ex) { - console.error("Could not map incoming rml input"); + console.error("[rmlMapper processor] ", "Could not map incoming rml input"); console.error(ex); } }).on("end", async () => { // We assume mappings to be static and only proceed to execute them once we have them all if (sources) { for (let source of sources) { - console.log("handling source", source.location); + console.log("[rmlMapper processor] ", "Handling source", source.location); // Process raw data input streams - source.dataInput.data(async (data) => { - console.log("Got data for ", source.location); + const handleSourceData = async (data: string) => { + console.log("[rmlMapper processor] ", "Got data for ", source.location); source.hasData = true; await writeFile(source.newLocation, data); @@ -104,14 +102,21 @@ export async function rmlMapper( defaultWriter ); } else { - console.error("Cannot start mapping, not all data has been received"); + console.error("[rmlMapper processor] ", "Cannot start mapping, not all data has been received"); } - }); + }; + + // Register data event handler + source.dataInput.data(data => handleSourceData(data)); + // Process data that has already been pushed to the input stream + if (source.dataInput.lastElement) { + await handleSourceData(source.dataInput.lastElement); + } } } else { // No declared logical sources means that raw data access is delegated to the RML engine. // For example, as in the case of remote RDBs or HTTP APIs - console.log("Start mapping now"); + console.log("[rmlMapper processor] ", "Start mapping now"); await executeMappings( mappingLocations, jarFile, @@ -122,20 +127,6 @@ export async function rmlMapper( ); } }); - - return () => { - console.log("RML mapping process started..."); - }; -} - -function randomUUID(length = 8) { - let result = ""; - const charactersLength = characters.length; - for (let i = 0; i < length; i++) { - result += characters.charAt(Math.floor(Math.random() * charactersLength)); - } - - return result; } function transformMapping(input: string, sources?: Source[], targets?: Target[],) { @@ -158,13 +149,13 @@ function transformMapping(input: string, sources?: Source[], targets?: Target[], .thenAll(extractTarget); const foundTargets = targetLens.execute(quads); - console.log("Found targets", foundTargets); for (let foundTarget of foundTargets) { if (targets) { let found = false; for (let target of targets) { if (target.location === foundTarget.target.value) { console.log( + "[rmlMapper processor] ", "Moving location", foundTarget.target.value, "to", @@ -212,7 +203,6 @@ function transformMapping(input: string, sources?: Source[], targets?: Target[], .thenAll(extractSource); // Logical sources const foundSources = sourcesLens.execute(quads); - console.log("Found sources", foundSources); // There exists a source that has no defined source, we cannot map this mapping for (let foundSource of foundSources) { @@ -221,6 +211,7 @@ function transformMapping(input: string, sources?: Source[], targets?: Target[], for (let source of sources) { if (source.location === foundSource.source) { console.log( + "[rmlMapper processor] ", "Moving location", foundSource.source, "to", @@ -272,15 +263,15 @@ async function executeMappings( let out = ""; for (let mappingFile of mappingLocations) { - console.log("Running", mappingFile); + console.log("[rmlMapper processor] ", "Running", mappingFile); const command = `java -jar ${jarFile} -m ${mappingFile} -o ${outputFile}`; const proc = exec(command); proc.stdout!.on("data", function (data) { - console.log("rml mapper std: ", data.toString()); + console.log("[rmlMapper processor] ", "rml mapper std: ", data.toString()); }); proc.stderr!.on("data", function (data) { - console.error("rml mapper err:", data.toString()); + console.error("[rmlMapper processor] ", "rml mapper err:", data.toString()); }); await new Promise((res) => proc.on("exit", res)); @@ -292,7 +283,7 @@ async function executeMappings( await target.writer.push(file); } } - console.log("Done", mappingFile); + console.log("[rmlMapper processor] ", "Done", mappingFile); } console.log("All done"); diff --git a/src/util.ts b/src/util.ts index b4f8204..df1d034 100644 --- a/src/util.ts +++ b/src/util.ts @@ -1,9 +1,9 @@ import { existsSync } from "fs"; -import { randomUUID } from "crypto"; +import { randomUUID as cryptoUUID } from "crypto"; import { exec } from "child_process"; -const defaultLocation = "/tmp/rml-" + randomUUID() + ".jar"; +const defaultLocation = "/tmp/rml-" + cryptoUUID() + ".jar"; let rmlJarPromise: undefined | Promise = undefined; export async function getJarFile(mLocation: string | undefined, offline: boolean, url: string): Promise { @@ -21,7 +21,7 @@ export async function getJarFile(mLocation: string | undefined, offline: boolean } if (!rmlJarPromise) { - rmlJarPromise = (async function() { + rmlJarPromise = (async function () { const cmd = `wget ${url} -O ${location}`; console.log("Executing $", cmd) const proc = exec(cmd); @@ -32,3 +32,15 @@ export async function getJarFile(mLocation: string | undefined, offline: boolean return rmlJarPromise; } + +export function randomUUID(length = 8) { + // declare all characters + const characters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"; + let result = ""; + const charactersLength = characters.length; + for (let i = 0; i < length; i++) { + result += characters.charAt(Math.floor(Math.random() * charactersLength)); + } + + return result; +} diff --git a/src/yarrrml/yarrrml.ts b/src/yarrrml/yarrrml.ts index b989b61..537fe28 100644 --- a/src/yarrrml/yarrrml.ts +++ b/src/yarrrml/yarrrml.ts @@ -4,12 +4,12 @@ import { Writer as RDFWriter } from 'n3'; export function yarrrml2rml(reader: Stream, writer: Writer) { - const handle = (x: string) => { + const handle = async (x: string) => { const y2r = new Y2R(); const triples = y2r.convert(x); const str = new RDFWriter().quadsToString(triples); - return writer.push(str); + await writer.push(str); } reader.data(handle); diff --git a/test/rml.test.ts b/test/rml.test.ts index d93c800..19ac57a 100644 --- a/test/rml.test.ts +++ b/test/rml.test.ts @@ -5,7 +5,7 @@ import { deleteAsync } from "del"; import { rmlMapper, Source, Target } from "../src/rml/rml"; import { RDF, RDFS } from "../src/voc"; -describe("Functional tests for the rmlMapper Connector Architecture function", async () => { +describe("Functional tests for the rmlMapper Connector Architecture function", () => { const prefixes = ` @prefix rdfs: . @prefix xsd: . @@ -21,65 +21,66 @@ describe("Functional tests for the rmlMapper Connector Architecture function", a @prefix ex: . `; - test("Mapping process with declared logical sources and targets", async () => { - const rmlDoc = ` - ${prefixes} - ex:map_test-mapping_000 a rr:TriplesMap ; - rdfs:label "test-mapping" ; - rml:logicalSource [ - a rml:LogicalSource ; - rml:source "dataset/data.xml" ; - rml:iterator "//data" ; - rml:referenceFormulation ql:XPath - ] ; - rr:subjectMap [ - a rr:SubjectMap ; - rr:template "http://example.org/{@id}" ; - rml:logicalTarget [ - a rmlt:LogicalTarget ; - rmlt:serialization formats:N-Quads ; - rmlt:target [ - a void:Dataset ; - void:dataDump - ] - ] ; - rr:graphMap [ - a rr:GraphMap ; - rr:constant "http://example.org/myNamedGraph" + const rmlDoc = ` + ${prefixes} + ex:map_test-mapping_000 a rr:TriplesMap ; + rdfs:label "test-mapping" ; + rml:logicalSource [ + a rml:LogicalSource ; + rml:source "dataset/data.xml" ; + rml:iterator "//data" ; + rml:referenceFormulation ql:XPath + ] ; + rr:subjectMap [ + a rr:SubjectMap ; + rr:template "http://example.org/{@id}" ; + rml:logicalTarget [ + a rmlt:LogicalTarget ; + rmlt:serialization formats:N-Quads ; + rmlt:target [ + a void:Dataset ; + void:dataDump ] ] ; - rr:predicateObjectMap [ - a rr:PredicateObjectMap ; - rr:predicateMap [ - a rr:PredicateMap ; - rr:constant "http://www.w3.org/1999/02/22-rdf-syntax-ns#type" - ] ; - rr:objectMap [ - a rr:ObjectMap ; - rr:constant ; - rr:termType rr:IRI - ] + rr:graphMap [ + a rr:GraphMap ; + rr:constant "http://example.org/myNamedGraph" + ] + ] ; + rr:predicateObjectMap [ + a rr:PredicateObjectMap ; + rr:predicateMap [ + a rr:PredicateMap ; + rr:constant "http://www.w3.org/1999/02/22-rdf-syntax-ns#type" ] ; - rr:predicateObjectMap [ - a rr:PredicateObjectMap ; - rr:predicateMap [ - a rr:PredicateMap ; - rr:constant rdfs:label - ] ; - rr:objectMap [ - a rr:ObjectMap ; - rml:reference "@label" ; - rr:termType rr:Literal - ] - ] . - `; + rr:objectMap [ + a rr:ObjectMap ; + rr:constant ; + rr:termType rr:IRI + ] + ] ; + rr:predicateObjectMap [ + a rr:PredicateObjectMap ; + rr:predicateMap [ + a rr:PredicateMap ; + rr:constant rdfs:label + ] ; + rr:objectMap [ + a rr:ObjectMap ; + rml:reference "@label" ; + rr:termType rr:Literal + ] + ] . + `; - const rawData = ` - - - - - `; + const rawData = ` + + + + + `; + + test("Mapping process with declared logical sources and targets", async () => { // Define function parameters const mappingReader = new SimpleStream(); const sourceInputStream = new SimpleStream(); @@ -132,8 +133,61 @@ describe("Functional tests for the rmlMapper Connector Architecture function", a await sourceInputStream.push(rawData); }); + test("Mapping process with data inputs arriving before mappings", async () => { + // Define function parameters + const mappingReader = new SimpleStream(); + const sourceInputStream = new SimpleStream(); + const targetOutputStream = new SimpleStream(); + const sources: Source[] = [ + { + location: "dataset/data.xml", + newLocation: "", + dataInput: sourceInputStream, + hasData: false, + trigger: false + } + ]; + const targets: Target[] = [ + { + location: "file:///results/output.nq", + newLocation: "", + writer: targetOutputStream + } + ]; + + // Check output + targetOutputStream.data((data: string) => { + const store = new Store(); + store.addQuads(new Parser().parse(data)); + + expect(store.getQuads(null, null, null, null).length).toBe(4); + expect(store.getQuads( + "http://example.org/001", + RDF.type, + null, + "http://example.org/myNamedGraph").length + ).toBe(1); + expect(store.getQuads( + "http://example.org/002", + RDFS.label, + null, + "http://example.org/myNamedGraph").length + ).toBe(1); + }); + + // Execute function + await rmlMapper(mappingReader, sources, targets, undefined, "/tmp/rmlMapper.jar"); + + // Push raw input data first + await sourceInputStream.push(rawData); + + // Push mappings input data + await mappingReader.push(rmlDoc); + await mappingReader.end(); + }); + test("Mapping process without declared logical sources and default output", async () => { - const rmlDoc = ` + const rmlDoc2 = ` ${prefixes} ex:map_test-mapping_000 a rr:TriplesMap ; rdfs:label "test-mapping" ; @@ -203,7 +257,7 @@ describe("Functional tests for the rmlMapper Connector Architecture function", a await rmlMapper(mappingReader, undefined, undefined, outputStream, "/tmp/rmlMapper.jar"); // Push mappings input data - await mappingReader.push(rmlDoc); + await mappingReader.push(rmlDoc2); await mappingReader.end(); }); diff --git a/test/yarrrml.test.ts b/test/yarrrml.test.ts index 1be8045..bb2e8a0 100644 --- a/test/yarrrml.test.ts +++ b/test/yarrrml.test.ts @@ -4,21 +4,22 @@ import { yarrrml2rml } from "../src/yarrrml/yarrrml"; import { Parser, Store } from "n3"; import { RDF, RML, RR } from "../src/voc"; -describe("Functional tests for the yarrrml2rml Connector Architecture function", async () => { +describe("Functional tests for the yarrrml2rml Connector Architecture function", () => { const yarrrmlDoc = ` -prefixes: - ex: "http://example.org/" - rdfs: "http://www.w3.org/2000/01/rdf-schema#" + prefixes: + ex: "http://example.org/" + rdfs: "http://www.w3.org/2000/01/rdf-schema#" -mappings: - test-mapping: - sources: - - ["dataset/data.xml~xpath","/data"] - s: ex:$(@id) - po: - - [a, ex:Entity] - - [rdfs:label, $(@label)] - graph: ex:myNamedGraph`; + mappings: + test-mapping: + sources: + - ["dataset/data.xml~xpath","/data"] + s: ex:$(@id) + po: + - [a, ex:Entity] + - [rdfs:label, $(@label)] + graph: ex:myNamedGraph + `; test("Given a YARRRML document it produces RML triples", async () => { const reader = new SimpleStream();