Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Complement CA processor #17

Merged
merged 7 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/ci-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,8 @@ jobs:
- name: Install dependencies
run: npm install ; npm install -g bun ;

- name: Build
run: npm run build

- name: Run tests
run: npm test
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
node_modules
dist/
save.json
30 changes: 26 additions & 4 deletions bin/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import { Writer } from "n3";
const program = new Command();
let paramURL: string = "";
let paramFollow: boolean = false;
let after: Date | undefined;
let before: Date | undefined;
let paramPollInterval: number;
let urlIsView = false;
let noShape = false;
let shapeFile: string | undefined;
let shapeFiles: string[] | undefined;
let ordered: Ordered = "none";
let quiet: boolean = false;
let verbose: boolean = false;
Expand All @@ -27,8 +29,10 @@ program
.default("none"),
)
.option("-f, --follow", "follow the LDES, the client stays in sync")
.option("--after <after>", "follow only relations including members after a certain point in time")
.option("--before <before>", "follow only relations including members before a certain point in time")
.option("--poll-interval <number>", "specify poll interval")
.option("--shape-file <shapefile>", "specify a shapefile")
.option("--shape-files [shapeFiles...]", "specify a shapefile")
.option(
"--no-shape",
"don't extract members with a shape (only use cbd and named graphs)",
Expand Down Expand Up @@ -56,14 +60,30 @@ program
noShape = !program.shape;
save = program.save;
paramURL = url;
shapeFile = program.shapeFile;
shapeFiles = program.shapeFiles;
paramFollow = program.follow;
paramPollInterval = program.pollInterval;
ordered = program.ordered;
quiet = program.quiet;
verbose = program.verbose;
loose = program.loose;
onlyDefaultGraph = program.onlyDefaultGraph;
if (program.after) {
if (!isNaN(new Date(program.after).getTime())) {
after = new Date(program.after);
} else {
console.error(`--after ${program.after} is not a valid date`);
process.exit();
}
}
if (program.before) {
if (!isNaN(new Date(program.before).getTime())) {
before = new Date(program.before);
} else {
console.error(`--before ${program.before} is not a valid date`);
process.exit();
}
}
});

program.parse(process.argv);
Expand All @@ -80,8 +100,10 @@ async function main() {
pollInterval: paramPollInterval,
fetcher: { maxFetched: 2, concurrentRequests: 10 },
urlIsView: urlIsView,
shapeFile,
shapeFiles,
onlyDefaultGraph,
after,
before
}),
undefined,
undefined,
Expand Down
Binary file added bun.lockb
Binary file not shown.
141 changes: 93 additions & 48 deletions lib/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@ import rdfDereference, { RdfDereferencer } from "rdf-dereference";
import { FileStateFactory, NoStateFactory, State, StateFactory } from "./state";
import { CBDShapeExtractor } from "extract-cbd-shape";
import { RdfStore } from "rdf-stores";
import { DataFactory } from "rdf-data-factory";
import { Writer as NWriter } from "n3";
import { DataFactory, Writer as NWriter } from "n3";
import { Quad_Object, Term } from "@rdfjs/types";
import { getObjects, ModulatorFactory, Notifier, streamToArray } from "./utils";
import { LDES, SDS, TREE } from "@treecg/types";
import { extractMainNodeShape, getObjects, ModulatorFactory, Notifier, streamToArray } from "./utils";
import { LDES, SDS, SHACL, TREE } from "@treecg/types";
import { FetchedPage, Fetcher, longPromise, resetPromise } from "./pageFetcher";
import { Manager } from "./memberManager";
import { OrderedStrategy, StrategyEvents, UnorderedStrategy } from "./strategy";
Expand All @@ -19,9 +18,8 @@ export { intoConfig } from "./config";
export type { Member, Page, Relation } from "./page";
export type { Config, MediatorConfig, ShapeConfig } from "./config";

const df = new DataFactory();
const log = debug("client");
const { namedNode, blankNode, quad } = df;
const { namedNode, blankNode, quad } = DataFactory;

type Controller = ReadableStreamDefaultController<Member>;

Expand All @@ -41,7 +39,7 @@ export function replicateLDES(
}

export type LDESInfo = {
shape?: Term;
shapeMap?: Map<string, Term>;
extractor: CBDShapeExtractor;
timestampPath?: Term;
isVersionOfPath?: Term;
Expand All @@ -55,19 +53,39 @@ async function getInfo(
): Promise<LDESInfo> {
const logger = log.extend("getShape");

if (config.shapeFile) {
const shapeId = config.shapeFile.startsWith("http")
? config.shapeFile
: "file://" + config.shapeFile;
const shapeConfigStore = RdfStore.createDefault();
if (config.shapeFiles && config.shapeFiles.length > 0) {
config.shapes = [];

const resp = await rdfDereference.dereference(config.shapeFile, {
localFiles: true,
});
const quads = await streamToArray(resp.data);
config.shape = {
quads: quads,
shapeId: namedNode(shapeId),
};
for (const shapeFile of config.shapeFiles) {
const tempShapeStore = RdfStore.createDefault();
const shapeId = shapeFile.startsWith("http")
? shapeFile
: "file://" + shapeFile;

const resp = await rdfDereference.dereference(shapeFile, {
localFiles: true,
});
const quads = await streamToArray(resp.data);
// Add retrieved quads to local stores
quads.forEach(q => {
tempShapeStore.addQuad(q);
shapeConfigStore.addQuad(q);
});

if (shapeId.startsWith("file://")) {
// We have to find the actual IRI/Blank Node of the main shape within the file
config.shapes.push({
quads,
shapeId: extractMainNodeShape(tempShapeStore)
});
} else {
config.shapes.push({
quads: quads,
shapeId: namedNode(shapeId),
});
}
}
}

let shapeIds = config.noShape
Expand Down Expand Up @@ -107,11 +125,7 @@ async function getInfo(
timestampPaths.length,
isVersionOfPaths.length,
);
} catch (ex: any) {}
}

if (shapeIds.length > 1) {
console.error("Expected at most one shape id, found " + shapeIds.length);
} catch (ex: any) { }
}

