Skip to content

Commit

Permalink
Fix synchronization issue when input data is pushed before mappings +…
Browse files Browse the repository at this point in the history
… tests
  • Loading branch information
julianrojas87 committed Nov 3, 2023
1 parent e47fe5d commit f52a861
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 108 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"type": "module",
"scripts": {
"build": "tsc",
"prepublish": "tsc",
"test": "bun test --timeout 45000",
"watch": "tsc -w"
},
Expand Down
55 changes: 23 additions & 32 deletions src/rml/rml.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,13 @@ import {
Store,
Writer as N3Writer,
} from "n3";
import { getJarFile } from "../util";
import { randomUUID, getJarFile } from "../util";
import { Cont, empty, match, pred, subject } from "rdf-lens";
import { RDF } from "@treecg/types";
import { RML, RMLT, VOID } from "../voc";

const { literal } = DataFactory;

// declare all characters
const characters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
const RML_MAPPER_RELEASE =
"https://github.com/RMLio/rmlmapper-java/releases/download/v6.2.2/rmlmapper-6.2.2-r371-all.jar";

Expand Down Expand Up @@ -69,26 +67,26 @@ export async function rmlMapper(

// Read mapping input stream
mappingReader.data(async (input) => {
console.log("Got mapping input!");
console.log("[rmlMapper processor] ", "Got mapping input!");
try {
const newMapping = transformMapping(input, sources, targets);
const newLocation = `/tmp/rml-${uid}-mapping-${mappingLocations.length}.ttl`;
await writeFile(newLocation, newMapping, { encoding: "utf8" });
mappingLocations.push(newLocation);
console.log("Add new mapping file", newLocation);
console.log("[rmlMapper processor] ", "Add new mapping file", newLocation);

} catch (ex) {
console.error("Could not map incoming rml input");
console.error("[rmlMapper processor] ", "Could not map incoming rml input");
console.error(ex);
}
}).on("end", async () => {
// We assume mappings to be static and only proceed to execute them once we have them all
if (sources) {
for (let source of sources) {
console.log("handling source", source.location);
console.log("[rmlMapper processor] ", "Handling source", source.location);
// Process raw data input streams
source.dataInput.data(async (data) => {
console.log("Got data for ", source.location);
const handleSourceData = async (data: string) => {
console.log("[rmlMapper processor] ", "Got data for ", source.location);
source.hasData = true;
await writeFile(source.newLocation, data);

Expand All @@ -104,14 +102,21 @@ export async function rmlMapper(
defaultWriter
);
} else {
console.error("Cannot start mapping, not all data has been received");
console.error("[rmlMapper processor] ", "Cannot start mapping, not all data has been received");
}
});
};

// Register data event handler
source.dataInput.data(data => handleSourceData(data));
// Process data that has already been pushed to the input stream
if (source.dataInput.lastElement) {
await handleSourceData(source.dataInput.lastElement);
}
}
} else {
// No declared logical sources means that raw data access is delegated to the RML engine.
// For example, as in the case of remote RDBs or HTTP APIs
console.log("Start mapping now");
console.log("[rmlMapper processor] ", "Start mapping now");
await executeMappings(
mappingLocations,
jarFile,
Expand All @@ -122,20 +127,6 @@ export async function rmlMapper(
);
}
});

return () => {
console.log("RML mapping process started...");
};
}

/**
 * Builds a random identifier by drawing characters one at a time from the
 * module-level `characters` string.
 *
 * Note that despite its name the result is a plain random token, not an
 * RFC 4122 UUID. Randomness comes from `Math.random`.
 *
 * @param length - Number of characters to generate (defaults to 8).
 * @returns A string made only of characters found in `characters`.
 */
function randomUUID(length = 8) {
    const pieces: string[] = [];
    // Keep appending until the requested number of characters is reached.
    while (pieces.length < length) {
        const pick = Math.floor(Math.random() * characters.length);
        pieces.push(characters.charAt(pick));
    }

    return pieces.join("");
}

