[Feature] Support external table sample stats collection (apache#24376)
Support Hive table sample stats collection. The grammar is like:

`analyze table with sample percent 10`
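The regression test added in this commit exercises the same grammar end to end, e.g. `analyze table part with sample percent 10 with sync;`.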
Jibing-Li authored Sep 19, 2023
1 parent 6a33e46 commit 80bcb43
Showing 9 changed files with 199 additions and 28 deletions.
@@ -272,6 +272,10 @@ public boolean forceFull() {
return properties.containsKey(PROPERTY_FORCE_FULL);
}

public boolean isSampleRows() {
return properties.containsKey(PROPERTY_SAMPLE_ROWS);
}

public String toSQL() {
StringBuilder sb = new StringBuilder();
sb.append("PROPERTIES(");
@@ -166,11 +166,9 @@ public void check() throws AnalysisException {
analyzeProperties.check();

// TODO support external table
-        if (analyzeProperties.isSample()) {
-            if (!(table instanceof OlapTable)) {
-                throw new AnalysisException("Sampling statistics "
-                        + "collection of external tables is not supported");
-            }
+        if (analyzeProperties.isSampleRows() && !(table instanceof OlapTable)) {
+            throw new AnalysisException("Sampling statistics "
+                    + "collection of external tables is not supported with rows, use percent instead.");
         }
if (analyzeProperties.isSync()
&& (analyzeProperties.isAutomatic() || analyzeProperties.getPeriodTimeInMs() != 0)) {
11 changes: 9 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
@@ -470,9 +470,16 @@ protected void analyzeSortHints() throws AnalysisException {
}

protected void analyzeSample() throws AnalysisException {
-        if ((sampleTabletIds != null || tableSample != null) && desc.getTable().getType() != TableIf.TableType.OLAP) {
+        if ((sampleTabletIds != null || tableSample != null)
+                && desc.getTable().getType() != TableIf.TableType.OLAP
+                && desc.getTable().getType() != TableIf.TableType.HMS_EXTERNAL_TABLE) {
             throw new AnalysisException("Sample table " + desc.getTable().getName()
-                    + " type " + desc.getTable().getType() + " is not OLAP");
+                    + " type " + desc.getTable().getType() + " is not supported");
         }
+        if (tableSample != null && TableIf.TableType.HMS_EXTERNAL_TABLE.equals(desc.getTable().getType())) {
+            if (!tableSample.isPercent()) {
+                throw new AnalysisException("HMS table doesn't support sample rows, use percent instead.");
+            }
+        }
}
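With this relaxed check, a percent-based table sample on an HMS external table now passes analysis, while a rows-based sample on an HMS table is still rejected with the 'use percent instead' error above; all other non-OLAP table types remain unsupported.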

@@ -1071,6 +1071,9 @@ public static class HiveFileStatus {
long length;
long blockSize;
long modificationTime;
boolean splittable;
List<String> partitionValues;
AcidInfo acidInfo;
}

@Data
@@ -2024,6 +2024,7 @@ private PlanNode createScanNode(Analyzer analyzer, TableRef tblRef, SelectStmt s
break;
case HIVE:
scanNode = new HiveScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
((HiveScanNode) scanNode).setTableSample(tblRef.getTableSample());
break;
default:
throw new UserException("Not supported table type: " + ((HMSExternalTable) table).getDlaType());
@@ -20,6 +20,7 @@
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.TableSample;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
@@ -92,6 +93,8 @@ public abstract class FileQueryScanNode extends FileScanNode {
protected Map<String, SlotDescriptor> destSlotDescByName;
protected TFileScanRangeParams params;

protected TableSample tableSample;

/**
* External file scan node for Query hms table
* needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column priv
@@ -200,6 +203,10 @@ private void updateRequiredSlots() throws UserException {
setColumnPositionMapping();
}

public void setTableSample(TableSample tSample) {
this.tableSample = tSample;
}

@Override
public void finalize(Analyzer analyzer) throws UserException {
doFinalize();
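Plumbing-wise: the planner passes the TableSample parsed on the table reference into the scan node (the createScanNode change above), FileQueryScanNode stores it via setTableSample, and HiveScanNode consumes it when generating splits, as the next file shows.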
@@ -63,6 +63,7 @@

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -218,6 +219,11 @@ private void getFileSplitByPartitions(HiveMetaStoreCache cache, List<HivePartiti
if (ConnectContext.get().getExecutor() != null) {
ConnectContext.get().getExecutor().getSummaryProfile().setGetPartitionFilesFinishTime();
}
if (tableSample != null) {
List<HiveMetaStoreCache.HiveFileStatus> hiveFileStatuses = selectFiles(fileCaches);
splitAllFiles(allFiles, hiveFileStatuses);
return;
}
for (HiveMetaStoreCache.FileCacheValue fileCacheValue : fileCaches) {
// This if branch is to support old splitter, will remove later.
if (fileCacheValue.getSplits() != null) {
@@ -235,6 +241,42 @@
}
}

private void splitAllFiles(List<Split> allFiles,
List<HiveMetaStoreCache.HiveFileStatus> hiveFileStatuses) throws IOException {
for (HiveMetaStoreCache.HiveFileStatus status : hiveFileStatuses) {
allFiles.addAll(splitFile(status.getPath(), status.getBlockSize(),
status.getBlockLocations(), status.getLength(), status.getModificationTime(),
status.isSplittable(), status.getPartitionValues(),
new HiveSplitCreator(status.getAcidInfo())));
}
}

private List<HiveMetaStoreCache.HiveFileStatus> selectFiles(List<FileCacheValue> inputCacheValue) {
List<HiveMetaStoreCache.HiveFileStatus> fileList = Lists.newArrayList();
long totalSize = 0;
for (FileCacheValue value : inputCacheValue) {
for (HiveMetaStoreCache.HiveFileStatus file : value.getFiles()) {
file.setSplittable(value.isSplittable());
file.setPartitionValues(value.getPartitionValues());
file.setAcidInfo(value.getAcidInfo());
fileList.add(file);
totalSize += file.getLength();
}
}
long sampleSize = totalSize * tableSample.getSampleValue() / 100;
long selectedSize = 0;
Collections.shuffle(fileList);
int index = 0;
for (HiveMetaStoreCache.HiveFileStatus file : fileList) {
selectedSize += file.getLength();
index += 1;
if (selectedSize >= sampleSize) {
break;
}
}
return fileList.subList(0, index);
}

private List<FileCacheValue> getFileSplitByTransaction(HiveMetaStoreCache cache, List<HivePartition> partitions) {
for (HivePartition partition : partitions) {
if (partition.getPartitionValues() == null || partition.getPartitionValues().isEmpty()) {
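The selection logic in selectFiles above boils down to: shuffle the candidate files, then keep taking files until their cumulative size reaches the requested percentage of the total. Here is a minimal, self-contained sketch of that idea; the FileStat record and class names are illustrative stand-ins for HiveMetaStoreCache.HiveFileStatus, not the Doris API:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class FileSampleSketch {
    // Illustrative stand-in for HiveMetaStoreCache.HiveFileStatus.
    record FileStat(String path, long length) {}

    // Pick a random subset of files whose cumulative size reaches at least
    // `percent` percent of the total data size, mirroring selectFiles().
    static List<FileStat> selectFiles(List<FileStat> files, int percent) {
        long totalSize = 0;
        for (FileStat file : files) {
            totalSize += file.length();
        }
        long sampleSize = totalSize * percent / 100;
        List<FileStat> shuffled = new ArrayList<>(files);
        Collections.shuffle(shuffled); // randomize which files are sampled
        long selectedSize = 0;
        int index = 0;
        for (FileStat file : shuffled) {
            selectedSize += file.length();
            index++;
            if (selectedSize >= sampleSize) {
                break; // size quota reached; keep the first `index` files
            }
        }
        return shuffled.subList(0, index);
    }

    public static void main(String[] args) {
        List<FileStat> files = List.of(
                new FileStat("part-0", 128L << 20),
                new FileStat("part-1", 64L << 20),
                new FileStat("part-2", 256L << 20));
        System.out.println(selectFiles(files, 10)); // ~10% of ~448 MB, whole files only
    }
}
```

Because selection is at whole-file granularity, the sampled bytes can overshoot the requested percentage by up to one file, which is one reason the regression test below asserts only a loose range on the estimated row count.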
@@ -17,10 +17,10 @@

package org.apache.doris.statistics;

+import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.qe.AutoCloseConnectContext;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.StmtExecutor;
@@ -31,9 +31,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

-import java.time.Instant;
-import java.time.LocalDateTime;
-import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
@@ -61,14 +58,14 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
             + "${idxId} AS idx_id, "
             + "'${colId}' AS col_id, "
             + "NULL AS part_id, "
-            + "COUNT(1) AS row_count, "
+            + "${countExpr} AS row_count, "
             + "NDV(`${colName}`) AS ndv, "
-            + "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) AS null_count, "
+            + "${nullCountExpr} AS null_count, "
             + "MIN(`${colName}`) AS min, "
             + "MAX(`${colName}`) AS max, "
             + "${dataSizeFunction} AS data_size, "
             + "NOW() "
-            + "FROM `${catalogName}`.`${dbName}`.`${tblName}`";
+            + "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${sampleExpr}";

private static final String ANALYZE_PARTITION_TEMPLATE = " SELECT "
+ "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}', '-', ${partId}) AS id, "
@@ -86,8 +83,8 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
+ "${dataSizeFunction} AS data_size, "
+ "NOW() FROM `${catalogName}`.`${dbName}`.`${tblName}` where ";

-    private static final String ANALYZE_TABLE_COUNT_TEMPLATE = "SELECT COUNT(1) as rowCount "
-            + "FROM `${catalogName}`.`${dbName}`.`${tblName}`";
+    private static final String ANALYZE_TABLE_COUNT_TEMPLATE = "SELECT ${countExpr} as rowCount "
+            + "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${sampleExpr}";

// cache stats for each partition, it would be inserted into column_statistics in a batch.
private final List<List<ColStatsData>> buf = new ArrayList<>();
@@ -163,6 +160,7 @@ private void getTableColumnStats() throws Exception {
params.put("colName", col.getName());
params.put("colId", info.colName);
params.put("dataSizeFunction", getDataSizeFunction(col));
params.put("nullCountExpr", getNullCountExpression());
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String sql = stringSubstitutor.replace(sb.toString());
executeInsertSql(sql);
@@ -279,27 +277,39 @@ private Map<String, String> buildTableStatsParams(String partId) {
         commonParams.put("catalogName", catalog.getName());
         commonParams.put("dbName", db.getFullName());
         commonParams.put("tblName", tbl.getName());
+        commonParams.put("sampleExpr", getSampleExpression());
+        commonParams.put("countExpr", getCountExpression());
         if (col != null) {
             commonParams.put("type", col.getType().toString());
         }
         commonParams.put("lastAnalyzeTimeInMs", String.valueOf(System.currentTimeMillis()));
         return commonParams;
     }
 
-    private void setParameterData(Map<String, String> parameters, Map<String, String> params) {
-        String numRows = "";
-        String timestamp = "";
-        if (parameters.containsKey(NUM_ROWS)) {
-            numRows = parameters.get(NUM_ROWS);
-        }
-        if (parameters.containsKey(TIMESTAMP)) {
-            timestamp = parameters.get(TIMESTAMP);
-        }
-        params.put("numRows", numRows);
-        params.put("rowCount", numRows);
-        params.put("update_time", TimeUtils.DATETIME_FORMAT.format(
-                LocalDateTime.ofInstant(Instant.ofEpochMilli(Long.parseLong(timestamp) * 1000),
-                        ZoneId.systemDefault())));
-    }
+    protected String getCountExpression() {
+        if (info.samplePercent > 0) {
+            return String.format("ROUND(COUNT(1) * 100 / %d)", info.samplePercent);
+        } else {
+            return "COUNT(1)";
+        }
+    }
+
+    protected String getNullCountExpression() {
+        if (info.samplePercent > 0) {
+            return String.format("ROUND(SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) * 100 / %d)",
+                    info.samplePercent);
+        } else {
+            return "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END)";
+        }
+    }
+
+    protected String getDataSizeFunction(Column column) {
+        String originFunction = super.getDataSizeFunction(column);
+        if (info.samplePercent > 0 && !isPartitionOnly) {
+            return String.format("ROUND((%s) * 100 / %d)", originFunction, info.samplePercent);
+        } else {
+            return originFunction;
+        }
+    }

@Override
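The three expression builders above scale sample-level aggregates back to full-table estimates by multiplying by 100 / samplePercent. A tiny worked example for the row count; the numbers are hypothetical, picked to land in the range the regression test below asserts:

```java
public class SampleScaleUp {
    public static void main(String[] args) {
        // Mirrors getCountExpression(): ROUND(COUNT(1) * 100 / samplePercent).
        int samplePercent = 10;
        long sampledRows = 20_123_456L; // hypothetical COUNT(1) over the ~10% sample
        long estimatedRows = Math.round(sampledRows * 100.0 / samplePercent);
        System.out.println(estimatedRows); // 201234560, inside the test's [200M, 220M) window
    }
}
```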
@@ -0,0 +1,99 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_hive_sample_statistic", "p2,external,hive,external_remote,external_remote_hive") {
String enabled = context.config.otherConfigs.get("enableExternalHiveTest")
if (enabled != null && enabled.equalsIgnoreCase("true")) {
String extHiveHmsHost = context.config.otherConfigs.get("extHiveHmsHost")
String extHiveHmsPort = context.config.otherConfigs.get("extHiveHmsPort")
String catalog_name = "test_hive_sample_statistic"
sql """drop catalog if exists ${catalog_name};"""
sql """
create catalog if not exists ${catalog_name} properties (
'type'='hms',
'hadoop.username' = 'hadoop',
'hive.metastore.uris' = 'thrift://${extHiveHmsHost}:${extHiveHmsPort}'
);
"""
logger.info("catalog " + catalog_name + " created")

sql """use ${catalog_name}.tpch_1000_parquet"""
sql """analyze table part with sample percent 10 with sync;"""

def result = sql """show table stats part"""
assertTrue(result.size() == 1)
assertTrue(Long.parseLong(result[0][2]) >= 200000000)
assertTrue(Long.parseLong(result[0][2]) < 220000000)

def ctlId
result = sql """show proc '/catalogs'"""

for (int i = 0; i < result.size(); i++) {
if (result[i][1] == catalog_name) {
ctlId = result[i][0]
}
}

result = sql """select count from internal.__internal_schema.column_statistics where catalog_id='$ctlId' and col_id='p_partkey'"""
assertTrue(result.size() == 1)
assertTrue(result[0][0] >= 200000000)
assertTrue(result[0][0] < 220000000)

result = sql """select count from internal.__internal_schema.column_statistics where catalog_id='$ctlId' and col_id='p_name'"""
assertTrue(result.size() == 1)
assertTrue(result[0][0] >= 200000000)
assertTrue(result[0][0] < 220000000)

result = sql """select count from internal.__internal_schema.column_statistics where catalog_id='$ctlId' and col_id='p_mfgr'"""
assertTrue(result.size() == 1)
assertTrue(result[0][0] >= 200000000)
assertTrue(result[0][0] < 220000000)

result = sql """select count from internal.__internal_schema.column_statistics where catalog_id='$ctlId' and col_id='p_brand'"""
assertTrue(result.size() == 1)
assertTrue(result[0][0] >= 200000000)
assertTrue(result[0][0] < 220000000)

result = sql """select count from internal.__internal_schema.column_statistics where catalog_id='$ctlId' and col_id='p_type'"""
assertTrue(result.size() == 1)
assertTrue(result[0][0] >= 200000000)
assertTrue(result[0][0] < 220000000)

result = sql """select count from internal.__internal_schema.column_statistics where catalog_id='$ctlId' and col_id='p_size'"""
assertTrue(result.size() == 1)
assertTrue(result[0][0] >= 200000000)
assertTrue(result[0][0] < 220000000)

result = sql """select count from internal.__internal_schema.column_statistics where catalog_id='$ctlId' and col_id='p_container'"""
assertTrue(result.size() == 1)
assertTrue(result[0][0] >= 200000000)
assertTrue(result[0][0] < 220000000)

result = sql """select count from internal.__internal_schema.column_statistics where catalog_id='$ctlId' and col_id='p_retailprice'"""
assertTrue(result.size() == 1)
assertTrue(result[0][0] >= 200000000)
assertTrue(result[0][0] < 220000000)

result = sql """select count from internal.__internal_schema.column_statistics where catalog_id='$ctlId' and col_id='p_comment'"""
assertTrue(result.size() == 1)
assertTrue(result[0][0] >= 200000000)
assertTrue(result[0][0] < 220000000)

sql """drop catalog ${catalog_name}""";
}
}
