Skip to content

Commit

Permalink
Add cluster level reduction (#117731)
Browse files Browse the repository at this point in the history
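This change introduces cluster-level reduction: the coordinator on each remote
cluster reduces the pages it collects from its data nodes before sending them
to the querying cluster. Unlike data-node-level reduction, it is always enabled
rather than gated behind a pragma, because network latency and throughput across
clusters differ significantly from those within a cluster. As a result, the
benefits of this reduction should outweigh the risks.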
  • Loading branch information
dnhatn authored Dec 3, 2024
1 parent 12be820 commit af7d3f9
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 67 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/117731.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 117731
summary: Add cluster level reduction
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -238,4 +238,41 @@ public void testSameRemoteClusters() throws Exception {
}
}
}

/**
 * Runs a cross-cluster {@code STATS} query and, while it is still in flight, inspects the tasks on the
 * remote cluster. Expects exactly one task for {@link ComputeService#CLUSTER_ACTION_NAME} and exactly one
 * driver task underneath it whose description is an exchange source feeding an intermediate-mode
 * aggregation feeding an exchange sink.
 * <p>
 * NOTE(review): the latch names suggest {@code PauseFieldPlugin} holds the query until {@code allowEmitting}
 * is counted down; confirm against the plugin.
 */
public void testTasks() throws Exception {
    createRemoteIndex(between(10, 100));
    EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
    request.query("FROM *:test | STATS total=sum(const) | LIMIT 1");
    request.pragmas(randomPragmas());
    ActionFuture<EsqlQueryResponse> requestFuture = client().execute(EsqlQueryAction.INSTANCE, request);
    try {
        // Wait inside the try block: if this times out, the finally clause below must still count down
        // allowEmitting so the already-submitted query is not left blocked after the test fails.
        assertTrue(PauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS));
        assertBusy(() -> {
            // The single cluster-level compute task on the remote cluster.
            List<TaskInfo> clusterTasks = client(REMOTE_CLUSTER).admin()
                .cluster()
                .prepareListTasks()
                .setActions(ComputeService.CLUSTER_ACTION_NAME)
                .get()
                .getTasks();
            assertThat(clusterTasks.size(), equalTo(1));
            // Driver tasks that are children of that cluster-level task.
            List<TaskInfo> drivers = client(REMOTE_CLUSTER).admin()
                .cluster()
                .prepareListTasks()
                .setTargetParentTaskId(clusterTasks.getFirst().taskId())
                .setActions(DriverTaskRunner.ACTION_NAME)
                .setDetailed(true)
                .get()
                .getTasks();
            assertThat(drivers.size(), equalTo(1));
            TaskInfo driver = drivers.getFirst();
            assertThat(driver.description(), equalTo("""
                \\_ExchangeSourceOperator[]
                \\_AggregationOperator[mode = INTERMEDIATE, aggs = sum of longs]
                \\_ExchangeSinkOperator"""));
        });
    } finally {
        PauseFieldPlugin.allowEmitting.countDown();
    }
    requestFuture.actionGet(30, TimeUnit.SECONDS).close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,16 @@
import org.elasticsearch.xpack.esql.optimizer.LocalLogicalPlanOptimizer;
import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext;
import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalPlanOptimizer;
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
import org.elasticsearch.xpack.esql.plan.logical.Filter;
import org.elasticsearch.xpack.esql.plan.logical.Limit;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.OrderBy;
import org.elasticsearch.xpack.esql.plan.logical.TopN;
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
import org.elasticsearch.xpack.esql.plan.physical.EsSourceExec;
import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize;
import org.elasticsearch.xpack.esql.plan.physical.ExchangeExec;
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSourceExec;
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
import org.elasticsearch.xpack.esql.plan.physical.LimitExec;
import org.elasticsearch.xpack.esql.plan.physical.OrderExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
import org.elasticsearch.xpack.esql.planner.mapper.LocalMapper;
import org.elasticsearch.xpack.esql.planner.mapper.Mapper;
import org.elasticsearch.xpack.esql.session.Configuration;
Expand Down Expand Up @@ -83,29 +74,25 @@ public static Tuple<PhysicalPlan, PhysicalPlan> breakPlanBetweenCoordinatorAndDa
return new Tuple<>(coordinatorPlan, dataNodePlan.get());
}

public static PhysicalPlan dataNodeReductionPlan(LogicalPlan plan, PhysicalPlan unused) {
var pipelineBreakers = plan.collectFirstChildren(Mapper::isPipelineBreaker);
public static PhysicalPlan reductionPlan(PhysicalPlan plan) {
// find the logical fragment
var fragments = plan.collectFirstChildren(p -> p instanceof FragmentExec);
if (fragments.isEmpty()) {
return null;
}
final FragmentExec fragment = (FragmentExec) fragments.getFirst();

if (pipelineBreakers.isEmpty() == false) {
UnaryPlan pipelineBreaker = (UnaryPlan) pipelineBreakers.get(0);
if (pipelineBreaker instanceof TopN) {
LocalMapper mapper = new LocalMapper();
var physicalPlan = EstimatesRowSize.estimateRowSize(0, mapper.map(plan));
return physicalPlan.collectFirstChildren(TopNExec.class::isInstance).get(0);
} else if (pipelineBreaker instanceof Limit limit) {
return new LimitExec(limit.source(), unused, limit.limit());
} else if (pipelineBreaker instanceof OrderBy order) {
return new OrderExec(order.source(), unused, order.order());
} else if (pipelineBreaker instanceof Aggregate) {
LocalMapper mapper = new LocalMapper();
var physicalPlan = EstimatesRowSize.estimateRowSize(0, mapper.map(plan));
var aggregate = (AggregateExec) physicalPlan.collectFirstChildren(AggregateExec.class::isInstance).get(0);
return aggregate.withMode(AggregatorMode.INITIAL);
} else {
throw new EsqlIllegalArgumentException("unsupported unary physical plan node [" + pipelineBreaker.nodeName() + "]");
}
final var pipelineBreakers = fragment.fragment().collectFirstChildren(Mapper::isPipelineBreaker);
if (pipelineBreakers.isEmpty()) {
return null;
}
final var pipelineBreaker = pipelineBreakers.getFirst();
final LocalMapper mapper = new LocalMapper();
PhysicalPlan reducePlan = mapper.map(pipelineBreaker);
if (reducePlan instanceof AggregateExec agg) {
reducePlan = agg.withMode(AggregatorMode.INITIAL); // force to emit intermediate outputs
}
return null;
return EstimatesRowSize.estimateRowSize(fragment.estimatedRowSize(), reducePlan);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,10 @@
import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
import org.elasticsearch.xpack.esql.action.EsqlSearchShardsAction;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.util.Holder;
import org.elasticsearch.xpack.esql.enrich.EnrichLookupService;
import org.elasticsearch.xpack.esql.enrich.LookupFromIndexService;
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSourceExec;
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
import org.elasticsearch.xpack.esql.plan.physical.OutputExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
Expand Down Expand Up @@ -780,35 +778,24 @@ private void runComputeOnDataNode(
}
}

/**
 * Builds the plan that sits between an exchange source and a new exchange sink mirroring {@code plan}.
 *
 * @param plan   the exchange sink whose source, output and intermediate-agg flag are reused for both
 *               the new exchange source and the new exchange sink
 * @param enable when {@code true}, {@link PlannerUtils#reductionPlan} is consulted and, if it yields a
 *               non-null plan, that plan is placed on top of the exchange source
 * @return a new {@link ExchangeSinkExec} whose child is either the bare exchange source or the
 *         reduction plan with the exchange source as its only child
 */
private static PhysicalPlan reductionPlan(ExchangeSinkExec plan, boolean enable) {
    final PhysicalPlan exchangeSource = new ExchangeSourceExec(plan.source(), plan.output(), plan.isIntermediateAgg());
    // No reducer when reduction is disabled or when the planner finds nothing to reduce.
    final PhysicalPlan reducer = enable ? PlannerUtils.reductionPlan(plan) : null;
    final PhysicalPlan sinkChild = reducer == null ? exchangeSource : reducer.replaceChildren(List.of(exchangeSource));
    return new ExchangeSinkExec(plan.source(), plan.output(), plan.isIntermediateAgg(), sinkChild);
}

private class DataNodeRequestHandler implements TransportRequestHandler<DataNodeRequest> {
@Override
public void messageReceived(DataNodeRequest request, TransportChannel channel, Task task) {
final ActionListener<ComputeResponse> listener = new ChannelActionListener<>(channel);
final ExchangeSinkExec reducePlan;
final PhysicalPlan reductionPlan;
if (request.plan() instanceof ExchangeSinkExec plan) {
var fragments = plan.collectFirstChildren(FragmentExec.class::isInstance);
if (fragments.isEmpty()) {
listener.onFailure(new IllegalStateException("expected a fragment plan for a remote compute; got " + request.plan()));
return;
}
var localExchangeSource = new ExchangeSourceExec(plan.source(), plan.output(), plan.isIntermediateAgg());
Holder<PhysicalPlan> reducePlanHolder = new Holder<>();
if (request.pragmas().nodeLevelReduction()) {
PhysicalPlan dataNodePlan = request.plan();
request.plan()
.forEachUp(
FragmentExec.class,
f -> { reducePlanHolder.set(PlannerUtils.dataNodeReductionPlan(f.fragment(), dataNodePlan)); }
);
}
reducePlan = new ExchangeSinkExec(
plan.source(),
plan.output(),
plan.isIntermediateAgg(),
reducePlanHolder.get() != null
? reducePlanHolder.get().replaceChildren(List.of(localExchangeSource))
: localExchangeSource
);
reductionPlan = reductionPlan(plan, request.pragmas().nodeLevelReduction());
} else {
listener.onFailure(new IllegalStateException("expected exchange sink for a remote compute; got " + request.plan()));
return;
Expand All @@ -825,7 +812,7 @@ public void messageReceived(DataNodeRequest request, TransportChannel channel, T
request.indicesOptions()
);
try (var computeListener = ComputeListener.create(transportService, (CancellableTask) task, listener)) {
runComputeOnDataNode((CancellableTask) task, sessionId, reducePlan, request, computeListener);
runComputeOnDataNode((CancellableTask) task, sessionId, reductionPlan, request, computeListener);
}
}
}
Expand Down Expand Up @@ -871,10 +858,10 @@ public void messageReceived(ClusterComputeRequest request, TransportChannel chan
* Performs a compute on a remote cluster. The output pages are placed in an exchange sink specified by
* {@code globalSessionId}. The coordinator on the main cluster will poll pages from there.
* <p>
* Currently, the coordinator on the remote cluster simply collects pages from data nodes in the remote cluster
* and places them in the exchange sink. We can achieve this by using a single exchange buffer to minimize overhead.
* However, here we use two exchange buffers so that we can run an actual plan on this coordinator to perform partial
* reduce operations, such as limit, topN, and partial-to-partial aggregation in the future.
* Currently, the coordinator on the remote cluster polls pages from data nodes within the remote cluster
* and performs cluster-level reduction before sending pages to the querying cluster. This reduction aims
* to minimize data transfers across clusters but may require additional CPU resources for operations like
* aggregations.
*/
void runComputeOnRemoteCluster(
String clusterAlias,
Expand All @@ -892,19 +879,14 @@ void runComputeOnRemoteCluster(
() -> exchangeService.finishSinkHandler(globalSessionId, new TaskCancelledException(parentTask.getReasonCancelled()))
);
final String localSessionId = clusterAlias + ":" + globalSessionId;
final PhysicalPlan coordinatorPlan = reductionPlan(plan, true);
var exchangeSource = new ExchangeSourceHandler(
configuration.pragmas().exchangeBufferSize(),
transportService.getThreadPool().executor(ThreadPool.Names.SEARCH),
computeListener.acquireAvoid()
);
try (Releasable ignored = exchangeSource.addEmptySink()) {
exchangeSink.addCompletionListener(computeListener.acquireAvoid());
PhysicalPlan coordinatorPlan = new ExchangeSinkExec(
plan.source(),
plan.output(),
plan.isIntermediateAgg(),
new ExchangeSourceExec(plan.source(), plan.output(), plan.isIntermediateAgg())
);
runCompute(
parentTask,
new ComputeContext(localSessionId, clusterAlias, List.of(), configuration, exchangeSource, exchangeSink),
Expand Down

0 comments on commit af7d3f9

Please sign in to comment.