Skip to content

Commit

Permalink
Handle case where default output is the same as a declared logical ta…
Browse files Browse the repository at this point in the history
…rget
  • Loading branch information
julianrojas87 committed Nov 30, 2023
1 parent edb1a60 commit 484bf5b
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 12 deletions.
37 changes: 25 additions & 12 deletions src/rml/rml.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { Stream, Writer } from "@ajuvercr/js-runner";
import { exec } from "child_process";
import { access, constants, readFile, writeFile } from "fs/promises";
import { access, constants, readFile, unlink, writeFile } from "fs/promises";
import {
DataFactory,
Parser,
Expand Down Expand Up @@ -99,7 +99,7 @@ export async function rmlMapper(

if (executing) {
// We are already running a mapping process.
// Store this date update and process when it is finished.
// Store this data update and process when it is finished.
console.log("[rmlMapper processor]", "Buffering input until previous mapping is finished");
sourceBuffer.push({ source, data });
} else {
Expand Down Expand Up @@ -219,7 +219,7 @@ function transformMapping(input: string, sources?: Source[], targets?: Target[],
}
}
}

// Extract logical Sources from incoming mapping
const extractSource = empty<Cont>()
.map(({ id }) => ({ subject: id }))
Expand Down Expand Up @@ -301,7 +301,7 @@ async function executeMappings(
}

let out = "";
if(targets) {
if (targets) {
// Initialize data holders of every declared target
targets.forEach(t => t.data = "");
}
Expand All @@ -323,25 +323,38 @@ async function executeMappings(
if (targets) {
for (let target of targets) {
try {
// Temporal logical target files will be created by the mapping process where it corresponds
await access(target.newLocation, constants.F_OK);
target.data += await readFile(target.newLocation, { encoding: "utf8" });
} catch (err) {
// Fallback to default output
out += await readFile(outputFile, { encoding: "utf8" });
}
// Delete the temporal file to prevent it causes generated data to be sent to a target
// where it does not belong in subsequent mapping executions.
await unlink(target.newLocation);
} catch (err) { /* There was no data meant for this target in this mapping */ }
}
} else {
out += await readFile(outputFile, { encoding: "utf8" });
}

try {
await access(outputFile);
out += await readFile(outputFile, { encoding: "utf8" });
await unlink(outputFile);
} catch (err) { /* There was no data meant for the default output in this mapping */ }

console.log("[rmlMapper processor]", "Done", mappingFile, `in ${new Date().getTime() - t0.getTime()} ms`);
}

console.log("[rmlMapper processor]", "All done");
if (targets) {
targets.forEach(async target => {
await target.writer.push(target.data);
// Account for the possibility that a declared target and the default target are the same.
// This is important to make sure that all produced triples/quads are pushed downstream together.
if (target.writer === defaultWriter) {
out += target.data;
} else {
await target.writer.push(target.data);
}
});
} else if (out !== "") {
}
if (out !== "") {
await defaultWriter.push(out);
}
}
Expand Down
63 changes: 63 additions & 0 deletions test/rml.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,69 @@ describe("Functional tests for the rmlMapper Connector Architecture function", (
await mappingReader.end();
});

test("Mapping process with declared and undeclared logical sources and targets", async () => {
const rmlDoc = `
${PREFIXES}
${RML_TM_LOCAL_SOURCE_AND_TARGET}
${RML_TM_REMOTE_SOURCE_AND_NO_TARGET}
`;

// Define function parameters
const mappingReader = new SimpleStream<string>();
const sourceInputStream = new SimpleStream<string>();
const outputStream = new SimpleStream<string>();
const sources: Source[] = [
{
location: "dataset/data.xml",
newLocation: "",
dataInput: sourceInputStream,
hasData: false,
trigger: false
}
];

const targets: Target[] = [
{
location: "file:///results/output.nq",
newLocation: "",
writer: outputStream, // Here we are using the same stream as the default output
data: ""
}
];

// Check output
outputStream.data((data: string) => {
const store = new Store();
store.addQuads(new Parser().parse(data));
expect(store.getQuads(null, RDF.type, "http://example.org/Entity", null).length).toBeGreaterThan(0);
expect(store.getQuads(null, RDFS.label, null, null).length).toBeGreaterThan(0);
expect(store.getQuads(null, "http://example.org/name", null, null).length).toBeGreaterThan(0);
expect(store.getQuads(null, "http://example.org/availableBikes", null, null).length).toBeGreaterThan(0);
expect(store.getQuads(
"http://example.org/001",
RDF.type,
null,
"http://example.org/myNamedGraph").length
).toBe(1);
expect(store.getQuads(
"http://example.org/002",
RDFS.label,
null,
"http://example.org/myNamedGraph").length
).toBe(1);
});

// Execute function
await rmlMapper(mappingReader, outputStream, sources, targets, "/tmp/rmlMapper.jar");

// Push raw input data first
await sourceInputStream.push(LOCAL_RAW_DATA);

// Push mappings input data
await mappingReader.push(rmlDoc);
await mappingReader.end();
});

test("Mapping process with async input updates", async () => {
const rmlDoc = `
${PREFIXES}
Expand Down

0 comments on commit 484bf5b

Please sign in to comment.