Skip to content

Commit

Permalink
Fix logic to handle multiple inputs and mappings asynchronously
Browse files Browse the repository at this point in the history
  • Loading branch information
julianrojas87 committed Feb 13, 2024
1 parent 8547674 commit 3454783
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 35 deletions.
61 changes: 35 additions & 26 deletions src/rml/rml.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,22 +106,31 @@ export async function rmlMapper(
console.error(ex);
}
}).on("end", async () => {
console.log("[rmlMapper processor]", "Received all mapping rules!");
// We assume mappings to be static and only proceed to execute them once we have them all
mappingsReady = true;
if (sources) {
for (const source of sources) {
// Process data that has already been pushed to the input stream
if (source.sourceBuffer && source.sourceBuffer.length > 0) {
await handleDataUpdate(
source.sourceBuffer.shift()!,
mappingsReady,
sources,
targets,
mappingLocations,
jarFile,
outputFile,
defaultWriter
);
if (sources.every(s => s.hasData)) {
// We made sure that all declared logical sources are present
console.log("[rmlMapper processor]", "Start mapping now (on end mappings event)");
await executeMappings(mappingLocations, jarFile, outputFile, defaultWriter, sources, targets);
// Check for buffered input updates
for (const source of sources) {
if (source.sourceBuffer && source.sourceBuffer.length > 0) {
const update = source.sourceBuffer.shift();
if (update) {
await handleDataUpdate(
update,
mappingsReady,
sources,
targets,
mappingLocations,
jarFile,
outputFile,
defaultWriter
);
}
}
}
}
} else {
Expand Down Expand Up @@ -277,9 +286,9 @@ async function handleDataUpdate(
const { source, data } = update;
console.log("[rmlMapper processor]", "Got data for", source.location);

if (!mappingsReady) {
if (source.hasData && !mappingsReady) {
// Mapping rules are still coming through or we already running a mapping process.
// Store this data update and process when ready.
// Buffer this data update and process it when ready.
if (!source.sourceBuffer) {
source.sourceBuffer = [];
}
Expand All @@ -289,7 +298,7 @@ async function handleDataUpdate(
source.hasData = true;
await writeFile(source.newLocation!, data);

if (sources.every((x) => x.hasData)) {
if (mappingsReady && sources.every((x) => x.hasData)) {
// We made sure that all declared logical sources are present
console.log("[rmlMapper processor]", "Start mapping now");
await executeMappings(
Expand All @@ -301,19 +310,19 @@ async function handleDataUpdate(
targets
);
// Process buffered input updates
if (source.sourceBuffer) {
while (source.sourceBuffer.length > 0) {
const update = source.sourceBuffer.shift();
for (const src of sources) {
if (src.sourceBuffer && src.sourceBuffer.length > 0) {
const update = src.sourceBuffer.shift();
if (update) {
console.log("[rmlMapper processor]", "Processing buffered input", update.source.location);
await handleDataUpdate(
update,
mappingsReady,
sources,
targets,
mappingLocations,
jarFile,
outputFile,
update,
mappingsReady,
sources,
targets,
mappingLocations,
jarFile,
outputFile,
defaultWriter
);
}
Expand Down
103 changes: 94 additions & 9 deletions test/rml.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ describe("Functional tests for the rmlMapper Connector Architecture function", (
@prefix ex: <http://example.org/> .
`;

const RML_TM_LOCAL_SOURCE_AND_TARGET = `
const RML_TM_LOCAL_SOURCE_AND_TARGET = (source?: string) => {
return `
ex:map_test-mapping_000 a rr:TriplesMap ;
rdfs:label "test-mapping" ;
rml:logicalSource [
a rml:LogicalSource ;
rml:source "dataset/data.xml" ;
rml:source "${source || "dataset/data.xml"}" ;
rml:iterator "//data" ;
rml:referenceFormulation ql:XPath
] ;
Expand Down Expand Up @@ -72,7 +73,8 @@ describe("Functional tests for the rmlMapper Connector Architecture function", (
rr:termType rr:Literal
]
] .
`;
`;
}

const RML_TM_LOCAL_SOURCE_AND_LDES_TARGET = `
ex:map_test-mapping_000 a rr:TriplesMap ;
Expand Down Expand Up @@ -173,7 +175,7 @@ describe("Functional tests for the rmlMapper Connector Architecture function", (
rr:termType rr:Literal
]
] .
`;
`;
};

const RML_TM_REMOTE_SOURCE_AND_NO_TARGET = `
Expand Down Expand Up @@ -243,10 +245,17 @@ describe("Functional tests for the rmlMapper Connector Architecture function", (
</resource>
`;

const LOCAL_RAW_DATA_YET_ANOTHER_UPDATE = `
<resource>
<data id="001" label="yet some more new data"></data>
<data id="002" label="yet some other new data"></data>
</resource>
`;

test("Mapping process with declared logical source and target", async () => {
const rmlDoc = `
${PREFIXES}
${RML_TM_LOCAL_SOURCE_AND_TARGET}
${RML_TM_LOCAL_SOURCE_AND_TARGET()}
`;
// Define function parameters
const mappingReader = new SimpleStream<string>();
Expand Down Expand Up @@ -366,7 +375,7 @@ describe("Functional tests for the rmlMapper Connector Architecture function", (
test("Mapping process with declared logical source data input arriving before mappings", async () => {
const rmlDoc = `
${PREFIXES}
${RML_TM_LOCAL_SOURCE_AND_TARGET}
${RML_TM_LOCAL_SOURCE_AND_TARGET()}
`;
// Define function parameters
const mappingReader = new SimpleStream<string>();
Expand Down Expand Up @@ -420,6 +429,84 @@ describe("Functional tests for the rmlMapper Connector Architecture function", (
await mappingReader.end();
});

test("Mapping process with multiple declared logical sources data input arriving before mappings", async () => {
const rmlDoc1 = `
${PREFIXES}
${RML_TM_LOCAL_SOURCE_AND_TARGET("dataset/data1.xml")}
`;
const rmlDoc2 = `
${PREFIXES}
${RML_TM_LOCAL_SOURCE_AND_TARGET("dataset/data2.xml")}
`;
// Define function parameters
const mappingReader = new SimpleStream<string>();
const defaultOutputStream = new SimpleStream<string>();
const sourceInputStream1 = new SimpleStream<string>();
const sourceInputStream2 = new SimpleStream<string>();
const targetOutputStream = new SimpleStream<string>();
const sources: Source[] = [
{
location: "dataset/data1.xml",
dataInput: sourceInputStream1,
hasData: false,
trigger: true
},
{
location: "dataset/data2.xml",
dataInput: sourceInputStream2,
hasData: false,
trigger: true
}
];
const targets: Target[] = [
{
location: "file:///results/output.nq",
writer: targetOutputStream,
data: ""
}
];

// Check output
targetOutputStream.data((data: string) => {
const store = new Store();
store.addQuads(new Parser().parse(data));
expect(store.getQuads(null, null, null, null).length).toBe(4);
expect(store.getQuads(
"http://example.org/001",
RDF.type,
null,
"http://example.org/myNamedGraph").length
).toBe(1);
expect(store.getQuads(
"http://example.org/002",
RDFS.label,
null,
"http://example.org/myNamedGraph").length
).toBe(1);

});

// Execute function
await rmlMapper(mappingReader, defaultOutputStream, sources, targets, "/tmp/rmlMapper.jar");

// Push some data asynchronously
await Promise.all([
sourceInputStream1.push(LOCAL_RAW_DATA),
sourceInputStream2.push(LOCAL_RAW_DATA)
]);
// Push some mapping
await mappingReader.push(rmlDoc1);
// Push some more data
await sourceInputStream1.push(LOCAL_RAW_DATA_UPDATE);
await sourceInputStream2.push(LOCAL_RAW_DATA_UPDATE);
await sourceInputStream1.push(LOCAL_RAW_DATA_YET_ANOTHER_UPDATE);
await sourceInputStream2.push(LOCAL_RAW_DATA_YET_ANOTHER_UPDATE);
// Finish pushing mappings input data
await mappingReader.push(rmlDoc2);
await mappingReader.end();
await sleep(3000);
});

test("Mapping process without any declared logical sources and using default output", async () => {
const rmlDoc = `
${PREFIXES}
Expand Down Expand Up @@ -491,7 +578,7 @@ describe("Functional tests for the rmlMapper Connector Architecture function", (
test("Mapping process with declared and undeclared logical sources and targets", async () => {
const rmlDoc = `
${PREFIXES}
${RML_TM_LOCAL_SOURCE_AND_TARGET}
${RML_TM_LOCAL_SOURCE_AND_TARGET()}
${RML_TM_REMOTE_SOURCE_AND_NO_TARGET}
`;

Expand Down Expand Up @@ -665,10 +752,8 @@ describe("Functional tests for the rmlMapper Connector Architecture function", (
// Asynchronously push data updates
sourceInputStream1.push(LOCAL_RAW_DATA);
sourceInputStream2.push(LOCAL_RAW_DATA);
await sleep(1000);
sourceInputStream1.push(LOCAL_RAW_DATA_UPDATE);
await sourceInputStream2.push(LOCAL_RAW_DATA_UPDATE);
await sleep(3000);
});
});

Expand Down

0 comments on commit 3454783

Please sign in to comment.