Skip to content

Commit

Permalink
Add functional tests for processor functions
Browse files Browse the repository at this point in the history
  • Loading branch information
julianrojas87 committed Nov 2, 2023
1 parent f55b13c commit cab34a3
Show file tree
Hide file tree
Showing 7 changed files with 407 additions and 112 deletions.
Binary file modified bun.lockb
Binary file not shown.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
"@jest/globals": "^29.7.0",
"@types/n3": "^1.16.3",
"@types/node": "^20.8.10",
"del": "^7.1.0",
"ts-node": "^10.9.1",
"typescript": "^5.2.2"
}
Expand Down
26 changes: 15 additions & 11 deletions processors.ttl
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,15 @@ js:RMLMapperReader a js:JsProcess;
a fno:Mapping;
fno:parameterMapping [
a fnom:PositionParameterMapping;
fnom:functionParameter "sources";
fnom:functionParameter "mappingInput";
fnom:implementationParameterPosition "0"^^xsd:integer;
], [
a fnom:PositionParameterMapping;
fnom:functionParameter "targets";
fnom:functionParameter "sources";
fnom:implementationParameterPosition "1"^^xsd:integer;
], [
a fnom:PositionParameterMapping;
fnom:functionParameter "mappingInput";
fnom:functionParameter "targets";
fnom:implementationParameterPosition "2"^^xsd:integer;
], [
a fnom:PositionParameterMapping;
Expand Down Expand Up @@ -110,28 +110,32 @@ js:RMLMapperReader a js:JsProcess;
[ ] a sh:NodeShape;
sh:targetClass js:RMLMapperReader;
sh:property [
sh:path js:mappings;
sh:name "mappingInput";
sh:class :ReaderChannel;
sh:minCount 1;
sh:maxCount 1
], [
sh:path js:rmlSource;
sh:name "sources";
sh:class js:RmlSource;
sh:minCount 0
], [
sh:path js:rmlTarget;
sh:name "targets";
sh:class js:RmlTarget;
], [
sh:path js:mappings;
sh:name "mappingInput";
sh:class :ReaderChannel;
sh:maxCount 1;
sh:minCount 1;
sh:minCount 0
], [
sh:path js:output;
sh:name "defaultWriter";
sh:class :WriterChannel;
sh:maxCount 1;
sh:minCount 1;
sh:maxCount 1
], [
sh:path js:rmlJar;
sh:name "jarLocation";
sh:datatype xsd:string;
sh:maxCount 1;
sh:minCount 0;
sh:maxCount 1
].

226 changes: 126 additions & 100 deletions src/rml/rml.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,27 +37,31 @@ export type Target = {
};

export async function rmlMapper(
sources: Source[],
targets: Target[],
mappingReader: Stream<string>,
sources?: Source[],
targets?: Target[],
defaultWriter?: Writer<string>,
jarLocation?: string,
) {
const uid = randomUUID();
const outputFile = "/tmp/rml-" + uid + "-output.ttl";

// Iterate over declared logical sources and assign each of them a temporary file location
for (let source of sources) {
const filename = source.location.split("/").pop();
source.newLocation = `/tmp/rml-${uid}-input-${randomUUID()}-${filename}`;
source.hasData = false;
source.trigger = !!source.trigger;
if (sources) {
for (let source of sources) {
const filename = source.location.split("/").pop();
source.newLocation = `/tmp/rml-${uid}-input-${randomUUID()}-${filename}`;
source.hasData = false;
source.trigger = !!source.trigger;
}
}

// Iterate over declared logical targets and assign each of them a temporary file location
for (let target of targets) {
const filename = target.location.split("/").pop();
target.newLocation = `/tmp/rml-${uid}-output-${randomUUID()}-${filename}`;
if (targets) {
for (let target of targets) {
const filename = target.location.split("/").pop();
target.newLocation = `/tmp/rml-${uid}-output-${randomUUID()}-${filename}`;
}
}

const jarFile = await getJarFile(jarLocation, false, RML_MAPPER_RELEASE);
Expand All @@ -77,31 +81,45 @@ export async function rmlMapper(
console.error("Could not map incoming rml input");
console.error(ex);
}
}).on("end", () => {
}).on("end", async () => {
// We assume mappings to be static and only proceed to execute them once we have them all
for (let source of sources) {
console.log("handling source", source.location);
// Process raw data input streams
source.dataInput.data(async (data) => {
console.log("Got data for ", source.location);
source.hasData = true;
await writeFile(source.newLocation, data);

if (sources.every((x) => x.hasData)) {
// We made sure that all declared logical sources are present
console.log("Start mapping now");
await executeMappings(
sources,
mappingLocations,
jarFile,
outputFile,
targets,
defaultWriter
);
} else {
console.error("Cannot start mapping, not all data has been received");
}
});
if (sources) {
for (let source of sources) {
console.log("handling source", source.location);
// Process raw data input streams
source.dataInput.data(async (data) => {
console.log("Got data for ", source.location);
source.hasData = true;
await writeFile(source.newLocation, data);

if (sources.every((x) => x.hasData)) {
// We made sure that all declared logical sources are present
console.log("Start mapping now");
await executeMappings(
mappingLocations,
jarFile,
outputFile,
sources,
targets,
defaultWriter
);
} else {
console.error("Cannot start mapping, not all data has been received");
}
});
}
} else {
// No declared logical sources means that raw data access is delegated to the RML engine.
// For example, as in the case of remote RDBs or HTTP APIs
console.log("Start mapping now");
await executeMappings(
mappingLocations,
jarFile,
outputFile,
sources,
targets,
defaultWriter
);
}
});

Expand All @@ -120,7 +138,7 @@ function randomUUID(length = 8) {
return result;
}

function transformMapping(input: string, sources: Source[], targets: Target[],) {
function transformMapping(input: string, sources?: Source[], targets?: Target[],) {
const quads = new Parser().parse(input);
const store = new Store(quads);

Expand All @@ -135,41 +153,43 @@ function transformMapping(input: string, sources: Source[], targets: Target[],)

const extractTarget = pred(RMLT.terms.target).one().then(targetLens2);

const targetLens = match(undefined, RDF.terms.type, RML.terms.LogicalTarget)
const targetLens = match(undefined, RDF.terms.type, RMLT.terms.LogicalTarget)
.thenAll(subject)
.thenAll(extractTarget);
const foundTargets = targetLens.execute(quads);

console.log("Found targets", foundTargets);
for (let foundTarget of foundTargets) {
let found = false;
for (let target of targets) {
if (target.location === foundTarget.target.value) {
console.log(
"Moving location",
foundTarget.target.value,
"to",
target.newLocation,
);
found = true;
// Remove the old location
store.removeQuad(
<Quad_Subject>foundTarget.subject,
<Quad_Predicate>VOID.terms.dataDump,
<Quad_Object>foundTarget.target,
);

// Add the new location
store.addQuad(
<Quad_Subject>foundTarget.subject,
<Quad_Predicate>VOID.terms.dataDump,
literal("file://" + target.newLocation),
);
break;
if (targets) {
let found = false;
for (let target of targets) {
if (target.location === foundTarget.target.value) {
console.log(
"Moving location",
foundTarget.target.value,
"to",
target.newLocation,
);
found = true;
// Remove the old location
store.removeQuad(
<Quad_Subject>foundTarget.subject,
<Quad_Predicate>VOID.terms.dataDump,
<Quad_Object>foundTarget.target,
);

// Add the new location
store.addQuad(
<Quad_Subject>foundTarget.subject,
<Quad_Predicate>VOID.terms.dataDump,
literal("file://" + target.newLocation),
);
break;
}
}
if (!found) {
throw `Logical source ${foundTarget.subject.value} has no configured source`;
}
}
if (!found) {
throw `Logical source ${foundTarget.subject.value} has no configured source`;
}
}

Expand All @@ -196,53 +216,57 @@ function transformMapping(input: string, sources: Source[], targets: Target[],)

// If a logical source found in the mapping has no matching configured source, we cannot execute this mapping
for (let foundSource of foundSources) {
let found = false;
for (let source of sources) {
if (source.location === foundSource.source) {
console.log(
"Moving location",
foundSource.source,
"to",
source.newLocation,
);
found = true;
// Remove the old location
store.removeQuad(
<Quad_Subject>foundSource.subject,
<Quad_Predicate>RML.terms.source,
literal(foundSource.source),
);

// Add the new location
store.addQuad(
<Quad_Subject>foundSource.subject,
<Quad_Predicate>RML.terms.source,
literal(source.newLocation),
);
break;
if (sources) {
let found = false;
for (let source of sources) {
if (source.location === foundSource.source) {
console.log(
"Moving location",
foundSource.source,
"to",
source.newLocation,
);
found = true;
// Remove the old location
store.removeQuad(
<Quad_Subject>foundSource.subject,
<Quad_Predicate>RML.terms.source,
literal(foundSource.source),
);

// Add the new location
store.addQuad(
<Quad_Subject>foundSource.subject,
<Quad_Predicate>RML.terms.source,
literal(source.newLocation),
);
break;
}
}
if (!found) {
throw `Logical source ${foundSource.subject.value} has no configured source (${foundSource.source}) channel!`;
}
}
if (!found) {
throw `Logical source ${foundSource.subject.value} has no configured source (${foundSource.source}) channel!`;
}
}

return new N3Writer().quadsToString(store.getQuads(null, null, null, null));
}

async function executeMappings(
sources: Source[],
mappingLocations: string[],
jarFile: string,
outputFile: string,
targets: Target[],
sources?: Source[],
targets?: Target[],
defaultWriter?: Writer<string>
) {
for (let source of sources) {
// Reset the hasData property so it requires new data before it can map again
// Useful when multiple sources need to update
if (source.trigger) {
source.hasData = false;
if (sources) {
for (let source of sources) {
// Reset the hasData property so it requires new data before it can map again
// Useful when multiple sources need to update
if (source.trigger) {
source.hasData = false;
}
}
}

Expand All @@ -262,9 +286,11 @@ async function executeMappings(

out += await readFile(outputFile, { encoding: "utf8" });

for (let target of targets) {
const file = await readFile(target.newLocation, { encoding: "utf8" });
await target.writer.push(file);
if (targets) {
for (let target of targets) {
const file = await readFile(target.newLocation, { encoding: "utf8" });
await target.writer.push(file);
}
}
console.log("Done", mappingFile);
}
Expand Down
Loading

0 comments on commit cab34a3

Please sign in to comment.