From d7cb6093820ff2b4ea5b8373f2e6a61f1a6571d9 Mon Sep 17 00:00:00 2001 From: HouliangQi Date: Wed, 16 Oct 2024 18:04:58 +0800 Subject: [PATCH] support async lookup --- .../java/org/apache/paimon/CoreOptions.java | 7 + .../serializer/RowCompactedSerializer.java | 9 +- .../apache/paimon/io/cache/CacheManager.java | 9 +- .../paimon/utils/ListDelimitedSerializer.java | 7 +- .../paimon/lookup/RocksDBListState.java | 4 +- .../apache/paimon/lookup/RocksDBSetState.java | 5 +- .../apache/paimon/lookup/RocksDBState.java | 2 +- .../paimon/lookup/RocksDBValueState.java | 4 +- .../apache/paimon/mergetree/LookupLevels.java | 79 ++++-- .../paimon/table/query/LocalTableQuery.java | 17 +- .../lookup/AsyncLookupFunctionWrapper.java | 4 +- .../flink/lookup/FileStoreLookupFunction.java | 35 ++- .../flink/lookup/PrimaryKeyLookupTable.java | 8 +- .../lookup/PrimaryKeyPartialLookupTable.java | 22 +- .../paimon/flink/query/RemoteTableQuery.java | 13 +- .../flink/source/BaseDataTableSource.java | 3 +- .../apache/paimon/flink/LookupJoinITCase.java | 233 ++++++++++++++++++ 17 files changed, 389 insertions(+), 72 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 1256c7ba0d87..5c0830a790b2 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -1450,6 +1450,13 @@ public class CoreOptions implements Serializable { .noDefaultValue() .withDescription("The serialized refresh handler of materialized table."); + @ExcludeFromDocumentation("Internal use only") + public static final ConfigOption LOOKUP_HASH_ASYNC_THREAD_NUMBER = + key("lookup.hash.async-thread-number") + .intType() + .defaultValue(16) + .withDescription("The thread number for lookup async for hash store."); + private final Options options; public CoreOptions(Map options) { diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/RowCompactedSerializer.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/RowCompactedSerializer.java index 3529349588ee..924d39d074b8 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/serializer/RowCompactedSerializer.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/RowCompactedSerializer.java @@ -101,14 +101,15 @@ public InternalRow copy(InternalRow from) { } @Override - public void serialize(InternalRow record, DataOutputView target) throws IOException { + public synchronized void serialize(InternalRow record, DataOutputView target) + throws IOException { byte[] bytes = serializeToBytes(record); VarLengthIntUtils.encodeInt(target, bytes.length); target.write(bytes); } @Override - public InternalRow deserialize(DataInputView source) throws IOException { + public synchronized InternalRow deserialize(DataInputView source) throws IOException { int len = VarLengthIntUtils.decodeInt(source); byte[] bytes = new byte[len]; source.readFully(bytes); @@ -132,7 +133,7 @@ public int hashCode() { return Objects.hash(rowType); } - public byte[] serializeToBytes(InternalRow record) { + public synchronized byte[] serializeToBytes(InternalRow record) { if (rowWriter == null) { rowWriter = new RowWriter(calculateBitSetInBytes(getters.length)); } @@ -149,7 +150,7 @@ public byte[] serializeToBytes(InternalRow record) { return rowWriter.copyBuffer(); } - public InternalRow deserialize(byte[] bytes) { + public synchronized InternalRow deserialize(byte[] bytes) { if (rowReader == null) { rowReader = new RowReader(calculateBitSetInBytes(getters.length)); } diff --git a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java index b8f00205ed91..08e3260347fa 100644 --- a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java +++ b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java @@ -27,6 +27,7 @@ import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalCause; import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; /** Cache manager to cache bytes to paged {@link MemorySegment}s. */ public class CacheManager { @@ -39,7 +40,7 @@ public class CacheManager { private final Cache cache; - private int fileReadCount; + private final AtomicInteger fileReadCount; public CacheManager(MemorySize maxMemorySize) { this.cache = @@ -49,7 +50,7 @@ public CacheManager(MemorySize maxMemorySize) { .removalListener(this::onRemoval) .executor(Runnable::run) .build(); - this.fileReadCount = 0; + this.fileReadCount = new AtomicInteger(0); } @VisibleForTesting @@ -61,7 +62,7 @@ public MemorySegment getPage(CacheKey key, CacheReader reader, CacheCallback cal CacheValue value = cache.getIfPresent(key); while (value == null || value.isClosed) { try { - this.fileReadCount++; + this.fileReadCount.incrementAndGet(); value = new CacheValue(MemorySegment.wrap(reader.read(key)), callback); } catch (IOException e) { throw new RuntimeException(e); @@ -87,7 +88,7 @@ private void onRemoval(CacheKey key, CacheValue value, RemovalCause cause) { } public int fileReadCount() { - return fileReadCount; + return fileReadCount.get(); } private static class CacheValue { diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/ListDelimitedSerializer.java b/paimon-common/src/main/java/org/apache/paimon/utils/ListDelimitedSerializer.java index 12b3fa54c8d2..d780f9ca7772 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/ListDelimitedSerializer.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/ListDelimitedSerializer.java @@ -39,7 +39,8 @@ public final class ListDelimitedSerializer { private final DataInputDeserializer dataInputView = new DataInputDeserializer(); private final DataOutputSerializer dataOutputView = new DataOutputSerializer(128); - public List deserializeList(byte[] valueBytes, Serializer elementSerializer) { + public synchronized List deserializeList( + byte[] valueBytes, Serializer elementSerializer) { if (valueBytes == null) { return null; } @@ -54,7 +55,7 @@ public List deserializeList(byte[] valueBytes, Serializer elementSeria return result; } - public byte[] serializeList(List valueList, Serializer elementSerializer) + public synchronized byte[] serializeList(List valueList, Serializer elementSerializer) throws IOException { dataOutputView.clear(); @@ -74,7 +75,7 @@ public byte[] serializeList(List valueList, Serializer elementSerializ return dataOutputView.getCopyOfBuffer(); } - public byte[] serializeList(List valueList) throws IOException { + public synchronized byte[] serializeList(List valueList) throws IOException { dataOutputView.clear(); boolean first = true; diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBListState.java b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBListState.java index 7fe29d209a70..52ee76958757 100644 --- a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBListState.java +++ b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBListState.java @@ -72,13 +72,13 @@ public List get(K key) throws IOException { }); } - public byte[] serializeValue(V value) throws IOException { + public synchronized byte[] serializeValue(V value) throws IOException { valueOutputView.clear(); valueSerializer.serialize(value, valueOutputView); return valueOutputView.getCopyOfBuffer(); } - public byte[] serializeList(List valueList) throws IOException { + public synchronized byte[] serializeList(List valueList) throws IOException { return listSerializer.serializeList(valueList); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBSetState.java b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBSetState.java index 5c06cbd51a42..df080c29ab95 100644 --- a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBSetState.java +++ b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBSetState.java @@ -64,7 +64,10 @@ public List get(K key) throws IOException { } cache.put(keyBytes, valueBytes); } + return serializeValues(valueBytes); + } + private synchronized List serializeValues(List valueBytes) throws IOException { List values = new ArrayList<>(valueBytes.size()); for (byte[] value : valueBytes) { valueInputView.setBuffer(value); @@ -93,7 +96,7 @@ public void add(K key, V value) throws IOException { } } - private byte[] invalidKeyAndGetKVBytes(K key, V value) throws IOException { + private synchronized byte[] invalidKeyAndGetKVBytes(K key, V value) throws IOException { checkArgument(value != null); keyOutView.clear(); diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBState.java b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBState.java index 25e58984edbe..13b4b0aee6ec 100644 --- a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBState.java +++ b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBState.java @@ -85,7 +85,7 @@ public RocksDBState( .build(); } - public byte[] serializeKey(K key) throws IOException { + public synchronized byte[] serializeKey(K key) throws IOException { keyOutView.clear(); keySerializer.serialize(key, keyOutView); return keyOutView.getCopyOfBuffer(); diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBValueState.java b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBValueState.java index 444ed5ad061a..05483390be22 100644 --- a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBValueState.java +++ b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBValueState.java @@ -86,12 +86,12 @@ public void delete(K key) throws IOException { } } - public V deserializeValue(byte[] valueBytes) throws IOException { + public synchronized V deserializeValue(byte[] valueBytes) throws IOException { valueInputView.setBuffer(valueBytes); return valueSerializer.deserialize(valueInputView); } - public byte[] serializeValue(V value) throws IOException { + public synchronized byte[] serializeValue(V value) throws IOException { valueOutputView.clear(); valueSerializer.serialize(value, valueOutputView); return valueOutputView.getCopyOfBuffer(); diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java index f7c783875fb0..b66b2dca6286 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java @@ -46,6 +46,7 @@ import java.util.HashSet; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; import static org.apache.paimon.utils.VarLengthIntUtils.MAX_VAR_LONG_SIZE; @@ -67,6 +68,9 @@ public class LookupLevels implements Levels.DropFileCallback, Closeable { private final Cache lookupFileCache; private final Set ownCachedFiles; + private final ReentrantLock[] locks; + private final int lookupAsyncThreadNumber; + public LookupLevels( Levels levels, Comparator keyComparator, @@ -76,7 +80,8 @@ public LookupLevels( Function localFileFactory, LookupStoreFactory lookupStoreFactory, Function bfGenerator, - Cache lookupFileCache) { + Cache lookupFileCache, + int lookupAsyncThreadNumber) { this.levels = levels; this.keyComparator = keyComparator; this.keySerializer = new RowCompactedSerializer(keyType); @@ -87,9 +92,37 @@ public LookupLevels( this.bfGenerator = bfGenerator; this.lookupFileCache = lookupFileCache; this.ownCachedFiles = new HashSet<>(); + this.lookupAsyncThreadNumber = lookupAsyncThreadNumber; + this.locks = new ReentrantLock[lookupAsyncThreadNumber]; + for (int i = 0; i < lookupAsyncThreadNumber; i++) { + locks[i] = new ReentrantLock(); + } levels.addDropFileCallback(this); } + public LookupLevels( + Levels levels, + Comparator keyComparator, + RowType keyType, + ValueProcessor valueProcessor, + IOFunction> fileReaderFactory, + Function localFileFactory, + LookupStoreFactory lookupStoreFactory, + Function bfGenerator, + Cache lookupFileCache) { + this( + levels, + keyComparator, + keyType, + valueProcessor, + fileReaderFactory, + localFileFactory, + lookupStoreFactory, + bfGenerator, + lookupFileCache, + 1); + } + public Levels getLevels() { return levels; } @@ -126,29 +159,37 @@ private T lookup(InternalRow key, SortedRun level) throws IOException { @Nullable private T lookup(InternalRow key, DataFileMeta file) throws IOException { - LookupFile lookupFile = lookupFileCache.getIfPresent(file.fileName()); + ReentrantLock lock = locks[Math.abs(file.hashCode()) % lookupAsyncThreadNumber]; + try { + lock.lock(); + LookupFile lookupFile = lookupFileCache.getIfPresent(file.fileName()); - boolean newCreatedLookupFile = false; - if (lookupFile == null) { - lookupFile = createLookupFile(file); - newCreatedLookupFile = true; - } + boolean newCreatedLookupFile = false; + if (lookupFile == null) { + lookupFile = createLookupFile(file); + newCreatedLookupFile = true; + } - byte[] valueBytes; - try { - byte[] keyBytes = keySerializer.serializeToBytes(key); - valueBytes = lookupFile.get(keyBytes); + byte[] valueBytes; + try { + byte[] keyBytes = keySerializer.serializeToBytes(key); + valueBytes = lookupFile.get(keyBytes); + } finally { + if (newCreatedLookupFile) { + lookupFileCache.put(file.fileName(), lookupFile); + } + } + if (valueBytes == null) { + return null; + } + + return valueProcessor.readFromDisk( + key, lookupFile.remoteFile().level(), valueBytes, file.fileName()); } finally { - if (newCreatedLookupFile) { - lookupFileCache.put(file.fileName(), lookupFile); + if (lock != null) { + lock.unlock(); } } - if (valueBytes == null) { - return null; - } - - return valueProcessor.readFromDisk( - key, lookupFile.remoteFile().level(), valueBytes, file.fileName()); } private LookupFile createLookupFile(DataFileMeta file) throws IOException { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java index d5b392d9e099..e567c7a84693 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java @@ -50,9 +50,9 @@ import java.io.IOException; import java.util.Comparator; -import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Supplier; import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE; @@ -85,7 +85,7 @@ public class LocalTableQuery implements TableQuery { public LocalTableQuery(FileStoreTable table) { this.options = table.coreOptions(); - this.tableView = new HashMap<>(); + this.tableView = new ConcurrentHashMap<>(); FileStore tableStore = table.store(); if (!(tableStore instanceof KeyValueFileStore)) { throw new UnsupportedOperationException( @@ -128,7 +128,7 @@ public void refreshFiles( List beforeFiles, List dataFiles) { LookupLevels lookupLevels = - tableView.computeIfAbsent(partition, k -> new HashMap<>()).get(bucket); + tableView.computeIfAbsent(partition, k -> new ConcurrentHashMap<>()).get(bucket); if (lookupLevels == null) { Preconditions.checkArgument( beforeFiles.isEmpty(), @@ -180,16 +180,17 @@ private void newLookupLevels(BinaryRow partition, int bucket, List .getPathFile(), lookupStoreFactory, bfGenerator(options), - lookupFileCache); + lookupFileCache, + options.get(CoreOptions.LOOKUP_HASH_ASYNC_THREAD_NUMBER)); - tableView.computeIfAbsent(partition, k -> new HashMap<>()).put(bucket, lookupLevels); + tableView + .computeIfAbsent(partition, k -> new ConcurrentHashMap<>()) + .put(bucket, lookupLevels); } - /** TODO remove synchronized and supports multiple thread to lookup. */ @Nullable @Override - public synchronized InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) - throws IOException { + public InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) throws IOException { Map> buckets = tableView.get(partition); if (buckets == null || buckets.isEmpty()) { return null; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/AsyncLookupFunctionWrapper.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/AsyncLookupFunctionWrapper.java index 99f3d4643cbb..c78169a9f430 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/AsyncLookupFunctionWrapper.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/AsyncLookupFunctionWrapper.java @@ -54,9 +54,7 @@ private Collection lookup(RowData keyRow) { Thread.currentThread() .setContextClassLoader(AsyncLookupFunctionWrapper.class.getClassLoader()); try { - synchronized (function) { - return function.lookup(keyRow); - } + return function.lookup(keyRow); } catch (IOException e) { throw new UncheckedIOException(e); } finally { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java index 347370c6840e..2a26a86ba0e1 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java @@ -68,6 +68,7 @@ import java.util.stream.IntStream; import static org.apache.paimon.CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL; +import static org.apache.paimon.CoreOptions.LOOKUP_HASH_ASYNC_THREAD_NUMBER; import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_CACHE_MODE; import static org.apache.paimon.flink.query.RemoteTableQuery.isRemoteServiceAvailable; import static org.apache.paimon.lookup.RocksDBOptions.LOOKUP_CACHE_ROWS; @@ -98,9 +99,14 @@ public class FileStoreLookupFunction implements Serializable, Closeable { protected FunctionContext functionContext; @Nullable private Filter cacheRowFilter; + private final int lookupAsyncThreadNumber; public FileStoreLookupFunction( - Table table, int[] projection, int[] joinKeyIndex, @Nullable Predicate predicate) { + Table table, + int[] projection, + int[] joinKeyIndex, + @Nullable Predicate predicate, + int lookupAsyncThreadNumber) { if (!TableScanUtils.supportCompactDiffStreamingReading(table)) { TableScanUtils.streamingReadingValidate(table); } @@ -131,9 +137,15 @@ public FileStoreLookupFunction( } this.predicate = predicate; + this.lookupAsyncThreadNumber = lookupAsyncThreadNumber; } - public void open(FunctionContext context) throws Exception { + public FileStoreLookupFunction( + Table table, int[] projection, int[] joinKeyIndex, @Nullable Predicate predicate) { + this(table, projection, joinKeyIndex, predicate, 1); + } + + public synchronized void open(FunctionContext context) throws Exception { this.functionContext = context; String tmpDirectory = getTmpDirectory(context); open(tmpDirectory); @@ -162,7 +174,8 @@ private void open() throws Exception { List fieldNames = table.rowType().getFieldNames(); int[] projection = projectFields.stream().mapToInt(fieldNames::indexOf).toArray(); - FileStoreTable storeTable = (FileStoreTable) table; + FileStoreTable storeTable = + setDynamicOptions((FileStoreTable) table, lookupAsyncThreadNumber); if (options.get(LOOKUP_CACHE_MODE) == LookupCacheMode.AUTO && new HashSet<>(table.primaryKeys()).equals(new HashSet<>(joinKeys))) { @@ -204,6 +217,14 @@ private void open() throws Exception { lookupTable.open(); } + private FileStoreTable setDynamicOptions( + FileStoreTable storeTable, int lookupAsyncThreadNumber) { + Map dynamicOptions = new HashMap<>(); + dynamicOptions.put( + LOOKUP_HASH_ASYNC_THREAD_NUMBER.key(), String.valueOf(lookupAsyncThreadNumber)); + return storeTable.copy(dynamicOptions); + } + @Nullable private Predicate createProjectedPredicate(int[] projection) { Predicate adjustedPredicate = null; @@ -248,7 +269,7 @@ public Collection lookup(RowData keyRow) { } @Nullable - private BinaryRow refreshDynamicPartition(boolean reopen) throws Exception { + private synchronized BinaryRow refreshDynamicPartition(boolean reopen) throws Exception { if (partitionLoader == null) { return null; } @@ -284,7 +305,7 @@ private Predicate createSpecificPartFilter(BinaryRow partition) { return createPartitionPredicate(rowType, partitionMap); } - private void reopen() { + private synchronized void reopen() { try { close(); open(); @@ -293,7 +314,7 @@ private void reopen() { } } - private void checkRefresh() throws Exception { + private synchronized void checkRefresh() throws Exception { if (nextLoadTime > System.currentTimeMillis()) { return; } @@ -319,7 +340,7 @@ private void refresh() throws Exception { } @Override - public void close() throws IOException { + public synchronized void close() throws IOException { if (lookupTable != null) { lookupTable.close(); lookupTable = null; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java index c06120d61d81..2feb5f5e6a4f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java @@ -37,7 +37,7 @@ /** A {@link LookupTable} for primary key table. */ public class PrimaryKeyLookupTable extends FullCacheLookupTable { - + private final Object lock = new Object(); protected final long lruCacheSize; protected final KeyProjectedRow primaryKeyRow; @@ -86,8 +86,10 @@ protected void createTableState() throws IOException { @Override public List innerGet(InternalRow key) throws IOException { - if (keyRearrange != null) { - key = keyRearrange.replaceRow(key); + synchronized (lock) { + if (keyRearrange != null) { + key = keyRearrange.replaceRow(key); + } } InternalRow value = tableState.get(key); return value == null ? Collections.emptyList() : Collections.singletonList(value); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java index ef5543ac9b7c..be383fb56274 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java @@ -33,6 +33,7 @@ import org.apache.paimon.table.source.StreamTableScan; import org.apache.paimon.utils.Filter; import org.apache.paimon.utils.ProjectedRow; +import org.apache.paimon.utils.Triple; import javax.annotation.Nullable; @@ -108,6 +109,19 @@ public void open() throws Exception { @Override public List get(InternalRow key) throws IOException { + Triple partitionAndBucket = extractPartitionAndBucket(key); + InternalRow kv = + queryExecutor.lookup( + partitionAndBucket.f0, partitionAndBucket.f1, partitionAndBucket.f2); + if (kv == null) { + return Collections.emptyList(); + } else { + return Collections.singletonList(kv); + } + } + + private synchronized Triple extractPartitionAndBucket( + InternalRow key) { InternalRow adjustedKey = key; if (keyRearrange != null) { adjustedKey = keyRearrange.replaceRow(adjustedKey); @@ -120,13 +134,7 @@ public List get(InternalRow key) throws IOException { if (trimmedKeyRearrange != null) { trimmedKey = trimmedKeyRearrange.replaceRow(trimmedKey); } - - InternalRow kv = queryExecutor.lookup(partition, bucket, trimmedKey); - if (kv == null) { - return Collections.emptyList(); - } else { - return Collections.singletonList(kv); - } + return Triple.of(partition, bucket, trimmedKey); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/query/RemoteTableQuery.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/query/RemoteTableQuery.java index 34d993eb3db2..ccee94c8cd80 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/query/RemoteTableQuery.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/query/RemoteTableQuery.java @@ -42,7 +42,7 @@ /** Implementation for {@link TableQuery} to lookup data from remote service. */ public class RemoteTableQuery implements TableQuery { - + private final Object lock = new Object(); private final FileStoreTable table; private final KvQueryClient client; private final InternalRowSerializer keySerializer; @@ -65,13 +65,12 @@ public static boolean isRemoteServiceAvailable(FileStoreTable table) { @Override public InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) throws IOException { BinaryRow row; + BinaryRow binaryRowKey; try { - row = - client.getValues( - partition, - bucket, - new BinaryRow[] {keySerializer.toBinaryRow(key)}) - .get()[0]; + synchronized (lock) { + binaryRowKey = keySerializer.toBinaryRow(key); + } + row = client.getValues(partition, bucket, new BinaryRow[] {binaryRowKey}).get()[0]; } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IOException(e); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java index 9458f7817415..d9fbda3fd7d2 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java @@ -244,7 +244,8 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) { boolean enableAsync = options.get(LOOKUP_ASYNC); int asyncThreadNumber = options.get(LOOKUP_ASYNC_THREAD_NUMBER); return LookupRuntimeProviderFactory.create( - new FileStoreLookupFunction(table, projection, joinKey, predicate), + new FileStoreLookupFunction( + table, projection, joinKey, predicate, asyncThreadNumber), enableAsync, asyncThreadNumber); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java index 46399f85632a..8de4e1b9bb0f 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java @@ -26,6 +26,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.ThreadLocalRandom; @@ -55,6 +56,10 @@ private void initTable(LookupCacheMode cacheMode) { "CREATE TABLE PARTITIONED_DIM (i INT, j INT, k1 INT, k2 INT, PRIMARY KEY (i, j) NOT ENFORCED)" + "PARTITIONED BY (`i`) WITH ('continuous.discovery-interval'='1 ms' %s)"; + String bucketDim = + "CREATE TABLE DIM_BUCKET (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 INT, k2 INT) " + + " WITH ('continuous.discovery-interval'='1 ms', 'bucket' = '16' %s)"; + String fullOption = ", 'lookup.cache' = 'full'"; String lruOption = ", 'changelog-producer'='lookup'"; @@ -63,10 +68,12 @@ private void initTable(LookupCacheMode cacheMode) { case FULL: tEnv.executeSql(String.format(dim, fullOption)); tEnv.executeSql(String.format(partitioned, fullOption)); + tEnv.executeSql(String.format(bucketDim, fullOption)); break; case AUTO: tEnv.executeSql(String.format(dim, lruOption)); tEnv.executeSql(String.format(partitioned, lruOption)); + tEnv.executeSql(String.format(bucketDim, lruOption)); break; default: throw new UnsupportedOperationException(); @@ -982,4 +989,230 @@ public void testPartialCacheBucketKeyOrder(LookupCacheMode mode) throws Exceptio iterator.close(); } + + @ParameterizedTest + @EnumSource(LookupCacheMode.class) + public void testAsyncLookupFullCacheSmallData(LookupCacheMode cacheMode) throws Exception { + initTable(cacheMode); + batchSql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)"); + + String query = + "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM/*+OPTIONS('lookup.async'='true')*/ for system_time " + + "as of " + + "T.proctime AS D" + + " ON T.i = D.i"; + BlockingIterator iterator = BlockingIterator.of(sEnv.executeSql(query).collect()); + + batchSql("INSERT INTO T VALUES (1), (2), (3)"); + List result = iterator.collect(3); + assertThat(result) + .containsExactlyInAnyOrder( + Row.of(1, 11, 111, 1111), + Row.of(2, 22, 222, 2222), + Row.of(3, null, null, null)); + + batchSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)"); + Thread.sleep(2000); // wait refresh + batchSql("INSERT INTO T VALUES (1), (2), (3), (4)"); + result = iterator.collect(4); + assertThat(result) + .containsExactlyInAnyOrder( + Row.of(1, 11, 111, 1111), + Row.of(2, 44, 444, 4444), + Row.of(3, 33, 333, 3333), + Row.of(4, null, null, null)); + + iterator.close(); + } + + @ParameterizedTest + @EnumSource(LookupCacheMode.class) + public void testAsyncLookupHashCacheSmallData(LookupCacheMode cacheMode) throws Exception { + initTable(cacheMode); + batchSql("INSERT INTO BUCKET_DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)"); + + String query = + "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN BUCKET_DIM/*+OPTIONS('lookup.async'='true')*/ for system_time " + + "as of " + + "T.proctime AS D" + + " ON T.i = D.i"; + BlockingIterator iterator = BlockingIterator.of(sEnv.executeSql(query).collect()); + + batchSql("INSERT INTO T VALUES (1), (2), (3)"); + List result = iterator.collect(3); + assertThat(result) + .containsExactlyInAnyOrder( + Row.of(1, 11, 111, 1111), + Row.of(2, 22, 222, 2222), + Row.of(3, null, null, null)); + + batchSql("INSERT INTO BUCKET_DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)"); + Thread.sleep(2000); // wait refresh + batchSql("INSERT INTO T VALUES (1), (2), (3), (4)"); + result = iterator.collect(4); + assertThat(result) + .containsExactlyInAnyOrder( + Row.of(1, 11, 111, 1111), + Row.of(2, 44, 444, 4444), + Row.of(3, 33, 333, 3333), + Row.of(4, null, null, null)); + + iterator.close(); + } + + @ParameterizedTest + @EnumSource(LookupCacheMode.class) + public void testAsyncLookupHashCacheBigData(LookupCacheMode cacheMode) throws Exception { + initTable(cacheMode); + StringBuilder sb = new StringBuilder(); + // insert the dim table + String sql = "INSERT INTO BUCKET_DIM VALUES "; + sb.append(sql); + int dimTableCount = 1000; + for (int i = 1; i <= dimTableCount; i++) { + sql = String.format("(%d, %d, %d, %d),", i, i * 10, i * 100, i * 1000); + sb.append(sql); + } + sb.deleteCharAt(sb.length() - 1); + batchSql(sb.toString()); + + String query = + "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN BUCKET_DIM/*+OPTIONS('lookup.async'='true')*/ for system_time " + + "as of " + + "T.proctime AS D" + + " ON T.i = D.i"; + BlockingIterator iterator = BlockingIterator.of(sEnv.executeSql(query).collect()); + + // insert the dynamic table + int dynamicTableCount = dimTableCount / 2; + sb.setLength(0); + sb.append("INSERT INTO T VALUES "); + for (int i = 1; i <= dynamicTableCount; i++) { + if (i % 2 == 0) { + sb.append(String.format("(%d),", i)); + } + } + sb.deleteCharAt(sb.length() - 1); + batchSql(sb.toString()); + + // construct the result + List result = iterator.collect(dynamicTableCount / 2); + List expectList = new ArrayList<>(); + for (int i = 1; i <= dynamicTableCount; i++) { + if (i % 2 == 0) { + expectList.add(Row.of(i, i * 10, i * 100, i * 1000)); + } + } + assertThat(result).containsExactlyInAnyOrderElementsOf(expectList); + + // insert the dynamic table + sb.setLength(0); + sb.append("INSERT INTO T VALUES "); + for (int i = 1; i <= dynamicTableCount; i++) { + if (i % 2 == 0) { + sb.append(String.format("(%d),", i)); + } + } + sb.deleteCharAt(sb.length() - 1); + batchSql(sb.toString()); + batchSql(String.format("INSERT INTO T VALUES (%d)", dimTableCount + 1)); + + // update the dim table + batchSql("INSERT INTO BUCKET_DIM VALUES (2, 22, 222, 2222), (4, 44, 444, 4444)"); + + // check result + Thread.sleep(2000); // wait refresh + result = iterator.collect(dynamicTableCount / 2 + 1); + expectList.clear(); + for (int i = 1; i <= dynamicTableCount; i++) { + if (i % 2 == 0 && i != 2 && i != 4) { + expectList.add(Row.of(i, i * 10, i * 100, i * 1000)); + } + } + expectList.add(Row.of(2, 22, 222, 2222)); + expectList.add(Row.of(4, 44, 444, 4444)); + expectList.add(Row.of(dimTableCount + 1, null, null, null)); + + assertThat(result).containsExactlyInAnyOrderElementsOf(expectList); + + iterator.close(); + } + + @ParameterizedTest + @EnumSource(LookupCacheMode.class) + public void testAsyncLookupFullCacheBigData(LookupCacheMode cacheMode) throws Exception { + initTable(cacheMode); + StringBuilder sb = new StringBuilder(); + // insert the dim table + String sql = "INSERT INTO DIM VALUES "; + sb.append(sql); + int dimTableCount = 1000; + for (int i = 1; i <= dimTableCount; i++) { + sql = String.format("(%d, %d, %d, %d),", i, i * 10, i * 100, i * 1000); + sb.append(sql); + } + sb.deleteCharAt(sb.length() - 1); + batchSql(sb.toString()); + + String query = + "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM/*+OPTIONS('lookup.async'='true')*/ for system_time " + + "as of " + + "T.proctime AS D" + + " ON T.i = D.i"; + BlockingIterator iterator = BlockingIterator.of(sEnv.executeSql(query).collect()); + + // insert the dynamic table + int dynamicTableCount = dimTableCount / 2; + sb.setLength(0); + sb.append("INSERT INTO T VALUES "); + for (int i = 1; i <= dynamicTableCount; i++) { + if (i % 2 == 0) { + sb.append(String.format("(%d),", i)); + } + } + sb.deleteCharAt(sb.length() - 1); + batchSql(sb.toString()); + + // construct the result + List result = iterator.collect(dynamicTableCount / 2); + List expectList = new ArrayList<>(); + for (int i = 1; i <= dynamicTableCount; i++) { + if (i % 2 == 0) { + expectList.add(Row.of(i, i * 10, i * 100, i * 1000)); + } + } + assertThat(result).containsExactlyInAnyOrderElementsOf(expectList); + + // insert the dynamic table + sb.setLength(0); + sb.append("INSERT INTO T VALUES "); + for (int i = 1; i <= dynamicTableCount; i++) { + if (i % 2 == 0) { + sb.append(String.format("(%d),", i)); + } + } + sb.deleteCharAt(sb.length() - 1); + batchSql(sb.toString()); + batchSql(String.format("INSERT INTO T VALUES (%d)", dimTableCount + 1)); + + // update the dim table + batchSql("INSERT INTO DIM VALUES (2, 22, 222, 2222), (4, 44, 444, 4444)"); + + // check result + Thread.sleep(2000); // wait refresh + result = iterator.collect(dynamicTableCount / 2 + 1); + expectList.clear(); + for (int i = 1; i <= dynamicTableCount; i++) { + if (i % 2 == 0 && i != 2 && i != 4) { + expectList.add(Row.of(i, i * 10, i * 100, i * 1000)); + } + } + expectList.add(Row.of(2, 22, 222, 2222)); + expectList.add(Row.of(4, 44, 444, 4444)); + expectList.add(Row.of(dimTableCount + 1, null, null, null)); + + assertThat(result).containsExactlyInAnyOrderElementsOf(expectList); + + iterator.close(); + } }