if (timestampPaths.length > 1) {
Expand All @@ -126,22 +140,38 @@ async function getInfo(
);
}

let shapeConfigStore = RdfStore.createDefault();
if (config.shape) {
for (let quad of config.shape.quads) {
shapeConfigStore.addQuad(quad);
// Create a map of shapes and member types
const shapeMap = new Map<string, Term>();

if (config.shapes) {
for (const shape of config.shapes) {
const memberType = getObjects(shapeConfigStore, shape.shapeId, SHACL.terms.targetClass)[0];
if (memberType) {
shapeMap.set(memberType.value, shape.shapeId);
} else {
console.error("Ignoring SHACL shape without a declared sh:targetClass: ", shape.shapeId);
}
}
} else {
for (const shapeId of shapeIds) {
const memberType = getObjects(store, shapeId, SHACL.terms.targetClass)[0];
if (memberType) {
shapeMap.set(memberType.value, shapeId);
} else {
console.error("Ignoring SHACL shape without a declared sh:targetClass: ", shapeId);
}
}
}

return {
extractor: new CBDShapeExtractor(
config.shape ? shapeConfigStore : store,
config.shapes && config.shapes.length > 0 ? shapeConfigStore : store,
dereferencer,
{
cbdDefaultGraph: config.onlyDefaultGraph,
},
),
shape: config.shape ? config.shape.shapeId : shapeIds[0],
shapeMap: config.noShape ? undefined : shapeMap,
timestampPath: timestampPaths[0],
isVersionOfPath: isVersionOfPaths[0],
};
Expand Down Expand Up @@ -282,11 +312,22 @@ export class Client {
throw "Can only emit members in order, if LDES is configured with timestampPath";
}

this.fetcher = new Fetcher(this.dereferencer, this.config.loose);
this.fetcher = new Fetcher(this.dereferencer, this.config.loose, this.config.after, this.config.before);

const notifier: Notifier<StrategyEvents, {}> = {
fragment: () => this.emit("fragment", undefined),
member: (m) => {
// Check if member is within date constraints (if any)
if (this.config.before) {
if (m.timestamp && m.timestamp instanceof Date && m.timestamp > this.config.before) {
return;
}
}
if (this.config.after) {
if (m.timestamp && m.timestamp instanceof Date && m.timestamp < this.config.after) {
return;
}
}
emit(m);
},
pollCycle: () => {
Expand All @@ -302,22 +343,22 @@ export class Client {
this.strategy =
this.ordered !== "none"
? new OrderedStrategy(
this.memberManager,
this.fetcher,
notifier,
factory,
this.ordered,
this.config.polling,
this.config.pollInterval,
)
this.memberManager,
this.fetcher,
notifier,
factory,
this.ordered,
this.config.polling,
this.config.pollInterval,
)
: new UnorderedStrategy(
this.memberManager,
this.fetcher,
notifier,
factory,
this.config.polling,
this.config.pollInterval,
);
this.memberManager,
this.fetcher,
notifier,
factory,
this.config.polling,
this.config.pollInterval,
);

logger("Found %d views, choosing %s", viewQuads.length, ldesId.value);
this.strategy.start(ldesId.value);
Expand Down Expand Up @@ -375,10 +416,12 @@ async function fetchPage(
export async function processor(
writer: Writer<string>,
url: string,
before?: Date,
after?: Date,
ordered?: string,
follow?: boolean,
pollInterval?: number,
shape?: string,
shapes?: string[],
noShape?: boolean,
save?: string,
loose?: boolean,
Expand All @@ -389,9 +432,11 @@ export async function processor(
intoConfig({
loose,
noShape,
shapeFile: shape,
shapeFiles: shapes,
polling: follow,
url: url,
after,
before,
stateFile: save,
follow,
pollInterval: pollInterval,
Expand Down
4 changes: 2 additions & 2 deletions lib/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ export interface Config {
fetcher: FetcherConfig;
before?: Date;
after?: Date;
shape?: ShapeConfig;
shapeFile?: string;
shapes?: ShapeConfig[];
shapeFiles?: string[];
onlyDefaultGraph?: boolean;

// Add flag to indicate in order (default true)
Expand Down
Loading
Loading