Skip to content

Commit

Permalink
Use bindings hash bus, reimplement timeout feature
Browse files Browse the repository at this point in the history
  • Loading branch information
surilindur committed Oct 23, 2023
1 parent 0b0e8c9 commit bb270fd
Show file tree
Hide file tree
Showing 10 changed files with 114 additions and 155 deletions.

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
"https://linkedsoftwaredependencies.org/bundles/npm/@comunica/config-query-sparql-components/^0.0.0/components/context.jsonld"
],
"import": [
"ccqsc:config/config-default-swap.json"
"ccqsc:config/config-base-adaptive.json",
"ccqsc:config/rdf-metadata-extract/actors.json",
"ccqsc:config/rdf-metadata-accumulate/actors-predicate-count.json",
"ccqsc:config/rdf-join/actors.json"
]
}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
},
"overrideParameters": {
"@type": "ActorRdfJoinInnerMultiAdaptiveHeuristics",
"mediatorHashBindings": {
"@id": "urn:comunica:default:hash-bindings/mediators#main"
},
"mediatorJoinEntriesSort": {
"@id": "urn:comunica:default:rdf-join-entries-sort/mediators#main"
},
Expand All @@ -20,8 +23,6 @@
"mediatorJoin": {
"@id": "urn:comunica:default:rdf-join/mediators#main"
},
"swap": true,
"swapOnce": true,
"beforeActors": [
{
"@id": "urn:comunica:default:rdf-join/actors#inner-multi-bind"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ After installing, this package can be added to your engine's configuration as fo
{
"@id": "urn:comunica:default:rdf-join/actors#inner-multi-adaptive-heuristics",
"@type": "ActorRdfJoinInnerMultiAdaptiveHeuristics",
"mediatorHashBindings": { "@id": "urn:comunica:default:hash-bindings/mediators#main" },
"mediatorJoinEntriesSort": { "@id": "urn:comunica:default:rdf-join-entries-sort/mediators#main" },
"mediatorJoinSelectivity": { "@id": "urn:comunica:default:rdf-join-selectivity/mediators#main" },
"mediatorJoin": { "@id": "urn:comunica:default:rdf-join/mediators#main" }
Expand All @@ -39,9 +40,13 @@ After installing, this package can be added to your engine's configuration as fo

### Config Parameters

* `mediatorHashBindings`: A mediator over the [Hash Bindings bus](https://github.com/comunica/comunica/tree/master/packages/bus-hash-bindings).
* `mediatorJoinEntriesSort`: A mediator over the [RDF Join Entries Sort bus](https://github.com/comunica/comunica/tree/master/packages/bus-rdf-join-entries-sort).
* `mediatorJoinSelectivity`: A mediator over the [RDF Join Selectivity bus](https://github.com/comunica/comunica/tree/master/packages/bus-rdf-join-selectivity).
* `mediatorJoin`: A mediator over the [RDF Join bus](https://github.com/comunica/comunica/tree/master/packages/bus-rdf-join).
* `cardinalityThreshold`: Absolute value threshold for metadata cardinality value change before restarting join.
* `cardinalityThresholdMultiplier`: Multiplier/divisor threshold for metadata cardinality value change before restarting join.
* `allowOnlyOnce`: Whether the join should only be restarted once, and not an unlimited number of times.
* `swapOnlyOnce`: Whether the join should only be restarted once, and not an unlimited number of times.
* `swapOnCardinalityChange`: Whether the join should be restarted on cardinality changes.
* `swapOnTimeout`: Whether the join should be restarted on a timeout.
* `timeout`: The timeout value, in milliseconds, after which the join is restarted.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { ActorRdfJoinInnerMultiAdaptiveDestroy } from '@comunica/actor-rdf-join-inner-multi-adaptive-destroy';
import type { IActorRdfJoinInnerMultiAdaptiveDestroyArgs } from '@comunica/actor-rdf-join-inner-multi-adaptive-destroy';
import type { MediatorHashBindings } from '@comunica/bus-hash-bindings';
import type { IActionRdfJoin, IActorRdfJoinOutputInner } from '@comunica/bus-rdf-join';
import type { MediatorRdfJoinEntriesSort } from '@comunica/bus-rdf-join-entries-sort';
import { KeysRdfJoin } from '@comunica/context-entries-link-traversal';
Expand All @@ -12,19 +13,25 @@ import { BindingsStreamAdaptiveHeuristics } from './BindingsStreamAdaptiveHeuris
export class ActorRdfJoinInnerMultiAdaptiveHeuristics extends ActorRdfJoinInnerMultiAdaptiveDestroy {
protected readonly cardinalityThreshold: number;
protected readonly cardinalityThresholdMultiplier: number;
protected readonly swapOnce: boolean;
protected readonly swapOnlyOnce: boolean;
protected readonly swapOnTimeout: boolean;
protected readonly swapOnCardinalityChange: boolean;

protected readonly mediatorHashBindings: MediatorHashBindings;
protected readonly mediatorJoinEntriesSort: MediatorRdfJoinEntriesSort;

protected disableJoinRestart: boolean;

public constructor(args: IActorRdfJoinInnerMultiAdaptiveHeuristicsArgs) {
super(args);
this.swapOnce = args.swapOnce;
this.swapOnlyOnce = args.swapOnlyOnce;
this.swapOnTimeout = args.swapOnTimeout;
this.swapOnCardinalityChange = args.swapOnCardinalityChange;
this.cardinalityThreshold = args.cardinalityThreshold;
this.cardinalityThresholdMultiplier = args.cardinalityThresholdMultiplier;
this.mediatorHashBindings = args.mediatorHashBindings;
this.mediatorJoinEntriesSort = args.mediatorJoinEntriesSort;
this.disableJoinRestart = !args.swap;
this.disableJoinRestart = !args.swapOnTimeout && !args.swapOnCardinalityChange;
}

protected async getOutput(action: IActionRdfJoin): Promise<IActorRdfJoinOutputInner> {
Expand All @@ -49,59 +56,64 @@ export class ActorRdfJoinInnerMultiAdaptiveHeuristics extends ActorRdfJoinInnerM
})).entries;
};

const mediatorHashBindingsResult = await this.mediatorHashBindings.mediate({
context: action.context,
// Ideally hash collisions would not be allowed at all, but no hash bindings actor that avoids them is available
allowHashCollisions: true,
});

const currentJoinOrder: IJoinEntryWithMetadata[] = await getUpdatedJoinOrder();

const entriesWithInvalidationListener = action.entries.map(entry => {
const addMetadataInvalidationListener = (metadata: MetadataBindings): void => {
if (!this.disableJoinRestart) {
const handleInvalidationEvent = async(): Promise<void> => {
if (!this.disableJoinRestart) {
const updatedMetadata: MetadataBindings = await entry.output.metadata();
addMetadataInvalidationListener(updatedMetadata);
if (bindingsStreamAdaptive && this.cardinalityChangeMeetsThreshold(metadata, updatedMetadata)) {
if (this.swapOnce && !this.disableJoinRestart) {
this.disableJoinRestart = true;
}
const updatedJoinOrder = await getUpdatedJoinOrder();
if (updatedJoinOrder.some((ent, index) => currentJoinOrder[index].operation !== ent.operation)) {
const success = bindingsStreamAdaptive.swapSource();
if (success) {
// eslint-disable-next-line no-console
console.log(`Swapped source order due to cardinality estimate change ${metadata.cardinality.value} -> ${updatedMetadata.cardinality.value}`);
const entries = this.swapOnCardinalityChange ?
action.entries.map(entry => {
const addMetadataInvalidationListener = (metadata: MetadataBindings): void => {
if (!this.disableJoinRestart) {
const handleInvalidationEvent = async(): Promise<void> => {
if (!this.disableJoinRestart) {
const updatedMetadata: MetadataBindings = await entry.output.metadata();
addMetadataInvalidationListener(updatedMetadata);
if (bindingsStreamAdaptive && this.cardinalityChangeMeetsThreshold(metadata, updatedMetadata)) {
if (this.swapOnlyOnce && !this.disableJoinRestart) {
this.disableJoinRestart = true;
}
const updatedJoinOrder = await getUpdatedJoinOrder();
if (updatedJoinOrder.some((ent, index) => currentJoinOrder[index].operation !== ent.operation)) {
bindingsStreamAdaptive.swapSource();
}
}
}
}
};
metadata.state.addInvalidateListener(() => setImmediate(handleInvalidationEvent));
}
};
entry.output.metadata().then(addMetadataInvalidationListener).catch(error => {
throw new Error(error);
});
return entry;
});
};
metadata.state.addInvalidateListener(() => setImmediate(handleInvalidationEvent));
}
};
entry.output.metadata().then(addMetadataInvalidationListener).catch(error => {
throw new Error(error);
});
return entry;
}) :
action.entries;

// Execute the join with the metadata we have now
const firstOutput = await this.mediatorJoin.mediate({
type: action.type,
entries: this.cloneEntries(entriesWithInvalidationListener, false),
entries: this.cloneEntries(entries, false),
context: subContext,
});

const createSource = async(): Promise<BindingsStream> => {
const joinResult = await this.mediatorJoin.mediate({
type: action.type,
entries: this.cloneEntries(entriesWithInvalidationListener, false),
entries: this.cloneEntries(entries, false),
context: subContext,
});
return joinResult.bindingsStream;
};

bindingsStreamAdaptive = new BindingsStreamAdaptiveHeuristics(
firstOutput.bindingsStream,
{ timeout: this.timeout, autoStart: false },
{ timeout: this.swapOnTimeout ? this.timeout : undefined, autoStart: false },
createSource,
mediatorHashBindingsResult.hashFunction,
);

return {
Expand All @@ -127,6 +139,10 @@ export class ActorRdfJoinInnerMultiAdaptiveHeuristics extends ActorRdfJoinInnerM
}

export interface IActorRdfJoinInnerMultiAdaptiveHeuristicsArgs extends IActorRdfJoinInnerMultiAdaptiveDestroyArgs {
/**
* A mediator over the Hash Bindings bus
*/
mediatorHashBindings: MediatorHashBindings;
/**
* A mediator over the RDF Join Entries Sort bus
*/
Expand All @@ -143,15 +159,25 @@ export interface IActorRdfJoinInnerMultiAdaptiveHeuristicsArgs extends IActorRdf
* @default {10}
*/
cardinalityThresholdMultiplier: number;
/**
* Whether the actor should swap join order or not. When set to false, the order will never be changed.
* @default {true}
*/
swap: boolean;
/**
* Whether the actor should swap join order only once. When set to false, the actor will change the join
* order every time the change makes sense after metadata update, an unlimited number of times.
* @default {false}
*/
swapOnce: boolean;
swapOnlyOnce: boolean;
/**
* Whether to swap after the timeout is reached.
* @default {false}
*/
swapOnTimeout: boolean;
/**
* Whether to swap when the cardinality change is significant enough.
* @default {true}
*/
swapOnCardinalityChange: boolean;
/**
* The timeout, in milliseconds, to use for swapping join order.
* @default {1000}
*/
timeout: number;
}
Loading

0 comments on commit bb270fd

Please sign in to comment.