[fix](planner) query should be cancelled if limit reached (apache#44338)
Problem Summary:
When a SQL query contains a `limit` clause and FE has already received more than `limit` rows,
it should send a cancel command to BE so that BE stops reading more data.
However, this logic is broken in the current code and does not work.
For external table queries in particular, this can cause a large amount of unnecessary network IO.

1. `isBlockQuery`

    In the old optimizer, `isBlockQuery` is set to true if the query contains a `sort` or `agg` node,
    and false otherwise. In the new optimizer, it is always true.

    Regardless of old or new optimizer, this logic is wrong, and the reach limit logic is triggered
    only when `isBlockQuery = false` (the simulation after problem 2 illustrates the effect).

2. The reach limit logic is invoked at the wrong time

    The reach limit check is performed only when the rowBatch returned by BE has `eos = true`.
    This is wrong: for a `limit N` query, each BE applies its own limit of N, but from FE's point of view
    the reach limit logic can be triggered as soon as the total number of rows returned by all BEs exceeds N.
    So the check must not run only when `eos = true` (see the simulation below).
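
A minimal, self-contained simulation of the two problems above (plain Java, not Doris code; the instance count, batch size, and variable names are illustrative assumptions): each BE instance may legitimately return up to `limit` rows before sending `eos`, so a check that runs only at `eos` (and is additionally gated by `isBlockQuery`) accepts far more rows than the limit, while a per-batch check cancels as soon as the limit is reached.

```java
// Simulation only: the numbers and names are illustrative, not taken from Doris.
public class ReachLimitSimulation {
    public static void main(String[] args) {
        final int limit = 10;       // `limit N` in the SQL
        final int instances = 3;    // simulated BE instances
        final int batchSize = 4;    // rows per batch returned by each instance

        int rowsWithEosOnlyCheck = 0;   // rows FE accepts when checking only at eos
        int rowsWithPerBatchCheck = 0;  // rows FE accepts when checking every batch
        boolean cancelled = false;

        for (int i = 0; i < instances; i++) {
            int sentByInstance = 0;
            while (sentByInstance < limit) {          // each BE enforces its own limit N
                int rows = Math.min(batchSize, limit - sentByInstance);
                sentByInstance += rows;
                rowsWithEosOnlyCheck += rows;         // old behaviour: keep accumulating
                if (!cancelled) {
                    rowsWithPerBatchCheck += rows;
                    if (rowsWithPerBatchCheck >= limit) {
                        cancelled = true;             // new behaviour: cancel immediately
                    }
                }
            }
            // eos arrives here for this instance; the old check only ran at this point.
        }
        System.out.println("accepted with eos-only check:  " + rowsWithEosOnlyCheck);  // 30
        System.out.println("accepted with per-batch check: " + rowsWithPerBatchCheck); // 10
    }
}
```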

The PR mainly changes:

1. Remove `isBlockQuery`

    `isBlockQuery` is only used in the reach limit logic and is not needed, so it is removed completely.

2. Move the reach limit check

    The reach limit logic is now checked whenever the number of rows received by FE reaches the limit,
    instead of only on `eos` batches (see the sketch after this list).

3. Fix the wrong `limitRows` in `QueryProcessor`

    `limitRows` should be taken from the first fragment, not the last one.

4. In the scanner scheduler on the BE side, if a scanner has a limit smaller than the batch size,
   ignore the per-round scan bytes threshold and return immediately.
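
As a rough illustration of changes 2 and 3, the sketch below shows the intended call-site shape: read the limit from the first fragment's plan root and run the check on every batch through the new `LimitUtils` added in this PR. The wrapper class and method names are hypothetical; only `getPlanRoot().getLimit()` and `LimitUtils.cancelIfReachLimit(...)` come from the actual code.

```java
import java.util.List;
import java.util.function.Consumer;

import org.apache.doris.common.Status;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.qe.LimitUtils;
import org.apache.doris.qe.RowBatch;

// Hypothetical wrapper; a real caller (Coordinator.getNext() / QueryProcessor.getNext())
// would invoke this after every received batch, not only when eos is set.
class ReachLimitCheckSketch {
    static boolean checkAfterEachBatch(List<PlanFragment> fragments, RowBatch resultBatch,
                                       long numReceivedRows, Consumer<Status> cancelFunc) {
        // Change 3: the limit comes from the FIRST fragment's plan root, not the last one.
        long limitRows = fragments.get(0).getPlanRoot().getLimit();
        // Change 2: cancel the query and mark the batch as eos once numReceivedRows >= limitRows.
        return LimitUtils.cancelIfReachLimit(resultBatch, limitRows, numReceivedRows, cancelFunc);
    }
}
```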

morningman committed Dec 10, 2024
1 parent 938ca71 commit 0836af5
Showing 8 changed files with 146 additions and 44 deletions.
13 changes: 13 additions & 0 deletions be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -271,6 +271,7 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
size_t raw_bytes_threshold = config::doris_scanner_row_bytes;
size_t raw_bytes_read = 0;
bool first_read = true;
int64_t limit = scanner->limit();
while (!eos && raw_bytes_read < raw_bytes_threshold) {
if (UNLIKELY(ctx->done())) {
eos = true;
@@ -319,6 +320,18 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
ctx->inc_block_usage(free_block->allocated_bytes());
scan_task->cached_blocks.push_back(std::move(free_block));
}

if (limit > 0 && limit < ctx->batch_size()) {
// If this scanner has a limit and the limit is less than the batch size,
// return immediately instead of waiting for raw_bytes_threshold.
// This saves time when each scanner returns only a small number of rows,
// but the rows from all scanners together are already enough.
// Without this break, a query like "select * from tbl where id=1 limit 10"
// may scan a lot of data when the filter ratio of "id=1" is high.
// If the limit is larger than the batch size, this rule is skipped
// to avoid a large user-specified limit producing too many small blocks.
break;
}
} // end for while

if (UNLIKELY(!status.ok())) {
2 changes: 2 additions & 0 deletions be/src/vec/exec/scan/vscanner.h
@@ -163,6 +163,8 @@ class VScanner {
_query_statistics = query_statistics;
}

int64_t limit() const { return _limit; }

protected:
void _discard_conjuncts() {
for (auto& conjunct : _conjuncts) {
@@ -621,11 +621,6 @@ public String getExplainString(ExplainOptions explainOptions) {
return plan;
}

@Override
public boolean isBlockQuery() {
return true;
}

@Override
public DescriptorTable getDescTable() {
return descTable;
@@ -80,10 +80,6 @@ public OriginalPlanner(Analyzer analyzer) {
this.analyzer = analyzer;
}

public boolean isBlockQuery() {
return isBlockQuery;
}

public PlannerContext getPlannerContext() {
return plannerContext;
}
@@ -274,17 +270,6 @@ public void createPlanFragments(StatementBase statement, Analyzer analyzer, TQue

if (queryStmt instanceof SelectStmt) {
SelectStmt selectStmt = (SelectStmt) queryStmt;
if (queryStmt.getSortInfo() != null || selectStmt.getAggInfo() != null) {
isBlockQuery = true;
if (LOG.isDebugEnabled()) {
LOG.debug("this is block query");
}
} else {
isBlockQuery = false;
if (LOG.isDebugEnabled()) {
LOG.debug("this isn't block query");
}
}
// Check SelectStatement if optimization condition satisfied
if (selectStmt.isPointQueryShortCircuit()) {
// Optimize for point query like: SELECT * FROM t1 WHERE pk1 = 1 and pk2 = 2
@@ -43,8 +43,6 @@ public abstract class Planner {

protected ArrayList<PlanFragment> fragments = Lists.newArrayList();

protected boolean isBlockQuery = false;

protected TQueryOptions queryOptions;

public abstract List<ScanNode> getScanNodes();
@@ -115,10 +113,6 @@ public List<PlanFragment> getFragments() {
return fragments;
}

public boolean isBlockQuery() {
return isBlockQuery;
}

public TQueryOptions getQueryOptions() {
return queryOptions;
}
36 changes: 18 additions & 18 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -240,8 +240,6 @@ public class Coordinator implements CoordInterface {
// same as backend_exec_states_.size() after Exec()
private final Set<TUniqueId> instanceIds = Sets.newHashSet();

private final boolean isBlockQuery;

private int numReceivedRows = 0;

private List<String> deltaUrls;
@@ -336,7 +334,6 @@ public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner,
// Used for query/insert/test
public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) {
this.context = context;
this.isBlockQuery = planner.isBlockQuery();
this.queryId = context.queryId();
this.fragments = planner.getFragments();
this.scanNodes = planner.getScanNodes();
@@ -379,7 +376,6 @@ public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) {
// Constructor of Coordinator is too complicated.
public Coordinator(Long jobId, TUniqueId queryId, DescriptorTable descTable, List<PlanFragment> fragments,
List<ScanNode> scanNodes, String timezone, boolean loadZeroTolerance, boolean enableProfile) {
this.isBlockQuery = true;
this.jobId = jobId;
this.queryId = queryId;
this.descTable = descTable.toThrift();
@@ -1448,24 +1444,28 @@ public RowBatch getNext() throws Exception {
}
}

if (resultBatch.isEos()) {
this.returnedAllResults = true;

// if this query is a block query do not cancel.
Long numLimitRows = fragments.get(0).getPlanRoot().getLimit();
boolean hasLimit = numLimitRows > 0;
if (!isBlockQuery && instanceIds.size() > 1 && hasLimit && numReceivedRows >= numLimitRows) {
if (LOG.isDebugEnabled()) {
LOG.debug("no block query, return num >= limit rows, need cancel");
}
cancelInternal(Types.PPlanFragmentCancelReason.LIMIT_REACH, "query reach limit");
if (resultBatch.getBatch() != null) {
numReceivedRows += resultBatch.getBatch().getRowsSize();
if (LOG.isDebugEnabled()) {
LOG.debug("number received rows: {}, {}", numReceivedRows, DebugUtil.printId(queryId));
}
if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().dryRunQuery) {
}

if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().dryRunQuery) {
if (resultBatch.isEos()) {
numReceivedRows = 0;
numReceivedRows += resultBatch.getQueryStatistics().getReturnedRows();
}
} else if (resultBatch.getBatch() != null) {
numReceivedRows += resultBatch.getBatch().getRowsSize();
}

Long limitRows = fragments.get(0).getPlanRoot().getLimit();
if (limitRows > 0 && numReceivedRows >= limitRows) {
if (LOG.isDebugEnabled()) {
LOG.debug("reach limit rows: {}, received rows: {}, cancel query, {}",
limitRows, numReceivedRows, DebugUtil.printId(queryId));
}
cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR, "reach limit");
resultBatch.setEos(true);
}

return resultBatch;
54 changes: 54 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/LimitUtils.java
@@ -0,0 +1,54 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.qe;

import org.apache.doris.common.Status;
import org.apache.doris.thrift.TStatusCode;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.function.Consumer;

/**
* This is a utility class for limit related operations.
* Because there are currently two places that need to check limit rows, the logic is put here for unification:
* - Coordinator.getNext();
* - QueryProcessor.getNext();
*/
public class LimitUtils {
private static final Logger LOG = LogManager.getLogger(LimitUtils.class);
private static final Status LIMIT_REACH_STATUS = new Status(TStatusCode.LIMIT_REACH, "query reach limit");

// If the limit has been reached, cancel this query immediately
// to prevent BE from reading more data.
public static boolean cancelIfReachLimit(RowBatch resultBatch, long limitRows, long numReceivedRows,
Consumer<Status> cancelFunc) {
boolean reachedLimit = false;
if (limitRows > 0 && numReceivedRows >= limitRows) {
if (LOG.isDebugEnabled()) {
LOG.debug("reach limit rows: {}, received rows: {}, cancel query", limitRows, numReceivedRows);
}
cancelFunc.accept(LIMIT_REACH_STATUS);
// mark the batch as eos so the caller stops fetching more data
resultBatch.setEos(true);
reachedLimit = true;
}
return reachedLimit;
}
}
59 changes: 59 additions & 0 deletions fe/fe-core/src/test/java/org/apache/doris/qe/LimitUtilsTest.java
@@ -0,0 +1,59 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.qe;


import org.apache.doris.common.Status;

import org.junit.Assert;
import org.junit.Test;

import java.util.function.Consumer;

public class LimitUtilsTest {

private static int res = 0;

@Test
public void testUpperBound() {
Consumer<Status> cancelFunc = batch -> res = 666;
RowBatch rowBatch = new RowBatch();
rowBatch.setEos(false);
// - no limit
Assert.assertFalse(LimitUtils.cancelIfReachLimit(rowBatch, 0, 10, cancelFunc));
Assert.assertFalse(rowBatch.isEos());
Assert.assertEquals(0, res);

// - not reach limit
Assert.assertFalse(LimitUtils.cancelIfReachLimit(rowBatch, 10, 1, cancelFunc));
Assert.assertFalse(rowBatch.isEos());
Assert.assertEquals(0, res);

// - reach limit
Assert.assertTrue(LimitUtils.cancelIfReachLimit(rowBatch, 10, 10, cancelFunc));
Assert.assertTrue(rowBatch.isEos());
Assert.assertEquals(666, res);

// - exceed limit
res = 0;
rowBatch.setEos(false);
Assert.assertTrue(LimitUtils.cancelIfReachLimit(rowBatch, 10, 100, cancelFunc));
Assert.assertTrue(rowBatch.isEos());
Assert.assertEquals(666, res);
}
}
