Skip to content

Commit

Permalink
[kv] Support index lookup for primary key table
Browse files Browse the repository at this point in the history
  • Loading branch information
swuferhong committed Dec 18, 2024
1 parent 9803725 commit 742d598
Show file tree
Hide file tree
Showing 53 changed files with 1,733 additions and 210 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (c) 2024 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.client.lookup;

import com.alibaba.fluss.annotation.Internal;

import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Abstract Class to represent a lookup operation. */
@Internal
public abstract class AbstractLookup {

private final byte[] key;

public AbstractLookup(byte[] key) {
this.key = key;
}

public byte[] key() {
return key;
}

public abstract LookupType lookupType();

public abstract CompletableFuture<List<byte[]>> future();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2024 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.client.lookup;

import com.alibaba.fluss.annotation.Internal;

import java.util.ArrayList;
import java.util.List;

/** An abstract lookup batch. */
@Internal
public abstract class AbstractLookupBatch {

protected final List<AbstractLookup> lookups;

public AbstractLookupBatch() {
this.lookups = new ArrayList<>();
}

public void addLookup(AbstractLookup lookup) {
lookups.add(lookup);
}

public List<AbstractLookup> lookups() {
return lookups;
}

/** Complete the lookup operations using given values . */
public abstract void complete(List<List<byte[]>> values);

/** Complete the get operations with given exception. */
public abstract void completeExceptionally(Exception exception);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright (c) 2024 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.client.lookup;

import com.alibaba.fluss.annotation.Internal;

import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
* Class to represent an index lookup operation, it contains the table id, bucketNums and related
* CompletableFuture.
*/
@Internal
public class IndexLookup extends AbstractLookup {
private final int destination;
private final long tableId;
private final int bucketId;
private final String indexName;
private final CompletableFuture<List<byte[]>> future;

public IndexLookup(
int destination, long tableId, int bucketId, String indexName, byte[] indexKey) {
super(indexKey);
this.destination = destination;
this.tableId = tableId;
this.bucketId = bucketId;
this.indexName = indexName;
this.future = new CompletableFuture<>();
}

public int destination() {
return destination;
}

public long tableId() {
return tableId;
}

public int bucketId() {
return bucketId;
}

public String indexName() {
return indexName;
}

public CompletableFuture<List<byte[]>> future() {
return future;
}

@Override
public LookupType lookupType() {
return LookupType.INDEX_LOOKUP;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright (c) 2024 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.client.lookup;

import com.alibaba.fluss.annotation.Internal;
import com.alibaba.fluss.exception.FlussRuntimeException;
import com.alibaba.fluss.metadata.TableBucket;

import java.util.List;

/**
* A batch that contains the index operations that send to same destination and some table together.
*/
@Internal
public class IndexLookupBatch extends AbstractLookupBatch {

private final TableBucket tableBucket;

public IndexLookupBatch(TableBucket tableBucket) {
super();
this.tableBucket = tableBucket;
}

public TableBucket tableBucket() {
return tableBucket;
}

@Override
public void complete(List<List<byte[]>> values) {
if (values.size() != lookups.size()) {
completeExceptionally(
new FlussRuntimeException(
String.format(
"The number of values return by index lookup request is not equal to the number of "
+ "index lookups send. Got %d values, but expected %d.",
values.size(), lookups.size())));
} else {
for (int i = 0; i < values.size(); i++) {
AbstractLookup lookup = lookups.get(i);
lookup.future().complete(values.get(i));
}
}
}

@Override
public void completeExceptionally(Exception exception) {
for (AbstractLookup get : lookups) {
get.future().completeExceptionally(exception);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,34 +19,35 @@
import com.alibaba.fluss.annotation.Internal;
import com.alibaba.fluss.metadata.TableBucket;

import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
* Class to represent a Lookup operation, it contains the table bucket that the key should lookup
* from, the bytes of the key, and a future for the lookup operation.
*/
@Internal
public class Lookup {
public class Lookup extends AbstractLookup {

private final TableBucket tableBucket;
private final byte[] key;
private final CompletableFuture<byte[]> future;
private final CompletableFuture<List<byte[]>> future;

Lookup(TableBucket tableBucket, byte[] key) {
super(key);
this.tableBucket = tableBucket;
this.key = key;
this.future = new CompletableFuture<>();
}

public TableBucket tableBucket() {
return tableBucket;
}

public byte[] key() {
return key;
@Override
public LookupType lookupType() {
return LookupType.LOOKUP;
}

public CompletableFuture<byte[]> future() {
public CompletableFuture<List<byte[]>> future() {
return future;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.alibaba.fluss.annotation.Internal;
import com.alibaba.fluss.exception.FlussRuntimeException;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.rpc.messages.PbValue;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -51,21 +50,21 @@ public TableBucket tableBucket() {
}

/** Complete the lookup operations using given values . */
public void complete(List<PbValue> pbValues) {
public void complete(List<List<byte[]>> values) {
// if the size of return values of lookup operation are not equal to the number of lookups,
// should complete an exception.
if (pbValues.size() != lookups.size()) {
if (values.size() != lookups.size()) {
completeExceptionally(
new FlussRuntimeException(
String.format(
"The number of return values of lookup operation is not equal to the number of "
+ "lookups. Return %d values, but expected %d.",
pbValues.size(), lookups.size())));
values.size(), lookups.size())));
} else {
for (int i = 0; i < pbValues.size(); i++) {
Lookup lookup = lookups.get(i);
PbValue pbValue = pbValues.get(i);
lookup.future().complete(pbValue.hasValues() ? pbValue.getValues() : null);
for (int i = 0; i < values.size(); i++) {
AbstractLookup lookup = lookups.get(i);
// single value.
lookup.future().complete(values.get(i));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import javax.annotation.concurrent.ThreadSafe;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -57,10 +58,12 @@ public class LookupClient {

private final ExecutorService lookupSenderThreadPool;
private final LookupSender lookupSender;
private final MetadataUpdater metadataUpdater;

public LookupClient(Configuration conf, MetadataUpdater metadataUpdater) {
this.lookupQueue = new LookupQueue(conf);
this.lookupSenderThreadPool = createThreadPool();
this.metadataUpdater = metadataUpdater;
this.lookupSender =
new LookupSender(
metadataUpdater,
Expand All @@ -75,12 +78,23 @@ private ExecutorService createThreadPool() {
return Executors.newFixedThreadPool(1, new ExecutorThreadFactory(LOOKUP_THREAD_PREFIX));
}

public CompletableFuture<byte[]> lookup(TableBucket tableBucket, byte[] keyBytes) {
public CompletableFuture<List<byte[]>> lookup(TableBucket tableBucket, byte[] keyBytes) {
Lookup lookup = new Lookup(tableBucket, keyBytes);
lookupQueue.appendLookup(lookup);
return lookup.future();
}

public CompletableFuture<List<byte[]>> indexLookup(
long tableId, int bucketId, String indexName, byte[] keyBytes) {
// TODO index lookup support partition table.

// TODO leader and buckets may change during the index lookup. need do retry send.
int leader = metadataUpdater.leaderFor(new TableBucket(tableId, bucketId));
IndexLookup indexLookup = new IndexLookup(leader, tableId, bucketId, indexName, keyBytes);
lookupQueue.appendLookup(indexLookup);
return indexLookup.future();
}

public void close(Duration timeout) {
LOG.info("Closing lookup client and lookup sender.");

Expand Down
Loading

0 comments on commit 742d598

Please sign in to comment.