From 742d5989c2009965c3fd552add5cfe5c8e65fcdc Mon Sep 17 00:00:00 2001 From: yunhong <337361684@qq.com> Date: Wed, 18 Dec 2024 20:46:42 +0800 Subject: [PATCH] [kv] Support index lookup for primary key table --- .../fluss/client/lookup/AbstractLookup.java | 41 ++++ .../client/lookup/AbstractLookupBatch.java | 47 +++++ .../fluss/client/lookup/IndexLookup.java | 70 +++++++ .../fluss/client/lookup/IndexLookupBatch.java | 65 ++++++ .../alibaba/fluss/client/lookup/Lookup.java | 15 +- .../fluss/client/lookup/LookupBatch.java | 15 +- .../fluss/client/lookup/LookupClient.java | 16 +- .../fluss/client/lookup/LookupQueue.java | 74 +++++-- .../{table => lookup}/LookupResult.java | 23 +-- .../fluss/client/lookup/LookupSender.java | 193 +++++++++++++++--- .../fluss/client/lookup/LookupType.java | 26 +++ .../fluss/client/table/FlussTable.java | 84 ++++++-- .../com/alibaba/fluss/client/table/Table.java | 16 ++ .../client/table/writer/UpsertWriter.java | 22 +- .../client/utils/ClientRpcMessageUtils.java | 19 ++ .../fluss/client/admin/FlussAdminITCase.java | 30 +++ .../table/FlussPartitionedTableITCase.java | 3 +- .../fluss/client/table/FlussTableITCase.java | 64 +++++- .../alibaba/fluss/config/ConfigOptions.java | 18 ++ .../exception/InvalidIndexKeysException.java | 32 +++ .../fluss/metadata/TableDescriptor.java | 99 +++++++++ .../alibaba/fluss/metrics/MetricNames.java | 4 + .../alibaba/fluss/row/encode/KeyEncoder.java | 2 +- .../com/alibaba/fluss/utils/BytesUtils.java | 16 ++ .../fluss/testutils/DataTestUtils.java | 22 +- .../flink/FlinkConnectorOptions.java | 12 ++ .../connector/flink/catalog/FlinkCatalog.java | 16 +- .../flink/catalog/FlinkTableFactory.java | 21 +- .../flink/source/FlinkTableSource.java | 22 +- .../lookup/FlinkAsyncLookupFunction.java | 66 +++--- .../source/lookup/FlinkLookupFunction.java | 50 +++-- .../flink/source/lookup/LookupNormalizer.java | 116 +++++++++-- .../utils/FlinkConnectorOptionsUtils.java | 87 +++++++- .../flink/utils/FlinkConversions.java | 6 + .../rpc/gateway/TabletServerGateway.java | 10 + .../alibaba/fluss/rpc/protocol/ApiKeys.java | 3 +- .../alibaba/fluss/rpc/protocol/Errors.java | 5 +- fluss-rpc/src/main/proto/FlussApi.proto | 31 +++ .../coordinator/CoordinatorService.java | 6 + .../com/alibaba/fluss/server/kv/KvTablet.java | 9 + .../fluss/server/kv/rocksdb/RocksDBKv.java | 30 ++- .../group/PhysicalTableMetricGroup.java | 28 +++ .../alibaba/fluss/server/replica/Replica.java | 38 ++++ .../fluss/server/replica/ReplicaManager.java | 48 +++++ .../fluss/server/tablet/TabletService.java | 10 + .../fluss/server/utils/RpcMessageUtils.java | 24 +++ .../log/remote/RemoteLogManagerTest.java | 12 +- .../server/replica/ReplicaManagerTest.java | 34 ++- .../fluss/server/replica/ReplicaTestBase.java | 27 ++- .../server/tablet/TabletServiceITCase.java | 174 +++++++++++++++- .../tablet/TestTabletServerGateway.java | 7 + .../fluss/server/testutils/KvTestUtils.java | 22 ++ .../server/testutils/RpcMessageTestUtils.java | 13 ++ 53 files changed, 1733 insertions(+), 210 deletions(-) create mode 100644 fluss-client/src/main/java/com/alibaba/fluss/client/lookup/AbstractLookup.java create mode 100644 fluss-client/src/main/java/com/alibaba/fluss/client/lookup/AbstractLookupBatch.java create mode 100644 fluss-client/src/main/java/com/alibaba/fluss/client/lookup/IndexLookup.java create mode 100644 fluss-client/src/main/java/com/alibaba/fluss/client/lookup/IndexLookupBatch.java rename fluss-client/src/main/java/com/alibaba/fluss/client/{table => lookup}/LookupResult.java (72%) create mode 100644 fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupType.java create mode 100644 fluss-common/src/main/java/com/alibaba/fluss/exception/InvalidIndexKeysException.java diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/AbstractLookup.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/AbstractLookup.java new file mode 100644 index 00000000..5f5c8537 --- /dev/null +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/AbstractLookup.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.client.lookup; + +import com.alibaba.fluss.annotation.Internal; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** Abstract Class to represent a lookup operation. */ +@Internal +public abstract class AbstractLookup { + + private final byte[] key; + + public AbstractLookup(byte[] key) { + this.key = key; + } + + public byte[] key() { + return key; + } + + public abstract LookupType lookupType(); + + public abstract CompletableFuture> future(); +} diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/AbstractLookupBatch.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/AbstractLookupBatch.java new file mode 100644 index 00000000..229fe826 --- /dev/null +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/AbstractLookupBatch.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.client.lookup; + +import com.alibaba.fluss.annotation.Internal; + +import java.util.ArrayList; +import java.util.List; + +/** An abstract lookup batch. */ +@Internal +public abstract class AbstractLookupBatch { + + protected final List lookups; + + public AbstractLookupBatch() { + this.lookups = new ArrayList<>(); + } + + public void addLookup(AbstractLookup lookup) { + lookups.add(lookup); + } + + public List lookups() { + return lookups; + } + + /** Complete the lookup operations using given values . */ + public abstract void complete(List> values); + + /** Complete the get operations with given exception. */ + public abstract void completeExceptionally(Exception exception); +} diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/IndexLookup.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/IndexLookup.java new file mode 100644 index 00000000..f1bbea14 --- /dev/null +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/IndexLookup.java @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.client.lookup; + +import com.alibaba.fluss.annotation.Internal; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * Class to represent an index lookup operation, it contains the table id, bucketNums and related + * CompletableFuture. + */ +@Internal +public class IndexLookup extends AbstractLookup { + private final int destination; + private final long tableId; + private final int bucketId; + private final String indexName; + private final CompletableFuture> future; + + public IndexLookup( + int destination, long tableId, int bucketId, String indexName, byte[] indexKey) { + super(indexKey); + this.destination = destination; + this.tableId = tableId; + this.bucketId = bucketId; + this.indexName = indexName; + this.future = new CompletableFuture<>(); + } + + public int destination() { + return destination; + } + + public long tableId() { + return tableId; + } + + public int bucketId() { + return bucketId; + } + + public String indexName() { + return indexName; + } + + public CompletableFuture> future() { + return future; + } + + @Override + public LookupType lookupType() { + return LookupType.INDEX_LOOKUP; + } +} diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/IndexLookupBatch.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/IndexLookupBatch.java new file mode 100644 index 00000000..043a510f --- /dev/null +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/IndexLookupBatch.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.client.lookup; + +import com.alibaba.fluss.annotation.Internal; +import com.alibaba.fluss.exception.FlussRuntimeException; +import com.alibaba.fluss.metadata.TableBucket; + +import java.util.List; + +/** + * A batch that contains the index operations that send to same destination and some table together. + */ +@Internal +public class IndexLookupBatch extends AbstractLookupBatch { + + private final TableBucket tableBucket; + + public IndexLookupBatch(TableBucket tableBucket) { + super(); + this.tableBucket = tableBucket; + } + + public TableBucket tableBucket() { + return tableBucket; + } + + @Override + public void complete(List> values) { + if (values.size() != lookups.size()) { + completeExceptionally( + new FlussRuntimeException( + String.format( + "The number of values return by index lookup request is not equal to the number of " + + "index lookups send. Got %d values, but expected %d.", + values.size(), lookups.size()))); + } else { + for (int i = 0; i < values.size(); i++) { + AbstractLookup lookup = lookups.get(i); + lookup.future().complete(values.get(i)); + } + } + } + + @Override + public void completeExceptionally(Exception exception) { + for (AbstractLookup get : lookups) { + get.future().completeExceptionally(exception); + } + } +} diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/Lookup.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/Lookup.java index b4dd1ada..49716559 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/Lookup.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/Lookup.java @@ -19,6 +19,7 @@ import com.alibaba.fluss.annotation.Internal; import com.alibaba.fluss.metadata.TableBucket; +import java.util.List; import java.util.concurrent.CompletableFuture; /** @@ -26,15 +27,14 @@ * from, the bytes of the key, and a future for the lookup operation. */ @Internal -public class Lookup { +public class Lookup extends AbstractLookup { private final TableBucket tableBucket; - private final byte[] key; - private final CompletableFuture future; + private final CompletableFuture> future; Lookup(TableBucket tableBucket, byte[] key) { + super(key); this.tableBucket = tableBucket; - this.key = key; this.future = new CompletableFuture<>(); } @@ -42,11 +42,12 @@ public TableBucket tableBucket() { return tableBucket; } - public byte[] key() { - return key; + @Override + public LookupType lookupType() { + return LookupType.LOOKUP; } - public CompletableFuture future() { + public CompletableFuture> future() { return future; } } diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupBatch.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupBatch.java index 32bcb13f..fb7df277 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupBatch.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupBatch.java @@ -19,7 +19,6 @@ import com.alibaba.fluss.annotation.Internal; import com.alibaba.fluss.exception.FlussRuntimeException; import com.alibaba.fluss.metadata.TableBucket; -import com.alibaba.fluss.rpc.messages.PbValue; import java.util.ArrayList; import java.util.List; @@ -51,21 +50,21 @@ public TableBucket tableBucket() { } /** Complete the lookup operations using given values . */ - public void complete(List pbValues) { + public void complete(List> values) { // if the size of return values of lookup operation are not equal to the number of lookups, // should complete an exception. - if (pbValues.size() != lookups.size()) { + if (values.size() != lookups.size()) { completeExceptionally( new FlussRuntimeException( String.format( "The number of return values of lookup operation is not equal to the number of " + "lookups. Return %d values, but expected %d.", - pbValues.size(), lookups.size()))); + values.size(), lookups.size()))); } else { - for (int i = 0; i < pbValues.size(); i++) { - Lookup lookup = lookups.get(i); - PbValue pbValue = pbValues.get(i); - lookup.future().complete(pbValue.hasValues() ? pbValue.getValues() : null); + for (int i = 0; i < values.size(); i++) { + AbstractLookup lookup = lookups.get(i); + // single value. + lookup.future().complete(values.get(i)); } } } diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupClient.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupClient.java index 836a4d98..ea7e4da8 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupClient.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupClient.java @@ -29,6 +29,7 @@ import javax.annotation.concurrent.ThreadSafe; import java.time.Duration; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -57,10 +58,12 @@ public class LookupClient { private final ExecutorService lookupSenderThreadPool; private final LookupSender lookupSender; + private final MetadataUpdater metadataUpdater; public LookupClient(Configuration conf, MetadataUpdater metadataUpdater) { this.lookupQueue = new LookupQueue(conf); this.lookupSenderThreadPool = createThreadPool(); + this.metadataUpdater = metadataUpdater; this.lookupSender = new LookupSender( metadataUpdater, @@ -75,12 +78,23 @@ private ExecutorService createThreadPool() { return Executors.newFixedThreadPool(1, new ExecutorThreadFactory(LOOKUP_THREAD_PREFIX)); } - public CompletableFuture lookup(TableBucket tableBucket, byte[] keyBytes) { + public CompletableFuture> lookup(TableBucket tableBucket, byte[] keyBytes) { Lookup lookup = new Lookup(tableBucket, keyBytes); lookupQueue.appendLookup(lookup); return lookup.future(); } + public CompletableFuture> indexLookup( + long tableId, int bucketId, String indexName, byte[] keyBytes) { + // TODO index lookup support partition table. + + // TODO leader and buckets may change during the index lookup. need do retry send. + int leader = metadataUpdater.leaderFor(new TableBucket(tableId, bucketId)); + IndexLookup indexLookup = new IndexLookup(leader, tableId, bucketId, indexName, keyBytes); + lookupQueue.appendLookup(indexLookup); + return indexLookup.future(); + } + public void close(Duration timeout) { LOG.info("Closing lookup client and lookup sender."); diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupQueue.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupQueue.java index fec36a1c..3b9c0f24 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupQueue.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupQueue.java @@ -26,33 +26,55 @@ import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; /** * A queue that buffers the pending lookup operations and provides a list of {@link Lookup} when - * call method {@link #drain()}. + * call method {@link #drain(LookupType)}. */ @ThreadSafe @Internal class LookupQueue { private volatile boolean closed; - private final ArrayBlockingQueue lookupQueue; + private final ArrayBlockingQueue lookupQueue; + private final ArrayBlockingQueue indexLookupQueue; + // next drain queue, 0 or 1, 0 means lookupQueue, 1 means indexLookupQueue. + private final AtomicInteger nextDrainQueue; private final int maxBatchSize; + private final long batchTimeoutMs; LookupQueue(Configuration conf) { this.lookupQueue = new ArrayBlockingQueue<>(conf.get(ConfigOptions.CLIENT_LOOKUP_QUEUE_SIZE)); + this.indexLookupQueue = + new ArrayBlockingQueue<>(conf.get(ConfigOptions.CLIENT_LOOKUP_QUEUE_SIZE)); + this.nextDrainQueue = new AtomicInteger(0); this.maxBatchSize = conf.get(ConfigOptions.CLIENT_LOOKUP_MAX_BATCH_SIZE); + this.batchTimeoutMs = conf.get(ConfigOptions.CLIENT_LOOKUP_BATCH_TIMEOUT).toMillis(); this.closed = false; } - void appendLookup(Lookup lookup) { + LookupType nexDrainLookupType() { + int drainQueueId = nextDrainQueue.updateAndGet(i -> i == 0 ? 1 : 0); + if (drainQueueId == 0) { + return LookupType.LOOKUP; + } else { + return LookupType.INDEX_LOOKUP; + } + } + + void appendLookup(AbstractLookup lookup) { if (closed) { throw new IllegalStateException( "Can not append lookup operation since the LookupQueue is closed."); } try { - lookupQueue.put(lookup); + if (lookup.lookupType() == LookupType.LOOKUP) { + lookupQueue.put(lookup); + } else { + indexLookupQueue.put(lookup); + } } catch (InterruptedException e) { lookup.future().completeExceptionally(e); } @@ -63,21 +85,43 @@ boolean hasUnDrained() { } /** Drain a batch of {@link Lookup}s from the lookup queue. */ - List drain() throws Exception { - List lookups = new ArrayList<>(maxBatchSize); - Lookup firstLookup = lookupQueue.poll(300, TimeUnit.MILLISECONDS); - if (firstLookup != null) { - lookups.add(firstLookup); - lookupQueue.drainTo(lookups, maxBatchSize - 1); + List drain(LookupType lookupType) throws Exception { + long start = System.currentTimeMillis(); + List lookupOperations = new ArrayList<>(maxBatchSize); + ArrayBlockingQueue queue = getQueue(lookupType); + int count = 0; + while (true) { + if (System.currentTimeMillis() - start > batchTimeoutMs) { + break; + } + + AbstractLookup lookup = queue.poll(300, TimeUnit.MILLISECONDS); + if (lookup == null) { + break; + } + count++; + lookupOperations.add(lookup); + if (count >= maxBatchSize) { + break; + } } - return lookups; + return lookupOperations; } /** Drain all the {@link Lookup}s from the lookup queue. */ - List drainAll() { - List lookups = new ArrayList<>(lookupQueue.size()); - lookupQueue.drainTo(lookups); - return lookups; + List drainAll(LookupType lookupType) { + ArrayBlockingQueue queue = getQueue(lookupType); + List lookupOperations = new ArrayList<>(queue.size()); + queue.drainTo(lookupOperations); + return lookupOperations; + } + + private ArrayBlockingQueue getQueue(LookupType lookupType) { + if (lookupType == LookupType.LOOKUP) { + return this.lookupQueue; + } else { + return this.indexLookupQueue; + } } public void close() { diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/table/LookupResult.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupResult.java similarity index 72% rename from fluss-client/src/main/java/com/alibaba/fluss/client/table/LookupResult.java rename to fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupResult.java index 9d8bee92..d3c55437 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/table/LookupResult.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupResult.java @@ -14,14 +14,13 @@ * limitations under the License. */ -package com.alibaba.fluss.client.table; +package com.alibaba.fluss.client.lookup; import com.alibaba.fluss.annotation.PublicEvolving; +import com.alibaba.fluss.client.table.Table; import com.alibaba.fluss.row.InternalRow; -import javax.annotation.Nullable; - -import java.util.Objects; +import java.util.List; /** * The result of {@link Table#lookup(InternalRow)}. @@ -30,14 +29,14 @@ */ @PublicEvolving public final class LookupResult { - private final @Nullable InternalRow row; + private final List rowList; - public LookupResult(@Nullable InternalRow row) { - this.row = row; + public LookupResult(List rowList) { + this.rowList = rowList; } - public @Nullable InternalRow getRow() { - return row; + public List getRowList() { + return rowList; } @Override @@ -50,16 +49,16 @@ public boolean equals(Object o) { } LookupResult lookupResult = (LookupResult) o; - return Objects.equals(row, lookupResult.row); + return rowList.equals(lookupResult.rowList); } @Override public int hashCode() { - return Objects.hash(row); + return rowList.hashCode(); } @Override public String toString() { - return "LookupResult{row=" + row + '}'; + return "LookupResult{" + "rowList=" + rowList + '}'; } } diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupSender.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupSender.java index 87021397..b1e312dd 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupSender.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupSender.java @@ -21,21 +21,27 @@ import com.alibaba.fluss.exception.FlussRuntimeException; import com.alibaba.fluss.metadata.TableBucket; import com.alibaba.fluss.rpc.gateway.TabletServerGateway; +import com.alibaba.fluss.rpc.messages.IndexLookupRequest; +import com.alibaba.fluss.rpc.messages.IndexLookupResponse; import com.alibaba.fluss.rpc.messages.LookupRequest; import com.alibaba.fluss.rpc.messages.LookupResponse; +import com.alibaba.fluss.rpc.messages.PbIndexLookupRespForBucket; +import com.alibaba.fluss.rpc.messages.PbIndexLookupRespForKey; import com.alibaba.fluss.rpc.messages.PbLookupRespForBucket; -import com.alibaba.fluss.rpc.messages.PbValue; import com.alibaba.fluss.rpc.protocol.ApiError; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Semaphore; +import java.util.stream.Collectors; +import static com.alibaba.fluss.client.utils.ClientRpcMessageUtils.makeIndexLookupRequest; import static com.alibaba.fluss.client.utils.ClientRpcMessageUtils.makeLookupRequest; /** @@ -96,57 +102,83 @@ public void run() { /** Run a single iteration of sending. */ private void runOnce(boolean drainAll) throws Exception { - List lookups = drainAll ? lookupQueue.drainAll() : lookupQueue.drain(); - sendLookups(lookups); + if (drainAll) { + sendLookups(LookupType.LOOKUP, lookupQueue.drainAll(LookupType.LOOKUP)); + sendLookups(LookupType.INDEX_LOOKUP, lookupQueue.drainAll(LookupType.INDEX_LOOKUP)); + } else { + LookupType lookupType = lookupQueue.nexDrainLookupType(); + sendLookups(lookupType, lookupQueue.drain(lookupType)); + } } - private void sendLookups(List lookups) { + private void sendLookups(LookupType lookupType, List lookups) { if (lookups.isEmpty()) { return; } // group by to lookup batches - Map> lookupBatches = groupByLeader(lookups); + Map> lookupBatches = groupByLeader(lookups); // now, send the batches - lookupBatches.forEach(this::sendLookups); + lookupBatches.forEach((destination, batch) -> sendLookups(destination, lookupType, batch)); } - private Map> groupByLeader(List lookups) { + private Map> groupByLeader(List lookups) { // leader -> lookup batches - Map> lookupBatchesByLeader = new HashMap<>(); - for (Lookup lookup : lookups) { - // get the leader node - TableBucket tb = lookup.tableBucket(); - + Map> lookupBatchesByLeader = new HashMap<>(); + for (AbstractLookup abstractLookup : lookups) { int leader; - try { - // TODO this can be a re-triable operation. We should retry here instead of throwing - // exception. - leader = metadataUpdater.leaderFor(tb); - } catch (Exception e) { - lookup.future().completeExceptionally(e); - continue; + if (abstractLookup instanceof Lookup) { + Lookup lookup = (Lookup) abstractLookup; + // lookup the leader node + TableBucket tb = lookup.tableBucket(); + try { + // TODO this can be a re-triable operation. We should retry here instead of + // throwing exception. + leader = metadataUpdater.leaderFor(tb); + } catch (Exception e) { + lookup.future().completeExceptionally(e); + continue; + } + } else if (abstractLookup instanceof IndexLookup) { + IndexLookup indexLookup = (IndexLookup) abstractLookup; + leader = indexLookup.destination(); + } else { + throw new IllegalArgumentException("Unsupported lookup type: " + abstractLookup); } - - lookupBatchesByLeader.computeIfAbsent(leader, k -> new ArrayList<>()).add(lookup); + lookupBatchesByLeader + .computeIfAbsent(leader, k -> new ArrayList<>()) + .add(abstractLookup); } return lookupBatchesByLeader; } - private void sendLookups(int destination, List lookupBatches) { + private void sendLookups( + int destination, LookupType lookupType, List lookupBatches) { TabletServerGateway gateway = metadataUpdater.newTabletServerClientForNode(destination); + if (lookupType == LookupType.LOOKUP) { + sendLookupRequest(gateway, lookupBatches); + } else if (lookupType == LookupType.INDEX_LOOKUP) { + sendIndexLookupRequest(gateway, lookupBatches); + } else { + throw new IllegalArgumentException("Unsupported lookup type: " + lookupType); + } + } + + private void sendLookupRequest( + TabletServerGateway gateway, List lookupBatches) { // table id -> (bucket -> lookups) - Map> lookupsByTableId = new HashMap<>(); - for (Lookup lookup : lookupBatches) { + Map> lookupByTableId = new HashMap<>(); + for (AbstractLookup abstractLookup : lookupBatches) { + Lookup lookup = (Lookup) abstractLookup; TableBucket tb = lookup.tableBucket(); long tableId = tb.getTableId(); - lookupsByTableId + lookupByTableId .computeIfAbsent(tableId, k -> new HashMap<>()) .computeIfAbsent(tb, k -> new LookupBatch(tb)) .addLookup(lookup); } - lookupsByTableId.forEach( + lookupByTableId.forEach( (tableId, lookupsByBucket) -> sendLookupRequestAndHandleResponse( gateway, @@ -155,6 +187,29 @@ private void sendLookups(int destination, List lookupBatches) { lookupsByBucket)); } + private void sendIndexLookupRequest( + TabletServerGateway gateway, List indexLookupBatches) { + // table id -> (bucket -> lookups) + Map> lookupByTableId = new HashMap<>(); + for (AbstractLookup abstractLookup : indexLookupBatches) { + IndexLookup indexLookup = (IndexLookup) abstractLookup; + TableBucket tb = new TableBucket(indexLookup.tableId(), indexLookup.bucketId()); + long tableId = tb.getTableId(); + lookupByTableId + .computeIfAbsent(tableId, k -> new HashMap<>()) + .computeIfAbsent(tb, k -> new IndexLookupBatch(tb)) + .addLookup(indexLookup); + } + + lookupByTableId.forEach( + (tableId, indexLookupBatch) -> + sendIndexLookupRequestAndHandleResponse( + gateway, + makeIndexLookupRequest(tableId, indexLookupBatch.values()), + tableId, + indexLookupBatch)); + } + private void sendLookupRequestAndHandleResponse( TabletServerGateway gateway, LookupRequest lookupRequest, @@ -186,6 +241,38 @@ private void sendLookupRequestAndHandleResponse( }); } + private void sendIndexLookupRequestAndHandleResponse( + TabletServerGateway gateway, + IndexLookupRequest indexLookupRequest, + long tableId, + Map lookupsByBucket) { + try { + maxInFlightReuqestsSemaphore.acquire(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new FlussRuntimeException("interrupted:", e); + } + gateway.indexLookup(indexLookupRequest) + .thenAccept( + indexLookupResponse -> { + try { + handleIndexLookupResponse( + tableId, indexLookupResponse, lookupsByBucket); + } finally { + maxInFlightReuqestsSemaphore.release(); + } + }) + .exceptionally( + e -> { + try { + handleIndexLookupException(e, lookupsByBucket); + return null; + } finally { + maxInFlightReuqestsSemaphore.release(); + } + }); + } + private void handleLookupResponse( long tableId, LookupResponse lookupResponse, @@ -208,9 +295,46 @@ private void handleLookupResponse( error.formatErrMsg()); lookupBatch.completeExceptionally(error.exception()); } else { - List pbValues = pbLookupRespForBucket.getValuesList(); - lookupBatch.complete(pbValues); + List> byteValues = + pbLookupRespForBucket.getValuesList().stream() + .map( + pbValue -> { + if (pbValue.hasValues()) { + return Collections.singletonList( + pbValue.getValues()); + } else { + return null; + } + }) + .collect(Collectors.toList()); + lookupBatch.complete(byteValues); + } + } + } + + private void handleIndexLookupResponse( + long tableId, + IndexLookupResponse indexLookupResponse, + Map indexLookupsByBucket) { + for (PbIndexLookupRespForBucket respForBucket : indexLookupResponse.getBucketsRespsList()) { + TableBucket tableBucket = + new TableBucket( + tableId, + respForBucket.hasPartitionId() ? respForBucket.getPartitionId() : null, + respForBucket.getBucketId()); + + // TODO If error, we need to retry send the request instead of throw exception. + IndexLookupBatch indexLookupBatch = indexLookupsByBucket.get(tableBucket); + List> result = new ArrayList<>(); + for (int i = 0; i < respForBucket.getKeysRespsCount(); i++) { + PbIndexLookupRespForKey respForKey = respForBucket.getKeysRespAt(i); + List keyResult = new ArrayList<>(); + for (int j = 0; j < respForKey.getValuesCount(); j++) { + keyResult.add(respForKey.getValueAt(j)); + } + result.add(keyResult); } + indexLookupBatch.complete(result); } } @@ -227,6 +351,19 @@ private void handleLookupRequestException( } } + private void handleIndexLookupException( + Throwable t, Map lookupsByBucket) { + ApiError error = ApiError.fromThrowable(t); + for (IndexLookupBatch lookupBatch : lookupsByBucket.values()) { + // TODO If error, we need to retry send the request instead of throw exception. + LOG.warn( + "Get error index lookup response on table bucket {}, fail. Error: {}", + lookupBatch.tableBucket(), + error.formatErrMsg()); + lookupBatch.completeExceptionally(error.exception()); + } + } + void forceClose() { forceClose = true; initiateClose(); diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupType.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupType.java new file mode 100644 index 00000000..2e92d363 --- /dev/null +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupType.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.client.lookup; + +import com.alibaba.fluss.annotation.Internal; + +/** Enum to represent the type of lookup operation. */ +@Internal +enum LookupType { + LOOKUP, + INDEX_LOOKUP; +} diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java b/fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java index 54dda698..3edd311d 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java @@ -19,6 +19,7 @@ import com.alibaba.fluss.annotation.PublicEvolving; import com.alibaba.fluss.client.lakehouse.LakeTableBucketAssigner; import com.alibaba.fluss.client.lookup.LookupClient; +import com.alibaba.fluss.client.lookup.LookupResult; import com.alibaba.fluss.client.metadata.MetadataUpdater; import com.alibaba.fluss.client.scanner.RemoteFileDownloader; import com.alibaba.fluss.client.scanner.ScanRecord; @@ -81,6 +82,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; @@ -103,15 +105,18 @@ public class FlussTable implements Table { private final TableInfo tableInfo; private final boolean hasPrimaryKey; private final int numBuckets; + private final @Nullable int[] indexKeyIndices; private final RowType keyRowType; - // encode the key bytes for kv lookups - private final KeyEncoder keyEncoder; // decode the lookup bytes to result row private final ValueDecoder kvValueDecoder; // a getter to extract partition from key row, null when it's not a partitioned primary key // table private final @Nullable PartitionGetter keyRowPartitionGetter; + private final KeyEncoder indexKeyEncoder; + private final KeyEncoder primaryKeyEncoder; + private final KeyEncoder lookupBucketKeyEncoder; + private final Supplier writerSupplier; private final Supplier lookupClientSupplier; private final AtomicBoolean closed; @@ -148,9 +153,6 @@ public FlussTable( this.hasPrimaryKey = tableDescriptor.hasPrimaryKey(); this.numBuckets = metadataUpdater.getBucketCount(tablePath); this.keyRowType = getKeyRowType(schema); - this.keyEncoder = - KeyEncoder.createKeyEncoder( - keyRowType, keyRowType.getFieldNames(), tableDescriptor.getPartitionKeys()); this.keyRowPartitionGetter = tableDescriptor.isPartitioned() && tableDescriptor.hasPrimaryKey() ? new PartitionGetter(keyRowType, tableDescriptor.getPartitionKeys()) @@ -161,6 +163,26 @@ public FlussTable( RowDecoder.create( tableDescriptor.getKvFormat(), schema.toRowType().getChildren().toArray(new DataType[0]))); + + this.primaryKeyEncoder = new KeyEncoder(keyRowType); + Map indexKeyIndexes = tableDescriptor.getIndexKeyIndexes(); + if (indexKeyIndexes.size() > 1) { + throw new FlussRuntimeException( + String.format("table %s only support one index key", tablePath)); + } else if (indexKeyIndexes.size() == 1) { + this.indexKeyIndices = new ArrayList<>(indexKeyIndexes.values()).get(0); + this.indexKeyEncoder = new KeyEncoder(getIndexRowType(schema, indexKeyIndices)); + int length = indexKeyIndices.length; + int[] lookupBucketKeyIndices = new int[length]; + for (int i = 0; i < length; i++) { + lookupBucketKeyIndices[i] = i; + } + this.lookupBucketKeyEncoder = new KeyEncoder(keyRowType, lookupBucketKeyIndices); + } else { + this.indexKeyIndices = null; + this.indexKeyEncoder = null; + this.lookupBucketKeyEncoder = primaryKeyEncoder; + } } @Override @@ -176,20 +198,48 @@ public CompletableFuture lookup(InternalRow key) { } // encoding the key row using a compacted way consisted with how the key is encoded when put // a row - byte[] keyBytes = keyEncoder.encode(key); + byte[] keyBytes = primaryKeyEncoder.encode(key); + byte[] lookupBucketKeyBytes = lookupBucketKeyEncoder.encode(key); Long partitionId = keyRowPartitionGetter == null ? null : getPartitionId(key); - int bucketId = getBucketId(keyBytes, key); + int bucketId = getBucketId(lookupBucketKeyBytes, key); TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId); return lookupClientSupplier .get() .lookup(tableBucket, keyBytes) .thenApply( valueBytes -> { - InternalRow row = + List rowList = valueBytes == null - ? null - : kvValueDecoder.decodeValue(valueBytes).row; - return new LookupResult(row); + ? Collections.emptyList() + : Collections.singletonList( + kvValueDecoder.decodeValue(valueBytes.get(0)) + .row); + return new LookupResult(rowList); + }); + } + + @Override + public CompletableFuture indexLookup(String indexName, InternalRow indexKey) { + if (!hasPrimaryKey) { + throw new FlussRuntimeException( + String.format("none-pk table %s not support lookup", tablePath)); + } + + byte[] indexKeyBytes = indexKeyEncoder.encode(indexKey); + int bucketId = HashBucketAssigner.bucketForRowKey(indexKeyBytes, numBuckets); + return lookupClientSupplier + .get() + .indexLookup(tableId, bucketId, indexName, indexKeyBytes) + .thenApply( + result -> { + List rowList = new ArrayList<>(); + for (byte[] valueBytes : result) { + rowList.add( + valueBytes == null + ? null + : kvValueDecoder.decodeValue(valueBytes).row); + } + return new LookupResult(rowList); }); } @@ -346,6 +396,15 @@ private RowType getKeyRowType(Schema schema) { return new RowType(keyRowFields); } + private RowType getIndexRowType(Schema schema, int[] indexIndexes) { + List keyRowFields = new ArrayList<>(indexIndexes.length); + List rowFields = schema.toRowType().getFields(); + for (int index : indexIndexes) { + keyRowFields.add(rowFields.get(index)); + } + return new RowType(keyRowFields); + } + @Override public AppendWriter getAppendWriter() { if (hasPrimaryKey) { @@ -367,7 +426,8 @@ public UpsertWriter getUpsertWriter(UpsertWrite upsertWrite) { tableInfo.getTableDescriptor(), upsertWrite, writerSupplier.get(), - metadataUpdater); + metadataUpdater, + indexKeyIndices); } @Override diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/table/Table.java b/fluss-client/src/main/java/com/alibaba/fluss/client/table/Table.java index b2373f7b..1712fd1d 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/table/Table.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/table/Table.java @@ -18,6 +18,7 @@ import com.alibaba.fluss.annotation.PublicEvolving; import com.alibaba.fluss.client.Connection; +import com.alibaba.fluss.client.lookup.LookupResult; import com.alibaba.fluss.client.scanner.ScanRecord; import com.alibaba.fluss.client.scanner.log.LogScan; import com.alibaba.fluss.client.scanner.log.LogScanner; @@ -63,6 +64,21 @@ public interface Table extends AutoCloseable { */ CompletableFuture lookup(InternalRow key); + /** + * Lookup certain rows from the given table by index key. + * + *

Only available for Primary Key Table. Will throw exception when the table isn't a Primary + * Key Table. + * + *

The index need to be created while creating the table. If this table doesn't have an index + * belong to the given index key, the exception will throw. + * + * @param indexName the given table index name. + * @param indexKey the given table index key. + * @return the result of index lookup. + */ + CompletableFuture indexLookup(String indexName, InternalRow indexKey); + /** * Extracts limit number of rows from the given table bucket. * diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/table/writer/UpsertWriter.java b/fluss-client/src/main/java/com/alibaba/fluss/client/table/writer/UpsertWriter.java index 01bcdfe7..9f4492ac 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/table/writer/UpsertWriter.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/table/writer/UpsertWriter.java @@ -42,6 +42,7 @@ public class UpsertWriter extends TableWriter { private final KeyEncoder keyEncoder; + private final KeyEncoder bucketKeyEncoder; private final @Nullable int[] targetColumns; public UpsertWriter( @@ -49,7 +50,8 @@ public UpsertWriter( TableDescriptor tableDescriptor, UpsertWrite upsertWrite, WriterClient writerClient, - MetadataUpdater metadataUpdater) { + MetadataUpdater metadataUpdater, + @Nullable int[] indexKeyIndex) { super(tablePath, tableDescriptor, metadataUpdater, writerClient); Schema schema = tableDescriptor.getSchema(); sanityCheck(schema, upsertWrite.getPartialUpdateColumns()); @@ -61,6 +63,12 @@ public UpsertWriter( schema.toRowType(), schema.getPrimaryKey().get().getColumnNames(), tableDescriptor.getPartitionKeys()); + + if (indexKeyIndex == null) { + this.bucketKeyEncoder = keyEncoder; + } else { + this.bucketKeyEncoder = new KeyEncoder(schema.toRowType(), indexKeyIndex); + } } private static void sanityCheck(Schema schema, @Nullable int[] targetColumns) { @@ -111,8 +119,10 @@ private static void sanityCheck(Schema schema, @Nullable int[] targetColumns) { */ public CompletableFuture upsert(InternalRow row) { byte[] key = keyEncoder.encode(row); + byte[] bucketKey = bucketKeyEncoder.encode(row); return send( - new WriteRecord(getPhysicalPath(row), WriteKind.PUT, key, key, row, targetColumns)); + new WriteRecord( + getPhysicalPath(row), WriteKind.PUT, key, bucketKey, row, targetColumns)); } /** @@ -124,8 +134,14 @@ public CompletableFuture upsert(InternalRow row) { */ public CompletableFuture delete(InternalRow row) { byte[] key = keyEncoder.encode(row); + byte[] bucketKey = bucketKeyEncoder.encode(row); return send( new WriteRecord( - getPhysicalPath(row), WriteKind.DELETE, key, key, null, targetColumns)); + getPhysicalPath(row), + WriteKind.DELETE, + key, + bucketKey, + null, + targetColumns)); } } diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientRpcMessageUtils.java b/fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientRpcMessageUtils.java index cadccb37..226bccdb 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientRpcMessageUtils.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientRpcMessageUtils.java @@ -17,6 +17,7 @@ package com.alibaba.fluss.client.utils; import com.alibaba.fluss.client.admin.OffsetSpec; +import com.alibaba.fluss.client.lookup.IndexLookupBatch; import com.alibaba.fluss.client.lookup.LookupBatch; import com.alibaba.fluss.client.table.lake.LakeTableSnapshotInfo; import com.alibaba.fluss.client.table.snapshot.BucketSnapshotInfo; @@ -43,12 +44,14 @@ import com.alibaba.fluss.rpc.messages.GetKvSnapshotResponse; import com.alibaba.fluss.rpc.messages.GetLakeTableSnapshotResponse; import com.alibaba.fluss.rpc.messages.GetPartitionSnapshotResponse; +import com.alibaba.fluss.rpc.messages.IndexLookupRequest; import com.alibaba.fluss.rpc.messages.ListOffsetsRequest; import com.alibaba.fluss.rpc.messages.ListPartitionInfosResponse; import com.alibaba.fluss.rpc.messages.LookupRequest; import com.alibaba.fluss.rpc.messages.MetadataRequest; import com.alibaba.fluss.rpc.messages.PbBucketSnapshot; import com.alibaba.fluss.rpc.messages.PbFetchLogRespForBucket; +import com.alibaba.fluss.rpc.messages.PbIndexLookupReqForBucket; import com.alibaba.fluss.rpc.messages.PbKeyValue; import com.alibaba.fluss.rpc.messages.PbLakeSnapshotForBucket; import com.alibaba.fluss.rpc.messages.PbLakeStorageInfo; @@ -185,6 +188,22 @@ public static LookupRequest makeLookupRequest( return request; } + public static IndexLookupRequest makeIndexLookupRequest( + long tableId, Collection lookupBatches) { + IndexLookupRequest request = new IndexLookupRequest().setTableId(tableId); + lookupBatches.forEach( + (batch) -> { + TableBucket tb = batch.tableBucket(); + PbIndexLookupReqForBucket pbIndexLookupReqForBucket = + request.addBucketsReq().setBucketId(tb.getBucket()); + if (tb.getPartitionId() != null) { + pbIndexLookupReqForBucket.setPartitionId(tb.getPartitionId()); + } + batch.lookups().forEach(get -> pbIndexLookupReqForBucket.addKey(get.key())); + }); + return request; + } + public static FetchLogResultForBucket getFetchLogResultForBucket( TableBucket tb, TablePath tp, PbFetchLogRespForBucket respForBucket) { FetchLogResultForBucket fetchLogResultForBucket; diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java index f957e55c..e0d639c4 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java @@ -28,6 +28,7 @@ import com.alibaba.fluss.exception.DatabaseNotExistException; import com.alibaba.fluss.exception.InvalidConfigException; import com.alibaba.fluss.exception.InvalidDatabaseException; +import com.alibaba.fluss.exception.InvalidIndexKeysException; import com.alibaba.fluss.exception.InvalidReplicationFactorException; import com.alibaba.fluss.exception.InvalidTableException; import com.alibaba.fluss.exception.SchemaNotExistException; @@ -198,6 +199,35 @@ void testCreateTableWithInvalidProperty() { .cause() .isInstanceOf(InvalidConfigException.class) .hasMessage("'table.log.tiered.local-segments' must be greater than 0."); + + TableDescriptor t4 = + TableDescriptor.builder() + .schema(DEFAULT_SCHEMA) + .comment("test table") + // invalid property value + .property("table.index.key", "") + .build(); + // should throw exception + assertThatThrownBy(() -> admin.createTable(tablePath, t4, false).get()) + .cause() + .isInstanceOf(InvalidIndexKeysException.class) + .hasMessageContaining( + "Option 'table.index.key' = '' is invalid: " + + "There is an index key not follow the format 'indexKeyName=indexKeyFields'"); + TableDescriptor t5 = + TableDescriptor.builder() + .schema(DEFAULT_SCHEMA) + .comment("test table") + // invalid property value + .property("table.index.key", "idx0=a;idx1=b") + .build(); + // should throw exception + assertThatThrownBy(() -> admin.createTable(tablePath, t5, false).get()) + .cause() + .isInstanceOf(InvalidIndexKeysException.class) + .hasMessageContaining( + "Option 'table.index.key' = 'idx0=a;idx1=b' is invalid: Currently, " + + "Fluss only support to define single index key, but there is more than one index key."); } @Test diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussPartitionedTableITCase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussPartitionedTableITCase.java index 09e16589..b6b4324f 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussPartitionedTableITCase.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussPartitionedTableITCase.java @@ -83,7 +83,8 @@ void testPartitionedPrimaryKeyTable() throws Exception { InternalRow lookupRow = table.lookup(keyRow(schema, new Object[] {i, null, partition})) .get() - .getRow(); + .getRowList() + .get(0); assertThat(lookupRow).isEqualTo(actualRow); } } diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java index 9d26a9b3..3d39b8d9 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java @@ -19,6 +19,7 @@ import com.alibaba.fluss.client.Connection; import com.alibaba.fluss.client.ConnectionFactory; import com.alibaba.fluss.client.admin.ClientToServerITCaseBase; +import com.alibaba.fluss.client.lookup.LookupResult; import com.alibaba.fluss.client.scanner.ScanRecord; import com.alibaba.fluss.client.scanner.log.LogScanner; import com.alibaba.fluss.client.scanner.log.ScanRecords; @@ -58,6 +59,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import static com.alibaba.fluss.record.TestData.DATA1_ROW_TYPE; @@ -215,6 +217,66 @@ void testPutAndLookup() throws Exception { verifyPutAndLookup(table2, schema, new Object[] {"a", 1}); } + @Test + void testPutAndIndexLookup() throws Exception { + TablePath tablePath = TablePath.of("test_db_1", "test_put_and_index_lookup_table"); + Schema schema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.STRING()) + .column("c", DataTypes.BIGINT()) + .column("d", DataTypes.STRING()) + .primaryKey("a", "b", "c") + .build(); + TableDescriptor descriptor = + TableDescriptor.builder() + .schema(schema) + .property("table.index.key", "idx0=a,b") + .build(); + createTable(tablePath, descriptor, false); + Table table = conn.getTable(tablePath); + verifyPutAndLookup(table, schema, new Object[] {1, "a", 1L, "value1"}); + verifyPutAndLookup(table, schema, new Object[] {1, "a", 2L, "value2"}); + verifyPutAndLookup(table, schema, new Object[] {1, "a", 3L, "value3"}); + verifyPutAndLookup(table, schema, new Object[] {2, "a", 4L, "value4"}); + RowType rowType = schema.toRowType(); + + // test index lookup. + Schema indexKeySchema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.STRING()) + .build(); + CompletableFuture result = + table.indexLookup( + "idx0", compactedRow(indexKeySchema.toRowType(), new Object[] {1, "a"})); + LookupResult lookupResult = result.get(); + assertThat(lookupResult).isNotNull(); + List rowList = lookupResult.getRowList(); + assertThat(rowList.size()).isEqualTo(3); + for (int i = 0; i < rowList.size(); i++) { + assertRowValueEquals( + rowType, rowList.get(i), new Object[] {1, "a", i + 1L, "value" + (i + 1)}); + } + + result = + table.indexLookup( + "idx0", compactedRow(indexKeySchema.toRowType(), new Object[] {2, "a"})); + lookupResult = result.get(); + assertThat(lookupResult).isNotNull(); + rowList = lookupResult.getRowList(); + assertThat(rowList.size()).isEqualTo(1); + assertRowValueEquals(rowType, rowList.get(0), new Object[] {2, "a", 4L, "value4"}); + + result = + table.indexLookup( + "idx0", compactedRow(indexKeySchema.toRowType(), new Object[] {3, "a"})); + lookupResult = result.get(); + assertThat(lookupResult).isNotNull(); + rowList = lookupResult.getRowList(); + assertThat(rowList.size()).isEqualTo(0); + } + @Test void testLookupForNotReadyTable() throws Exception { TablePath tablePath = TablePath.of("test_db_1", "test_lookup_unready_table_t1"); @@ -349,7 +411,7 @@ void verifyPutAndLookup(Table table, Schema tableSchema, Object[] fields) throws private InternalRow lookupRow(Table table, IndexedRow keyRow) throws Exception { // lookup this key. - return table.lookup(keyRow).get().getRow(); + return table.lookup(keyRow).get().getRowList().get(0); } @Test diff --git a/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java index 12f842a5..9c61d418 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java @@ -734,6 +734,12 @@ public class ConfigOptions { .withDescription( "The maximum number of unacknowledged lookup requests for lookup operations."); + public static final ConfigOption CLIENT_LOOKUP_BATCH_TIMEOUT = + key("client.lookup.batch-timeout") + .durationType() + .defaultValue(Duration.ofMillis(100)) + .withDescription("The lookup batch timeout ms."); + public static final ConfigOption CLIENT_SCANNER_REMOTE_LOG_PREFETCH_NUM = key("client.scanner.remote-log.prefetch-num") .intType() @@ -891,6 +897,18 @@ public class ConfigOptions { + "When this option is set to ture and the datalake tiering service is up," + " the table will be tiered and compacted into datalake format stored on lakehouse storage."); + public static final ConfigOption TABLE_INDEX_KEY = + key("table.index.key") + .stringType() + .noDefaultValue() + .withDescription( + "The index key is used to build non-key secondary indexes on a pk table, " + + "enabling fast data queries. You can define multiple index keys, separated by ';'." + + " Each index key can specify a index name, like indexName=indexKeyFields. " + + " indexKeyFields support multiple fields, which can be nullable filed," + + " and fields within a key are separated by ','." + + " For example: 'index1=num_orders;index2=num_orders,total_amount'"); + // ------------------------------------------------------------------------ // ConfigOptions for Kv // ------------------------------------------------------------------------ diff --git a/fluss-common/src/main/java/com/alibaba/fluss/exception/InvalidIndexKeysException.java b/fluss-common/src/main/java/com/alibaba/fluss/exception/InvalidIndexKeysException.java new file mode 100644 index 00000000..3bc38606 --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/exception/InvalidIndexKeysException.java @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.exception; + +import com.alibaba.fluss.annotation.PublicEvolving; + +/** + * Try to build a table with some invalid index keys. like the column in index key not exist or the + * column is repeated, like (a, b) and (b, a). + * + * @since 0.3 + */ +@PublicEvolving +public class InvalidIndexKeysException extends ApiException { + public InvalidIndexKeysException(String message) { + super(message); + } +} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java b/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java index d3737b93..c3ae9213 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java @@ -22,6 +22,7 @@ import com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.config.ConfigurationUtils; +import com.alibaba.fluss.exception.InvalidIndexKeysException; import com.alibaba.fluss.utils.AutoPartitionStrategy; import com.alibaba.fluss.utils.Preconditions; import com.alibaba.fluss.utils.json.JsonSerdeUtils; @@ -135,6 +136,78 @@ && getLogFormat() != LogFormat.ARROW) { } } + /** Validate the table descriptor. */ + public void validate() throws InvalidIndexKeysException { + validateIndexKeys(); + } + + private void validateIndexKeys() throws InvalidIndexKeysException { + Configuration conf = configuration(); + if (!conf.contains(ConfigOptions.TABLE_INDEX_KEY)) { + return; + } + + String indexKeysString = conf.get(ConfigOptions.TABLE_INDEX_KEY); + String indexKeysError = detectInvalidIndexKeys(indexKeysString); + if (indexKeysError != null) { + throw new InvalidIndexKeysException( + "Option 'table.index.key' = '" + + indexKeysString + + "' is invalid: " + + indexKeysError); + } + } + + private String detectInvalidIndexKeys(String indexKeysString) { + List columnNames = + schema.getColumns().stream() + .map(Schema.Column::getName) + .collect(Collectors.toList()); + String[] indexKeyStringList = indexKeysString.split(";"); + if (indexKeyStringList.length <= 0) { + return "Index key is empty or index key doesn't split by ';'."; + } else if (indexKeyStringList.length > 1) { + return "Currently, Fluss only support to define single index key, but there is more than one index key."; + } + + List orderedIndexKey = new ArrayList<>(); + for (String indexKeyFieldsStr : indexKeyStringList) { + String[] indexNameAndFields = indexKeyFieldsStr.split("="); + if (indexNameAndFields.length != 2) { + return "There is an index key not follow the format 'indexKeyName=indexKeyFields'."; + } + + String[] fieldStrings = indexNameAndFields[1].split(","); + if (fieldStrings.length <= 0) { + return "There is an index key is empty or column field in index key doesn't split by ','."; + } + + List indexKeyPosList = new ArrayList<>(); + for (String field : fieldStrings) { + int fieldIndex = columnNames.indexOf(field); + if (fieldIndex < 0) { + return "Index key '" + field + "' does not exist in the schema."; + } + indexKeyPosList.add(fieldIndex); + } + + Collections.sort(indexKeyPosList); + if (orderedIndexKey.stream() + .anyMatch( + f -> + Arrays.equals( + f, + indexKeyPosList.stream() + .mapToInt(Integer::intValue) + .toArray()))) { + return "There is an index key is duplicate."; + } + orderedIndexKey.add(indexKeyPosList.stream().mapToInt(Integer::intValue).toArray()); + } + + return null; + } + /** Creates a builder for building table descriptor. */ public static Builder builder() { return new Builder(); @@ -239,6 +312,32 @@ public int getTieredLogLocalSegments() { return configuration().get(ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS); } + /** Return a list of index key indexes in the table. */ + public Map getIndexKeyIndexes() { + Configuration conf = configuration(); + if (!conf.contains(ConfigOptions.TABLE_INDEX_KEY)) { + return Collections.emptyMap(); + } + + List columnNames = + schema.getColumns().stream() + .map(Schema.Column::getName) + .collect(Collectors.toList()); + + Map indexKeyIndexes = new HashMap<>(); + for (String indexKeyStr : conf.get(ConfigOptions.TABLE_INDEX_KEY).split(";")) { + String[] indexNameAndFields = indexKeyStr.split("="); + String indexKeyName = indexNameAndFields[0]; + List indexKeyList = new ArrayList<>(); + for (String field : indexNameAndFields[1].split(",")) { + indexKeyList.add(columnNames.indexOf(field)); + } + indexKeyIndexes.put( + indexKeyName, indexKeyList.stream().mapToInt(Integer::intValue).toArray()); + } + return indexKeyIndexes; + } + /** Whether the data lake is enabled. */ public boolean isDataLakeEnabled() { return configuration().get(ConfigOptions.TABLE_DATALAKE_ENABLED); diff --git a/fluss-common/src/main/java/com/alibaba/fluss/metrics/MetricNames.java b/fluss-common/src/main/java/com/alibaba/fluss/metrics/MetricNames.java index c92e64df..700d21b5 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/metrics/MetricNames.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/metrics/MetricNames.java @@ -76,6 +76,10 @@ public class MetricNames { public static final String FAILED_PUT_KV_REQUESTS_RATE = "failedPutKvRequestsPerSecond"; public static final String TOTAL_LIMIT_SCAN_REQUESTS_RATE = "totalLimitScanRequestsPerSecond"; public static final String FAILED_LIMIT_SCAN_REQUESTS_RATE = "failedLimitScanRequestsPerSecond"; + public static final String TOTAL_INDEX_LOOKUP_REQUESTS_RATE = + "totalIndexLookupRequestsPerSecond"; + public static final String FAILED_INDEX_LOOKUP_REQUESTS_RATE = + "failedIndexLookupRequestsPerSecond"; // -------------------------------------------------------------------------------------------- // metrics for table bucket diff --git a/fluss-common/src/main/java/com/alibaba/fluss/row/encode/KeyEncoder.java b/fluss-common/src/main/java/com/alibaba/fluss/row/encode/KeyEncoder.java index eed6b665..227da452 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/row/encode/KeyEncoder.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/row/encode/KeyEncoder.java @@ -54,7 +54,7 @@ public static KeyEncoder createKeyEncoder( return new KeyEncoder(rowType, encodeColIndexes); } - protected KeyEncoder(RowType rowType) { + public KeyEncoder(RowType rowType) { this(rowType, IntStream.range(0, rowType.getFieldCount()).toArray()); } diff --git a/fluss-common/src/main/java/com/alibaba/fluss/utils/BytesUtils.java b/fluss-common/src/main/java/com/alibaba/fluss/utils/BytesUtils.java index 6a609d86..fa1d6459 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/utils/BytesUtils.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/utils/BytesUtils.java @@ -58,4 +58,20 @@ public static byte[] toArray(ByteBuffer buffer, int offset, int size) { } return dest; } + + /** + * Check if the given two byte arrays have the same prefix. + * + * @param bytes1 The first byte array + * @param bytes2 The second byte array + * @return true if the given two byte arrays have the same prefix, false otherwise + */ + public static boolean isPrefixEquals(byte[] bytes1, byte[] bytes2) { + for (int i = 0; i < bytes1.length; i++) { + if (bytes1[i] != bytes2[i]) { + return false; + } + } + return true; + } } diff --git a/fluss-common/src/test/java/com/alibaba/fluss/testutils/DataTestUtils.java b/fluss-common/src/test/java/com/alibaba/fluss/testutils/DataTestUtils.java index 8e8c67e6..eab22b9b 100644 --- a/fluss-common/src/test/java/com/alibaba/fluss/testutils/DataTestUtils.java +++ b/fluss-common/src/test/java/com/alibaba/fluss/testutils/DataTestUtils.java @@ -179,23 +179,33 @@ public static MemoryLogRecords genLogRecordsWithBaseOffsetAndTimestamp( public static KvRecordBatch genKvRecordBatch(List> keyAndValues) throws Exception { - return genKvRecordBatchWithWriterId(keyAndValues, NO_WRITER_ID, NO_BATCH_SEQUENCE); + return genKvRecordBatch(DATA1_KEY_TYPE, DATA1_ROW_TYPE, keyAndValues); + } + + public static KvRecordBatch genKvRecordBatch( + RowType keyType, RowType valueType, List> keyAndValues) + throws Exception { + return genKvRecordBatchWithWriterId( + keyAndValues, keyType, valueType, NO_WRITER_ID, NO_BATCH_SEQUENCE); } public static KvRecordBatch genKvRecordBatchWithWriterId( - List> keyAndValues, long writerId, int batchSequence) + List> keyAndValues, + RowType keyType, + RowType valueType, + long writerId, + int batchSequence) throws Exception { - KeyEncoder keyEncoder = new KeyEncoder(DATA1_ROW_TYPE, new int[] {0}); + KeyEncoder keyEncoder = new KeyEncoder(keyType); KvRecordTestUtils.KvRecordBatchFactory kvRecordBatchFactory = KvRecordTestUtils.KvRecordBatchFactory.of(DEFAULT_SCHEMA_ID); KvRecordTestUtils.KvRecordFactory kvRecordFactory = - KvRecordTestUtils.KvRecordFactory.of(DATA1_ROW_TYPE); + KvRecordTestUtils.KvRecordFactory.of(valueType); List records = new ArrayList<>(); for (Tuple2 keyAndValue : keyAndValues) { records.add( kvRecordFactory.ofRecord( - keyEncoder.encode(row(DATA1_KEY_TYPE, keyAndValue.f0)), - keyAndValue.f1)); + keyEncoder.encode(row(keyType, keyAndValue.f0)), keyAndValue.f1)); } return kvRecordBatchFactory.ofRecords(records, writerId, batchSequence); } diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/FlinkConnectorOptions.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/FlinkConnectorOptions.java index 3bc76cd2..1b964dc3 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/FlinkConnectorOptions.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/FlinkConnectorOptions.java @@ -69,6 +69,18 @@ public class FlinkConnectorOptions { .defaultValue(true) .withDescription("Whether to set async lookup. Default is true."); + public static final ConfigOption INDEX_LOOKUP_KEY = + ConfigOptions.key("table.index.key") + .stringType() + .noDefaultValue() + .withDescription( + "The index key is used to build non-key secondary indexes on a pk table, " + + "enabling fast data queries. You can define multiple index keys, separated by ';'." + + " Each index key can specify a index name, like indexName=indexKeyFields. " + + " indexKeyFields support multiple fields, which can be nullable filed," + + " and fields within a key are separated by ','." + + " For example: 'index1=num_orders;index2=num_orders,total_amount'"); + // -------------------------------------------------------------------------------------------- // Scan specific options // -------------------------------------------------------------------------------------------- diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalog.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalog.java index ddbc8a5f..671b8b86 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalog.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalog.java @@ -76,13 +76,13 @@ public class FlinkCatalog implements Catalog { public static final String LAKE_TABLE_SPLITTER = "$lake"; - private final ClassLoader classLoader; + protected final ClassLoader classLoader; - private final String catalogName; - private final @Nullable String defaultDatabase; - private final String bootstrapServers; - private Connection connection; - private Admin admin; + protected final String catalogName; + protected final @Nullable String defaultDatabase; + protected final String bootstrapServers; + protected Connection connection; + protected Admin admin; private volatile @Nullable LakeCatalog lakeCatalog; @@ -272,7 +272,7 @@ public CatalogBaseTable getTable(ObjectPath objectPath) } } - private CatalogBaseTable getLakeTable(String databaseName, String tableName) + protected CatalogBaseTable getLakeTable(String databaseName, String tableName) throws TableNotExistException, CatalogException { mayInitLakeCatalogCatalog(); String[] tableComponents = tableName.split("\\" + LAKE_TABLE_SPLITTER); @@ -514,7 +514,7 @@ public void alterPartitionColumnStatistics( throw new UnsupportedOperationException(); } - private TablePath toTablePath(ObjectPath objectPath) { + protected TablePath toTablePath(ObjectPath objectPath) { return TablePath.of(objectPath.getDatabaseName(), objectPath.getObjectName()); } diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkTableFactory.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkTableFactory.java index dbe033bc..a02761da 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkTableFactory.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkTableFactory.java @@ -48,10 +48,13 @@ import java.time.ZoneId; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Set; import static com.alibaba.fluss.connector.flink.catalog.FlinkCatalog.LAKE_TABLE_SPLITTER; +import static com.alibaba.fluss.connector.flink.utils.FlinkConnectorOptionsUtils.getIndexKeys; import static org.apache.flink.configuration.ConfigOptions.key; /** Factory to create table source and table sink for Fluss. */ @@ -78,7 +81,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { == RuntimeExecutionMode.STREAMING; final ReadableConfig tableOptions = helper.getOptions(); - FlinkConnectorOptionsUtils.validateTableSourceOptions(tableOptions); + RowType tableOutputType = (RowType) context.getPhysicalRowDataType().getLogicalType(); + FlinkConnectorOptionsUtils.validateTableSourceOptions(tableOptions, tableOutputType); ZoneId timeZone = FlinkConnectorOptionsUtils.getLocalTimeZone( @@ -90,8 +94,6 @@ public DynamicTableSource createDynamicTableSource(Context context) { ResolvedCatalogTable resolvedCatalogTable = context.getCatalogTable(); int[] primaryKeyIndexes = resolvedSchema.getPrimaryKeyIndexes(); - RowType tableOutputType = (RowType) context.getPhysicalRowDataType().getLogicalType(); - // options for lookup LookupCache cache = null; @@ -106,6 +108,15 @@ public DynamicTableSource createDynamicTableSource(Context context) { throw new UnsupportedOperationException("Full lookup caching is not supported yet."); } + // get and validate index lookup key. + Map indexKeys = new HashMap<>(); + if (tableOptions.getOptional(FlinkConnectorOptions.INDEX_LOOKUP_KEY).isPresent()) { + indexKeys = + getIndexKeys( + tableOptions.get(FlinkConnectorOptions.INDEX_LOOKUP_KEY), + tableOutputType); + } + return new FlinkTableSource( toFlussTablePath(context.getObjectIdentifier()), toFlussClientConfig(helper.getOptions(), context.getConfiguration()), @@ -123,7 +134,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { tableOptions.get( key(ConfigOptions.TABLE_DATALAKE_ENABLED.key()) .booleanType() - .defaultValue(false))); + .defaultValue(false)), + indexKeys); } @Override @@ -166,6 +178,7 @@ public Set> optionalOptions() { FlinkConnectorOptions.SCAN_STARTUP_TIMESTAMP, FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL, FlinkConnectorOptions.LOOKUP_ASYNC, + FlinkConnectorOptions.INDEX_LOOKUP_KEY, LookupOptions.MAX_RETRIES, LookupOptions.CACHE_TYPE, LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS, diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/FlinkTableSource.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/FlinkTableSource.java index 49ecb87f..bc6956c5 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/FlinkTableSource.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/FlinkTableSource.java @@ -101,6 +101,7 @@ public class FlinkTableSource private final long scanPartitionDiscoveryIntervalMs; private final boolean isDataLakeEnabled; + private final Map indexKeys; // output type after projection pushdown private LogicalType producedDataType; @@ -130,7 +131,8 @@ public FlinkTableSource( boolean lookupAsync, @Nullable LookupCache cache, long scanPartitionDiscoveryIntervalMs, - boolean isDataLakeEnabled) { + boolean isDataLakeEnabled, + Map indexKeys) { this.tablePath = tablePath; this.flussConfig = flussConfig; this.tableOutputType = tableOutputType; @@ -146,6 +148,7 @@ public FlinkTableSource( this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs; this.isDataLakeEnabled = isDataLakeEnabled; + this.indexKeys = indexKeys; } @Override @@ -279,14 +282,20 @@ public boolean isBounded() { public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) { LookupNormalizer lookupNormalizer = LookupNormalizer.validateAndCreateLookupNormalizer( - context.getKeys(), primaryKeyIndexes, tableOutputType, projectedFields); + context.getKeys(), + primaryKeyIndexes, + indexKeys, + tableOutputType, + projectedFields); if (lookupAsync) { AsyncLookupFunction asyncLookupFunction = new FlinkAsyncLookupFunction( flussConfig, tablePath, tableOutputType, - primaryKeyIndexes, + lookupNormalizer.getIndexName() == null + ? primaryKeyIndexes + : indexKeys.get(lookupNormalizer.getIndexName()), lookupMaxRetryTimes, lookupNormalizer, projectedFields); @@ -301,7 +310,9 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) { flussConfig, tablePath, tableOutputType, - primaryKeyIndexes, + lookupNormalizer.getIndexName() == null + ? primaryKeyIndexes + : indexKeys.get(lookupNormalizer.getIndexName()), lookupMaxRetryTimes, lookupNormalizer, projectedFields); @@ -328,7 +339,8 @@ public DynamicTableSource copy() { lookupAsync, cache, scanPartitionDiscoveryIntervalMs, - isDataLakeEnabled); + isDataLakeEnabled, + indexKeys); source.producedDataType = producedDataType; source.projectedFields = projectedFields; source.singleRowFilter = singleRowFilter; diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkAsyncLookupFunction.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkAsyncLookupFunction.java index 87f38d07..cc466c83 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkAsyncLookupFunction.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkAsyncLookupFunction.java @@ -18,7 +18,7 @@ import com.alibaba.fluss.client.Connection; import com.alibaba.fluss.client.ConnectionFactory; -import com.alibaba.fluss.client.table.LookupResult; +import com.alibaba.fluss.client.lookup.LookupResult; import com.alibaba.fluss.client.table.Table; import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.connector.flink.source.lookup.LookupNormalizer.RemainingFilter; @@ -40,8 +40,9 @@ import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; +import java.util.List; import java.util.concurrent.CompletableFuture; /** A flink async lookup function for fluss. */ @@ -55,7 +56,7 @@ public class FlinkAsyncLookupFunction extends AsyncLookupFunction { private final TablePath tablePath; private final int maxRetryTimes; private final RowType flinkRowType; - private final int[] pkIndexes; + private final int[] lookupKeyIndexes; private final LookupNormalizer lookupNormalizer; @Nullable private final int[] projection; @@ -68,7 +69,7 @@ public FlinkAsyncLookupFunction( Configuration flussConfig, TablePath tablePath, RowType flinkRowType, - int[] pkIndexes, + int[] lookupKeyIndexes, int maxRetryTimes, LookupNormalizer lookupNormalizer, @Nullable int[] projection) { @@ -76,17 +77,17 @@ public FlinkAsyncLookupFunction( this.tablePath = tablePath; this.maxRetryTimes = maxRetryTimes; this.flinkRowType = flinkRowType; - this.pkIndexes = pkIndexes; + this.lookupKeyIndexes = lookupKeyIndexes; this.lookupNormalizer = lookupNormalizer; this.projection = projection; } - private RowType toPkRowType(RowType rowType, int[] pkIndex) { - LogicalType[] types = new LogicalType[pkIndex.length]; - String[] names = new String[pkIndex.length]; - for (int i = 0; i < pkIndex.length; i++) { - types[i] = rowType.getTypeAt(pkIndex[i]); - names[i] = rowType.getFieldNames().get(pkIndex[i]); + private RowType toLookupKeyRowType(RowType rowType, int[] lookupKeyIndex) { + LogicalType[] types = new LogicalType[lookupKeyIndex.length]; + String[] names = new String[lookupKeyIndex.length]; + for (int i = 0; i < lookupKeyIndex.length; i++) { + types[i] = rowType.getTypeAt(lookupKeyIndex[i]); + names[i] = rowType.getFieldNames().get(lookupKeyIndex[i]); } return RowType.of(rowType.isNullable(), types, names); } @@ -99,7 +100,8 @@ public void open(FunctionContext context) { // TODO: convert to Fluss GenericRow to avoid unnecessary deserialization flinkRowToFlussRowConverter = FlinkRowToFlussRowConverter.create( - toPkRowType(flinkRowType, pkIndexes), table.getDescriptor().getKvFormat()); + toLookupKeyRowType(flinkRowType, lookupKeyIndexes), + table.getDescriptor().getKvFormat()); flussRowToFlinkRowConverter = new FlussRowToFlinkRowConverter(FlinkConversions.toFlussRowType(flinkRowType)); LOG.info("end open."); @@ -117,7 +119,7 @@ public CompletableFuture> asyncLookup(RowData keyRow) { InternalRow flussKeyRow = flinkRowToFlussRowConverter.toInternalRow(normalizedKeyRow); CompletableFuture> future = new CompletableFuture<>(); // fetch result - fetchResult(future, 0, flussKeyRow, remainingFilter); + fetchResult(future, 0, flussKeyRow, remainingFilter, lookupNormalizer.getIndexName()); return future; } @@ -128,13 +130,21 @@ public CompletableFuture> asyncLookup(RowData keyRow) { * @param currentRetry Current number of retries. * @param keyRow the key row to get. * @param remainingFilter the nullable remaining filter to filter the result. + * @param indexName the index name to get. */ private void fetchResult( CompletableFuture> resultFuture, int currentRetry, InternalRow keyRow, - @Nullable RemainingFilter remainingFilter) { - CompletableFuture responseFuture = table.lookup(keyRow); + @Nullable RemainingFilter remainingFilter, + @Nullable String indexName) { + CompletableFuture responseFuture; + if (indexName == null) { + responseFuture = table.lookup(keyRow); + } else { + responseFuture = table.indexLookup(indexName, keyRow); + } + responseFuture.whenComplete( (result, throwable) -> { if (throwable != null) { @@ -163,24 +173,24 @@ private void fetchResult( resultFuture.completeExceptionally(e1); } fetchResult( - resultFuture, currentRetry + 1, keyRow, remainingFilter); + resultFuture, + currentRetry + 1, + keyRow, + remainingFilter, + indexName); } } } else { - InternalRow row = result.getRow(); - if (row == null) { - resultFuture.complete(Collections.emptyList()); - } else { - // TODO: we can project fluss row first, - // to avoid deserialize unnecessary fields - RowData flinkRow = flussRowToFlinkRowConverter.toFlinkRowData(row); - if (remainingFilter != null && !remainingFilter.isMatch(flinkRow)) { - resultFuture.complete(Collections.emptyList()); - } else { - resultFuture.complete( - Collections.singletonList(maybeProject(flinkRow))); + List projectedRow = new ArrayList<>(); + for (InternalRow row : result.getRowList()) { + if (row != null) { + RowData flinkRow = flussRowToFlinkRowConverter.toFlinkRowData(row); + if (remainingFilter == null || remainingFilter.isMatch(flinkRow)) { + projectedRow.add(maybeProject(flinkRow)); + } } } + resultFuture.complete(projectedRow); } }); } diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkLookupFunction.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkLookupFunction.java index 2e28e856..b52984cc 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkLookupFunction.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkLookupFunction.java @@ -37,8 +37,10 @@ import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.List; /** A flink lookup function for fluss. */ public class FlinkLookupFunction extends LookupFunction { @@ -50,7 +52,7 @@ public class FlinkLookupFunction extends LookupFunction { private final TablePath tablePath; private final int maxRetryTimes; private final RowType flinkRowType; - private final int[] pkIndexes; + private final int[] lookupKeyIndexes; private final LookupNormalizer lookupNormalizer; @Nullable private final int[] projection; @@ -64,7 +66,7 @@ public FlinkLookupFunction( Configuration flussConfig, TablePath tablePath, RowType flinkRowType, - int[] pkIndexes, + int[] lookupKeyIndexes, int maxRetryTimes, LookupNormalizer lookupNormalizer, @Nullable int[] projection) { @@ -72,17 +74,17 @@ public FlinkLookupFunction( this.tablePath = tablePath; this.maxRetryTimes = maxRetryTimes; this.flinkRowType = flinkRowType; - this.pkIndexes = pkIndexes; + this.lookupKeyIndexes = lookupKeyIndexes; this.lookupNormalizer = lookupNormalizer; this.projection = projection; } - private RowType toPkRowType(RowType rowType, int[] pkIndex) { - LogicalType[] types = new LogicalType[pkIndex.length]; - String[] names = new String[pkIndex.length]; - for (int i = 0; i < pkIndex.length; i++) { - types[i] = rowType.getTypeAt(pkIndex[i]); - names[i] = rowType.getFieldNames().get(pkIndex[i]); + private RowType toLookupKeyRowType(RowType rowType, int[] lookupKeyIndex) { + LogicalType[] types = new LogicalType[lookupKeyIndex.length]; + String[] names = new String[lookupKeyIndex.length]; + for (int i = 0; i < lookupKeyIndex.length; i++) { + types[i] = rowType.getTypeAt(lookupKeyIndex[i]); + names[i] = rowType.getFieldNames().get(lookupKeyIndex[i]); } return RowType.of(rowType.isNullable(), types, names); } @@ -98,7 +100,8 @@ public void open(FunctionContext context) { // TODO: convert to Fluss GenericRow to avoid unnecessary deserialization flinkRowToFlussRowConverter = FlinkRowToFlussRowConverter.create( - toPkRowType(flinkRowType, pkIndexes), table.getDescriptor().getKvFormat()); + toLookupKeyRowType(flinkRowType, lookupKeyIndexes), + table.getDescriptor().getKvFormat()); flussRowToFlinkRowConverter = new FlussRowToFlinkRowConverter(FlinkConversions.toFlussRowType(flinkRowType)); LOG.info("end open."); @@ -115,6 +118,7 @@ public Collection lookup(RowData keyRow) { RowData normalizedKeyRow = lookupNormalizer.normalizeLookupKey(keyRow); LookupNormalizer.RemainingFilter remainingFilter = lookupNormalizer.createRemainingFilter(keyRow); + String indexName = lookupNormalizer.getIndexName(); // to lookup a key, we will need to do two data conversion, // first is converting from flink row to fluss row, // second is extracting key from the fluss row when calling method table.get(flussKeyRow) @@ -122,16 +126,26 @@ public Collection lookup(RowData keyRow) { InternalRow flussKeyRow = flinkRowToFlussRowConverter.toInternalRow(normalizedKeyRow); for (int retry = 0; retry <= maxRetryTimes; retry++) { try { - InternalRow row = table.lookup(flussKeyRow).get().getRow(); - if (row != null) { - // TODO: we can project fluss row first, to avoid deserialize unnecessary fields - RowData flinkRow = flussRowToFlinkRowConverter.toFlinkRowData(row); - if (remainingFilter == null || remainingFilter.isMatch(flinkRow)) { - return Collections.singletonList(maybeProject(flinkRow)); - } else { - return Collections.emptyList(); + List projectedRows = new ArrayList<>(); + List lookupRows; + if (indexName == null) { + lookupRows = table.lookup(flussKeyRow).get().getRowList(); + } else { + lookupRows = table.indexLookup(indexName, flussKeyRow).get().getRowList(); + } + + for (InternalRow row : lookupRows) { + if (row != null) { + // TODO: we can project fluss row first, to avoid deserialize unnecessary + // fields + RowData flinkRow = flussRowToFlinkRowConverter.toFlinkRowData(row); + if (remainingFilter == null || remainingFilter.isMatch(flinkRow)) { + projectedRows.add(maybeProject(flinkRow)); + } } } + + return projectedRows; } catch (Exception e) { LOG.error(String.format("Fluss lookup error, retry times = %d", retry), e); if (retry >= maxRetryTimes) { diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/LookupNormalizer.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/LookupNormalizer.java index 21b1c120..75ed9c1c 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/LookupNormalizer.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/LookupNormalizer.java @@ -30,6 +30,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; @@ -50,7 +51,11 @@ public class LookupNormalizer implements Serializable { private static final long serialVersionUID = 1L; - public static final LookupNormalizer NOOP_NORMALIZER = new LookupNormalizer(null, null, null); + public static final LookupNormalizer NOOP_NORMALIZER = + new LookupNormalizer(null, null, null, null); + + /** The index name. */ + @Nullable private final String indexName; /** Mapping from normalized key index to the lookup key index (in the lookup row). */ @Nullable private final FieldGetter[] normalizedKeyGetters; @@ -62,9 +67,11 @@ public class LookupNormalizer implements Serializable { @Nullable private final FieldGetter[] resultFieldGetters; private LookupNormalizer( + @Nullable String indexName, @Nullable FieldGetter[] normalizedKeyGetters, @Nullable FieldGetter[] conditionFieldGetters, @Nullable FieldGetter[] resultFieldGetters) { + this.indexName = indexName; this.normalizedKeyGetters = normalizedKeyGetters; this.conditionFieldGetters = conditionFieldGetters; this.resultFieldGetters = resultFieldGetters; @@ -76,6 +83,10 @@ private LookupNormalizer( } } + public @Nullable String getIndexName() { + return indexName; + } + public RowData normalizeLookupKey(RowData lookupKey) { if (normalizedKeyGetters == null) { return lookupKey; @@ -143,22 +154,16 @@ public boolean fieldMatches(RowData result) { public static LookupNormalizer validateAndCreateLookupNormalizer( int[][] lookupKeyIndexes, int[] primaryKeys, + Map indexKeys, RowType schema, @Nullable int[] projectedFields) { - if (primaryKeys.length == 0) { + if (primaryKeys.length == 0 && indexKeys == null) { throw new UnsupportedOperationException( - "Fluss lookup function only support lookup table with primary key."); + "Fluss lookup function only support lookup table with primary key or index key."); } - // we compare string names rather than int index for better error message and readability, - // the length of lookup key and primary key shouldn't be large, so the overhead is low. - String[] columnNames = schema.getFieldNames().toArray(new String[0]); - String[] primaryKeyNames = - Arrays.stream(primaryKeys).mapToObj(i -> columnNames[i]).toArray(String[]::new); - // get the lookup keys int[] lookupKeys = new int[lookupKeyIndexes.length]; - String[] lookupKeyNames = new String[lookupKeyIndexes.length]; - for (int i = 0; i < lookupKeyNames.length; i++) { + for (int i = 0; i < lookupKeys.length; i++) { int[] innerKeyArr = lookupKeyIndexes[i]; Preconditions.checkArgument( innerKeyArr.length == 1, "Do not support nested lookup keys"); @@ -169,25 +174,48 @@ public static LookupNormalizer validateAndCreateLookupNormalizer( } else { lookupKeys[i] = innerKeyArr[0]; } - lookupKeyNames[i] = columnNames[innerKeyArr[0]]; } - if (Arrays.equals(lookupKeys, primaryKeys)) { - return NOOP_NORMALIZER; + String indexName = getIndexName(lookupKeys, primaryKeys, indexKeys); + if (indexName == null) { + return createLookupNormalizer(lookupKeys, null, primaryKeys, schema); + } else { + return createLookupNormalizer(lookupKeys, indexName, indexKeys.get(indexName), schema); + } + } + + /** create a {@link LookupNormalizer}. */ + private static LookupNormalizer createLookupNormalizer( + int[] lookupKeys, @Nullable String indexName, int[] keys, RowType schema) { + // we compare string names rather than int index for better error message and readability, + // the length of lookup key and keys (primary key or index key) shouldn't be large, so the + // overhead is low. + String[] columnNames = schema.getFieldNames().toArray(new String[0]); + String[] keyNames = + Arrays.stream(keys).mapToObj(i -> columnNames[i]).toArray(String[]::new); + + // get the lookup keys + String[] lookupKeyNames = new String[lookupKeys.length]; + for (int i = 0; i < lookupKeyNames.length; i++) { + lookupKeyNames[i] = columnNames[lookupKeys[i]]; + } + + if (Arrays.equals(lookupKeys, keys)) { + return new LookupNormalizer(indexName, null, null, null); } - FieldGetter[] normalizedKeyGetters = new FieldGetter[primaryKeys.length]; - for (int i = 0; i < primaryKeyNames.length; i++) { - LogicalType fieldType = schema.getTypeAt(primaryKeys[i]); - int lookupKeyIndex = findIndex(lookupKeyNames, primaryKeyNames[i]); + FieldGetter[] normalizedKeyGetters = new FieldGetter[keys.length]; + for (int i = 0; i < keyNames.length; i++) { + LogicalType fieldType = schema.getTypeAt(keys[i]); + int lookupKeyIndex = findIndex(lookupKeyNames, keyNames[i]); normalizedKeyGetters[i] = RowData.createFieldGetter(fieldType, lookupKeyIndex); } - Set primaryKeySet = Arrays.stream(primaryKeys).boxed().collect(Collectors.toSet()); + Set keySet = Arrays.stream(keys).boxed().collect(Collectors.toSet()); List conditionFieldGetters = new ArrayList<>(); List resultFieldGetters = new ArrayList<>(); for (int i = 0; i < lookupKeys.length; i++) { - if (!primaryKeySet.contains(i)) { + if (!keySet.contains(i)) { LogicalType fieldType = schema.getTypeAt(lookupKeys[i]); conditionFieldGetters.add(RowData.createFieldGetter(fieldType, i)); resultFieldGetters.add(RowData.createFieldGetter(fieldType, lookupKeys[i])); @@ -195,6 +223,7 @@ public static LookupNormalizer validateAndCreateLookupNormalizer( } return new LookupNormalizer( + indexName, normalizedKeyGetters, conditionFieldGetters.toArray(new FieldGetter[0]), resultFieldGetters.toArray(new FieldGetter[0])); @@ -213,4 +242,51 @@ private static int findIndex(String[] columnNames, String key) { + "' in lookup keys " + Arrays.toString(columnNames)); } + + private static @Nullable String getIndexName( + int[] lookupKeys, int[] primaryKeys, Map indexKeys) { + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append("lookupKeys: " + Arrays.toString(lookupKeys) + "\n"); + stringBuilder.append("primaryKeys: " + Arrays.toString(primaryKeys) + "\n"); + for (Map.Entry entry : indexKeys.entrySet()) { + int[] copyIndexKey = entry.getValue(); + stringBuilder.append( + "indexKey and value: " + + entry.getKey() + + ":" + + Arrays.toString(copyIndexKey) + + "\n"); + } + + List lookupKeyList = + Arrays.stream(lookupKeys).boxed().collect(Collectors.toList()); + List primaryKeyList = + Arrays.stream(primaryKeys).boxed().collect(Collectors.toList()); + if (lookupKeyList.size() >= primaryKeyList.size()) { + boolean isSubset = true; + for (Integer primaryKey : primaryKeyList) { + if (!lookupKeyList.contains(primaryKey)) { + isSubset = false; + break; + } + } + if (isSubset) { + return null; + } + } + + for (Map.Entry entry : indexKeys.entrySet()) { + int[] indexKey = entry.getValue(); + int[] copyIndexKey = indexKey.clone(); + Arrays.sort(copyIndexKey); + Arrays.sort(lookupKeys); + if (Arrays.equals(lookupKeys, copyIndexKey)) { + return entry.getKey(); + } + } + + throw new UnsupportedOperationException( + "There is no index key or primary key that matches the lookup keys. info: " + + stringBuilder); + } } diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/FlinkConnectorOptionsUtils.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/FlinkConnectorOptionsUtils.java index 4a17ac9a..a8ec9877 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/FlinkConnectorOptionsUtils.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/FlinkConnectorOptionsUtils.java @@ -22,11 +22,19 @@ import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.api.config.TableConfigOptions; +import org.apache.flink.table.types.logical.RowType; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import static com.alibaba.fluss.connector.flink.FlinkConnectorOptions.INDEX_LOOKUP_KEY; import static com.alibaba.fluss.connector.flink.FlinkConnectorOptions.SCAN_STARTUP_MODE; import static com.alibaba.fluss.connector.flink.FlinkConnectorOptions.SCAN_STARTUP_TIMESTAMP; import static com.alibaba.fluss.connector.flink.FlinkConnectorOptions.ScanStartupMode.TIMESTAMP; @@ -43,8 +51,10 @@ public static ZoneId getLocalTimeZone(String timeZone) { : ZoneId.of(timeZone); } - public static void validateTableSourceOptions(ReadableConfig tableOptions) { + public static void validateTableSourceOptions( + ReadableConfig tableOptions, RowType tableOutputType) { validateScanStartupMode(tableOptions); + validateIndexKeys(tableOptions, tableOutputType); } public static StartupOptions getStartupOptions(ReadableConfig tableOptions, ZoneId timeZone) { @@ -73,6 +83,81 @@ private static void validateScanStartupMode(ReadableConfig tableOptions) { } } + private static void validateIndexKeys(ReadableConfig tableOptions, RowType tableOutputType) { + if (!tableOptions.getOptional(INDEX_LOOKUP_KEY).isPresent()) { + return; + } + + List columnNames = tableOutputType.getFieldNames(); + + String indexKeysString = tableOptions.get(INDEX_LOOKUP_KEY); + String indexKeysError = detectInvalidIndexKeys(indexKeysString, columnNames); + if (indexKeysError != null) { + throw new ValidationException( + "Option 'index.key' = '" + indexKeysString + "' is invalid: " + indexKeysError); + } + } + + private static String detectInvalidIndexKeys(String indexKeysString, List columnNames) { + String[] indexKeyStringList = indexKeysString.split(";"); + if (indexKeyStringList.length <= 0) { + return "Index key is empty or index key doesn't split by ';'."; + } + + List orderedIndexKey = new ArrayList<>(); + for (String indexKeyFieldsStr : indexKeyStringList) { + String[] indexNameAndFields = indexKeyFieldsStr.split("="); + if (indexNameAndFields.length != 2) { + return "There is an index key not follow the format 'indexKeyName=indexKeyFields'."; + } + + String[] fieldStrings = indexNameAndFields[1].split(","); + if (fieldStrings.length <= 0) { + return "There is an index key is empty or column field in index key doesn't split by ','."; + } + + List indexKeyPosList = new ArrayList<>(); + for (String field : fieldStrings) { + int fieldIndex = columnNames.indexOf(field); + if (fieldIndex < 0) { + return "Index key '" + field + "' does not exist in the schema."; + } + indexKeyPosList.add(fieldIndex); + } + + Collections.sort(indexKeyPosList); + if (orderedIndexKey.stream() + .anyMatch( + f -> + Arrays.equals( + f, + indexKeyPosList.stream() + .mapToInt(Integer::intValue) + .toArray()))) { + return "There is an index key is duplicate."; + } + orderedIndexKey.add(indexKeyPosList.stream().mapToInt(Integer::intValue).toArray()); + } + + return null; + } + + public static Map getIndexKeys(String indexKeyString, RowType rowType) { + List columnNames = rowType.getFieldNames(); + Map indexKeyIndexes = new HashMap<>(); + for (String indexKeyStr : indexKeyString.split(";")) { + String[] indexNameAndFields = indexKeyStr.split("="); + String indexKeyName = indexNameAndFields[0]; + List indexKeyList = new ArrayList<>(); + for (String field : indexNameAndFields[1].split(",")) { + indexKeyList.add(columnNames.indexOf(field)); + } + indexKeyIndexes.put( + indexKeyName, indexKeyList.stream().mapToInt(Integer::intValue).toArray()); + } + return indexKeyIndexes; + } + /** * Parses timestamp String to Long. * diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/FlinkConversions.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/FlinkConversions.java index 0f038fd8..a739a71f 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/FlinkConversions.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/FlinkConversions.java @@ -50,6 +50,7 @@ import static com.alibaba.fluss.connector.flink.FlinkConnectorOptions.BUCKET_KEY; import static com.alibaba.fluss.connector.flink.FlinkConnectorOptions.BUCKET_NUMBER; +import static com.alibaba.fluss.connector.flink.FlinkConnectorOptions.INDEX_LOOKUP_KEY; import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR; /** Utils for conversion between Flink and Fluss. */ @@ -213,6 +214,11 @@ public static TableDescriptor toFlussTable(ResolvedCatalogTable catalogTable) { // then set distributed by information List bucketKey; if (flinkTableConf.containsKey(BUCKET_KEY.key())) { + if (flinkTableConf.containsKey(INDEX_LOOKUP_KEY.key())) { + throw new CatalogException( + "Bucket key and table index key cannot be set at the same time."); + } + bucketKey = Arrays.stream(flinkTableConf.get(BUCKET_KEY).split(",")) .map(String::trim) diff --git a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/TabletServerGateway.java b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/TabletServerGateway.java index 543dfab1..f77cee8c 100644 --- a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/TabletServerGateway.java +++ b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/TabletServerGateway.java @@ -19,6 +19,8 @@ import com.alibaba.fluss.rpc.RpcGateway; import com.alibaba.fluss.rpc.messages.FetchLogRequest; import com.alibaba.fluss.rpc.messages.FetchLogResponse; +import com.alibaba.fluss.rpc.messages.IndexLookupRequest; +import com.alibaba.fluss.rpc.messages.IndexLookupResponse; import com.alibaba.fluss.rpc.messages.InitWriterRequest; import com.alibaba.fluss.rpc.messages.InitWriterResponse; import com.alibaba.fluss.rpc.messages.LimitScanRequest; @@ -99,6 +101,14 @@ CompletableFuture notifyLeaderAndIsr( @RPC(api = ApiKeys.LOOKUP) CompletableFuture lookup(LookupRequest request); + /** + * Index lookup to get value by index key. + * + * @return Index lookup response. + */ + @RPC(api = ApiKeys.INDEX_LOOKUP) + CompletableFuture indexLookup(IndexLookupRequest request); + /** * Get limit number of values from the specified table bucket. * diff --git a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/ApiKeys.java b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/ApiKeys.java index 949917b2..a5c6160d 100644 --- a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/ApiKeys.java +++ b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/ApiKeys.java @@ -61,7 +61,8 @@ public enum ApiKeys { NOTIFY_LAKE_TABLE_OFFSET(1031, 0, 0, PRIVATE), DESCRIBE_LAKE_STORAGE(1032, 0, 0, PUBLIC), GET_LAKE_TABLE_SNAPSHOT(1033, 0, 0, PUBLIC), - LIMIT_SCAN(1034, 0, 0, PUBLIC); + LIMIT_SCAN(1034, 0, 0, PUBLIC), + INDEX_LOOKUP(1035, 0, 0, PUBLIC); private static final Map ID_TO_TYPE = Arrays.stream(ApiKeys.values()) diff --git a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/Errors.java b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/Errors.java index 11ad3195..1601276c 100644 --- a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/Errors.java +++ b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/Errors.java @@ -28,6 +28,7 @@ import com.alibaba.fluss.exception.InvalidConfigException; import com.alibaba.fluss.exception.InvalidCoordinatorException; import com.alibaba.fluss.exception.InvalidDatabaseException; +import com.alibaba.fluss.exception.InvalidIndexKeysException; import com.alibaba.fluss.exception.InvalidReplicationFactorException; import com.alibaba.fluss.exception.InvalidRequiredAcksException; import com.alibaba.fluss.exception.InvalidTableException; @@ -174,8 +175,8 @@ public enum Errors { INVALID_TIMESTAMP_EXCEPTION(38, "The timestamp is invalid.", InvalidTimestampException::new), INVALID_CONFIG_EXCEPTION(39, "The config is invalid.", InvalidConfigException::new), LAKE_STORAGE_NOT_CONFIGURED_EXCEPTION( - 40, "The lake storage is not configured.", LakeStorageNotConfiguredException::new); - ; + 40, "The lake storage is not configured.", LakeStorageNotConfiguredException::new), + INVALID_INDEX_KEYS_EXCEPTION(41, "The index keys is invalid", InvalidIndexKeysException::new); private static final Logger LOG = LoggerFactory.getLogger(Errors.class); diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto index 4c68064a..96fc1e9d 100644 --- a/fluss-rpc/src/main/proto/FlussApi.proto +++ b/fluss-rpc/src/main/proto/FlussApi.proto @@ -202,6 +202,16 @@ message LookupResponse { repeated PbLookupRespForBucket buckets_resp = 1; } +// Index Lookup request and response +message IndexLookupRequest { + required int64 table_id = 1; + repeated PbIndexLookupReqForBucket buckets_req = 2; +} + +message IndexLookupResponse { + repeated PbIndexLookupRespForBucket buckets_resp = 1; +} + // limit scan request and response message LimitScanRequest { @@ -550,6 +560,27 @@ message PbValue { optional bytes values = 1; } +message PbIndexLookupReqForBucket { + optional int64 partition_id = 1; + required int32 bucket_id = 2; + repeated bytes keys = 3; +} + +message PbIndexData { + required string index_name = 1; + required bytes index_key = 2; +} + +message PbIndexLookupRespForBucket { + optional int64 partition_id = 1; + required int32 bucket_id = 2; + repeated PbIndexLookupRespForKey keys_resp = 3; +} + +message PbIndexLookupRespForKey { + repeated bytes values = 1; +} + message PbTableBucket { required int64 table_id = 1; optional int64 partition_id = 2; diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java index 0271b5c2..321c5c8f 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java @@ -20,6 +20,7 @@ import com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.exception.InvalidDatabaseException; +import com.alibaba.fluss.exception.InvalidIndexKeysException; import com.alibaba.fluss.exception.InvalidTableException; import com.alibaba.fluss.fs.FileSystem; import com.alibaba.fluss.metadata.Schema; @@ -133,6 +134,11 @@ public CompletableFuture createTable(CreateTableRequest req } TableDescriptor tableDescriptor = TableDescriptor.fromJsonBytes(request.getTableJson()); + try { + tableDescriptor.validate(); + } catch (InvalidIndexKeysException e) { + return FutureUtils.failedFuture(e); + } int bucketCount = defaultBucketNumber; // not set distribution diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java index 7dc740ce..3615f071 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java @@ -470,6 +470,15 @@ public List multiGet(List keys) throws IOException { }); } + public List indexLookup(byte[] indexKey) throws IOException { + return inReadLock( + kvLock, + () -> { + rocksDBKv.checkIfRocksDBClosed(); + return rocksDBKv.indexLookup(indexKey); + }); + } + public List limitScan(int limit) throws IOException { return inReadLock( kvLock, diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/rocksdb/RocksDBKv.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/rocksdb/RocksDBKv.java index aa71749e..61be998b 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/rocksdb/RocksDBKv.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/rocksdb/RocksDBKv.java @@ -35,6 +35,8 @@ import java.util.ArrayList; import java.util.List; +import static com.alibaba.fluss.utils.BytesUtils.isPrefixEquals; + /** A wrapper for the operation of {@link org.rocksdb.RocksDB}. */ public class RocksDBKv implements AutoCloseable { @@ -56,7 +58,7 @@ public class RocksDBKv implements AutoCloseable { * {@link RocksDB#open(String)} is different from that by {@link * RocksDB#getDefaultColumnFamily()}, probably it's a bug of RocksDB java API. */ - private final ColumnFamilyHandle defaultColumnFamily; + private final ColumnFamilyHandle defaultColumnFamilyHandle; /** Our RocksDB database. Currently, one kv tablet, one RocksDB instance. */ protected final RocksDB db; @@ -73,7 +75,7 @@ public RocksDBKv( this.db = db; this.rocksDBResourceGuard = rocksDBResourceGuard; this.writeOptions = optionsContainer.getWriteOptions(); - this.defaultColumnFamily = defaultColumnFamilyHandle; + this.defaultColumnFamilyHandle = defaultColumnFamilyHandle; } public ResourceGuard getResourceGuard() { @@ -100,10 +102,28 @@ public List multiGet(List keys) throws IOException { } } + public List indexLookup(byte[] indexKey) throws IOException { + List pkList = new ArrayList<>(); + ReadOptions readOptions = new ReadOptions(); + RocksIterator iterator = db.newIterator(defaultColumnFamilyHandle, readOptions); + try { + iterator.seek(indexKey); + while (iterator.isValid() && isPrefixEquals(indexKey, iterator.key())) { + pkList.add(iterator.value()); + iterator.next(); + } + } finally { + readOptions.close(); + iterator.close(); + } + + return pkList; + } + public List limitScan(Integer limit) { List pkList = new ArrayList<>(); ReadOptions readOptions = new ReadOptions(); - RocksIterator iterator = db.newIterator(defaultColumnFamily, readOptions); + RocksIterator iterator = db.newIterator(defaultColumnFamilyHandle, readOptions); int count = 0; iterator.seekToFirst(); @@ -165,8 +185,8 @@ public void close() throws Exception { // Start with default CF ... List columnFamilyOptions = new ArrayList<>(); RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater( - columnFamilyOptions, defaultColumnFamily); - IOUtils.closeQuietly(defaultColumnFamily); + columnFamilyOptions, defaultColumnFamilyHandle); + IOUtils.closeQuietly(defaultColumnFamilyHandle); // ... and finally close the DB instance ... IOUtils.closeQuietly(db); diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/metrics/group/PhysicalTableMetricGroup.java b/fluss-server/src/main/java/com/alibaba/fluss/server/metrics/group/PhysicalTableMetricGroup.java index 18b2ee6c..5deedab1 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/metrics/group/PhysicalTableMetricGroup.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/metrics/group/PhysicalTableMetricGroup.java @@ -201,6 +201,22 @@ public Counter failedLimitScanRequests() { } } + public Counter totalIndexLookupRequests() { + if (kvMetrics == null) { + return NoOpCounter.INSTANCE; + } else { + return kvMetrics.totalIndexLookupRequests; + } + } + + public Counter failedIndexLookupRequests() { + if (kvMetrics == null) { + return NoOpCounter.INSTANCE; + } else { + return kvMetrics.failedIndexLookupRequests; + } + } + // ------------------------------------------------------------------------ // bucket groups // ------------------------------------------------------------------------ @@ -326,6 +342,8 @@ private static class KvMetricGroup extends TabletMetricGroup { private final Counter failedPutKvRequests; private final Counter totalLimitScanRequests; private final Counter failedLimitScanRequests; + private final Counter totalIndexLookupRequests; + private final Counter failedIndexLookupRequests; public KvMetricGroup(PhysicalTableMetricGroup physicalTableMetricGroup) { super(physicalTableMetricGroup, TabletType.KV); @@ -349,6 +367,16 @@ public KvMetricGroup(PhysicalTableMetricGroup physicalTableMetricGroup) { meter( MetricNames.FAILED_LIMIT_SCAN_REQUESTS_RATE, new MeterView(failedLimitScanRequests)); + + // for index lookup request + totalIndexLookupRequests = new ThreadSafeSimpleCounter(); + meter( + MetricNames.TOTAL_INDEX_LOOKUP_REQUESTS_RATE, + new MeterView(totalIndexLookupRequests)); + failedIndexLookupRequests = new ThreadSafeSimpleCounter(); + meter( + MetricNames.FAILED_INDEX_LOOKUP_REQUESTS_RATE, + new MeterView(failedIndexLookupRequests)); } @Override diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java index 337b3f50..430a8420 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java @@ -167,6 +167,7 @@ public final class Replica { private final long logTTLMs; private final boolean dataLakeEnabled; private final int tieredLogLocalSegments; + private final Map indexKeyIndexes; private final AtomicReference leaderReplicaIdOpt = new AtomicReference<>(); private final ReadWriteLock leaderIsrUpdateLock = new ReentrantReadWriteLock(); @@ -228,6 +229,7 @@ public Replica( this.dataLakeEnabled = tableDescriptor.isDataLakeEnabled(); this.tieredLogLocalSegments = tableDescriptor.getTieredLogLocalSegments(); this.partitionKeys = tableDescriptor.getPartitionKeys(); + this.indexKeyIndexes = tableDescriptor.getIndexKeyIndexes(); this.snapshotContext = snapshotContext; // create a closeable registry for the replica this.closeableRegistry = new CloseableRegistry(); @@ -307,6 +309,10 @@ public long getLogTTLMs() { return logTTLMs; } + public boolean supportIndexLookup() { + return indexKeyIndexes != null && !indexKeyIndexes.isEmpty(); + } + public int writerIdCount() { return logTablet.getWriterIdCount(); } @@ -1028,6 +1034,38 @@ public List lookups(List keys) { }); } + public List indexLookup(byte[] indexKey) { + if (!isKvTable()) { + throw new NonPrimaryKeyTableException( + "Try to do index lookup on a non primary key table: " + getTablePath()); + } + + return inReadLock( + leaderIsrUpdateLock, + () -> { + try { + if (!isLeader()) { + throw new NotLeaderOrFollowerException( + String.format( + "Leader not local for bucket %s on tabletServer %d", + tableBucket, localTabletServerId)); + } + checkNotNull( + kvTablet, "KvTablet for the replica to get key shouldn't be null."); + // the index key is serialized by index name + index key fields, see + // IndexKeyEncoder. + return kvTablet.indexLookup(indexKey); + } catch (IOException e) { + String errorMsg = + String.format( + "Failed to do index lookup from local kv for table bucket %s, the cause is: %s", + tableBucket, e.getMessage()); + LOG.error(errorMsg, e); + throw new KvStorageException(errorMsg, e); + } + }); + } + public DefaultValueRecordBatch limitKvScan(int limit) { if (!isKvTable()) { throw new NonPrimaryKeyTableException( diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/ReplicaManager.java index 22edc5cc..0236c373 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/ReplicaManager.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/ReplicaManager.java @@ -24,6 +24,7 @@ import com.alibaba.fluss.exception.FlussRuntimeException; import com.alibaba.fluss.exception.InvalidCoordinatorException; import com.alibaba.fluss.exception.InvalidRequiredAcksException; +import com.alibaba.fluss.exception.KvStorageException; import com.alibaba.fluss.exception.LogOffsetOutOfRangeException; import com.alibaba.fluss.exception.LogStorageException; import com.alibaba.fluss.exception.NotLeaderOrFollowerException; @@ -50,9 +51,12 @@ import com.alibaba.fluss.rpc.entity.PutKvResultForBucket; import com.alibaba.fluss.rpc.entity.WriteResultForBucket; import com.alibaba.fluss.rpc.gateway.CoordinatorGateway; +import com.alibaba.fluss.rpc.messages.IndexLookupResponse; import com.alibaba.fluss.rpc.messages.NotifyKvSnapshotOffsetResponse; import com.alibaba.fluss.rpc.messages.NotifyLakeTableOffsetResponse; import com.alibaba.fluss.rpc.messages.NotifyRemoteLogOffsetsResponse; +import com.alibaba.fluss.rpc.messages.PbIndexLookupRespForBucket; +import com.alibaba.fluss.rpc.messages.PbIndexLookupRespForKey; import com.alibaba.fluss.rpc.protocol.ApiError; import com.alibaba.fluss.rpc.protocol.Errors; import com.alibaba.fluss.server.coordinator.CoordinatorContext; @@ -456,6 +460,50 @@ public void multiLookupValues( responseCallback.accept(lookupResultForBucketMap); } + /** Lookup by index keys on kv store. */ + public void indexLookup( + Map> entriesPerBucket, + Consumer responseCallback) { + IndexLookupResponse response = new IndexLookupResponse(); + PhysicalTableMetricGroup tableMetrics = null; + List resultForAll = new ArrayList<>(); + for (Map.Entry> entry : entriesPerBucket.entrySet()) { + TableBucket tb = entry.getKey(); + PbIndexLookupRespForBucket respForBucket = new PbIndexLookupRespForBucket(); + respForBucket.setBucketId(tb.getBucket()); + try { + Replica replica = getReplicaOrException(tb); + if (!replica.supportIndexLookup()) { + throw new KvStorageException( + "Table bucket " + tb + " does not support index lookup"); + } + + tableMetrics = replica.tableMetrics(); + tableMetrics.totalIndexLookupRequests().inc(); + List keyResultList = new ArrayList<>(); + for (byte[] indexKey : entry.getValue()) { + PbIndexLookupRespForKey pbIndexLookupRespForKey = new PbIndexLookupRespForKey(); + for (byte[] result : replica.indexLookup(indexKey)) { + pbIndexLookupRespForKey.addValue(result); + } + keyResultList.add(pbIndexLookupRespForKey); + } + respForBucket.addAllKeysResps(keyResultList); + } catch (Exception e) { + LOG.error("Error processing index lookup operation on replica {}", tb, e); + if (tableMetrics != null) { + tableMetrics.failedIndexLookupRequests().inc(); + } + throw e; + } + + resultForAll.add(respForBucket); + } + + response.addAllBucketsResps(resultForAll); + responseCallback.accept(response); + } + public void listOffsets( ListOffsetsParam listOffsetsParam, Set tableBuckets, diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletService.java b/fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletService.java index 085c80b7..f246d59e 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletService.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletService.java @@ -25,6 +25,8 @@ import com.alibaba.fluss.rpc.gateway.TabletServerGateway; import com.alibaba.fluss.rpc.messages.FetchLogRequest; import com.alibaba.fluss.rpc.messages.FetchLogResponse; +import com.alibaba.fluss.rpc.messages.IndexLookupRequest; +import com.alibaba.fluss.rpc.messages.IndexLookupResponse; import com.alibaba.fluss.rpc.messages.InitWriterRequest; import com.alibaba.fluss.rpc.messages.InitWriterResponse; import com.alibaba.fluss.rpc.messages.LimitScanRequest; @@ -62,6 +64,7 @@ import java.util.concurrent.CompletableFuture; import static com.alibaba.fluss.server.utils.RpcMessageUtils.makeLookupResponse; +import static com.alibaba.fluss.server.utils.RpcMessageUtils.toIndexLookupData; import static com.alibaba.fluss.server.utils.RpcMessageUtils.toLookupData; /** An RPC Gateway service for tablet server. */ @@ -142,6 +145,13 @@ public CompletableFuture lookup(LookupRequest request) { return response; } + @Override + public CompletableFuture indexLookup(IndexLookupRequest request) { + CompletableFuture response = new CompletableFuture<>(); + replicaManager.indexLookup(toIndexLookupData(request), response::complete); + return response; + } + @Override public CompletableFuture limitScan(LimitScanRequest request) { CompletableFuture response = new CompletableFuture<>(); diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/utils/RpcMessageUtils.java b/fluss-server/src/main/java/com/alibaba/fluss/server/utils/RpcMessageUtils.java index 69d626a5..5a31b64d 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/utils/RpcMessageUtils.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/utils/RpcMessageUtils.java @@ -52,6 +52,7 @@ import com.alibaba.fluss.rpc.messages.GetKvSnapshotResponse; import com.alibaba.fluss.rpc.messages.GetLakeTableSnapshotResponse; import com.alibaba.fluss.rpc.messages.GetPartitionSnapshotResponse; +import com.alibaba.fluss.rpc.messages.IndexLookupRequest; import com.alibaba.fluss.rpc.messages.InitWriterResponse; import com.alibaba.fluss.rpc.messages.LimitScanResponse; import com.alibaba.fluss.rpc.messages.ListOffsetsRequest; @@ -72,6 +73,7 @@ import com.alibaba.fluss.rpc.messages.PbFetchLogReqForTable; import com.alibaba.fluss.rpc.messages.PbFetchLogRespForBucket; import com.alibaba.fluss.rpc.messages.PbFetchLogRespForTable; +import com.alibaba.fluss.rpc.messages.PbIndexLookupReqForBucket; import com.alibaba.fluss.rpc.messages.PbKeyValue; import com.alibaba.fluss.rpc.messages.PbLakeSnapshotForBucket; import com.alibaba.fluss.rpc.messages.PbLakeStorageInfo; @@ -602,6 +604,28 @@ public static Map> toLookupData(LookupRequest lookupRe return lookupEntryData; } + public static Map> toIndexLookupData( + IndexLookupRequest indexLookupRequest) { + long tableId = indexLookupRequest.getTableId(); + Map> lookupEntryData = new HashMap<>(); + for (PbIndexLookupReqForBucket lookupReqForBucket : + indexLookupRequest.getBucketsReqsList()) { + TableBucket tb = + new TableBucket( + tableId, + lookupReqForBucket.hasPartitionId() + ? lookupReqForBucket.getPartitionId() + : null, + lookupReqForBucket.getBucketId()); + List keys = new ArrayList<>(lookupReqForBucket.getKeysCount()); + for (int i = 0; i < lookupReqForBucket.getKeysCount(); i++) { + keys.add(lookupReqForBucket.getKeyAt(i)); + } + lookupEntryData.put(tb, keys); + } + return lookupEntryData; + } + public static @Nullable int[] getTargetColumns(PutKvRequest putKvRequest) { int[] targetColumns = putKvRequest.getTargetColumns(); return targetColumns.length == 0 ? null : targetColumns; diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/log/remote/RemoteLogManagerTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/log/remote/RemoteLogManagerTest.java index 168d9e6e..1a9a08c3 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/log/remote/RemoteLogManagerTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/log/remote/RemoteLogManagerTest.java @@ -16,6 +16,7 @@ package com.alibaba.fluss.server.log.remote; +import com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.fs.FsPath; import com.alibaba.fluss.metadata.TableBucket; import com.alibaba.fluss.remote.RemoteLogFetchInfo; @@ -39,7 +40,9 @@ import java.util.stream.Collectors; import static com.alibaba.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH; +import static com.alibaba.fluss.record.TestData.DATA1_SCHEMA; import static com.alibaba.fluss.record.TestData.DATA1_TABLE_ID; +import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH; import static com.alibaba.fluss.server.zk.data.LeaderAndIsr.INITIAL_LEADER_EPOCH; import static com.alibaba.fluss.utils.FlussPaths.remoteLogDir; import static com.alibaba.fluss.utils.FlussPaths.remoteLogTabletDir; @@ -412,7 +415,14 @@ void testCleanupLocalSegments(boolean partitionTable) throws Exception { @ValueSource(booleans = {true, false}) void testConfigureTieredLogLocalSegments(boolean partitionedTable) throws Exception { int tieredLogLocalSegments = 8; - long tableId = registerTableInZkClient(tieredLogLocalSegments); + long tableId = + registerTableInZkClient( + DATA1_TABLE_PATH, + DATA1_SCHEMA, + 200L, + Collections.singletonMap( + ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS.key(), + String.valueOf(tieredLogLocalSegments))); TableBucket tb = makeTableBucket(tableId, partitionedTable); // make leader, and then remote log tablet should be created. diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaManagerTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaManagerTest.java index ac0c81a1..4068ee74 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaManagerTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaManagerTest.java @@ -16,6 +16,7 @@ package com.alibaba.fluss.server.replica; +import com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.exception.InvalidRequiredAcksException; import com.alibaba.fluss.metadata.PhysicalTablePath; import com.alibaba.fluss.metadata.TableBucket; @@ -68,9 +69,11 @@ import static com.alibaba.fluss.record.TestData.DATA1; import static com.alibaba.fluss.record.TestData.DATA1_KEY_TYPE; import static com.alibaba.fluss.record.TestData.DATA1_ROW_TYPE; +import static com.alibaba.fluss.record.TestData.DATA1_SCHEMA_PK; import static com.alibaba.fluss.record.TestData.DATA1_TABLE_ID; import static com.alibaba.fluss.record.TestData.DATA1_TABLE_ID_PK; import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH; +import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH_PK; import static com.alibaba.fluss.record.TestData.DATA_1_WITH_KEY_AND_VALUE; import static com.alibaba.fluss.record.TestData.DEFAULT_SCHEMA_ID; import static com.alibaba.fluss.server.coordinator.CoordinatorContext.INITIAL_COORDINATOR_EPOCH; @@ -438,7 +441,10 @@ void testPutKvWithOutOfBatchSequence() throws Exception { replicaManager.putRecordsToKv( 20000, 1, - Collections.singletonMap(tb, genKvRecordBatchWithWriterId(data1, 100L, 0)), + Collections.singletonMap( + tb, + genKvRecordBatchWithWriterId( + data1, DATA1_KEY_TYPE, DATA1_ROW_TYPE, 100L, 0)), null, future::complete); assertThat(future.get()).containsOnly(new PutKvResultForBucket(tb, 5)); @@ -475,7 +481,10 @@ void testPutKvWithOutOfBatchSequence() throws Exception { replicaManager.putRecordsToKv( 20000, 1, - Collections.singletonMap(tb, genKvRecordBatchWithWriterId(data2, 100L, 3)), + Collections.singletonMap( + tb, + genKvRecordBatchWithWriterId( + data2, DATA1_KEY_TYPE, DATA1_ROW_TYPE, 100L, 3)), null, future::complete); PutKvResultForBucket putKvResultForBucket = future.get().get(0); @@ -508,7 +517,10 @@ void testPutKvWithOutOfBatchSequence() throws Exception { replicaManager.putRecordsToKv( 20000, 1, - Collections.singletonMap(tb, genKvRecordBatchWithWriterId(data3, 100L, 1)), + Collections.singletonMap( + tb, + genKvRecordBatchWithWriterId( + data3, DATA1_KEY_TYPE, DATA1_ROW_TYPE, 100L, 1)), null, future::complete); assertThat(future.get()).containsOnly(new PutKvResultForBucket(tb, 8)); @@ -563,7 +575,7 @@ void testLookup() throws Exception { byte[] key3Bytes = keyEncoder.encode(row(DATA1_KEY_TYPE, key3)); verifyLookup(tb, key3Bytes, null); - // Get key from none pk table. + // Lookup from none pk table. TableBucket tb2 = new TableBucket(DATA1_TABLE_ID, 1); makeLogTableAsLeader(tb2.getBucket()); replicaManager.multiLookupValues( @@ -578,6 +590,20 @@ void testLookup() throws Exception { }); } + @Test + void testIndexLookup() throws Exception { + long tableId = + registerTableInZkClient( + DATA1_TABLE_PATH_PK, + DATA1_SCHEMA_PK, + DATA1_TABLE_ID_PK, + Collections.singletonMap(ConfigOptions.TABLE_INDEX_KEY.key(), "idx0=b")); + TableBucket tb = new TableBucket(tableId, 0); + makeKvTableAsLeader(tb.getBucket()); + + // TODO + } + @Test void testLimitScanPrimaryKeyTable() throws Exception { TableBucket tb = new TableBucket(DATA1_TABLE_ID_PK, 1); diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaTestBase.java b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaTestBase.java index bbb78319..b88da6fd 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaTestBase.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaTestBase.java @@ -23,8 +23,10 @@ import com.alibaba.fluss.config.MemorySize; import com.alibaba.fluss.fs.FsPath; import com.alibaba.fluss.metadata.PhysicalTablePath; +import com.alibaba.fluss.metadata.Schema; import com.alibaba.fluss.metadata.TableBucket; import com.alibaba.fluss.metadata.TableDescriptor; +import com.alibaba.fluss.metadata.TablePath; import com.alibaba.fluss.record.MemoryLogRecords; import com.alibaba.fluss.rpc.RpcClient; import com.alibaba.fluss.rpc.metrics.TestingClientMetricGroup; @@ -75,6 +77,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutorService; @@ -229,22 +232,18 @@ private void registerTableInZkClient() throws Exception { zkClient.registerSchema(DATA2_TABLE_PATH, DATA2_SCHEMA); } - protected long registerTableInZkClient(int tieredLogLocalSegment) throws Exception { - long tableId = 200; - TableDescriptor tableDescriptor = - TableDescriptor.builder() - .schema(DATA1_SCHEMA) - .distributedBy(3) - .property( - ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS, - tieredLogLocalSegment) - .build(); + protected long registerTableInZkClient( + TablePath tablePath, Schema schema, long tableId, Map properties) + throws Exception { + TableDescriptor.Builder builder = TableDescriptor.builder().schema(schema).distributedBy(3); + properties.forEach(builder::property); + TableDescriptor tableDescriptor = builder.build(); // if exists, drop it firstly - if (zkClient.tableExist(DATA1_TABLE_PATH)) { - zkClient.deleteTable(DATA1_TABLE_PATH); + if (zkClient.tableExist(tablePath)) { + zkClient.deleteTable(tablePath); } - zkClient.registerTable(DATA1_TABLE_PATH, TableRegistration.of(tableId, tableDescriptor)); - zkClient.registerSchema(DATA1_TABLE_PATH, DATA1_SCHEMA); + zkClient.registerTable(tablePath, TableRegistration.of(tableId, tableDescriptor)); + zkClient.registerSchema(tablePath, schema); return tableId; } diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TabletServiceITCase.java b/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TabletServiceITCase.java index af039c92..34987f70 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TabletServiceITCase.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TabletServiceITCase.java @@ -17,7 +17,9 @@ package com.alibaba.fluss.server.tablet; import com.alibaba.fluss.exception.InvalidRequiredAcksException; +import com.alibaba.fluss.exception.KvStorageException; import com.alibaba.fluss.metadata.LogFormat; +import com.alibaba.fluss.metadata.Schema; import com.alibaba.fluss.metadata.TableBucket; import com.alibaba.fluss.metadata.TableDescriptor; import com.alibaba.fluss.metadata.TablePath; @@ -36,6 +38,10 @@ import com.alibaba.fluss.rpc.protocol.Errors; import com.alibaba.fluss.server.log.ListOffsetsParam; import com.alibaba.fluss.server.testutils.FlussClusterExtension; +import com.alibaba.fluss.types.DataField; +import com.alibaba.fluss.types.DataTypes; +import com.alibaba.fluss.types.RowType; +import com.alibaba.fluss.utils.types.Tuple2; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -43,6 +49,8 @@ import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import static com.alibaba.fluss.record.TestData.ANOTHER_DATA1; @@ -56,12 +64,14 @@ import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH_PK; import static com.alibaba.fluss.record.TestData.DATA_1_WITH_KEY_AND_VALUE; import static com.alibaba.fluss.record.TestData.DEFAULT_SCHEMA_ID; +import static com.alibaba.fluss.server.testutils.KvTestUtils.assertIndexLookupResponse; import static com.alibaba.fluss.server.testutils.KvTestUtils.assertLookupResponse; import static com.alibaba.fluss.server.testutils.RpcMessageTestUtils.assertFetchLogResponse; import static com.alibaba.fluss.server.testutils.RpcMessageTestUtils.assertLimitScanResponse; import static com.alibaba.fluss.server.testutils.RpcMessageTestUtils.assertProduceLogResponse; import static com.alibaba.fluss.server.testutils.RpcMessageTestUtils.createTable; import static com.alibaba.fluss.server.testutils.RpcMessageTestUtils.newFetchLogRequest; +import static com.alibaba.fluss.server.testutils.RpcMessageTestUtils.newIndexLookupRequest; import static com.alibaba.fluss.server.testutils.RpcMessageTestUtils.newLimitScanRequest; import static com.alibaba.fluss.server.testutils.RpcMessageTestUtils.newListOffsetsRequest; import static com.alibaba.fluss.server.testutils.RpcMessageTestUtils.newLookupRequest; @@ -305,7 +315,7 @@ void testPutKv() throws Exception { } @Test - void testGetKey() throws Exception { + void testLookup() throws Exception { long tableId = createTable( FLUSS_CLUSTER_EXTENSION, @@ -319,7 +329,7 @@ void testGetKey() throws Exception { TabletServerGateway leaderGateWay = FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader); - // first get key without in table, key = 1. + // first lookup without in table, key = 1. Object[] key1 = DATA_1_WITH_KEY_AND_VALUE.get(0).f0; KeyEncoder keyEncoder = new KeyEncoder(DATA1_ROW_TYPE, new int[] {0}); byte[] key1Bytes = keyEncoder.encode(row(DATA1_KEY_TYPE, key1)); @@ -334,7 +344,7 @@ void testGetKey() throws Exception { tableId, 0, 1, genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE))) .get()); - // second get key in table, key = 1, value = 1, "a1". + // second lookup in table, key = 1, value = 1, "a1". Object[] value1 = DATA_1_WITH_KEY_AND_VALUE.get(3).f1; byte[] value1Bytes = ValueEncoder.encodeValue(DEFAULT_SCHEMA_ID, compactedRow(DATA1_ROW_TYPE, value1)); @@ -359,7 +369,7 @@ void testGetKey() throws Exception { Errors.UNKNOWN_TABLE_OR_BUCKET_EXCEPTION, "Unknown table or bucket: TableBucket{tableId=10005, bucket=6}"); - // Get key from a non-pk table. + // Lookup from a non-pk table. long logTableId = createTable( FLUSS_CLUSTER_EXTENSION, @@ -383,6 +393,162 @@ void testGetKey() throws Exception { "the primary key table not exists for TableBucket"); } + @Test + void testIndexLookup() throws Exception { + TablePath tablePath = TablePath.of("test_db_1", "test_index_lookup_t1"); + Schema schema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.STRING()) + .column("c", DataTypes.BIGINT()) + .column("d", DataTypes.STRING()) + .primaryKey("a", "b", "c") + .build(); + RowType rowType = schema.toRowType(); + RowType primaryKeyType = + DataTypes.ROW( + new DataField("a", DataTypes.INT()), + new DataField("b", DataTypes.STRING()), + new DataField("c", DataTypes.BIGINT())); + RowType indexKeyType = + DataTypes.ROW( + new DataField("a", DataTypes.INT()), + new DataField("b", DataTypes.STRING())); + + TableDescriptor descriptor = + TableDescriptor.builder() + .schema(schema) + .property("table.index.key", "idx0=a,b") + .build(); + long tableId = createTable(FLUSS_CLUSTER_EXTENSION, tablePath, descriptor); + TableBucket tb = new TableBucket(tableId, 0); + + FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(tb); + + int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb); + TabletServerGateway leaderGateWay = + FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader); + // first index lookup without in table, index key = (1, "a"). + Object[] indexKey1 = new Object[] {1, "a"}; + KeyEncoder keyEncoder = new KeyEncoder(rowType, new int[] {0, 1}); + byte[] indexKey1Bytes = keyEncoder.encode(row(indexKeyType, indexKey1)); + assertIndexLookupResponse( + leaderGateWay + .indexLookup( + newIndexLookupRequest( + tableId, 0, Collections.singletonList(indexKey1Bytes))) + .get(), + Collections.singletonList(Collections.emptyList())); + + // send one batch kv. + assertPutKvResponse( + leaderGateWay + .putKv( + newPutKvRequest( + tableId, + 0, + 1, + genKvRecordBatch( + primaryKeyType, + rowType, + Arrays.asList( + Tuple2.of( + new Object[] {1, "a", 1L}, + new Object[] { + 1, "a", 1L, "value1" + }), + Tuple2.of( + new Object[] {1, "a", 2L}, + new Object[] { + 1, "a", 2L, "value2" + }), + Tuple2.of( + new Object[] {1, "a", 3L}, + new Object[] { + 1, "a", 3L, "value3" + }), + Tuple2.of( + new Object[] {2, "a", 4L}, + new Object[] { + 2, "a", 4L, "value4" + }))))) + .get()); + + // second index lookup in table, index key = (1, "a"). + assertIndexLookupResponse( + leaderGateWay + .indexLookup( + newIndexLookupRequest( + tableId, 0, Collections.singletonList(indexKey1Bytes))) + .get(), + Collections.singletonList( + Arrays.asList( + ValueEncoder.encodeValue( + DEFAULT_SCHEMA_ID, + compactedRow(rowType, new Object[] {1, "a", 1L, "value1"})), + ValueEncoder.encodeValue( + DEFAULT_SCHEMA_ID, + compactedRow(rowType, new Object[] {1, "a", 2L, "value2"})), + ValueEncoder.encodeValue( + DEFAULT_SCHEMA_ID, + compactedRow( + rowType, new Object[] {1, "a", 3L, "value3"}))))); + + // third index lookup in table for multi index key, index key = (1, "a") and (2, "a"). + Object[] indexKey2 = new Object[] {2, "a"}; + byte[] indexKey2Bytes = keyEncoder.encode(row(indexKeyType, indexKey2)); + assertIndexLookupResponse( + leaderGateWay + .indexLookup( + newIndexLookupRequest( + tableId, 0, Arrays.asList(indexKey1Bytes, indexKey2Bytes))) + .get(), + Arrays.asList( + Arrays.asList( + ValueEncoder.encodeValue( + DEFAULT_SCHEMA_ID, + compactedRow(rowType, new Object[] {1, "a", 1L, "value1"})), + ValueEncoder.encodeValue( + DEFAULT_SCHEMA_ID, + compactedRow(rowType, new Object[] {1, "a", 2L, "value2"})), + ValueEncoder.encodeValue( + DEFAULT_SCHEMA_ID, + compactedRow( + rowType, new Object[] {1, "a", 3L, "value3"}))), + Collections.singletonList( + ValueEncoder.encodeValue( + DEFAULT_SCHEMA_ID, + compactedRow( + rowType, new Object[] {2, "a", 4L, "value4"}))))); + + // index lookup an unsupported index table. + long tableId2 = + createTable( + FLUSS_CLUSTER_EXTENSION, + DATA1_TABLE_PATH_PK, + DATA1_TABLE_INFO_PK.getTableDescriptor()); + tb = new TableBucket(tableId2, 0); + FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(tb); + leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb); + TabletServerGateway leaderGateWay2 = + FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader); + assertThatThrownBy( + () -> + leaderGateWay2 + .indexLookup( + newIndexLookupRequest( + tableId2, + 0, + Collections.singletonList(indexKey1Bytes))) + .get()) + .cause() + .isInstanceOf(KvStorageException.class) + .hasMessageContaining( + "Table bucket TableBucket{tableId=" + + tableId2 + + ", bucket=0} does not support index lookup"); + } + @Test void testLimitScanPrimaryKeyTable() throws Exception { long tableId = diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TestTabletServerGateway.java b/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TestTabletServerGateway.java index cd9df1d5..81911810 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TestTabletServerGateway.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TestTabletServerGateway.java @@ -41,6 +41,8 @@ import com.alibaba.fluss.rpc.messages.GetTableResponse; import com.alibaba.fluss.rpc.messages.GetTableSchemaRequest; import com.alibaba.fluss.rpc.messages.GetTableSchemaResponse; +import com.alibaba.fluss.rpc.messages.IndexLookupRequest; +import com.alibaba.fluss.rpc.messages.IndexLookupResponse; import com.alibaba.fluss.rpc.messages.InitWriterRequest; import com.alibaba.fluss.rpc.messages.InitWriterResponse; import com.alibaba.fluss.rpc.messages.LimitScanRequest; @@ -183,6 +185,11 @@ public CompletableFuture lookup(LookupRequest request) { return null; } + @Override + public CompletableFuture indexLookup(IndexLookupRequest request) { + throw new UnsupportedOperationException(); + } + @Override public CompletableFuture limitScan(LimitScanRequest request) { return null; diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/testutils/KvTestUtils.java b/fluss-server/src/test/java/com/alibaba/fluss/server/testutils/KvTestUtils.java index 4d611e95..088da43a 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/testutils/KvTestUtils.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/testutils/KvTestUtils.java @@ -19,7 +19,10 @@ import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.fs.FsPath; import com.alibaba.fluss.metadata.TableBucket; +import com.alibaba.fluss.rpc.messages.IndexLookupResponse; import com.alibaba.fluss.rpc.messages.LookupResponse; +import com.alibaba.fluss.rpc.messages.PbIndexLookupRespForBucket; +import com.alibaba.fluss.rpc.messages.PbIndexLookupRespForKey; import com.alibaba.fluss.rpc.messages.PbLookupRespForBucket; import com.alibaba.fluss.rpc.messages.PbValue; import com.alibaba.fluss.server.kv.rocksdb.RocksDBKv; @@ -172,4 +175,23 @@ public static void assertLookupResponse( byte[] lookupValue = pbValue.hasValues() ? pbValue.getValues() : null; assertThat(lookupValue).isEqualTo(expectedValue); } + + public static void assertIndexLookupResponse( + IndexLookupResponse indexLookupResponse, List> expectedValues) { + checkArgument(indexLookupResponse.getBucketsRespsCount() == 1); + PbIndexLookupRespForBucket pbIndexLookupRespForBucket = + indexLookupResponse.getBucketsRespAt(0); + checkArgument(pbIndexLookupRespForBucket.getKeysRespsCount() == expectedValues.size()); + for (int i = 0; i < expectedValues.size(); i++) { + PbIndexLookupRespForKey pbIndexLookupRespForKey = + pbIndexLookupRespForBucket.getKeysRespAt(i); + List bytesResultForOneIndexKey = expectedValues.get(i); + checkArgument( + pbIndexLookupRespForKey.getValuesCount() == bytesResultForOneIndexKey.size()); + for (int j = 0; j < bytesResultForOneIndexKey.size(); j++) { + assertThat(pbIndexLookupRespForKey.getValueAt(j)) + .isEqualTo(bytesResultForOneIndexKey.get(j)); + } + } + } } diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/testutils/RpcMessageTestUtils.java b/fluss-server/src/test/java/com/alibaba/fluss/server/testutils/RpcMessageTestUtils.java index 435eefe4..bdfb68fe 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/testutils/RpcMessageTestUtils.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/testutils/RpcMessageTestUtils.java @@ -34,6 +34,7 @@ import com.alibaba.fluss.rpc.messages.FetchLogResponse; import com.alibaba.fluss.rpc.messages.GetTableRequest; import com.alibaba.fluss.rpc.messages.GetTableResponse; +import com.alibaba.fluss.rpc.messages.IndexLookupRequest; import com.alibaba.fluss.rpc.messages.LimitScanRequest; import com.alibaba.fluss.rpc.messages.LimitScanResponse; import com.alibaba.fluss.rpc.messages.ListOffsetsRequest; @@ -44,6 +45,7 @@ import com.alibaba.fluss.rpc.messages.PbFetchLogReqForTable; import com.alibaba.fluss.rpc.messages.PbFetchLogRespForBucket; import com.alibaba.fluss.rpc.messages.PbFetchLogRespForTable; +import com.alibaba.fluss.rpc.messages.PbIndexLookupReqForBucket; import com.alibaba.fluss.rpc.messages.PbLookupReqForBucket; import com.alibaba.fluss.rpc.messages.PbProduceLogReqForBucket; import com.alibaba.fluss.rpc.messages.PbProduceLogRespForBucket; @@ -215,6 +217,17 @@ public static LookupRequest newLookupRequest(long tableId, int bucketId, byte[] return lookupRequest; } + public static IndexLookupRequest newIndexLookupRequest( + long tableId, int bucketId, List indexKeys) { + IndexLookupRequest indexLookupRequest = new IndexLookupRequest().setTableId(tableId); + PbIndexLookupReqForBucket pbIndexLookupReqForBucket = indexLookupRequest.addBucketsReq(); + pbIndexLookupReqForBucket.setBucketId(bucketId); + for (byte[] indexKey : indexKeys) { + pbIndexLookupReqForBucket.addKey(indexKey); + } + return indexLookupRequest; + } + public static LimitScanRequest newLimitScanRequest(long tableId, int bucketId, int limit) { return new LimitScanRequest().setTableId(tableId).setBucketId(bucketId).setLimit(limit); }