Skip to content

Commit

Permalink
physicalStorageLayer, p0
Browse files Browse the repository at this point in the history
  • Loading branch information
englefly committed Aug 7, 2024
1 parent f52000d commit 2543112
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,11 @@ public PlanFragment translatePlan(PhysicalPlan physicalPlan) {
}
}
for (ScanNode scanNode : context.getScanNodes()) {
Utils.execWithUncheckedException(scanNode::finalizeForNereids);
try {
scanNode.finalizeForNereids();
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}
return rootFragment;
}
Expand Down Expand Up @@ -834,6 +838,9 @@ public PlanFragment visitPhysicalOlapScan(PhysicalOlapScan olapScan, PlanTransla
.map(context::findSlotRef).collect(Collectors.toList());
dataPartition = new DataPartition(TPartitionType.HASH_PARTITIONED, partitionExprs);
}
if (olapScan.getStats() != null) {
olapScanNode.setCardinality((long) olapScan.getStats().getRowCount());
}
// TODO: maybe we could have a better way to create fragment
PlanFragment planFragment = createPlanFragment(olapScanNode, dataPartition, olapScan);
context.addPlanFragment(planFragment);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.doris.nereids.trees.plans.algebra.OlapScan;
import org.apache.doris.nereids.trees.plans.algebra.PartitionTopN;
import org.apache.doris.nereids.trees.plans.algebra.Project;
import org.apache.doris.nereids.trees.plans.algebra.Relation;
import org.apache.doris.nereids.trees.plans.algebra.Repeat;
import org.apache.doris.nereids.trees.plans.algebra.SetOperation;
import org.apache.doris.nereids.trees.plans.algebra.TopN;
Expand Down Expand Up @@ -115,6 +116,7 @@
import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalQuickSort;
import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalRepeat;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSchemaScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
Expand Down Expand Up @@ -328,24 +330,22 @@ private long computeDeltaRowCount(OlapScan olapScan, SlotReference slot) {
return deltaRowCount;
}

