Skip to content

Commit

Permalink
[service] Apply new TableQuery to KvQueryServer
Browse files Browse the repository at this point in the history
This closes apache#2655
  • Loading branch information
JingsongLi committed Jan 8, 2024
1 parent f517045 commit 3678116
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 69 deletions.
32 changes: 0 additions & 32 deletions paimon-core/src/main/java/org/apache/paimon/query/TableQuery.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.utils.Projection;
Expand All @@ -43,6 +44,8 @@ default TableQuery withValueProjection(int[] projection) {

TableQuery withIOManager(IOManager ioManager);

InternalRowSerializer createValueSerializer();

void refreshFiles(
BinaryRow partition,
int bucket,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.paimon.KeyValueFileStore;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.data.serializer.InternalSerializers;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.KeyValueFileReaderFactory;
Expand Down Expand Up @@ -178,6 +180,11 @@ public TableQuery withIOManager(IOManager ioManager) {
return this;
}

@Override
public InternalRowSerializer createValueSerializer() {
return InternalSerializers.create(readerFactoryBuilder.projectedValueType());
}

@Override
public void close() throws IOException {
for (Map.Entry<BinaryRow, Map<Integer, LookupLevels>> buckets : tableView.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
package org.apache.paimon.service.server;

import org.apache.paimon.query.QueryServer;
import org.apache.paimon.query.TableQuery;
import org.apache.paimon.service.messages.KvRequest;
import org.apache.paimon.service.messages.KvResponse;
import org.apache.paimon.service.network.AbstractServerHandler;
import org.apache.paimon.service.network.NetworkServer;
import org.apache.paimon.service.network.messages.MessageSerializer;
import org.apache.paimon.service.network.stats.ServiceRequestStats;
import org.apache.paimon.table.query.TableQuery;
import org.apache.paimon.utils.Preconditions;

import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
package org.apache.paimon.service.server;

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.query.TableQuery;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.service.exceptions.UnknownPartitionBucketException;
import org.apache.paimon.service.messages.KvRequest;
import org.apache.paimon.service.messages.KvResponse;
import org.apache.paimon.service.network.AbstractServerHandler;
import org.apache.paimon.service.network.messages.MessageSerializer;
import org.apache.paimon.service.network.stats.ServiceRequestStats;
import org.apache.paimon.table.query.TableQuery;
import org.apache.paimon.utils.ExceptionUtils;
import org.apache.paimon.utils.Preconditions;

Expand All @@ -49,6 +51,7 @@ public class KvServerHandler extends AbstractServerHandler<KvRequest, KvResponse
private final int serverId;
private final int numServers;
private final TableQuery lookup;
private final InternalRowSerializer valueSerializer;

/**
* Create the handler used by the {@link KvQueryServer}.
Expand All @@ -70,6 +73,7 @@ public KvServerHandler(
this.serverId = serverId;
this.numServers = numServers;
this.lookup = Preconditions.checkNotNull(lookup);
this.valueSerializer = lookup.createValueSerializer();
}

@Override
Expand All @@ -85,8 +89,15 @@ public CompletableFuture<KvResponse> handleRequest(
}

try {
BinaryRow[] values =
lookup.lookup(request.partition(), request.bucket(), request.keys());
BinaryRow[] keys = request.keys();
BinaryRow[] values = new BinaryRow[keys.length];
for (int i = 0; i < values.length; i++) {
InternalRow value =
this.lookup.lookup(request.partition(), request.bucket(), keys[i]);
if (value != null) {
values[i] = valueSerializer.toBinaryRow(value).copy();
}
}
responseFuture.complete(new KvResponse(values));
return responseFuture;
} catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@
import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.query.QueryLocationImpl;
import org.apache.paimon.query.TableQuery;
import org.apache.paimon.service.client.KvQueryClient;
import org.apache.paimon.service.network.stats.DisabledServiceRequestStats;
import org.apache.paimon.service.server.KvQueryServer;
import org.apache.paimon.table.query.TableQuery;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.table.sink.CommitMessageImpl;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -37,9 +38,7 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.paimon.io.DataFileTestUtils.row;
import static org.apache.paimon.service.ServiceManager.PRIMARY_KEY_LOOKUP;
Expand All @@ -49,8 +48,8 @@
/** Test for remote lookup. */
public class KvQueryTableTest extends PrimaryKeyTableTestBase {

private TestTableQuery query0;
private TestTableQuery query1;
private TableQuery query0;
private TableQuery query1;

private KvQueryServer server0;
private KvQueryServer server1;
Expand All @@ -59,8 +58,9 @@ public class KvQueryTableTest extends PrimaryKeyTableTestBase {

@BeforeEach
public void beforeEach() {
this.query0 = new TestTableQuery();
this.query1 = new TestTableQuery();
IOManager ioManager = IOManager.create(tempPath.toString());
this.query0 = table.newTableQuery().withIOManager(ioManager);
this.query1 = table.newTableQuery().withIOManager(ioManager);

this.server0 = createServer(0, query0, 7777);
this.server1 = createServer(1, query1, 7900);
Expand Down Expand Up @@ -190,8 +190,15 @@ private void innerTestServerRestart(Runnable restart) throws Throwable {

private void write(int partition, int key, int value) throws Exception {
int bucket = computeBucket(partition, key, value);
TestTableQuery query = select(row(partition), bucket, 2) == 0 ? query0 : query1;
query.put(row(partition), bucket, row(key), row(partition, key, value));
TableQuery query = select(row(partition), bucket, 2) == 0 ? query0 : query1;
BatchTableWrite write = table.newBatchWriteBuilder().newWrite();
write.write(row(partition, key, value), bucket);
CommitMessageImpl message = (CommitMessageImpl) write.prepareCommit().get(0);
query.refreshFiles(
message.partition(),
message.bucket(),
Collections.emptyList(),
message.newFilesIncrement().newFiles());
}

private int computeBucket(int partition, int key, int value) throws Exception {
Expand All @@ -201,27 +208,4 @@ private int computeBucket(int partition, int key, int value) throws Exception {
return commitMessages.get(0).bucket();
}
}

private static class TestTableQuery implements TableQuery {

private final Map<Pair<BinaryRow, Integer>, Map<BinaryRow, BinaryRow>> values =
new HashMap<>();

public void put(BinaryRow partition, int bucket, BinaryRow key, BinaryRow value) {
values.computeIfAbsent(Pair.of(partition, bucket), k -> new HashMap<>())
.put(key, value);
}

@Override
public BinaryRow[] lookup(BinaryRow partition, int bucket, BinaryRow[] keys) {
Map<BinaryRow, BinaryRow> map = values.get(Pair.of(partition, bucket));
BinaryRow[] values = new BinaryRow[keys.length];
if (map != null) {
for (int i = 0; i < keys.length; i++) {
values[i] = map.get(keys[i]);
}
}
return values;
}
}
}

0 comments on commit 3678116

Please sign in to comment.