Skip to content

Commit

Permalink
Destroy cloned join streams, disable by default
Browse files Browse the repository at this point in the history
  • Loading branch information
surilindur committed Oct 31, 2023
1 parent 07fb33d commit e7df471
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
},
"overrideParameters": {
"@type": "ActorRdfJoinInnerMultiAdaptiveHeuristics",
"useTimeout": false,
"useCardinality": true,
"allowUnlimitedRestarts": true,
"mediatorHashBindings": {
"@id": "urn:comunica:default:hash-bindings/mediators#main"
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { ActorRdfJoinInnerMultiAdaptiveDestroy } from '@comunica/actor-rdf-join-inner-multi-adaptive-destroy';
import type { IActorRdfJoinInnerMultiAdaptiveDestroyArgs } from '@comunica/actor-rdf-join-inner-multi-adaptive-destroy';
import type { MediatorHashBindings } from '@comunica/bus-hash-bindings';
import { ClosableTransformIterator } from '@comunica/bus-query-operation';
import type { IActionRdfJoin, IActorRdfJoinOutputInner } from '@comunica/bus-rdf-join';
import type { MediatorRdfJoinEntriesSort } from '@comunica/bus-rdf-join-entries-sort';
import { KeysRdfJoin } from '@comunica/context-entries-link-traversal';
Expand Down Expand Up @@ -69,7 +70,7 @@ export class ActorRdfJoinInnerMultiAdaptiveHeuristics extends ActorRdfJoinInnerM
addMetadataInvalidationListener(updatedMetadata);
if (bindingsStreamAdaptive && metadata.cardinality.value !== updatedMetadata.cardinality.value) {
const updatedJoinOrder = await this.getSortedJoinEntries(action);
if (updatedJoinOrder.some((ent, index) => currentJoinOrder[index].operation !== ent.operation)) {
if (updatedJoinOrder.some((je, index) => currentJoinOrder[index].operation !== je.operation)) {
currentJoinOrder = updatedJoinOrder;
bindingsStreamAdaptive.swapSource();
}
Expand Down Expand Up @@ -145,13 +146,21 @@ export class ActorRdfJoinInnerMultiAdaptiveHeuristics extends ActorRdfJoinInnerM
}

protected cloneEntries(entries: IJoinEntry[]): IJoinEntry[] {
return entries.map(entry => ({
operation: entry.operation,
output: {
...entry.output,
bindingsStream: entry.output.bindingsStream.clone(),
},
}));
return entries.map(entry => {
const clonedBindingsStream = entry.output.bindingsStream.clone();
return {
operation: entry.operation,
output: {
...entry.output,
bindingsStream: new ClosableTransformIterator(clonedBindingsStream, {
autoStart: false,
onClose() {
clonedBindingsStream.destroy();
},
}),
},
};
});
}
}

Expand All @@ -166,7 +175,7 @@ export interface IActorRdfJoinInnerMultiAdaptiveHeuristicsArgs extends IActorRdf
mediatorJoinEntriesSort: MediatorRdfJoinEntriesSort;
/**
* Whether the join should be restarted after metadata cardinality value changes.
* @default {true}
* @default {false}
*/
useCardinality: boolean;
/**
Expand All @@ -176,7 +185,7 @@ export interface IActorRdfJoinInnerMultiAdaptiveHeuristicsArgs extends IActorRdf
useTimeout: boolean;
/**
* Whether the actor should swap join order an unlimited number of times.
* @default {true}
* @default {false}
*/
allowUnlimitedRestarts: boolean;
/**
Expand Down

0 comments on commit e7df471

Please sign in to comment.