private void adjustColStats(CatalogRelation catalogRelation, SlotReference slot,
private void adjustColStats(OlapScan olapScan, SlotReference slot,
ColumnStatisticBuilder builder) {
if (builder.getAvgSizeByte() <= 0) {
builder.setAvgSizeByte(slot.getDataType().toCatalogDataType().getSlotSize());
}
if (catalogRelation instanceof OlapScan) {
OlapScan olapScan = (OlapScan) catalogRelation;
long delta = computeDeltaRowCount(olapScan, slot);
if (delta > 0) {
builder.setCount(builder.getCount() + delta);
// clear min-max to avoid estimation errors.
// For example, right after yesterday's data is loaded, a user may immediately query yesterday's data.
// Since that data has not been analyzed yet, the recorded max date is still before yesterday,
// and hence the optimizer would estimate that the filter result is zero rows.
builder.setMinExpr(null).setMinValue(Double.NEGATIVE_INFINITY)
.setMaxExpr(null).setMaxValue(Double.POSITIVE_INFINITY);
}
long delta = computeDeltaRowCount(olapScan, slot);
if (delta > 0) {
builder.setCount(builder.getCount() + delta);
// clear min-max to avoid estimation errors.
// For example, right after yesterday's data is loaded, a user may immediately query yesterday's data.
// Since that data has not been analyzed yet, the recorded max date is still before yesterday,
// and hence the optimizer would estimate that the filter result is zero rows.
builder.setMinExpr(null).setMinValue(Double.NEGATIVE_INFINITY)
.setMaxExpr(null).setMaxValue(Double.POSITIVE_INFINITY);
}

}

private ColumnStatistic getColumnStatsFromTableCache(CatalogRelation catalogRelation, SlotReference slot) {
Expand All @@ -356,12 +356,10 @@ private ColumnStatistic getColumnStatsFromTableCache(CatalogRelation catalogRela
return getColumnStatistic(catalogRelation.getTable(), slot.getName(), idxId);
}

private ColumnStatistic getColumnStatsFromPartitionCache(CatalogRelation catalogRelation, SlotReference slot,
private ColumnStatistic getColumnStatsFromPartitionCache(OlapScan catalogRelation, SlotReference slot,
List<String> partitionNames) {
long idxId = -1;
if (catalogRelation instanceof OlapScan) {
idxId = ((OlapScan) catalogRelation).getSelectedIndexId();
}
long idxId = catalogRelation.getSelectedIndexId();

return getColumnStatistic(catalogRelation.getTable(), slot.getName(), idxId, partitionNames);
}

Expand Down Expand Up @@ -399,7 +397,7 @@ private void checkIfUnknownStatsUsedAsKey(StatisticsBuilder builder) {
}
}

private Statistics computeOlapScan(LogicalOlapScan olapScan) {
private Statistics computeOlapScan(OlapScan olapScan) {
OlapTable olapTable = olapScan.getTable();
double tableRowCount = olapTable.getRowCountForIndex(olapScan.getSelectedIndexId());
if (tableRowCount <= 0) {
Expand All @@ -416,12 +414,14 @@ private Statistics computeOlapScan(LogicalOlapScan olapScan) {
if (olapScan.getSelectedIndexId() != olapScan.getTable().getBaseIndexId() || olapTable instanceof MTMV) {
// mv is selected, return its estimated stats
Optional<Statistics> optStats = cascadesContext.getStatementContext()
.getStatistics(olapScan.getRelationId());
.getStatistics(((Relation) olapScan).getRelationId());
if (optStats.isPresent()) {
double selectedRowCount = tableRowCount * olapScan.getSelectedPartitionIds().size()
/ Math.max(1, olapTable.getPartitions().size());
double selectedPartitionsRowCount = getSelectedPartitionRowCount(olapScan);
if (selectedPartitionsRowCount == -1) {
selectedPartitionsRowCount = tableRowCount;
}
// if estimated mv rowCount is more than actual row count, fall back to base table stats
if (selectedRowCount > optStats.get().getRowCount()) {
if (selectedPartitionsRowCount > optStats.get().getRowCount()) {
return optStats.get();
}
}
Expand All @@ -433,7 +433,7 @@ private Statistics computeOlapScan(LogicalOlapScan olapScan) {
if (StatisticConstants.isSystemTable(olapTable) || !FeConstants.enableInternalSchemaDb
|| ConnectContext.get() == null
|| ConnectContext.get().getSessionVariable().internalSession) {
for (Slot slot : olapScan.getOutput()) {
for (Slot slot : ((Plan) olapScan).getOutput()) {
builder.putColumnStatistics(slot, ColumnStatistic.UNKNOWN);
}
setHasUnknownColStatsInStatementContext();
Expand All @@ -444,7 +444,7 @@ private Statistics computeOlapScan(LogicalOlapScan olapScan) {
// for regression shape test
if (ConnectContext.get() == null || !ConnectContext.get().getSessionVariable().enableStats) {
// get row count from any visible slotReference's colStats
for (Slot slot : olapScan.getOutput()) {
for (Slot slot : ((Plan) olapScan).getOutput()) {
builder.putColumnStatistics(slot,
new ColumnStatisticBuilder(ColumnStatistic.UNKNOWN).setCount(tableRowCount).build());
}
Expand All @@ -455,7 +455,7 @@ private Statistics computeOlapScan(LogicalOlapScan olapScan) {
// build Stats for olapScan
// if slot is invisible, use UNKNOWN
List<SlotReference> visibleOutputSlots = new ArrayList<>();
for (Slot slot : olapScan.getOutput()) {
for (Slot slot : ((Plan) olapScan).getOutput()) {
if (isVisibleSlotReference(slot)) {
visibleOutputSlots.add((SlotReference) slot);
} else {
Expand All @@ -482,23 +482,20 @@ private Statistics computeOlapScan(LogicalOlapScan olapScan) {
builder.setRowCount(selectedPartitionsRowCount);
} else {
// if partition row count is invalid (-1), fall back to table stats
// suppose all partitions have the same row count
double estimatedSelectedPartitionsRowCount = tableRowCount * olapScan.getSelectedPartitionIds().size()
/ Math.max(1, olapTable.getPartitions().size());
for (SlotReference slot : visibleOutputSlots) {
ColumnStatistic cache = getColumnStatsFromTableCache(olapScan, slot);
ColumnStatistic cache = getColumnStatsFromTableCache((CatalogRelation) olapScan, slot);
ColumnStatisticBuilder colStatsBuilder = new ColumnStatisticBuilder(cache);
colStatsBuilder.setCount(estimatedSelectedPartitionsRowCount);
colStatsBuilder.setCount(tableRowCount);
adjustColStats(olapScan, slot, colStatsBuilder);
builder.putColumnStatistics(slot, colStatsBuilder.build());
}
checkIfUnknownStatsUsedAsKey(builder);
builder.setRowCount(estimatedSelectedPartitionsRowCount);
builder.setRowCount(tableRowCount);
}
} else {
// get table level stats
for (SlotReference slot : visibleOutputSlots) {
ColumnStatistic cache = getColumnStatsFromTableCache(olapScan, slot);
ColumnStatistic cache = getColumnStatsFromTableCache((CatalogRelation) olapScan, slot);
ColumnStatisticBuilder colStatsBuilder = new ColumnStatisticBuilder(cache);
colStatsBuilder.setCount(tableRowCount);
adjustColStats(olapScan, slot, colStatsBuilder);
Expand Down Expand Up @@ -676,7 +673,7 @@ public Statistics visitPhysicalOneRowRelation(PhysicalOneRowRelation oneRowRelat

@Override
public Statistics visitPhysicalOlapScan(PhysicalOlapScan olapScan, Void context) {
return computeCatalogRelation(olapScan);
return computeOlapScan(olapScan);
}

@Override
Expand All @@ -698,7 +695,10 @@ public Statistics visitPhysicalFileScan(PhysicalFileScan fileScan, Void context)
@Override
public Statistics visitPhysicalStorageLayerAggregate(
PhysicalStorageLayerAggregate storageLayerAggregate, Void context) {
return storageLayerAggregate.getRelation().accept(this, context);
PhysicalRelation relation = storageLayerAggregate.getRelation();
Preconditions.checkArgument(relation instanceof PhysicalOlapScan, "aaaa");
return relation.accept(this, context);

}

@Override
Expand Down Expand Up @@ -1086,7 +1086,6 @@ private Statistics computeCatalogRelation(CatalogRelation catalogRelation) {
}
ColumnStatisticBuilder colStatsBuilder = new ColumnStatisticBuilder(cache);
colStatsBuilder.setCount(tableRowCount);
adjustColStats(catalogRelation, slot, colStatsBuilder);
builder.putColumnStatistics(slot, colStatsBuilder.build());
}
checkIfUnknownStatsUsedAsKey(builder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1729,16 +1729,14 @@ public String getSelectedIndexName() {
public void finalizeForNereids() {
computeNumNodes();
computeStatsForNereids();
// NOTICE: must call here to get selected tablet row count to let block rules work well.
mockRowCountInStatistic();
}

private void computeStatsForNereids() {
if (cardinality > 0 && avgRowSize <= 0) {
avgRowSize = totalBytes / (float) cardinality * COMPRESSION_RATIO;
capCardinalityAtLimit();
}
// when node scan has no data, cardinality should be 0 instead of a invalid
// when node scan has no data, cardinality should be 0 instead of an invalid
// value after computeStats()
cardinality = cardinality == -1 ? 0 : cardinality;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ suite("test_count_on_index_httplogs", "p0") {
sql """set experimental_enable_nereids_planner=true;"""
sql """set enable_fallback_to_original_planner=false;"""
sql """analyze table ${testTable_dup} with sync""";
// wait for BE to report every partition's row count
sleep(10000)
// case1: test duplicate table
explain {
sql("select COUNT() from ${testTable_dup} where request match 'GET'")
Expand Down
Loading

0 comments on commit 2543112

Please sign in to comment.