function transformMapping(input: string, sources?: Source[], targets?: Target[],) {
Expand All @@ -158,13 +149,13 @@ function transformMapping(input: string, sources?: Source[], targets?: Target[],
.thenAll(extractTarget);
const foundTargets = targetLens.execute(quads);

console.log("Found targets", foundTargets);
for (let foundTarget of foundTargets) {
if (targets) {
let found = false;
for (let target of targets) {
if (target.location === foundTarget.target.value) {
console.log(
"[rmlMapper processor] ",
"Moving location",
foundTarget.target.value,
"to",
Expand Down Expand Up @@ -212,7 +203,6 @@ function transformMapping(input: string, sources?: Source[], targets?: Target[],
.thenAll(extractSource); // Logical sources

const foundSources = sourcesLens.execute(quads);
console.log("Found sources", foundSources);

// There exists a source that has no defined source, we cannot map this mapping
for (let foundSource of foundSources) {
Expand All @@ -221,6 +211,7 @@ function transformMapping(input: string, sources?: Source[], targets?: Target[],
for (let source of sources) {
if (source.location === foundSource.source) {
console.log(
"[rmlMapper processor] ",
"Moving location",
foundSource.source,
"to",
Expand Down Expand Up @@ -272,15 +263,15 @@ async function executeMappings(

let out = "";
for (let mappingFile of mappingLocations) {
console.log("Running", mappingFile);
console.log("[rmlMapper processor] ", "Running", mappingFile);
const command = `java -jar ${jarFile} -m ${mappingFile} -o ${outputFile}`;

const proc = exec(command);
proc.stdout!.on("data", function (data) {
console.log("rml mapper std: ", data.toString());
console.log("[rmlMapper processor] ", "rml mapper std: ", data.toString());
});
proc.stderr!.on("data", function (data) {
console.error("rml mapper err:", data.toString());
console.error("[rmlMapper processor] ", "rml mapper err:", data.toString());
});
await new Promise((res) => proc.on("exit", res));

Expand All @@ -292,7 +283,7 @@ async function executeMappings(
await target.writer.push(file);
}
}
console.log("Done", mappingFile);
console.log("[rmlMapper processor] ", "Done", mappingFile);
}

console.log("All done");
Expand Down
18 changes: 15 additions & 3 deletions src/util.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { existsSync } from "fs";
import { randomUUID } from "crypto";
import { randomUUID as cryptoUUID } from "crypto";
import { exec } from "child_process";


const defaultLocation = "/tmp/rml-" + randomUUID() + ".jar";
const defaultLocation = "/tmp/rml-" + cryptoUUID() + ".jar";
let rmlJarPromise: undefined | Promise<string> = undefined;

export async function getJarFile(mLocation: string | undefined, offline: boolean, url: string): Promise<string> {
Expand All @@ -21,7 +21,7 @@ export async function getJarFile(mLocation: string | undefined, offline: boolean
}

if (!rmlJarPromise) {
rmlJarPromise = (async function() {
rmlJarPromise = (async function () {
const cmd = `wget ${url} -O ${location}`;
console.log("Executing $", cmd)
const proc = exec(cmd);
Expand All @@ -32,3 +32,15 @@ export async function getJarFile(mLocation: string | undefined, offline: boolean

return rmlJarPromise;
}

/**
 * Generates a random alphanumeric token.
 *
 * Each character is picked independently from the 62-symbol alphabet
 * `A-Z`, `a-z`, `0-9` using `Math.random`. Despite the name, the result is
 * not an RFC 4122 UUID; it is only a short random string.
 *
 * @param length - Number of characters to generate (defaults to 8). A value
 *   of zero or less yields an empty string.
 * @returns The generated token.
 */
export function randomUUID(length = 8) {
    // Alphabet the token is drawn from: upper case, lower case, then digits.
    const alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
    const pieces: string[] = [];

    // Append one random symbol per iteration until enough have been collected.
    while (pieces.length < length) {
        const pick = Math.floor(Math.random() * alphabet.length);
        pieces.push(alphabet.charAt(pick));
    }

    return pieces.join("");
}
4 changes: 2 additions & 2 deletions src/yarrrml/yarrrml.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import { Writer as RDFWriter } from 'n3';

export function yarrrml2rml(reader: Stream<string>, writer: Writer<string>) {

const handle = (x: string) => {
const handle = async (x: string) => {
const y2r = new Y2R();
const triples = y2r.convert(x);

const str = new RDFWriter().quadsToString(triples);
return writer.push(str);
await writer.push(str);
}

reader.data(handle);
Expand Down
Loading

0 comments on commit f52a861

Please sign in to comment.