Skip to content

Commit

Permalink
actually use concurrent requests
Browse files Browse the repository at this point in the history
catch member errors

bump version
  • Loading branch information
ajuvercr committed Apr 18, 2024
1 parent 2ee33a6 commit a66a69f
Show file tree
Hide file tree
Showing 6 changed files with 1,250 additions and 650 deletions.
9 changes: 9 additions & 0 deletions bin/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ let verbose: boolean = false;
let save: string | undefined;
let onlyDefaultGraph: boolean = false;
let loose: boolean = false;
let concurrent: number = 5;
let basicAuth: string | undefined;

program
Expand Down Expand Up @@ -63,6 +64,11 @@ program
.option("-q --quiet", "be quiet")
.option("-v --verbose", "be verbose")
.option("--basic-auth <username>:<password>", "HTTP basic auth information")
.option(
"--concurrent <requests>",
"Allowed amount of concurrent HTTP request to the same domain",
"5",
)
.action((url: string, program) => {
urlIsView = program.urlIsView;
noShape = !program.shape;
Expand All @@ -77,6 +83,8 @@ program
loose = program.loose;
onlyDefaultGraph = program.onlyDefaultGraph;
basicAuth = program.basicAuth;
concurrent = parseInt(program.concurrent);

if (program.after) {
if (!isNaN(new Date(program.after).getTime())) {
after = new Date(program.after);
Expand Down Expand Up @@ -114,6 +122,7 @@ async function main() {
after,
before,
basicAuth,
concurrent,
}),
undefined,
undefined,
Expand Down
3 changes: 2 additions & 1 deletion lib/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ export interface Config {
onlyDefaultGraph?: boolean;
fetch?: typeof fetch;
basicAuth?: string;
concurrent?: number;
// Add flag to indicate in order (default true)
// Make sure that slower pages to first emit the first members
//
Expand Down Expand Up @@ -80,7 +81,7 @@ export function intoConfig(config: Partial<Config>): Config {

config.fetch = limit_fetch_per_domain(
retry_fetch(fetch_f, [408, 425, 429, 500, 502, 503, 504, 404]),
1,
config.concurrent || 5,
);
}

Expand Down
38 changes: 29 additions & 9 deletions lib/memberManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,16 @@ export type ExtractedMember = {
member: Member;
};

export type ExtractError = {
type: "extract";
memberId: Term;
error: any;
};
export type Error = ExtractError;
export type MemberEvents = {
extracted: Member;
done: Member[];
error: Error;
};

export class Manager {
Expand Down Expand Up @@ -66,11 +73,18 @@ export class Manager {
return this.state.size;
}

private async extractMemberQuads(
member: Term,
data: RdfStore,
): Promise<Quad[]> {
return await this.extractor.extract(data, member, this.shapeId);
}

private async extractMember(
member: Term,
data: RdfStore,
): Promise<Member | undefined> {
const quads = await this.extractor.extract(data, member, this.shapeId);
const quads: Quad[] = await this.extractMemberQuads(member, data);

if (this.state.has(member.value)) {
return;
Expand Down Expand Up @@ -104,7 +118,6 @@ export class Manager {
)?.object.value;
}

// HEAD
return { id: member, quads, timestamp, isVersionOf };
}
}
Expand All @@ -120,16 +133,23 @@ export class Manager {

logger("%d members", members.length);

const promises: Promise<Member | undefined>[] = [];
const promises: Promise<Member | undefined | void>[] = [];

for (let member of members) {
if (!this.state.has(member.value)) {
const promise = this.extractMember(member, page.data).then((member) => {
if (member) {
notifier.extracted(member, state);
}
return member;
});
const promise = this.extractMember(member, page.data)
.then((member) => {
if (member) {
notifier.extracted(member, state);
}
return member;
})
.catch((ex) => {
notifier.error(
{ error: ex, type: "extract", memberId: member },
state,
);
});

promises.push(promise);
}
Expand Down
3 changes: 3 additions & 0 deletions lib/strategy/ordered.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ export class OrderedStrategy {
// - done: extracting is done, indicate this
// - extract: a member is extracted, add it to our heap
this.memberNotifer = {
error: (error) => {
this.notifier.error(error, {});
},
done: (_member, rel) => {
logger("Member done %s", rel.target);
const found = this.findOrDefault(rel);
Expand Down
23 changes: 19 additions & 4 deletions lib/strategy/unordered.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ import { Modulator, ModulatorFactory, Notifier } from "../utils";

import { StrategyEvents } from ".";

import debug from "debug";

const log = debug("strategy");

export class UnorderedStrategy {
private manager: Manager;
private fetcher: Fetcher;
Expand Down Expand Up @@ -36,6 +40,7 @@ export class UnorderedStrategy {
this.fetcher = fetcher;
this.polling = polling;

const fetchLogger = log.extend("fetch");
// Callbacks for the fetcher
// - seen: the strategy wanted to fetch an uri, but it was already seen
// so one fetch request is terminated, inFlight -= 1
Expand All @@ -44,25 +49,34 @@ export class UnorderedStrategy {
// - relationFound: a relation has been found, inFlight += 1 and put it in the queue
this.fetchNotifier = {
error: (error: any) => {
fetchLogger("error %o", error);
this.notifier.error(error, {});
},
scheduleFetch: (node: Node) => {
this.cacheList.push(node);
this.notifier.mutable({}, {});
},
pageFetched: (page, { index }) => this.handleFetched(page, index),
pageFetched: (page, { index }) => {
fetchLogger("Paged fetched %s", page.url);
this.handleFetched(page, index);
},
relationFound: ({ from, target }) => {
from.expected.push(target.node);
this.inFlight += 1;
this.modulator.push({ target: target.node, expected: [from.target] });
},
};

const memberLogger = log.extend("member");
// Callbacks for the member extractor
// - done: all members have been extracted, we are finally done with a page inFlight -= 1
// - extracted: a member has been found, yeet it
this.memberNotifier = {
error: error => {
this.notifier.error(error, {});
},
done: () => {
memberLogger("Members on page done");
this.inFlight -= 1;
this.checkEnd();
this.notifier.fragment({}, {});
Expand All @@ -77,13 +91,14 @@ export class UnorderedStrategy {
}

start(url: string) {
const logger = log.extend("start");
this.inFlight = this.modulator.length();
if (this.inFlight < 1) {
this.inFlight = 1;
this.modulator.push({ target: url, expected: [] });
console.log("Nothing in flight, adding start url");
logger("Nothing in flight, adding start url");
} else {
console.log("Things are already inflight, not adding start url");
logger("Things are already inflight, not adding start url");
}
}

Expand Down Expand Up @@ -112,7 +127,7 @@ export class UnorderedStrategy {
}
}, this.pollInterval || 1000);
} else {
console.log("Closing notifier");
log("Closing the notifier, polling is not set");
this.cancled = true;
this.notifier.close({}, {});
}
Expand Down
Loading

0 comments on commit a66a69f

Please sign in to comment.