[Fix](statistics) Fix bug and improve auto analyze. (apache#27626)
1. Implement needReAnalyzeTable for ExternalTable. For now, an external table is re-analyzed only when some of its supported columns have never been analyzed, or after the interval controlled by external_table_auto_analyze_interval_in_millis (24 hours by default) has elapsed.
2. In HiveMetastoreCache.loadPartitions, handle the empty-iterator case to avoid an IndexOutOfBoundsException.
3. Wrap the handleShowAnalyze loop body in a try-catch, so that when one table fails (for example, its catalog was dropped and the table can no longer be found), the other tables can still be shown.
4. For now, only OlapTable and Hive HMSExternalTable support sample analyze; throw an exception for other table types.
5. In StatisticsCollector, call constructJob after createTableLevelTaskForExternalTable to avoid an NPE.
Jibing-Li authored and seawinde committed Nov 28, 2023
1 parent 7b0ebb0 commit 7733dea
Showing 9 changed files with 119 additions and 45 deletions.
@@ -392,9 +392,18 @@ public void gsonPostProcess() throws IOException {

@Override
public boolean needReAnalyzeTable(TableStatsMeta tblStats) {
// TODO: Find a way to decide if this external table need to be reanalyzed.
// For now, simply return true for all external tables.
return true;
if (tblStats == null) {
return true;
}
if (!tblStats.analyzeColumns().containsAll(getBaseSchema()
.stream()
.filter(c -> !StatisticsUtil.isUnsupportedType(c.getType()))
.map(Column::getName)
.collect(Collectors.toSet()))) {
return true;
}
return System.currentTimeMillis()
- tblStats.updatedTime > StatisticsUtil.getExternalTableAutoAnalyzeIntervalInMillis();
}

@Override
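For illustration, a minimal standalone Java sketch of the decision the new needReAnalyzeTable encodes (the class and parameter names are hypothetical stand-ins, not the Doris TableStatsMeta/Column types): analyze when no stats exist or when a supported column has never been analyzed, otherwise only once the configured interval has elapsed.

import java.util.Set;
import java.util.concurrent.TimeUnit;

class ReanalyzeDecisionSketch {
    static boolean needReAnalyze(Set<String> analyzedColumns, Set<String> supportedColumns,
            long statsUpdatedTimeMs, long intervalMs) {
        if (analyzedColumns == null) {
            return true; // no stats recorded yet
        }
        if (!analyzedColumns.containsAll(supportedColumns)) {
            return true; // some supported column has never been analyzed
        }
        // all columns covered: re-analyze only after the interval has elapsed
        return System.currentTimeMillis() - statsUpdatedTimeMs > intervalMs;
    }

    public static void main(String[] args) {
        long updated25HoursAgo = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(25);
        // true: column "name" has no stats yet
        System.out.println(needReAnalyze(Set.of("id"), Set.of("id", "name"),
                updated25HoursAgo, TimeUnit.HOURS.toMillis(24)));
        // true: all columns covered, but more than 24 hours have passed
        System.out.println(needReAnalyze(Set.of("id", "name"), Set.of("id", "name"),
                updated25HoursAgo, TimeUnit.HOURS.toMillis(24)));
    }
}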
@@ -320,6 +320,10 @@ private HivePartition loadPartition(PartitionCacheKey key) {
}

private Map<PartitionCacheKey, HivePartition> loadPartitions(Iterable<? extends PartitionCacheKey> keys) {
Map<PartitionCacheKey, HivePartition> ret = new HashMap<>();
if (keys == null || !keys.iterator().hasNext()) {
return ret;
}
PartitionCacheKey oneKey = Iterables.get(keys, 0);
String dbName = oneKey.getDbName();
String tblName = oneKey.getTblName();
@@ -341,7 +345,6 @@ private Map<PartitionCacheKey, HivePartition> loadPartitions(Iterable<? extends
}).collect(Collectors.toList());
List<Partition> partitions = catalog.getClient().getPartitions(dbName, tblName, partitionNames);
// Compose the return result map.
Map<PartitionCacheKey, HivePartition> ret = new HashMap<>();
for (Partition partition : partitions) {
StorageDescriptor sd = partition.getSd();
ret.put(new PartitionCacheKey(dbName, tblName, partition.getValues()),
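A small standalone sketch of why the new guard matters (hypothetical class name; assumes Guava on the classpath, as the original code already does): Iterables.get(keys, 0) throws IndexOutOfBoundsException on an empty iterable, so the method now returns an empty map before touching the first key.

import com.google.common.collect.Iterables;

import java.util.Collections;
import java.util.List;

class EmptyKeysGuardSketch {
    public static void main(String[] args) {
        List<String> keys = Collections.emptyList();
        if (keys == null || !keys.iterator().hasNext()) {
            // mirrors the fix: bail out with an empty result instead of reading the first key
            System.out.println("no partition keys, return an empty map");
            return;
        }
        System.out.println(Iterables.get(keys, 0)); // would throw on an empty iterable
    }
}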
@@ -448,6 +448,9 @@ public class SessionVariable implements Serializable, Writable {
public static final String HUGE_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS
= "huge_table_auto_analyze_interval_in_millis";

public static final String EXTERNAL_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS
= "external_table_auto_analyze_interval_in_millis";

public static final String TABLE_STATS_HEALTH_THRESHOLD
= "table_stats_health_threshold";

@@ -1366,6 +1369,12 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) {
+ "tables larger than huge_table_lower_bound_size_in_bytes are analyzed only once."})
public long hugeTableAutoAnalyzeIntervalInMillis = TimeUnit.HOURS.toMillis(12);

@VariableMgr.VarAttr(name = EXTERNAL_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS, flag = VariableMgr.GLOBAL,
description = {"控制对外表的自动ANALYZE的最小时间间隔,在该时间间隔内的外表仅ANALYZE一次",
"This controls the minimum time interval for automatic ANALYZE on external tables."
+ "Within this interval, external tables are analyzed only once."})
public long externalTableAutoAnalyzeIntervalInMillis = TimeUnit.HOURS.toMillis(24);

@VariableMgr.VarAttr(name = TABLE_STATS_HEALTH_THRESHOLD, flag = VariableMgr.GLOBAL,
description = {"取值在0-100之间,当自上次统计信息收集操作之后"
+ "数据更新量达到 (100 - table_stats_health_threshold)% ,认为该表的统计信息已过时",
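Because the new variable carries the VariableMgr.GLOBAL flag, it can be adjusted at runtime from any MySQL-protocol client. A hedged JDBC sketch (host, port, and credentials are placeholders; assumes a MySQL JDBC driver on the classpath):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

class SetExternalAnalyzeIntervalSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder FE endpoint and credentials.
        try (Connection conn = DriverManager.getConnection("jdbc:mysql://127.0.0.1:9030", "root", "");
                Statement stmt = conn.createStatement()) {
            // 12 hours in milliseconds; the default added by this commit is 24 hours.
            stmt.execute("SET GLOBAL external_table_auto_analyze_interval_in_millis = 43200000");
        }
    }
}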
79 changes: 43 additions & 36 deletions fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -2622,44 +2622,51 @@ private void handleShowAnalyze() {
List<List<String>> resultRows = Lists.newArrayList();
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
for (AnalysisInfo analysisInfo : results) {
List<String> row = new ArrayList<>();
row.add(String.valueOf(analysisInfo.jobId));
CatalogIf<? extends DatabaseIf<? extends TableIf>> c = StatisticsUtil.findCatalog(analysisInfo.catalogId);
row.add(c.getName());
Optional<? extends DatabaseIf<? extends TableIf>> databaseIf = c.getDb(analysisInfo.dbId);
row.add(databaseIf.isPresent() ? databaseIf.get().getFullName() : "DB may get deleted");
if (databaseIf.isPresent()) {
Optional<? extends TableIf> table = databaseIf.get().getTable(analysisInfo.tblId);
row.add(table.isPresent() ? table.get().getName() : "Table may get deleted");
} else {
row.add("DB may get deleted");
}
row.add(analysisInfo.colName);
row.add(analysisInfo.jobType.toString());
row.add(analysisInfo.analysisType.toString());
row.add(analysisInfo.message);
row.add(TimeUtils.DATETIME_FORMAT.format(
LocalDateTime.ofInstant(Instant.ofEpochMilli(analysisInfo.lastExecTimeInMs),
ZoneId.systemDefault())));
row.add(analysisInfo.state.toString());
try {
row.add(showStmt.isAuto()
? analysisInfo.progress
: Env.getCurrentEnv().getAnalysisManager().getJobProgress(analysisInfo.jobId));
List<String> row = new ArrayList<>();
row.add(String.valueOf(analysisInfo.jobId));
CatalogIf<? extends DatabaseIf<? extends TableIf>> c
= StatisticsUtil.findCatalog(analysisInfo.catalogId);
row.add(c.getName());
Optional<? extends DatabaseIf<? extends TableIf>> databaseIf = c.getDb(analysisInfo.dbId);
row.add(databaseIf.isPresent() ? databaseIf.get().getFullName() : "DB may get deleted");
if (databaseIf.isPresent()) {
Optional<? extends TableIf> table = databaseIf.get().getTable(analysisInfo.tblId);
row.add(table.isPresent() ? table.get().getName() : "Table may get deleted");
} else {
row.add("DB may get deleted");
}
row.add(analysisInfo.colName);
row.add(analysisInfo.jobType.toString());
row.add(analysisInfo.analysisType.toString());
row.add(analysisInfo.message);
row.add(TimeUtils.DATETIME_FORMAT.format(
LocalDateTime.ofInstant(Instant.ofEpochMilli(analysisInfo.lastExecTimeInMs),
ZoneId.systemDefault())));
row.add(analysisInfo.state.toString());
try {
row.add(showStmt.isAuto()
? analysisInfo.progress
: Env.getCurrentEnv().getAnalysisManager().getJobProgress(analysisInfo.jobId));
} catch (Exception e) {
row.add("N/A");
LOG.warn("Failed to get progress for job: {}", analysisInfo, e);
}
row.add(analysisInfo.scheduleType.toString());
LocalDateTime startTime =
LocalDateTime.ofInstant(Instant.ofEpochMilli(analysisInfo.startTime),
java.time.ZoneId.systemDefault());
LocalDateTime endTime =
LocalDateTime.ofInstant(Instant.ofEpochMilli(analysisInfo.endTime),
java.time.ZoneId.systemDefault());
row.add(startTime.format(formatter));
row.add(endTime.format(formatter));
resultRows.add(row);
} catch (Exception e) {
row.add("N/A");
LOG.warn("Failed to get progress for job: {}", analysisInfo, e);
}
row.add(analysisInfo.scheduleType.toString());
LocalDateTime startTime =
LocalDateTime.ofInstant(Instant.ofEpochMilli(analysisInfo.startTime),
java.time.ZoneId.systemDefault());
LocalDateTime endTime =
LocalDateTime.ofInstant(Instant.ofEpochMilli(analysisInfo.endTime),
java.time.ZoneId.systemDefault());
row.add(startTime.format(formatter));
row.add(endTime.format(formatter));
resultRows.add(row);
LOG.warn("Failed to get analyze info for table {}.{}.{}, reason: {}",
analysisInfo.catalogId, analysisInfo.dbId, analysisInfo.tblId, e.getMessage());
continue;
}
}
resultSet = new ShowResultSet(showStmt.getMetaData(), resultRows);
}
@@ -29,6 +29,7 @@
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.View;
@@ -336,6 +337,12 @@ protected AnalysisInfo buildAndAssignJob(AnalyzeTblStmt stmt) throws DdlExceptio
// No statistics need to be collected or updated
return null;
}
// Only OlapTable and Hive HMSExternalTable support sample analyze.
if ((stmt.getSamplePercent() > 0 || stmt.getSampleRows() > 0) && !canSample(stmt.getTable())) {
String message = String.format("Table %s doesn't support sample analyze.", stmt.getTable().getName());
LOG.info(message);
throw new DdlException(message);
}

boolean isSync = stmt.isSync();
Map<Long, BaseAnalysisTask> analysisTaskInfos = new HashMap<>();
@@ -1085,4 +1092,20 @@ public void removeJob(long id) {
public boolean hasUnFinished() {
return !analysisJobIdToTaskMap.isEmpty();
}

/**
* Only OlapTable and Hive HMSExternalTable can sample for now.
* @param table
* @return Return true if the given table can do sample analyze. False otherwise.
*/
public boolean canSample(TableIf table) {
if (table instanceof OlapTable) {
return true;
}
if (table instanceof HMSExternalTable
&& ((HMSExternalTable) table).getDlaType().equals(HMSExternalTable.DLAType.HIVE)) {
return true;
}
return false;
}
}
@@ -90,6 +90,8 @@ public class StatisticConstants {

public static final long HUGE_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS = TimeUnit.HOURS.toMillis(12);

public static final long EXTERNAL_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS = TimeUnit.HOURS.toMillis(24);

public static final int TABLE_STATS_HEALTH_THRESHOLD = 60;

public static final int ANALYZE_TIMEOUT_IN_SEC = 43200;
@@ -22,7 +22,7 @@
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.util.TimeUtils;
@@ -107,17 +107,28 @@ public void analyzeDb(DatabaseIf<TableIf> databaseIf) throws DdlException {
protected List<AnalysisInfo> constructAnalysisInfo(DatabaseIf<? extends TableIf> db) {
List<AnalysisInfo> analysisInfos = new ArrayList<>();
for (TableIf table : db.getTables()) {
if (skip(table)) {
try {
if (skip(table)) {
continue;
}
createAnalyzeJobForTbl(db, analysisInfos, table);
} catch (Throwable t) {
LOG.warn("Failed to analyze table {}.{}.{}",
db.getCatalog().getName(), db.getFullName(), table.getName(), t);
continue;
}
createAnalyzeJobForTbl(db, analysisInfos, table);
}
return analysisInfos;
}

// return true if skip auto analyze this time.
protected boolean skip(TableIf table) {
if (!(table instanceof OlapTable || table instanceof ExternalTable)) {
if (!(table instanceof OlapTable || table instanceof HMSExternalTable)) {
return true;
}
// For now, only support Hive HMS table auto collection.
if (table instanceof HMSExternalTable
&& !((HMSExternalTable) table).getDlaType().equals(HMSExternalTable.DLAType.HIVE)) {
return true;
}
if (table.getDataSize(true) < StatisticsUtil.getHugeTableLowerBoundSizeInBytes() * 5) {
@@ -78,10 +78,10 @@ protected void createSystemAnalysisJob(AnalysisInfo jobInfo)
Map<Long, BaseAnalysisTask> analysisTasks = new HashMap<>();
AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager();
analysisManager.createTaskForEachColumns(jobInfo, analysisTasks, false);
Env.getCurrentEnv().getAnalysisManager().constructJob(jobInfo, analysisTasks.values());
if (StatisticsUtil.isExternalTable(jobInfo.catalogId, jobInfo.dbId, jobInfo.tblId)) {
analysisManager.createTableLevelTaskForExternalTable(jobInfo, analysisTasks, false);
}
Env.getCurrentEnv().getAnalysisManager().constructJob(jobInfo, analysisTasks.values());
Env.getCurrentEnv().getAnalysisManager().registerSysJob(jobInfo, analysisTasks);
analysisTasks.values().forEach(analysisTaskExecutor::submitTask);
}
@@ -906,6 +906,16 @@ public static long getHugeTableAutoAnalyzeIntervalInMillis() {
return StatisticConstants.HUGE_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS;
}

public static long getExternalTableAutoAnalyzeIntervalInMillis() {
try {
return findConfigFromGlobalSessionVar(SessionVariable.EXTERNAL_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS)
.externalTableAutoAnalyzeIntervalInMillis;
} catch (Exception e) {
LOG.warn("Failed to get value of externalTableAutoAnalyzeIntervalInMillis, return default", e);
}
return StatisticConstants.EXTERNAL_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS;
}

public static long getTableStatsHealthThreshold() {
try {
return findConfigFromGlobalSessionVar(SessionVariable.TABLE_STATS_HEALTH_THRESHOLD)
