From 2c3fff49b385e7d1c7c862cb659df569bbfac4cd Mon Sep 17 00:00:00 2001 From: Jark Wu Date: Thu, 2 Jan 2025 18:14:46 +0800 Subject: [PATCH] [kv] Improve PrefixLookup implementation (#222) In the previous commit, the bucket key is only allowed to be a prefix of primary key, but we should allow it to be a subset of primary key. Besides, this commit fixes various bugs around PrefixLookup. --- .../fluss/client/lookup/AbstractLookup.java | 8 +- .../alibaba/fluss/client/lookup/Lookup.java | 9 +- .../fluss/client/lookup/LookupClient.java | 2 +- .../fluss/client/lookup/LookupQueue.java | 19 +- .../fluss/client/lookup/LookupResult.java | 19 +- .../fluss/client/lookup/LookupSender.java | 39 ++- .../fluss/client/lookup/PrefixLookup.java | 11 +- .../client/lookup/PrefixLookupBatch.java | 2 +- .../client/lookup/PrefixLookupResult.java | 2 +- .../fluss/client/table/FlussTable.java | 15 +- .../com/alibaba/fluss/client/table/Table.java | 7 +- .../client/table/writer/UpsertWriter.java | 23 +- .../fluss/client/lookup/LookupQueueTest.java | 65 +++++ .../fluss/metadata/TableDescriptor.java | 85 +++---- .../com/alibaba/fluss/utils/ArrayUtils.java | 89 +++++++ .../fluss/metadata/TableDescriptorTest.java | 68 ++++-- .../alibaba/fluss/utils/ArrayUtilsTest.java | 83 ++++++- .../flink/catalog/FlinkTableFactory.java | 31 ++- .../flink/source/FlinkTableSource.java | 29 ++- .../lookup/FlinkAsyncLookupFunction.java | 13 +- .../source/lookup/FlinkLookupFunction.java | 6 +- .../flink/source/lookup/LookupNormalizer.java | 222 ++++++++++++------ .../utils/FlinkConnectorOptionsUtils.java | 26 ++ .../connector/flink/utils/PushdownUtils.java | 6 +- .../flink/source/FlinkTableSourceITCase.java | 119 ++++++++-- .../lookup/FlinkLookupFunctionTest.java | 18 +- .../entity/PrefixLookupResultForBucket.java | 2 +- .../rpc/gateway/TabletServerGateway.java | 4 +- .../rpc/netty/server/RequestsMetrics.java | 2 +- .../fluss/server/kv/rocksdb/RocksDBKv.java | 27 +-- .../group/PhysicalTableMetricGroup.java | 2 +- .../alibaba/fluss/server/replica/Replica.java | 11 - .../fluss/server/replica/ReplicaManager.java | 19 +- .../server/replica/ReplicaManagerTest.java | 19 +- .../server/tablet/TabletServiceITCase.java | 17 +- website/docs/maintenance/monitor-metrics.md | 8 +- 36 files changed, 757 insertions(+), 370 deletions(-) create mode 100644 fluss-client/src/test/java/com/alibaba/fluss/client/lookup/LookupQueueTest.java diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/AbstractLookup.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/AbstractLookup.java index b6aaef98..9d1e9a2a 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/AbstractLookup.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/AbstractLookup.java @@ -25,9 +25,11 @@ @Internal public abstract class AbstractLookup { + private final TableBucket tableBucket; private final byte[] key; - public AbstractLookup(byte[] key) { + public AbstractLookup(TableBucket tableBucket, byte[] key) { + this.tableBucket = tableBucket; this.key = key; } @@ -35,7 +37,9 @@ public byte[] key() { return key; } - public abstract TableBucket tableBucket(); + public TableBucket tableBucket() { + return tableBucket; + } public abstract LookupType lookupType(); diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/Lookup.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/Lookup.java index f8b18c3f..8d340a9d 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/Lookup.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/Lookup.java @@ -28,20 +28,13 @@ @Internal public class Lookup extends AbstractLookup { - private final TableBucket tableBucket; private final CompletableFuture future; Lookup(TableBucket tableBucket, byte[] key) { - super(key); - this.tableBucket = tableBucket; + super(tableBucket, key); this.future = new CompletableFuture<>(); } - @Override - public TableBucket tableBucket() { - return tableBucket; - } - @Override public LookupType lookupType() { return LookupType.LOOKUP; diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupClient.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupClient.java index 7d53e4c7..72927a36 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupClient.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupClient.java @@ -84,7 +84,7 @@ public CompletableFuture lookup(TableBucket tableBucket, byte[] keyBytes public CompletableFuture> prefixLookup( long tableId, int bucketId, byte[] keyBytes) { - // TODO index lookup support partition table. + // TODO prefix lookup support partition table (#266) PrefixLookup prefixLookup = new PrefixLookup(new TableBucket(tableId, bucketId), keyBytes); lookupQueue.appendLookup(prefixLookup); return prefixLookup.future(); diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupQueue.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupQueue.java index c1f2a40f..7cc20e39 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupQueue.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupQueue.java @@ -29,23 +29,23 @@ /** * A queue that buffers the pending lookup operations and provides a list of {@link Lookup} when - * call method {@link #drain(LookupType)}. + * call method {@link #drain()}. */ @ThreadSafe @Internal class LookupQueue { private volatile boolean closed; - // Buffering both the Lookup and IndexLookup. + // buffering both the Lookup and PrefixLookup. private final ArrayBlockingQueue> lookupQueue; private final int maxBatchSize; - private final long batchTimeoutMs; + private final long batchTimeoutNanos; LookupQueue(Configuration conf) { this.lookupQueue = new ArrayBlockingQueue<>(conf.get(ConfigOptions.CLIENT_LOOKUP_QUEUE_SIZE)); this.maxBatchSize = conf.get(ConfigOptions.CLIENT_LOOKUP_MAX_BATCH_SIZE); - this.batchTimeoutMs = conf.get(ConfigOptions.CLIENT_LOOKUP_BATCH_TIMEOUT).toMillis(); + this.batchTimeoutNanos = conf.get(ConfigOptions.CLIENT_LOOKUP_BATCH_TIMEOUT).toNanos(); this.closed = false; } @@ -68,20 +68,23 @@ boolean hasUnDrained() { /** Drain a batch of {@link Lookup}s from the lookup queue. */ List> drain() throws Exception { - long start = System.currentTimeMillis(); + final long startNanos = System.nanoTime(); List> lookupOperations = new ArrayList<>(maxBatchSize); int count = 0; while (true) { - if (System.currentTimeMillis() - start > batchTimeoutMs) { + long waitNanos = batchTimeoutNanos - (System.nanoTime() - startNanos); + if (waitNanos <= 0) { break; } - AbstractLookup lookup = lookupQueue.poll(300, TimeUnit.MILLISECONDS); + AbstractLookup lookup = lookupQueue.poll(waitNanos, TimeUnit.NANOSECONDS); if (lookup == null) { break; } - count++; lookupOperations.add(lookup); + count++; + int transferred = lookupQueue.drainTo(lookupOperations, maxBatchSize - count); + count += transferred; if (count >= maxBatchSize) { break; } diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupResult.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupResult.java index d11c8d19..d31f7bbe 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupResult.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupResult.java @@ -20,6 +20,10 @@ import com.alibaba.fluss.client.table.Table; import com.alibaba.fluss.row.InternalRow; +import javax.annotation.Nullable; + +import java.util.Objects; + /** * The result of {@link Table#lookup(InternalRow)}. * @@ -27,13 +31,13 @@ */ @PublicEvolving public final class LookupResult { - private final InternalRow row; + private final @Nullable InternalRow row; - public LookupResult(InternalRow row) { + public LookupResult(@Nullable InternalRow row) { this.row = row; } - public InternalRow getRow() { + public @Nullable InternalRow getRow() { return row; } @@ -45,17 +49,18 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) { return false; } - LookupResult that = (LookupResult) o; - return row.equals(that.row); + + LookupResult lookupResult = (LookupResult) o; + return Objects.equals(row, lookupResult.row); } @Override public int hashCode() { - return row.hashCode(); + return Objects.hash(row); } @Override public String toString() { - return "LookupResult{" + "row=" + row + '}'; + return "LookupResult{row=" + row + '}'; } } diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupSender.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupSender.java index bd142f14..cd3be7be 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupSender.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupSender.java @@ -110,13 +110,12 @@ private void sendLookups(List> lookups) { if (lookups.isEmpty()) { return; } - // group by to lookup batches + // group by to lookup batches Map, List>> lookupBatches = groupByLeaderAndType(lookups); // now, send the batches lookupBatches.forEach( - (destinationAndType, batch) -> - sendLookups(destinationAndType.f0, destinationAndType.f1, batch)); + (destAndType, batch) -> sendLookups(destAndType.f0, destAndType.f1, batch)); } private Map, List>> groupByLeaderAndType( @@ -124,22 +123,21 @@ private Map, List>> groupByLeaderA // -> lookup batches Map, List>> lookupBatchesByLeader = new HashMap<>(); - for (AbstractLookup abstractLookup : lookups) { + for (AbstractLookup lookup : lookups) { int leader; // lookup the leader node - TableBucket tb = abstractLookup.tableBucket(); + TableBucket tb = lookup.tableBucket(); try { // TODO this can be a re-triable operation. We should retry here instead of // throwing exception. leader = metadataUpdater.leaderFor(tb); } catch (Exception e) { - abstractLookup.future().completeExceptionally(e); + lookup.future().completeExceptionally(e); continue; } lookupBatchesByLeader - .computeIfAbsent( - Tuple2.of(leader, abstractLookup.lookupType()), k -> new ArrayList<>()) - .add(abstractLookup); + .computeIfAbsent(Tuple2.of(leader, lookup.lookupType()), k -> new ArrayList<>()) + .add(lookup); } return lookupBatchesByLeader; } @@ -157,11 +155,10 @@ private void sendLookups( } } - private void sendLookupRequest( - TabletServerGateway gateway, List> lookupBatches) { + private void sendLookupRequest(TabletServerGateway gateway, List> lookups) { // table id -> (bucket -> lookups) Map> lookupByTableId = new HashMap<>(); - for (AbstractLookup abstractLookup : lookupBatches) { + for (AbstractLookup abstractLookup : lookups) { Lookup lookup = (Lookup) abstractLookup; TableBucket tb = lookup.tableBucket(); long tableId = tb.getTableId(); @@ -181,10 +178,10 @@ private void sendLookupRequest( } private void sendPrefixLookupRequest( - TabletServerGateway gateway, List> indexLookupBatches) { + TabletServerGateway gateway, List> prefixLookups) { // table id -> (bucket -> lookups) Map> lookupByTableId = new HashMap<>(); - for (AbstractLookup abstractLookup : indexLookupBatches) { + for (AbstractLookup abstractLookup : prefixLookups) { PrefixLookup prefixLookup = (PrefixLookup) abstractLookup; TableBucket tb = prefixLookup.tableBucket(); long tableId = tb.getTableId(); @@ -195,12 +192,12 @@ private void sendPrefixLookupRequest( } lookupByTableId.forEach( - (tableId, indexLookupBatch) -> + (tableId, prefixLookupBatch) -> sendPrefixLookupRequestAndHandleResponse( gateway, - makePrefixLookupRequest(tableId, indexLookupBatch.values()), + makePrefixLookupRequest(tableId, prefixLookupBatch.values()), tableId, - indexLookupBatch)); + prefixLookupBatch)); } private void sendLookupRequestAndHandleResponse( @@ -307,7 +304,7 @@ private void handleLookupResponse( private void handlePrefixLookupResponse( long tableId, PrefixLookupResponse prefixLookupResponse, - Map indexLookupsByBucket) { + Map prefixLookupsByBucket) { for (PbPrefixLookupRespForBucket pbRespForBucket : prefixLookupResponse.getBucketsRespsList()) { TableBucket tableBucket = @@ -318,7 +315,7 @@ private void handlePrefixLookupResponse( : null, pbRespForBucket.getBucketId()); - PrefixLookupBatch prefixLookupBatch = indexLookupsByBucket.get(tableBucket); + PrefixLookupBatch prefixLookupBatch = prefixLookupsByBucket.get(tableBucket); if (pbRespForBucket.hasErrorCode()) { // TODO for re-triable error, we should retry here instead of throwing exception. ApiError error = ApiError.fromErrorMessage(pbRespForBucket); @@ -328,10 +325,10 @@ private void handlePrefixLookupResponse( error.formatErrMsg()); prefixLookupBatch.completeExceptionally(error.exception()); } else { - List> result = new ArrayList<>(); + List> result = new ArrayList<>(pbRespForBucket.getValueListsCount()); for (int i = 0; i < pbRespForBucket.getValueListsCount(); i++) { PbValueList pbValueList = pbRespForBucket.getValueListAt(i); - List keyResult = new ArrayList<>(); + List keyResult = new ArrayList<>(pbValueList.getValuesCount()); for (int j = 0; j < pbValueList.getValuesCount(); j++) { keyResult.add(pbValueList.getValueAt(j)); } diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookup.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookup.java index 289623ee..a9f1050d 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookup.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookup.java @@ -28,20 +28,13 @@ */ @Internal public class PrefixLookup extends AbstractLookup> { - private final TableBucket tableBucket; private final CompletableFuture> future; - public PrefixLookup(TableBucket tableBucket, byte[] prefixKey) { - super(prefixKey); - this.tableBucket = tableBucket; + PrefixLookup(TableBucket tableBucket, byte[] prefixKey) { + super(tableBucket, prefixKey); this.future = new CompletableFuture<>(); } - @Override - public TableBucket tableBucket() { - return tableBucket; - } - @Override public CompletableFuture> future() { return future; diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookupBatch.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookupBatch.java index 20188ff5..6c973681 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookupBatch.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookupBatch.java @@ -58,7 +58,7 @@ public void complete(List> values) { new FlussRuntimeException( String.format( "The number of values return by prefix lookup request is not equal to the number of " - + "index lookups send. Got %d values, but expected %d.", + + "prefix lookups send. Got %d values, but expected %d.", values.size(), prefixLookups.size()))); } else { for (int i = 0; i < values.size(); i++) { diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookupResult.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookupResult.java index a9ff3044..3a6e3186 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookupResult.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/PrefixLookupResult.java @@ -24,7 +24,7 @@ /** * The result of {@link Table#prefixLookup(InternalRow)}}. * - * @since 0.1 + * @since 0.6 */ public class PrefixLookupResult { private final List rowList; diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java b/fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java index 385e20e1..779fb63e 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java @@ -187,14 +187,14 @@ public CompletableFuture lookup(InternalRow key) { } // encoding the key row using a compacted way consisted with how the key is encoded when put // a row - byte[] keyBytes = primaryKeyEncoder.encode(key); - byte[] lookupBucketKeyBytes = bucketKeyEncoder.encode(key); + byte[] pkBytes = primaryKeyEncoder.encode(key); + byte[] bkBytes = bucketKeyEncoder.encode(key); Long partitionId = keyRowPartitionGetter == null ? null : getPartitionId(key); - int bucketId = getBucketId(lookupBucketKeyBytes, key); + int bucketId = getBucketId(bkBytes, key); TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId); return lookupClientSupplier .get() - .lookup(tableBucket, keyBytes) + .lookup(tableBucket, pkBytes) .thenApply( valueBytes -> { InternalRow row = @@ -206,14 +206,15 @@ public CompletableFuture lookup(InternalRow key) { } @Override - public CompletableFuture prefixLookup(InternalRow prefixKey) { + public CompletableFuture prefixLookup(InternalRow bucketKey) { if (!hasPrimaryKey) { throw new FlussRuntimeException( String.format("None-pk table %s don't support prefix lookup", tablePath)); } + // TODO: add checks the bucket key is prefix of primary key - byte[] prefixKeyBytes = bucketKeyEncoder.encode(prefixKey); - int bucketId = getBucketId(prefixKeyBytes, prefixKey); + byte[] prefixKeyBytes = bucketKeyEncoder.encode(bucketKey); + int bucketId = getBucketId(prefixKeyBytes, bucketKey); return lookupClientSupplier .get() .prefixLookup(tableId, bucketId, prefixKeyBytes) diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/table/Table.java b/fluss-client/src/main/java/com/alibaba/fluss/client/table/Table.java index 0b4d6547..62fd8ef1 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/table/Table.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/table/Table.java @@ -78,10 +78,13 @@ public interface Table extends AutoCloseable { * the prefix schema would also be [a, b]. This pattern can use PrefixLookup to lookup by prefix * scan. * - * @param prefixKey the given prefix key to do prefix lookup. + *

TODO: currently, the interface only support bucket key as the prefix key to lookup. + * Generalize the prefix lookup to support any prefix key including bucket key. + * + * @param bucketKey the given bucket key to do prefix lookup. * @return the result of prefix lookup. */ - CompletableFuture prefixLookup(InternalRow prefixKey); + CompletableFuture prefixLookup(InternalRow bucketKey); /** * Extracts limit number of rows from the given table bucket. diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/table/writer/UpsertWriter.java b/fluss-client/src/main/java/com/alibaba/fluss/client/table/writer/UpsertWriter.java index 5f8c5aef..fe84f76e 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/table/writer/UpsertWriter.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/table/writer/UpsertWriter.java @@ -41,8 +41,9 @@ @PublicEvolving public class UpsertWriter extends TableWriter { - private final KeyEncoder keyEncoder; - private final KeyEncoder bucketKeyEncoder; + private final KeyEncoder primaryKeyEncoder; + // null if the bucket key is the same to the primary key + private final @Nullable KeyEncoder bucketKeyEncoder; private final @Nullable int[] targetColumns; public UpsertWriter( @@ -58,17 +59,17 @@ public UpsertWriter( RowType rowType = schema.toRowType(); this.targetColumns = upsertWrite.getPartialUpdateColumns(); - this.keyEncoder = + this.primaryKeyEncoder = KeyEncoder.createKeyEncoder( rowType, schema.getPrimaryKey().get().getColumnNames(), tableDescriptor.getPartitionKeys()); - int[] bucketKeyIndexes = tableDescriptor.getBucketKeyIndexes(); - if (bucketKeyIndexes.length != 0) { - this.bucketKeyEncoder = new KeyEncoder(rowType, bucketKeyIndexes); + if (tableDescriptor.isDefaultBucketKey()) { + this.bucketKeyEncoder = null; } else { - this.bucketKeyEncoder = keyEncoder; + int[] bucketKeyIndexes = tableDescriptor.getBucketKeyIndexes(); + this.bucketKeyEncoder = new KeyEncoder(rowType, bucketKeyIndexes); } } @@ -119,8 +120,8 @@ private static void sanityCheck(Schema schema, @Nullable int[] targetColumns) { * @return A {@link CompletableFuture} that always returns null when complete normally. */ public CompletableFuture upsert(InternalRow row) { - byte[] key = keyEncoder.encode(row); - byte[] bucketKey = bucketKeyEncoder.encode(row); + byte[] key = primaryKeyEncoder.encode(row); + byte[] bucketKey = bucketKeyEncoder != null ? bucketKeyEncoder.encode(row) : key; return send( new WriteRecord( getPhysicalPath(row), WriteKind.PUT, key, bucketKey, row, targetColumns)); @@ -134,8 +135,8 @@ public CompletableFuture upsert(InternalRow row) { * @return A {@link CompletableFuture} that always returns null when complete normally. */ public CompletableFuture delete(InternalRow row) { - byte[] key = keyEncoder.encode(row); - byte[] bucketKey = bucketKeyEncoder.encode(row); + byte[] key = primaryKeyEncoder.encode(row); + byte[] bucketKey = bucketKeyEncoder != null ? bucketKeyEncoder.encode(row) : key; return send( new WriteRecord( getPhysicalPath(row), diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/lookup/LookupQueueTest.java b/fluss-client/src/test/java/com/alibaba/fluss/client/lookup/LookupQueueTest.java new file mode 100644 index 00000000..93fc48ee --- /dev/null +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/lookup/LookupQueueTest.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.client.lookup; + +import com.alibaba.fluss.config.Configuration; +import com.alibaba.fluss.metadata.TableBucket; + +import org.junit.jupiter.api.Test; + +import static com.alibaba.fluss.config.ConfigOptions.CLIENT_LOOKUP_BATCH_TIMEOUT; +import static com.alibaba.fluss.config.ConfigOptions.CLIENT_LOOKUP_MAX_BATCH_SIZE; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link LookupQueue}. */ +class LookupQueueTest { + + @Test + void testDrainMaxBatchSize() throws Exception { + Configuration conf = new Configuration(); + conf.set(CLIENT_LOOKUP_MAX_BATCH_SIZE, 10); + conf.setString(CLIENT_LOOKUP_BATCH_TIMEOUT.key(), "1ms"); + LookupQueue queue = new LookupQueue(conf); + + // drain empty + assertThat(queue.drain()).hasSize(0); + + appendLookups(queue, 1); + assertThat(queue.drain()).hasSize(1); + assertThat(queue.hasUnDrained()).isFalse(); + + appendLookups(queue, 9); + assertThat(queue.drain()).hasSize(9); + assertThat(queue.hasUnDrained()).isFalse(); + + appendLookups(queue, 10); + assertThat(queue.drain()).hasSize(10); + assertThat(queue.hasUnDrained()).isFalse(); + + appendLookups(queue, 20); + assertThat(queue.drain()).hasSize(10); + assertThat(queue.hasUnDrained()).isTrue(); + assertThat(queue.drainAll()).hasSize(10); + assertThat(queue.hasUnDrained()).isFalse(); + } + + private static void appendLookups(LookupQueue queue, int count) { + for (int i = 0; i < count; i++) { + queue.appendLookup(new Lookup(new TableBucket(1, 1), new byte[] {0})); + } + } +} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java b/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java index ec6c7a44..7c1df3db 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java @@ -151,12 +151,16 @@ public Schema getSchema() { return schema; } + /** Returns the bucket key of the table, empty if no bucket key is set. */ public List getBucketKey() { return this.getTableDistribution() .map(TableDescriptor.TableDistribution::getBucketKeys) .orElse(Collections.emptyList()); } + /** + * Returns the indexes of the bucket key fields in the schema, empty if no bucket key is set. + */ public int[] getBucketKeyIndexes() { List bucketKey = getBucketKey(); RowType rowType = schema.toRowType(); @@ -167,6 +171,22 @@ public int[] getBucketKeyIndexes() { return bucketKeyIndex; } + /** + * Check if the table is using a default bucket key. A default bucket key is: + * + *

    + *
  • the same as the primary keys excluding the partition keys. + *
  • empty if the table is not a primary key table. + *
+ */ + public boolean isDefaultBucketKey() { + if (schema.getPrimaryKey().isPresent()) { + return getBucketKey().equals(defaultBucketKeyOfPrimaryKeyTable(schema, partitionKeys)); + } else { + return getBucketKey().isEmpty(); + } + } + /** * Check if the table is partitioned or not. * @@ -371,27 +391,16 @@ private static TableDistribution normalizeDistribution( originDistribution.getBucketCount().orElse(null), defaultBucketKeyOfPrimaryKeyTable(schema, partitionKeys)); } else { - // For the primary key table match prefix lookup pattern, we allow the bucket - // keys different from primary keys. - if (!bucketKeysMatchPrefixLookupPattern(schema, bucketKeys)) { - // check the provided bucket key and expected bucket key - List expectedBucketKeys = - defaultBucketKeyOfPrimaryKeyTable(schema, partitionKeys); - List pkColumns = schema.getPrimaryKey().get().getColumnNames(); - - if (expectedBucketKeys.size() != bucketKeys.size() - || !new HashSet<>(expectedBucketKeys).containsAll(bucketKeys)) { - throw new IllegalArgumentException( - String.format( - "Currently, bucket keys must be equal to primary keys excluding partition " - + "keys for primary-key tables. The primary keys are %s, the " - + "partition keys are %s, the expected bucket keys are %s, but " - + "the user-defined bucket keys are %s.", - pkColumns, - partitionKeys, - expectedBucketKeys, - bucketKeys)); - } + // check the provided bucket key + List pkColumns = schema.getPrimaryKey().get().getColumnNames(); + if (!new HashSet<>(pkColumns).containsAll(bucketKeys)) { + throw new IllegalArgumentException( + String.format( + "Bucket keys must be a subset of primary keys excluding partition " + + "keys for primary-key tables. The primary keys are %s, the " + + "partition keys are %s, but " + + "the user-defined bucket keys are %s.", + pkColumns, partitionKeys, bucketKeys)); } return new TableDistribution( originDistribution.getBucketCount().orElse(null), bucketKeys); @@ -411,40 +420,6 @@ private static TableDistribution normalizeDistribution( } } - /** - * Check if the table supports prefix lookup. Currently, only pk-table are supported, and the - * bucket key needs to be part of the primary key and must be a prefix of the primary key. For - * example, if a table has fields [a,b,c,d], and the primary key is set to [a, b, c], with the - * bucket key set to [a, b]. - * - * @return true if the table supports prefix lookup; otherwise, false - */ - public static boolean bucketKeysMatchPrefixLookupPattern( - Schema schema, List bucketKeys) { - if (!schema.getPrimaryKey().isPresent()) { - return false; - } - - RowType rowType = schema.toRowType(); - int[] pkIndexes = schema.getPrimaryKeyIndexes(); - int[] bucketKeyIndexes = new int[bucketKeys.size()]; - for (int i = 0; i < bucketKeys.size(); i++) { - bucketKeyIndexes[i] = rowType.getFieldIndex(bucketKeys.get(i)); - } - - if (bucketKeyIndexes.length >= pkIndexes.length) { - return false; - } else { - for (int i = 0; i < bucketKeyIndexes.length; i++) { - if (bucketKeyIndexes[i] != pkIndexes[i]) { - return false; - } - } - - return true; - } - } - /** The default bucket key of primary key table is the primary key excluding partition keys. */ private static List defaultBucketKeyOfPrimaryKeyTable( Schema schema, List partitionKeys) { diff --git a/fluss-common/src/main/java/com/alibaba/fluss/utils/ArrayUtils.java b/fluss-common/src/main/java/com/alibaba/fluss/utils/ArrayUtils.java index 9f4ab1be..3a65620c 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/utils/ArrayUtils.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/utils/ArrayUtils.java @@ -34,4 +34,93 @@ public static String[] concat(String[] array1, String[] array2) { System.arraycopy(array2, 0, resultArray, array1.length, array2.length); return resultArray; } + + public static int[] concat(int[] array1, int[] array2) { + if (array1.length == 0) { + return array2; + } + if (array2.length == 0) { + return array1; + } + int[] resultArray = new int[array1.length + array2.length]; + System.arraycopy(array1, 0, resultArray, 0, array1.length); + System.arraycopy(array2, 0, resultArray, array1.length, array2.length); + return resultArray; + } + + /** Check if the second array is a subset of the first array. */ + public static boolean isSubset(int[] a, int[] b) { + // Iterate over each element in the second array + for (int elementB : b) { + boolean found = false; + // Check if the element exists in the first array + for (int elementA : a) { + if (elementB == elementA) { + found = true; + break; + } + } + // If any element is not found, return false + if (!found) { + return false; + } + } + // If all elements are found, return true + return true; + } + + /** + * Remove the elements of the second array from the first array. + * + * @throws IllegalArgumentException if the element of the second array is not found in the first + * array. + */ + public static int[] removeSet(int[] a, int[] b) { + // Iterate over each element in the second array + // and check if the element exists in the first array + if (!isSubset(a, b)) { + throw new IllegalArgumentException("Element not found in the first array"); + } + // Remove the elements of the second array from the first array + int[] resultArray = new int[a.length - b.length]; + int index = 0; + for (int elementA : a) { + boolean found = false; + for (int elementB : b) { + if (elementA == elementB) { + found = true; + break; + } + } + if (!found) { + resultArray[index] = elementA; + index++; + } + } + return resultArray; + } + + /** + * Returns a new array that contains the intersection of the two arrays and in the order of the + * first array. + * + * @throws IllegalArgumentException if the element of the second array is not found in the first + * array. + */ + public static int[] intersection(int[] a, int[] b) { + // Remove the elements from the first array that not exist in the second array + return removeSet(a, removeSet(a, b)); + } + + /** Check if the second array is a prefix of the first array. */ + public static boolean isPrefix(int[] a, int[] b) { + // Iterate over each element in the second array + for (int i = 0; i < b.length; i++) { + // Check if the element exists in the first array + if (a[i] != b[i]) { + return false; + } + } + return true; + } } diff --git a/fluss-common/src/test/java/com/alibaba/fluss/metadata/TableDescriptorTest.java b/fluss-common/src/test/java/com/alibaba/fluss/metadata/TableDescriptorTest.java index fed0d8ce..478d9c51 100644 --- a/fluss-common/src/test/java/com/alibaba/fluss/metadata/TableDescriptorTest.java +++ b/fluss-common/src/test/java/com/alibaba/fluss/metadata/TableDescriptorTest.java @@ -91,15 +91,44 @@ void testProperties() { @Test void testDistribution() { - final TableDescriptor descriptor = + TableDescriptor descriptor = TableDescriptor.builder().schema(SCHEMA_1).distributedBy(10, "f0", "f3").build(); - Optional distribution = descriptor.getTableDistribution(); assertThat(distribution).isPresent(); assertThat(distribution.get().getBucketCount()).hasValue(10); assertThat(distribution.get().getBucketKeys()).hasSize(2); assertThat(distribution.get().getBucketKeys().get(0)).isEqualTo("f0", "f3"); + assertThat(descriptor.isDefaultBucketKey()).isTrue(); + + // a subset of primary key + descriptor = TableDescriptor.builder().schema(SCHEMA_1).distributedBy(10, "f3").build(); + distribution = descriptor.getTableDistribution(); + assertThat(distribution).isPresent(); + assertThat(distribution.get().getBucketCount()).hasValue(10); + assertThat(distribution.get().getBucketKeys()).isEqualTo(Collections.singletonList("f3")); + assertThat(descriptor.isDefaultBucketKey()).isFalse(); + + // default bucket key for partitioned table + descriptor = TableDescriptor.builder().schema(SCHEMA_1).partitionedBy("f0").build(); + distribution = descriptor.getTableDistribution(); + assertThat(distribution).isPresent(); + assertThat(distribution.get().getBucketCount()).isEmpty(); + assertThat(distribution.get().getBucketKeys()).isEqualTo(Collections.singletonList("f3")); + assertThat(descriptor.isDefaultBucketKey()).isTrue(); + + // test subset of primary key for partitioned table + descriptor = + TableDescriptor.builder() + .schema(SCHEMA_1) + .partitionedBy("f0") + .distributedBy(10, "f3") + .build(); + distribution = descriptor.getTableDistribution(); + assertThat(distribution).isPresent(); + assertThat(distribution.get().getBucketCount()).hasValue(10); + assertThat(distribution.get().getBucketKeys()).isEqualTo(Collections.singletonList("f3")); + assertThat(descriptor.isDefaultBucketKey()).isTrue(); } @Test @@ -116,6 +145,7 @@ void testSchemaWithoutPrimaryKeyAndDistributionWithEmptyBucketKeys() { assertThat(distribution).isPresent(); assertThat(distribution.get().getBucketCount()).hasValue(12); assertThat(distribution.get().getBucketKeys()).hasSize(0); + assertThat(descriptor.isDefaultBucketKey()).isTrue(); } @Test @@ -128,9 +158,9 @@ void testPrimaryKeyDifferentWithBucketKeys() { .build()) .isInstanceOf(IllegalArgumentException.class) .hasMessage( - "Currently, bucket keys must be equal to primary keys excluding partition keys for primary-key tables. " + "Bucket keys must be a subset of primary keys excluding partition keys for primary-key tables. " + "The primary keys are [f0, f3], the partition keys are [], " - + "the expected bucket keys are [f0, f3], but the user-defined bucket keys are [f1]."); + + "but the user-defined bucket keys are [f1]."); assertThatThrownBy( () -> @@ -140,25 +170,21 @@ void testPrimaryKeyDifferentWithBucketKeys() { .build()) .isInstanceOf(IllegalArgumentException.class) .hasMessage( - "Currently, bucket keys must be equal to primary keys excluding partition keys for primary-key tables. " + "Bucket keys must be a subset of primary keys excluding partition keys for primary-key tables. " + "The primary keys are [f0, f3], the partition keys are [], " - + "the expected bucket keys are [f0, f3], but the user-defined bucket keys are [f0, f1]."); + + "but the user-defined bucket keys are [f0, f1]."); - // bucket key is the subset of primary key. This pattern can be support for prefixLookup. - Schema schema0 = - Schema.newBuilder() - .column("f0", DataTypes.STRING()) - .column("f1", DataTypes.BIGINT()) - .primaryKey("f0", "f1") - .build(); - TableDescriptor tableDescriptor = - TableDescriptor.builder().schema(schema0).distributedBy(12, "f0").build(); - assertThat(tableDescriptor.getBucketKey()).containsExactlyInAnyOrder("f0"); - assertThat(tableDescriptor.getBucketKeyIndexes()).isEqualTo(new int[] {0}); - assertThat( - TableDescriptor.bucketKeysMatchPrefixLookupPattern( - schema0, Collections.singletonList("f0"))) - .isTrue(); + // bucket key shouldn't include partition key + assertThatThrownBy( + () -> + TableDescriptor.builder() + .schema(SCHEMA_1) + .partitionedBy("f0") + .distributedBy(3, "f0", "f3") + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Bucket key [f0, f3] shouldn't include any column in partition keys [f0]."); } @Test diff --git a/fluss-common/src/test/java/com/alibaba/fluss/utils/ArrayUtilsTest.java b/fluss-common/src/test/java/com/alibaba/fluss/utils/ArrayUtilsTest.java index c48066e6..956884f5 100644 --- a/fluss-common/src/test/java/com/alibaba/fluss/utils/ArrayUtilsTest.java +++ b/fluss-common/src/test/java/com/alibaba/fluss/utils/ArrayUtilsTest.java @@ -19,12 +19,13 @@ import org.junit.jupiter.api.Test; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for the {@link com.alibaba.fluss.utils.ArrayUtils}. */ public class ArrayUtilsTest { @Test - void concatWithEmptyArray() { + void testConcatWithEmptyArray() { String[] emptyArray = new String[] {}; String[] nonEmptyArray = new String[] {"some value"}; @@ -34,7 +35,7 @@ void concatWithEmptyArray() { } @Test - void concatArrays() { + void testConcatArrays() { String[] array1 = new String[] {"A", "B", "C", "D", "E", "F", "G"}; String[] array2 = new String[] {"1", "2", "3"}; @@ -44,4 +45,82 @@ void concatArrays() { assertThat(ArrayUtils.concat(array2, array1)) .isEqualTo(new String[] {"1", "2", "3", "A", "B", "C", "D", "E", "F", "G"}); } + + @Test + void testConcatIntWithEmptyArray() { + int[] emptyArray = new int[] {}; + int[] nonEmptyArray = new int[] {1}; + + assertThat(ArrayUtils.concat(emptyArray, nonEmptyArray)).isSameAs(nonEmptyArray); + + assertThat(ArrayUtils.concat(nonEmptyArray, emptyArray)).isSameAs(nonEmptyArray); + } + + @Test + void testConcatIntArrays() { + int[] array1 = new int[] {1, 2, 3, 4, 5, 6, 7}; + int[] array2 = new int[] {8, 9, 10}; + + assertThat(ArrayUtils.concat(array1, array2)) + .isEqualTo(new int[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}); + + assertThat(ArrayUtils.concat(array2, array1)) + .isEqualTo(new int[] {8, 9, 10, 1, 2, 3, 4, 5, 6, 7}); + } + + @Test + void testIsSubset() { + int[] a = new int[] {1, 2, 3, 4, 5}; + int[] b = new int[] {1, 3, 5}; + int[] c = new int[] {1, 3, 6}; + int[] d = new int[] {}; + + assertThat(ArrayUtils.isSubset(a, b)).isTrue(); + assertThat(ArrayUtils.isSubset(a, c)).isFalse(); + assertThat(ArrayUtils.isSubset(a, d)).isTrue(); + } + + @Test + void testRemoveSet() { + int[] a = new int[] {1, 2, 3, 4, 5}; + int[] b = new int[] {1, 3, 5}; + int[] c = new int[] {1, 3, 6}; + int[] d = new int[] {}; + + assertThat(ArrayUtils.removeSet(a, b)).isEqualTo(new int[] {2, 4}); + assertThat(ArrayUtils.removeSet(a, d)).isEqualTo(new int[] {1, 2, 3, 4, 5}); + assertThatThrownBy(() -> ArrayUtils.removeSet(a, c)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Element not found in the first array"); + } + + @Test + void testIntersection() { + int[] a = new int[] {1, 2, 3, 4, 5}; + int[] b = new int[] {1, 3, 5}; + int[] c = new int[] {5, 3, 1}; + int[] d = new int[] {}; + int[] e = new int[] {1, 3, 6}; + + assertThat(ArrayUtils.intersection(a, b)).isEqualTo(new int[] {1, 3, 5}); + assertThat(ArrayUtils.intersection(a, c)).isEqualTo(new int[] {1, 3, 5}); + assertThat(ArrayUtils.intersection(a, d)).isEqualTo(new int[] {}); + assertThatThrownBy(() -> ArrayUtils.intersection(a, e)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Element not found in the first array"); + } + + @Test + void testIsPrefix() { + int[] a = new int[] {1, 2, 3, 4, 5}; + int[] b = new int[] {1, 2, 3}; + int[] c = new int[] {1, 3, 5}; + int[] d = new int[] {2, 3}; + int[] e = new int[] {}; + + assertThat(ArrayUtils.isPrefix(a, b)).isTrue(); + assertThat(ArrayUtils.isPrefix(a, c)).isFalse(); + assertThat(ArrayUtils.isPrefix(a, d)).isFalse(); + assertThat(ArrayUtils.isPrefix(a, e)).isTrue(); + } } diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkTableFactory.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkTableFactory.java index 62baf248..b719918b 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkTableFactory.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkTableFactory.java @@ -90,12 +90,16 @@ public DynamicTableSource createDynamicTableSource(Context context) { ResolvedSchema resolvedSchema = context.getCatalogTable().getResolvedSchema(); ResolvedCatalogTable resolvedCatalogTable = context.getCatalogTable(); int[] primaryKeyIndexes = resolvedSchema.getPrimaryKeyIndexes(); + int[] partitionKeyIndexes = + resolvedCatalogTable.getPartitionKeys().stream() + .mapToInt(tableOutputType::getFieldIndex) + .toArray(); + int[] bucketKeyIndexes = + FlinkConnectorOptionsUtils.getBucketKeyIndexes(tableOptions, tableOutputType); // options for lookup LookupCache cache = null; - LookupOptions.LookupCacheType lookupCacheType = tableOptions.get(LookupOptions.CACHE_TYPE); - if (lookupCacheType.equals(LookupOptions.LookupCacheType.PARTIAL)) { cache = DefaultLookupCache.fromConfig(tableOptions); } else if (lookupCacheType.equals(LookupOptions.LookupCacheType.FULL)) { @@ -105,24 +109,31 @@ public DynamicTableSource createDynamicTableSource(Context context) { throw new UnsupportedOperationException("Full lookup caching is not supported yet."); } + // other option values + long partitionDiscoveryIntervalMs = + tableOptions + .get(FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL) + .toMillis(); + boolean isDatalakeEnabled = + tableOptions.get( + key(ConfigOptions.TABLE_DATALAKE_ENABLED.key()) + .booleanType() + .defaultValue(false)); + return new FlinkTableSource( toFlussTablePath(context.getObjectIdentifier()), toFlussClientConfig(helper.getOptions(), context.getConfiguration()), tableOutputType, primaryKeyIndexes, - resolvedCatalogTable.getPartitionKeys(), + bucketKeyIndexes, + partitionKeyIndexes, isStreamingMode, startupOptions, tableOptions.get(LookupOptions.MAX_RETRIES), tableOptions.get(FlinkConnectorOptions.LOOKUP_ASYNC), cache, - tableOptions - .get(FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL) - .toMillis(), - tableOptions.get( - key(ConfigOptions.TABLE_DATALAKE_ENABLED.key()) - .booleanType() - .defaultValue(false))); + partitionDiscoveryIntervalMs, + isDatalakeEnabled); } @Override diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/FlinkTableSource.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/FlinkTableSource.java index 53250681..939d872c 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/FlinkTableSource.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/FlinkTableSource.java @@ -88,9 +88,12 @@ public class FlinkTableSource private final Configuration flussConfig; // output type before projection pushdown private final org.apache.flink.table.types.logical.RowType tableOutputType; - // will be empty if no pk + // will be empty if no primary key private final int[] primaryKeyIndexes; - private final List partitionKeys; + // will be empty if no bucket key + private final int[] bucketKeyIndexes; + // will be empty if no partition key + private final int[] partitionKeyIndexes; private final boolean streaming; private final FlinkConnectorOptionsUtils.StartupOptions startupOptions; @@ -123,7 +126,8 @@ public FlinkTableSource( Configuration flussConfig, org.apache.flink.table.types.logical.RowType tableOutputType, int[] primaryKeyIndexes, - List partitionKeys, + int[] bucketKeyIndexes, + int[] partitionKeyIndexes, boolean streaming, FlinkConnectorOptionsUtils.StartupOptions startupOptions, int lookupMaxRetryTimes, @@ -136,7 +140,8 @@ public FlinkTableSource( this.tableOutputType = tableOutputType; this.producedDataType = tableOutputType; this.primaryKeyIndexes = primaryKeyIndexes; - this.partitionKeys = partitionKeys; + this.bucketKeyIndexes = bucketKeyIndexes; + this.partitionKeyIndexes = partitionKeyIndexes; this.streaming = streaming; this.startupOptions = checkNotNull(startupOptions, "startupOptions must not be null"); @@ -167,6 +172,10 @@ private boolean hasPrimaryKey() { return primaryKeyIndexes.length > 0; } + private boolean isPartitioned() { + return partitionKeyIndexes.length > 0; + } + @Override public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { // handle single row filter scan @@ -239,7 +248,7 @@ public boolean isBounded() { flussConfig, tablePath, hasPrimaryKey(), - !partitionKeys.isEmpty(), + isPartitioned(), flussRowType, projectedFields, offsetsInitializer, @@ -281,16 +290,16 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) { LookupNormalizer.validateAndCreateLookupNormalizer( context.getKeys(), primaryKeyIndexes, + bucketKeyIndexes, + partitionKeyIndexes, tableOutputType, - tablePath, - flussConfig); + projectedFields); if (lookupAsync) { AsyncLookupFunction asyncLookupFunction = new FlinkAsyncLookupFunction( flussConfig, tablePath, tableOutputType, - lookupNormalizer.getLookupKeyIndexes(), lookupMaxRetryTimes, lookupNormalizer, projectedFields); @@ -305,7 +314,6 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) { flussConfig, tablePath, tableOutputType, - lookupNormalizer.getLookupKeyIndexes(), lookupMaxRetryTimes, lookupNormalizer, projectedFields); @@ -325,7 +333,8 @@ public DynamicTableSource copy() { flussConfig, tableOutputType, primaryKeyIndexes, - partitionKeys, + bucketKeyIndexes, + partitionKeyIndexes, streaming, startupOptions, lookupMaxRetryTimes, diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkAsyncLookupFunction.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkAsyncLookupFunction.java index 0c48a799..ae53093d 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkAsyncLookupFunction.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkAsyncLookupFunction.java @@ -59,7 +59,6 @@ public class FlinkAsyncLookupFunction extends AsyncLookupFunction { private final TablePath tablePath; private final int maxRetryTimes; private final RowType flinkRowType; - private final int[] lookupKeyIndexes; private final LookupNormalizer lookupNormalizer; @Nullable private final int[] projection; private final LookupType flussLookupType; @@ -73,7 +72,6 @@ public FlinkAsyncLookupFunction( Configuration flussConfig, TablePath tablePath, RowType flinkRowType, - int[] lookupKeyIndexes, int maxRetryTimes, LookupNormalizer lookupNormalizer, @Nullable int[] projection) { @@ -81,10 +79,9 @@ public FlinkAsyncLookupFunction( this.tablePath = tablePath; this.maxRetryTimes = maxRetryTimes; this.flinkRowType = flinkRowType; - this.lookupKeyIndexes = lookupKeyIndexes; this.lookupNormalizer = lookupNormalizer; this.projection = projection; - this.flussLookupType = lookupNormalizer.getFlussLookupType(); + this.flussLookupType = lookupNormalizer.getLookupType(); } @Override @@ -93,6 +90,7 @@ public void open(FunctionContext context) { connection = ConnectionFactory.createConnection(flussConfig); table = connection.getTable(tablePath); // TODO: convert to Fluss GenericRow to avoid unnecessary deserialization + int[] lookupKeyIndexes = lookupNormalizer.getLookupKeyIndexes(); flinkRowToFlussRowConverter = FlinkRowToFlussRowConverter.create( FlinkUtils.projectRowType(flinkRowType, lookupKeyIndexes), @@ -153,7 +151,7 @@ private void fetchResult( handleLookupSuccess(resultFuture, result, remainingFilter); } }); - } else if (flussLookupType == LookupType.PREFIX_LOOKUP) { + } else { table.prefixLookup(keyRow) .whenComplete( (result, throwable) -> { @@ -169,11 +167,6 @@ private void fetchResult( resultFuture, result, remainingFilter); } }); - } else { - resultFuture.completeExceptionally( - new UnsupportedOperationException( - "Unsupported Fluss lookup type. Currently, Fluss only " - + "support lookup by primary keys or prefix lookup by bucket keys.")); } } diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkLookupFunction.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkLookupFunction.java index 99bb3ced..20788e53 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkLookupFunction.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkLookupFunction.java @@ -53,7 +53,6 @@ public class FlinkLookupFunction extends LookupFunction { private final TablePath tablePath; private final int maxRetryTimes; private final RowType flinkRowType; - private final int[] lookupKeyIndexes; private final LookupNormalizer lookupNormalizer; @Nullable private final int[] projection; private final LookupType flussLookupType; @@ -68,7 +67,6 @@ public FlinkLookupFunction( Configuration flussConfig, TablePath tablePath, RowType flinkRowType, - int[] lookupKeyIndexes, int maxRetryTimes, LookupNormalizer lookupNormalizer, @Nullable int[] projection) { @@ -76,10 +74,9 @@ public FlinkLookupFunction( this.tablePath = tablePath; this.maxRetryTimes = maxRetryTimes; this.flinkRowType = flinkRowType; - this.lookupKeyIndexes = lookupKeyIndexes; this.lookupNormalizer = lookupNormalizer; this.projection = projection; - this.flussLookupType = lookupNormalizer.getFlussLookupType(); + this.flussLookupType = lookupNormalizer.getLookupType(); } @Override @@ -88,6 +85,7 @@ public void open(FunctionContext context) { connection = ConnectionFactory.createConnection(flussConfig); table = connection.getTable(tablePath); // TODO: convert to Fluss GenericRow to avoid unnecessary deserialization + int[] lookupKeyIndexes = lookupNormalizer.getLookupKeyIndexes(); flinkRowToFlussRowConverter = FlinkRowToFlussRowConverter.create( FlinkUtils.projectRowType(flinkRowType, lookupKeyIndexes), diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/LookupNormalizer.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/LookupNormalizer.java index 199c684a..e720eaf1 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/LookupNormalizer.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/LookupNormalizer.java @@ -16,13 +16,8 @@ package com.alibaba.fluss.connector.flink.source.lookup; -import com.alibaba.fluss.client.Connection; -import com.alibaba.fluss.client.ConnectionFactory; import com.alibaba.fluss.client.lookup.LookupType; -import com.alibaba.fluss.client.table.Table; -import com.alibaba.fluss.config.Configuration; -import com.alibaba.fluss.metadata.TableDescriptor; -import com.alibaba.fluss.metadata.TablePath; +import com.alibaba.fluss.utils.ArrayUtils; import org.apache.flink.table.api.TableException; import org.apache.flink.table.data.GenericRowData; @@ -36,9 +31,9 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Objects; -import java.util.Set; import java.util.stream.Collectors; import static com.alibaba.fluss.utils.Preconditions.checkState; @@ -58,9 +53,6 @@ public class LookupNormalizer implements Serializable { private static final long serialVersionUID = 1L; - public static final LookupNormalizer NOOP_NORMALIZER = - new LookupNormalizer(LookupType.LOOKUP, new int[0], null, null, null); - /** Mapping from normalized key index to the lookup key index (in the lookup row). */ @Nullable private final FieldGetter[] normalizedKeyGetters; @@ -70,16 +62,21 @@ public class LookupNormalizer implements Serializable { /** The field getter to get the remaining condition result from the lookup result row. */ @Nullable private final FieldGetter[] resultFieldGetters; - private final LookupType flussLookupType; + /** + * The lookup request type (primary key lookup or prefix lookup) requested from Flink to Fluss. + */ + private final LookupType lookupType; + + /** The indexes of the lookup keys in the lookup key row. */ private final int[] lookupKeyIndexes; private LookupNormalizer( - LookupType flussLookupType, + LookupType lookupType, int[] lookupKeyIndexes, @Nullable FieldGetter[] normalizedKeyGetters, @Nullable FieldGetter[] conditionFieldGetters, @Nullable FieldGetter[] resultFieldGetters) { - this.flussLookupType = flussLookupType; + this.lookupType = lookupType; this.lookupKeyIndexes = lookupKeyIndexes; this.normalizedKeyGetters = normalizedKeyGetters; this.conditionFieldGetters = conditionFieldGetters; @@ -92,14 +89,20 @@ private LookupNormalizer( } } - public LookupType getFlussLookupType() { - return flussLookupType; + /** + * Returns the lookup type (primary key lookup, or prefix key lookup) requested from Flink to + * Fluss. + */ + public LookupType getLookupType() { + return lookupType; } + /** Returns the indexes of the normalized lookup keys. */ public int[] getLookupKeyIndexes() { return lookupKeyIndexes; } + /** Normalize the lookup key row to match the request key and the key fields order. */ public RowData normalizeLookupKey(RowData lookupKey) { if (normalizedKeyGetters == null) { return lookupKey; @@ -163,108 +166,173 @@ public boolean fieldMatches(RowData result) { // -------------------------------------------------------------------------------------------- - /** Validate the lookup key indexes and primary keys, and create a {@link LookupNormalizer}. */ + /** Create a {@link LookupNormalizer} for primary key lookup. */ + public static LookupNormalizer createPrimaryKeyLookupNormalizer( + int[] primaryKeys, RowType schema) { + List primaryKeyNames = fieldNames(primaryKeys, schema); + return createLookupNormalizer( + primaryKeyNames, primaryKeyNames, primaryKeys, schema, LookupType.LOOKUP); + } + + /** + * Validate the lookup key indexes and primary keys, and create a {@link LookupNormalizer}. + * + * @param lookupKeyIndexes the indexes of the lookup keys in the table row + * @param primaryKeys the indexes of the primary keys of the table + * @param bucketKeys the indexes of the bucket keys of the table, must be a part of primary keys + * @param partitionKeys the indexes of the partition keys of the table, maybe empty if the table + * is not partitioned + * @param schema the schema of the table + */ public static LookupNormalizer validateAndCreateLookupNormalizer( int[][] lookupKeyIndexes, int[] primaryKeys, + int[] bucketKeys, + int[] partitionKeys, RowType schema, - TablePath tablePath, - Configuration flussConfig) { - int[] bucketKeys; - boolean supportPrefixLookup; - try (Connection connection = ConnectionFactory.createConnection(flussConfig); - Table table = connection.getTable(tablePath)) { - TableDescriptor descriptor = table.getDescriptor(); - bucketKeys = descriptor.getBucketKeyIndexes(); - supportPrefixLookup = - TableDescriptor.bucketKeysMatchPrefixLookupPattern( - descriptor.getSchema(), descriptor.getBucketKey()); - } catch (Exception e) { - throw new TableException( - "Failed execute validate and create lookup normalizer operation on Fluss table.", - e); - } - - if (primaryKeys.length == 0 || bucketKeys.length == 0) { + @Nullable int[] projectedFields) { + if (primaryKeys.length == 0) { throw new UnsupportedOperationException( - "Fluss lookup function only support lookup table with primary key or prefix lookup with bucket key."); + "Fluss lookup function only support lookup table with primary key."); + } + // bucket keys must not be empty + if (bucketKeys.length == 0 || !ArrayUtils.isSubset(primaryKeys, bucketKeys)) { + throw new IllegalArgumentException( + "Illegal bucket keys: " + + Arrays.toString(bucketKeys) + + ", must be a part of primary keys: " + + Arrays.toString(primaryKeys)); + } + // partition keys can be empty + if (partitionKeys.length != 0 && !ArrayUtils.isSubset(primaryKeys, partitionKeys)) { + throw new IllegalArgumentException( + "Illegal partition keys: " + + Arrays.toString(partitionKeys) + + ", must be a part of primary keys: " + + Arrays.toString(primaryKeys)); } + int[] lookupKeysBeforeProjection = new int[lookupKeyIndexes.length]; int[] lookupKeys = new int[lookupKeyIndexes.length]; - for (int i = 0; i < lookupKeys.length; i++) { + for (int i = 0; i < lookupKeysBeforeProjection.length; i++) { int[] innerKeyArr = lookupKeyIndexes[i]; checkArgument(innerKeyArr.length == 1, "Do not support nested lookup keys"); // lookupKeyIndexes passed by Flink is key indexed after projection pushdown, - // we do remaining condition filter on the projected row, so no remapping needed. + // we restore the lookup key indexes before pushdown to easier compare with primary + // keys. + if (projectedFields != null) { + lookupKeysBeforeProjection[i] = projectedFields[innerKeyArr[0]]; + } else { + lookupKeysBeforeProjection[i] = innerKeyArr[0]; + } lookupKeys[i] = innerKeyArr[0]; } + List lookupKeyNames = fieldNames(lookupKeysBeforeProjection, schema); + List primaryKeyNames = fieldNames(primaryKeys, schema); - if (supportPrefixLookup && lookupKeys.length == bucketKeys.length) { - // bucket key prefix lookup. - return createLookupNormalizer(lookupKeys, bucketKeys, schema, LookupType.PREFIX_LOOKUP); + if (new HashSet<>(lookupKeyNames).containsAll(primaryKeyNames)) { + // primary key lookup. + return createLookupNormalizer( + lookupKeyNames, primaryKeyNames, lookupKeys, schema, LookupType.LOOKUP); } else { - // Primary key lookup. - return createLookupNormalizer(lookupKeys, primaryKeys, schema, LookupType.LOOKUP); + // the encoding primary key is the primary key without partition keys. + int[] encodedPrimaryKeys = ArrayUtils.removeSet(primaryKeys, partitionKeys); + // the table support prefix lookup iff the bucket key is a prefix of the encoding pk + boolean supportPrefixLookup = ArrayUtils.isPrefix(encodedPrimaryKeys, bucketKeys); + if (supportPrefixLookup) { + // try to create prefix lookup normalizer + // TODO: support prefix lookup with arbitrary part of prefix of primary key + int[] expectedLookupKeys = + ArrayUtils.intersection( + primaryKeys, ArrayUtils.concat(bucketKeys, partitionKeys)); + return createLookupNormalizer( + lookupKeyNames, + fieldNames(expectedLookupKeys, schema), + lookupKeys, + schema, + LookupType.PREFIX_LOOKUP); + } else { + // throw exception for tables that doesn't support prefix lookup + throw new TableException( + "The Fluss lookup function supports lookup tables where the lookup keys include all primary keys, the primary keys are " + + primaryKeyNames + + ", but the lookup keys are " + + lookupKeyNames); + } } } - /** create a {@link LookupNormalizer}. */ + /** + * Create a {@link LookupNormalizer}. + * + *

Note: We compare string names rather than int index for better error message and + * readability, the length of lookup key and keys (primary key or index key) shouldn't be large, + * so the overhead is low. + * + * @param originalLookupKeys the original lookup keys that is determined by Flink. + * @param expectedLookupKeys the expected lookup keys to lookup Fluss. + */ private static LookupNormalizer createLookupNormalizer( - int[] lookupKeys, int[] keys, RowType schema, LookupType flussLookupType) { - // we compare string names rather than int index for better error message and readability, - // the length of lookup key and keys (primary key or index key) shouldn't be large, so the - // overhead is low. - String[] columnNames = schema.getFieldNames().toArray(new String[0]); - String[] keyNames = - Arrays.stream(keys).mapToObj(i -> columnNames[i]).toArray(String[]::new); - - // get the lookup keys - String[] lookupKeyNames = new String[lookupKeys.length]; - for (int i = 0; i < lookupKeyNames.length; i++) { - lookupKeyNames[i] = columnNames[lookupKeys[i]]; - } - - if (Arrays.equals(lookupKeys, keys)) { - return new LookupNormalizer(flussLookupType, keys, null, null, null); + List originalLookupKeys, + List expectedLookupKeys, + int[] lookupKeyIndexes, + RowType schema, + LookupType lookupType) { + if (originalLookupKeys.equals(expectedLookupKeys)) { + int[] normalizedLookupKeys = fieldIndexes(expectedLookupKeys, schema); + return new LookupNormalizer(lookupType, normalizedLookupKeys, null, null, null); } - FieldGetter[] normalizedKeyGetters = new FieldGetter[keys.length]; - for (int i = 0; i < keyNames.length; i++) { - LogicalType fieldType = schema.getTypeAt(keys[i]); - int lookupKeyIndex = findIndex(lookupKeyNames, keyNames[i]); - normalizedKeyGetters[i] = RowData.createFieldGetter(fieldType, lookupKeyIndex); + FieldGetter[] normalizedKeyGetters = new FieldGetter[expectedLookupKeys.size()]; + for (int i = 0; i < expectedLookupKeys.size(); i++) { + String expectedKey = expectedLookupKeys.get(i); + LogicalType fieldType = schema.getTypeAt(schema.getFieldIndex(expectedKey)); + int idxInLookupKey = findIndex(originalLookupKeys, expectedKey); + normalizedKeyGetters[i] = RowData.createFieldGetter(fieldType, idxInLookupKey); } - Set keySet = Arrays.stream(keys).boxed().collect(Collectors.toSet()); List conditionFieldGetters = new ArrayList<>(); List resultFieldGetters = new ArrayList<>(); - for (int i = 0; i < lookupKeys.length; i++) { - if (!keySet.contains(i)) { - LogicalType fieldType = schema.getTypeAt(lookupKeys[i]); + for (int i = 0; i < originalLookupKeys.size(); i++) { + String originalKey = originalLookupKeys.get(i); + if (!expectedLookupKeys.contains(originalKey)) { + LogicalType fieldType = schema.getTypeAt(schema.getFieldIndex(originalKey)); + // get the condition field from the original lookup key row conditionFieldGetters.add(RowData.createFieldGetter(fieldType, i)); - resultFieldGetters.add(RowData.createFieldGetter(fieldType, lookupKeys[i])); + // get the result field from the lookup result row (projected) + resultFieldGetters.add(RowData.createFieldGetter(fieldType, lookupKeyIndexes[i])); } } return new LookupNormalizer( - flussLookupType, - keys, + lookupType, + fieldIndexes(expectedLookupKeys, schema), normalizedKeyGetters, conditionFieldGetters.toArray(new FieldGetter[0]), resultFieldGetters.toArray(new FieldGetter[0])); } - private static int findIndex(String[] columnNames, String key) { - for (int i = 0; i < columnNames.length; i++) { - if (columnNames[i].equals(key)) { + private static int findIndex(List columnNames, String key) { + for (int i = 0; i < columnNames.size(); i++) { + if (columnNames.get(i).equals(key)) { return i; } } throw new TableException( - "Fluss lookup function only supports lookup table with lookup keys contain all primary keys or bucket keys." - + " Can't find primary key or bucket key '" + "The Fluss lookup function supports lookup tables where the lookup keys include all primary keys or all bucket keys." + + " Can't find expected key '" + key + "' in lookup keys " - + Arrays.toString(columnNames)); + + columnNames); + } + + private static List fieldNames(int[] fieldIndexes, RowType schema) { + return Arrays.stream(fieldIndexes) + .mapToObj(i -> schema.getFields().get(i).getName()) + .collect(Collectors.toList()); + } + + private static int[] fieldIndexes(List fieldNames, RowType schema) { + return fieldNames.stream().mapToInt(schema::getFieldIndex).toArray(); } } diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/FlinkConnectorOptionsUtils.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/FlinkConnectorOptionsUtils.java index 4a17ac9a..37b7a6f2 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/FlinkConnectorOptionsUtils.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/FlinkConnectorOptionsUtils.java @@ -22,10 +22,12 @@ import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.api.config.TableConfigOptions; +import org.apache.flink.table.types.logical.RowType; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; +import java.util.Optional; import static com.alibaba.fluss.connector.flink.FlinkConnectorOptions.SCAN_STARTUP_MODE; import static com.alibaba.fluss.connector.flink.FlinkConnectorOptions.SCAN_STARTUP_TIMESTAMP; @@ -61,6 +63,30 @@ public static StartupOptions getStartupOptions(ReadableConfig tableOptions, Zone return options; } + public static int[] getBucketKeyIndexes(ReadableConfig tableOptions, RowType schema) { + Optional bucketKey = tableOptions.getOptional(FlinkConnectorOptions.BUCKET_KEY); + if (!bucketKey.isPresent()) { + // log tables don't have bucket key by default + return new int[0]; + } + + String[] keys = bucketKey.get().split(","); + int[] indexes = new int[keys.length]; + for (int i = 0; i < keys.length; i++) { + int index = schema.getFieldIndex(keys[i].trim()); + if (index < 0) { + throw new ValidationException( + String.format( + "Field '%s' not found in the schema. Available fields are: %s", + keys[i].trim(), schema.getFieldNames())); + } + indexes[i] = index; + } + return indexes; + } + + // ---------------------------------------------------------------------------------------- + private static void validateScanStartupMode(ReadableConfig tableOptions) { ScanStartupMode scanStartupMode = tableOptions.get(SCAN_STARTUP_MODE); if (scanStartupMode == TIMESTAMP) { diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/PushdownUtils.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/PushdownUtils.java index 841e551c..805197da 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/PushdownUtils.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/PushdownUtils.java @@ -68,6 +68,8 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; +import static com.alibaba.fluss.connector.flink.source.lookup.LookupNormalizer.createPrimaryKeyLookupNormalizer; + /** Utilities for pushdown abilities. */ public class PushdownUtils { private static final int LIMIT_PUSH_DOWN_CEIL = 1024; @@ -255,13 +257,13 @@ public static Collection querySingleRow( int[] primaryKeyIndexes, int lookupMaxRetryTimes, @Nullable int[] projectedFields) { - LookupNormalizer lookupNormalizer = LookupNormalizer.NOOP_NORMALIZER; + LookupNormalizer lookupNormalizer = + createPrimaryKeyLookupNormalizer(primaryKeyIndexes, sourceOutputType); LookupFunction lookupFunction = new FlinkLookupFunction( flussConfig, tablePath, sourceOutputType, - primaryKeyIndexes, lookupMaxRetryTimes, lookupNormalizer, projectedFields); diff --git a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/FlinkTableSourceITCase.java b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/FlinkTableSourceITCase.java index 19845bf3..9fcbb692 100644 --- a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/FlinkTableSourceITCase.java +++ b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/FlinkTableSourceITCase.java @@ -647,7 +647,7 @@ private static Stream lookupArgs() { @ParameterizedTest @MethodSource("lookupArgs") void testLookup1PkTable(Caching caching, boolean async) throws Exception { - String dim = prepareDimTableAndSourceTable(caching, async, new String[] {"id"}, null); + String dim = prepareDimTableAndSourceTable(caching, async, new String[] {"id"}, null, null); String dimJoinQuery = String.format( "SELECT a, c, h.name FROM src JOIN %s FOR SYSTEM_TIME AS OF src.proc as h" @@ -660,13 +660,30 @@ void testLookup1PkTable(Caching caching, boolean async) throws Exception { assertResultsIgnoreOrder(collected, expected, true); } + @ParameterizedTest + @MethodSource("lookupArgs") + void testLookupWithProjection(Caching caching, boolean async) throws Exception { + String dim = + prepareDimTableAndSourceTable(caching, async, new String[] {"name"}, null, null); + String dimJoinQuery = + String.format( + "SELECT a, c, h.address FROM src JOIN %s FOR SYSTEM_TIME AS OF src.proc as h" + + " ON src.b = h.name", + dim); + + CloseableIterator collected = tEnv.executeSql(dimJoinQuery).collect(); + List expected = + Arrays.asList("+I[1, 11, address5]", "+I[2, 2, address2]", "+I[10, 44, address4]"); + assertResultsIgnoreOrder(collected, expected, true); + } + /** * lookup table with one pk, two join condition and one of the join condition is constant value. */ @ParameterizedTest @MethodSource("lookupArgs") void testLookup1PkTableWith2Conditions(Caching caching, boolean async) throws Exception { - String dim = prepareDimTableAndSourceTable(caching, async, new String[] {"id"}, null); + String dim = prepareDimTableAndSourceTable(caching, async, new String[] {"id"}, null, null); String dimJoinQuery = String.format( "SELECT a, b, h.name FROM src JOIN %s FOR SYSTEM_TIME AS OF src.proc as h" @@ -696,7 +713,7 @@ void testLookup1PkTableWith2Conditions(Caching caching, boolean async) throws Ex @ParameterizedTest @MethodSource("lookupArgs") void testLookup1PkTableWith3Conditions(Caching caching, boolean async) throws Exception { - String dim = prepareDimTableAndSourceTable(caching, async, new String[] {"id"}, null); + String dim = prepareDimTableAndSourceTable(caching, async, new String[] {"id"}, null, null); String dimJoinQuery = String.format( "SELECT a, b, c, h.address FROM src LEFT JOIN %s FOR SYSTEM_TIME AS OF src.proc as h" @@ -709,7 +726,7 @@ void testLookup1PkTableWith3Conditions(Caching caching, boolean async) throws Ex "+I[1, name1, 11, null]", "+I[2, name2, 2, address2]", "+I[3, name33, 33, null]", - "+I[10, name4, 44, null]"); + "+I[10, name0, 44, null]"); assertResultsIgnoreOrder(collected, expected, true); } @@ -718,7 +735,8 @@ void testLookup1PkTableWith3Conditions(Caching caching, boolean async) throws Ex @MethodSource("lookupArgs") void testLookup2PkTable(Caching caching, boolean async) throws Exception { String dim = - prepareDimTableAndSourceTable(caching, async, new String[] {"id", "name"}, null); + prepareDimTableAndSourceTable( + caching, async, new String[] {"id", "name"}, null, null); String dimJoinQuery = String.format( "SELECT a, b, h.address FROM src JOIN %s FOR SYSTEM_TIME AS OF src.proc as h" @@ -739,7 +757,8 @@ void testLookup2PkTable(Caching caching, boolean async) throws Exception { void testLookup2PkTableWithUnorderedKey(Caching caching, boolean async) throws Exception { // the primary key is (name, id) but the schema order is (id, name) String dim = - prepareDimTableAndSourceTable(caching, async, new String[] {"name", "id"}, null); + prepareDimTableAndSourceTable( + caching, async, new String[] {"name", "id"}, null, null); String dimJoinQuery = String.format( "SELECT a, b, h.address FROM src JOIN %s FOR SYSTEM_TIME AS OF src.proc as h" @@ -759,7 +778,8 @@ void testLookup2PkTableWithUnorderedKey(Caching caching, boolean async) throws E @MethodSource("lookupArgs") void testLookup2PkTableWith1KeyInCondition(Caching caching, boolean async) throws Exception { String dim = - prepareDimTableAndSourceTable(caching, async, new String[] {"id", "name"}, null); + prepareDimTableAndSourceTable( + caching, async, new String[] {"id", "name"}, null, null); String dimJoinQuery = String.format( "SELECT a, b, h.address FROM src JOIN %s FOR SYSTEM_TIME AS OF src.proc as h" @@ -767,9 +787,9 @@ void testLookup2PkTableWith1KeyInCondition(Caching caching, boolean async) throw dim); assertThatThrownBy(() -> tEnv.executeSql(dimJoinQuery)) .hasStackTraceContaining( - "Fluss lookup function only supports lookup table with " - + "lookup keys contain all primary keys or bucket keys. Can't find primary " - + "key or bucket key 'name' in lookup keys [id]"); + "The Fluss lookup function supports lookup tables where" + + " the lookup keys include all primary keys or all bucket keys." + + " Can't find expected key 'name' in lookup keys [id]"); } /** @@ -780,7 +800,8 @@ void testLookup2PkTableWith1KeyInCondition(Caching caching, boolean async) throw @MethodSource("lookupArgs") void testLookup2PkTableWith3Conditions(Caching caching, boolean async) throws Exception { String dim = - prepareDimTableAndSourceTable(caching, async, new String[] {"id", "name"}, null); + prepareDimTableAndSourceTable( + caching, async, new String[] {"id", "name"}, null, null); String dimJoinQuery = String.format( "SELECT a, h.name, h.address FROM src JOIN %s FOR SYSTEM_TIME AS OF src.proc as h" @@ -795,7 +816,8 @@ void testLookup2PkTableWith3Conditions(Caching caching, boolean async) throws Ex @ParameterizedTest @MethodSource("lookupArgs") void testLookupPartitionedTable(Caching caching, boolean async) throws Exception { - String dim = prepareDimTableAndSourceTable(caching, async, new String[] {"id"}, "p_date"); + String dim = + prepareDimTableAndSourceTable(caching, async, new String[] {"id"}, null, "p_date"); String dimJoinQuery = String.format( @@ -808,6 +830,45 @@ void testLookupPartitionedTable(Caching caching, boolean async) throws Exception assertResultsIgnoreOrder(collected, expected, true); } + @ParameterizedTest + @MethodSource("lookupArgs") + void testPrefixLookup(Caching caching, boolean async) throws Exception { + String dim = + prepareDimTableAndSourceTable( + caching, async, new String[] {"name", "id"}, new String[] {"name"}, null); + String dimJoinQuery = + String.format( + "SELECT a, b, h.address FROM src JOIN %s FOR SYSTEM_TIME AS OF src.proc as h" + + " ON src.b = h.name", + dim); + + CloseableIterator collected = tEnv.executeSql(dimJoinQuery).collect(); + List expected = + Arrays.asList( + "+I[1, name1, address1]", + "+I[1, name1, address5]", + "+I[2, name2, address2]", + "+I[10, name0, address4]"); + assertResultsIgnoreOrder(collected, expected, true); + } + + @ParameterizedTest + @MethodSource("lookupArgs") + void testPrefixLookupWithCondition(Caching caching, boolean async) throws Exception { + String dim = + prepareDimTableAndSourceTable( + caching, async, new String[] {"name", "id"}, new String[] {"name"}, null); + String dimJoinQuery = + String.format( + "SELECT a, b, h.address FROM src JOIN %s FOR SYSTEM_TIME AS OF src.proc as h" + + " ON src.b = h.name AND h.address = 'address5'", + dim); + + CloseableIterator collected = tEnv.executeSql(dimJoinQuery).collect(); + List expected = Collections.singletonList("+I[1, name1, address5]"); + assertResultsIgnoreOrder(collected, expected, true); + } + @Test void testLookupFullCacheThrowException() { tEnv.executeSql( @@ -840,7 +901,11 @@ private InternalRow genRow(boolean isPkTable, RowType rowType, Object[] objects) * @return the table name of the dim table */ private String prepareDimTableAndSourceTable( - Caching caching, boolean async, String[] keys, @Nullable String partitionedKey) + Caching caching, + boolean async, + String[] primaryKeys, + @Nullable String[] bucketKeys, + @Nullable String partitionedKey) throws Exception { String options = async ? "'lookup.async' = 'true'" : "'lookup.async' = 'false'"; if (caching == Caching.ENABLE_CACHE) { @@ -849,6 +914,12 @@ private String prepareDimTableAndSourceTable( + ",'lookup.partial-cache.max-rows' = '1000'" + ",'lookup.partial-cache.expire-after-write' = '10min'"; } + String bucketOptions = + bucketKeys == null + ? "" + : ", 'bucket.num' = '1', 'bucket.key' = '" + + String.join(",", bucketKeys) + + "'"; // create dim table String tableName = @@ -856,7 +927,7 @@ private String prepareDimTableAndSourceTable( "lookup_test_%s_%s_pk_%s_%s", caching.name().toLowerCase(), async ? "async" : "sync", - String.join("_", keys), + String.join("_", primaryKeys), RandomUtils.nextInt()); if (partitionedKey == null) { tEnv.executeSql( @@ -865,8 +936,8 @@ private String prepareDimTableAndSourceTable( + " id int not null," + " address varchar," + " name varchar," - + " primary key (%s) NOT ENFORCED) with (%s)", - tableName, String.join(",", keys), options)); + + " primary key (%s) NOT ENFORCED) with (%s %s)", + tableName, String.join(",", primaryKeys), options, bucketOptions)); } else { tEnv.executeSql( String.format( @@ -876,13 +947,15 @@ private String prepareDimTableAndSourceTable( + " name varchar," + " %s varchar , " + " primary key (%s, %s) NOT ENFORCED) partitioned by (%s) with (%s , " - + "'table.auto-partition.enabled' = 'true', 'table.auto-partition.time-unit' = 'year')", + + " 'table.auto-partition.enabled' = 'true', 'table.auto-partition.time-unit' = 'year'" + + " %s)", tableName, partitionedKey, - String.join(",", keys), + String.join(",", primaryKeys), partitionedKey, partitionedKey, - options)); + options, + bucketOptions)); } TablePath tablePath = TablePath.of(DEFAULT_DB, tableName); @@ -904,8 +977,8 @@ private String prepareDimTableAndSourceTable( for (int i = 1; i <= 5; i++) { Object[] values = partition1 == null - ? new Object[] {i, "address" + i, "name" + i} - : new Object[] {i, "address" + i, "name" + i, partition1}; + ? new Object[] {i, "address" + i, "name" + i % 4} + : new Object[] {i, "address" + i, "name" + i % 4, partition1}; upsertWriter.upsert(compactedRow(dimTableRowType, values)); } upsertWriter.flush(); @@ -918,12 +991,12 @@ private String prepareDimTableAndSourceTable( Row.of(1, "name1", 11), Row.of(2, "name2", 2), Row.of(3, "name33", 33), - Row.of(10, "name4", 44)) + Row.of(10, "name0", 44)) : Arrays.asList( Row.of(1, "name1", 11, partition1), Row.of(2, "name2", 2, partition1), Row.of(3, "name33", 33, partition2), - Row.of(10, "name4", 44, partition2)); + Row.of(10, "name0", 44, partition2)); Schema.Builder builder = Schema.newBuilder() .column("a", DataTypes.INT()) diff --git a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkLookupFunctionTest.java b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkLookupFunctionTest.java index 471f5ea2..4d3e74a0 100644 --- a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkLookupFunctionTest.java +++ b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkLookupFunctionTest.java @@ -21,11 +21,11 @@ import com.alibaba.fluss.connector.flink.source.testutils.FlinkTestBase; import com.alibaba.fluss.connector.flink.utils.FlinkConversions; import com.alibaba.fluss.metadata.TablePath; -import com.alibaba.fluss.types.RowType; import org.apache.flink.table.connector.source.lookup.LookupOptions; import org.apache.flink.table.data.RowData; import org.apache.flink.table.functions.AsyncLookupFunction; +import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.Collector; import org.junit.jupiter.api.Test; @@ -37,6 +37,7 @@ import java.util.concurrent.CountDownLatch; import java.util.stream.Collectors; +import static com.alibaba.fluss.connector.flink.source.lookup.LookupNormalizer.createPrimaryKeyLookupNormalizer; import static com.alibaba.fluss.testutils.DataTestUtils.compactedRow; import static org.assertj.core.api.Assertions.assertThat; @@ -48,14 +49,14 @@ void testSyncLookupEval() throws Exception { TablePath tablePath = TablePath.of(DEFAULT_DB, "sync-lookup-table"); prepareData(tablePath, 3); + RowType flinkRowType = FlinkConversions.toFlinkRowType(DEFAULT_PK_TABLE_SCHEMA.toRowType()); FlinkLookupFunction lookupFunction = new FlinkLookupFunction( clientConf, tablePath, - FlinkConversions.toFlinkRowType(DEFAULT_PK_TABLE_SCHEMA.toRowType()), - new int[] {0}, + flinkRowType, LookupOptions.MAX_RETRIES.defaultValue(), - LookupNormalizer.NOOP_NORMALIZER, + createPrimaryKeyLookupNormalizer(new int[] {0}, flinkRowType), null); ListOutputCollector collector = new ListOutputCollector(); @@ -84,14 +85,15 @@ void testAsyncLookupEval() throws Exception { TablePath tablePath = TablePath.of(DEFAULT_DB, "async-lookup-table"); int rows = 3; prepareData(tablePath, rows); + + RowType flinkRowType = FlinkConversions.toFlinkRowType(DEFAULT_PK_TABLE_SCHEMA.toRowType()); AsyncLookupFunction asyncLookupFunction = new FlinkAsyncLookupFunction( clientConf, tablePath, - FlinkConversions.toFlinkRowType(DEFAULT_PK_TABLE_SCHEMA.toRowType()), - new int[] {0}, + flinkRowType, LookupOptions.MAX_RETRIES.defaultValue(), - LookupNormalizer.NOOP_NORMALIZER, + createPrimaryKeyLookupNormalizer(new int[] {0}, flinkRowType), null); asyncLookupFunction.open(null); @@ -134,7 +136,7 @@ void testAsyncLookupEval() throws Exception { private void prepareData(TablePath tablePath, int rows) throws Exception { createTable(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR); - RowType rowType = DEFAULT_PK_TABLE_SCHEMA.toRowType(); + com.alibaba.fluss.types.RowType rowType = DEFAULT_PK_TABLE_SCHEMA.toRowType(); // first write some data to the table try (Table table = conn.getTable(tablePath)) { diff --git a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/entity/PrefixLookupResultForBucket.java b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/entity/PrefixLookupResultForBucket.java index 508401eb..e520bccf 100644 --- a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/entity/PrefixLookupResultForBucket.java +++ b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/entity/PrefixLookupResultForBucket.java @@ -35,7 +35,7 @@ public PrefixLookupResultForBucket(TableBucket tableBucket, ApiError error) { this(tableBucket, null, error); } - public PrefixLookupResultForBucket( + private PrefixLookupResultForBucket( TableBucket tableBucket, List> values, ApiError error) { super(tableBucket, error); this.values = values; diff --git a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/TabletServerGateway.java b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/TabletServerGateway.java index efc96fde..85e8f362 100644 --- a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/TabletServerGateway.java +++ b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/TabletServerGateway.java @@ -102,9 +102,9 @@ CompletableFuture notifyLeaderAndIsr( CompletableFuture lookup(LookupRequest request); /** - * Prefix lookup to get value by index key. + * Prefix lookup to get value by prefix key. * - * @return Index lookup response. + * @return Prefix lookup response. */ @RPC(api = ApiKeys.PREFIX_LOOKUP) CompletableFuture prefixLookup(PrefixLookupRequest request); diff --git a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/server/RequestsMetrics.java b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/server/RequestsMetrics.java index 74741c49..ea223c8a 100644 --- a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/server/RequestsMetrics.java +++ b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/server/RequestsMetrics.java @@ -94,7 +94,7 @@ private static String toRequestName(ApiKeys apiKeys, boolean isFromFollower) { case LOOKUP: return "lookup"; case PREFIX_LOOKUP: - return "indexLookup"; + return "prefixLookup"; case FETCH_LOG: return isFromFollower ? "fetchLogFollower" : "fetchLogClient"; default: diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/rocksdb/RocksDBKv.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/rocksdb/RocksDBKv.java index 2f3fb8c3..b8fb107f 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/rocksdb/RocksDBKv.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/rocksdb/RocksDBKv.java @@ -106,10 +106,6 @@ public List prefixLookup(byte[] prefixKey) { RocksIterator iterator = db.newIterator(defaultColumnFamilyHandle, readOptions); try { iterator.seek(prefixKey); - // TODO, This is very inefficient to compare arrays byte by byte. In the future we can - // use JDK9 Arrays.compare(compare(byte[] a, int aFromIndex, int aToIndex, byte[] b, int - // bFromIndex, int bToIndex)) to instead. See issue: - // https://github.com/alibaba/fluss/issues/271 while (iterator.isValid() && isPrefixEquals(prefixKey, iterator.key())) { pkList.add(iterator.value()); iterator.next(); @@ -210,20 +206,23 @@ public RocksDB getDb() { } /** - * Check if the given two byte arrays have the same prefix. If bytes2 is shorter than bytes1, - * return false. Otherwise, compare bytes1 and bytes2 from the start until the end of bytes2. If - * all bytes are equal, return true. + * Check if the given first byte array ({@code prefix}) is a prefix of the second byte array + * ({@code bytes}). * - * @param bytes1 The first byte array - * @param bytes2 The second byte array - * @return true if the given two byte arrays have the same prefix, false otherwise + * @param prefix The prefix byte array + * @param bytes The byte array to check if it has the prefix + * @return true if the given bytes has the given prefix, false otherwise */ - public static boolean isPrefixEquals(byte[] bytes1, byte[] bytes2) { - if (bytes1.length > bytes2.length) { + public static boolean isPrefixEquals(byte[] prefix, byte[] bytes) { + // TODO, This is very inefficient to compare arrays byte by byte. In the future we can + // use JDK9 Arrays.compare(compare(byte[] a, int aFromIndex, int aToIndex, byte[] b, int + // bFromIndex, int bToIndex)) to instead. See issue: + // https://github.com/alibaba/fluss/issues/271 + if (prefix.length > bytes.length) { return false; } - for (int i = 0; i < bytes1.length; i++) { - if (bytes1[i] != bytes2[i]) { + for (int i = 0; i < prefix.length; i++) { + if (prefix[i] != bytes[i]) { return false; } } diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/metrics/group/PhysicalTableMetricGroup.java b/fluss-server/src/main/java/com/alibaba/fluss/server/metrics/group/PhysicalTableMetricGroup.java index ec3ba43d..5d651d80 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/metrics/group/PhysicalTableMetricGroup.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/metrics/group/PhysicalTableMetricGroup.java @@ -368,7 +368,7 @@ public KvMetricGroup(PhysicalTableMetricGroup physicalTableMetricGroup) { MetricNames.FAILED_LIMIT_SCAN_REQUESTS_RATE, new MeterView(failedLimitScanRequests)); - // for index lookup request + // for prefix lookup request totalPrefixLookupRequests = new ThreadSafeSimpleCounter(); meter( MetricNames.TOTAL_PREFIX_LOOKUP_REQUESTS_RATE, diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java index 08c2aaa1..77dcba13 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java @@ -166,7 +166,6 @@ public final class Replica { private final KvFormat kvFormat; private final long logTTLMs; private final boolean dataLakeEnabled; - private final boolean supportPrefixLookup; private final int tieredLogLocalSegments; private final AtomicReference leaderReplicaIdOpt = new AtomicReference<>(); private final ReadWriteLock leaderIsrUpdateLock = new ReentrantReadWriteLock(); @@ -232,7 +231,6 @@ public Replica( this.snapshotContext = snapshotContext; // create a closeable registry for the replica this.closeableRegistry = new CloseableRegistry(); - this.supportPrefixLookup = supportPrefixLookup(tableDescriptor); this.logTablet = createLog(lazyHighWatermarkCheckpoint); registerMetrics(); @@ -309,10 +307,6 @@ public long getLogTTLMs() { return logTTLMs; } - public boolean supportPrefixLookup() { - return supportPrefixLookup; - } - public int writerIdCount() { return logTablet.getWriterIdCount(); } @@ -1735,11 +1729,6 @@ private void traceAckInfo(List curMaximalIsr, long requiredOffset) { .collect(Collectors.toList())); } - private boolean supportPrefixLookup(TableDescriptor tableDescriptor) { - return TableDescriptor.bucketKeysMatchPrefixLookupPattern( - tableDescriptor.getSchema(), tableDescriptor.getBucketKey()); - } - @VisibleForTesting public int getBucketEpoch() { return bucketEpoch; diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/ReplicaManager.java index 6d31b703..957f549c 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/ReplicaManager.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/ReplicaManager.java @@ -24,7 +24,6 @@ import com.alibaba.fluss.exception.FlussRuntimeException; import com.alibaba.fluss.exception.InvalidCoordinatorException; import com.alibaba.fluss.exception.InvalidRequiredAcksException; -import com.alibaba.fluss.exception.KvStorageException; import com.alibaba.fluss.exception.LogOffsetOutOfRangeException; import com.alibaba.fluss.exception.LogStorageException; import com.alibaba.fluss.exception.NotLeaderOrFollowerException; @@ -469,25 +468,13 @@ public void prefixLookups( List> resultForBucket = new ArrayList<>(); try { Replica replica = getReplicaOrException(tb); - if (!replica.supportPrefixLookup()) { - result.put( - tb, - new PrefixLookupResultForBucket( - tb, - ApiError.fromThrowable( - new KvStorageException( - "Table bucket " - + tb - + " does not support prefix lookup")))); - continue; - } - tableMetrics = replica.tableMetrics(); tableMetrics.totalPrefixLookupRequests().inc(); for (byte[] prefixKey : entry.getValue()) { - List resultForPerKey = new ArrayList<>(replica.prefixLookup(prefixKey)); + List resultForPerKey = replica.prefixLookup(prefixKey); resultForBucket.add(resultForPerKey); } + result.put(tb, new PrefixLookupResultForBucket(tb, resultForBucket)); } catch (Exception e) { if (isUnexpectedException(e)) { LOG.error("Error processing prefix lookup operation on replica {}", tb, e); @@ -497,8 +484,6 @@ public void prefixLookups( } result.put(tb, new PrefixLookupResultForBucket(tb, ApiError.fromThrowable(e))); } - - result.put(tb, new PrefixLookupResultForBucket(tb, resultForBucket)); } responseCallback.accept(result); } diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaManagerTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaManagerTest.java index b60f8845..e438ee13 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaManagerTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaManagerTest.java @@ -74,7 +74,7 @@ import static com.alibaba.fluss.record.TestData.DATA1; import static com.alibaba.fluss.record.TestData.DATA1_KEY_TYPE; import static com.alibaba.fluss.record.TestData.DATA1_ROW_TYPE; -import static com.alibaba.fluss.record.TestData.DATA1_SCHEMA_PK; +import static com.alibaba.fluss.record.TestData.DATA1_SCHEMA; import static com.alibaba.fluss.record.TestData.DATA1_TABLE_ID; import static com.alibaba.fluss.record.TestData.DATA1_TABLE_ID_PK; import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH; @@ -674,17 +674,16 @@ void testPrefixLookup() throws Exception { Arrays.asList(prefixKey1Bytes, prefixKey2Bytes), Arrays.asList(key1ExpectedValues, key2ExpectedValues)); - // Prefix lookup an unsupported prefixLookup table. - tablePath = TablePath.of("test_db_1", "test_unsupported_prefix_lookup_t1"); + // Prefix lookup an unsupported prefixLookup table (a log table). tableId = registerTableInZkClient( - tablePath, - DATA1_SCHEMA_PK, - 200L, + DATA1_TABLE_PATH, + DATA1_SCHEMA, + 2001L, Collections.emptyList(), Collections.emptyMap()); TableBucket tb3 = new TableBucket(tableId, 0); - makeKvTableAsLeader(tableId, tablePath, tb3.getBucket()); + makeLogTableAsLeader(tb3, false); replicaManager.prefixLookups( Collections.singletonMap(tb3, Collections.singletonList(prefixKey2Bytes)), (prefixLookupResultForBuckets) -> { @@ -692,9 +691,11 @@ void testPrefixLookup() throws Exception { prefixLookupResultForBuckets.get(tb3); assertThat(lookupResultForBucket.failed()).isTrue(); ApiError apiError = lookupResultForBucket.getError(); - assertThat(apiError.error()).isEqualTo(Errors.KV_STORAGE_EXCEPTION); + assertThat(apiError.error()).isEqualTo(Errors.NON_PRIMARY_KEY_TABLE_EXCEPTION); assertThat(apiError.message()) - .isEqualTo("Table bucket " + tb3 + " does not support prefix lookup"); + .isEqualTo( + "Try to do prefix lookup on a non primary key table: " + + DATA1_TABLE_PATH); }); } diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TabletServiceITCase.java b/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TabletServiceITCase.java index 717a884e..24c6e97f 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TabletServiceITCase.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TabletServiceITCase.java @@ -493,13 +493,12 @@ void testPrefixLookup() throws Exception { Arrays.asList(key1ExpectedValues, key2ExpectedValues)); // Prefix lookup an unsupported prefixLookup table. - TableDescriptor unsupportedDescriptor = TableDescriptor.builder().schema(schema).build(); - long tableId2 = + long logTableId = createTable( FLUSS_CLUSTER_EXTENSION, - TablePath.of("test_db_1", "test_unsupported_prefix_lookup_t1"), - unsupportedDescriptor); - tb = new TableBucket(tableId2, 0); + DATA1_TABLE_PATH, + DATA1_TABLE_INFO.getTableDescriptor()); + tb = new TableBucket(logTableId, 0); FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(tb); leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb); TabletServerGateway leaderGateWay2 = @@ -508,15 +507,13 @@ void testPrefixLookup() throws Exception { leaderGateWay2 .prefixLookup( newPrefixLookupRequest( - tableId2, 0, Collections.singletonList(prefixKey1Bytes))) + logTableId, 0, Collections.singletonList(prefixKey1Bytes))) .get() .getBucketsRespAt(0); verifyPrefixLookupBucketError( pbPrefixLookupRespForBucket, - Errors.KV_STORAGE_EXCEPTION, - "Table bucket TableBucket{tableId=" - + tableId2 - + ", bucket=0} does not support prefix lookup"); + Errors.NON_PRIMARY_KEY_TABLE_EXCEPTION, + "Try to do prefix lookup on a non primary key table: " + DATA1_TABLE_PATH); } @Test diff --git a/website/docs/maintenance/monitor-metrics.md b/website/docs/maintenance/monitor-metrics.md index 9078c106..7304a7e2 100644 --- a/website/docs/maintenance/monitor-metrics.md +++ b/website/docs/maintenance/monitor-metrics.md @@ -501,13 +501,13 @@ Some metrics might not be exposed when using other JVM implementations (e.g. IBM Meter - totalIndexLookupRequestsPerSecond - The number of index lookup requests to index lookup value by key from this table per second. + totalPrefixLookupRequestsPerSecond + The number of prefix lookup requests to lookup value by prefix key from this table per second. Meter - failedIndexLookupRequestsPerSecond - The number of failed index lookup requests to index lookup value by key from this table per second. + failedPrefixLookupRequestsPerSecond + The number of failed prefix lookup requests to lookup value by prefix key from this table per second. Meter