From 80bcb431436eda84389661e51be1f6102afbe4d5 Mon Sep 17 00:00:00 2001 From: Jibing-Li <64681310+Jibing-Li@users.noreply.github.com> Date: Tue, 19 Sep 2023 11:20:27 +0800 Subject: [PATCH] [Feature]Support external table sample stats collection (#24376) Support hive table sample stats collection. Gramma is like `analyze table with sample percent 10` --- .../doris/analysis/AnalyzeProperties.java | 4 + .../apache/doris/analysis/AnalyzeTblStmt.java | 8 +- .../org/apache/doris/analysis/TableRef.java | 11 ++- .../datasource/hive/HiveMetaStoreCache.java | 3 + .../doris/planner/SingleNodePlanner.java | 1 + .../planner/external/FileQueryScanNode.java | 7 ++ .../doris/planner/external/HiveScanNode.java | 42 ++++++++ .../doris/statistics/HMSAnalysisTask.java | 52 ++++++---- .../hive/test_hive_sample_statistic.groovy | 99 +++++++++++++++++++ 9 files changed, 199 insertions(+), 28 deletions(-) create mode 100644 regression-test/suites/external_table_p2/hive/test_hive_sample_statistic.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeProperties.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeProperties.java index 208e86e19985df..7cd3a8d82767c4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeProperties.java @@ -272,6 +272,10 @@ public boolean forceFull() { return properties.containsKey(PROPERTY_FORCE_FULL); } + public boolean isSampleRows() { + return properties.containsKey(PROPERTY_SAMPLE_ROWS); + } + public String toSQL() { StringBuilder sb = new StringBuilder(); sb.append("PROPERTIES("); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java index 874b83b280d886..5ca1ecd76cccb5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java @@ -166,11 +166,9 @@ public void check() throws AnalysisException { analyzeProperties.check(); // TODO support external table - if (analyzeProperties.isSample()) { - if (!(table instanceof OlapTable)) { - throw new AnalysisException("Sampling statistics " - + "collection of external tables is not supported"); - } + if (analyzeProperties.isSampleRows() && !(table instanceof OlapTable)) { + throw new AnalysisException("Sampling statistics " + + "collection of external tables is not supported with rows, use percent instead."); } if (analyzeProperties.isSync() && (analyzeProperties.isAutomatic() || analyzeProperties.getPeriodTimeInMs() != 0)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java index f332d269b3fcce..42370a0c53b9c3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java @@ -470,9 +470,16 @@ protected void analyzeSortHints() throws AnalysisException { } protected void analyzeSample() throws AnalysisException { - if ((sampleTabletIds != null || tableSample != null) && desc.getTable().getType() != TableIf.TableType.OLAP) { + if ((sampleTabletIds != null || tableSample != null) + && desc.getTable().getType() != TableIf.TableType.OLAP + && desc.getTable().getType() != TableIf.TableType.HMS_EXTERNAL_TABLE) { throw new AnalysisException("Sample table " + desc.getTable().getName() - + " type " + desc.getTable().getType() + " is not OLAP"); + + " type " + desc.getTable().getType() + " is not supported"); + } + if (tableSample != null && TableIf.TableType.HMS_EXTERNAL_TABLE.equals(desc.getTable().getType())) { + if (!tableSample.isPercent()) { + throw new AnalysisException("HMS table doesn't support sample rows, use percent instead."); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java index 0a85d9ff5bd00e..df6f48b97dcb3a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java @@ -1071,6 +1071,9 @@ public static class HiveFileStatus { long length; long blockSize; long modificationTime; + boolean splittable; + List partitionValues; + AcidInfo acidInfo; } @Data diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index 80d22e25b1ee1b..e0bdfdce4de734 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -2024,6 +2024,7 @@ private PlanNode createScanNode(Analyzer analyzer, TableRef tblRef, SelectStmt s break; case HIVE: scanNode = new HiveScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true); + ((HiveScanNode) scanNode).setTableSample(tblRef.getTableSample()); break; default: throw new UserException("Not supported table type: " + ((HMSExternalTable) table).getDlaType()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java index 00188dc50bbd42..8e6976b5ef2bcf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java @@ -20,6 +20,7 @@ import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.SlotDescriptor; import org.apache.doris.analysis.SlotId; +import org.apache.doris.analysis.TableSample; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; @@ -92,6 +93,8 @@ public abstract class FileQueryScanNode extends FileScanNode { protected Map destSlotDescByName; protected TFileScanRangeParams params; + protected TableSample tableSample; + /** * External file scan node for Query hms table * needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column priv @@ -200,6 +203,10 @@ private void updateRequiredSlots() throws UserException { setColumnPositionMapping(); } + public void setTableSample(TableSample tSample) { + this.tableSample = tSample; + } + @Override public void finalize(Analyzer analyzer) throws UserException { doFinalize(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java index 0b6e1d44466903..ba9b3e0abc387e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java @@ -63,6 +63,7 @@ import java.io.IOException; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -218,6 +219,11 @@ private void getFileSplitByPartitions(HiveMetaStoreCache cache, List hiveFileStatuses = selectFiles(fileCaches); + splitAllFiles(allFiles, hiveFileStatuses); + return; + } for (HiveMetaStoreCache.FileCacheValue fileCacheValue : fileCaches) { // This if branch is to support old splitter, will remove later. if (fileCacheValue.getSplits() != null) { @@ -235,6 +241,42 @@ private void getFileSplitByPartitions(HiveMetaStoreCache cache, List allFiles, + List hiveFileStatuses) throws IOException { + for (HiveMetaStoreCache.HiveFileStatus status : hiveFileStatuses) { + allFiles.addAll(splitFile(status.getPath(), status.getBlockSize(), + status.getBlockLocations(), status.getLength(), status.getModificationTime(), + status.isSplittable(), status.getPartitionValues(), + new HiveSplitCreator(status.getAcidInfo()))); + } + } + + private List selectFiles(List inputCacheValue) { + List fileList = Lists.newArrayList(); + long totalSize = 0; + for (FileCacheValue value : inputCacheValue) { + for (HiveMetaStoreCache.HiveFileStatus file : value.getFiles()) { + file.setSplittable(value.isSplittable()); + file.setPartitionValues(value.getPartitionValues()); + file.setAcidInfo(value.getAcidInfo()); + fileList.add(file); + totalSize += file.getLength(); + } + } + long sampleSize = totalSize * tableSample.getSampleValue() / 100; + long selectedSize = 0; + Collections.shuffle(fileList); + int index = 0; + for (HiveMetaStoreCache.HiveFileStatus file : fileList) { + selectedSize += file.getLength(); + index += 1; + if (selectedSize >= sampleSize) { + break; + } + } + return fileList.subList(0, index); + } + private List getFileSplitByTransaction(HiveMetaStoreCache cache, List partitions) { for (HivePartition partition : partitions) { if (partition.getPartitionValues() == null || partition.getPartitionValues().isEmpty()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java index 973e5e76aa010a..2cf9accdee7c31 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java @@ -17,10 +17,10 @@ package org.apache.doris.statistics; +import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.common.FeConstants; -import org.apache.doris.common.util.TimeUtils; import org.apache.doris.qe.AutoCloseConnectContext; import org.apache.doris.qe.QueryState; import org.apache.doris.qe.StmtExecutor; @@ -31,9 +31,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.time.Instant; -import java.time.LocalDateTime; -import java.time.ZoneId; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; @@ -61,14 +58,14 @@ public class HMSAnalysisTask extends BaseAnalysisTask { + "${idxId} AS idx_id, " + "'${colId}' AS col_id, " + "NULL AS part_id, " - + "COUNT(1) AS row_count, " + + "${countExpr} AS row_count, " + "NDV(`${colName}`) AS ndv, " - + "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) AS null_count, " + + "${nullCountExpr} AS null_count, " + "MIN(`${colName}`) AS min, " + "MAX(`${colName}`) AS max, " + "${dataSizeFunction} AS data_size, " + "NOW() " - + "FROM `${catalogName}`.`${dbName}`.`${tblName}`"; + + "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${sampleExpr}"; private static final String ANALYZE_PARTITION_TEMPLATE = " SELECT " + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}', '-', ${partId}) AS id, " @@ -86,8 +83,8 @@ public class HMSAnalysisTask extends BaseAnalysisTask { + "${dataSizeFunction} AS data_size, " + "NOW() FROM `${catalogName}`.`${dbName}`.`${tblName}` where "; - private static final String ANALYZE_TABLE_COUNT_TEMPLATE = "SELECT COUNT(1) as rowCount " - + "FROM `${catalogName}`.`${dbName}`.`${tblName}`"; + private static final String ANALYZE_TABLE_COUNT_TEMPLATE = "SELECT ${countExpr} as rowCount " + + "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${sampleExpr}"; // cache stats for each partition, it would be inserted into column_statistics in a batch. private final List> buf = new ArrayList<>(); @@ -163,6 +160,7 @@ private void getTableColumnStats() throws Exception { params.put("colName", col.getName()); params.put("colId", info.colName); params.put("dataSizeFunction", getDataSizeFunction(col)); + params.put("nullCountExpr", getNullCountExpression()); StringSubstitutor stringSubstitutor = new StringSubstitutor(params); String sql = stringSubstitutor.replace(sb.toString()); executeInsertSql(sql); @@ -279,6 +277,8 @@ private Map buildTableStatsParams(String partId) { commonParams.put("catalogName", catalog.getName()); commonParams.put("dbName", db.getFullName()); commonParams.put("tblName", tbl.getName()); + commonParams.put("sampleExpr", getSampleExpression()); + commonParams.put("countExpr", getCountExpression()); if (col != null) { commonParams.put("type", col.getType().toString()); } @@ -286,20 +286,30 @@ private Map buildTableStatsParams(String partId) { return commonParams; } - private void setParameterData(Map parameters, Map params) { - String numRows = ""; - String timestamp = ""; - if (parameters.containsKey(NUM_ROWS)) { - numRows = parameters.get(NUM_ROWS); + protected String getCountExpression() { + if (info.samplePercent > 0) { + return String.format("ROUND(COUNT(1) * 100 / %d)", info.samplePercent); + } else { + return "COUNT(1)"; + } + } + + protected String getNullCountExpression() { + if (info.samplePercent > 0) { + return String.format("ROUND(SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) * 100 / %d)", + info.samplePercent); + } else { + return "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END)"; } - if (parameters.containsKey(TIMESTAMP)) { - timestamp = parameters.get(TIMESTAMP); + } + + protected String getDataSizeFunction(Column column) { + String originFunction = super.getDataSizeFunction(column); + if (info.samplePercent > 0 && !isPartitionOnly) { + return String.format("ROUND((%s) * 100 / %d)", originFunction, info.samplePercent); + } else { + return originFunction; } - params.put("numRows", numRows); - params.put("rowCount", numRows); - params.put("update_time", TimeUtils.DATETIME_FORMAT.format( - LocalDateTime.ofInstant(Instant.ofEpochMilli(Long.parseLong(timestamp) * 1000), - ZoneId.systemDefault()))); } @Override diff --git a/regression-test/suites/external_table_p2/hive/test_hive_sample_statistic.groovy b/regression-test/suites/external_table_p2/hive/test_hive_sample_statistic.groovy new file mode 100644 index 00000000000000..c2a21e3994b74c --- /dev/null +++ b/regression-test/suites/external_table_p2/hive/test_hive_sample_statistic.groovy @@ -0,0 +1,99 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_hive_sample_statistic", "p2,external,hive,external_remote,external_remote_hive") { + String enabled = context.config.otherConfigs.get("enableExternalHiveTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String extHiveHmsHost = context.config.otherConfigs.get("extHiveHmsHost") + String extHiveHmsPort = context.config.otherConfigs.get("extHiveHmsPort") + String catalog_name = "test_hive_sample_statistic" + sql """drop catalog if exists ${catalog_name};""" + sql """ + create catalog if not exists ${catalog_name} properties ( + 'type'='hms', + 'hadoop.username' = 'hadoop', + 'hive.metastore.uris' = 'thrift://${extHiveHmsHost}:${extHiveHmsPort}' + ); + """ + logger.info("catalog " + catalog_name + " created") + + sql """use ${catalog_name}.tpch_1000_parquet""" + sql """analyze table part with sample percent 10 with sync;""" + + def result = sql """show table stats part""" + assertTrue(result.size() == 1) + assertTrue(Long.parseLong(result[0][2]) >= 200000000) + assertTrue(Long.parseLong(result[0][2]) < 220000000) + + def ctlId + result = sql """show proc '/catalogs'""" + + for (int i = 0; i < result.size(); i++) { + if (result[i][1] == catalog_name) { + ctlId = result[i][0] + } + } + + result = sql """select count from internal.__internal_schema.column_statistics where catalog_id='$ctlId' and col_id='p_partkey'""" + assertTrue(result.size() == 1) + assertTrue(result[0][0] >= 200000000) + assertTrue(result[0][0] < 220000000) + + result = sql """select count from internal.__internal_schema.column_statistics where catalog_id='$ctlId' and col_id='p_name'""" + assertTrue(result.size() == 1) + assertTrue(result[0][0] >= 200000000) + assertTrue(result[0][0] < 220000000) + + result = sql """select count from internal.__internal_schema.column_statistics where catalog_id='$ctlId' and col_id='p_mfgr'""" + assertTrue(result.size() == 1) + assertTrue(result[0][0] >= 200000000) + assertTrue(result[0][0] < 220000000) + + result = sql """select count from internal.__internal_schema.column_statistics where catalog_id='$ctlId' and col_id='p_brand'""" + assertTrue(result.size() == 1) + assertTrue(result[0][0] >= 200000000) + assertTrue(result[0][0] < 220000000) + + result = sql """select count from internal.__internal_schema.column_statistics where catalog_id='$ctlId' and col_id='p_type'""" + assertTrue(result.size() == 1) + assertTrue(result[0][0] >= 200000000) + assertTrue(result[0][0] < 220000000) + + result = sql """select count from internal.__internal_schema.column_statistics where catalog_id='$ctlId' and col_id='p_size'""" + assertTrue(result.size() == 1) + assertTrue(result[0][0] >= 200000000) + assertTrue(result[0][0] < 220000000) + + result = sql """select count from internal.__internal_schema.column_statistics where catalog_id='$ctlId' and col_id='p_container'""" + assertTrue(result.size() == 1) + assertTrue(result[0][0] >= 200000000) + assertTrue(result[0][0] < 220000000) + + result = sql """select count from internal.__internal_schema.column_statistics where catalog_id='$ctlId' and col_id='p_retailprice'""" + assertTrue(result.size() == 1) + assertTrue(result[0][0] >= 200000000) + assertTrue(result[0][0] < 220000000) + + result = sql """select count from internal.__internal_schema.column_statistics where catalog_id='$ctlId' and col_id='p_comment'""" + assertTrue(result.size() == 1) + assertTrue(result[0][0] >= 200000000) + assertTrue(result[0][0] < 220000000) + + sql """drop catalog ${catalog_name}"""; + } +} +