From 01dc5bcd2764870b3ae772daa4110f91f6182bdd Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Fri, 27 Dec 2024 16:32:22 +0800 Subject: [PATCH] [opt](hms table)Some optimizations for hms external table for 3.0 (#44909) (#46086) bp: #44909 --- fe/check/checkstyle/suppressions.xml | 1 + .../datasource/ExternalMetaCacheMgr.java | 30 +- .../datasource/hive/HMSExternalTable.java | 46 ++- .../hive/HiveMetaStoreClientHelper.java | 20 +- .../datasource/hudi/HudiSchemaCacheValue.java | 40 ++ .../doris/datasource/hudi/HudiUtils.java | 12 +- .../source/HudiCachedFsViewProcessor.java | 134 +++++++ .../source/HudiCachedMetaClientProcessor.java | 152 ++++++++ .../hudi/source/HudiMetadataCacheMgr.java | 143 +++++++ .../hudi/source/HudiPartitionMgr.java | 73 ---- .../datasource/hudi/source/HudiScanNode.java | 48 +-- .../trees/plans/logical/LogicalHudiScan.java | 2 +- .../tablefunction/MetadataGenerator.java | 7 +- .../hbase/io/FSDataInputStreamWrapper.java | 368 ++++++++++++++++++ .../doris/datasource/hudi/HudiUtilsTest.java | 198 ++++++++++ 15 files changed, 1132 insertions(+), 142 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiSchemaCacheValue.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedFsViewProcessor.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedMetaClientProcessor.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiMetadataCacheMgr.java delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionMgr.java create mode 100644 fe/fe-core/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java create mode 100644 fe/fe-core/src/test/java/org/apache/doris/datasource/hudi/HudiUtilsTest.java diff --git a/fe/check/checkstyle/suppressions.xml b/fe/check/checkstyle/suppressions.xml index 84e6666c0c2b32..436b598b594d59 100644 --- a/fe/check/checkstyle/suppressions.xml +++ b/fe/check/checkstyle/suppressions.xml @@ -64,4 +64,5 @@ under the License. + diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java index 48d170b59166b4..9ad78816278de1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java @@ -25,7 +25,9 @@ import org.apache.doris.datasource.hive.HMSExternalCatalog; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.datasource.hive.HiveMetaStoreCache; -import org.apache.doris.datasource.hudi.source.HudiPartitionMgr; +import org.apache.doris.datasource.hudi.source.HudiCachedFsViewProcessor; +import org.apache.doris.datasource.hudi.source.HudiCachedMetaClientProcessor; +import org.apache.doris.datasource.hudi.source.HudiMetadataCacheMgr; import org.apache.doris.datasource.hudi.source.HudiPartitionProcessor; import org.apache.doris.datasource.iceberg.IcebergMetadataCache; import org.apache.doris.datasource.iceberg.IcebergMetadataCacheMgr; @@ -88,7 +90,7 @@ public class ExternalMetaCacheMgr { // catalog id -> table schema cache private Map schemaCacheMap = Maps.newHashMap(); // hudi partition manager - private final HudiPartitionMgr hudiPartitionMgr; + private final HudiMetadataCacheMgr hudiMetadataCacheMgr; // all catalogs could share the same fsCache. 
private FileSystemCache fsCache; // all external table row count cache. @@ -123,7 +125,7 @@ public ExternalMetaCacheMgr() { fsCache = new FileSystemCache(); rowCountCache = new ExternalRowCountCache(rowCountRefreshExecutor); - hudiPartitionMgr = new HudiPartitionMgr(commonRefreshExecutor); + hudiMetadataCacheMgr = new HudiMetadataCacheMgr(commonRefreshExecutor); icebergMetadataCacheMgr = new IcebergMetadataCacheMgr(commonRefreshExecutor); maxComputeMetadataCacheMgr = new MaxComputeMetadataCacheMgr(); paimonMetadataCacheMgr = new PaimonMetadataCacheMgr(commonRefreshExecutor); @@ -165,7 +167,19 @@ public ExternalSchemaCache getSchemaCache(ExternalCatalog catalog) { } public HudiPartitionProcessor getHudiPartitionProcess(ExternalCatalog catalog) { - return hudiPartitionMgr.getPartitionProcessor(catalog); + return hudiMetadataCacheMgr.getPartitionProcessor(catalog); + } + + public HudiCachedFsViewProcessor getFsViewProcessor(ExternalCatalog catalog) { + return hudiMetadataCacheMgr.getFsViewProcessor(catalog); + } + + public HudiCachedMetaClientProcessor getMetaClientProcessor(ExternalCatalog catalog) { + return hudiMetadataCacheMgr.getHudiMetaClientProcessor(catalog); + } + + public HudiMetadataCacheMgr getHudiMetadataCacheMgr() { + return hudiMetadataCacheMgr; } public IcebergMetadataCache getIcebergMetadataCache() { @@ -195,7 +209,7 @@ public void removeCache(long catalogId) { if (schemaCacheMap.remove(catalogId) != null) { LOG.info("remove schema cache for catalog {}", catalogId); } - hudiPartitionMgr.removePartitionProcessor(catalogId); + hudiMetadataCacheMgr.removeCache(catalogId); icebergMetadataCacheMgr.removeCache(catalogId); maxComputeMetadataCacheMgr.removeCache(catalogId); paimonMetadataCacheMgr.removeCache(catalogId); @@ -211,7 +225,7 @@ public void invalidateTableCache(long catalogId, String dbName, String tblName) if (metaCache != null) { metaCache.invalidateTableCache(dbName, tblName); } - hudiPartitionMgr.cleanTablePartitions(catalogId, dbName, tblName); + hudiMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName); icebergMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName); maxComputeMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName); paimonMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName); @@ -230,7 +244,7 @@ public void invalidateDbCache(long catalogId, String dbName) { if (metaCache != null) { metaCache.invalidateDbCache(dbName); } - hudiPartitionMgr.cleanDatabasePartitions(catalogId, dbName); + hudiMetadataCacheMgr.invalidateDbCache(catalogId, dbName); icebergMetadataCacheMgr.invalidateDbCache(catalogId, dbName); maxComputeMetadataCacheMgr.invalidateDbCache(catalogId, dbName); paimonMetadataCacheMgr.invalidateDbCache(catalogId, dbName); @@ -248,7 +262,7 @@ public void invalidateCatalogCache(long catalogId) { if (metaCache != null) { metaCache.invalidateAll(); } - hudiPartitionMgr.cleanPartitionProcess(catalogId); + hudiMetadataCacheMgr.invalidateCatalogCache(catalogId); icebergMetadataCacheMgr.invalidateCatalogCache(catalogId); maxComputeMetadataCacheMgr.invalidateCatalogCache(catalogId); paimonMetadataCacheMgr.invalidateCatalogCache(catalogId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index 26019350fd072d..71c7308b079866 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -34,6 +34,7 @@ import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.SchemaCacheValue; import org.apache.doris.datasource.TablePartitionValues; +import org.apache.doris.datasource.hudi.HudiSchemaCacheValue; import org.apache.doris.datasource.hudi.HudiUtils; import org.apache.doris.datasource.iceberg.IcebergUtils; import org.apache.doris.datasource.mvcc.MvccSnapshot; @@ -74,6 +75,7 @@ import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StringColumnStatsData; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -532,34 +534,38 @@ public long getLastDdlTime() { @Override public Optional initSchema() { makeSureInitialized(); - List columns; if (dlaType.equals(DLAType.ICEBERG)) { - columns = getIcebergSchema(); + return getIcebergSchema(); } else if (dlaType.equals(DLAType.HUDI)) { - columns = getHudiSchema(); + return getHudiSchema(); } else { - columns = getHiveSchema(); + return getHiveSchema(); } - List partitionColumns = initPartitionColumns(columns); - return Optional.of(new HMSSchemaCacheValue(columns, partitionColumns)); } - private List getIcebergSchema() { - return IcebergUtils.getSchema(catalog, dbName, name); + private Optional getIcebergSchema() { + List columns = IcebergUtils.getSchema(catalog, dbName, name); + List partitionColumns = initPartitionColumns(columns); + return Optional.of(new HMSSchemaCacheValue(columns, partitionColumns)); } - private List getHudiSchema() { + private Optional getHudiSchema() { org.apache.avro.Schema hudiSchema = HiveMetaStoreClientHelper.getHudiTableSchema(this); List tmpSchema = Lists.newArrayListWithCapacity(hudiSchema.getFields().size()); + List colTypes = Lists.newArrayList(); for (org.apache.avro.Schema.Field hudiField : hudiSchema.getFields()) { String columnName = hudiField.name().toLowerCase(Locale.ROOT); tmpSchema.add(new Column(columnName, HudiUtils.fromAvroHudiTypeToDorisType(hudiField.schema()), true, null, true, null, "", true, null, -1, null)); + colTypes.add(HudiUtils.convertAvroToHiveType(hudiField.schema())); } - return tmpSchema; + List partitionColumns = initPartitionColumns(tmpSchema); + HudiSchemaCacheValue hudiSchemaCacheValue = new HudiSchemaCacheValue(tmpSchema, partitionColumns); + hudiSchemaCacheValue.setColTypes(colTypes); + return Optional.of(hudiSchemaCacheValue); } - private List getHiveSchema() { + private Optional getHiveSchema() { HMSCachedClient client = ((HMSExternalCatalog) catalog).getClient(); List schema = client.getSchema(dbName, name); Map colDefaultValues = client.getDefaultColumnValues(dbName, name); @@ -571,7 +577,8 @@ private List getHiveSchema() { HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType()), true, null, true, defaultValue, field.getComment(), true, -1)); } - return columns; + List partitionColumns = initPartitionColumns(columns); + return Optional.of(new HMSSchemaCacheValue(columns, partitionColumns)); } @Override @@ -649,9 +656,7 @@ public Optional getColumnStatistic(String colName) { break; } default: - if (LOG.isDebugEnabled()) { - LOG.debug("get column stats for dlaType {} is not supported.", dlaType); - } + LOG.warn("get column stats for dlaType {} is not supported.", dlaType); } return Optional.empty(); } @@ -1016,4 +1021,15 @@ public void beforeMTMVRefresh(MTMV mtmv) throws 
DdlException { Env.getCurrentEnv().getRefreshManager() .refreshTable(getCatalog().getName(), getDbName(), getName(), true); } + + public HoodieTableMetaClient getHudiClient() { + return Env.getCurrentEnv() + .getExtMetaCacheMgr() + .getMetaClientProcessor(getCatalog()) + .getHoodieTableMetaClient( + getDbName(), + getName(), + getRemoteTable().getSd().getLocation(), + getCatalog().getConfiguration()); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java index 706bd653a85e21..a66bf2cb3906b7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java @@ -63,7 +63,6 @@ import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; -import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -805,9 +804,18 @@ public static String showCreateTable(org.apache.hadoop.hive.metastore.api.Table } public static Schema getHudiTableSchema(HMSExternalTable table) { - HoodieTableMetaClient metaClient = getHudiClient(table); + HoodieTableMetaClient metaClient = table.getHudiClient(); TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient); Schema hudiSchema; + + // Here, the timestamp should be reloaded again. + // Because when hudi obtains the schema in `getTableAvroSchema`, it needs to read the specified commit file, + // which is saved in the `metaClient`. + // But the `metaClient` is obtained from cache, so the file obtained may be an old file. + // This file may be deleted by hudi clean task, and an error will be reported. + // So, we should reload timeline so that we can read the latest commit files. + metaClient.reloadActiveTimeline(); + try { hudiSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchema()); } catch (Exception e) { @@ -833,14 +841,6 @@ public static T ugiDoAs(Configuration conf, PrivilegedExceptionAction act } } - public static HoodieTableMetaClient getHudiClient(HMSExternalTable table) { - String hudiBasePath = table.getRemoteTable().getSd().getLocation(); - Configuration conf = getConfiguration(table); - HadoopStorageConfiguration hadoopStorageConfiguration = new HadoopStorageConfiguration(conf); - return ugiDoAs(conf, () -> HoodieTableMetaClient.builder().setConf(hadoopStorageConfiguration) - .setBasePath(hudiBasePath).build()); - } - public static Configuration getConfiguration(HMSExternalTable table) { return table.getCatalog().getConfiguration(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiSchemaCacheValue.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiSchemaCacheValue.java new file mode 100644 index 00000000000000..8c58ffa2006f16 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiSchemaCacheValue.java @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.hudi; + +import org.apache.doris.catalog.Column; +import org.apache.doris.datasource.hive.HMSSchemaCacheValue; + +import java.util.List; + +public class HudiSchemaCacheValue extends HMSSchemaCacheValue { + + private List colTypes; + + public HudiSchemaCacheValue(List schema, List partitionColumns) { + super(schema, partitionColumns); + } + + public List getColTypes() { + return colTypes; + } + + public void setColTypes(List colTypes) { + this.colTypes = colTypes; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java index 0f38abafaa4d98..3dbbcf03da0452 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java @@ -34,12 +34,14 @@ import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; import org.apache.avro.Schema.Field; +import org.apache.hadoop.conf.Configuration; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; +import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration; import java.text.ParseException; import java.text.SimpleDateFormat; @@ -251,7 +253,7 @@ public static TablePartitionValues getPartitionValues(Optional ta return partitionValues; } - HoodieTableMetaClient hudiClient = HiveMetaStoreClientHelper.getHudiClient(hmsTable); + HoodieTableMetaClient hudiClient = hmsTable.getHudiClient(); HudiCachedPartitionProcessor processor = (HudiCachedPartitionProcessor) Env.getCurrentEnv() .getExtMetaCacheMgr().getHudiPartitionProcess(hmsTable.getCatalog()); boolean useHiveSyncPartition = hmsTable.useHiveSyncPartition(); @@ -281,4 +283,12 @@ public static TablePartitionValues getPartitionValues(Optional ta } return partitionValues; } + + public static HoodieTableMetaClient buildHudiTableMetaClient(String hudiBasePath, Configuration conf) { + HadoopStorageConfiguration hadoopStorageConfiguration = new HadoopStorageConfiguration(conf); + return HiveMetaStoreClientHelper.ugiDoAs( + conf, + () -> HoodieTableMetaClient.builder() + .setConf(hadoopStorageConfiguration).setBasePath(hudiBasePath).build()); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedFsViewProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedFsViewProcessor.java new file mode 100644 index 00000000000000..bbbe87cac87a83 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedFsViewProcessor.java @@ -0,0 +1,134 @@ +// Licensed to the Apache Software 
Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.hudi.source; + +import org.apache.doris.common.CacheFactory; +import org.apache.doris.common.Config; +import org.apache.doris.datasource.ExternalMetaCacheMgr; + +import com.github.benmanes.caffeine.cache.LoadingCache; +import com.google.common.collect.Maps; +import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.view.FileSystemViewManager; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Map; +import java.util.Objects; +import java.util.OptionalLong; +import java.util.concurrent.ExecutorService; + +public class HudiCachedFsViewProcessor { + private static final Logger LOG = LogManager.getLogger(HudiCachedFsViewProcessor.class); + private final LoadingCache fsViewCache; + + public HudiCachedFsViewProcessor(ExecutorService executor) { + CacheFactory partitionCacheFactory = new CacheFactory( + OptionalLong.of(28800L), + OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60), + Config.max_external_table_cache_num, + true, + null); + this.fsViewCache = partitionCacheFactory.buildCache(this::createFsView, null, executor); + } + + private HoodieTableFileSystemView createFsView(FsViewKey key) { + HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().build(); + HudiLocalEngineContext ctx = new HudiLocalEngineContext(key.getClient().getStorageConf()); + return FileSystemViewManager.createInMemoryFileSystemView(ctx, key.getClient(), metadataConfig); + } + + public HoodieTableFileSystemView getFsView(String dbName, String tbName, HoodieTableMetaClient hudiClient) { + return fsViewCache.get(new FsViewKey(dbName, tbName, hudiClient)); + } + + public void cleanUp() { + fsViewCache.cleanUp(); + } + + public void invalidateAll() { + fsViewCache.invalidateAll(); + } + + public void invalidateDbCache(String dbName) { + fsViewCache.asMap().forEach((k, v) -> { + if (k.getDbName().equals(dbName)) { + fsViewCache.invalidate(k); + } + }); + } + + public void invalidateTableCache(String dbName, String tbName) { + fsViewCache.asMap().forEach((k, v) -> { + if (k.getDbName().equals(dbName) && k.getTbName().equals(tbName)) { + fsViewCache.invalidate(k); + } + }); + } + + private static class FsViewKey { + String dbName; + String tbName; + HoodieTableMetaClient client; + + public FsViewKey(String dbName, String tbName, HoodieTableMetaClient client) { + this.dbName = dbName; + this.tbName = tbName; + this.client = client; + } + + public String getDbName() { + return dbName; + } + + public String getTbName() { + return tbName; + } + + public 
HoodieTableMetaClient getClient() { + return client; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + FsViewKey fsViewKey = (FsViewKey) o; + return Objects.equals(dbName, fsViewKey.dbName) && Objects.equals(tbName, fsViewKey.tbName) + && Objects.equals(client.getBasePathV2(), fsViewKey.client.getBasePathV2()); + } + + @Override + public int hashCode() { + return Objects.hash(dbName, tbName, client.getBasePathV2()); + } + } + + public Map> getCacheStats() { + Map> res = Maps.newHashMap(); + res.put("hudi_fs_view_cache", + ExternalMetaCacheMgr.getCacheStats(fsViewCache.stats(), fsViewCache.estimatedSize())); + return res; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedMetaClientProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedMetaClientProcessor.java new file mode 100644 index 00000000000000..07726a54ffe5f1 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedMetaClientProcessor.java @@ -0,0 +1,152 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.hudi.source; + +import org.apache.doris.common.CacheFactory; +import org.apache.doris.common.Config; +import org.apache.doris.datasource.ExternalMetaCacheMgr; +import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper; + +import com.github.benmanes.caffeine.cache.LoadingCache; +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Map; +import java.util.Objects; +import java.util.OptionalLong; +import java.util.concurrent.ExecutorService; + +public class HudiCachedMetaClientProcessor { + private static final Logger LOG = LogManager.getLogger(HudiCachedMetaClientProcessor.class); + private final LoadingCache hudiTableMetaClientCache; + + public HudiCachedMetaClientProcessor(ExecutorService executor) { + CacheFactory partitionCacheFactory = new CacheFactory( + OptionalLong.of(28800L), + OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60), + Config.max_external_table_cache_num, + true, + null); + + this.hudiTableMetaClientCache = + partitionCacheFactory.buildCache( + this::createHoodieTableMetaClient, + null, + executor); + } + + private HoodieTableMetaClient createHoodieTableMetaClient(HudiCachedClientKey key) { + LOG.debug("create hudi table meta client for {}.{}", key.getDbName(), key.getTbName()); + HadoopStorageConfiguration hadoopStorageConfiguration = new HadoopStorageConfiguration(key.getConf()); + return HiveMetaStoreClientHelper.ugiDoAs( + key.getConf(), + () -> HoodieTableMetaClient + .builder() + .setConf(hadoopStorageConfiguration) + .setBasePath(key.getHudiBasePath()) + .build()); + } + + public HoodieTableMetaClient getHoodieTableMetaClient( + String dbName, String tbName, String hudiBasePath, Configuration conf) { + return hudiTableMetaClientCache.get(new HudiCachedClientKey(dbName, tbName, hudiBasePath, conf)); + } + + public void cleanUp() { + hudiTableMetaClientCache.cleanUp(); + } + + public void invalidateAll() { + hudiTableMetaClientCache.invalidateAll(); + } + + public void invalidateDbCache(String dbName) { + hudiTableMetaClientCache.asMap().forEach((k, v) -> { + if (k.getDbName().equals(dbName)) { + hudiTableMetaClientCache.invalidate(k); + } + }); + } + + public void invalidateTableCache(String dbName, String tbName) { + hudiTableMetaClientCache.asMap().forEach((k, v) -> { + if (k.getDbName().equals(dbName) && k.getTbName().equals(tbName)) { + hudiTableMetaClientCache.invalidate(k); + } + }); + } + + private static class HudiCachedClientKey { + String dbName; + String tbName; + String hudiBasePath; + Configuration conf; + + public HudiCachedClientKey(String dbName, String tbName, String hudiBasePath, Configuration conf) { + this.dbName = dbName; + this.tbName = tbName; + this.hudiBasePath = hudiBasePath; + this.conf = conf; + } + + public String getDbName() { + return dbName; + } + + public String getTbName() { + return tbName; + } + + public String getHudiBasePath() { + return hudiBasePath; + } + + public Configuration getConf() { + return conf; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + HudiCachedClientKey that = (HudiCachedClientKey) o; + return Objects.equals(dbName, that.dbName) && Objects.equals(tbName, 
that.tbName) + && Objects.equals(hudiBasePath, that.hudiBasePath); + } + + @Override + public int hashCode() { + return Objects.hash(dbName, tbName, hudiBasePath); + } + } + + public Map> getCacheStats() { + Map> res = Maps.newHashMap(); + res.put("hudi_meta_client_cache", ExternalMetaCacheMgr.getCacheStats(hudiTableMetaClientCache.stats(), + hudiTableMetaClientCache.estimatedSize())); + return res; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiMetadataCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiMetadataCacheMgr.java new file mode 100644 index 00000000000000..4ede5c73cfaa97 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiMetadataCacheMgr.java @@ -0,0 +1,143 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.hudi.source; + +import org.apache.doris.datasource.ExternalCatalog; +import org.apache.doris.datasource.hive.HMSExternalCatalog; + +import com.google.common.collect.Maps; + +import java.util.Map; +import java.util.concurrent.ExecutorService; + +public class HudiMetadataCacheMgr { + private final Map partitionProcessors = Maps.newConcurrentMap(); + private final Map fsViewProcessors = Maps.newConcurrentMap(); + private final Map metaClientProcessors = Maps.newConcurrentMap(); + + private final ExecutorService executor; + + public HudiMetadataCacheMgr(ExecutorService executor) { + this.executor = executor; + } + + public HudiPartitionProcessor getPartitionProcessor(ExternalCatalog catalog) { + return partitionProcessors.computeIfAbsent(catalog.getId(), catalogId -> { + if (catalog instanceof HMSExternalCatalog) { + return new HudiCachedPartitionProcessor(catalogId, executor); + } else { + throw new RuntimeException("Hudi only supports hive(or compatible) catalog now"); + } + }); + } + + public HudiCachedFsViewProcessor getFsViewProcessor(ExternalCatalog catalog) { + return fsViewProcessors.computeIfAbsent(catalog.getId(), catalogId -> { + if (catalog instanceof HMSExternalCatalog) { + return new HudiCachedFsViewProcessor(executor); + } else { + throw new RuntimeException("Hudi only supports hive(or compatible) catalog now"); + } + }); + } + + public HudiCachedMetaClientProcessor getHudiMetaClientProcessor(ExternalCatalog catalog) { + return metaClientProcessors.computeIfAbsent(catalog.getId(), catalogId -> { + if (catalog instanceof HMSExternalCatalog) { + return new HudiCachedMetaClientProcessor(executor); + } else { + throw new RuntimeException("Hudi only supports hive(or compatible) catalog now"); + } + }); + } + + public void removeCache(long catalogId) { + HudiPartitionProcessor partitionProcessor = partitionProcessors.remove(catalogId); + if (partitionProcessor != null) { + 
partitionProcessor.cleanUp(); + } + HudiCachedFsViewProcessor fsViewProcessor = fsViewProcessors.remove(catalogId); + if (fsViewProcessor != null) { + fsViewProcessor.cleanUp(); + } + HudiCachedMetaClientProcessor metaClientProcessor = metaClientProcessors.remove(catalogId); + if (metaClientProcessor != null) { + metaClientProcessor.cleanUp(); + } + } + + public void invalidateCatalogCache(long catalogId) { + HudiPartitionProcessor processor = partitionProcessors.get(catalogId); + if (processor != null) { + processor.cleanUp(); + } + HudiCachedFsViewProcessor fsViewProcessor = fsViewProcessors.get(catalogId); + if (fsViewProcessor != null) { + fsViewProcessor.invalidateAll(); + } + HudiCachedMetaClientProcessor metaClientProcessor = metaClientProcessors.get(catalogId); + if (metaClientProcessor != null) { + metaClientProcessor.invalidateAll(); + } + } + + public void invalidateDbCache(long catalogId, String dbName) { + HudiPartitionProcessor processor = partitionProcessors.get(catalogId); + if (processor != null) { + processor.cleanDatabasePartitions(dbName); + } + HudiCachedFsViewProcessor fsViewProcessor = fsViewProcessors.get(catalogId); + if (fsViewProcessor != null) { + fsViewProcessor.invalidateDbCache(dbName); + } + HudiCachedMetaClientProcessor metaClientProcessor = metaClientProcessors.get(catalogId); + if (metaClientProcessor != null) { + metaClientProcessor.invalidateDbCache(dbName); + } + } + + public void invalidateTableCache(long catalogId, String dbName, String tblName) { + HudiPartitionProcessor processor = partitionProcessors.get(catalogId); + if (processor != null) { + processor.cleanTablePartitions(dbName, tblName); + } + HudiCachedFsViewProcessor fsViewProcessor = fsViewProcessors.get(catalogId); + if (fsViewProcessor != null) { + fsViewProcessor.invalidateTableCache(dbName, tblName); + } + HudiCachedMetaClientProcessor metaClientProcessor = metaClientProcessors.get(catalogId); + if (metaClientProcessor != null) { + metaClientProcessor.invalidateTableCache(dbName, tblName); + } + } + + public Map> getCacheStats(ExternalCatalog catalog) { + Map> res = Maps.newHashMap(); + + HudiCachedPartitionProcessor partitionProcessor = (HudiCachedPartitionProcessor) getPartitionProcessor(catalog); + res.putAll(partitionProcessor.getCacheStats()); + + HudiCachedFsViewProcessor fsViewProcessor = getFsViewProcessor(catalog); + res.putAll(fsViewProcessor.getCacheStats()); + + HudiCachedMetaClientProcessor metaClientProcessor = getHudiMetaClientProcessor(catalog); + res.putAll(metaClientProcessor.getCacheStats()); + + return res; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionMgr.java deleted file mode 100644 index b6bfd0ec499427..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionMgr.java +++ /dev/null @@ -1,73 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.datasource.hudi.source; - -import org.apache.doris.datasource.ExternalCatalog; -import org.apache.doris.datasource.hive.HMSExternalCatalog; - -import com.google.common.collect.Maps; - -import java.util.Map; -import java.util.concurrent.ExecutorService; - -public class HudiPartitionMgr { - private final Map partitionProcessors = Maps.newConcurrentMap(); - private final ExecutorService executor; - - public HudiPartitionMgr(ExecutorService executor) { - this.executor = executor; - } - - public HudiPartitionProcessor getPartitionProcessor(ExternalCatalog catalog) { - return partitionProcessors.computeIfAbsent(catalog.getId(), catalogId -> { - if (catalog instanceof HMSExternalCatalog) { - return new HudiCachedPartitionProcessor(catalogId, executor); - } else { - throw new RuntimeException("Hudi only supports hive(or compatible) catalog now"); - } - }); - } - - public void removePartitionProcessor(long catalogId) { - HudiPartitionProcessor processor = partitionProcessors.remove(catalogId); - if (processor != null) { - processor.cleanUp(); - } - } - - public void cleanPartitionProcess(long catalogId) { - HudiPartitionProcessor processor = partitionProcessors.get(catalogId); - if (processor != null) { - processor.cleanUp(); - } - } - - public void cleanDatabasePartitions(long catalogId, String dbName) { - HudiPartitionProcessor processor = partitionProcessors.get(catalogId); - if (processor != null) { - processor.cleanDatabasePartitions(dbName); - } - } - - public void cleanTablePartitions(long catalogId, String dbName, String tblName) { - HudiPartitionProcessor processor = partitionProcessors.get(catalogId); - if (processor != null) { - processor.cleanTablePartitions(dbName, tblName); - } - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java index b1eb47095f33c4..35c9905d6e8a7a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java @@ -20,6 +20,7 @@ import org.apache.doris.analysis.TableScanParams; import org.apache.doris.analysis.TableSnapshot; import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.ListPartitionItem; import org.apache.doris.catalog.PartitionItem; @@ -28,13 +29,15 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.util.FileFormatUtils; import org.apache.doris.common.util.LocationPath; +import org.apache.doris.datasource.ExternalSchemaCache; import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.FileSplit; +import org.apache.doris.datasource.SchemaCacheValue; import org.apache.doris.datasource.TableFormatType; import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper; import org.apache.doris.datasource.hive.HivePartition; import org.apache.doris.datasource.hive.source.HiveScanNode; -import 
org.apache.doris.datasource.hudi.HudiUtils; +import org.apache.doris.datasource.hudi.HudiSchemaCacheValue; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.SessionVariable; @@ -48,21 +51,17 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import org.apache.avro.Schema; -import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.BaseFile; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.util.Option; import org.apache.hudi.storage.StoragePath; -import org.apache.hudi.storage.StoragePathInfo; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -71,7 +70,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -110,6 +108,7 @@ public class HudiScanNode extends HiveScanNode { private boolean incrementalRead = false; private TableScanParams scanParams; private IncrementalRelation incrementalRelation; + private HoodieTableFileSystemView fsView; /** * External file scan node for Query Hudi table @@ -159,25 +158,16 @@ protected void doInitialize() throws UserException { initBackendPolicy(); initSchemaParams(); - hudiClient = HiveMetaStoreClientHelper.getHudiClient(hmsTable); + hudiClient = hmsTable.getHudiClient(); hudiClient.reloadActiveTimeline(); basePath = hmsTable.getRemoteTable().getSd().getLocation(); inputFormat = hmsTable.getRemoteTable().getSd().getInputFormat(); serdeLib = hmsTable.getRemoteTable().getSd().getSerdeInfo().getSerializationLib(); - columnNames = new ArrayList<>(); - columnTypes = new ArrayList<>(); - TableSchemaResolver schemaUtil = new TableSchemaResolver(hudiClient); - Schema hudiSchema; - try { - hudiSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchema()); - } catch (Exception e) { - throw new UserException("Cannot get hudi table schema."); - } - for (Schema.Field hudiField : hudiSchema.getFields()) { - columnNames.add(hudiField.name().toLowerCase(Locale.ROOT)); - String columnType = HudiUtils.convertAvroToHiveType(hudiField.schema()); - columnTypes.add(columnType); - } + ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(table.getCatalog()); + Optional schemaCacheValue = cache.getSchemaValue(table.getDbName(), table.getName()); + HudiSchemaCacheValue hudiSchemaCacheValue = (HudiSchemaCacheValue) schemaCacheValue.get(); + columnNames = hudiSchemaCacheValue.getSchema().stream().map(Column::getName).collect(Collectors.toList()); + columnTypes = hudiSchemaCacheValue.getColTypes(); if (scanParams != null && !scanParams.incrementalRead()) { // Only support incremental read @@ -218,6 +208,10 @@ protected void doInitialize() throws UserException { } queryInstant = snapshotInstant.get().getTimestamp(); } + fsView = Env.getCurrentEnv() + .getExtMetaCacheMgr() + .getFsViewProcessor(hmsTable.getCatalog()) + .getFsView(hmsTable.getDbName(), 
hmsTable.getName(), hudiClient); } @Override @@ -326,23 +320,17 @@ private List getIncrementalSplits() { } private void getPartitionSplits(HivePartition partition, List splits) throws IOException { - String globPath; + String partitionName; if (partition.isDummyPartition()) { partitionName = ""; - globPath = hudiClient.getBasePathV2().toString() + "/*"; } else { partitionName = FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(), new StoragePath(partition.getPath())); - globPath = String.format("%s/%s/*", hudiClient.getBasePathV2().toString(), partitionName); } - List statuses = FSUtils.getGlobStatusExcludingMetaFolder( - hudiClient.getRawHoodieStorage(), new StoragePath(globPath)); - HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(hudiClient, - timeline, statuses); if (canUseNativeReader()) { - fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant).forEach(baseFile -> { + fsView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant).forEach(baseFile -> { noLogsSplitNum.incrementAndGet(); String filePath = baseFile.getPath(); long fileSize = baseFile.getFileSize(); @@ -352,7 +340,7 @@ private void getPartitionSplits(HivePartition partition, List splits) thr new String[0], partition.getPartitionValues())); }); } else { - fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant) + fsView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant) .forEach(fileSlice -> splits.add( generateHudiSplit(fileSlice, partition.getPartitionValues(), queryInstant))); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java index 51e68eb07631ae..b742142093bdf1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java @@ -187,7 +187,7 @@ public LogicalHudiScan withScanParams(HMSExternalTable table, TableScanParams sc optParams.put(k, v); } }); - HoodieTableMetaClient hudiClient = HiveMetaStoreClientHelper.getHudiClient(table); + HoodieTableMetaClient hudiClient = table.getHudiClient(); try { boolean isCowOrRoTable = table.isHoodieCowTable(); if (isCowOrRoTable) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java index 53b1da3292f70a..b96ec3ef9b9d3c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java @@ -56,7 +56,7 @@ import org.apache.doris.datasource.hive.HMSExternalCatalog; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.datasource.hive.HiveMetaStoreCache; -import org.apache.doris.datasource.hudi.source.HudiCachedPartitionProcessor; +import org.apache.doris.datasource.hudi.source.HudiMetadataCacheMgr; import org.apache.doris.datasource.iceberg.IcebergExternalCatalog; import org.apache.doris.datasource.iceberg.IcebergMetadataCache; import org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog; @@ -1355,9 +1355,8 @@ private static TFetchSchemaTableDataResult metaCacheStatsMetadataResult(TSchemaT fillBatch(dataBatch, cache.getStats(), catalog.getName()); } // 2. 
hudi cache - HudiCachedPartitionProcessor processor - = (HudiCachedPartitionProcessor) mgr.getHudiPartitionProcess(catalog); - fillBatch(dataBatch, processor.getCacheStats(), catalog.getName()); + HudiMetadataCacheMgr hudiMetadataCacheMgr = mgr.getHudiMetadataCacheMgr(); + fillBatch(dataBatch, hudiMetadataCacheMgr.getCacheStats(catalog), catalog.getName()); } else if (catalogIf instanceof IcebergExternalCatalog) { // 3. iceberg cache IcebergMetadataCache icebergCache = mgr.getIcebergMetadataCache(); diff --git a/fe/fe-core/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java b/fe/fe-core/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java new file mode 100644 index 00000000000000..b0624fc6ea0d34 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java @@ -0,0 +1,368 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Copy code from: +// https://github.com/apache/hbase/blob/rel/2.4.9/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java +// to solve hudi dependency with this class + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.fs.CanUnbuffer; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.fs.HFileSystem; +import org.apache.hadoop.hdfs.client.HdfsDataInputStream; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.io.Closeables; + +/** + * Wrapper for input stream(s) that takes care of the interaction of FS and HBase checksums, + * as well as closing streams. Initialization is not thread-safe, but normal operation is; + * see method comments. 
+ */ +@InterfaceAudience.Private +public class FSDataInputStreamWrapper implements Closeable { + private static final Logger LOG = LoggerFactory.getLogger(FSDataInputStreamWrapper.class); + private static final boolean isLogTraceEnabled = LOG.isTraceEnabled(); + + private final HFileSystem hfs; + private final Path path; + private final FileLink link; + private final boolean doCloseStreams; + private final boolean dropBehind; + private final long readahead; + + /** Two stream handles, one with and one without FS-level checksum. + * HDFS checksum setting is on FS level, not single read level, so you have to keep two + * FS objects and two handles open to interleave different reads freely, which is very sad. + * This is what we do: + * 1) First, we need to read the trailer of HFile to determine checksum parameters. + * We always use FS checksum to do that, so ctor opens {@link #stream}. + * 2.1) After that, if HBase checksum is not used, we'd just always use {@link #stream}; + * 2.2) If HBase checksum can be used, we'll open {@link #streamNoFsChecksum}, + * and close {@link #stream}. User MUST call prepareForBlockReader for that to happen; + * if they don't, (2.1) will be the default. + * 3) The users can call {@link #shouldUseHBaseChecksum()}, and pass its result to + * {@link #getStream(boolean)} to get stream (if Java had out/pointer params we could + * return both in one call). This stream is guaranteed to be set. + * 4) The first time HBase checksum fails, one would call {@link #fallbackToFsChecksum(int)}. + * That will take lock, and open {@link #stream}. While this is going on, others will + * continue to use the old stream; if they also want to fall back, they'll also call + * {@link #fallbackToFsChecksum(int)}, and block until {@link #stream} is set. + * 5) After some number of checksumOk() calls, we will go back to using HBase checksum. + * We will have 2 handles; however we presume checksums fail so rarely that we don't care. + */ + private volatile FSDataInputStream stream = null; + private volatile FSDataInputStream streamNoFsChecksum = null; + private final Object streamNoFsChecksumFirstCreateLock = new Object(); + + // The configuration states that we should validate hbase checksums + private boolean useHBaseChecksumConfigured; + + // Record the current state of this reader with respect to + // validating checkums in HBase. This is originally set the same + // value as useHBaseChecksumConfigured, but can change state as and when + // we encounter checksum verification failures. + private volatile boolean useHBaseChecksum; + + // In the case of a checksum failure, do these many succeeding + // reads without hbase checksum verification. 
+ private AtomicInteger hbaseChecksumOffCount = new AtomicInteger(-1); + + private final static ReadStatistics readStatistics = new ReadStatistics(); + + private static class ReadStatistics { + long totalBytesRead; + long totalLocalBytesRead; + long totalShortCircuitBytesRead; + long totalZeroCopyBytesRead; + } + + private Boolean instanceOfCanUnbuffer = null; + private CanUnbuffer unbuffer = null; + + public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException { + this(fs, path, false, -1L); + } + + public FSDataInputStreamWrapper(FileSystem fs, Path path, boolean dropBehind, long readahead) throws IOException { + this(fs, null, path, dropBehind, readahead); + } + + public FSDataInputStreamWrapper(FileSystem fs, FileLink link, + boolean dropBehind, long readahead) throws IOException { + this(fs, link, null, dropBehind, readahead); + } + + private FSDataInputStreamWrapper(FileSystem fs, FileLink link, Path path, boolean dropBehind, + long readahead) throws IOException { + assert (path == null) != (link == null); + this.path = path; + this.link = link; + this.doCloseStreams = true; + this.dropBehind = dropBehind; + this.readahead = readahead; + // If the fs is not an instance of HFileSystem, then create an instance of HFileSystem + // that wraps over the specified fs. In this case, we will not be able to avoid + // checksumming inside the filesystem. + this.hfs = (fs instanceof HFileSystem) ? (HFileSystem) fs : new HFileSystem(fs); + + // Initially we are going to read the tail block. Open the reader w/FS checksum. + this.useHBaseChecksumConfigured = this.useHBaseChecksum = false; + this.stream = (link != null) ? link.open(hfs) : hfs.open(path); + setStreamOptions(stream); + } + + private void setStreamOptions(FSDataInputStream in) { + try { + in.setDropBehind(dropBehind); + } catch (Exception e) { + // Skipped. + } + if (readahead >= 0) { + try { + in.setReadahead(readahead); + } catch (Exception e) { + // Skipped. + } + } + } + + /** + * Prepares the streams for block reader. NOT THREAD SAFE. Must be called once, after any + * reads finish and before any other reads start (what happens in reality is we read the + * tail, then call this based on what's in the tail, then read blocks). + * @param forceNoHBaseChecksum Force not using HBase checksum. + */ + public void prepareForBlockReader(boolean forceNoHBaseChecksum) throws IOException { + if (hfs == null) return; + assert this.stream != null && !this.useHBaseChecksumConfigured; + boolean useHBaseChecksum = + !forceNoHBaseChecksum && hfs.useHBaseChecksum() && (hfs.getNoChecksumFs() != hfs); + + if (useHBaseChecksum) { + FileSystem fsNc = hfs.getNoChecksumFs(); + this.streamNoFsChecksum = (link != null) ? link.open(fsNc) : fsNc.open(path); + setStreamOptions(streamNoFsChecksum); + this.useHBaseChecksumConfigured = this.useHBaseChecksum = useHBaseChecksum; + // Close the checksum stream; we will reopen it if we get an HBase checksum failure. + this.stream.close(); + this.stream = null; + } + } + + /** For use in tests. */ + public FSDataInputStreamWrapper(FSDataInputStream fsdis) { + this(fsdis, fsdis); + } + + /** For use in tests. */ + public FSDataInputStreamWrapper(FSDataInputStream fsdis, FSDataInputStream noChecksum) { + doCloseStreams = false; + stream = fsdis; + streamNoFsChecksum = noChecksum; + path = null; + link = null; + hfs = null; + useHBaseChecksumConfigured = useHBaseChecksum = false; + dropBehind = false; + readahead = 0; + } + + /** + * @return Whether we are presently using HBase checksum. 
+ */ + public boolean shouldUseHBaseChecksum() { + return this.useHBaseChecksum; + } + + /** + * Get the stream to use. Thread-safe. + * @param useHBaseChecksum must be the value that shouldUseHBaseChecksum has returned + * at some point in the past, otherwise the result is undefined. + */ + public FSDataInputStream getStream(boolean useHBaseChecksum) { + return useHBaseChecksum ? this.streamNoFsChecksum : this.stream; + } + + /** + * Read from non-checksum stream failed, fall back to FS checksum. Thread-safe. + * @param offCount For how many checksumOk calls to turn off the HBase checksum. + */ + public FSDataInputStream fallbackToFsChecksum(int offCount) throws IOException { + // checksumOffCount is speculative, but let's try to reset it less. + boolean partOfConvoy = false; + if (this.stream == null) { + synchronized (streamNoFsChecksumFirstCreateLock) { + partOfConvoy = (this.stream != null); + if (!partOfConvoy) { + this.stream = (link != null) ? link.open(hfs) : hfs.open(path); + } + } + } + if (!partOfConvoy) { + this.useHBaseChecksum = false; + this.hbaseChecksumOffCount.set(offCount); + } + return this.stream; + } + + /** Report that checksum was ok, so we may ponder going back to HBase checksum. */ + public void checksumOk() { + if (this.useHBaseChecksumConfigured && !this.useHBaseChecksum + && (this.hbaseChecksumOffCount.getAndDecrement() < 0)) { + // The stream we need is already open (because we were using HBase checksum in the past). + assert this.streamNoFsChecksum != null; + this.useHBaseChecksum = true; + } + } + + private void updateInputStreamStatistics(FSDataInputStream stream) { + // If the underlying file system is HDFS, update read statistics upon close. + if (stream instanceof HdfsDataInputStream) { + /** + * Because HDFS ReadStatistics is calculated per input stream, it is not + * feasible to update the aggregated number in real time. Instead, the + * metrics are updated when an input stream is closed. + */ + HdfsDataInputStream hdfsDataInputStream = (HdfsDataInputStream)stream; + synchronized (readStatistics) { + readStatistics.totalBytesRead += hdfsDataInputStream.getReadStatistics(). + getTotalBytesRead(); + readStatistics.totalLocalBytesRead += hdfsDataInputStream.getReadStatistics(). + getTotalLocalBytesRead(); + readStatistics.totalShortCircuitBytesRead += hdfsDataInputStream.getReadStatistics(). + getTotalShortCircuitBytesRead(); + readStatistics.totalZeroCopyBytesRead += hdfsDataInputStream.getReadStatistics(). + getTotalZeroCopyBytesRead(); + } + } + } + + public static long getTotalBytesRead() { + synchronized (readStatistics) { + return readStatistics.totalBytesRead; + } + } + + public static long getLocalBytesRead() { + synchronized (readStatistics) { + return readStatistics.totalLocalBytesRead; + } + } + + public static long getShortCircuitBytesRead() { + synchronized (readStatistics) { + return readStatistics.totalShortCircuitBytesRead; + } + } + + public static long getZeroCopyBytesRead() { + synchronized (readStatistics) { + return readStatistics.totalZeroCopyBytesRead; + } + } + + /** CloseClose stream(s) if necessary. */ + @Override + public void close() { + if (!doCloseStreams) { + return; + } + updateInputStreamStatistics(this.streamNoFsChecksum); + // we do not care about the close exception as it is for reading, no data loss issue. 
+ Closeables.closeQuietly(streamNoFsChecksum); + + + updateInputStreamStatistics(stream); + Closeables.closeQuietly(stream); + } + + public HFileSystem getHfs() { + return this.hfs; + } + + /** + * This will free sockets and file descriptors held by the stream only when the stream implements + * org.apache.hadoop.fs.CanUnbuffer. NOT THREAD SAFE. Must be called only when all the clients + * using this stream to read the blocks have finished reading. If by chance the stream is + * unbuffered and there are clients still holding this stream for read then on next client read + * request a new socket will be opened by Datanode without client knowing about it and will serve + * its read request. Note: If this socket is idle for some time then the DataNode will close the + * socket and the socket will move into CLOSE_WAIT state and on the next client request on this + * stream, the current socket will be closed and a new socket will be opened to serve the + * requests. + */ + @SuppressWarnings({ "rawtypes" }) + public void unbuffer() { + FSDataInputStream stream = this.getStream(this.shouldUseHBaseChecksum()); + if (stream != null) { + InputStream wrappedStream = stream.getWrappedStream(); + // CanUnbuffer interface was added as part of HDFS-7694 and the fix is available in Hadoop + // 2.6.4+ and 2.7.1+ versions only so check whether the stream object implements the + // CanUnbuffer interface or not and based on that call the unbuffer api. + final Class streamClass = wrappedStream.getClass(); + if (this.instanceOfCanUnbuffer == null) { + // To ensure we compute whether the stream is instance of CanUnbuffer only once. + this.instanceOfCanUnbuffer = false; + if (wrappedStream instanceof CanUnbuffer) { + this.unbuffer = (CanUnbuffer) wrappedStream; + this.instanceOfCanUnbuffer = true; + } + } + if (this.instanceOfCanUnbuffer) { + try { + this.unbuffer.unbuffer(); + } catch (UnsupportedOperationException e){ + if (isLogTraceEnabled) { + LOG.trace("Failed to invoke 'unbuffer' method in class " + streamClass + + " . So there may be the stream does not support unbuffering.", e); + } + } + } else { + if (isLogTraceEnabled) { + LOG.trace("Failed to find 'unbuffer' method in class " + streamClass); + } + } + } + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hudi/HudiUtilsTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hudi/HudiUtilsTest.java new file mode 100644 index 00000000000000..d636f0bf84d53d --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hudi/HudiUtilsTest.java @@ -0,0 +1,198 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.hudi; + +import org.apache.doris.datasource.hive.HMSExternalCatalog; +import org.apache.doris.datasource.hive.HMSExternalDatabase; +import org.apache.doris.datasource.hive.HMSExternalTable; +import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper; + +import mockit.Mock; +import mockit.MockUp; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +public class HudiUtilsTest { + + @Test + public void testGetHudiSchemaWithCleanCommit() throws IOException { + + /* + example table: + CREATE TABLE tbx ( + c1 INT) + USING hudi + TBLPROPERTIES ( + 'hoodie.cleaner.policy'='KEEP_LATEST_COMMITS', + 'hoodie.clean.automatic' = 'true', + 'hoodie.cleaner.commits.retained' = '2' + ); + */ + + String commitContent1 = "{\n" + + " \"partitionToWriteStats\" : {\n" + + " \"\" : [ {\n" + + " \"fileId\" : \"91b75cdf-e851-4524-b579-a9b08edd61d8-0\",\n" + + " \"path\" : \"91b75cdf-e851-4524-b579-a9b08edd61d8-0_0-2164-2318_20241219214517936.parquet\",\n" + + " \"cdcStats\" : null,\n" + + " \"prevCommit\" : \"20241219214431757\",\n" + + " \"numWrites\" : 2,\n" + + " \"numDeletes\" : 0,\n" + + " \"numUpdateWrites\" : 0,\n" + + " \"numInserts\" : 1,\n" + + " \"totalWriteBytes\" : 434370,\n" + + " \"totalWriteErrors\" : 0,\n" + + " \"tempPath\" : null,\n" + + " \"partitionPath\" : \"\",\n" + + " \"totalLogRecords\" : 0,\n" + + " \"totalLogFilesCompacted\" : 0,\n" + + " \"totalLogSizeCompacted\" : 0,\n" + + " \"totalUpdatedRecordsCompacted\" : 0,\n" + + " \"totalLogBlocks\" : 0,\n" + + " \"totalCorruptLogBlock\" : 0,\n" + + " \"totalRollbackBlocks\" : 0,\n" + + " \"fileSizeInBytes\" : 434370,\n" + + " \"minEventTime\" : null,\n" + + " \"maxEventTime\" : null,\n" + + " \"runtimeStats\" : {\n" + + " \"totalScanTime\" : 0,\n" + + " \"totalUpsertTime\" : 87,\n" + + " \"totalCreateTime\" : 0\n" + + " }\n" + + " } ]\n" + + " },\n" + + " \"compacted\" : false,\n" + + " \"extraMetadata\" : {\n" + + " \"schema\" : \"{\\\"type\\\":\\\"record\\\",\\\"name\\\":\\\"tbx_record\\\",\\\"namespace\\\":\\\"hoodie.tbx\\\",\\\"fields\\\":[{\\\"name\\\":\\\"c1\\\",\\\"type\\\":[\\\"null\\\",\\\"int\\\"],\\\"default\\\":null}]}\"\n" + + " },\n" + + " \"operationType\" : \"INSERT\"\n" + + "}"; + + String commitContent2 = "{\n" + + " \"partitionToWriteStats\" : {\n" + + " \"\" : [ {\n" + + " \"fileId\" : \"91b75cdf-e851-4524-b579-a9b08edd61d8-0\",\n" + + " \"path\" : \"91b75cdf-e851-4524-b579-a9b08edd61d8-0_0-2180-2334_20241219214518880.parquet\",\n" + + " \"cdcStats\" : null,\n" + + " \"prevCommit\" : \"20241219214517936\",\n" + + " \"numWrites\" : 3,\n" + + " \"numDeletes\" : 0,\n" + + " \"numUpdateWrites\" : 0,\n" + + " \"numInserts\" : 1,\n" + + " \"totalWriteBytes\" : 434397,\n" + + " \"totalWriteErrors\" : 0,\n" + + " \"tempPath\" : null,\n" + + " \"partitionPath\" : \"\",\n" + + " \"totalLogRecords\" : 0,\n" + + " \"totalLogFilesCompacted\" : 0,\n" + + " \"totalLogSizeCompacted\" : 0,\n" + + " \"totalUpdatedRecordsCompacted\" : 0,\n" + + " \"totalLogBlocks\" : 0,\n" + + " \"totalCorruptLogBlock\" : 0,\n" + + " \"totalRollbackBlocks\" : 0,\n" + + " \"fileSizeInBytes\" : 434397,\n" + + " \"minEventTime\" : null,\n" + + " \"maxEventTime\" : null,\n" + + " \"runtimeStats\" : {\n" + + " \"totalScanTime\" : 0,\n" + + " \"totalUpsertTime\" : 86,\n" + + " \"totalCreateTime\" : 
0\n" +
+ " }\n" +
+ " } ]\n" +
+ " },\n" +
+ " \"compacted\" : false,\n" +
+ " \"extraMetadata\" : {\n" +
+ " \"schema\" : \"{\\\"type\\\":\\\"record\\\",\\\"name\\\":\\\"tbx_record\\\",\\\"namespace\\\":\\\"hoodie.tbx\\\",\\\"fields\\\":[{\\\"name\\\":\\\"c1\\\",\\\"type\\\":[\\\"null\\\",\\\"int\\\"],\\\"default\\\":null}]}\"\n" +
+ " },\n" +
+ " \"operationType\" : \"INSERT\"\n" +
+ "}";
+
+ String propContent = "#Updated at 2024-12-19T13:44:32.166Z\n" +
+ "#Thu Dec 19 21:44:32 CST 2024\n" +
+ "hoodie.datasource.write.drop.partition.columns=false\n" +
+ "hoodie.table.type=COPY_ON_WRITE\n" +
+ "hoodie.archivelog.folder=archived\n" +
+ "hoodie.timeline.layout.version=1\n" +
+ "hoodie.table.version=6\n" +
+ "hoodie.table.metadata.partitions=files\n" +
+ "hoodie.database.name=mmc_hudi\n" +
+ "hoodie.datasource.write.partitionpath.urlencode=false\n" +
+ "hoodie.table.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator\n" +
+ "hoodie.table.name=tbx\n" +
+ "hoodie.table.metadata.partitions.inflight=\n" +
+ "hoodie.datasource.write.hive_style_partitioning=true\n" +
+ "hoodie.table.checksum=1632286010\n" +
+ "hoodie.table.create.schema={\"type\"\\:\"record\",\"name\"\\:\"tbx_record\",\"namespace\"\\:\"hoodie.tbx\",\"fields\"\\:[{\"name\"\\:\"c1\",\"type\"\\:[\"int\",\"null\"]}]}";
+
+
+ // 1. Prepare the table path.
+ Path hudiTable = Files.createTempDirectory("hudiTable");
+ File meta = new File(hudiTable + "/.hoodie");
+ Assert.assertTrue(meta.mkdirs());
+
+ new MockUp(HMSExternalTable.class) {
+ @Mock
+ public org.apache.hadoop.hive.metastore.api.Table getRemoteTable() {
+ Table table = new Table();
+ StorageDescriptor storageDescriptor = new StorageDescriptor();
+ storageDescriptor.setLocation("file://" + hudiTable.toAbsolutePath());
+ table.setSd(storageDescriptor);
+ return table;
+ }
+ };
+
+ // 2. Generate the table properties and the first commit.
+ File prop = new File(meta + "/hoodie.properties");
+ Files.write(prop.toPath(), propContent.getBytes());
+ File commit1 = new File(meta + "/1.commit");
+ Files.write(commit1.toPath(), commitContent1.getBytes());
+
+ // 3. Now we can get the schema from this table.
+ HMSExternalCatalog catalog = new HMSExternalCatalog();
+ HMSExternalDatabase db = new HMSExternalDatabase(catalog, 1, "db", "db");
+ HMSExternalTable hmsExternalTable = new HMSExternalTable(2, "tb", "tb", catalog, db);
+ HiveMetaStoreClientHelper.getHudiTableSchema(hmsExternalTable);
+
+ // 4. Delete the commit file.
+ // This imitates Hudi's clean operation.
+ Assert.assertTrue(commit1.delete());
+
+ // 5. Generate a new commit.
+ File commit2 = new File(meta + "/2.commit");
+ Files.write(commit2.toPath(), commitContent2.getBytes());
+
+ // 6. We should still get the schema correctly:
+ // `getHudiTableSchema` refreshes the timeline,
+ // so we read the latest commit and the error
+ // `Could not read commit details from file /.hoodie/1.commit` is not reported.
+ HiveMetaStoreClientHelper.getHudiTableSchema(hmsExternalTable);
+
+ // 7. Clean up.
+ Assert.assertTrue(commit2.delete());
+ Assert.assertTrue(prop.delete());
+ Assert.assertTrue(meta.delete());
+ Files.delete(hudiTable);
+ }
+}
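
A minimal usage sketch of the checksum fallback cycle exposed by the copied FSDataInputStreamWrapper, assuming a caller-side `fs` and `hfilePath` (both hypothetical names, not taken from this patch) and an arbitrary off-count of 3:

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;

import java.io.IOException;

class ChecksumFallbackSketch {
    static void readWithFallback(FileSystem fs, Path hfilePath) throws IOException {
        FSDataInputStreamWrapper wrapper = new FSDataInputStreamWrapper(fs, hfilePath);
        try {
            // After the HFile tail has been read, switch to HBase checksums when the FS allows it.
            wrapper.prepareForBlockReader(false);
            FSDataInputStream in = wrapper.getStream(wrapper.shouldUseHBaseChecksum());
            // (block reads from `in` elided) On an HBase checksum failure, fall back to the
            // FS-checksum stream and verify the next few reads there:
            in = wrapper.fallbackToFsChecksum(3);
            // Each verified read reports back; once the off-count is exhausted the wrapper
            // eventually switches back to HBase checksums:
            wrapper.checksumOk();
        } finally {
            wrapper.close();
        }
    }
}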