diff --git a/fe/check/checkstyle/suppressions.xml b/fe/check/checkstyle/suppressions.xml
index 84e6666c0c2b32..436b598b594d59 100644
--- a/fe/check/checkstyle/suppressions.xml
+++ b/fe/check/checkstyle/suppressions.xml
@@ -64,4 +64,5 @@ under the License.
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
index 48d170b59166b4..9ad78816278de1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
@@ -25,7 +25,9 @@
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
-import org.apache.doris.datasource.hudi.source.HudiPartitionMgr;
+import org.apache.doris.datasource.hudi.source.HudiCachedFsViewProcessor;
+import org.apache.doris.datasource.hudi.source.HudiCachedMetaClientProcessor;
+import org.apache.doris.datasource.hudi.source.HudiMetadataCacheMgr;
import org.apache.doris.datasource.hudi.source.HudiPartitionProcessor;
import org.apache.doris.datasource.iceberg.IcebergMetadataCache;
import org.apache.doris.datasource.iceberg.IcebergMetadataCacheMgr;
@@ -88,7 +90,7 @@ public class ExternalMetaCacheMgr {
// catalog id -> table schema cache
private Map<Long, ExternalSchemaCache> schemaCacheMap = Maps.newHashMap();
// hudi partition manager
- private final HudiPartitionMgr hudiPartitionMgr;
+ private final HudiMetadataCacheMgr hudiMetadataCacheMgr;
// all catalogs could share the same fsCache.
private FileSystemCache fsCache;
// all external table row count cache.
@@ -123,7 +125,7 @@ public ExternalMetaCacheMgr() {
fsCache = new FileSystemCache();
rowCountCache = new ExternalRowCountCache(rowCountRefreshExecutor);
- hudiPartitionMgr = new HudiPartitionMgr(commonRefreshExecutor);
+ hudiMetadataCacheMgr = new HudiMetadataCacheMgr(commonRefreshExecutor);
icebergMetadataCacheMgr = new IcebergMetadataCacheMgr(commonRefreshExecutor);
maxComputeMetadataCacheMgr = new MaxComputeMetadataCacheMgr();
paimonMetadataCacheMgr = new PaimonMetadataCacheMgr(commonRefreshExecutor);
@@ -165,7 +167,19 @@ public ExternalSchemaCache getSchemaCache(ExternalCatalog catalog) {
}
public HudiPartitionProcessor getHudiPartitionProcess(ExternalCatalog catalog) {
- return hudiPartitionMgr.getPartitionProcessor(catalog);
+ return hudiMetadataCacheMgr.getPartitionProcessor(catalog);
+ }
+
+ public HudiCachedFsViewProcessor getFsViewProcessor(ExternalCatalog catalog) {
+ return hudiMetadataCacheMgr.getFsViewProcessor(catalog);
+ }
+
+ public HudiCachedMetaClientProcessor getMetaClientProcessor(ExternalCatalog catalog) {
+ return hudiMetadataCacheMgr.getHudiMetaClientProcessor(catalog);
+ }
+
+ public HudiMetadataCacheMgr getHudiMetadataCacheMgr() {
+ return hudiMetadataCacheMgr;
}
public IcebergMetadataCache getIcebergMetadataCache() {
@@ -195,7 +209,7 @@ public void removeCache(long catalogId) {
if (schemaCacheMap.remove(catalogId) != null) {
LOG.info("remove schema cache for catalog {}", catalogId);
}
- hudiPartitionMgr.removePartitionProcessor(catalogId);
+ hudiMetadataCacheMgr.removeCache(catalogId);
icebergMetadataCacheMgr.removeCache(catalogId);
maxComputeMetadataCacheMgr.removeCache(catalogId);
paimonMetadataCacheMgr.removeCache(catalogId);
@@ -211,7 +225,7 @@ public void invalidateTableCache(long catalogId, String dbName, String tblName)
if (metaCache != null) {
metaCache.invalidateTableCache(dbName, tblName);
}
- hudiPartitionMgr.cleanTablePartitions(catalogId, dbName, tblName);
+ hudiMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName);
icebergMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName);
maxComputeMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName);
paimonMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName);
@@ -230,7 +244,7 @@ public void invalidateDbCache(long catalogId, String dbName) {
if (metaCache != null) {
metaCache.invalidateDbCache(dbName);
}
- hudiPartitionMgr.cleanDatabasePartitions(catalogId, dbName);
+ hudiMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
icebergMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
maxComputeMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
paimonMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
@@ -248,7 +262,7 @@ public void invalidateCatalogCache(long catalogId) {
if (metaCache != null) {
metaCache.invalidateAll();
}
- hudiPartitionMgr.cleanPartitionProcess(catalogId);
+ hudiMetadataCacheMgr.invalidateCatalogCache(catalogId);
icebergMetadataCacheMgr.invalidateCatalogCache(catalogId);
maxComputeMetadataCacheMgr.invalidateCatalogCache(catalogId);
paimonMetadataCacheMgr.invalidateCatalogCache(catalogId);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
index 26019350fd072d..71c7308b079866 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
@@ -34,6 +34,7 @@
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.datasource.TablePartitionValues;
+import org.apache.doris.datasource.hudi.HudiSchemaCacheValue;
import org.apache.doris.datasource.hudi.HudiUtils;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.datasource.mvcc.MvccSnapshot;
@@ -74,6 +75,7 @@
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -532,34 +534,38 @@ public long getLastDdlTime() {
@Override
public Optional<SchemaCacheValue> initSchema() {
makeSureInitialized();
- List<Column> columns;
if (dlaType.equals(DLAType.ICEBERG)) {
- columns = getIcebergSchema();
+ return getIcebergSchema();
} else if (dlaType.equals(DLAType.HUDI)) {
- columns = getHudiSchema();
+ return getHudiSchema();
} else {
- columns = getHiveSchema();
+ return getHiveSchema();
}
- List<Column> partitionColumns = initPartitionColumns(columns);
- return Optional.of(new HMSSchemaCacheValue(columns, partitionColumns));
}
- private List<Column> getIcebergSchema() {
- return IcebergUtils.getSchema(catalog, dbName, name);
+ private Optional<SchemaCacheValue> getIcebergSchema() {
+ List<Column> columns = IcebergUtils.getSchema(catalog, dbName, name);
+ List<Column> partitionColumns = initPartitionColumns(columns);
+ return Optional.of(new HMSSchemaCacheValue(columns, partitionColumns));
}
- private List<Column> getHudiSchema() {
+ private Optional<SchemaCacheValue> getHudiSchema() {
org.apache.avro.Schema hudiSchema = HiveMetaStoreClientHelper.getHudiTableSchema(this);
List<Column> tmpSchema = Lists.newArrayListWithCapacity(hudiSchema.getFields().size());
+ List<String> colTypes = Lists.newArrayList();
for (org.apache.avro.Schema.Field hudiField : hudiSchema.getFields()) {
String columnName = hudiField.name().toLowerCase(Locale.ROOT);
tmpSchema.add(new Column(columnName, HudiUtils.fromAvroHudiTypeToDorisType(hudiField.schema()),
true, null, true, null, "", true, null, -1, null));
+ colTypes.add(HudiUtils.convertAvroToHiveType(hudiField.schema()));
}
- return tmpSchema;
+ List<Column> partitionColumns = initPartitionColumns(tmpSchema);
+ HudiSchemaCacheValue hudiSchemaCacheValue = new HudiSchemaCacheValue(tmpSchema, partitionColumns);
+ hudiSchemaCacheValue.setColTypes(colTypes);
+ return Optional.of(hudiSchemaCacheValue);
}
- private List<Column> getHiveSchema() {
+ private Optional<SchemaCacheValue> getHiveSchema() {
HMSCachedClient client = ((HMSExternalCatalog) catalog).getClient();
List<FieldSchema> schema = client.getSchema(dbName, name);
Map<String, String> colDefaultValues = client.getDefaultColumnValues(dbName, name);
@@ -571,7 +577,8 @@ private List<Column> getHiveSchema() {
HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType()), true, null,
true, defaultValue, field.getComment(), true, -1));
}
- return columns;
+ List<Column> partitionColumns = initPartitionColumns(columns);
+ return Optional.of(new HMSSchemaCacheValue(columns, partitionColumns));
}
@Override
@@ -649,9 +656,7 @@ public Optional<ColumnStatistic> getColumnStatistic(String colName) {
break;
}
default:
- if (LOG.isDebugEnabled()) {
- LOG.debug("get column stats for dlaType {} is not supported.", dlaType);
- }
+ LOG.warn("get column stats for dlaType {} is not supported.", dlaType);
}
return Optional.empty();
}
@@ -1016,4 +1021,15 @@ public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
Env.getCurrentEnv().getRefreshManager()
.refreshTable(getCatalog().getName(), getDbName(), getName(), true);
}
+
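+ // Fetch the catalog-level cached HoodieTableMetaClient for this table (keyed by db, table and
+ // table base path) from the external meta cache manager instead of building a new client per call.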
+ public HoodieTableMetaClient getHudiClient() {
+ return Env.getCurrentEnv()
+ .getExtMetaCacheMgr()
+ .getMetaClientProcessor(getCatalog())
+ .getHoodieTableMetaClient(
+ getDbName(),
+ getName(),
+ getRemoteTable().getSd().getLocation(),
+ getCatalog().getConfiguration());
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
index 706bd653a85e21..a66bf2cb3906b7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
@@ -63,7 +63,6 @@
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
-import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -805,9 +804,18 @@ public static String showCreateTable(org.apache.hadoop.hive.metastore.api.Table
}
public static Schema getHudiTableSchema(HMSExternalTable table) {
- HoodieTableMetaClient metaClient = getHudiClient(table);
+ HoodieTableMetaClient metaClient = table.getHudiClient();
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
Schema hudiSchema;
+
+ // Here, the timestamp should be reloaded again.
+ // Because when hudi obtains the schema in `getTableAvroSchema`, it needs to read the specified commit file,
+ // which is saved in the `metaClient`.
+ // But the `metaClient` is obtained from cache, so the file obtained may be an old file.
+ // This file may be deleted by hudi clean task, and an error will be reported.
+ // So, we should reload timeline so that we can read the latest commit files.
+ metaClient.reloadActiveTimeline();
+
try {
hudiSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchema());
} catch (Exception e) {
@@ -833,14 +841,6 @@ public static <T> T ugiDoAs(Configuration conf, PrivilegedExceptionAction<T> act
}
}
- public static HoodieTableMetaClient getHudiClient(HMSExternalTable table) {
- String hudiBasePath = table.getRemoteTable().getSd().getLocation();
- Configuration conf = getConfiguration(table);
- HadoopStorageConfiguration hadoopStorageConfiguration = new HadoopStorageConfiguration(conf);
- return ugiDoAs(conf, () -> HoodieTableMetaClient.builder().setConf(hadoopStorageConfiguration)
- .setBasePath(hudiBasePath).build());
- }
-
public static Configuration getConfiguration(HMSExternalTable table) {
return table.getCatalog().getConfiguration();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiSchemaCacheValue.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiSchemaCacheValue.java
new file mode 100644
index 00000000000000..8c58ffa2006f16
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiSchemaCacheValue.java
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hudi;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.datasource.hive.HMSSchemaCacheValue;
+
+import java.util.List;
+
+public class HudiSchemaCacheValue extends HMSSchemaCacheValue {
+
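+ // Hive type strings converted from the Hudi Avro schema, one per column in schema order; kept here
+ // so HudiScanNode can reuse them without re-resolving the table schema at scan time.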
+ private List<String> colTypes;
+
+ public HudiSchemaCacheValue(List<Column> schema, List<Column> partitionColumns) {
+ super(schema, partitionColumns);
+ }
+
+ public List<String> getColTypes() {
+ return colTypes;
+ }
+
+ public void setColTypes(List<String> colTypes) {
+ this.colTypes = colTypes;
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java
index 0f38abafaa4d98..3dbbcf03da0452 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java
@@ -34,12 +34,14 @@
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import java.text.ParseException;
import java.text.SimpleDateFormat;
@@ -251,7 +253,7 @@ public static TablePartitionValues getPartitionValues(Optional<TableSnapshot> ta
return partitionValues;
}
- HoodieTableMetaClient hudiClient = HiveMetaStoreClientHelper.getHudiClient(hmsTable);
+ HoodieTableMetaClient hudiClient = hmsTable.getHudiClient();
HudiCachedPartitionProcessor processor = (HudiCachedPartitionProcessor) Env.getCurrentEnv()
.getExtMetaCacheMgr().getHudiPartitionProcess(hmsTable.getCatalog());
boolean useHiveSyncPartition = hmsTable.useHiveSyncPartition();
@@ -281,4 +283,12 @@ public static TablePartitionValues getPartitionValues(Optional<TableSnapshot> ta
}
return partitionValues;
}
+
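+ // Build a HoodieTableMetaClient for the given base path; the construction runs inside ugiDoAs so it
+ // executes with the catalog's authentication (UGI) context.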
+ public static HoodieTableMetaClient buildHudiTableMetaClient(String hudiBasePath, Configuration conf) {
+ HadoopStorageConfiguration hadoopStorageConfiguration = new HadoopStorageConfiguration(conf);
+ return HiveMetaStoreClientHelper.ugiDoAs(
+ conf,
+ () -> HoodieTableMetaClient.builder()
+ .setConf(hadoopStorageConfiguration).setBasePath(hudiBasePath).build());
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedFsViewProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedFsViewProcessor.java
new file mode 100644
index 00000000000000..bbbe87cac87a83
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedFsViewProcessor.java
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hudi.source;
+
+import org.apache.doris.common.CacheFactory;
+import org.apache.doris.common.Config;
+import org.apache.doris.datasource.ExternalMetaCacheMgr;
+
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.google.common.collect.Maps;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.OptionalLong;
+import java.util.concurrent.ExecutorService;
+
+public class HudiCachedFsViewProcessor {
+ private static final Logger LOG = LogManager.getLogger(HudiCachedFsViewProcessor.class);
+ private final LoadingCache<FsViewKey, HoodieTableFileSystemView> fsViewCache;
+
+ public HudiCachedFsViewProcessor(ExecutorService executor) {
+ CacheFactory partitionCacheFactory = new CacheFactory(
+ OptionalLong.of(28800L),
+ OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60),
+ Config.max_external_table_cache_num,
+ true,
+ null);
+ this.fsViewCache = partitionCacheFactory.buildCache(this::createFsView, null, executor);
+ }
+
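+ // Cache loader: build an in-memory file system view for the table identified by the key, using a
+ // local engine context derived from the cached meta client's storage configuration.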
+ private HoodieTableFileSystemView createFsView(FsViewKey key) {
+ HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().build();
+ HudiLocalEngineContext ctx = new HudiLocalEngineContext(key.getClient().getStorageConf());
+ return FileSystemViewManager.createInMemoryFileSystemView(ctx, key.getClient(), metadataConfig);
+ }
+
+ public HoodieTableFileSystemView getFsView(String dbName, String tbName, HoodieTableMetaClient hudiClient) {
+ return fsViewCache.get(new FsViewKey(dbName, tbName, hudiClient));
+ }
+
+ public void cleanUp() {
+ fsViewCache.cleanUp();
+ }
+
+ public void invalidateAll() {
+ fsViewCache.invalidateAll();
+ }
+
+ public void invalidateDbCache(String dbName) {
+ fsViewCache.asMap().forEach((k, v) -> {
+ if (k.getDbName().equals(dbName)) {
+ fsViewCache.invalidate(k);
+ }
+ });
+ }
+
+ public void invalidateTableCache(String dbName, String tbName) {
+ fsViewCache.asMap().forEach((k, v) -> {
+ if (k.getDbName().equals(dbName) && k.getTbName().equals(tbName)) {
+ fsViewCache.invalidate(k);
+ }
+ });
+ }
+
+ private static class FsViewKey {
+ String dbName;
+ String tbName;
+ HoodieTableMetaClient client;
+
+ public FsViewKey(String dbName, String tbName, HoodieTableMetaClient client) {
+ this.dbName = dbName;
+ this.tbName = tbName;
+ this.client = client;
+ }
+
+ public String getDbName() {
+ return dbName;
+ }
+
+ public String getTbName() {
+ return tbName;
+ }
+
+ public HoodieTableMetaClient getClient() {
+ return client;
+ }
+
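+ // Keys compare by database, table and the meta client's base path rather than by client identity,
+ // so a re-created meta client for the same table still maps to the same cached view.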
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ FsViewKey fsViewKey = (FsViewKey) o;
+ return Objects.equals(dbName, fsViewKey.dbName) && Objects.equals(tbName, fsViewKey.tbName)
+ && Objects.equals(client.getBasePathV2(), fsViewKey.client.getBasePathV2());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(dbName, tbName, client.getBasePathV2());
+ }
+ }
+
+ public Map<String, Map<String, String>> getCacheStats() {
+ Map<String, Map<String, String>> res = Maps.newHashMap();
+ res.put("hudi_fs_view_cache",
+ ExternalMetaCacheMgr.getCacheStats(fsViewCache.stats(), fsViewCache.estimatedSize()));
+ return res;
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedMetaClientProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedMetaClientProcessor.java
new file mode 100644
index 00000000000000..07726a54ffe5f1
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedMetaClientProcessor.java
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hudi.source;
+
+import org.apache.doris.common.CacheFactory;
+import org.apache.doris.common.Config;
+import org.apache.doris.datasource.ExternalMetaCacheMgr;
+import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
+
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.OptionalLong;
+import java.util.concurrent.ExecutorService;
+
+public class HudiCachedMetaClientProcessor {
+ private static final Logger LOG = LogManager.getLogger(HudiCachedMetaClientProcessor.class);
+ private final LoadingCache<HudiCachedClientKey, HoodieTableMetaClient> hudiTableMetaClientCache;
+
+ public HudiCachedMetaClientProcessor(ExecutorService executor) {
+ CacheFactory partitionCacheFactory = new CacheFactory(
+ OptionalLong.of(28800L),
+ OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60),
+ Config.max_external_table_cache_num,
+ true,
+ null);
+
+ this.hudiTableMetaClientCache =
+ partitionCacheFactory.buildCache(
+ this::createHoodieTableMetaClient,
+ null,
+ executor);
+ }
+
+ private HoodieTableMetaClient createHoodieTableMetaClient(HudiCachedClientKey key) {
+ LOG.debug("create hudi table meta client for {}.{}", key.getDbName(), key.getTbName());
+ HadoopStorageConfiguration hadoopStorageConfiguration = new HadoopStorageConfiguration(key.getConf());
+ return HiveMetaStoreClientHelper.ugiDoAs(
+ key.getConf(),
+ () -> HoodieTableMetaClient
+ .builder()
+ .setConf(hadoopStorageConfiguration)
+ .setBasePath(key.getHudiBasePath())
+ .build());
+ }
+
+ public HoodieTableMetaClient getHoodieTableMetaClient(
+ String dbName, String tbName, String hudiBasePath, Configuration conf) {
+ return hudiTableMetaClientCache.get(new HudiCachedClientKey(dbName, tbName, hudiBasePath, conf));
+ }
+
+ public void cleanUp() {
+ hudiTableMetaClientCache.cleanUp();
+ }
+
+ public void invalidateAll() {
+ hudiTableMetaClientCache.invalidateAll();
+ }
+
+ public void invalidateDbCache(String dbName) {
+ hudiTableMetaClientCache.asMap().forEach((k, v) -> {
+ if (k.getDbName().equals(dbName)) {
+ hudiTableMetaClientCache.invalidate(k);
+ }
+ });
+ }
+
+ public void invalidateTableCache(String dbName, String tbName) {
+ hudiTableMetaClientCache.asMap().forEach((k, v) -> {
+ if (k.getDbName().equals(dbName) && k.getTbName().equals(tbName)) {
+ hudiTableMetaClientCache.invalidate(k);
+ }
+ });
+ }
+
+ private static class HudiCachedClientKey {
+ String dbName;
+ String tbName;
+ String hudiBasePath;
+ Configuration conf;
+
+ public HudiCachedClientKey(String dbName, String tbName, String hudiBasePath, Configuration conf) {
+ this.dbName = dbName;
+ this.tbName = tbName;
+ this.hudiBasePath = hudiBasePath;
+ this.conf = conf;
+ }
+
+ public String getDbName() {
+ return dbName;
+ }
+
+ public String getTbName() {
+ return tbName;
+ }
+
+ public String getHudiBasePath() {
+ return hudiBasePath;
+ }
+
+ public Configuration getConf() {
+ return conf;
+ }
+
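+ // The Configuration is not part of equals/hashCode: the (db, table, base path) triple is what
+ // identifies a cached meta client, and Configuration objects have no stable equality.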
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ HudiCachedClientKey that = (HudiCachedClientKey) o;
+ return Objects.equals(dbName, that.dbName) && Objects.equals(tbName, that.tbName)
+ && Objects.equals(hudiBasePath, that.hudiBasePath);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(dbName, tbName, hudiBasePath);
+ }
+ }
+
+ public Map<String, Map<String, String>> getCacheStats() {
+ Map<String, Map<String, String>> res = Maps.newHashMap();
+ res.put("hudi_meta_client_cache", ExternalMetaCacheMgr.getCacheStats(hudiTableMetaClientCache.stats(),
+ hudiTableMetaClientCache.estimatedSize()));
+ return res;
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiMetadataCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiMetadataCacheMgr.java
new file mode 100644
index 00000000000000..4ede5c73cfaa97
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiMetadataCacheMgr.java
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hudi.source;
+
+import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.hive.HMSExternalCatalog;
+
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+
+public class HudiMetadataCacheMgr {
+ private final Map<Long, HudiPartitionProcessor> partitionProcessors = Maps.newConcurrentMap();
+ private final Map<Long, HudiCachedFsViewProcessor> fsViewProcessors = Maps.newConcurrentMap();
+ private final Map<Long, HudiCachedMetaClientProcessor> metaClientProcessors = Maps.newConcurrentMap();
+
+ private final ExecutorService executor;
+
+ public HudiMetadataCacheMgr(ExecutorService executor) {
+ this.executor = executor;
+ }
+
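+ // Processors are created lazily per catalog id and shared by all queries on that catalog; Hudi is
+ // currently only supported for HMS-compatible catalogs, hence the type check below.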
+ public HudiPartitionProcessor getPartitionProcessor(ExternalCatalog catalog) {
+ return partitionProcessors.computeIfAbsent(catalog.getId(), catalogId -> {
+ if (catalog instanceof HMSExternalCatalog) {
+ return new HudiCachedPartitionProcessor(catalogId, executor);
+ } else {
+ throw new RuntimeException("Hudi only supports hive(or compatible) catalog now");
+ }
+ });
+ }
+
+ public HudiCachedFsViewProcessor getFsViewProcessor(ExternalCatalog catalog) {
+ return fsViewProcessors.computeIfAbsent(catalog.getId(), catalogId -> {
+ if (catalog instanceof HMSExternalCatalog) {
+ return new HudiCachedFsViewProcessor(executor);
+ } else {
+ throw new RuntimeException("Hudi only supports hive(or compatible) catalog now");
+ }
+ });
+ }
+
+ public HudiCachedMetaClientProcessor getHudiMetaClientProcessor(ExternalCatalog catalog) {
+ return metaClientProcessors.computeIfAbsent(catalog.getId(), catalogId -> {
+ if (catalog instanceof HMSExternalCatalog) {
+ return new HudiCachedMetaClientProcessor(executor);
+ } else {
+ throw new RuntimeException("Hudi only supports hive(or compatible) catalog now");
+ }
+ });
+ }
+
+ public void removeCache(long catalogId) {
+ HudiPartitionProcessor partitionProcessor = partitionProcessors.remove(catalogId);
+ if (partitionProcessor != null) {
+ partitionProcessor.cleanUp();
+ }
+ HudiCachedFsViewProcessor fsViewProcessor = fsViewProcessors.remove(catalogId);
+ if (fsViewProcessor != null) {
+ fsViewProcessor.cleanUp();
+ }
+ HudiCachedMetaClientProcessor metaClientProcessor = metaClientProcessors.remove(catalogId);
+ if (metaClientProcessor != null) {
+ metaClientProcessor.cleanUp();
+ }
+ }
+
+ public void invalidateCatalogCache(long catalogId) {
+ HudiPartitionProcessor processor = partitionProcessors.get(catalogId);
+ if (processor != null) {
+ processor.cleanUp();
+ }
+ HudiCachedFsViewProcessor fsViewProcessor = fsViewProcessors.get(catalogId);
+ if (fsViewProcessor != null) {
+ fsViewProcessor.invalidateAll();
+ }
+ HudiCachedMetaClientProcessor metaClientProcessor = metaClientProcessors.get(catalogId);
+ if (metaClientProcessor != null) {
+ metaClientProcessor.invalidateAll();
+ }
+ }
+
+ public void invalidateDbCache(long catalogId, String dbName) {
+ HudiPartitionProcessor processor = partitionProcessors.get(catalogId);
+ if (processor != null) {
+ processor.cleanDatabasePartitions(dbName);
+ }
+ HudiCachedFsViewProcessor fsViewProcessor = fsViewProcessors.get(catalogId);
+ if (fsViewProcessor != null) {
+ fsViewProcessor.invalidateDbCache(dbName);
+ }
+ HudiCachedMetaClientProcessor metaClientProcessor = metaClientProcessors.get(catalogId);
+ if (metaClientProcessor != null) {
+ metaClientProcessor.invalidateDbCache(dbName);
+ }
+ }
+
+ public void invalidateTableCache(long catalogId, String dbName, String tblName) {
+ HudiPartitionProcessor processor = partitionProcessors.get(catalogId);
+ if (processor != null) {
+ processor.cleanTablePartitions(dbName, tblName);
+ }
+ HudiCachedFsViewProcessor fsViewProcessor = fsViewProcessors.get(catalogId);
+ if (fsViewProcessor != null) {
+ fsViewProcessor.invalidateTableCache(dbName, tblName);
+ }
+ HudiCachedMetaClientProcessor metaClientProcessor = metaClientProcessors.get(catalogId);
+ if (metaClientProcessor != null) {
+ metaClientProcessor.invalidateTableCache(dbName, tblName);
+ }
+ }
+
+ public Map<String, Map<String, String>> getCacheStats(ExternalCatalog catalog) {
+ Map<String, Map<String, String>> res = Maps.newHashMap();
+
+ HudiCachedPartitionProcessor partitionProcessor = (HudiCachedPartitionProcessor) getPartitionProcessor(catalog);
+ res.putAll(partitionProcessor.getCacheStats());
+
+ HudiCachedFsViewProcessor fsViewProcessor = getFsViewProcessor(catalog);
+ res.putAll(fsViewProcessor.getCacheStats());
+
+ HudiCachedMetaClientProcessor metaClientProcessor = getHudiMetaClientProcessor(catalog);
+ res.putAll(metaClientProcessor.getCacheStats());
+
+ return res;
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionMgr.java
deleted file mode 100644
index b6bfd0ec499427..00000000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionMgr.java
+++ /dev/null
@@ -1,73 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.datasource.hudi.source;
-
-import org.apache.doris.datasource.ExternalCatalog;
-import org.apache.doris.datasource.hive.HMSExternalCatalog;
-
-import com.google.common.collect.Maps;
-
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-
-public class HudiPartitionMgr {
- private final Map<Long, HudiPartitionProcessor> partitionProcessors = Maps.newConcurrentMap();
- private final ExecutorService executor;
-
- public HudiPartitionMgr(ExecutorService executor) {
- this.executor = executor;
- }
-
- public HudiPartitionProcessor getPartitionProcessor(ExternalCatalog catalog) {
- return partitionProcessors.computeIfAbsent(catalog.getId(), catalogId -> {
- if (catalog instanceof HMSExternalCatalog) {
- return new HudiCachedPartitionProcessor(catalogId, executor);
- } else {
- throw new RuntimeException("Hudi only supports hive(or compatible) catalog now");
- }
- });
- }
-
- public void removePartitionProcessor(long catalogId) {
- HudiPartitionProcessor processor = partitionProcessors.remove(catalogId);
- if (processor != null) {
- processor.cleanUp();
- }
- }
-
- public void cleanPartitionProcess(long catalogId) {
- HudiPartitionProcessor processor = partitionProcessors.get(catalogId);
- if (processor != null) {
- processor.cleanUp();
- }
- }
-
- public void cleanDatabasePartitions(long catalogId, String dbName) {
- HudiPartitionProcessor processor = partitionProcessors.get(catalogId);
- if (processor != null) {
- processor.cleanDatabasePartitions(dbName);
- }
- }
-
- public void cleanTablePartitions(long catalogId, String dbName, String tblName) {
- HudiPartitionProcessor processor = partitionProcessors.get(catalogId);
- if (processor != null) {
- processor.cleanTablePartitions(dbName, tblName);
- }
- }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
index b1eb47095f33c4..35c9905d6e8a7a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
@@ -20,6 +20,7 @@
import org.apache.doris.analysis.TableScanParams;
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ListPartitionItem;
import org.apache.doris.catalog.PartitionItem;
@@ -28,13 +29,15 @@
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.FileFormatUtils;
import org.apache.doris.common.util.LocationPath;
+import org.apache.doris.datasource.ExternalSchemaCache;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.FileSplit;
+import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.datasource.TableFormatType;
import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
import org.apache.doris.datasource.hive.HivePartition;
import org.apache.doris.datasource.hive.source.HiveScanNode;
-import org.apache.doris.datasource.hudi.HudiUtils;
+import org.apache.doris.datasource.hudi.HudiSchemaCacheValue;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
@@ -48,21 +51,17 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import org.apache.avro.Schema;
-import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.storage.StoragePath;
-import org.apache.hudi.storage.StoragePathInfo;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -71,7 +70,6 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -110,6 +108,7 @@ public class HudiScanNode extends HiveScanNode {
private boolean incrementalRead = false;
private TableScanParams scanParams;
private IncrementalRelation incrementalRelation;
+ private HoodieTableFileSystemView fsView;
/**
* External file scan node for Query Hudi table
@@ -159,25 +158,16 @@ protected void doInitialize() throws UserException {
initBackendPolicy();
initSchemaParams();
- hudiClient = HiveMetaStoreClientHelper.getHudiClient(hmsTable);
+ hudiClient = hmsTable.getHudiClient();
hudiClient.reloadActiveTimeline();
basePath = hmsTable.getRemoteTable().getSd().getLocation();
inputFormat = hmsTable.getRemoteTable().getSd().getInputFormat();
serdeLib = hmsTable.getRemoteTable().getSd().getSerdeInfo().getSerializationLib();
- columnNames = new ArrayList<>();
- columnTypes = new ArrayList<>();
- TableSchemaResolver schemaUtil = new TableSchemaResolver(hudiClient);
- Schema hudiSchema;
- try {
- hudiSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchema());
- } catch (Exception e) {
- throw new UserException("Cannot get hudi table schema.");
- }
- for (Schema.Field hudiField : hudiSchema.getFields()) {
- columnNames.add(hudiField.name().toLowerCase(Locale.ROOT));
- String columnType = HudiUtils.convertAvroToHiveType(hudiField.schema());
- columnTypes.add(columnType);
- }
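+ // Take column names and Hive type strings from the shared schema cache (HudiSchemaCacheValue)
+ // instead of resolving the Avro schema from the Hudi timeline on every scan.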
+ ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(table.getCatalog());
+ Optional<SchemaCacheValue> schemaCacheValue = cache.getSchemaValue(table.getDbName(), table.getName());
+ HudiSchemaCacheValue hudiSchemaCacheValue = (HudiSchemaCacheValue) schemaCacheValue.get();
+ columnNames = hudiSchemaCacheValue.getSchema().stream().map(Column::getName).collect(Collectors.toList());
+ columnTypes = hudiSchemaCacheValue.getColTypes();
if (scanParams != null && !scanParams.incrementalRead()) {
// Only support incremental read
@@ -218,6 +208,10 @@ protected void doInitialize() throws UserException {
}
queryInstant = snapshotInstant.get().getTimestamp();
}
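+ // Reuse the catalog-level cached HoodieTableFileSystemView for this table instead of constructing
+ // a new view (and re-listing files) for every query.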
+ fsView = Env.getCurrentEnv()
+ .getExtMetaCacheMgr()
+ .getFsViewProcessor(hmsTable.getCatalog())
+ .getFsView(hmsTable.getDbName(), hmsTable.getName(), hudiClient);
}
@Override
@@ -326,23 +320,17 @@ private List<Split> getIncrementalSplits() {
}
private void getPartitionSplits(HivePartition partition, List<Split> splits) throws IOException {
- String globPath;
+
String partitionName;
if (partition.isDummyPartition()) {
partitionName = "";
- globPath = hudiClient.getBasePathV2().toString() + "/*";
} else {
partitionName = FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(),
new StoragePath(partition.getPath()));
- globPath = String.format("%s/%s/*", hudiClient.getBasePathV2().toString(), partitionName);
}
- List<StoragePathInfo> statuses = FSUtils.getGlobStatusExcludingMetaFolder(
- hudiClient.getRawHoodieStorage(), new StoragePath(globPath));
- HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(hudiClient,
- timeline, statuses);
if (canUseNativeReader()) {
- fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant).forEach(baseFile -> {
+ fsView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant).forEach(baseFile -> {
noLogsSplitNum.incrementAndGet();
String filePath = baseFile.getPath();
long fileSize = baseFile.getFileSize();
@@ -352,7 +340,7 @@ private void getPartitionSplits(HivePartition partition, List<Split> splits) thr
new String[0], partition.getPartitionValues()));
});
} else {
- fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant)
+ fsView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant)
.forEach(fileSlice -> splits.add(
generateHudiSplit(fileSlice, partition.getPartitionValues(), queryInstant)));
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java
index 51e68eb07631ae..b742142093bdf1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java
@@ -187,7 +187,7 @@ public LogicalHudiScan withScanParams(HMSExternalTable table, TableScanParams sc
optParams.put(k, v);
}
});
- HoodieTableMetaClient hudiClient = HiveMetaStoreClientHelper.getHudiClient(table);
+ HoodieTableMetaClient hudiClient = table.getHudiClient();
try {
boolean isCowOrRoTable = table.isHoodieCowTable();
if (isCowOrRoTable) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
index 53b1da3292f70a..b96ec3ef9b9d3c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -56,7 +56,7 @@
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
-import org.apache.doris.datasource.hudi.source.HudiCachedPartitionProcessor;
+import org.apache.doris.datasource.hudi.source.HudiMetadataCacheMgr;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergMetadataCache;
import org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog;
@@ -1355,9 +1355,8 @@ private static TFetchSchemaTableDataResult metaCacheStatsMetadataResult(TSchemaT
fillBatch(dataBatch, cache.getStats(), catalog.getName());
}
// 2. hudi cache
- HudiCachedPartitionProcessor processor
- = (HudiCachedPartitionProcessor) mgr.getHudiPartitionProcess(catalog);
- fillBatch(dataBatch, processor.getCacheStats(), catalog.getName());
+ HudiMetadataCacheMgr hudiMetadataCacheMgr = mgr.getHudiMetadataCacheMgr();
+ fillBatch(dataBatch, hudiMetadataCacheMgr.getCacheStats(catalog), catalog.getName());
} else if (catalogIf instanceof IcebergExternalCatalog) {
// 3. iceberg cache
IcebergMetadataCache icebergCache = mgr.getIcebergMetadataCache();
diff --git a/fe/fe-core/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java b/fe/fe-core/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
new file mode 100644
index 00000000000000..b0624fc6ea0d34
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
@@ -0,0 +1,368 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Copy code from:
+// https://github.com/apache/hbase/blob/rel/2.4.9/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
+// to solve hudi dependency with this class
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.fs.CanUnbuffer;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
+/**
+ * Wrapper for input stream(s) that takes care of the interaction of FS and HBase checksums,
+ * as well as closing streams. Initialization is not thread-safe, but normal operation is;
+ * see method comments.
+ */
+@InterfaceAudience.Private
+public class FSDataInputStreamWrapper implements Closeable {
+ private static final Logger LOG = LoggerFactory.getLogger(FSDataInputStreamWrapper.class);
+ private static final boolean isLogTraceEnabled = LOG.isTraceEnabled();
+
+ private final HFileSystem hfs;
+ private final Path path;
+ private final FileLink link;
+ private final boolean doCloseStreams;
+ private final boolean dropBehind;
+ private final long readahead;
+
+ /** Two stream handles, one with and one without FS-level checksum.
+ * HDFS checksum setting is on FS level, not single read level, so you have to keep two
+ * FS objects and two handles open to interleave different reads freely, which is very sad.
+ * This is what we do:
+ * 1) First, we need to read the trailer of HFile to determine checksum parameters.
+ * We always use FS checksum to do that, so ctor opens {@link #stream}.
+ * 2.1) After that, if HBase checksum is not used, we'd just always use {@link #stream};
+ * 2.2) If HBase checksum can be used, we'll open {@link #streamNoFsChecksum},
+ * and close {@link #stream}. User MUST call prepareForBlockReader for that to happen;
+ * if they don't, (2.1) will be the default.
+ * 3) The users can call {@link #shouldUseHBaseChecksum()}, and pass its result to
+ * {@link #getStream(boolean)} to get stream (if Java had out/pointer params we could
+ * return both in one call). This stream is guaranteed to be set.
+ * 4) The first time HBase checksum fails, one would call {@link #fallbackToFsChecksum(int)}.
+ * That will take lock, and open {@link #stream}. While this is going on, others will
+ * continue to use the old stream; if they also want to fall back, they'll also call
+ * {@link #fallbackToFsChecksum(int)}, and block until {@link #stream} is set.
+ * 5) After some number of checksumOk() calls, we will go back to using HBase checksum.
+ * We will have 2 handles; however we presume checksums fail so rarely that we don't care.
+ */
+ private volatile FSDataInputStream stream = null;
+ private volatile FSDataInputStream streamNoFsChecksum = null;
+ private final Object streamNoFsChecksumFirstCreateLock = new Object();
+
+ // The configuration states that we should validate hbase checksums
+ private boolean useHBaseChecksumConfigured;
+
+ // Record the current state of this reader with respect to
+ // validating checkums in HBase. This is originally set the same
+ // value as useHBaseChecksumConfigured, but can change state as and when
+ // we encounter checksum verification failures.
+ private volatile boolean useHBaseChecksum;
+
+ // In the case of a checksum failure, do these many succeeding
+ // reads without hbase checksum verification.
+ private AtomicInteger hbaseChecksumOffCount = new AtomicInteger(-1);
+
+ private final static ReadStatistics readStatistics = new ReadStatistics();
+
+ private static class ReadStatistics {
+ long totalBytesRead;
+ long totalLocalBytesRead;
+ long totalShortCircuitBytesRead;
+ long totalZeroCopyBytesRead;
+ }
+
+ private Boolean instanceOfCanUnbuffer = null;
+ private CanUnbuffer unbuffer = null;
+
+ public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException {
+ this(fs, path, false, -1L);
+ }
+
+ public FSDataInputStreamWrapper(FileSystem fs, Path path, boolean dropBehind, long readahead) throws IOException {
+ this(fs, null, path, dropBehind, readahead);
+ }
+
+ public FSDataInputStreamWrapper(FileSystem fs, FileLink link,
+ boolean dropBehind, long readahead) throws IOException {
+ this(fs, link, null, dropBehind, readahead);
+ }
+
+ private FSDataInputStreamWrapper(FileSystem fs, FileLink link, Path path, boolean dropBehind,
+ long readahead) throws IOException {
+ assert (path == null) != (link == null);
+ this.path = path;
+ this.link = link;
+ this.doCloseStreams = true;
+ this.dropBehind = dropBehind;
+ this.readahead = readahead;
+ // If the fs is not an instance of HFileSystem, then create an instance of HFileSystem
+ // that wraps over the specified fs. In this case, we will not be able to avoid
+ // checksumming inside the filesystem.
+ this.hfs = (fs instanceof HFileSystem) ? (HFileSystem) fs : new HFileSystem(fs);
+
+ // Initially we are going to read the tail block. Open the reader w/FS checksum.
+ this.useHBaseChecksumConfigured = this.useHBaseChecksum = false;
+ this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
+ setStreamOptions(stream);
+ }
+
+ private void setStreamOptions(FSDataInputStream in) {
+ try {
+ in.setDropBehind(dropBehind);
+ } catch (Exception e) {
+ // Skipped.
+ }
+ if (readahead >= 0) {
+ try {
+ in.setReadahead(readahead);
+ } catch (Exception e) {
+ // Skipped.
+ }
+ }
+ }
+
+ /**
+ * Prepares the streams for block reader. NOT THREAD SAFE. Must be called once, after any
+ * reads finish and before any other reads start (what happens in reality is we read the
+ * tail, then call this based on what's in the tail, then read blocks).
+ * @param forceNoHBaseChecksum Force not using HBase checksum.
+ */
+ public void prepareForBlockReader(boolean forceNoHBaseChecksum) throws IOException {
+ if (hfs == null) return;
+ assert this.stream != null && !this.useHBaseChecksumConfigured;
+ boolean useHBaseChecksum =
+ !forceNoHBaseChecksum && hfs.useHBaseChecksum() && (hfs.getNoChecksumFs() != hfs);
+
+ if (useHBaseChecksum) {
+ FileSystem fsNc = hfs.getNoChecksumFs();
+ this.streamNoFsChecksum = (link != null) ? link.open(fsNc) : fsNc.open(path);
+ setStreamOptions(streamNoFsChecksum);
+ this.useHBaseChecksumConfigured = this.useHBaseChecksum = useHBaseChecksum;
+ // Close the checksum stream; we will reopen it if we get an HBase checksum failure.
+ this.stream.close();
+ this.stream = null;
+ }
+ }
+
+ /** For use in tests. */
+ public FSDataInputStreamWrapper(FSDataInputStream fsdis) {
+ this(fsdis, fsdis);
+ }
+
+ /** For use in tests. */
+ public FSDataInputStreamWrapper(FSDataInputStream fsdis, FSDataInputStream noChecksum) {
+ doCloseStreams = false;
+ stream = fsdis;
+ streamNoFsChecksum = noChecksum;
+ path = null;
+ link = null;
+ hfs = null;
+ useHBaseChecksumConfigured = useHBaseChecksum = false;
+ dropBehind = false;
+ readahead = 0;
+ }
+
+ /**
+ * @return Whether we are presently using HBase checksum.
+ */
+ public boolean shouldUseHBaseChecksum() {
+ return this.useHBaseChecksum;
+ }
+
+ /**
+ * Get the stream to use. Thread-safe.
+ * @param useHBaseChecksum must be the value that shouldUseHBaseChecksum has returned
+ * at some point in the past, otherwise the result is undefined.
+ */
+ public FSDataInputStream getStream(boolean useHBaseChecksum) {
+ return useHBaseChecksum ? this.streamNoFsChecksum : this.stream;
+ }
+
+ /**
+ * Read from non-checksum stream failed, fall back to FS checksum. Thread-safe.
+ * @param offCount For how many checksumOk calls to turn off the HBase checksum.
+ */
+ public FSDataInputStream fallbackToFsChecksum(int offCount) throws IOException {
+ // checksumOffCount is speculative, but let's try to reset it less.
+ boolean partOfConvoy = false;
+ if (this.stream == null) {
+ synchronized (streamNoFsChecksumFirstCreateLock) {
+ partOfConvoy = (this.stream != null);
+ if (!partOfConvoy) {
+ this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
+ }
+ }
+ }
+ if (!partOfConvoy) {
+ this.useHBaseChecksum = false;
+ this.hbaseChecksumOffCount.set(offCount);
+ }
+ return this.stream;
+ }
+
+ /** Report that checksum was ok, so we may ponder going back to HBase checksum. */
+ public void checksumOk() {
+ if (this.useHBaseChecksumConfigured && !this.useHBaseChecksum
+ && (this.hbaseChecksumOffCount.getAndDecrement() < 0)) {
+ // The stream we need is already open (because we were using HBase checksum in the past).
+ assert this.streamNoFsChecksum != null;
+ this.useHBaseChecksum = true;
+ }
+ }
+
+ private void updateInputStreamStatistics(FSDataInputStream stream) {
+ // If the underlying file system is HDFS, update read statistics upon close.
+ if (stream instanceof HdfsDataInputStream) {
+ /**
+ * Because HDFS ReadStatistics is calculated per input stream, it is not
+ * feasible to update the aggregated number in real time. Instead, the
+ * metrics are updated when an input stream is closed.
+ */
+ HdfsDataInputStream hdfsDataInputStream = (HdfsDataInputStream)stream;
+ synchronized (readStatistics) {
+ readStatistics.totalBytesRead += hdfsDataInputStream.getReadStatistics().
+ getTotalBytesRead();
+ readStatistics.totalLocalBytesRead += hdfsDataInputStream.getReadStatistics().
+ getTotalLocalBytesRead();
+ readStatistics.totalShortCircuitBytesRead += hdfsDataInputStream.getReadStatistics().
+ getTotalShortCircuitBytesRead();
+ readStatistics.totalZeroCopyBytesRead += hdfsDataInputStream.getReadStatistics().
+ getTotalZeroCopyBytesRead();
+ }
+ }
+ }
+
+ public static long getTotalBytesRead() {
+ synchronized (readStatistics) {
+ return readStatistics.totalBytesRead;
+ }
+ }
+
+ public static long getLocalBytesRead() {
+ synchronized (readStatistics) {
+ return readStatistics.totalLocalBytesRead;
+ }
+ }
+
+ public static long getShortCircuitBytesRead() {
+ synchronized (readStatistics) {
+ return readStatistics.totalShortCircuitBytesRead;
+ }
+ }
+
+ public static long getZeroCopyBytesRead() {
+ synchronized (readStatistics) {
+ return readStatistics.totalZeroCopyBytesRead;
+ }
+ }
+
+ /** Close stream(s) if necessary. */
+ @Override
+ public void close() {
+ if (!doCloseStreams) {
+ return;
+ }
+ updateInputStreamStatistics(this.streamNoFsChecksum);
+ // we do not care about the close exception as it is for reading, no data loss issue.
+ Closeables.closeQuietly(streamNoFsChecksum);
+
+
+ updateInputStreamStatistics(stream);
+ Closeables.closeQuietly(stream);
+ }
+
+ public HFileSystem getHfs() {
+ return this.hfs;
+ }
+
+ /**
+ * This will free sockets and file descriptors held by the stream only when the stream implements
+ * org.apache.hadoop.fs.CanUnbuffer. NOT THREAD SAFE. Must be called only when all the clients
+ * using this stream to read the blocks have finished reading. If by chance the stream is
+ * unbuffered and there are clients still holding this stream for read then on next client read
+ * request a new socket will be opened by Datanode without client knowing about it and will serve
+ * its read request. Note: If this socket is idle for some time then the DataNode will close the
+ * socket and the socket will move into CLOSE_WAIT state and on the next client request on this
+ * stream, the current socket will be closed and a new socket will be opened to serve the
+ * requests.
+ */
+ @SuppressWarnings({ "rawtypes" })
+ public void unbuffer() {
+ FSDataInputStream stream = this.getStream(this.shouldUseHBaseChecksum());
+ if (stream != null) {
+ InputStream wrappedStream = stream.getWrappedStream();
+ // CanUnbuffer interface was added as part of HDFS-7694 and the fix is available in Hadoop
+ // 2.6.4+ and 2.7.1+ versions only so check whether the stream object implements the
+ // CanUnbuffer interface or not and based on that call the unbuffer api.
+ final Class<? extends InputStream> streamClass = wrappedStream.getClass();
+ if (this.instanceOfCanUnbuffer == null) {
+ // To ensure we compute whether the stream is instance of CanUnbuffer only once.
+ this.instanceOfCanUnbuffer = false;
+ if (wrappedStream instanceof CanUnbuffer) {
+ this.unbuffer = (CanUnbuffer) wrappedStream;
+ this.instanceOfCanUnbuffer = true;
+ }
+ }
+ if (this.instanceOfCanUnbuffer) {
+ try {
+ this.unbuffer.unbuffer();
+ } catch (UnsupportedOperationException e) {
+ if (isLogTraceEnabled) {
+ LOG.trace("Failed to invoke 'unbuffer' method in class " + streamClass
+ + " . So there may be the stream does not support unbuffering.", e);
+ }
+ }
+ } else {
+ if (isLogTraceEnabled) {
+ LOG.trace("Failed to find 'unbuffer' method in class " + streamClass);
+ }
+ }
+ }
+ }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hudi/HudiUtilsTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hudi/HudiUtilsTest.java
new file mode 100644
index 00000000000000..d636f0bf84d53d
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hudi/HudiUtilsTest.java
@@ -0,0 +1,198 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hudi;
+
+import org.apache.doris.datasource.hive.HMSExternalCatalog;
+import org.apache.doris.datasource.hive.HMSExternalDatabase;
+import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
+
+import mockit.Mock;
+import mockit.MockUp;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+public class HudiUtilsTest {
+
+ @Test
+ public void testGetHudiSchemaWithCleanCommit() throws IOException {
+
+ /*
+ example table:
+ CREATE TABLE tbx (
+ c1 INT)
+ USING hudi
+ TBLPROPERTIES (
+ 'hoodie.cleaner.policy'='KEEP_LATEST_COMMITS',
+ 'hoodie.clean.automatic' = 'true',
+ 'hoodie.cleaner.commits.retained' = '2'
+ );
+ */
+
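+ // The two payloads below are the JSON bodies of completed hudi commit instants; each one
+ // embeds the table's avro schema under extraMetadata.schema.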
+ String commitContent1 = "{\n"
+ + " \"partitionToWriteStats\" : {\n"
+ + " \"\" : [ {\n"
+ + " \"fileId\" : \"91b75cdf-e851-4524-b579-a9b08edd61d8-0\",\n"
+ + " \"path\" : \"91b75cdf-e851-4524-b579-a9b08edd61d8-0_0-2164-2318_20241219214517936.parquet\",\n"
+ + " \"cdcStats\" : null,\n"
+ + " \"prevCommit\" : \"20241219214431757\",\n"
+ + " \"numWrites\" : 2,\n"
+ + " \"numDeletes\" : 0,\n"
+ + " \"numUpdateWrites\" : 0,\n"
+ + " \"numInserts\" : 1,\n"
+ + " \"totalWriteBytes\" : 434370,\n"
+ + " \"totalWriteErrors\" : 0,\n"
+ + " \"tempPath\" : null,\n"
+ + " \"partitionPath\" : \"\",\n"
+ + " \"totalLogRecords\" : 0,\n"
+ + " \"totalLogFilesCompacted\" : 0,\n"
+ + " \"totalLogSizeCompacted\" : 0,\n"
+ + " \"totalUpdatedRecordsCompacted\" : 0,\n"
+ + " \"totalLogBlocks\" : 0,\n"
+ + " \"totalCorruptLogBlock\" : 0,\n"
+ + " \"totalRollbackBlocks\" : 0,\n"
+ + " \"fileSizeInBytes\" : 434370,\n"
+ + " \"minEventTime\" : null,\n"
+ + " \"maxEventTime\" : null,\n"
+ + " \"runtimeStats\" : {\n"
+ + " \"totalScanTime\" : 0,\n"
+ + " \"totalUpsertTime\" : 87,\n"
+ + " \"totalCreateTime\" : 0\n"
+ + " }\n"
+ + " } ]\n"
+ + " },\n"
+ + " \"compacted\" : false,\n"
+ + " \"extraMetadata\" : {\n"
+ + " \"schema\" : \"{\\\"type\\\":\\\"record\\\",\\\"name\\\":\\\"tbx_record\\\",\\\"namespace\\\":\\\"hoodie.tbx\\\",\\\"fields\\\":[{\\\"name\\\":\\\"c1\\\",\\\"type\\\":[\\\"null\\\",\\\"int\\\"],\\\"default\\\":null}]}\"\n"
+ + " },\n"
+ + " \"operationType\" : \"INSERT\"\n"
+ + "}";
+
+ String commitContent2 = "{\n"
+ + " \"partitionToWriteStats\" : {\n"
+ + " \"\" : [ {\n"
+ + " \"fileId\" : \"91b75cdf-e851-4524-b579-a9b08edd61d8-0\",\n"
+ + " \"path\" : \"91b75cdf-e851-4524-b579-a9b08edd61d8-0_0-2180-2334_20241219214518880.parquet\",\n"
+ + " \"cdcStats\" : null,\n"
+ + " \"prevCommit\" : \"20241219214517936\",\n"
+ + " \"numWrites\" : 3,\n"
+ + " \"numDeletes\" : 0,\n"
+ + " \"numUpdateWrites\" : 0,\n"
+ + " \"numInserts\" : 1,\n"
+ + " \"totalWriteBytes\" : 434397,\n"
+ + " \"totalWriteErrors\" : 0,\n"
+ + " \"tempPath\" : null,\n"
+ + " \"partitionPath\" : \"\",\n"
+ + " \"totalLogRecords\" : 0,\n"
+ + " \"totalLogFilesCompacted\" : 0,\n"
+ + " \"totalLogSizeCompacted\" : 0,\n"
+ + " \"totalUpdatedRecordsCompacted\" : 0,\n"
+ + " \"totalLogBlocks\" : 0,\n"
+ + " \"totalCorruptLogBlock\" : 0,\n"
+ + " \"totalRollbackBlocks\" : 0,\n"
+ + " \"fileSizeInBytes\" : 434397,\n"
+ + " \"minEventTime\" : null,\n"
+ + " \"maxEventTime\" : null,\n"
+ + " \"runtimeStats\" : {\n"
+ + " \"totalScanTime\" : 0,\n"
+ + " \"totalUpsertTime\" : 86,\n"
+ + " \"totalCreateTime\" : 0\n"
+ + " }\n"
+ + " } ]\n"
+ + " },\n"
+ + " \"compacted\" : false,\n"
+ + " \"extraMetadata\" : {\n"
+ + " \"schema\" : \"{\\\"type\\\":\\\"record\\\",\\\"name\\\":\\\"tbx_record\\\",\\\"namespace\\\":\\\"hoodie.tbx\\\",\\\"fields\\\":[{\\\"name\\\":\\\"c1\\\",\\\"type\\\":[\\\"null\\\",\\\"int\\\"],\\\"default\\\":null}]}\"\n"
+ + " },\n"
+ + " \"operationType\" : \"INSERT\"\n"
+ + "}";
+
+ String propContent = "#Updated at 2024-12-19T13:44:32.166Z\n"
+ + "#Thu Dec 19 21:44:32 CST 2024\n"
+ + "hoodie.datasource.write.drop.partition.columns=false\n"
+ + "hoodie.table.type=COPY_ON_WRITE\n"
+ + "hoodie.archivelog.folder=archived\n"
+ + "hoodie.timeline.layout.version=1\n"
+ + "hoodie.table.version=6\n"
+ + "hoodie.table.metadata.partitions=files\n"
+ + "hoodie.database.name=mmc_hudi\n"
+ + "hoodie.datasource.write.partitionpath.urlencode=false\n"
+ + "hoodie.table.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator\n"
+ + "hoodie.table.name=tbx\n"
+ + "hoodie.table.metadata.partitions.inflight=\n"
+ + "hoodie.datasource.write.hive_style_partitioning=true\n"
+ + "hoodie.table.checksum=1632286010\n"
+ + "hoodie.table.create.schema={\"type\"\\:\"record\",\"name\"\\:\"tbx_record\",\"namespace\"\\:\"hoodie.tbx\",\"fields\"\\:[{\"name\"\\:\"c1\",\"type\"\\:[\"int\",\"null\"]}]}";
+
+ // 1. prepare table path
+ Path hudiTable = Files.createTempDirectory("hudiTable");
+ File meta = new File(hudiTable + "/.hoodie");
+ Assert.assertTrue(meta.mkdirs());
+
+ new MockUp<HMSExternalTable>(HMSExternalTable.class) {
+ @Mock
+ public org.apache.hadoop.hive.metastore.api.Table getRemoteTable() {
+ Table table = new Table();
+ StorageDescriptor storageDescriptor = new StorageDescriptor();
+ storageDescriptor.setLocation("file://" + hudiTable.toAbsolutePath());
+ table.setSd(storageDescriptor);
+ return table;
+ }
+ };
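+ // The MockUp above points getRemoteTable() at the temp directory created in step 1, so
+ // schema resolution reads the local .hoodie directory instead of a real HMS table location.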
+
+ // 2. generate properties and commit
+ File prop = new File(meta + "/hoodie.properties");
+ Files.write(prop.toPath(), propContent.getBytes());
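+ // hoodie.properties identifies the directory as a hudi table and records, among other
+ // things, its table type and create schema.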
+ File commit1 = new File(meta + "/1.commit");
+ Files.write(commit1.toPath(), commitContent1.getBytes());
+
+ // 3. now, we can get the schema from this table.
+ HMSExternalCatalog catalog = new HMSExternalCatalog();
+ HMSExternalDatabase db = new HMSExternalDatabase(catalog, 1, "db", "db");
+ HMSExternalTable hmsExternalTable = new HMSExternalTable(2, "tb", "tb", catalog, db);
+ HiveMetaStoreClientHelper.getHudiTableSchema(hmsExternalTable);
+
+ // 4. delete the commit file;
+ // this imitates the clean operation in hudi
+ Assert.assertTrue(commit1.delete());
+
+ // 5. generate a new commit
+ File commit2 = new File(meta + "/2.commit");
+ Files.write(commit2.toPath(), commitContent2.getBytes());
+
+ // 6. we should still get the schema correctly, because `getHudiTableSchema` refreshes
+ // the timeline and therefore picks up the latest commit, so the error
+ // `Could not read commit details from file /.hoodie/1.commit` is not reported.
+ HiveMetaStoreClientHelper.getHudiTableSchema(hmsExternalTable);
+
+ // 7. clean up
+ Assert.assertTrue(commit2.delete());
+ Assert.assertTrue(prop.delete());
+ Assert.assertTrue(meta.delete());
+ Files.delete(hudiTable);
+ }
+}