diff --git a/README.md b/README.md index 6e937a7..155947b 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,32 @@ # rml-mapper-processor-ts -Typescript wrapper over the RML Mapper to be resused within the [Connector Architecture](https://github.com/TREEcg/connector-architecture). +Typescript wrappers over the RML-related operations to be reused within the [Connector Architecture](https://github.com/TREEcg/connector-architecture). Currently this repository exposes 2 functions: + +### [`js:Y2R`](https://github.com/julianrojas87/rml-mapper-processor-ts/blob/main/processors.ttl#L9): + +This processor takes a stream of YARRRML mapping files as input and converts them to their corresponding representation in RML quads. It relies on the [`yarrrml-parser`](https://github.com/RMLio/yarrrml-parser) library for executing the transformation. + +### [`js:RMLMapperReader`](https://github.com/julianrojas87/rml-mapper-processor-ts/blob/main/processors.ttl#L44): + +This processor executes RML mapping rules using the Java-based [RMLMapper engine](https://github.com/RMLio/rmlmapper-java). A mapping process can be defined within a Connector Architecture (CA) pipeline, by defining an input stream of RML mappings, which will be executed sequentially. A set of logical sources (`js:rmlSource`) and targets (`js:rmlTarget`) can be optionally declared to make them visible to the CA pipeline. Otherwise a default output (`js:output`) needs to be defined, pointing to a file where all produced RDF triples/quads will be collected. + +Logical sources can be marked as trigger-based (`js:trigger`), to indicate that they will be updated in the future and that, therefore, multiple mapping executions will be required. Finally, a path (`js:rmlJar`) to a local RMLMapper can be given. 
An example definition of the processor is shown next: + +```turtle +[ ] a js:RMLMapperReader; + js:rmlSource [ + js:sourceLocation "dataset/data.xml"; + js:input ; + js:trigger true + ], [ + js:sourceLocation "dataset/static_data.json"; + js:input ; + ]; + js:rmlTarget [ + js:targetLocation "dataset/output.nt"; + js:output + ]; + js:mappings ; + # js:output ; # This parameter is only needed if no js:rmlTarget are defined + js:rmlJar <./rmlmapper-6.3.0-r0-all.jar>. +``` \ No newline at end of file diff --git a/processors.ttl b/processors.ttl index fd6de1a..718144d 100644 --- a/processors.ttl +++ b/processors.ttl @@ -63,14 +63,10 @@ js:RMLMapperReader a js:JsProcess; a fnom:PositionParameterMapping; fnom:functionParameter "defaultWriter"; fnom:implementationParameterPosition "3"^^xsd:integer; - ], [ - a fnom:PositionParameterMapping; - fnom:functionParameter "appendMapping"; - fnom:implementationParameterPosition "4"^^xsd:integer; ], [ a fnom:PositionParameterMapping; fnom:functionParameter "jarLocation"; - fnom:implementationParameterPosition "5"^^xsd:integer; + fnom:implementationParameterPosition "4"^^xsd:integer; ]; ]. 
@@ -132,11 +128,6 @@ js:RMLMapperReader a js:JsProcess; sh:name "defaultWriter"; sh:class :WriterChannel; sh:maxCount 1; - ], [ - sh:path js:appendMapping; - sh:name "appendMapping"; - sh:datatype xsd:boolean; - sh:maxCount 1; ], [ sh:path js:rmlJar; sh:name "jarLocation"; diff --git a/src/rml/rml.ts b/src/rml/rml.ts index 7cb4678..7ccca2d 100644 --- a/src/rml/rml.ts +++ b/src/rml/rml.ts @@ -10,7 +10,6 @@ import { Store, Writer as N3Writer, } from "n3"; -import { unlink } from "fs/promises"; import { getJarFile } from "../util"; import { Cont, empty, match, pred, subject } from "rdf-lens"; import { RDF } from "@treecg/types"; @@ -42,7 +41,6 @@ export async function rmlMapper( targets: Target[], mappingReader: Stream, defaultWriter?: Writer, - appendMapping?: boolean, jarLocation?: string, ) { const uid = randomUUID(); @@ -70,15 +68,11 @@ export async function rmlMapper( console.log("Got mapping input!"); try { const newMapping = transformMapping(input, sources, targets); - if (mappingLocations.length < 1 || appendMapping) { - const newLocation = `/tmp/rml-${uid}-mapping-${mappingLocations.length}.ttl`; - await writeFile(newLocation, newMapping, { encoding: "utf8" }); - mappingLocations.push(newLocation); - console.log("Add new mapping file", newLocation); - } else { - await writeFile(mappingLocations[0], newMapping, { encoding: "utf8" }); - console.log("Overwriting mapping file at", mappingLocations[0]); - } + const newLocation = `/tmp/rml-${uid}-mapping-${mappingLocations.length}.ttl`; + await writeFile(newLocation, newMapping, { encoding: "utf8" }); + mappingLocations.push(newLocation); + console.log("Add new mapping file", newLocation); + } catch (ex) { console.error("Could not map incoming rml input"); console.error(ex); @@ -87,6 +81,7 @@ export async function rmlMapper( // We assume mappings to be static and only proceed to execute them once we have them all for (let source of sources) { console.log("handling source", source.location); + // Process raw data 
input streams source.dataInput.data(async (data) => { console.log("Got data for ", source.location); source.hasData = true; @@ -96,11 +91,11 @@ export async function rmlMapper( // We made sure that all declared logical sources are present console.log("Start mapping now"); await executeMappings( - sources, - mappingLocations, - jarFile, - outputFile, - targets, + sources, + mappingLocations, + jarFile, + outputFile, + targets, defaultWriter ); } else { @@ -236,8 +231,8 @@ function transformMapping(input: string, sources: Source[], targets: Target[],) } async function executeMappings( - sources: Source[], - mappingLocations: string[], + sources: Source[], + mappingLocations: string[], jarFile: string, outputFile: string, targets: Target[], diff --git a/src/util.ts b/src/util.ts index ee510df..b4f8204 100644 --- a/src/util.ts +++ b/src/util.ts @@ -1,18 +1,11 @@ -import { existsSync, createReadStream } from "fs"; +import { existsSync } from "fs"; import { randomUUID } from "crypto"; import { exec } from "child_process"; -import * as N3 from "n3"; -import { createUriAndTermNamespace } from "@treecg/types"; - const defaultLocation = "/tmp/rml-" + randomUUID() + ".jar"; let rmlJarPromise: undefined | Promise = undefined; -const { namedNode } = N3.DataFactory; -const OWL = createUriAndTermNamespace("http://www.w3.org/2002/07/owl#", "imports"); - - export async function getJarFile(mLocation: string | undefined, offline: boolean, url: string): Promise { const location = mLocation || defaultLocation;