Skip to content

Commit

Permalink
Complement Connector Architecture (CA) processor (#17)
Browse files Browse the repository at this point in the history
* Implement logic for supporting before and after date filters.

* Add support for multiple shape descriptions

* Add tests for Connector Architecture processor.

* Fix gitignore

* Update dependencies

* Add connector architecture tests

* Use n3 DataFactory for SDS blank node generation. The one from rdf-data-factory crashes.

* Fix typos in comments.

* Add build step to CI
  • Loading branch information
julianrojas87 authored Mar 11, 2024
1 parent 8b6da41 commit 7ccd41f
Show file tree
Hide file tree
Showing 21 changed files with 1,829 additions and 822 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/ci-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,8 @@ jobs:
- name: Install dependencies
run: npm install ; npm install -g bun ;

- name: Build
run: npm run build

- name: Run tests
run: npm test
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
node_modules
dist/
save.json
30 changes: 26 additions & 4 deletions bin/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import { Writer } from "n3";
const program = new Command();
let paramURL: string = "";
let paramFollow: boolean = false;
let after: Date | undefined;
let before: Date | undefined;
let paramPollInterval: number;
let urlIsView = false;
let noShape = false;
let shapeFile: string | undefined;
let shapeFiles: string[] | undefined;
let ordered: Ordered = "none";
let quiet: boolean = false;
let verbose: boolean = false;
Expand All @@ -27,8 +29,10 @@ program
.default("none"),
)
.option("-f, --follow", "follow the LDES, the client stays in sync")
.option("--after <after>", "follow only relations including members after a certain point in time")
.option("--before <before>", "follow only relations including members before a certain point in time")
.option("--poll-interval <number>", "specify poll interval")
.option("--shape-file <shapefile>", "specify a shapefile")
.option("--shape-files [shapeFiles...]", "specify a shapefile")
.option(
"--no-shape",
"don't extract members with a shape (only use cbd and named graphs)",
Expand Down Expand Up @@ -56,14 +60,30 @@ program
noShape = !program.shape;
save = program.save;
paramURL = url;
shapeFile = program.shapeFile;
shapeFiles = program.shapeFiles;
paramFollow = program.follow;
paramPollInterval = program.pollInterval;
ordered = program.ordered;
quiet = program.quiet;
verbose = program.verbose;
loose = program.loose;
onlyDefaultGraph = program.onlyDefaultGraph;
if (program.after) {
if (!isNaN(new Date(program.after).getTime())) {
after = new Date(program.after);
} else {
console.error(`--after ${program.after} is not a valid date`);
process.exit();
}
}
if (program.before) {
if (!isNaN(new Date(program.before).getTime())) {
before = new Date(program.before);
} else {
console.error(`--before ${program.before} is not a valid date`);
process.exit();
}
}
});

program.parse(process.argv);
Expand All @@ -80,8 +100,10 @@ async function main() {
pollInterval: paramPollInterval,
fetcher: { maxFetched: 2, concurrentRequests: 10 },
urlIsView: urlIsView,
shapeFile,
shapeFiles,
onlyDefaultGraph,
after,
before
}),
undefined,
undefined,
Expand Down
Binary file added bun.lockb
Binary file not shown.
141 changes: 93 additions & 48 deletions lib/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@ import rdfDereference, { RdfDereferencer } from "rdf-dereference";
import { FileStateFactory, NoStateFactory, State, StateFactory } from "./state";
import { CBDShapeExtractor } from "extract-cbd-shape";
import { RdfStore } from "rdf-stores";
import { DataFactory } from "rdf-data-factory";
import { Writer as NWriter } from "n3";
import { DataFactory, Writer as NWriter } from "n3";
import { Quad_Object, Term } from "@rdfjs/types";
import { getObjects, ModulatorFactory, Notifier, streamToArray } from "./utils";
import { LDES, SDS, TREE } from "@treecg/types";
import { extractMainNodeShape, getObjects, ModulatorFactory, Notifier, streamToArray } from "./utils";
import { LDES, SDS, SHACL, TREE } from "@treecg/types";
import { FetchedPage, Fetcher, longPromise, resetPromise } from "./pageFetcher";
import { Manager } from "./memberManager";
import { OrderedStrategy, StrategyEvents, UnorderedStrategy } from "./strategy";
Expand All @@ -19,9 +18,8 @@ export { intoConfig } from "./config";
export type { Member, Page, Relation } from "./page";
export type { Config, MediatorConfig, ShapeConfig } from "./config";

const df = new DataFactory();
const log = debug("client");
const { namedNode, blankNode, quad } = df;
const { namedNode, blankNode, quad } = DataFactory;

type Controller = ReadableStreamDefaultController<Member>;

Expand All @@ -41,7 +39,7 @@ export function replicateLDES(
}

