From 65ef2e1c1e38311d60a457950cd44b724c752797 Mon Sep 17 00:00:00 2001 From: kakachen Date: Thu, 14 Nov 2024 01:25:27 +0800 Subject: [PATCH] [Opt](multi-catalog)Improve performance by introducing cache of list directory files when getting split for each query. --- .../java/org/apache/doris/common/Config.java | 4 + .../org/apache/doris/common/CacheFactory.java | 6 + .../datasource/hive/HMSExternalTable.java | 4 +- .../datasource/hive/HiveMetaStoreCache.java | 58 +++-- .../datasource/hive/source/HiveScanNode.java | 14 +- .../datasource/hudi/source/HudiScanNode.java | 5 +- .../org/apache/doris/fs/DirectoryLister.java | 31 +++ .../doris/fs/FileSystemDirectoryLister.java | 38 ++++ .../doris/fs/RemoteFileRemoteIterator.java | 48 ++++ .../org/apache/doris/fs/RemoteIterator.java | 29 +++ .../apache/doris/fs/SimpleRemoteIterator.java | 46 ++++ .../TransactionDirectoryListingCacheKey.java | 63 ++++++ ...ransactionScopeCachingDirectoryLister.java | 206 +++++++++++++++++ ...ionScopeCachingDirectoryListerFactory.java | 58 +++++ .../translator/PhysicalPlanTranslator.java | 11 +- .../doris/planner/SingleNodePlanner.java | 12 +- ...actionScopeCachingDirectoryListerTest.java | 214 ++++++++++++++++++ 17 files changed, 813 insertions(+), 34 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/fs/DirectoryLister.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemDirectoryLister.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/fs/RemoteFileRemoteIterator.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/fs/RemoteIterator.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/fs/SimpleRemoteIterator.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/fs/TransactionDirectoryListingCacheKey.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryLister.java create mode 100644 
fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerFactory.java create mode 100644 fe/fe-core/src/test/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerTest.java diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 993702c4dac57f8..fd78b2cf2ed4cc5 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2134,6 +2134,10 @@ public class Config extends ConfigBase { "Max cache number of external table row count"}) public static long max_external_table_row_count_cache_num = 100000; + @ConfField(description = {"每个查询 list 文件目录数。", + "Max cache number of partition directories at query level."}) + public static long per_query_list_dir_cache_num = 10000; + /** * Max cache loader thread-pool size. * Max thread pool size for loading external meta cache diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/CacheFactory.java b/fe/fe-core/src/main/java/org/apache/doris/common/CacheFactory.java index 50f46647975e235..60d6b79567c7e0c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/CacheFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/CacheFactory.java @@ -19,6 +19,7 @@ import com.github.benmanes.caffeine.cache.AsyncCacheLoader; import com.github.benmanes.caffeine.cache.AsyncLoadingCache; +import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.CacheLoader; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; @@ -85,6 +86,11 @@ public LoadingCache buildCache(CacheLoader cacheLoader, return builder.build(cacheLoader); } + public Cache buildCache() { + Caffeine builder = buildWithParams(); + return builder.build(); + } + // Build an async loading cache public AsyncLoadingCache buildAsyncCache(AsyncCacheLoader cacheLoader, 
ExecutorService executor) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index aacd9268ae35cfc..1d652c1e1103503 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -33,6 +33,7 @@ import org.apache.doris.datasource.SchemaCacheValue; import org.apache.doris.datasource.hudi.HudiUtils; import org.apache.doris.datasource.iceberg.IcebergUtils; +import org.apache.doris.fs.FileSystemDirectoryLister; import org.apache.doris.mtmv.MTMVBaseTableIf; import org.apache.doris.mtmv.MTMVMaxTimestampSnapshot; import org.apache.doris.mtmv.MTMVRefreshContext; @@ -952,7 +953,8 @@ private List getFilesForPartitions( LOG.debug("Chosen partition for table {}. [{}]", name, partition.toString()); } } - return cache.getFilesByPartitionsWithoutCache(hivePartitions, bindBrokerName); + return cache.getFilesByPartitionsWithoutCache(hivePartitions, bindBrokerName, + new FileSystemDirectoryLister(), null); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java index ea42dfa2f52a01d..fb4c2c8c0597677 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java @@ -25,6 +25,7 @@ import org.apache.doris.catalog.ListPartitionItem; import org.apache.doris.catalog.PartitionItem; import org.apache.doris.catalog.PartitionKey; +import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.CacheFactory; @@ -39,7 +40,10 @@ import org.apache.doris.datasource.ExternalMetaCacheMgr; import 
org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo; import org.apache.doris.datasource.property.PropertyConverter; +import org.apache.doris.fs.DirectoryLister; import org.apache.doris.fs.FileSystemCache; +import org.apache.doris.fs.FileSystemDirectoryLister; +import org.apache.doris.fs.RemoteIterator; import org.apache.doris.fs.remote.RemoteFile; import org.apache.doris.fs.remote.RemoteFileSystem; import org.apache.doris.fs.remote.dfs.DFSFileSystem; @@ -81,6 +85,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.io.FileNotFoundException; +import java.io.IOException; import java.net.URI; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; @@ -195,7 +201,7 @@ protected ExecutorService getExecutor() { @Override public FileCacheValue load(FileCacheKey key) { - return loadFiles(key); + return loadFiles(key, new FileSystemDirectoryLister(), null); } }; @@ -348,7 +354,9 @@ private Map loadPartitions(Iterable partitionValues, - String bindBrokerName) throws UserException { + String bindBrokerName, + DirectoryLister directoryLister, + TableIf table) throws UserException { FileCacheValue result = new FileCacheValue(); RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( new FileSystemCache.FileSystemCacheKey(LocationPath.getFSIdentity( @@ -363,34 +371,28 @@ private FileCacheValue getFileCache(String location, String inputFormat, // /user/hive/warehouse/region_tmp_union_all2/2 // So we need to recursively list data location. 
// https://blog.actorsfit.com/a?ID=00550-ce56ec63-1bff-4b0c-a6f7-447b93efaa31 - List remoteFiles = new ArrayList<>(); - boolean isRecursiveDirectories = Boolean.valueOf( - catalog.getProperties().getOrDefault("hive.recursive_directories", "false")); - Status status = fs.listFiles(location, isRecursiveDirectories, remoteFiles); - if (status.ok()) { - for (RemoteFile remoteFile : remoteFiles) { + try { + RemoteIterator iterator = directoryLister.listFilesRecursively(fs, table, location); + while (iterator.hasNext()) { + RemoteFile remoteFile = iterator.next(); String srcPath = remoteFile.getPath().toString(); LocationPath locationPath = new LocationPath(srcPath, catalog.getProperties()); result.addFile(remoteFile, locationPath); } - } else if (status.getErrCode().equals(ErrCode.NOT_FOUND)) { + } catch (FileNotFoundException e) { // User may manually remove partition under HDFS, in this case, // Hive doesn't aware that the removed partition is missing. // Here is to support this case without throw an exception. 
LOG.warn(String.format("File %s not exist.", location)); - if (!Boolean.valueOf(catalog.getProperties() - .getOrDefault("hive.ignore_absent_partitions", "true"))) { - throw new UserException("Partition location does not exist: " + location); - } - } else { - throw new RuntimeException(status.getErrMsg()); + } catch (IOException e) { + throw new RuntimeException(e); } // Must copy the partitionValues to avoid concurrent modification of key and value result.setPartitionValues(Lists.newArrayList(partitionValues)); return result; } - private FileCacheValue loadFiles(FileCacheKey key) { + private FileCacheValue loadFiles(FileCacheKey key, DirectoryLister directoryLister, TableIf table) { ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); try { Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader()); @@ -415,7 +417,7 @@ private FileCacheValue loadFiles(FileCacheKey key) { FileInputFormat.setInputPaths(jobConf, finalLocation.get()); try { FileCacheValue result = getFileCache(finalLocation.get(), key.inputFormat, jobConf, - key.getPartitionValues(), key.bindBrokerName); + key.getPartitionValues(), key.bindBrokerName, directoryLister, table); // Replace default hive partition with a null_string. 
for (int i = 0; i < result.getValuesSize(); i++) { if (HIVE_DEFAULT_PARTITION.equals(result.getPartitionValues().get(i))) { @@ -469,19 +471,25 @@ public HivePartitionValues getPartitionValues(PartitionValueCacheKey key) { } public List getFilesByPartitionsWithCache(List partitions, - String bindBrokerName) { - return getFilesByPartitions(partitions, true, true, bindBrokerName); + String bindBrokerName, + DirectoryLister directoryLister, + TableIf table) { + return getFilesByPartitions(partitions, true, true, bindBrokerName, directoryLister, table); } public List getFilesByPartitionsWithoutCache(List partitions, - String bindBrokerName) { - return getFilesByPartitions(partitions, false, true, bindBrokerName); + String bindBrokerName, + DirectoryLister directoryLister, + TableIf table) { + return getFilesByPartitions(partitions, false, true, bindBrokerName, directoryLister, table); } public List getFilesByPartitions(List partitions, boolean withCache, boolean concurrent, - String bindBrokerName) { + String bindBrokerName, + DirectoryLister directoryLister, + TableIf table) { long start = System.currentTimeMillis(); List keys = partitions.stream().map(p -> p.isDummyPartition() ? 
FileCacheKey.createDummyCacheKey( @@ -497,13 +505,15 @@ public List getFilesByPartitions(List partitions, } else { if (concurrent) { List> pList = keys.stream().map( - key -> fileListingExecutor.submit(() -> loadFiles(key))).collect(Collectors.toList()); + key -> fileListingExecutor.submit(() -> loadFiles(key, directoryLister, table))) + .collect(Collectors.toList()); fileLists = Lists.newArrayListWithExpectedSize(keys.size()); for (Future p : pList) { fileLists.add(p.get()); } } else { - fileLists = keys.stream().map(this::loadFiles).collect(Collectors.toList()); + fileLists = keys.stream().map((key) -> loadFiles(key, directoryLister, table)) + .collect(Collectors.toList()); } } } catch (ExecutionException e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java index e710bdb935d7bc7..2af13e547fd016d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java @@ -41,6 +41,7 @@ import org.apache.doris.datasource.hive.HiveProperties; import org.apache.doris.datasource.hive.HiveTransaction; import org.apache.doris.datasource.hive.source.HiveSplit.HiveSplitCreator; +import org.apache.doris.fs.DirectoryLister; import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.qe.ConnectContext; @@ -84,6 +85,8 @@ public class HiveScanNode extends FileQueryScanNode { @Setter private SelectedPartitions selectedPartitions = null; + private DirectoryLister directoryLister; + private boolean partitionInit = false; private final AtomicReference batchException = new AtomicReference<>(null); private List prunedPartitions; @@ -98,17 +101,21 @@ public class HiveScanNode extends FileQueryScanNode { * eg: s3 tvf * These scan nodes do not have 
corresponding catalog/database/table info, so no need to do priv check */ - public HiveScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) { + public HiveScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv, + DirectoryLister directoryLister) { super(id, desc, "HIVE_SCAN_NODE", StatisticalType.HIVE_SCAN_NODE, needCheckColumnPriv); hmsTable = (HMSExternalTable) desc.getTable(); brokerName = hmsTable.getCatalog().bindBrokerName(); + this.directoryLister = directoryLister; } public HiveScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, - StatisticalType statisticalType, boolean needCheckColumnPriv) { + StatisticalType statisticalType, boolean needCheckColumnPriv, + DirectoryLister directoryLister) { super(id, desc, planNodeName, statisticalType, needCheckColumnPriv); hmsTable = (HMSExternalTable) desc.getTable(); brokerName = hmsTable.getCatalog().bindBrokerName(); + this.directoryLister = directoryLister; } @Override @@ -269,7 +276,8 @@ private void getFileSplitByPartitions(HiveMetaStoreCache cache, List 0; - fileCaches = cache.getFilesByPartitions(partitions, withCache, partitions.size() > 1, bindBrokerName); + fileCaches = cache.getFilesByPartitions(partitions, withCache, partitions.size() > 1, bindBrokerName, + directoryLister, hmsTable); } if (tableSample != null) { List hiveFileStatuses = selectFiles(fileCaches); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java index a8f2a362bfde8d0..09c8de7d797450b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java @@ -34,6 +34,7 @@ import org.apache.doris.datasource.hive.HivePartition; import org.apache.doris.datasource.hive.source.HiveScanNode; import org.apache.doris.datasource.hudi.HudiUtils; +import 
org.apache.doris.fs.DirectoryLister; import org.apache.doris.planner.ListPartitionPrunerV2; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.qe.ConnectContext; @@ -121,8 +122,8 @@ public class HudiScanNode extends HiveScanNode { * These scan nodes do not have corresponding catalog/database/table info, so no need to do priv check */ public HudiScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv, - Optional scanParams, Optional incrementalRelation) { - super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE, needCheckColumnPriv); + Optional scanParams, Optional incrementalRelation, DirectoryLister directoryLister) { + super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE, needCheckColumnPriv, directoryLister); isCowOrRoTable = hmsTable.isHoodieCowTable(); if (LOG.isDebugEnabled()) { if (isCowOrRoTable) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/DirectoryLister.java b/fe/fe-core/src/main/java/org/apache/doris/fs/DirectoryLister.java new file mode 100644 index 000000000000000..c7170d75580545c --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/DirectoryLister.java @@ -0,0 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+// This file is copied from +// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryLister.java +// and modified by Doris + +package org.apache.doris.fs; + +import org.apache.doris.catalog.TableIf; +import org.apache.doris.fs.remote.RemoteFile; + +import java.io.IOException; + +public interface DirectoryLister { + RemoteIterator listFilesRecursively(FileSystem fs, TableIf table, String location) + throws IOException; +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemDirectoryLister.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemDirectoryLister.java new file mode 100644 index 000000000000000..bdc73cc8b11676a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemDirectoryLister.java @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.fs; + +import org.apache.doris.backup.Status; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.fs.remote.RemoteFile; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class FileSystemDirectoryLister implements DirectoryLister { + public RemoteIterator listFilesRecursively(FileSystem fs, TableIf table, String location) + throws IOException { + List result = new ArrayList<>(); + Status status = fs.listFiles(location, true, result); + if (!status.ok()) { + throw new IOException(status.getErrMsg()); + } + return new RemoteFileRemoteIterator(result); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteFileRemoteIterator.java b/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteFileRemoteIterator.java new file mode 100644 index 000000000000000..2cc4b42557aa4e6 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteFileRemoteIterator.java @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.fs; + +import org.apache.doris.fs.remote.RemoteFile; + +import java.io.IOException; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Objects; + +public class RemoteFileRemoteIterator + implements RemoteIterator { + private final List remoteFileList; + private int currentIndex = 0; + + public RemoteFileRemoteIterator(List remoteFileList) { + this.remoteFileList = Objects.requireNonNull(remoteFileList, "iterator is null"); + } + + @Override + public boolean hasNext() throws IOException { + return currentIndex < remoteFileList.size(); + } + + @Override + public RemoteFile next() throws IOException { + if (!hasNext()) { + throw new NoSuchElementException("No more elements in RemoteFileRemoteIterator"); + } + return remoteFileList.get(currentIndex++); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteIterator.java b/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteIterator.java new file mode 100644 index 000000000000000..b398cbc1d3fd4f7 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteIterator.java @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
// This file is copied from
// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/RemoteIterator.java
// and modified by Doris

/**
 * Iterator whose operations may fail with an {@link IOException}, which
 * {@link java.util.Iterator} cannot express.
 *
 * @param <T> type of the elements produced by the iterator
 */
public interface RemoteIterator<T> {
    /**
     * @return {@code true} if a further call to {@link #next()} will yield an element
     * @throws IOException if the underlying source fails while looking for the next element
     */
    boolean hasNext() throws IOException;

    /**
     * @return the next element
     * @throws IOException if the underlying source fails while producing the element
     */
    T next() throws IOException;
}
+ + +package org.apache.doris.fs; + +import org.apache.doris.fs.remote.RemoteFile; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Objects; +// This file is copied from +// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/SimpleRemoteIterator.java +// and modified by Doris + +class SimpleRemoteIterator implements RemoteIterator { + private final Iterator iterator; + + public SimpleRemoteIterator(Iterator iterator) { + this.iterator = Objects.requireNonNull(iterator, "iterator is null"); + } + + @Override + public boolean hasNext() throws IOException { + return iterator.hasNext(); + } + + @Override + public RemoteFile next() throws IOException { + return iterator.next(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionDirectoryListingCacheKey.java b/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionDirectoryListingCacheKey.java new file mode 100644 index 000000000000000..3a8e9aeb276192d --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionDirectoryListingCacheKey.java @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+// This file is copied from +// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionDirectoryListingCacheKey.java +// and modified by Doris + +package org.apache.doris.fs; + +import java.util.Objects; + +public class TransactionDirectoryListingCacheKey { + private final long transactionId; + private final String path; + + public TransactionDirectoryListingCacheKey(long transactionId, String path) { + this.transactionId = transactionId; + this.path = Objects.requireNonNull(path, "path is null"); + } + + public String getPath() { + return path; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TransactionDirectoryListingCacheKey that = (TransactionDirectoryListingCacheKey) o; + return transactionId == that.transactionId && path.equals(that.path); + } + + @Override + public int hashCode() { + return Objects.hash(transactionId, path); + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("TransactionDirectoryListingCacheKey{"); + sb.append("transactionId=").append(transactionId); + sb.append(", path='").append(path).append('\''); + sb.append('}'); + return sb.toString(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryLister.java b/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryLister.java new file mode 100644 index 000000000000000..50dda64d41c603b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryLister.java @@ -0,0 +1,206 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionScopeCachingDirectoryLister.java +// and modified by Doris + +package org.apache.doris.fs; + +import org.apache.doris.catalog.TableIf; +import org.apache.doris.fs.remote.RemoteFile; + +import com.github.benmanes.caffeine.cache.Cache; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import org.apache.commons.collections.ListUtils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.Optional; +import javax.annotation.Nullable; + +/** + * Caches directory content (including listings that were started concurrently). + * {@link TransactionScopeCachingDirectoryLister} assumes that all listings + * are performed by same user within single transaction, therefore any failure can + * be shared between concurrent listings. 
+ */
+public class TransactionScopeCachingDirectoryLister implements DirectoryLister {
+    private final long transactionId;
+    //TODO use a cache key based on Path & SchemaTableName and iterate over the cache keys
+    // to deal more efficiently with cache invalidation scenarios for partitioned tables.
+    private final Cache<TransactionDirectoryListingCacheKey, FetchingValueHolder> cache;
+    private final DirectoryLister delegate;
+
+    /**
+     * Creates a caching lister on top of {@code delegate}.
+     *
+     * @param delegate lister asked for the real listing when a location is not cached yet
+     * @param transactionId id combined with the location to build the cache keys
+     * @param cache cache holding the (possibly still in-progress) listings
+     * @throws NullPointerException if {@code delegate} or {@code cache} is null
+     */
+    public TransactionScopeCachingDirectoryLister(DirectoryLister delegate, long transactionId,
+            Cache<TransactionDirectoryListingCacheKey, FetchingValueHolder> cache) {
+        this.delegate = Objects.requireNonNull(delegate, "delegate is null");
+        this.transactionId = transactionId;
+        this.cache = Objects.requireNonNull(cache, "cache is null");
+    }
+
+    /** Returns the backing cache so that tests can inspect it or trigger its maintenance. */
+    @VisibleForTesting
+    public Cache<TransactionDirectoryListingCacheKey, FetchingValueHolder> getCache() {
+        return cache;
+    }
+
+    /**
+     * Lists the files under {@code location}. The delegate is only consulted when no listing is cached
+     * for this lister's transaction id and the given location.
+     *
+     * @throws RuntimeException if the delegate fails with an {@link IOException} while starting the listing
+     */
+    @Override
+    public RemoteIterator<RemoteFile> listFilesRecursively(FileSystem fs, TableIf table, String location) {
+        return listInternal(fs, table, new TransactionDirectoryListingCacheKey(transactionId, location));
+    }
+
+    /**
+     * Looks up (or creates) the holder for {@code cacheKey} and returns an iterator over it. A holder that
+     * has already been drained is served straight from its file list; otherwise the returned iterator
+     * keeps pulling files through the shared holder.
+     */
+    private RemoteIterator<RemoteFile> listInternal(FileSystem fs, TableIf table,
+            TransactionDirectoryListingCacheKey cacheKey) {
+        FetchingValueHolder cachedValueHolder = cache.get(cacheKey, key -> {
+            try {
+                return new FetchingValueHolder(createListingRemoteIterator(fs, table, key));
+            } catch (IOException e) {
+                // the mapping function cannot throw checked exceptions, so wrap and keep the cause
+                throw new RuntimeException("Failed to list directory: " + key.getPath(), e);
+            }
+        });
+        if (cachedValueHolder == null) {
+            throw new RuntimeException("Failed to list directory: " + cacheKey.getPath());
+        }
+        if (cachedValueHolder.isFullyCached()) {
+            return new SimpleRemoteIterator(cachedValueHolder.getCachedFiles());
+        }
+
+        return cachingRemoteIterator(cachedValueHolder, cacheKey);
+    }
+
+    /** Starts the real listing of {@code cacheKey.getPath()} on the delegate. */
+    private RemoteIterator<RemoteFile> createListingRemoteIterator(FileSystem fs, TableIf table,
+            TransactionDirectoryListingCacheKey cacheKey)
+            throws IOException {
+        return delegate.listFilesRecursively(fs, table, cacheKey.getPath());
+    }
+
+    /**
+     * Wraps a not yet fully fetched holder in an iterator. Each iterator tracks its own position, while
+     * the files themselves are fetched once and stored in the shared holder.
+     */
+    private RemoteIterator<RemoteFile> cachingRemoteIterator(FetchingValueHolder cachedValueHolder,
+            TransactionDirectoryListingCacheKey cacheKey) {
+        return new RemoteIterator<RemoteFile>() {
+            private int fileIndex;
+
+            @Override
+            public boolean hasNext()
+                    throws IOException {
+                try {
+                    boolean hasNext = cachedValueHolder.getCachedFile(fileIndex).isPresent();
+                    // Re-put the holder under its key, but only if it is still the mapped value (this was
+                    // written to refresh the entry's weight; whether this cache uses a weigher is not
+                    // visible here). The holder acts as an invalidation guard: if the entry was invalidated
+                    // while this iterator was running, the possibly outdated listing is not added back.
+                    cache.asMap().replace(cacheKey, cachedValueHolder, cachedValueHolder);
+                    return hasNext;
+                } catch (Exception exception) {
+                    // Drop the failed holder so that the next listing retries. Remove it only while it is
+                    // still the mapped value: an unconditional invalidate would also throw away a fresh
+                    // holder that a later, successful listing stored under the same key.
+                    cache.asMap().remove(cacheKey, cachedValueHolder);
+                    throw exception;
+                }
+            }
+
+            @Override
+            public RemoteFile next()
+                    throws IOException {
+                // force cache entry weight update in case next file is cached
+                Preconditions.checkState(hasNext());
+                return cachedValueHolder.getCachedFile(fileIndex++).orElseThrow(NoSuchElementException::new);
+            }
+        };
+    }
+
+    @VisibleForTesting
+    boolean isCached(String location) {
+        return isCached(new TransactionDirectoryListingCacheKey(transactionId, location));
+    }
+
+    /** Returns true only when a holder is present for {@code cacheKey} and has been drained successfully. */
+    @VisibleForTesting
+    boolean isCached(TransactionDirectoryListingCacheKey cacheKey) {
+        FetchingValueHolder cached = cache.getIfPresent(cacheKey);
+        return cached != null && cached.isFullyCached();
+    }
+
+    /**
+     * Cache value that owns the delegate's iterator and the files fetched from it so far. Files are pulled
+     * lazily, one at a time, under the holder's monitor; a failure is remembered and reported to every
+     * later fetch attempt.
+     */
+    static class FetchingValueHolder {
+        private final List<RemoteFile> cachedFiles = ListUtils.synchronizedList(new ArrayList<>());
+        // null once the listing is exhausted or has failed
+        @GuardedBy("this")
+        @Nullable
+        private RemoteIterator<RemoteFile> fileIterator;
+        // first failure seen while fetching, if any
+        @GuardedBy("this")
+        @Nullable
+        private Exception exception;
+
+        public FetchingValueHolder(RemoteIterator<RemoteFile> fileIterator) {
+            this.fileIterator = Objects.requireNonNull(fileIterator, "fileIterator is null");
+        }
+
+        /** Returns true when the underlying iterator was drained without an error. */
+        public synchronized boolean isFullyCached() {
+            return fileIterator == null && exception == null;
+        }
+
+        /**
+         * Returns an iterator over all files of the listing.
+         *
+         * @throws IllegalStateException if the listing is not fully cached
+         */
+        public Iterator<RemoteFile> getCachedFiles() {
+            Preconditions.checkState(isFullyCached());
+            return cachedFiles.iterator();
+        }
+
+        /**
+         * Returns the file at {@code index}, fetching it from the underlying iterator when it is the next
+         * file that has not been cached yet.
+         *
+         * @param index position in the listing; must be in {@code [0, number of cached files]}
+         * @return the file, or empty when the listing has no file at that position
+         * @throws IOException if fetching fails now or has failed before
+         * @throws IllegalArgumentException if {@code index} is out of bounds
+         */
+        public Optional<RemoteFile> getCachedFile(int index)
+                throws IOException {
+            int filesSize = cachedFiles.size();
+            Preconditions.checkArgument(index >= 0 && index <= filesSize,
+                    "File index (%s) out of bounds [0, %s]", index, filesSize);
+
+            // avoid fileIterator synchronization (and thus blocking) for already cached files
+            if (index < filesSize) {
+                return Optional.of(cachedFiles.get(index));
+            }
+
+            return fetchNextCachedFile(index);
+        }
+
+        /** Fetches the next file under the holder's monitor, unless another caller already did. */
+        private synchronized Optional<RemoteFile> fetchNextCachedFile(int index)
+                throws IOException {
+            if (exception != null) {
+                throw new IOException("Exception while listing directory", exception);
+            }
+
+            if (index < cachedFiles.size()) {
+                // file was fetched concurrently
+                return Optional.of(cachedFiles.get(index));
+            }
+
+            try {
+                if (fileIterator == null || !fileIterator.hasNext()) {
+                    // no more files
+                    fileIterator = null;
+                    return Optional.empty();
+                }
+
+                RemoteFile fileStatus = fileIterator.next();
+                cachedFiles.add(fileStatus);
+                return Optional.of(fileStatus);
+            } catch (Exception e) {
+                // remember the failure so that every other iterator sharing this holder fails as well
+                fileIterator = null;
+                this.exception = e;
+                throw e;
+            }
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerFactory.java b/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerFactory.java
new file mode 100644
index 000000000000000..454810e266bdf84
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerFactory.java
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.
The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionScopeCachingDirectoryListerFactory.java
+// and modified by Doris
+
+package org.apache.doris.fs;
+
+import org.apache.doris.common.CacheFactory;
+import org.apache.doris.fs.TransactionScopeCachingDirectoryLister.FetchingValueHolder;
+
+import com.github.benmanes.caffeine.cache.Cache;
+
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Creates {@link DirectoryLister}s that share one directory-listing cache. Every lister handed out gets
+ * its own transaction id, which is part of its cache keys.
+ */
+public class TransactionScopeCachingDirectoryListerFactory {
+    //TODO use a cache key based on Path & SchemaTableName and iterate over the cache keys
+    // to deal more efficiently with cache invalidation scenarios for partitioned tables.
+    // Empty when caching is disabled.
+    private final Optional<Cache<TransactionDirectoryListingCacheKey, FetchingValueHolder>> cache;
+    private final AtomicLong nextTransactionId = new AtomicLong();
+
+    /**
+     * @param maxSize maximum size handed to the cache factory; a value of zero or less disables caching
+     */
+    public TransactionScopeCachingDirectoryListerFactory(long maxSize) {
+        if (maxSize > 0) {
+            // NOTE(review): 28800 is presumably an expiry in seconds (8 hours) and the remaining
+            // arguments refresh/stats/ticker settings -- confirm against CacheFactory's constructor.
+            CacheFactory cacheFactory = new CacheFactory(
+                    OptionalLong.of(28800L),
+                    OptionalLong.empty(),
+                    maxSize,
+                    true,
+                    null);
+            this.cache = Optional.of(cacheFactory.buildCache());
+        } else {
+            this.cache = Optional.empty();
+        }
+    }
+
+    /**
+     * Wraps {@code delegate} in a caching lister that uses the next transaction id, or returns
+     * {@code delegate} itself when caching is disabled.
+     */
+    public DirectoryLister get(DirectoryLister delegate) {
+        return cache
+                .map(sharedCache -> (DirectoryLister) new TransactionScopeCachingDirectoryLister(delegate,
+                        nextTransactionId.getAndIncrement(), sharedCache))
+                .orElse(delegate);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 654ccc8ca1155aa..7159e0ac0c8ac51 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -68,6 +68,9 @@
 import org.apache.doris.datasource.paimon.source.PaimonScanNode;
 import org.apache.doris.datasource.trinoconnector.TrinoConnectorExternalTable;
 import org.apache.doris.datasource.trinoconnector.source.TrinoConnectorScanNode;
+import org.apache.doris.fs.DirectoryLister;
+import org.apache.doris.fs.FileSystemDirectoryLister;
+import org.apache.doris.fs.TransactionScopeCachingDirectoryListerFactory;
 import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.nereids.properties.DistributionSpec;
 import org.apache.doris.nereids.properties.DistributionSpecAny;
@@ -241,6 +244,8 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor scanNodes = Lists.newArrayList(); private Map> selectStmtToScanNodes = Maps.newHashMap(); + private final DirectoryLister directoryLister; + public SingleNodePlanner(PlannerContext ctx) { this.ctx = ctx; +
this.directoryLister = new TransactionScopeCachingDirectoryListerFactory(Config.per_query_list_dir_cache_num).get( + new FileSystemDirectoryLister()); } public PlannerContext getPlannerContext() { @@ -1969,13 +1977,13 @@ private PlanNode createScanNode(Analyzer analyzer, TableRef tblRef, SelectStmt s + "please set enable_nereids_planner = true to enable new optimizer"); } scanNode = new HudiScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true, - Optional.empty(), Optional.empty()); + Optional.empty(), Optional.empty(), directoryLister); break; case ICEBERG: scanNode = new IcebergScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true); break; case HIVE: - scanNode = new HiveScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true); + scanNode = new HiveScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true, directoryLister); ((HiveScanNode) scanNode).setTableSample(tblRef.getTableSample()); break; default: diff --git a/fe/fe-core/src/test/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerTest.java b/fe/fe-core/src/test/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerTest.java new file mode 100644 index 000000000000000..234cc4366335ed7 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerTest.java @@ -0,0 +1,214 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.doris.fs;
+
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.fs.remote.RemoteFile;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.airlift.units.DataSize;
+import static java.util.Objects.requireNonNull;
+import mockit.Mocked;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.parallel.Execution;
+import static org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+// some tests may invalidate the whole cache affecting therefore other concurrent tests
+@Execution(SAME_THREAD)
+public class TransactionScopeCachingDirectoryListerTest
+{
+    /**
+     * Two iterators over the same, not yet fully listed path share a single delegate listing, and a
+     * fully listed path is served from the cache until its entry is evicted.
+     */
+    @Test
+    public void testConcurrentDirectoryListing(@Mocked TableIf table)
+            throws IOException
+    {
+        RemoteFile firstFile = new RemoteFile("file:/x/x", true, 1, 1);
+        RemoteFile secondFile = new RemoteFile("file:/x/y", true, 1, 1);
+        RemoteFile thirdFile = new RemoteFile("file:/y/z", true, 1, 1);
+
+        String path1 = "file:/x";
+        String path2 = "file:/y";
+
+        CountingDirectoryLister countingLister = new CountingDirectoryLister(
+                ImmutableMap.of(
+                        path1, ImmutableList.of(firstFile, secondFile),
+                        path2, ImmutableList.of(thirdFile)));
+
+        // The factory argument is the cache's max size; 1 is used so that caching path1 below
+        // pushes the path2 entry out of the cache.
+        TransactionScopeCachingDirectoryLister cachingLister = (TransactionScopeCachingDirectoryLister)
+                new TransactionScopeCachingDirectoryListerFactory(1).get(countingLister);
+
+        assertFiles(cachingLister.listFilesRecursively(null, table, path2), ImmutableList.of(thirdFile));
+        assertEquals(1, countingLister.getListCount());
+
+        // listing path2 again shouldn't increase listing count
+        assertTrue(cachingLister.isCached(path2));
+        assertFiles(cachingLister.listFilesRecursively(null, table, path2), ImmutableList.of(thirdFile));
+        assertEquals(1, countingLister.getListCount());
+
+        // start listing path1 concurrently: the second call must reuse the listing started by the first
+        RemoteIterator<RemoteFile> path1FilesA = cachingLister.listFilesRecursively(null, table, path1);
+        RemoteIterator<RemoteFile> path1FilesB = cachingLister.listFilesRecursively(null, table, path1);
+        assertEquals(2, countingLister.getListCount());
+
+        // list path1 files using both iterators concurrently
+        assertEquals(firstFile, path1FilesA.next());
+        assertEquals(firstFile, path1FilesB.next());
+        assertEquals(secondFile, path1FilesB.next());
+        assertEquals(secondFile, path1FilesA.next());
+        assertFalse(path1FilesA.hasNext());
+        assertFalse(path1FilesB.hasNext());
+        assertEquals(2, countingLister.getListCount());
+
+        // run pending cache maintenance so that the eviction checked below has happened
+        cachingLister.getCache().cleanUp();
+        // path1 was cached in the meantime and, with a max size of 1, displaced path2:
+        // listing path2 again has to go to the delegate
+        assertFalse(cachingLister.isCached(path2));
+        assertFiles(cachingLister.listFilesRecursively(null, table, path2), ImmutableList.of(thirdFile));
+        assertEquals(3, countingLister.getListCount());
+    }
+
+    /**
+     * A failing listing is reported to every iterator that shares it, and it is not served to
+     * listings started after the failure was observed.
+     */
+    @Test
+    public void testConcurrentDirectoryListingException(@Mocked TableIf table)
+            throws IOException
+    {
+        RemoteFile file = new RemoteFile("file:/x/x", true, 1, 1);
+
+        String path = "file:/x";
+
+        CountingDirectoryLister countingLister = new CountingDirectoryLister(
+                ImmutableMap.of(path, ImmutableList.of(file)));
+        DirectoryLister cachingLister = new TransactionScopeCachingDirectoryListerFactory(1).get(countingLister);
+
+        // start listing path concurrently
+        countingLister.setThrowException(true);
+        RemoteIterator<RemoteFile> filesA = cachingLister.listFilesRecursively(null, table, path);
+        RemoteIterator<RemoteFile> filesB = cachingLister.listFilesRecursively(null, table, path);
+        assertEquals(1, countingLister.getListCount());
+
+        // listing should throw an exception
+        assertThrows(IOException.class, () -> filesA.hasNext());
+
+        // listing again should succeed
+        countingLister.setThrowException(false);
+        assertFiles(cachingLister.listFilesRecursively(null, table, path), ImmutableList.of(file));
+        assertEquals(2, countingLister.getListCount());
+
+        // the second iterator was created from the failed listing, so it should fail as well
+        assertThrows(IOException.class, () -> filesB.hasNext());
+    }
+
+    /** Drains {@code iterator} and compares the files, in order, with {@code expectedFiles}. */
+    private void assertFiles(RemoteIterator<RemoteFile> iterator, List<RemoteFile> expectedFiles)
+            throws IOException
+    {
+        ImmutableList.Builder<RemoteFile> actualFiles = ImmutableList.builder();
+        while (iterator.hasNext()) {
+            actualFiles.add(iterator.next());
+        }
+        assertEquals(expectedFiles, actualFiles.build());
+    }
+
+    /** Lister backed by a fixed location-to-files map that counts how often it is asked to list. */
+    private static class CountingDirectoryLister
+            implements DirectoryLister
+    {
+        private final Map<String, List<RemoteFile>> fileStatuses;
+        private int listCount;
+        private boolean throwException;
+
+        public CountingDirectoryLister(Map<String, List<RemoteFile>> fileStatuses)
+        {
+            this.fileStatuses = requireNonNull(fileStatuses, "fileStatuses is null");
+        }
+
+        @Override
+        public RemoteIterator<RemoteFile> listFilesRecursively(FileSystem fs, TableIf table, String location)
+        {
+            // No specific recursive files-only listing implementation
+            listCount++;
+            // the throwException flag is captured at listing time, not when the iterator is consumed
+            return throwingRemoteIterator(requireNonNull(fileStatuses.get(location)), throwException);
+        }
+
+        public void setThrowException(boolean throwException)
+        {
+            this.throwException = throwException;
+        }
+
+        public int getListCount()
+        {
+            return listCount;
+        }
+    }
+
+    /**
+     * Returns an iterator over a copy of {@code files} whose {@code hasNext()} throws an
+     * {@link IOException} on every call when {@code throwException} is set.
+     */
+    static RemoteIterator<RemoteFile> throwingRemoteIterator(List<RemoteFile> files, boolean throwException)
+    {
+        return new RemoteIterator<RemoteFile>()
+        {
+            private final Iterator<RemoteFile> iterator = ImmutableList.copyOf(files).iterator();
+
+            @Override
+            public boolean hasNext()
+                    throws IOException
+            {
+                if (throwException) {
+                    throw new IOException();
+                }
+                return iterator.hasNext();
+            }
+
+            @Override
+            public RemoteFile next()
+            {
+                return iterator.next();
+            }
+        };
+    }
+}