Skip to content

Commit

Permalink
3
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Oct 25, 2023
1 parent f31c1d8 commit 8e37db5
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -483,10 +483,8 @@ public PlanFragment visitPhysicalFileScan(PhysicalFileScan fileScan, PlanTransla
scanNode = new HiveScanNode(fileScan.translatePlanNodeId(), tupleDescriptor, false);
HiveScanNode hiveScanNode = (HiveScanNode) scanNode;
hiveScanNode.setSelectedPartitions(fileScan.getSelectedPartitions());
if (fileScan.getTableSample().isPresent()) {
hiveScanNode.setTableSample(new TableSample(fileScan.getTableSample().get().isPercent,
fileScan.getTableSample().get().sampleValue, fileScan.getTableSample().get().seek));
}
hiveScanNode.setTableSample(new TableSample(fileScan.getTableSample().get().isPercent,
fileScan.getTableSample().get().sampleValue, fileScan.getTableSample().get().seek));
break;
default:
throw new RuntimeException("do not support DLA type " + ((HMSExternalTable) table).getDlaType());
Expand Down Expand Up @@ -624,12 +622,8 @@ public PlanFragment visitPhysicalOlapScan(PhysicalOlapScan olapScan, PlanTransla
BaseTableRef tableRef = new BaseTableRef(ref, olapTable, tableName);
tupleDescriptor.setRef(tableRef);
olapScanNode.setSelectedPartitionIds(olapScan.getSelectedPartitionIds());
olapScanNode.setSampleTabletIds(olapScan.getSelectedTabletIds());
if (olapScan.getTableSample().isPresent()) {
olapScanNode.setTableSample(new TableSample(olapScan.getTableSample().get().isPercent,
olapScan.getTableSample().get().sampleValue, olapScan.getTableSample().get().seek));
olapScanNode.computeSampleTabletIds();
}
olapScanNode.setTableSample(new TableSample(olapScan.getTableSample().get().isPercent,
olapScan.getTableSample().get().sampleValue, olapScan.getTableSample().get().seek));

// TODO: remove this switch?
switch (olapScan.getTable().getKeysType()) {
Expand Down
123 changes: 67 additions & 56 deletions fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -220,7 +221,7 @@ public OlapScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName) {
public void setIsPreAggregation(boolean isPreAggregation, String reason) {
this.isPreAggregation = isPreAggregation;
this.reasonOfPreAggregation = this.reasonOfPreAggregation == null ? reason :
this.reasonOfPreAggregation + " " + reason;
this.reasonOfPreAggregation + " " + reason;
}


Expand Down Expand Up @@ -402,7 +403,8 @@ public void updateScanRangeInfoByNewMVSelector(long selectedIndexId,
String scanRangeInfo = stringBuilder.toString();
String situation;
boolean update;
CHECK: { // CHECKSTYLE IGNORE THIS LINE
CHECK:
{ // CHECKSTYLE IGNORE THIS LINE
if (olapTable.getKeysType() == KeysType.DUP_KEYS || (olapTable.getKeysType() == KeysType.UNIQUE_KEYS
&& olapTable.getEnableUniqueKeyMergeOnWrite())) {
situation = "The key type of table is duplicate, or unique key with merge-on-write.";
Expand Down Expand Up @@ -545,7 +547,6 @@ public void init(Analyzer analyzer) throws UserException {
computePartitionInfo();
}
computeTupleState(analyzer);
computeSampleTabletIds();

/**
* Compute InAccurate cardinality before mv selector and tablet pruning.
Expand Down Expand Up @@ -778,7 +779,7 @@ private void addScanRangeLocations(Partition partition,
// but it means we will do 3 S3 IO to get the data which will bring 3 slow query
if (-1L != coolDownReplicaId) {
final Optional<Replica> replicaOptional = replicas.stream()
.filter(r -> r.getId() == coolDownReplicaId).findAny();
.filter(r -> r.getId() == coolDownReplicaId).findAny();
replicaOptional.ifPresent(
r -> {
Backend backend = Env.getCurrentSystemInfo()
Expand Down Expand Up @@ -930,75 +931,84 @@ public void setOutputColumnUniqueIds(Set<Integer> outputColumnUniqueIds) {
}

/**
* First, determine how many rows to sample from each partition according to the number of partitions.
* Then determine the number of Tablets to be selected for each partition according to the average number
* of rows of Tablet,
* If seek is not specified, the specified number of Tablets are pseudo-randomly selected from each partition.
* If seek is specified, it will be selected sequentially from the seek tablet of the partition.
* And add the manually specified Tablet id to the selected Tablet.
* simpleTabletNums = simpleRows / partitionNums / (partitionRows / partitionTabletNums)
* Sample some tablets in the selected partition.
* If Seek is specified, the tablets sampled each time are the same.
*/
public void computeSampleTabletIds() {
if (tableSample == null) {
return;
}
OlapTable olapTable = (OlapTable) desc.getTable();
long sampleRows; // The total number of sample rows
long hitRows = 1; // The total number of rows hit by the tablet
long totalRows = 0; // The total number of partition rows hit
long totalTablet = 0; // The total number of tablets in the hit partition

// 1. Calculate the total number of rows in the selected partition, and sort partition list.
long selectedRows = 0;
long totalSampleRows = 0;
List<Long> selectedPartitionList = new ArrayList<>();
for (Long partitionId : selectedPartitionIds) {
final Partition partition = olapTable.getPartition(partitionId);
final MaterializedIndex selectedTable = partition.getIndex(selectedIndexId);
selectedRows += selectedTable.getRowCount();
selectedPartitionList.add(partitionId);
}
selectedPartitionList.sort(Comparator.naturalOrder());

// 2.Sampling is not required in some cases, will not take effect after clear sampleTabletIds.
if (tableSample.isPercent()) {
sampleRows = (long) Math.max(olapTable.getRowCount() * (tableSample.getSampleValue() / 100.0), 1);
if (tableSample.getSampleValue() == 100) {
sampleTabletIds.clear();
return;
}
totalSampleRows = (long) Math.max(selectedRows * (tableSample.getSampleValue() / 100.0), 1);
} else {
sampleRows = Math.max(tableSample.getSampleValue(), 1);
if (tableSample.getSampleValue() > selectedRows) {
sampleTabletIds.clear();
return;
}
totalSampleRows = tableSample.getSampleValue();
}

// calculate the number of tablets by each partition
long avgRowsPerPartition = sampleRows / Math.max(olapTable.getPartitions().size(), 1);

for (Partition p : olapTable.getPartitions()) {
List<Long> ids = p.getBaseIndex().getTabletIdsInOrder();

if (ids.isEmpty()) {
// 3. Sampling partition. If Seek is specified, the partition will be the same for each sampling.
long hitRows = 0; // The number of rows hit by the tablet
long partitionSeek = tableSample.getSeek() != -1
? tableSample.getSeek() : (long) (new SecureRandom().nextDouble() * selectedPartitionIds.size());
for (int i = 0; i < selectedPartitionList.size(); i++) {
int seekPid = (int) ((i + partitionSeek) % selectedPartitionList.size());
final Partition partition = olapTable.getPartition(selectedPartitionList.get(seekPid));
final MaterializedIndex selectedTable = partition.getIndex(selectedIndexId);
List<Tablet> tablets = selectedTable.getTablets();
if (tablets.isEmpty()) {
continue;
}

// Skip partitions with row count < row count / 2 expected to be sampled per partition.
// It can be expected to sample a smaller number of partitions to avoid uneven distribution
// of sampling results.
if (p.getBaseIndex().getRowCount() < (avgRowsPerPartition / 2)) {
continue;
// 4. Calculate the number of rows that need to be sampled in the current partition.
long sampleRows = 0; // The number of sample rows in partition
if (tableSample.isPercent()) {
sampleRows = (long) Math.max(selectedTable.getRowCount() * (tableSample.getSampleValue() / 100.0), 1);
} else {
sampleRows = (long) Math.max(
tableSample.getSampleValue() * (selectedTable.getRowCount() / selectedRows), 1);
}

// It is assumed here that all tablets row count is uniformly distributed
// TODO Use `p.getBaseIndex().getTablet(n).getRowCount()` to get each tablet row count to compute sample.
long avgRowsPerTablet = Math.max(p.getBaseIndex().getRowCount() / ids.size(), 1);
long tabletCounts = Math.max(
avgRowsPerPartition / avgRowsPerTablet + (avgRowsPerPartition % avgRowsPerTablet != 0 ? 1 : 0), 1);
tabletCounts = Math.min(tabletCounts, ids.size());

long seek = tableSample.getSeek() != -1
? tableSample.getSeek() : (long) (new SecureRandom().nextDouble() * ids.size());
for (int i = 0; i < tabletCounts; i++) {
int seekTid = (int) ((i + seek) % ids.size());
sampleTabletIds.add(ids.get(seekTid));
// 5. Sampling tablets. If Seek is specified, the same tablet will be sampled each time.
long tabletSeek = tableSample.getSeek() != -1
? tableSample.getSeek() : (long) (new SecureRandom().nextDouble() * tablets.size());
for (int j = 0; j < tablets.size(); j++) {
int seekTid = (int) ((j + tabletSeek) % tablets.size());
if (tablets.get(seekTid).getRowCount(true) == 0) {
continue;
}
sampleTabletIds.add(tablets.get(seekTid).getId());
sampleRows -= tablets.get(seekTid).getRowCount(true);
hitRows += tablets.get(seekTid).getRowCount(true);
if (sampleRows <= 0) {
break;
}
}
if (hitRows > totalSampleRows) {
break;
}

hitRows += avgRowsPerTablet * tabletCounts;
totalRows += p.getBaseIndex().getRowCount();
totalTablet += ids.size();
}

// all hit, direct full
if (totalRows < sampleRows) {
// can't fill full sample rows
sampleTabletIds.clear();
} else if (sampleTabletIds.size() == totalTablet) {
// TODO add limit
sampleTabletIds.clear();
} else if (!sampleTabletIds.isEmpty()) {
// TODO add limit
}
LOG.debug("after computeSampleTabletIds, hitRows {}, selectedRows {}", hitRows, selectedRows);
}

public boolean isFromPrepareStmt() {
Expand All @@ -1024,6 +1034,7 @@ private void computeTabletInfo() throws UserException {
*/
Preconditions.checkState(scanBackendIds.size() == 0);
Preconditions.checkState(scanTabletIds.size() == 0);
computeSampleTabletIds();
for (Long partitionId : selectedPartitionIds) {
final Partition partition = olapTable.getPartition(partitionId);
final MaterializedIndex selectedTable = partition.getIndex(selectedIndexId);
Expand Down

0 comments on commit 8e37db5

Please sign in to comment.