export type LDESInfo = {
shape?: Term;
shapeMap?: Map<string, Term>;
extractor: CBDShapeExtractor;
timestampPath?: Term;
isVersionOfPath?: Term;
Expand All @@ -55,19 +53,39 @@ async function getInfo(
): Promise<LDESInfo> {
const logger = log.extend("getShape");

if (config.shapeFile) {
const shapeId = config.shapeFile.startsWith("http")
? config.shapeFile
: "file://" + config.shapeFile;
const shapeConfigStore = RdfStore.createDefault();
if (config.shapeFiles && config.shapeFiles.length > 0) {
config.shapes = [];

const resp = await rdfDereference.dereference(config.shapeFile, {
localFiles: true,
});
const quads = await streamToArray(resp.data);
config.shape = {
quads: quads,
shapeId: namedNode(shapeId),
};
for (const shapeFile of config.shapeFiles) {
const tempShapeStore = RdfStore.createDefault();
const shapeId = shapeFile.startsWith("http")
? shapeFile
: "file://" + shapeFile;

const resp = await rdfDereference.dereference(shapeFile, {
localFiles: true,
});
const quads = await streamToArray(resp.data);
// Add retrieved quads to local stores
quads.forEach(q => {
tempShapeStore.addQuad(q);
shapeConfigStore.addQuad(q);
});

if (shapeId.startsWith("file://")) {
// We have to find the actual IRI/Blank Node of the main shape within the file
config.shapes.push({
quads,
shapeId: extractMainNodeShape(tempShapeStore)
});
} else {
config.shapes.push({
quads: quads,
shapeId: namedNode(shapeId),
});
}
}
}

let shapeIds = config.noShape
Expand Down Expand Up @@ -107,11 +125,7 @@ async function getInfo(
timestampPaths.length,
isVersionOfPaths.length,
);
} catch (ex: any) {}
}

if (shapeIds.length > 1) {
console.error("Expected at most one shape id, found " + shapeIds.length);
} catch (ex: any) { }
}

if (timestampPaths.length > 1) {
Expand All @@ -126,22 +140,38 @@ async function getInfo(
);
}

let shapeConfigStore = RdfStore.createDefault();
if (config.shape) {
for (let quad of config.shape.quads) {
shapeConfigStore.addQuad(quad);
// Create a map of shapes and member types
const shapeMap = new Map<string, Term>();

if (config.shapes) {
for (const shape of config.shapes) {
const memberType = getObjects(shapeConfigStore, shape.shapeId, SHACL.terms.targetClass)[0];
if (memberType) {
shapeMap.set(memberType.value, shape.shapeId);
} else {
console.error("Ignoring SHACL shape without a declared sh:targetClass: ", shape.shapeId);
}
}
} else {
for (const shapeId of shapeIds) {
const memberType = getObjects(store, shapeId, SHACL.terms.targetClass)[0];
if (memberType) {
shapeMap.set(memberType.value, shapeId);
} else {
console.error("Ignoring SHACL shape without a declared sh:targetClass: ", shapeId);
}
}
}

return {
extractor: new CBDShapeExtractor(
config.shape ? shapeConfigStore : store,
config.shapes && config.shapes.length > 0 ? shapeConfigStore : store,
dereferencer,
{
cbdDefaultGraph: config.onlyDefaultGraph,
},
),
shape: config.shape ? config.shape.shapeId : shapeIds[0],
shapeMap: config.noShape ? undefined : shapeMap,
timestampPath: timestampPaths[0],
isVersionOfPath: isVersionOfPaths[0],
};
Expand Down Expand Up @@ -282,11 +312,22 @@ export class Client {
throw "Can only emit members in order, if LDES is configured with timestampPath";
}

this.fetcher = new Fetcher(this.dereferencer, this.config.loose);
this.fetcher = new Fetcher(this.dereferencer, this.config.loose, this.config.after, this.config.before);

const notifier: Notifier<StrategyEvents, {}> = {
fragment: () => this.emit("fragment", undefined),
member: (m) => {
// Check if member is within date constraints (if any)
if (this.config.before) {
if (m.timestamp && m.timestamp instanceof Date && m.timestamp > this.config.before) {
return;
}
}
if (this.config.after) {
if (m.timestamp && m.timestamp instanceof Date && m.timestamp < this.config.after) {
return;
}
}
emit(m);
},
pollCycle: () => {
Expand All @@ -302,22 +343,22 @@ export class Client {
this.strategy =
this.ordered !== "none"
? new OrderedStrategy(
this.memberManager,
this.fetcher,
notifier,
factory,
this.ordered,
this.config.polling,
this.config.pollInterval,
)
this.memberManager,
this.fetcher,
notifier,
factory,
this.ordered,
this.config.polling,
this.config.pollInterval,
)
: new UnorderedStrategy(
this.memberManager,
this.fetcher,
notifier,
factory,
this.config.polling,
this.config.pollInterval,
);
this.memberManager,
this.fetcher,
notifier,
factory,
this.config.polling,
this.config.pollInterval,
);

logger("Found %d views, choosing %s", viewQuads.length, ldesId.value);
this.strategy.start(ldesId.value);
Expand Down Expand Up @@ -375,10 +416,12 @@ async function fetchPage(
export async function processor(
writer: Writer<string>,
url: string,
before?: Date,
after?: Date,
ordered?: string,
follow?: boolean,
pollInterval?: number,
shape?: string,
shapes?: string[],
noShape?: boolean,
save?: string,
loose?: boolean,
Expand All @@ -389,9 +432,11 @@ export async function processor(
intoConfig({
loose,
noShape,
shapeFile: shape,
shapeFiles: shapes,
polling: follow,
url: url,
after,
before,
stateFile: save,
follow,
pollInterval: pollInterval,
Expand Down
4 changes: 2 additions & 2 deletions lib/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ export interface Config {
fetcher: FetcherConfig;
before?: Date;
after?: Date;
shape?: ShapeConfig;
shapeFile?: string;
shapes?: ShapeConfig[];
shapeFiles?: string[];
onlyDefaultGraph?: boolean;

// Add flag to indicate in order (default true)
Expand Down
Loading

0 comments on commit 7ccd41f

Please sign in to comment.