From 6be0114be0562d79dc9a8b73ee1809a658975fb9 Mon Sep 17 00:00:00 2001 From: kakachen Date: Thu, 14 Nov 2024 01:25:27 +0800 Subject: [PATCH] [Opt](multi-catalog)Improve performance by introducing cache of list directory files when getting split for each query. --- .../java/org/apache/doris/common/Config.java | 4 + .../org/apache/doris/common/CacheFactory.java | 54 +- .../org/apache/doris/common/EmptyCache.java | 247 ++++++ .../apache/doris/common/EvictableCache.java | 466 ++++++++++++ .../doris/common/EvictableCacheBuilder.java | 286 +++++++ .../doris/datasource/ExternalSchemaCache.java | 2 +- .../datasource/hive/HMSExternalTable.java | 4 +- .../datasource/hive/HiveMetaStoreCache.java | 70 +- .../datasource/hive/source/HiveScanNode.java | 14 +- .../datasource/hudi/source/HudiScanNode.java | 6 +- .../iceberg/IcebergMetadataCache.java | 4 +- .../org/apache/doris/fs/DirectoryLister.java | 29 + .../org/apache/doris/fs/FileSystemCache.java | 2 +- .../doris/fs/FileSystemDirectoryLister.java | 37 + .../doris/fs/FileSystemIOException.java | 65 ++ .../doris/fs/RemoteFileRemoteIterator.java | 47 ++ .../org/apache/doris/fs/RemoteIterator.java | 27 + .../apache/doris/fs/SimpleRemoteIterator.java | 45 ++ .../TransactionDirectoryListingCacheKey.java | 64 ++ ...ransactionScopeCachingDirectoryLister.java | 216 ++++++ ...ionScopeCachingDirectoryListerFactory.java | 59 ++ .../translator/PhysicalPlanTranslator.java | 20 +- .../doris/planner/SingleNodePlanner.java | 16 +- .../doris/common/TestEvictableCache.java | 708 ++++++++++++++++++ ...actionScopeCachingDirectoryListerTest.java | 174 +++++ 25 files changed, 2622 insertions(+), 44 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/common/EmptyCache.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/common/EvictableCache.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/common/EvictableCacheBuilder.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/fs/DirectoryLister.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemDirectoryLister.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemIOException.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/fs/RemoteFileRemoteIterator.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/fs/RemoteIterator.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/fs/SimpleRemoteIterator.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/fs/TransactionDirectoryListingCacheKey.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryLister.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerFactory.java create mode 100644 fe/fe-core/src/test/java/org/apache/doris/common/TestEvictableCache.java create mode 100644 fe/fe-core/src/test/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerTest.java diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 66dbff5abed8e64..13a8501f99cc09f 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2153,6 +2153,10 @@ public class Config extends ConfigBase { "Max cache number of external table row count"}) public static long max_external_table_row_count_cache_num = 100000; + @ConfField(description = {"每个查询的外表文件元数据缓存的最大文件数量。", + "Max cache file number of external table split file meta cache at query level."}) + public static long max_external_table_split_file_meta_cache_num = 100000; + /** * Max cache loader thread-pool size. * Max thread pool size for loading external meta cache diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/CacheFactory.java b/fe/fe-core/src/main/java/org/apache/doris/common/CacheFactory.java index 50f46647975e235..4b2b8a2a6cd0b3b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/CacheFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/CacheFactory.java @@ -19,11 +19,13 @@ import com.github.benmanes.caffeine.cache.AsyncCacheLoader; import com.github.benmanes.caffeine.cache.AsyncLoadingCache; +import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.CacheLoader; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; import com.github.benmanes.caffeine.cache.RemovalListener; import com.github.benmanes.caffeine.cache.Ticker; +import com.github.benmanes.caffeine.cache.Weigher; import org.jetbrains.annotations.NotNull; import java.time.Duration; @@ -44,28 +46,57 @@ * The cache can be created with the above parameters using the buildCache and buildAsyncCache methods. *

*/ -public class CacheFactory { +public class CacheFactory { private OptionalLong expireAfterWriteSec; private OptionalLong refreshAfterWriteSec; - private long maxSize; + private OptionalLong maxSize; private boolean enableStats; // Ticker is used to provide a time source for the cache. // Only used for test, to provide a fake time source. // If not provided, the system time is used. private Ticker ticker; + private OptionalLong maxWeight; + + private Weigher weigher; + public CacheFactory( OptionalLong expireAfterWriteSec, OptionalLong refreshAfterWriteSec, long maxSize, boolean enableStats, Ticker ticker) { + this(expireAfterWriteSec, refreshAfterWriteSec, OptionalLong.of(maxSize), enableStats, ticker, + OptionalLong.empty(), null); + } + + public CacheFactory( + OptionalLong expireAfterWriteSec, + OptionalLong refreshAfterWriteSec, + boolean enableStats, + Ticker ticker, + long maxWeight, + Weigher weigher) { + this(expireAfterWriteSec, refreshAfterWriteSec, OptionalLong.empty(), enableStats, ticker, + OptionalLong.of(maxWeight), weigher); + } + + private CacheFactory( + OptionalLong expireAfterWriteSec, + OptionalLong refreshAfterWriteSec, + OptionalLong maxSize, + boolean enableStats, + Ticker ticker, + OptionalLong maxWeight, + Weigher weigher) { this.expireAfterWriteSec = expireAfterWriteSec; this.refreshAfterWriteSec = refreshAfterWriteSec; this.maxSize = maxSize; this.enableStats = enableStats; this.ticker = ticker; + this.maxWeight = maxWeight; + this.weigher = weigher; } // Build a loading cache, without executor, it will use fork-join pool for refresh @@ -85,6 +116,11 @@ public LoadingCache buildCache(CacheLoader cacheLoader, return builder.build(cacheLoader); } + public Cache buildCache() { + Caffeine builder = buildWithParams(); + return builder.build(); + } + // Build an async loading cache public AsyncLoadingCache buildAsyncCache(AsyncCacheLoader cacheLoader, ExecutorService executor) { @@ -94,9 +130,11 @@ public AsyncLoadingCache buildAsyncCache(AsyncCacheLoader cac } @NotNull - private Caffeine buildWithParams() { + private Caffeine buildWithParams() { Caffeine builder = Caffeine.newBuilder(); - builder.maximumSize(maxSize); + if (maxSize.isPresent()) { + builder.maximumSize(maxSize.getAsLong()); + } if (expireAfterWriteSec.isPresent()) { builder.expireAfterWrite(Duration.ofSeconds(expireAfterWriteSec.getAsLong())); @@ -112,6 +150,14 @@ private Caffeine buildWithParams() { if (ticker != null) { builder.ticker(ticker); } + + if (maxWeight.isPresent()) { + builder.maximumWeight(maxWeight.getAsLong()); + } + + if (weigher != null) { + builder.weigher(weigher); + } return builder; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/EmptyCache.java b/fe/fe-core/src/main/java/org/apache/doris/common/EmptyCache.java new file mode 100644 index 000000000000000..5942eb2b1f3a9aa --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/EmptyCache.java @@ -0,0 +1,247 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/trinodb/trino/blob/438/lib/trino-cache/src/main/java/io/trino/cache/EmptyCache.java +// and modified by Doris + +package org.apache.doris.common; + +import com.google.common.cache.AbstractLoadingCache; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.CacheLoader.InvalidCacheLoadException; +import com.google.common.cache.CacheStats; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.UncheckedExecutionException; + +import java.util.Collection; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import javax.annotation.Nullable; + +class EmptyCache + extends AbstractLoadingCache { + private final CacheLoader loader; + private final StatsCounter statsCounter; + + EmptyCache(CacheLoader loader, boolean recordStats) { + this.loader = Objects.requireNonNull(loader, "loader is null"); + this.statsCounter = recordStats ? new SimpleStatsCounter() : new NoopStatsCounter(); + } + + @Override + public V getIfPresent(Object key) { + statsCounter.recordMisses(1); + return null; + } + + @Override + public V get(K key) + throws ExecutionException { + return get(key, () -> loader.load(key)); + } + + @Override + public ImmutableMap getAll(Iterable keys) + throws ExecutionException { + try { + Set keySet = ImmutableSet.copyOf(keys); + statsCounter.recordMisses(keySet.size()); + @SuppressWarnings("unchecked") // safe since all keys extend K + ImmutableMap result = (ImmutableMap) loader.loadAll(keySet); + for (K key : keySet) { + if (!result.containsKey(key)) { + throw new InvalidCacheLoadException("loadAll failed to return a value for " + key); + } + } + statsCounter.recordLoadSuccess(1); + return result; + } catch (RuntimeException e) { + statsCounter.recordLoadException(1); + throw new UncheckedExecutionException(e); + } catch (Exception e) { + statsCounter.recordLoadException(1); + throw new ExecutionException(e); + } + } + + @Override + public V get(K key, Callable valueLoader) + throws ExecutionException { + statsCounter.recordMisses(1); + try { + V value = valueLoader.call(); + statsCounter.recordLoadSuccess(1); + return value; + } catch (RuntimeException e) { + statsCounter.recordLoadException(1); + throw new UncheckedExecutionException(e); + } catch (Exception e) { + statsCounter.recordLoadException(1); + throw new ExecutionException(e); + } + } + + @Override + public void put(K key, V value) { + // Cache, even if configured to evict everything immediately, should allow writes. + } + + @Override + public void refresh(K key) {} + + @Override + public void invalidate(Object key) {} + + @Override + public void invalidateAll(Iterable keys) {} + + @Override + public void invalidateAll() { + + } + + @Override + public long size() { + return 0; + } + + @Override + public CacheStats stats() { + return statsCounter.snapshot(); + } + + @Override + public ConcurrentMap asMap() { + return new ConcurrentMap() { + @Override + public V putIfAbsent(K key, V value) { + // Cache, even if configured to evict everything immediately, should allow writes. + // putIfAbsent returns the previous value + return null; + } + + @Override + public boolean remove(Object key, Object value) { + return false; + } + + @Override + public boolean replace(K key, V oldValue, V newValue) { + return false; + } + + @Override + public V replace(K key, V value) { + return null; + } + + @Override + public int size() { + return 0; + } + + @Override + public boolean isEmpty() { + return true; + } + + @Override + public boolean containsKey(Object key) { + return false; + } + + @Override + public boolean containsValue(Object value) { + return false; + } + + @Override + @Nullable + public V get(Object key) { + return null; + } + + @Override + @Nullable + public V put(K key, V value) { + // Cache, even if configured to evict everything immediately, should allow writes. + return null; + } + + @Override + @Nullable + public V remove(Object key) { + return null; + } + + @Override + public void putAll(Map m) { + // Cache, even if configured to evict everything immediately, should allow writes. + } + + @Override + public void clear() { + + } + + @Override + public Set keySet() { + return ImmutableSet.of(); + } + + @Override + public Collection values() { + return ImmutableSet.of(); + } + + @Override + public Set> entrySet() { + return ImmutableSet.of(); + } + }; + } + + private static class NoopStatsCounter + implements StatsCounter { + private static final CacheStats EMPTY_STATS = new SimpleStatsCounter().snapshot(); + + @Override + public void recordHits(int count) {} + + @Override + public void recordMisses(int count) {} + + @Override + public void recordLoadSuccess(long loadTime) {} + + @Override + public void recordLoadException(long loadTime) {} + + @Override + public void recordEviction() {} + + @Override + public CacheStats snapshot() { + return EMPTY_STATS; + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/EvictableCache.java b/fe/fe-core/src/main/java/org/apache/doris/common/EvictableCache.java new file mode 100644 index 000000000000000..a2cb05d82c5556c --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/EvictableCache.java @@ -0,0 +1,466 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/trinodb/trino/blob/438/lib/trino-cache/src/main/java/io/trino/cache/EvictableCache.java +// and modified by Doris + +package org.apache.doris.common; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.MoreObjects; +import com.google.common.base.Verify; +import com.google.common.cache.AbstractLoadingCache; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.CacheStats; +import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalCause; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.ListenableFuture; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; +import javax.annotation.Nullable; + +/** + * A {@link Cache} and {@link LoadingCache} implementation similar to ones + * produced by {@link CacheBuilder#build()}, but one that does not + * exhibit Guava issue #1881: + * a cache inspection with {@link #getIfPresent(Object)} or {@link #get(Object, Callable)} + * is guaranteed to return fresh state after {@link #invalidate(Object)}, + * {@link #invalidateAll(Iterable)} or {@link #invalidateAll()} were called. + * + * @see EvictableCacheBuilder + */ +// @ElementTypesAreNonnullByDefault +class EvictableCache + extends AbstractLoadingCache + implements LoadingCache { + // Invariant: for every (K, token) entry in the tokens map, there is a live + // cache entry (token, ?) in dataCache, that, upon eviction, will cause the tokens' + // entry to be removed. + private final ConcurrentHashMap> tokens = new ConcurrentHashMap<>(); + // The dataCache can have entries with no corresponding tokens in the tokens map. + // For example, this can happen when invalidation concurs with load. + // The dataCache must be bounded. + private final LoadingCache, V> dataCache; + + private final AtomicInteger invalidations = new AtomicInteger(); + + EvictableCache(CacheBuilder, ? super V> cacheBuilder, CacheLoader cacheLoader) { + dataCache = buildUnsafeCache( + cacheBuilder + ., V>removalListener(removal -> { + Token token = removal.getKey(); + Verify.verify(token != null, "token is null"); + if (removal.getCause() != RemovalCause.REPLACED) { + tokens.remove(token.getKey(), token); + } + }), + new TokenCacheLoader<>(cacheLoader)); + } + + // @SuppressModernizer // CacheBuilder.build(CacheLoader) is forbidden, + // advising to use this class as a safety-adding wrapper. + private static LoadingCache buildUnsafeCache(CacheBuilder cacheBuilder, + CacheLoader cacheLoader) { + return cacheBuilder.build(cacheLoader); + } + + @Override + public V getIfPresent(Object key) { + @SuppressWarnings("SuspiciousMethodCalls") // Object passed to map as key K + Token token = tokens.get(key); + if (token == null) { + return null; + } + return dataCache.getIfPresent(token); + } + + @Override + public V get(K key, Callable valueLoader) + throws ExecutionException { + Token newToken = new Token<>(key); + int invalidations = this.invalidations.get(); + Token token = tokens.computeIfAbsent(key, ignored -> newToken); + try { + V value = dataCache.get(token, valueLoader); + if (invalidations == this.invalidations.get()) { + // Revive token if it got expired before reloading + if (tokens.putIfAbsent(key, token) == null) { + // Revived + if (!dataCache.asMap().containsKey(token)) { + // We revived, but the token does not correspond to a live entry anymore. + // It would stay in tokens forever, so let's remove it. + tokens.remove(key, token); + } + } + } + return value; + } catch (Throwable e) { + if (newToken == token) { + // Failed to load and it was our new token persisted in tokens map. + // No cache entry exists for the token (unless concurrent load happened), + // so we need to remove it. + tokens.remove(key, newToken); + } + throw e; + } + } + + @Override + public V get(K key) + throws ExecutionException { + Token newToken = new Token<>(key); + int invalidations = this.invalidations.get(); + Token token = tokens.computeIfAbsent(key, ignored -> newToken); + try { + V value = dataCache.get(token); + if (invalidations == this.invalidations.get()) { + // Revive token if it got expired before reloading + if (tokens.putIfAbsent(key, token) == null) { + // Revived + if (!dataCache.asMap().containsKey(token)) { + // We revived, but the token does not correspond to a live entry anymore. + // It would stay in tokens forever, so let's remove it. + tokens.remove(key, token); + } + } + } + return value; + } catch (Throwable e) { + if (newToken == token) { + // Failed to load and it was our new token persisted in tokens map. + // No cache entry exists for the token (unless concurrent load happened), + // so we need to remove it. + tokens.remove(key, newToken); + } + throw e; + } + } + + @Override + public ImmutableMap getAll(Iterable keys) + throws ExecutionException { + List> newTokens = new ArrayList<>(); + List> temporaryTokens = new ArrayList<>(); + try { + Map result = new LinkedHashMap<>(); + for (K key : keys) { + if (result.containsKey(key)) { + continue; + } + // This is not bulk, but is fast local operation + Token newToken = new Token<>(key); + Token oldToken = tokens.putIfAbsent(key, newToken); + if (oldToken != null) { + // Token exists but a data may not exist (e.g. due to concurrent eviction) + V value = dataCache.getIfPresent(oldToken); + if (value != null) { + result.put(key, value); + continue; + } + // Old token exists but value wasn't found. This can happen when there is concurrent + // eviction/invalidation or when the value is still being loaded. + // The new token is not registered in tokens, so won't be used by subsequent invocations. + temporaryTokens.add(newToken); + } + newTokens.add(newToken); + } + + Map, V> values = dataCache.getAll(newTokens); + for (Map.Entry, V> entry : values.entrySet()) { + Token newToken = entry.getKey(); + result.put(newToken.getKey(), entry.getValue()); + } + return ImmutableMap.copyOf(result); + } catch (Throwable e) { + for (Token token : newTokens) { + // Failed to load and it was our new token (potentially) persisted in tokens map. + // No cache entry exists for the token (unless concurrent load happened), + // so we need to remove it. + tokens.remove(token.getKey(), token); + } + throw e; + } finally { + dataCache.invalidateAll(temporaryTokens); + } + } + + @Override + public void refresh(K key) { + // The refresh loads a new entry, if it wasn't in the cache yet. Thus, we would create a new Token. + // However, dataCache.refresh is asynchronous and may fail, so no cache entry may be created. + // In such case we would leak the newly created token. + throw new UnsupportedOperationException(); + } + + @Override + public long size() { + return dataCache.size(); + } + + @Override + public void cleanUp() { + dataCache.cleanUp(); + } + + @VisibleForTesting + int tokensCount() { + return tokens.size(); + } + + @Override + public void invalidate(Object key) { + invalidations.incrementAndGet(); + @SuppressWarnings("SuspiciousMethodCalls") // Object passed to map as key K + Token token = tokens.remove(key); + if (token != null) { + dataCache.invalidate(token); + } + } + + @Override + public void invalidateAll() { + invalidations.incrementAndGet(); + dataCache.invalidateAll(); + tokens.clear(); + } + + // Not thread safe, test only. + @VisibleForTesting + void clearDataCacheOnly() { + Map> tokensCopy = new HashMap<>(tokens); + dataCache.asMap().clear(); + Verify.verify(tokens.isEmpty(), "Clearing dataCache should trigger tokens eviction"); + tokens.putAll(tokensCopy); + } + + @Override + public CacheStats stats() { + return dataCache.stats(); + } + + @Override + public ConcurrentMap asMap() { + return new ConcurrentMap() { + private final ConcurrentMap, V> dataCacheMap = dataCache.asMap(); + + @Override + public V putIfAbsent(K key, V value) { + throw new UnsupportedOperationException("The operation is not supported," + + " as in inherently races with cache invalidation"); + } + + @Override + public V compute(K key, BiFunction remappingFunction) { + // default implementation of ConcurrentMap#compute uses not supported putIfAbsent in some cases + throw new UnsupportedOperationException("The operation is not supported, as in inherently" + + " races with cache invalidation"); + } + + @Override + public boolean remove(Object key, Object value) { + @SuppressWarnings("SuspiciousMethodCalls") // Object passed to map as key K + Token token = tokens.get(key); + if (token != null) { + return dataCacheMap.remove(token, value); + } + return false; + } + + @Override + public boolean replace(K key, V oldValue, V newValue) { + Token token = tokens.get(key); + if (token != null) { + return dataCacheMap.replace(token, oldValue, newValue); + } + return false; + } + + @Override + public V replace(K key, V value) { + throw new UnsupportedOperationException("The operation is not supported, as in inherently races" + + " with cache invalidation"); + } + + @Override + public int size() { + return dataCache.asMap().size(); + } + + @Override + public boolean isEmpty() { + return dataCache.asMap().isEmpty(); + } + + @Override + public boolean containsKey(Object key) { + return tokens.containsKey(key); + } + + @Override + public boolean containsValue(Object value) { + return values().contains(value); + } + + @Override + @Nullable + public V get(Object key) { + return getIfPresent(key); + } + + @Override + public V put(K key, V value) { + throw new UnsupportedOperationException("The operation is not supported, as in inherently" + + " races with cache invalidation. Use get(key, callable) instead."); + } + + @Override + @Nullable + public V remove(Object key) { + Token token = tokens.remove(key); + if (token != null) { + return dataCacheMap.remove(token); + } + return null; + } + + @Override + public void putAll(Map m) { + throw new UnsupportedOperationException("The operation is not supported, as in inherently" + + " races with cache invalidation. Use get(key, callable) instead."); + } + + @Override + public void clear() { + dataCacheMap.clear(); + tokens.clear(); + } + + @Override + public Set keySet() { + return tokens.keySet(); + } + + @Override + public Collection values() { + return dataCacheMap.values(); + } + + @Override + public Set> entrySet() { + throw new UnsupportedOperationException(); + } + }; + } + + // instance-based equality + static final class Token { + private final K key; + + Token(K key) { + this.key = Objects.requireNonNull(key, "key is null"); + } + + K getKey() { + return key; + } + + @Override + public String toString() { + return String.format("CacheToken(%s; %s)", Integer.toHexString(hashCode()), key); + } + } + + private static class TokenCacheLoader + extends CacheLoader, V> { + private final CacheLoader delegate; + + public TokenCacheLoader(CacheLoader delegate) { + this.delegate = Objects.requireNonNull(delegate, "delegate is null"); + } + + @Override + public V load(Token token) + throws Exception { + return delegate.load(token.getKey()); + } + + @Override + public ListenableFuture reload(Token token, V oldValue) + throws Exception { + return delegate.reload(token.getKey(), oldValue); + } + + @Override + public Map, V> loadAll(Iterable> tokens) + throws Exception { + List> tokenList = ImmutableList.copyOf(tokens); + List keys = new ArrayList<>(); + for (Token token : tokenList) { + keys.add(token.getKey()); + } + Map values; + try { + values = delegate.loadAll(keys); + } catch (UnsupportedLoadingOperationException e) { + // Guava uses UnsupportedLoadingOperationException in LoadingCache.loadAll + // to fall back from bulk loading (without load sharing) to loading individual + // values (with load sharing). EvictableCache implementation does not currently + // support the fallback mechanism, so the individual values would be loaded + // without load sharing. This would be an unintentional and non-obvious behavioral + // discrepancy between EvictableCache and Guava Caches, so the mechanism is disabled. + throw new UnsupportedOperationException("LoadingCache.getAll() is not supported by EvictableCache" + + " when CacheLoader.loadAll is not implemented", e); + } + + ImmutableMap.Builder, V> result = ImmutableMap.builder(); + for (int i = 0; i < tokenList.size(); i++) { + Token token = tokenList.get(i); + K key = keys.get(i); + V value = values.get(key); + // CacheLoader.loadAll is not guaranteed to return values for all the keys + if (value != null) { + result.put(token, value); + } + } + return result.buildOrThrow(); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .addValue(delegate) + .toString(); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/EvictableCacheBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/common/EvictableCacheBuilder.java new file mode 100644 index 000000000000000..8da3f8c8d5602eb --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/EvictableCacheBuilder.java @@ -0,0 +1,286 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/trinodb/trino/blob/438/lib/trino-cache/src/main/java/io/trino/cache/EvictableCacheBuilder.java +// and modified by Doris + +package org.apache.doris.common; + +import org.apache.doris.common.EvictableCache.Token; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Ticker; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.cache.Weigher; +import com.google.errorprone.annotations.CanIgnoreReturnValue; +import com.google.errorprone.annotations.CheckReturnValue; + +import java.time.Duration; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +/** + * Builder for {@link Cache} and {@link LoadingCache} instances, similar to {@link CacheBuilder}, + * but creating cache implementations that do not exhibit + * Guava issue #1881: + * a cache inspection with {@link Cache#getIfPresent(Object)} or {@link Cache#get(Object, Callable)} + * is guaranteed to return fresh state after {@link Cache#invalidate(Object)}, + * {@link Cache#invalidateAll(Iterable)} or {@link Cache#invalidateAll()} were called. + */ +public final class EvictableCacheBuilder { + @CheckReturnValue + public static EvictableCacheBuilder newBuilder() { + return new EvictableCacheBuilder<>(); + } + + private Optional ticker = Optional.empty(); + private Optional expireAfterWrite = Optional.empty(); + private Optional refreshAfterWrite = Optional.empty(); + private Optional maximumSize = Optional.empty(); + private Optional maximumWeight = Optional.empty(); + private Optional concurrencyLevel = Optional.empty(); + private Optional, ? super V>> weigher = Optional.empty(); + private boolean recordStats; + private Optional disabledCacheImplementation = Optional.empty(); + + private EvictableCacheBuilder() {} + + /** + * Pass-through for {@link CacheBuilder#ticker(Ticker)}. + */ + @CanIgnoreReturnValue + public EvictableCacheBuilder ticker(Ticker ticker) { + this.ticker = Optional.of(ticker); + return this; + } + + @CanIgnoreReturnValue + public EvictableCacheBuilder expireAfterWrite(long duration, TimeUnit unit) { + return expireAfterWrite(toDuration(duration, unit)); + } + + @CanIgnoreReturnValue + public EvictableCacheBuilder expireAfterWrite(Duration duration) { + Preconditions.checkState(!this.expireAfterWrite.isPresent(), "expireAfterWrite already set"); + this.expireAfterWrite = Optional.of(duration); + return this; + } + + @CanIgnoreReturnValue + public EvictableCacheBuilder refreshAfterWrite(long duration, TimeUnit unit) { + return refreshAfterWrite(toDuration(duration, unit)); + } + + @CanIgnoreReturnValue + public EvictableCacheBuilder refreshAfterWrite(Duration duration) { + Preconditions.checkState(!this.refreshAfterWrite.isPresent(), "refreshAfterWrite already set"); + this.refreshAfterWrite = Optional.of(duration); + return this; + } + + @CanIgnoreReturnValue + public EvictableCacheBuilder maximumSize(long maximumSize) { + Preconditions.checkState(!this.maximumSize.isPresent(), "maximumSize already set"); + Preconditions.checkState(!this.maximumWeight.isPresent(), "maximumWeight already set"); + this.maximumSize = Optional.of(maximumSize); + return this; + } + + @CanIgnoreReturnValue + public EvictableCacheBuilder maximumWeight(long maximumWeight) { + Preconditions.checkState(!this.maximumWeight.isPresent(), "maximumWeight already set"); + Preconditions.checkState(!this.maximumSize.isPresent(), "maximumSize already set"); + this.maximumWeight = Optional.of(maximumWeight); + return this; + } + + @CanIgnoreReturnValue + public EvictableCacheBuilder concurrencyLevel(int concurrencyLevel) { + Preconditions.checkState(!this.concurrencyLevel.isPresent(), "concurrencyLevel already set"); + this.concurrencyLevel = Optional.of(concurrencyLevel); + return this; + } + + public EvictableCacheBuilder weigher(Weigher weigher) { + Preconditions.checkState(!this.weigher.isPresent(), "weigher already set"); + @SuppressWarnings("unchecked") // see com.google.common.cache.CacheBuilder.weigher + EvictableCacheBuilder cast = (EvictableCacheBuilder) this; + cast.weigher = Optional.of(new TokenWeigher<>(weigher)); + return cast; + } + + @CanIgnoreReturnValue + public EvictableCacheBuilder recordStats() { + recordStats = true; + return this; + } + + /** + * Choose a behavior for case when caching is disabled that may allow data and failure + * sharing between concurrent callers. + */ + @CanIgnoreReturnValue + public EvictableCacheBuilder shareResultsAndFailuresEvenIfDisabled() { + return disabledCacheImplementation(DisabledCacheImplementation.GUAVA); + } + + /** + * Choose a behavior for case when caching is disabled that prevents data and + * failure sharing between concurrent callers. + * Note: disabled cache won't report any statistics. + */ + @CanIgnoreReturnValue + public EvictableCacheBuilder shareNothingWhenDisabled() { + return disabledCacheImplementation(DisabledCacheImplementation.NOOP); + } + + @VisibleForTesting + EvictableCacheBuilder disabledCacheImplementation(DisabledCacheImplementation cacheImplementation) { + Preconditions.checkState(!disabledCacheImplementation.isPresent(), "disabledCacheImplementation already set"); + disabledCacheImplementation = Optional.of(cacheImplementation); + return this; + } + + @CheckReturnValue + public Cache build() { + return build(unimplementedCacheLoader()); + } + + @CheckReturnValue + public LoadingCache build(CacheLoader loader) { + if (cacheDisabled()) { + // Silently providing a behavior different from Guava's could be surprising, so require explicit choice. + DisabledCacheImplementation disabledCacheImplementation = this.disabledCacheImplementation.orElseThrow( + () -> new IllegalStateException( + "Even when cache is disabled, the loads are synchronized and both load results and failures" + + " are shared between threads. " + "This is rarely desired, thus builder caller is" + + " expected to either opt-in into this behavior with" + + " shareResultsAndFailuresEvenIfDisabled(), or choose not to share results (and failures)" + + " between concurrent invocations with shareNothingWhenDisabled().")); + + switch (disabledCacheImplementation) { + case NOOP: + return new EmptyCache<>(loader, recordStats); + case GUAVA: { + // Disabled cache is always empty, so doesn't exhibit invalidation problems. + // Avoid overhead of EvictableCache wrapper. + CacheBuilder cacheBuilder = CacheBuilder.newBuilder() + .maximumSize(0) + .expireAfterWrite(0, TimeUnit.SECONDS); + if (recordStats) { + cacheBuilder.recordStats(); + } + return buildUnsafeCache(cacheBuilder, loader); + } + default: + throw new IllegalStateException("Unexpected value: " + disabledCacheImplementation); + } + } + + if (!(maximumSize.isPresent() || maximumWeight.isPresent() || expireAfterWrite.isPresent())) { + // EvictableCache invalidation (e.g. invalidateAll) happening concurrently with a load may + // lead to an entry remaining in the cache, without associated token. This would lead to + // a memory leak in an unbounded cache. + throw new IllegalStateException("Unbounded cache is not supported"); + } + + // CacheBuilder is further modified in EvictableCache::new, so cannot be shared between build() calls. + CacheBuilder cacheBuilder = CacheBuilder.newBuilder(); + ticker.ifPresent(cacheBuilder::ticker); + expireAfterWrite.ifPresent(cacheBuilder::expireAfterWrite); + refreshAfterWrite.ifPresent(cacheBuilder::refreshAfterWrite); + maximumSize.ifPresent(cacheBuilder::maximumSize); + maximumWeight.ifPresent(cacheBuilder::maximumWeight); + weigher.ifPresent(cacheBuilder::weigher); + concurrencyLevel.ifPresent(cacheBuilder::concurrencyLevel); + if (recordStats) { + cacheBuilder.recordStats(); + } + return new EvictableCache<>(cacheBuilder, loader); + } + + private boolean cacheDisabled() { + return (maximumSize.isPresent() && maximumSize.get() == 0) + || (expireAfterWrite.isPresent() && expireAfterWrite.get().isZero()); + } + + // @SuppressModernizer // CacheBuilder.build(CacheLoader) is forbidden, + // advising to use this class as a safety-adding wrapper. + private static LoadingCache buildUnsafeCache(CacheBuilder cacheBuilder, + CacheLoader cacheLoader) { + return cacheBuilder.build(cacheLoader); + } + + private static CacheLoader unimplementedCacheLoader() { + return CacheLoader.from(ignored -> { + throw new UnsupportedOperationException(); + }); + } + + private static final class TokenWeigher + implements Weigher, V> { + private final Weigher delegate; + + private TokenWeigher(Weigher delegate) { + this.delegate = Objects.requireNonNull(delegate, "delegate is null"); + } + + @Override + public int weigh(Token key, V value) { + return delegate.weigh(key.getKey(), value); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TokenWeigher that = (TokenWeigher) o; + return Objects.equals(delegate, that.delegate); + } + + @Override + public int hashCode() { + return Objects.hash(delegate); + } + + @Override + public String toString() { + return "TokenWeigher{" + "delegate=" + delegate + '}'; + } + } + + private static Duration toDuration(long duration, TimeUnit unit) { + // Saturated conversion, as in com.google.common.cache.CacheBuilder.toNanosSaturated + return Duration.ofNanos(unit.toNanos(duration)); + } + + @VisibleForTesting + enum DisabledCacheImplementation { + NOOP, + GUAVA, + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java index a0558766e814002..9fa502be42b962c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java @@ -51,7 +51,7 @@ public ExternalSchemaCache(ExternalCatalog catalog, ExecutorService executor) { } private void init(ExecutorService executor) { - CacheFactory schemaCacheFactory = new CacheFactory( + CacheFactory> schemaCacheFactory = new CacheFactory( OptionalLong.of(86400L), OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60), Config.max_external_schema_cache_num, diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index 6d65f8bcdbccb7e..8c8389b6653c558 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -34,6 +34,7 @@ import org.apache.doris.datasource.hudi.HudiUtils; import org.apache.doris.datasource.iceberg.IcebergUtils; import org.apache.doris.datasource.mvcc.MvccSnapshot; +import org.apache.doris.fs.FileSystemDirectoryLister; import org.apache.doris.mtmv.MTMVBaseTableIf; import org.apache.doris.mtmv.MTMVMaxTimestampSnapshot; import org.apache.doris.mtmv.MTMVRefreshContext; @@ -969,7 +970,8 @@ private List getFilesForPartitions( LOG.debug("Chosen partition for table {}. [{}]", name, partition.toString()); } } - return cache.getFilesByPartitionsWithoutCache(hivePartitions, bindBrokerName); + return cache.getFilesByPartitionsWithoutCache(hivePartitions, bindBrokerName, + new FileSystemDirectoryLister(), null); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java index ea42dfa2f52a01d..509d25d760e1acf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java @@ -25,6 +25,7 @@ import org.apache.doris.catalog.ListPartitionItem; import org.apache.doris.catalog.PartitionItem; import org.apache.doris.catalog.PartitionKey; +import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.CacheFactory; @@ -39,7 +40,11 @@ import org.apache.doris.datasource.ExternalMetaCacheMgr; import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo; import org.apache.doris.datasource.property.PropertyConverter; +import org.apache.doris.fs.DirectoryLister; import org.apache.doris.fs.FileSystemCache; +import org.apache.doris.fs.FileSystemDirectoryLister; +import org.apache.doris.fs.FileSystemIOException; +import org.apache.doris.fs.RemoteIterator; import org.apache.doris.fs.remote.RemoteFile; import org.apache.doris.fs.remote.RemoteFileSystem; import org.apache.doris.fs.remote.dfs.DFSFileSystem; @@ -138,7 +143,7 @@ public HiveMetaStoreCache(HMSExternalCatalog catalog, * which will bring out thread deadlock. **/ private void init() { - CacheFactory partitionValuesCacheFactory = new CacheFactory( + CacheFactory partitionValuesCacheFactory = new CacheFactory( OptionalLong.of(28800L), OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60L), Config.max_hive_partition_table_cache_num, @@ -195,7 +200,7 @@ protected ExecutorService getExecutor() { @Override public FileCacheValue load(FileCacheKey key) { - return loadFiles(key); + return loadFiles(key, new FileSystemDirectoryLister(), null); } }; @@ -348,7 +353,9 @@ private Map loadPartitions(Iterable partitionValues, - String bindBrokerName) throws UserException { + String bindBrokerName, + DirectoryLister directoryLister, + TableIf table) throws UserException { FileCacheValue result = new FileCacheValue(); RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( new FileSystemCache.FileSystemCacheKey(LocationPath.getFSIdentity( @@ -363,34 +370,37 @@ private FileCacheValue getFileCache(String location, String inputFormat, // /user/hive/warehouse/region_tmp_union_all2/2 // So we need to recursively list data location. // https://blog.actorsfit.com/a?ID=00550-ce56ec63-1bff-4b0c-a6f7-447b93efaa31 - List remoteFiles = new ArrayList<>(); boolean isRecursiveDirectories = Boolean.valueOf( catalog.getProperties().getOrDefault("hive.recursive_directories", "false")); - Status status = fs.listFiles(location, isRecursiveDirectories, remoteFiles); - if (status.ok()) { - for (RemoteFile remoteFile : remoteFiles) { + try { + RemoteIterator iterator = directoryLister.listFiles(fs, isRecursiveDirectories, + table, location); + while (iterator.hasNext()) { + RemoteFile remoteFile = iterator.next(); String srcPath = remoteFile.getPath().toString(); LocationPath locationPath = new LocationPath(srcPath, catalog.getProperties()); result.addFile(remoteFile, locationPath); } - } else if (status.getErrCode().equals(ErrCode.NOT_FOUND)) { - // User may manually remove partition under HDFS, in this case, - // Hive doesn't aware that the removed partition is missing. - // Here is to support this case without throw an exception. - LOG.warn(String.format("File %s not exist.", location)); - if (!Boolean.valueOf(catalog.getProperties() - .getOrDefault("hive.ignore_absent_partitions", "true"))) { - throw new UserException("Partition location does not exist: " + location); + } catch (FileSystemIOException e) { + if (e.getErrorCode().isPresent() && e.getErrorCode().get().equals(ErrCode.NOT_FOUND)) { + // User may manually remove partition under HDFS, in this case, + // Hive doesn't aware that the removed partition is missing. + // Here is to support this case without throw an exception. + LOG.warn(String.format("File %s not exist.", location)); + if (!Boolean.valueOf(catalog.getProperties() + .getOrDefault("hive.ignore_absent_partitions", "true"))) { + throw new UserException("Partition location does not exist: " + location); + } + } else { + throw new RuntimeException(e); } - } else { - throw new RuntimeException(status.getErrMsg()); } // Must copy the partitionValues to avoid concurrent modification of key and value result.setPartitionValues(Lists.newArrayList(partitionValues)); return result; } - private FileCacheValue loadFiles(FileCacheKey key) { + private FileCacheValue loadFiles(FileCacheKey key, DirectoryLister directoryLister, TableIf table) { ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); try { Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader()); @@ -415,7 +425,7 @@ private FileCacheValue loadFiles(FileCacheKey key) { FileInputFormat.setInputPaths(jobConf, finalLocation.get()); try { FileCacheValue result = getFileCache(finalLocation.get(), key.inputFormat, jobConf, - key.getPartitionValues(), key.bindBrokerName); + key.getPartitionValues(), key.bindBrokerName, directoryLister, table); // Replace default hive partition with a null_string. for (int i = 0; i < result.getValuesSize(); i++) { if (HIVE_DEFAULT_PARTITION.equals(result.getPartitionValues().get(i))) { @@ -469,19 +479,25 @@ public HivePartitionValues getPartitionValues(PartitionValueCacheKey key) { } public List getFilesByPartitionsWithCache(List partitions, - String bindBrokerName) { - return getFilesByPartitions(partitions, true, true, bindBrokerName); + String bindBrokerName, + DirectoryLister directoryLister, + TableIf table) { + return getFilesByPartitions(partitions, true, true, bindBrokerName, directoryLister, table); } public List getFilesByPartitionsWithoutCache(List partitions, - String bindBrokerName) { - return getFilesByPartitions(partitions, false, true, bindBrokerName); + String bindBrokerName, + DirectoryLister directoryLister, + TableIf table) { + return getFilesByPartitions(partitions, false, true, bindBrokerName, directoryLister, table); } public List getFilesByPartitions(List partitions, boolean withCache, boolean concurrent, - String bindBrokerName) { + String bindBrokerName, + DirectoryLister directoryLister, + TableIf table) { long start = System.currentTimeMillis(); List keys = partitions.stream().map(p -> p.isDummyPartition() ? FileCacheKey.createDummyCacheKey( @@ -497,13 +513,15 @@ public List getFilesByPartitions(List partitions, } else { if (concurrent) { List> pList = keys.stream().map( - key -> fileListingExecutor.submit(() -> loadFiles(key))).collect(Collectors.toList()); + key -> fileListingExecutor.submit(() -> loadFiles(key, directoryLister, table))) + .collect(Collectors.toList()); fileLists = Lists.newArrayListWithExpectedSize(keys.size()); for (Future p : pList) { fileLists.add(p.get()); } } else { - fileLists = keys.stream().map(this::loadFiles).collect(Collectors.toList()); + fileLists = keys.stream().map((key) -> loadFiles(key, directoryLister, table)) + .collect(Collectors.toList()); } } } catch (ExecutionException e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java index e710bdb935d7bc7..2af13e547fd016d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java @@ -41,6 +41,7 @@ import org.apache.doris.datasource.hive.HiveProperties; import org.apache.doris.datasource.hive.HiveTransaction; import org.apache.doris.datasource.hive.source.HiveSplit.HiveSplitCreator; +import org.apache.doris.fs.DirectoryLister; import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.qe.ConnectContext; @@ -84,6 +85,8 @@ public class HiveScanNode extends FileQueryScanNode { @Setter private SelectedPartitions selectedPartitions = null; + private DirectoryLister directoryLister; + private boolean partitionInit = false; private final AtomicReference batchException = new AtomicReference<>(null); private List prunedPartitions; @@ -98,17 +101,21 @@ public class HiveScanNode extends FileQueryScanNode { * eg: s3 tvf * These scan nodes do not have corresponding catalog/database/table info, so no need to do priv check */ - public HiveScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) { + public HiveScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv, + DirectoryLister directoryLister) { super(id, desc, "HIVE_SCAN_NODE", StatisticalType.HIVE_SCAN_NODE, needCheckColumnPriv); hmsTable = (HMSExternalTable) desc.getTable(); brokerName = hmsTable.getCatalog().bindBrokerName(); + this.directoryLister = directoryLister; } public HiveScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, - StatisticalType statisticalType, boolean needCheckColumnPriv) { + StatisticalType statisticalType, boolean needCheckColumnPriv, + DirectoryLister directoryLister) { super(id, desc, planNodeName, statisticalType, needCheckColumnPriv); hmsTable = (HMSExternalTable) desc.getTable(); brokerName = hmsTable.getCatalog().bindBrokerName(); + this.directoryLister = directoryLister; } @Override @@ -269,7 +276,8 @@ private void getFileSplitByPartitions(HiveMetaStoreCache cache, List 0; - fileCaches = cache.getFilesByPartitions(partitions, withCache, partitions.size() > 1, bindBrokerName); + fileCaches = cache.getFilesByPartitions(partitions, withCache, partitions.size() > 1, bindBrokerName, + directoryLister, hmsTable); } if (tableSample != null) { List hiveFileStatuses = selectFiles(fileCaches); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java index a8f2a362bfde8d0..3f7870ff2e3c0b8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java @@ -34,6 +34,7 @@ import org.apache.doris.datasource.hive.HivePartition; import org.apache.doris.datasource.hive.source.HiveScanNode; import org.apache.doris.datasource.hudi.HudiUtils; +import org.apache.doris.fs.DirectoryLister; import org.apache.doris.planner.ListPartitionPrunerV2; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.qe.ConnectContext; @@ -121,8 +122,9 @@ public class HudiScanNode extends HiveScanNode { * These scan nodes do not have corresponding catalog/database/table info, so no need to do priv check */ public HudiScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv, - Optional scanParams, Optional incrementalRelation) { - super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE, needCheckColumnPriv); + Optional scanParams, Optional incrementalRelation, + DirectoryLister directoryLister) { + super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE, needCheckColumnPriv, directoryLister); isCowOrRoTable = hmsTable.isHoodieCowTable(); if (LOG.isDebugEnabled()) { if (isCowOrRoTable) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java index ad347ca78f2a4f3..74383ca7a2da1b9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java @@ -51,7 +51,7 @@ public class IcebergMetadataCache { private final LoadingCache tableCache; public IcebergMetadataCache(ExecutorService executor) { - CacheFactory snapshotListCacheFactory = new CacheFactory( + CacheFactory> snapshotListCacheFactory = new CacheFactory( OptionalLong.of(28800L), OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60), Config.max_external_table_cache_num, @@ -59,7 +59,7 @@ public IcebergMetadataCache(ExecutorService executor) { null); this.snapshotListCache = snapshotListCacheFactory.buildCache(key -> loadSnapshots(key), null, executor); - CacheFactory tableCacheFactory = new CacheFactory( + CacheFactory tableCacheFactory = new CacheFactory( OptionalLong.of(28800L), OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60), Config.max_external_table_cache_num, diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/DirectoryLister.java b/fe/fe-core/src/main/java/org/apache/doris/fs/DirectoryLister.java new file mode 100644 index 000000000000000..e97bc2c684fb0f3 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/DirectoryLister.java @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryLister.java +// and modified by Doris + +package org.apache.doris.fs; + +import org.apache.doris.catalog.TableIf; +import org.apache.doris.fs.remote.RemoteFile; + +public interface DirectoryLister { + RemoteIterator listFiles(FileSystem fs, boolean recursive, TableIf table, String location) + throws FileSystemIOException; +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java index e96258dc719fbd2..80a76dc4eda11f7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java @@ -36,7 +36,7 @@ public class FileSystemCache { public FileSystemCache() { // no need to set refreshAfterWrite, because the FileSystem is created once and never changed - CacheFactory fsCacheFactory = new CacheFactory( + CacheFactory fsCacheFactory = new CacheFactory<>( OptionalLong.of(86400L), OptionalLong.empty(), Config.max_remote_file_system_cache_num, diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemDirectoryLister.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemDirectoryLister.java new file mode 100644 index 000000000000000..dcd7eeb16b36d2c --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemDirectoryLister.java @@ -0,0 +1,37 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.fs; + +import org.apache.doris.backup.Status; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.fs.remote.RemoteFile; + +import java.util.ArrayList; +import java.util.List; + +public class FileSystemDirectoryLister implements DirectoryLister { + public RemoteIterator listFiles(FileSystem fs, boolean recursive, TableIf table, String location) + throws FileSystemIOException { + List result = new ArrayList<>(); + Status status = fs.listFiles(location, recursive, result); + if (!status.ok()) { + throw new FileSystemIOException(status.getErrCode(), status.getErrMsg()); + } + return new RemoteFileRemoteIterator(result); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemIOException.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemIOException.java new file mode 100644 index 000000000000000..c9e45d0352b2823 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemIOException.java @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.fs; + +import org.apache.doris.backup.Status.ErrCode; + +import java.io.IOException; +import java.util.Optional; +import javax.annotation.Nullable; + +public class FileSystemIOException extends IOException { + + @Nullable + private ErrCode errCode; + + public FileSystemIOException(ErrCode errCode, String message) { + super(message); + this.errCode = errCode; + } + + public FileSystemIOException(ErrCode errCode, String message, Throwable cause) { + super(message, cause); + this.errCode = errCode; + } + + public FileSystemIOException(String message) { + super(message); + this.errCode = null; + } + + public FileSystemIOException(String message, Throwable cause) { + super(message, cause); + this.errCode = null; + } + + public Optional getErrorCode() { + return Optional.ofNullable(errCode); + } + + @Override + public String getMessage() { + if (errCode != null) { + return String.format("[%s]: %s", + errCode, + super.getMessage()); + } else { + return super.getMessage(); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteFileRemoteIterator.java b/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteFileRemoteIterator.java new file mode 100644 index 000000000000000..6ac8eb3b0c6e1d4 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteFileRemoteIterator.java @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.fs; + +import org.apache.doris.fs.remote.RemoteFile; + +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Objects; + +public class RemoteFileRemoteIterator + implements RemoteIterator { + private final List remoteFileList; + private int currentIndex = 0; + + public RemoteFileRemoteIterator(List remoteFileList) { + this.remoteFileList = Objects.requireNonNull(remoteFileList, "iterator is null"); + } + + @Override + public boolean hasNext() throws FileSystemIOException { + return currentIndex < remoteFileList.size(); + } + + @Override + public RemoteFile next() throws FileSystemIOException { + if (!hasNext()) { + throw new NoSuchElementException("No more elements in RemoteFileRemoteIterator"); + } + return remoteFileList.get(currentIndex++); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteIterator.java b/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteIterator.java new file mode 100644 index 000000000000000..b9d212a15a5f3b5 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteIterator.java @@ -0,0 +1,27 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/RemoteIterator.java +// and modified by Doris + +package org.apache.doris.fs; + +public interface RemoteIterator { + boolean hasNext() throws FileSystemIOException; + + T next() throws FileSystemIOException; +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/SimpleRemoteIterator.java b/fe/fe-core/src/main/java/org/apache/doris/fs/SimpleRemoteIterator.java new file mode 100644 index 000000000000000..4332a5fed35b6c5 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/SimpleRemoteIterator.java @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +package org.apache.doris.fs; + +import org.apache.doris.fs.remote.RemoteFile; + +import java.util.Iterator; +import java.util.Objects; +// This file is copied from +// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/SimpleRemoteIterator.java +// and modified by Doris + +class SimpleRemoteIterator implements RemoteIterator { + private final Iterator iterator; + + public SimpleRemoteIterator(Iterator iterator) { + this.iterator = Objects.requireNonNull(iterator, "iterator is null"); + } + + @Override + public boolean hasNext() throws FileSystemIOException { + return iterator.hasNext(); + } + + @Override + public RemoteFile next() throws FileSystemIOException { + return iterator.next(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionDirectoryListingCacheKey.java b/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionDirectoryListingCacheKey.java new file mode 100644 index 000000000000000..6be3c03f824d048 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionDirectoryListingCacheKey.java @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionDirectoryListingCacheKey.java +// and modified by Doris + +package org.apache.doris.fs; + +import java.util.Objects; + +public class TransactionDirectoryListingCacheKey { + + private final long transactionId; + private final String path; + + public TransactionDirectoryListingCacheKey(long transactionId, String path) { + this.transactionId = transactionId; + this.path = Objects.requireNonNull(path, "path is null"); + } + + public String getPath() { + return path; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TransactionDirectoryListingCacheKey that = (TransactionDirectoryListingCacheKey) o; + return transactionId == that.transactionId && path.equals(that.path); + } + + @Override + public int hashCode() { + return Objects.hash(transactionId, path); + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("TransactionDirectoryListingCacheKey{"); + sb.append("transactionId=").append(transactionId); + sb.append(", path='").append(path).append('\''); + sb.append('}'); + return sb.toString(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryLister.java b/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryLister.java new file mode 100644 index 000000000000000..37acec6864f8f51 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryLister.java @@ -0,0 +1,216 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionScopeCachingDirectoryLister.java +// and modified by Doris + +package org.apache.doris.fs; + +import org.apache.doris.catalog.TableIf; +import org.apache.doris.fs.remote.RemoteFile; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.cache.Cache; +import com.google.common.util.concurrent.UncheckedExecutionException; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import org.apache.commons.collections.ListUtils; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import javax.annotation.Nullable; + +/** + * Caches directory content (including listings that were started concurrently). + * {@link TransactionScopeCachingDirectoryLister} assumes that all listings + * are performed by same user within single transaction, therefore any failure can + * be shared between concurrent listings. + */ +public class TransactionScopeCachingDirectoryLister implements DirectoryLister { + private final long transactionId; + + @VisibleForTesting + public Cache getCache() { + return cache; + } + + //TODO use a cache key based on Path & SchemaTableName and iterate over the cache keys + // to deal more efficiently with cache invalidation scenarios for partitioned tables. + private final Cache cache; + private final DirectoryLister delegate; + + public TransactionScopeCachingDirectoryLister(DirectoryLister delegate, long transactionId, + Cache cache) { + this.delegate = Objects.requireNonNull(delegate, "delegate is null"); + this.transactionId = transactionId; + this.cache = Objects.requireNonNull(cache, "cache is null"); + } + + @Override + public RemoteIterator listFiles(FileSystem fs, boolean recursive, TableIf table, String location) + throws FileSystemIOException { + return listInternal(fs, recursive, table, new TransactionDirectoryListingCacheKey(transactionId, location)); + } + + private RemoteIterator listInternal(FileSystem fs, boolean recursive, TableIf table, + TransactionDirectoryListingCacheKey cacheKey) throws FileSystemIOException { + FetchingValueHolder cachedValueHolder; + try { + cachedValueHolder = cache.get(cacheKey, + () -> new FetchingValueHolder(createListingRemoteIterator(fs, recursive, table, cacheKey))); + } catch (ExecutionException | UncheckedExecutionException e) { + Throwable throwable = e.getCause(); + Throwables.throwIfInstanceOf(throwable, FileSystemIOException.class); + Throwables.throwIfUnchecked(throwable); + throw new RuntimeException("Failed to list directory: " + cacheKey.getPath(), throwable); + } + + if (cachedValueHolder.isFullyCached()) { + return new SimpleRemoteIterator(cachedValueHolder.getCachedFiles()); + } + + return cachingRemoteIterator(cachedValueHolder, cacheKey); + } + + private RemoteIterator createListingRemoteIterator(FileSystem fs, boolean recursive, + TableIf table, TransactionDirectoryListingCacheKey cacheKey) + throws FileSystemIOException { + return delegate.listFiles(fs, recursive, table, cacheKey.getPath()); + } + + + private RemoteIterator cachingRemoteIterator(FetchingValueHolder cachedValueHolder, + TransactionDirectoryListingCacheKey cacheKey) { + return new RemoteIterator() { + private int fileIndex; + + @Override + public boolean hasNext() + throws FileSystemIOException { + try { + boolean hasNext = cachedValueHolder.getCachedFile(fileIndex).isPresent(); + // Update cache weight of cachedValueHolder for a given path. + // The cachedValueHolder acts as an invalidation guard. + // If a cache invalidation happens while this iterator goes over the files from the specified path, + // the eventually outdated file listing will not be added anymore to the cache. + cache.asMap().replace(cacheKey, cachedValueHolder, cachedValueHolder); + return hasNext; + } catch (Exception exception) { + // invalidate cached value to force retry of directory listing + cache.invalidate(cacheKey); + throw exception; + } + } + + @Override + public RemoteFile next() + throws FileSystemIOException { + // force cache entry weight update in case next file is cached + Preconditions.checkState(hasNext()); + return cachedValueHolder.getCachedFile(fileIndex++).orElseThrow(NoSuchElementException::new); + } + }; + } + + @VisibleForTesting + boolean isCached(String location) { + return isCached(new TransactionDirectoryListingCacheKey(transactionId, location)); + } + + @VisibleForTesting + boolean isCached(TransactionDirectoryListingCacheKey cacheKey) { + FetchingValueHolder cached = cache.getIfPresent(cacheKey); + return cached != null && cached.isFullyCached(); + } + + static class FetchingValueHolder { + + private final List cachedFiles = ListUtils.synchronizedList(new ArrayList()); + + @GuardedBy("this") + @Nullable + private RemoteIterator fileIterator; + @GuardedBy("this") + @Nullable + private Exception exception; + + public FetchingValueHolder(RemoteIterator fileIterator) { + this.fileIterator = Objects.requireNonNull(fileIterator, "fileIterator is null"); + } + + public synchronized boolean isFullyCached() { + return fileIterator == null && exception == null; + } + + public long getCacheFileCount() { + return cachedFiles.size(); + } + + public Iterator getCachedFiles() { + Preconditions.checkState(isFullyCached()); + return cachedFiles.iterator(); + } + + public Optional getCachedFile(int index) + throws FileSystemIOException { + int filesSize = cachedFiles.size(); + Preconditions.checkArgument(index >= 0 && index <= filesSize, + "File index (%s) out of bounds [0, %s]", index, filesSize); + + // avoid fileIterator synchronization (and thus blocking) for already cached files + if (index < filesSize) { + return Optional.of(cachedFiles.get(index)); + } + + return fetchNextCachedFile(index); + } + + private synchronized Optional fetchNextCachedFile(int index) + throws FileSystemIOException { + if (exception != null) { + throw new FileSystemIOException("Exception while listing directory", exception); + } + + if (index < cachedFiles.size()) { + // file was fetched concurrently + return Optional.of(cachedFiles.get(index)); + } + + try { + if (fileIterator == null || !fileIterator.hasNext()) { + // no more files + fileIterator = null; + return Optional.empty(); + } + + RemoteFile fileStatus = fileIterator.next(); + cachedFiles.add(fileStatus); + return Optional.of(fileStatus); + } catch (Exception exception) { + fileIterator = null; + this.exception = exception; + throw exception; + } + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerFactory.java b/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerFactory.java new file mode 100644 index 000000000000000..c3c9c347c3d2b65 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerFactory.java @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionScopeCachingDirectoryListerFactory.java +// and modified by Doris + +package org.apache.doris.fs; + +import org.apache.doris.common.EvictableCacheBuilder; +import org.apache.doris.fs.TransactionScopeCachingDirectoryLister.FetchingValueHolder; + +import com.google.common.cache.Cache; + +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; + +public class TransactionScopeCachingDirectoryListerFactory { + //TODO use a cache key based on Path & SchemaTableName and iterate over the cache keys + // to deal more efficiently with cache invalidation scenarios for partitioned tables. + // private final Optional> cache; + + private final Optional> cache; + + private final AtomicLong nextTransactionId = new AtomicLong(); + + public TransactionScopeCachingDirectoryListerFactory(long maxSize) { + if (maxSize > 0) { + EvictableCacheBuilder cacheBuilder = + EvictableCacheBuilder.newBuilder() + .maximumWeight(maxSize) + .weigher((key, value) -> + Math.toIntExact(value.getCacheFileCount())); + this.cache = Optional.of(cacheBuilder.build()); + } else { + cache = Optional.empty(); + } + } + + public DirectoryLister get(DirectoryLister delegate) { + return cache + .map(cache -> (DirectoryLister) new TransactionScopeCachingDirectoryLister(delegate, + nextTransactionId.getAndIncrement(), cache)) + .orElse(delegate); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 56ae65ec7229413..46b188163822c14 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -46,6 +46,7 @@ import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.Type; +import org.apache.doris.common.Config; import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.FileQueryScanNode; import org.apache.doris.datasource.es.EsExternalTable; @@ -68,6 +69,9 @@ import org.apache.doris.datasource.paimon.source.PaimonScanNode; import org.apache.doris.datasource.trinoconnector.TrinoConnectorExternalTable; import org.apache.doris.datasource.trinoconnector.source.TrinoConnectorScanNode; +import org.apache.doris.fs.DirectoryLister; +import org.apache.doris.fs.FileSystemDirectoryLister; +import org.apache.doris.fs.TransactionScopeCachingDirectoryListerFactory; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.properties.DistributionSpec; import org.apache.doris.nereids.properties.DistributionSpecAny; @@ -241,6 +245,8 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor slots = fileScan.getOutput(); ExternalTable table = fileScan.getTable(); TupleDescriptor tupleDescriptor = generateTupleDesc(slots, table, context); @@ -648,7 +664,7 @@ public PlanFragment visitPhysicalHudiScan(PhysicalHudiScan fileScan, PlanTransla + " for Hudi table"); PhysicalHudiScan hudiScan = (PhysicalHudiScan) fileScan; ScanNode scanNode = new HudiScanNode(context.nextPlanNodeId(), tupleDescriptor, false, - hudiScan.getScanParams(), hudiScan.getIncrementalRelation()); + hudiScan.getScanParams(), hudiScan.getIncrementalRelation(), directoryLister); if (fileScan.getTableSnapshot().isPresent()) { ((FileQueryScanNode) scanNode).setQueryTableSnapshot(fileScan.getTableSnapshot().get()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index d94ad0a2552240f..f69c8ac79641448 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -58,6 +58,7 @@ import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; import org.apache.doris.common.Reference; @@ -74,6 +75,9 @@ import org.apache.doris.datasource.odbc.source.OdbcScanNode; import org.apache.doris.datasource.paimon.source.PaimonScanNode; import org.apache.doris.datasource.trinoconnector.source.TrinoConnectorScanNode; +import org.apache.doris.fs.DirectoryLister; +import org.apache.doris.fs.FileSystemDirectoryLister; +import org.apache.doris.fs.TransactionScopeCachingDirectoryListerFactory; import org.apache.doris.qe.ConnectContext; import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException; import org.apache.doris.statistics.StatisticalType; @@ -115,6 +119,8 @@ public class SingleNodePlanner { private final ArrayList scanNodes = Lists.newArrayList(); private Map> selectStmtToScanNodes = Maps.newHashMap(); + private DirectoryLister directoryLister; + public SingleNodePlanner(PlannerContext ctx) { this.ctx = ctx; } @@ -1959,6 +1965,11 @@ private PlanNode createScanNode(Analyzer analyzer, TableRef tblRef, SelectStmt s scanNode = ((TableValuedFunctionRef) tblRef).getScanNode(ctx.getNextNodeId()); break; case HMS_EXTERNAL_TABLE: + // TransactionScopeCachingDirectoryLister is only used in hms external tables. + if (directoryLister != null) { + this.directoryLister = new TransactionScopeCachingDirectoryListerFactory( + Config.max_external_table_split_file_meta_cache_num).get(new FileSystemDirectoryLister()); + } TableIf table = tblRef.getDesc().getTable(); switch (((HMSExternalTable) table).getDlaType()) { case HUDI: @@ -1968,14 +1979,15 @@ private PlanNode createScanNode(Analyzer analyzer, TableRef tblRef, SelectStmt s throw new UserException("Hudi incremental read is not supported, " + "please set enable_nereids_planner = true to enable new optimizer"); } + scanNode = new HudiScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true, - Optional.empty(), Optional.empty()); + Optional.empty(), Optional.empty(), directoryLister); break; case ICEBERG: scanNode = new IcebergScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true); break; case HIVE: - scanNode = new HiveScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true); + scanNode = new HiveScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true, directoryLister); ((HiveScanNode) scanNode).setTableSample(tblRef.getTableSample()); break; default: diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/TestEvictableCache.java b/fe/fe-core/src/test/java/org/apache/doris/common/TestEvictableCache.java new file mode 100644 index 000000000000000..3bfdc73b78e358f --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/common/TestEvictableCache.java @@ -0,0 +1,708 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/trinodb/trino/blob/438/lib/trino-cache/src/test/java/io/trino/cache/TestEvictableCache.java +// and modified by Doris + +package org.apache.doris.common; + +import org.apache.doris.common.EvictableCacheBuilder.DisabledCacheImplementation; + +import com.google.common.base.Preconditions; +import com.google.common.base.Ticker; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheStats; +import com.google.common.cache.LoadingCache; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import org.gaul.modernizer_maven_annotations.SuppressModernizer; +import org.junit.Assert; +import org.junit.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Exchanger; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; +import java.util.stream.IntStream; + +public class TestEvictableCache { + private static class TestingTicker extends Ticker { + private volatile long time; + + public TestingTicker() { + } + + public long read() { + return this.time; + } + + public synchronized void increment(long delta, TimeUnit unit) { + Preconditions.checkArgument(delta >= 0L, "delta is negative"); + this.time += unit.toNanos(delta); + } + } + + private static final int TEST_TIMEOUT_SECONDS = 10; + + @Test + @Timeout(TEST_TIMEOUT_SECONDS) + public void testLoad() + throws Exception { + Cache cache = EvictableCacheBuilder.newBuilder() + .maximumSize(10_000) + .build(); + Assert.assertEquals("abc", cache.get(42, () -> "abc")); + } + + @Test + @Timeout(TEST_TIMEOUT_SECONDS) + public void testEvictBySize() + throws Exception { + int maximumSize = 10; + Cache cache = EvictableCacheBuilder.newBuilder() + .maximumSize(maximumSize) + .build(); + + for (int i = 0; i < 10_000; i++) { + int value = i * 10; + Assert.assertEquals(value, (Object) cache.get(i, () -> value)); + } + cache.cleanUp(); + Assert.assertEquals(maximumSize, cache.size()); + Assert.assertEquals(maximumSize, ((EvictableCache) cache).tokensCount()); + + // Ensure cache is effective, i.e. some entries preserved + int lastKey = 10_000 - 1; + Assert.assertEquals(lastKey * 10, (Object) cache.get(lastKey, () -> { + throw new UnsupportedOperationException(); + })); + } + + @Test + @Timeout(TEST_TIMEOUT_SECONDS) + public void testEvictByWeight() throws Exception { + Cache cache = EvictableCacheBuilder.newBuilder() + .maximumWeight(20) + .weigher((Integer key, String value) -> value.length()) + .build(); + + for (int i = 0; i < 10; i++) { + String value = String.join("", Collections.nCopies(i, "a")); + Assert.assertEquals(value, cache.get(i, () -> value)); + } + + cache.cleanUp(); + // It's not deterministic which entries get evicted + int cacheSize = Math.toIntExact(cache.size()); + + Assert.assertEquals(cacheSize, ((EvictableCache) cache).tokensCount()); + Assert.assertEquals(cacheSize, cache.asMap().keySet().size()); + + int keySum = cache.asMap().keySet().stream() + .mapToInt(i -> i) + .sum(); + Assert.assertTrue("key sum should be <= 20", keySum <= 20); + + Assert.assertEquals(cacheSize, cache.asMap().values().size()); + + int valuesLengthSum = cache.asMap().values().stream() + .mapToInt(String::length) + .sum(); + Assert.assertTrue("values length sum should be <= 20", valuesLengthSum <= 20); + + // Ensure cache is effective, i.e. some entries preserved + int lastKey = 9; // 10 - 1 + String expected = String.join("", Collections.nCopies(lastKey, "a")); // java8 替代 repeat + Assert.assertEquals(expected, cache.get(lastKey, () -> { + throw new UnsupportedOperationException(); + })); + } + + @Test + @Timeout(TEST_TIMEOUT_SECONDS) + public void testEvictByTime() throws Exception { + TestingTicker ticker = new TestingTicker(); + int ttl = 100; + Cache cache = EvictableCacheBuilder.newBuilder() + .ticker(ticker) + .expireAfterWrite(ttl, TimeUnit.MILLISECONDS) + .build(); + + Assert.assertEquals("1 ala ma kota", cache.get(1, () -> "1 ala ma kota")); + ticker.increment(ttl, TimeUnit.MILLISECONDS); + Assert.assertEquals("2 ala ma kota", cache.get(2, () -> "2 ala ma kota")); + cache.cleanUp(); + + // First entry should be expired and its token removed + int cacheSize = Math.toIntExact(cache.size()); + Assert.assertEquals(1, cacheSize); + Assert.assertEquals(cacheSize, ((EvictableCache) cache).tokensCount()); + Assert.assertEquals(cacheSize, cache.asMap().keySet().size()); + Assert.assertEquals(cacheSize, cache.asMap().values().size()); + } + + @Test + @Timeout(TEST_TIMEOUT_SECONDS) + public void testPreserveValueLoadedAfterTimeExpiration() throws Exception { + TestingTicker ticker = new TestingTicker(); + int ttl = 100; + Cache cache = EvictableCacheBuilder.newBuilder() + .ticker(ticker) + .expireAfterWrite(ttl, TimeUnit.MILLISECONDS) + .build(); + int key = 11; + + Assert.assertEquals("11 ala ma kota", cache.get(key, () -> "11 ala ma kota")); + Assert.assertEquals(1, ((EvictableCache) cache).tokensCount()); + + Assert.assertEquals("11 ala ma kota", cache.get(key, () -> "something else")); + Assert.assertEquals(1, ((EvictableCache) cache).tokensCount()); + + ticker.increment(ttl, TimeUnit.MILLISECONDS); + Assert.assertEquals("new value", cache.get(key, () -> "new value")); + Assert.assertEquals(1, ((EvictableCache) cache).tokensCount()); + + Assert.assertEquals("new value", cache.get(key, () -> "something yet different")); + Assert.assertEquals(1, ((EvictableCache) cache).tokensCount()); + + Assert.assertEquals(1, cache.size()); + Assert.assertEquals(1, ((EvictableCache) cache).tokensCount()); + Assert.assertEquals(1, cache.asMap().keySet().size()); + Assert.assertEquals(1, cache.asMap().values().size()); + } + + @Test + @Timeout(TEST_TIMEOUT_SECONDS) + public void testReplace() throws Exception { + Cache cache = EvictableCacheBuilder.newBuilder() + .maximumSize(10) + .build(); + + int key = 10; + int initialValue = 20; + int replacedValue = 21; + + cache.get(key, () -> initialValue); + + Assert.assertTrue("Should successfully replace value", cache.asMap().replace(key, initialValue, replacedValue)); + Assert.assertEquals("Cache should contain replaced value", replacedValue, cache.getIfPresent(key).intValue()); + + Assert.assertFalse("Should not replace when current value is different", cache.asMap().replace(key, initialValue, replacedValue)); + Assert.assertEquals("Cache should maintain replaced value", replacedValue, cache.getIfPresent(key).intValue()); + + Assert.assertFalse("Should not replace non-existent key", cache.asMap().replace(100000, replacedValue, 22)); + Assert.assertEquals("Cache should only contain original key", ImmutableSet.of(key), cache.asMap().keySet()); + Assert.assertEquals("Original key should maintain its value", replacedValue, cache.getIfPresent(key).intValue()); + + int anotherKey = 13; + int anotherInitialValue = 14; + cache.get(anotherKey, () -> anotherInitialValue); + cache.invalidate(anotherKey); + + Assert.assertFalse("Should not replace after invalidation", cache.asMap().replace(anotherKey, anotherInitialValue, 15)); + Assert.assertEquals("Cache should only contain original key after invalidation", ImmutableSet.of(key), cache.asMap().keySet()); + } + + @Test + @Timeout(TEST_TIMEOUT_SECONDS) + public void testDisabledCache() throws Exception { + Exception exception = Assert.assertThrows(IllegalStateException.class, () -> + EvictableCacheBuilder.newBuilder() + .maximumSize(0) + .build()); + + Assert.assertEquals("Even when cache is disabled, the loads are synchronized and both load results and failures are shared between threads. " + + "This is rarely desired, thus builder caller is expected to either opt-in into this behavior with shareResultsAndFailuresEvenIfDisabled(), " + + "or choose not to share results (and failures) between concurrent invocations with shareNothingWhenDisabled().", + exception.getMessage()); + + testDisabledCache( + EvictableCacheBuilder.newBuilder() + .maximumSize(0) + .shareNothingWhenDisabled() + .build()); + + testDisabledCache( + EvictableCacheBuilder.newBuilder() + .maximumSize(0) + .shareResultsAndFailuresEvenIfDisabled() + .build()); + } + + private void testDisabledCache(Cache cache) throws Exception { + for (int i = 0; i < 10; i++) { + int value = i * 10; + Assert.assertEquals(value, cache.get(i, () -> value).intValue()); + } + + cache.cleanUp(); + Assert.assertEquals(0, cache.size()); + Assert.assertTrue(cache.asMap().keySet().isEmpty()); + Assert.assertTrue(cache.asMap().values().isEmpty()); + } + + private static class CacheStatsAssertions { + public static CacheStatsAssertions assertCacheStats(Cache cache) { + Objects.requireNonNull(cache, "cache is null"); + return assertCacheStats(cache::stats); + } + + public static CacheStatsAssertions assertCacheStats(Supplier statsSupplier) { + return new CacheStatsAssertions(statsSupplier); + } + + private final Supplier stats; + + private long loads; + private long hits; + private long misses; + + private CacheStatsAssertions(Supplier stats) { + this.stats = Objects.requireNonNull(stats, "stats is null"); + } + + public CacheStatsAssertions loads(long value) { + this.loads = value; + return this; + } + + public CacheStatsAssertions hits(long value) { + this.hits = value; + return this; + } + + public CacheStatsAssertions misses(long value) { + this.misses = value; + return this; + } + + public void afterRunning(Runnable runnable) { + try { + calling(() -> { + runnable.run(); + return null; + }); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public T calling(Callable callable) + throws Exception { + CacheStats beforeStats = stats.get(); + T value = callable.call(); + CacheStats afterStats = stats.get(); + + long loadDelta = afterStats.loadCount() - beforeStats.loadCount(); + long missesDelta = afterStats.missCount() - beforeStats.missCount(); + long hitsDelta = afterStats.hitCount() - beforeStats.hitCount(); + + Assert.assertEquals(loads, loadDelta); + Assert.assertEquals(hits, hitsDelta); + Assert.assertEquals(misses, missesDelta); + + return value; + } + } + + @Test + @Timeout(TEST_TIMEOUT_SECONDS) + public void testLoadStats() + throws Exception { + Cache cache = EvictableCacheBuilder.newBuilder() + .maximumSize(10_000) + .recordStats() + .build(); + + Assert.assertEquals(new CacheStats(0, 0, 0, 0, 0, 0), cache.stats()); + + String value = CacheStatsAssertions.assertCacheStats(cache) + .misses(1) + .loads(1) + .calling(() -> cache.get(42, () -> "abc")); + Assert.assertEquals("abc", value); + + value = CacheStatsAssertions.assertCacheStats(cache) + .hits(1) + .calling(() -> cache.get(42, () -> "xyz")); + Assert.assertEquals("abc", value); + + // with equal, but not the same key + value = CacheStatsAssertions.assertCacheStats(cache) + .hits(1) + .calling(() -> cache.get(newInteger(42), () -> "xyz")); + Assert.assertEquals("abc", value); + } + + @Test + @Timeout(TEST_TIMEOUT_SECONDS) + public void testLoadFailure() + throws Exception { + Cache cache = EvictableCacheBuilder.newBuilder() + .maximumSize(0) + .expireAfterWrite(0, TimeUnit.DAYS) + .shareResultsAndFailuresEvenIfDisabled() + .build(); + int key = 10; + + ExecutorService executor = Executors.newFixedThreadPool(2); + try { + Exchanger exchanger = new Exchanger<>(); + CountDownLatch secondUnblocked = new CountDownLatch(1); + + List> futures = new ArrayList<>(); + for (int i = 0; i < 2; i++) { + boolean first = i == 0; + futures.add(executor.submit(() -> { + if (!first) { + // Wait for the first one to start the call + exchanger.exchange(Thread.currentThread(), 10, TimeUnit.SECONDS); + // Prove that we are back in RUNNABLE state. + secondUnblocked.countDown(); + } + return cache.get(key, () -> { + if (first) { + Thread secondThread = exchanger.exchange(null, 10, TimeUnit.SECONDS); + Assert.assertTrue(secondUnblocked.await(10, TimeUnit.SECONDS)); + // Wait for the second one to hang inside the cache.get call. + long start = System.nanoTime(); + while (!Thread.currentThread().isInterrupted()) { + try { + Assert.assertNotEquals(Thread.State.RUNNABLE, secondThread.getState()); + break; + } catch (Exception | AssertionError e) { + if (System.nanoTime() - start > TimeUnit.SECONDS.toNanos(30)) { + throw e; + } + } + try { + Thread.sleep(50); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + throw new RuntimeException("first attempt is poised to fail"); + } + return "success"; + }); + })); + } + + List results = new ArrayList<>(); + for (Future future : futures) { + try { + results.add(future.get()); + } catch (ExecutionException e) { + results.add(e.getCause().toString()); + } + } + + // Note: if this starts to fail, that suggests that Guava implementation changed and NoopCache may be redundant now. + String expectedError = "com.google.common.util.concurrent.UncheckedExecutionException: " + + "java.lang.RuntimeException: first attempt is poised to fail"; + Assert.assertEquals(2, results.size()); + Assert.assertEquals(expectedError, results.get(0)); + Assert.assertEquals(expectedError, results.get(1)); + } finally { + executor.shutdownNow(); + Assert.assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS)); + } + } + + @SuppressModernizer + private static Integer newInteger(int value) { + Integer integer = value; + @SuppressWarnings({"UnnecessaryBoxing", "BoxedPrimitiveConstructor", "CachedNumberConstructorCall", "removal"}) + Integer newInteger = new Integer(value); + Assert.assertNotSame(integer, newInteger); + return newInteger; + } + + /** + * Test that the loader is invoked only once for concurrent invocations of {{@link LoadingCache#get(Object, Callable)} with equal keys. + * This is a behavior of Guava Cache as well. While this is necessarily desirable behavior (see + * https://github.com/trinodb/trino/issues/11067), + * the test exists primarily to document current state and support discussion, should the current state change. + */ + @Test + @Timeout(TEST_TIMEOUT_SECONDS) + public void testConcurrentGetWithCallableShareLoad() + throws Exception { + AtomicInteger loads = new AtomicInteger(); + AtomicInteger concurrentInvocations = new AtomicInteger(); + + Cache cache = EvictableCacheBuilder.newBuilder() + .maximumSize(10_000) + .build(); + + int threads = 2; + int invocationsPerThread = 100; + ExecutorService executor = Executors.newFixedThreadPool(threads); + try { + CyclicBarrier barrier = new CyclicBarrier(threads); + List> futures = new ArrayList<>(); + for (int i = 0; i < threads; i++) { + futures.add(executor.submit(() -> { + for (int invocation = 0; invocation < invocationsPerThread; invocation++) { + int key = invocation; + barrier.await(10, TimeUnit.SECONDS); + int value = cache.get(key, () -> { + loads.incrementAndGet(); + int invocations = concurrentInvocations.incrementAndGet(); + Preconditions.checkState(invocations == 1, "There should be no concurrent invocations, cache should do load sharing when get() invoked for same key"); + Thread.sleep(1); + concurrentInvocations.decrementAndGet(); + return -key; + }); + Assert.assertEquals(-invocation, value); + } + return null; + })); + } + + for (Future future : futures) { + future.get(10, TimeUnit.SECONDS); + } + Assert.assertTrue( + String.format( + "loads (%d) should be between %d and %d", + loads.intValue(), + invocationsPerThread, + threads * invocationsPerThread - 1), + loads.intValue() >= invocationsPerThread && loads.intValue() <= threads * invocationsPerThread - 1); + } finally { + executor.shutdownNow(); + Assert.assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS)); + } + } + + enum Invalidation { + INVALIDATE_KEY, + INVALIDATE_PREDEFINED_KEYS, + INVALIDATE_SELECTED_KEYS, + INVALIDATE_ALL, + /**/; + } + + /** + * Covers https://github.com/google/guava/issues/1881 + */ + @Test + @Timeout(TEST_TIMEOUT_SECONDS) + public void testInvalidateOngoingLoad() + throws Exception { + for (Invalidation invalidation : Invalidation.values()) { + Cache cache = EvictableCacheBuilder.newBuilder() + .maximumSize(10_000) + .build(); + Integer key = 42; + + CountDownLatch loadOngoing = new CountDownLatch(1); + CountDownLatch invalidated = new CountDownLatch(1); + CountDownLatch getReturned = new CountDownLatch(1); + ExecutorService executor = Executors.newFixedThreadPool(2); + try { + // thread A + Future threadA = executor.submit(() -> { + String value = cache.get(key, () -> { + loadOngoing.countDown(); // 1 + Assert.assertTrue(invalidated.await(10, TimeUnit.SECONDS)); // 2 + return "stale value"; + }); + getReturned.countDown(); // 3 + return value; + }); + + // thread B + Future threadB = executor.submit(() -> { + Assert.assertTrue(loadOngoing.await(10, TimeUnit.SECONDS)); // 1 + + switch (invalidation) { + case INVALIDATE_KEY: + cache.invalidate(key); + break; + case INVALIDATE_PREDEFINED_KEYS: + cache.invalidateAll(ImmutableList.of(key)); + break; + case INVALIDATE_SELECTED_KEYS: + Set keys = cache.asMap().keySet().stream() + .filter(foundKey -> (int) foundKey == key) + .collect(ImmutableSet.toImmutableSet()); + cache.invalidateAll(keys); + break; + case INVALIDATE_ALL: + cache.invalidateAll(); + break; + default: + throw new IllegalArgumentException(); + } + + invalidated.countDown(); // 2 + // Cache may persist value after loader returned, but before `cache.get(...)` returned. Ensure the latter completed. + Assert.assertTrue(getReturned.await(10, TimeUnit.SECONDS)); // 3 + + return cache.get(key, () -> "fresh value"); + }); + + Assert.assertEquals("stale value", threadA.get()); + Assert.assertEquals("fresh value", threadB.get()); + } finally { + executor.shutdownNow(); + Assert.assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS)); + } + } + } + + /** + * Covers https://github.com/google/guava/issues/1881 + */ + @Test + @Timeout(TEST_TIMEOUT_SECONDS) + public void testInvalidateAndLoadConcurrently() + throws Exception { + for (Invalidation invalidation : Invalidation.values()) { + int[] primes = {2, 3, 5, 7}; + AtomicLong remoteState = new AtomicLong(1); + + Cache cache = EvictableCacheBuilder.newBuilder() + .maximumSize(10_000) + .build(); + Integer key = 42; + int threads = 4; + + CyclicBarrier barrier = new CyclicBarrier(threads); + ExecutorService executor = Executors.newFixedThreadPool(threads); + try { + List> futures = IntStream.range(0, threads) + .mapToObj(threadNumber -> executor.submit(() -> { + // prime the cache + Assert.assertEquals(1L, (long) cache.get(key, remoteState::get)); + int prime = primes[threadNumber]; + + barrier.await(10, TimeUnit.SECONDS); + + // modify underlying state + remoteState.updateAndGet(current -> current * prime); + + // invalidate + switch (invalidation) { + case INVALIDATE_KEY: + cache.invalidate(key); + break; + case INVALIDATE_PREDEFINED_KEYS: + cache.invalidateAll(ImmutableList.of(key)); + break; + case INVALIDATE_SELECTED_KEYS: + Set keys = cache.asMap().keySet().stream() + .filter(foundKey -> (int) foundKey == key) + .collect(ImmutableSet.toImmutableSet()); + cache.invalidateAll(keys); + break; + case INVALIDATE_ALL: + cache.invalidateAll(); + break; + default: + throw new IllegalArgumentException(); + } + + // read through cache + long current = cache.get(key, remoteState::get); + if (current % prime != 0) { + throw new AssertionError(String.format("The value read through cache (%s) in thread (%s) is not divisible by (%s)", current, threadNumber, prime)); + } + + return (Void) null; + })) + .collect(ImmutableList.toImmutableList()); + + for (Future future : futures) { + try { + future.get(10, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new RuntimeException("Failed to get future value", e); + } + } + + Assert.assertEquals(2 * 3 * 5 * 7, remoteState.get()); + Assert.assertEquals(remoteState.get(), (long) cache.get(key, remoteState::get)); + } finally { + executor.shutdownNow(); + Assert.assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS)); + } + } + } + + @Test + public void testPutOnEmptyCacheImplementation() { + for (DisabledCacheImplementation disabledCacheImplementation : DisabledCacheImplementation.values()) { + Cache cache = EvictableCacheBuilder.newBuilder() + .maximumSize(0) + .disabledCacheImplementation(disabledCacheImplementation) + .build(); + Map cacheMap = cache.asMap(); + + int key = 0; + int value = 1; + Assert.assertNull(cacheMap.put(key, value)); + Assert.assertNull(cacheMap.put(key, value)); + Assert.assertNull(cacheMap.putIfAbsent(key, value)); + Assert.assertNull(cacheMap.putIfAbsent(key, value)); + } + } + + @Test + public void testPutOnNonEmptyCacheImplementation() { + Cache cache = EvictableCacheBuilder.newBuilder() + .maximumSize(10) + .build(); + Map cacheMap = cache.asMap(); + + int key = 0; + int value = 1; + + Exception putException = Assert.assertThrows("put operation should throw UnsupportedOperationException", + UnsupportedOperationException.class, + () -> cacheMap.put(key, value)); + Assert.assertEquals( + "The operation is not supported, as in inherently races with cache invalidation. Use get(key, callable) instead.", + putException.getMessage()); + + Exception putIfAbsentException = Assert.assertThrows("putIfAbsent operation should throw UnsupportedOperationException", + UnsupportedOperationException.class, + () -> cacheMap.putIfAbsent(key, value)); + Assert.assertEquals( + "The operation is not supported, as in inherently races with cache invalidation", + putIfAbsentException.getMessage()); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerTest.java b/fe/fe-core/src/test/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerTest.java new file mode 100644 index 000000000000000..d6c6ed4b93bebbf --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerTest.java @@ -0,0 +1,174 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestTransactionScopeCachingDirectoryLister.java +// and modified by Doris + +package org.apache.doris.fs; + +import org.apache.doris.catalog.TableIf; +import org.apache.doris.fs.remote.RemoteFile; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import mockit.Mocked; +import org.junit.Assert; +import org.junit.Test; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +// some tests may invalidate the whole cache affecting therefore other concurrent tests +@Execution(ExecutionMode.SAME_THREAD) +public class TransactionScopeCachingDirectoryListerTest { + @Test + public void testConcurrentDirectoryListing(@Mocked TableIf table) + throws FileSystemIOException { + RemoteFile firstFile = new RemoteFile("file:/x/x", true, 1, 1); + RemoteFile secondFile = new RemoteFile("file:/x/y", true, 1, 1); + RemoteFile thirdFile = new RemoteFile("file:/y/z", true, 1, 1); + + String path1 = "file:/x"; + String path2 = "file:/y"; + + CountingDirectoryLister countingLister = new CountingDirectoryLister( + ImmutableMap.of( + path1, ImmutableList.of(firstFile, secondFile), + path2, ImmutableList.of(thirdFile))); + + TransactionScopeCachingDirectoryLister cachingLister = (TransactionScopeCachingDirectoryLister) + new TransactionScopeCachingDirectoryListerFactory(2).get(countingLister); + + assertFiles(cachingLister.listFiles(null, true, table, path2), ImmutableList.of(thirdFile)); + + Assert.assertEquals(1, countingLister.getListCount()); + + // listing path2 again shouldn't increase listing count + Assert.assertTrue(cachingLister.isCached(path2)); + assertFiles(cachingLister.listFiles(null, true, table, path2), ImmutableList.of(thirdFile)); + Assert.assertEquals(1, countingLister.getListCount()); + + + // start listing path1 concurrently + RemoteIterator path1FilesA = cachingLister.listFiles(null, true, table, path1); + RemoteIterator path1FilesB = cachingLister.listFiles(null, true, table, path1); + Assert.assertEquals(2, countingLister.getListCount()); + + // list path1 files using both iterators concurrently + Assert.assertEquals(firstFile, path1FilesA.next()); + Assert.assertEquals(firstFile, path1FilesB.next()); + Assert.assertEquals(secondFile, path1FilesB.next()); + Assert.assertEquals(secondFile, path1FilesA.next()); + Assert.assertFalse(path1FilesA.hasNext()); + Assert.assertFalse(path1FilesB.hasNext()); + Assert.assertEquals(2, countingLister.getListCount()); + + Assert.assertFalse(cachingLister.isCached(path2)); + assertFiles(cachingLister.listFiles(null, true, table, path2), ImmutableList.of(thirdFile)); + Assert.assertEquals(3, countingLister.getListCount()); + } + + @Test + public void testConcurrentDirectoryListingException(@Mocked TableIf table) + throws FileSystemIOException { + RemoteFile file = new RemoteFile("file:/x/x", true, 1, 1); + + String path = "file:/x"; + + CountingDirectoryLister countingLister = new CountingDirectoryLister(ImmutableMap.of(path, ImmutableList.of(file))); + DirectoryLister cachingLister = new TransactionScopeCachingDirectoryListerFactory(1).get(countingLister); + + // start listing path concurrently + countingLister.setThrowException(true); + RemoteIterator filesA = cachingLister.listFiles(null, true, table, path); + RemoteIterator filesB = cachingLister.listFiles(null, true, table, path); + Assert.assertEquals(1, countingLister.getListCount()); + + // listing should throw an exception + Assert.assertThrows(FileSystemIOException.class, () -> filesA.hasNext()); + + + // listing again should succeed + countingLister.setThrowException(false); + assertFiles(cachingLister.listFiles(null, true, table, path), ImmutableList.of(file)); + Assert.assertEquals(2, countingLister.getListCount()); + + // listing using second concurrently initialized DirectoryLister should fail + Assert.assertThrows(FileSystemIOException.class, () -> filesB.hasNext()); + + } + + private void assertFiles(RemoteIterator iterator, List expectedFiles) + throws FileSystemIOException { + ImmutableList.Builder actualFiles = ImmutableList.builder(); + while (iterator.hasNext()) { + actualFiles.add(iterator.next()); + } + Assert.assertEquals(expectedFiles, actualFiles.build()); + } + + private static class CountingDirectoryLister + implements DirectoryLister { + private final Map> fileStatuses; + private int listCount; + private boolean throwException; + + public CountingDirectoryLister(Map> fileStatuses) { + this.fileStatuses = Objects.requireNonNull(fileStatuses, "fileStatuses is null"); + } + + @Override + public RemoteIterator listFiles(FileSystem fs, boolean recursive, TableIf table, String location) + throws FileSystemIOException { + // No specific recursive files-only listing implementation + listCount++; + return throwingRemoteIterator(Objects.requireNonNull(fileStatuses.get(location)), throwException); + } + + public void setThrowException(boolean throwException) { + this.throwException = throwException; + } + + public int getListCount() { + return listCount; + } + } + + static RemoteIterator throwingRemoteIterator(List files, boolean throwException) { + return new RemoteIterator() { + private final Iterator iterator = ImmutableList.copyOf(files).iterator(); + + @Override + public boolean hasNext() + throws FileSystemIOException { + if (throwException) { + throw new FileSystemIOException("File system io exception."); + } + return iterator.hasNext(); + } + + @Override + public RemoteFile next() { + return iterator.next(); + } + }; + } +}