Skip to content

Commit

Permalink
Merge pull request #3260 from ingef/feature/dont-create-subquery-if-n…
Browse files Browse the repository at this point in the history
…ot-included

Draft: hoping to avoid keeping SecondaryIdQuery sub-plans that are not needed
  • Loading branch information
awildturtok authored Feb 6, 2024
2 parents 2ffde21 + 650782c commit 70dc817
Show file tree
Hide file tree
Showing 65 changed files with 254 additions and 214 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.bakdata.conquery.models.query.QueryResolveContext;
import com.bakdata.conquery.models.query.RequiredEntities;
import com.bakdata.conquery.models.query.Visitable;
import com.bakdata.conquery.models.query.queryplan.ConceptQueryPlan;
import com.bakdata.conquery.models.query.queryplan.SecondaryIdQueryPlan;
import com.bakdata.conquery.models.query.resultinfo.ResultInfo;
import com.bakdata.conquery.models.query.resultinfo.SimpleResultInfo;
Expand Down Expand Up @@ -66,8 +67,9 @@ public class SecondaryIdQuery extends Query {

@Override
public SecondaryIdQueryPlan createQueryPlan(QueryPlanContext context) {
final ConceptQueryPlan queryPlan = query.createQueryPlan(context.withSelectedSecondaryId(secondaryId));

return new SecondaryIdQueryPlan(query, context, secondaryId, withSecondaryId, withoutSecondaryId, query.createQueryPlan(context.withSelectedSecondaryId(secondaryId)));
return new SecondaryIdQueryPlan(query, context, secondaryId, withSecondaryId, withoutSecondaryId, queryPlan, context.getSecondaryIdSubPlanRetention());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ protected void run(Environment environment, Namespace namespace, ConqueryConfig
getConfig().getQueries().getExecutionPool(),
() -> createInternalObjectMapper(View.Persistence.Shard.class),
() -> createInternalObjectMapper(View.InternalCommunication.class),
getConfig().getCluster().getEntityBucketSize()
getConfig().getCluster().getEntityBucketSize(),
getConfig().getQueries().getSecondaryIdSubPlanRetention()
);

final Collection<WorkerStorage> workerStorages = config.getStorage().discoverWorkerStorages();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,12 @@ public class QueryConfig {
private ThreadPoolDefinition executionPool = new ThreadPoolDefinition();

private Duration oldQueriesTime = Duration.days(30);

/**
* Limits how many sub-query plans are cached between executions.
* The limit applies per core, so that outliers do not cause massive memory overhead.
*
* TODO Implement a global limit of active secondaryId sub-plans.
*/
private int secondaryIdSubPlanRetention = 15;
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void react(Worker worker) throws Exception {

// Before we start the query, we create its query plan once to test whether creation succeeds; afterwards the plan is created multiple times, once per core, for evaluation.
try {
query.createQueryPlan(new QueryPlanContext(worker));
query.createQueryPlan(new QueryPlanContext(worker, queryExecutor.getSecondaryIdSubPlanLimit()));
}
catch (Exception e) {
ConqueryError err = asConqueryError(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void react(Worker worker) throws Exception {

// Before we start the query, we create its query plan once to test whether creation succeeds; afterwards the plan is created multiple times, once per core, for evaluation.
try {
query.createQueryPlan(new QueryPlanContext(worker));
query.createQueryPlan(new QueryPlanContext(worker, queryExecutor.getSecondaryIdSubPlanLimit()));
}
catch (Exception e) {
ConqueryError err = asConqueryError(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,19 @@
import com.bakdata.conquery.models.query.results.ShardResult;
import com.bakdata.conquery.models.worker.Worker;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.RequiredArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@RequiredArgsConstructor
@Data
public class QueryExecutor implements Closeable {

private final Worker worker;

private final ThreadPoolExecutor executor;

private final int secondaryIdSubPlanLimit;

private final Set<ManagedExecutionId> cancelledQueries = new HashSet<>();

public void unsetQueryCancelled(ManagedExecutionId query) {
Expand All @@ -52,7 +54,7 @@ public boolean isCancelled(ManagedExecutionId query) {

public boolean execute(Query query, QueryExecutionContext executionContext, ShardResult result, Set<Entity> entities) {

final ThreadLocal<QueryPlan<?>> plan = ThreadLocal.withInitial(() -> query.createQueryPlan(new QueryPlanContext(worker)));
final ThreadLocal<QueryPlan<?>> plan = ThreadLocal.withInitial(() -> query.createQueryPlan(new QueryPlanContext(worker, secondaryIdSubPlanLimit)));

if (entities.isEmpty()) {
log.warn("Entities for query are empty");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,22 @@
import com.bakdata.conquery.models.worker.Worker;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.With;

@RequiredArgsConstructor @AllArgsConstructor @Getter @With
@Data @With
@AllArgsConstructor @RequiredArgsConstructor
public class QueryPlanContext {

@Getter(AccessLevel.NONE)
private final Worker worker;
private final int secondaryIdSubPlanRetention;

private CDateRange dateRestriction = CDateRange.all();


/**
* If inside a {@link com.bakdata.conquery.models.query.queryplan.SecondaryIdQueryPlan}, this is set to the query-active {@link SecondaryIdDescriptionId}.
*/
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.bakdata.conquery.models.common.IRange;
import com.bakdata.conquery.models.query.queryplan.aggregators.Aggregator;
import com.bakdata.conquery.models.query.queryplan.filter.AggregationResultFilterNode;
import lombok.ToString;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ public void init(QueryExecutionContext ctx, Entity entity) {
child.init(entity, ctx);
}

public void nextEvent(Bucket bucket, int event) {
getChild().acceptEvent(bucket, event);
public boolean nextEvent(Bucket bucket, int event) {
return getChild().acceptEvent(bucket, event);
}

protected SinglelineEntityResult createResult() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public void init(Entity entity, QueryExecutionContext context) {
}

@Override
public void acceptEvent(Bucket bucket, int event) {
public void consumeEvent(Bucket bucket, int event) {
throw new UnsupportedOperationException("This Aggregator uses the result of its siblings and does not accept events");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public abstract class EventIterating {
public void collectRequiredTables(Set<Table> requiredTables) {
}

public Set<Table> collectRequiredTables() {
public final Set<Table> collectRequiredTables() {
Set<Table> out = new HashSet<>();
collectRequiredTables(out);
return out;
Expand All @@ -42,7 +42,12 @@ public void nextTable(QueryExecutionContext ctx, Table currentTable) {
public void nextBlock(Bucket bucket) {
}

public abstract void acceptEvent(Bucket bucket, int event);
/**
* Consumes the given event of the bucket.
*
* @return whether the event was consumed; implementations may return {@code false} if it was not.
* @implSpec The return value can be used to discard intermediate results. Currently, this is only relevant for {@link SecondaryIdQueryPlan}, where sub-plans are discarded if no event was consumed.
*/
public abstract boolean acceptEvent(Bucket bucket, int event);


public boolean isOfInterest(Bucket bucket) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public void nextTable(QueryExecutionContext ctx, Table currentTable) {
}

@Override
public abstract void acceptEvent(Bucket bucket, int event);
public abstract boolean acceptEvent(Bucket bucket, int event);

public abstract boolean isContained();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,13 @@ public boolean isOfInterest(Entity entity) {
}

@Override
public void acceptEvent(Bucket bucket, int event) {
public boolean acceptEvent(Bucket bucket, int event) {
boolean consumed = false;
for (QPNode currentTableChild : currentTableChildren) {
currentTableChild.acceptEvent(bucket, event);
consumed |= currentTableChild.acceptEvent(bucket, event);
}

return consumed;
}

@Override
Expand Down
Loading

0 comments on commit 70dc817

Please sign in to comment.