[opt](hms table)Some optimizations for hms external table #44909

Merged · 4 commits · Dec 26, 2024
1 change: 1 addition & 0 deletions fe/check/checkstyle/suppressions.xml
@@ -64,4 +64,5 @@ under the License.
<suppress files="HiveVersionUtil\.java" checks="[a-zA-Z0-9]*"/>
<suppress files="[\\/]com[\\/]amazonaws[\\/]glue[\\/]catalog[\\/]" checks="[a-zA-Z0-9]*"/>
<suppress files="[\\/]com[\\/]aliyun[\\/]datalake[\\/]metastore[\\/]hive2[\\/]" checks="[a-zA-Z0-9]*"/>
<suppress files="FSDataInputStreamWrapper\.java" checks="[a-zA-Z0-9]*"/>
</suppressions>
ExternalMetaCacheMgr.java
@@ -25,7 +25,9 @@
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.datasource.hudi.source.HudiPartitionMgr;
import org.apache.doris.datasource.hudi.source.HudiCachedFsViewProcessor;
import org.apache.doris.datasource.hudi.source.HudiCachedMetaClientProcessor;
import org.apache.doris.datasource.hudi.source.HudiMetadataCacheMgr;
import org.apache.doris.datasource.hudi.source.HudiPartitionProcessor;
import org.apache.doris.datasource.iceberg.IcebergMetadataCache;
import org.apache.doris.datasource.iceberg.IcebergMetadataCacheMgr;
@@ -88,7 +90,7 @@ public class ExternalMetaCacheMgr {
// catalog id -> table schema cache
private Map<Long, ExternalSchemaCache> schemaCacheMap = Maps.newHashMap();
// hudi partition manager
private final HudiPartitionMgr hudiPartitionMgr;
private final HudiMetadataCacheMgr hudiMetadataCacheMgr;
// all catalogs could share the same fsCache.
private FileSystemCache fsCache;
// all external table row count cache.
@@ -123,7 +125,7 @@ public ExternalMetaCacheMgr() {
fsCache = new FileSystemCache();
rowCountCache = new ExternalRowCountCache(rowCountRefreshExecutor);

hudiPartitionMgr = new HudiPartitionMgr(commonRefreshExecutor);
hudiMetadataCacheMgr = new HudiMetadataCacheMgr(commonRefreshExecutor);
icebergMetadataCacheMgr = new IcebergMetadataCacheMgr(commonRefreshExecutor);
maxComputeMetadataCacheMgr = new MaxComputeMetadataCacheMgr();
paimonMetadataCacheMgr = new PaimonMetadataCacheMgr(commonRefreshExecutor);
@@ -165,7 +167,19 @@ public ExternalSchemaCache getSchemaCache(ExternalCatalog catalog) {
}

public HudiPartitionProcessor getHudiPartitionProcess(ExternalCatalog catalog) {
return hudiPartitionMgr.getPartitionProcessor(catalog);
return hudiMetadataCacheMgr.getPartitionProcessor(catalog);
}

public HudiCachedFsViewProcessor getFsViewProcessor(ExternalCatalog catalog) {
return hudiMetadataCacheMgr.getFsViewProcessor(catalog);
}

public HudiCachedMetaClientProcessor getMetaClientProcessor(ExternalCatalog catalog) {
return hudiMetadataCacheMgr.getHudiMetaClientProcessor(catalog);
}

public HudiMetadataCacheMgr getHudiMetadataCacheMgr() {
return hudiMetadataCacheMgr;
}

public IcebergMetadataCache getIcebergMetadataCache() {
@@ -195,7 +209,7 @@ public void removeCache(long catalogId) {
if (schemaCacheMap.remove(catalogId) != null) {
LOG.info("remove schema cache for catalog {}", catalogId);
}
hudiPartitionMgr.removePartitionProcessor(catalogId);
hudiMetadataCacheMgr.removeCache(catalogId);
icebergMetadataCacheMgr.removeCache(catalogId);
maxComputeMetadataCacheMgr.removeCache(catalogId);
paimonMetadataCacheMgr.removeCache(catalogId);
@@ -211,7 +225,7 @@ public void invalidateTableCache(long catalogId, String dbName, String tblName)
if (metaCache != null) {
metaCache.invalidateTableCache(dbName, tblName);
}
hudiPartitionMgr.cleanTablePartitions(catalogId, dbName, tblName);
hudiMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName);
icebergMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName);
maxComputeMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName);
paimonMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName);
@@ -230,7 +244,7 @@ public void invalidateDbCache(long catalogId, String dbName) {
if (metaCache != null) {
metaCache.invalidateDbCache(dbName);
}
hudiPartitionMgr.cleanDatabasePartitions(catalogId, dbName);
hudiMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
icebergMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
maxComputeMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
paimonMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
@@ -248,7 +262,7 @@ public void invalidateCatalogCache(long catalogId) {
if (metaCache != null) {
metaCache.invalidateAll();
}
hudiPartitionMgr.cleanPartitionProcess(catalogId);
hudiMetadataCacheMgr.invalidateCatalogCache(catalogId);
icebergMetadataCacheMgr.invalidateCatalogCache(catalogId);
maxComputeMetadataCacheMgr.invalidateCatalogCache(catalogId);
paimonMetadataCacheMgr.invalidateCatalogCache(catalogId);
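
For context, the hunks above replace the partition-only HudiPartitionMgr with a HudiMetadataCacheMgr that also owns cached Hudi file-system views and HoodieTableMetaClient instances, exposed per catalog through the new accessors. A minimal usage sketch, assuming only the accessors shown in this diff (the wrapper class and method below are illustrative):

import org.apache.doris.catalog.Env;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalMetaCacheMgr;
import org.apache.doris.datasource.hudi.source.HudiCachedFsViewProcessor;
import org.apache.doris.datasource.hudi.source.HudiCachedMetaClientProcessor;
import org.apache.doris.datasource.hudi.source.HudiPartitionProcessor;

public class HudiCacheAccessSketch {
    // Illustrative: all three Hudi caches are reached through the same manager, and
    // removeCache/invalidate* calls on ExternalMetaCacheMgr clean them up together.
    public static void touchHudiCaches(ExternalCatalog catalog) {
        ExternalMetaCacheMgr mgr = Env.getCurrentEnv().getExtMetaCacheMgr();
        HudiPartitionProcessor partitions = mgr.getHudiPartitionProcess(catalog);
        HudiCachedFsViewProcessor fsViews = mgr.getFsViewProcessor(catalog);
        HudiCachedMetaClientProcessor metaClients = mgr.getMetaClientProcessor(catalog);
    }
}
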
HMSExternalTable.java
@@ -34,6 +34,7 @@
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.datasource.TablePartitionValues;
import org.apache.doris.datasource.hudi.HudiSchemaCacheValue;
import org.apache.doris.datasource.hudi.HudiUtils;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.datasource.mvcc.MvccSnapshot;
@@ -74,6 +75,7 @@
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -532,34 +534,38 @@ public long getLastDdlTime() {
@Override
public Optional<SchemaCacheValue> initSchema() {
makeSureInitialized();
List<Column> columns;
if (dlaType.equals(DLAType.ICEBERG)) {
columns = getIcebergSchema();
return getIcebergSchema();
} else if (dlaType.equals(DLAType.HUDI)) {
columns = getHudiSchema();
return getHudiSchema();
} else {
columns = getHiveSchema();
return getHiveSchema();
}
List<Column> partitionColumns = initPartitionColumns(columns);
return Optional.of(new HMSSchemaCacheValue(columns, partitionColumns));
}

private List<Column> getIcebergSchema() {
return IcebergUtils.getSchema(catalog, dbName, name, IcebergUtils.UNKNOWN_SNAPSHOT_ID);
private Optional<SchemaCacheValue> getIcebergSchema() {
List<Column> columns = IcebergUtils.getSchema(catalog, dbName, name);
List<Column> partitionColumns = initPartitionColumns(columns);
return Optional.of(new HMSSchemaCacheValue(columns, partitionColumns));
}

private List<Column> getHudiSchema() {
private Optional<SchemaCacheValue> getHudiSchema() {
org.apache.avro.Schema hudiSchema = HiveMetaStoreClientHelper.getHudiTableSchema(this);
List<Column> tmpSchema = Lists.newArrayListWithCapacity(hudiSchema.getFields().size());
List<String> colTypes = Lists.newArrayList();
for (org.apache.avro.Schema.Field hudiField : hudiSchema.getFields()) {
String columnName = hudiField.name().toLowerCase(Locale.ROOT);
tmpSchema.add(new Column(columnName, HudiUtils.fromAvroHudiTypeToDorisType(hudiField.schema()),
true, null, true, null, "", true, null, -1, null));
colTypes.add(HudiUtils.convertAvroToHiveType(hudiField.schema()));
}
return tmpSchema;
List<Column> partitionColumns = initPartitionColumns(tmpSchema);
HudiSchemaCacheValue hudiSchemaCacheValue = new HudiSchemaCacheValue(tmpSchema, partitionColumns);
hudiSchemaCacheValue.setColTypes(colTypes);
return Optional.of(hudiSchemaCacheValue);
}

private List<Column> getHiveSchema() {
private Optional<SchemaCacheValue> getHiveSchema() {
HMSCachedClient client = ((HMSExternalCatalog) catalog).getClient();
List<FieldSchema> schema = client.getSchema(dbName, name);
Map<String, String> colDefaultValues = client.getDefaultColumnValues(dbName, name);
@@ -571,7 +577,8 @@ private List<Column> getHiveSchema() {
HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType()), true, null,
true, defaultValue, field.getComment(), true, -1));
}
return columns;
List<Column> partitionColumns = initPartitionColumns(columns);
return Optional.of(new HMSSchemaCacheValue(columns, partitionColumns));
}

@Override
@@ -649,9 +656,7 @@ public Optional<ColumnStatistic> getColumnStatistic(String colName) {
break;
}
default:
if (LOG.isDebugEnabled()) {
LOG.debug("get column stats for dlaType {} is not supported.", dlaType);
}
LOG.warn("get column stats for dlaType {} is not supported.", dlaType);
}
return Optional.empty();
}
@@ -1016,4 +1021,15 @@ public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
Env.getCurrentEnv().getRefreshManager()
.refreshTable(getCatalog().getName(), getDbName(), getName(), true);
}

public HoodieTableMetaClient getHudiClient() {
return Env.getCurrentEnv()
.getExtMetaCacheMgr()
.getMetaClientProcessor(getCatalog())
.getHoodieTableMetaClient(
getDbName(),
getName(),
getRemoteTable().getSd().getLocation(),
getCatalog().getConfiguration());
}
}
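
The new getHudiClient() accessor obtains the HoodieTableMetaClient from the per-catalog cache rather than constructing one on every call. A hedged sketch of the intended call pattern, using only methods visible in this diff; the wrapper class is illustrative and import paths are assumed from the surrounding packages:

import org.apache.avro.Schema;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
import org.apache.hudi.common.table.HoodieTableMetaClient;

public class HudiSchemaLookupSketch {
    public static Schema loadTableSchema(HMSExternalTable table) {
        // Served from the cached meta-client processor, so repeated lookups reuse one client.
        HoodieTableMetaClient metaClient = table.getHudiClient();
        // getHudiTableSchema reloads the active timeline on that cached client before
        // resolving the Avro schema (see the HiveMetaStoreClientHelper change below).
        return HiveMetaStoreClientHelper.getHudiTableSchema(table);
    }
}
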
HiveMetaStoreClientHelper.java
@@ -63,7 +63,6 @@
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -805,9 +804,18 @@ public static String showCreateTable(org.apache.hadoop.hive.metastore.api.Table
}

public static Schema getHudiTableSchema(HMSExternalTable table) {
HoodieTableMetaClient metaClient = getHudiClient(table);
HoodieTableMetaClient metaClient = table.getHudiClient();
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
Schema hudiSchema;

// Reload the active timeline here before resolving the schema.
// When hudi resolves the schema in `getTableAvroSchema`, it reads the commit file referenced by
// the `metaClient`. Since this `metaClient` comes from a cache, the referenced commit file may be
// stale and may already have been deleted by the hudi clean task, which would make the read fail.
// Reloading the timeline ensures the latest commit files are read.
metaClient.reloadActiveTimeline();

try {
hudiSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchema());
} catch (Exception e) {
@@ -833,14 +841,6 @@ public static <T> T ugiDoAs(Configuration conf, PrivilegedExceptionAction<T> act
}
}

public static HoodieTableMetaClient getHudiClient(HMSExternalTable table) {
String hudiBasePath = table.getRemoteTable().getSd().getLocation();
Configuration conf = getConfiguration(table);
HadoopStorageConfiguration hadoopStorageConfiguration = new HadoopStorageConfiguration(conf);
return ugiDoAs(conf, () -> HoodieTableMetaClient.builder().setConf(hadoopStorageConfiguration)
.setBasePath(hudiBasePath).build());
}

public static Configuration getConfiguration(HMSExternalTable table) {
return table.getCatalog().getConfiguration();
}
HudiSchemaCacheValue.java
@@ -0,0 +1,40 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.hudi;

import org.apache.doris.catalog.Column;
import org.apache.doris.datasource.hive.HMSSchemaCacheValue;

import java.util.List;

public class HudiSchemaCacheValue extends HMSSchemaCacheValue {

private List<String> colTypes;

public HudiSchemaCacheValue(List<Column> schema, List<Column> partitionColumns) {
super(schema, partitionColumns);
}

public List<String> getColTypes() {
return colTypes;
}

public void setColTypes(List<String> colTypes) {
this.colTypes = colTypes;
}
}
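
This cache value extends the HMS one so Hudi tables can also carry a Hive-style type string per column, as populated in getHudiSchema above. A brief sketch of the populate-and-read round trip; the reader side is an assumption, since the PR only shows the setter being called:

import org.apache.doris.catalog.Column;
import org.apache.doris.datasource.hudi.HudiSchemaCacheValue;

import java.util.List;

public class HudiSchemaCacheValueSketch {
    public static List<String> buildAndRead(List<Column> schema, List<Column> partitionColumns,
            List<String> hiveTypeStrings) {
        HudiSchemaCacheValue value = new HudiSchemaCacheValue(schema, partitionColumns);
        value.setColTypes(hiveTypeStrings);
        // A later consumer (e.g. a Hudi scan) can fetch the cached type strings
        // without re-converting the Avro schema.
        return value.getColTypes();
    }
}
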
HudiUtils.java
@@ -34,12 +34,14 @@
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;

import java.text.ParseException;
import java.text.SimpleDateFormat;
@@ -251,7 +253,7 @@ public static TablePartitionValues getPartitionValues(Optional<TableSnapshot> ta
return partitionValues;
}

HoodieTableMetaClient hudiClient = HiveMetaStoreClientHelper.getHudiClient(hmsTable);
HoodieTableMetaClient hudiClient = hmsTable.getHudiClient();
HudiCachedPartitionProcessor processor = (HudiCachedPartitionProcessor) Env.getCurrentEnv()
.getExtMetaCacheMgr().getHudiPartitionProcess(hmsTable.getCatalog());
boolean useHiveSyncPartition = hmsTable.useHiveSyncPartition();
Expand Down Expand Up @@ -281,4 +283,12 @@ public static TablePartitionValues getPartitionValues(Optional<TableSnapshot> ta
}
return partitionValues;
}

public static HoodieTableMetaClient buildHudiTableMetaClient(String hudiBasePath, Configuration conf) {
HadoopStorageConfiguration hadoopStorageConfiguration = new HadoopStorageConfiguration(conf);
return HiveMetaStoreClientHelper.ugiDoAs(
conf,
() -> HoodieTableMetaClient.builder()
.setConf(hadoopStorageConfiguration).setBasePath(hudiBasePath).build());
}
}
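
buildHudiTableMetaClient centralizes meta-client construction, including the ugiDoAs wrapping that previously lived in HiveMetaStoreClientHelper.getHudiClient. A hedged sketch of how a caching layer might call it; the cache below is illustrative, not the PR's actual HudiCachedMetaClientProcessor implementation:

import org.apache.doris.datasource.hudi.HudiUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class MetaClientCacheSketch {
    // Illustrative cache keyed by the table's base path; the real processor presumably
    // uses an evicting cache shared per catalog.
    private final Map<String, HoodieTableMetaClient> clients = new ConcurrentHashMap<>();

    public HoodieTableMetaClient getOrCreate(String hudiBasePath, Configuration conf) {
        return clients.computeIfAbsent(hudiBasePath,
                path -> HudiUtils.buildHudiTableMetaClient(path, conf));
    }
}
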