Skip to content

Commit

Permalink
support async lookup
Browse files Browse the repository at this point in the history
  • Loading branch information
neuyilan committed Oct 16, 2024
1 parent 3825f43 commit d7cb609
Show file tree
Hide file tree
Showing 17 changed files with 389 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1450,6 +1450,13 @@ public class CoreOptions implements Serializable {
.noDefaultValue()
.withDescription("The serialized refresh handler of materialized table.");

/**
 * Internal integer option registered under the key {@code lookup.hash.async-thread-number},
 * with a default value of 16. In this commit {@code LocalTableQuery} reads it and passes it to
 * the {@code LookupLevels} constructor, which allocates that many {@code ReentrantLock}s.
 */
@ExcludeFromDocumentation("Internal use only")
public static final ConfigOption<Integer> LOOKUP_HASH_ASYNC_THREAD_NUMBER =
key("lookup.hash.async-thread-number")
.intType()
.defaultValue(16)
.withDescription("The thread number for lookup async for hash store.");

private final Options options;

public CoreOptions(Map<String, String> options) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,15 @@ public InternalRow copy(InternalRow from) {
}

/**
 * Writes {@code record} to {@code target} as a length-prefixed byte array: the length is
 * encoded with {@code VarLengthIntUtils.encodeInt}, followed by the bytes returned by
 * {@code serializeToBytes}. This commit changes the signature to {@code synchronized}.
 *
 * @param record the row to serialize
 * @param target the output view that receives the length prefix and the bytes
 * @throws IOException declared by the signature; raised by the writes to {@code target}
 */
@Override
public void serialize(InternalRow record, DataOutputView target) throws IOException {
public synchronized void serialize(InternalRow record, DataOutputView target)
throws IOException {
byte[] bytes = serializeToBytes(record);
VarLengthIntUtils.encodeInt(target, bytes.length);
target.write(bytes);
}

@Override
public InternalRow deserialize(DataInputView source) throws IOException {
public synchronized InternalRow deserialize(DataInputView source) throws IOException {
int len = VarLengthIntUtils.decodeInt(source);
byte[] bytes = new byte[len];
source.readFully(bytes);
Expand All @@ -132,7 +133,7 @@ public int hashCode() {
return Objects.hash(rowType);
}

public byte[] serializeToBytes(InternalRow record) {
public synchronized byte[] serializeToBytes(InternalRow record) {
if (rowWriter == null) {
rowWriter = new RowWriter(calculateBitSetInBytes(getters.length));
}
Expand All @@ -149,7 +150,7 @@ public byte[] serializeToBytes(InternalRow record) {
return rowWriter.copyBuffer();
}

public InternalRow deserialize(byte[] bytes) {
public synchronized InternalRow deserialize(byte[] bytes) {
if (rowReader == null) {
rowReader = new RowReader(calculateBitSetInBytes(getters.length));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalCause;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

/** Cache manager to cache bytes to paged {@link MemorySegment}s. */
public class CacheManager {
Expand All @@ -39,7 +40,7 @@ public class CacheManager {

private final Cache<CacheKey, CacheValue> cache;

private int fileReadCount;
private final AtomicInteger fileReadCount;

public CacheManager(MemorySize maxMemorySize) {
this.cache =
Expand All @@ -49,7 +50,7 @@ public CacheManager(MemorySize maxMemorySize) {
.removalListener(this::onRemoval)
.executor(Runnable::run)
.build();
this.fileReadCount = 0;
this.fileReadCount = new AtomicInteger(0);
}

@VisibleForTesting
Expand All @@ -61,7 +62,7 @@ public MemorySegment getPage(CacheKey key, CacheReader reader, CacheCallback cal
CacheValue value = cache.getIfPresent(key);
while (value == null || value.isClosed) {
try {
this.fileReadCount++;
this.fileReadCount.incrementAndGet();
value = new CacheValue(MemorySegment.wrap(reader.read(key)), callback);
} catch (IOException e) {
throw new RuntimeException(e);
Expand All @@ -87,7 +88,7 @@ private void onRemoval(CacheKey key, CacheValue value, RemovalCause cause) {
}

/**
 * Returns the current value of the read counter that {@code getPage} increments each time it
 * is about to load a page through its {@code CacheReader}. This commit switches the counter
 * from a plain {@code int} to an {@code AtomicInteger}, so the value is read with {@code get()}.
 *
 * @return the number of reader invocations attempted so far
 */
public int fileReadCount() {
return fileReadCount;
return fileReadCount.get();
}

private static class CacheValue {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ public final class ListDelimitedSerializer {
private final DataInputDeserializer dataInputView = new DataInputDeserializer();
private final DataOutputSerializer dataOutputView = new DataOutputSerializer(128);

public <T> List<T> deserializeList(byte[] valueBytes, Serializer<T> elementSerializer) {
public synchronized <T> List<T> deserializeList(
byte[] valueBytes, Serializer<T> elementSerializer) {
if (valueBytes == null) {
return null;
}
Expand All @@ -54,7 +55,7 @@ public <T> List<T> deserializeList(byte[] valueBytes, Serializer<T> elementSeria
return result;
}

public <T> byte[] serializeList(List<T> valueList, Serializer<T> elementSerializer)
public synchronized <T> byte[] serializeList(List<T> valueList, Serializer<T> elementSerializer)
throws IOException {

dataOutputView.clear();
Expand All @@ -74,7 +75,7 @@ public <T> byte[] serializeList(List<T> valueList, Serializer<T> elementSerializ
return dataOutputView.getCopyOfBuffer();
}

public byte[] serializeList(List<byte[]> valueList) throws IOException {
public synchronized byte[] serializeList(List<byte[]> valueList) throws IOException {

dataOutputView.clear();
boolean first = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,13 @@ public List<V> get(K key) throws IOException {
});
}

/**
 * Serializes {@code value} with {@code valueSerializer}: clears {@code valueOutputView},
 * writes the value into it and returns a copy of the resulting buffer. This commit makes the
 * method {@code synchronized} (presumably because {@code valueOutputView} is reused across
 * calls; confirm against the field declaration).
 *
 * @param value the value to serialize
 * @return a copy of the serialized bytes
 * @throws IOException declared by the signature; raised by the serializer
 */
public byte[] serializeValue(V value) throws IOException {
public synchronized byte[] serializeValue(V value) throws IOException {
valueOutputView.clear();
valueSerializer.serialize(value, valueOutputView);
return valueOutputView.getCopyOfBuffer();
}

/**
 * Delegates to {@code listSerializer.serializeList} to turn a list of already-serialized
 * values into a single byte array. This commit makes the method {@code synchronized}.
 *
 * @param valueList the serialized values to combine
 * @return the bytes produced by {@code listSerializer}
 * @throws IOException declared by the signature; raised by {@code listSerializer}
 */
public byte[] serializeList(List<byte[]> valueList) throws IOException {
public synchronized byte[] serializeList(List<byte[]> valueList) throws IOException {
return listSerializer.serializeList(valueList);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ public List<V> get(K key) throws IOException {
}
cache.put(keyBytes, valueBytes);
}
return serializeValues(valueBytes);
}

private synchronized List<V> serializeValues(List<byte[]> valueBytes) throws IOException {
List<V> values = new ArrayList<>(valueBytes.size());
for (byte[] value : valueBytes) {
valueInputView.setBuffer(value);
Expand Down Expand Up @@ -93,7 +96,7 @@ public void add(K key, V value) throws IOException {
}
}

private byte[] invalidKeyAndGetKVBytes(K key, V value) throws IOException {
private synchronized byte[] invalidKeyAndGetKVBytes(K key, V value) throws IOException {
checkArgument(value != null);

keyOutView.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public RocksDBState(
.build();
}

public byte[] serializeKey(K key) throws IOException {
public synchronized byte[] serializeKey(K key) throws IOException {
keyOutView.clear();
keySerializer.serialize(key, keyOutView);
return keyOutView.getCopyOfBuffer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,12 @@ public void delete(K key) throws IOException {
}
}

/**
 * Deserializes a value: points {@code valueInputView} at {@code valueBytes} and reads one
 * value from it with {@code valueSerializer}. This commit makes the method
 * {@code synchronized} (presumably because {@code valueInputView} is reused across calls;
 * confirm against the field declaration).
 *
 * @param valueBytes the serialized value
 * @return the value read by {@code valueSerializer}
 * @throws IOException declared by the signature; raised by the serializer
 */
public V deserializeValue(byte[] valueBytes) throws IOException {
public synchronized V deserializeValue(byte[] valueBytes) throws IOException {
valueInputView.setBuffer(valueBytes);
return valueSerializer.deserialize(valueInputView);
}

public byte[] serializeValue(V value) throws IOException {
public synchronized byte[] serializeValue(V value) throws IOException {
valueOutputView.clear();
valueSerializer.serialize(value, valueOutputView);
return valueOutputView.getCopyOfBuffer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

import static org.apache.paimon.utils.VarLengthIntUtils.MAX_VAR_LONG_SIZE;
Expand All @@ -67,6 +68,9 @@ public class LookupLevels<T> implements Levels.DropFileCallback, Closeable {
private final Cache<String, LookupFile> lookupFileCache;
private final Set<String> ownCachedFiles;

private final ReentrantLock[] locks;
private final int lookupAsyncThreadNumber;

public LookupLevels(
Levels levels,
Comparator<InternalRow> keyComparator,
Expand All @@ -76,7 +80,8 @@ public LookupLevels(
Function<String, File> localFileFactory,
LookupStoreFactory lookupStoreFactory,
Function<Long, BloomFilter.Builder> bfGenerator,
Cache<String, LookupFile> lookupFileCache) {
Cache<String, LookupFile> lookupFileCache,
int lookupAsyncThreadNumber) {
this.levels = levels;
this.keyComparator = keyComparator;
this.keySerializer = new RowCompactedSerializer(keyType);
Expand All @@ -87,9 +92,37 @@ public LookupLevels(
this.bfGenerator = bfGenerator;
this.lookupFileCache = lookupFileCache;
this.ownCachedFiles = new HashSet<>();
this.lookupAsyncThreadNumber = lookupAsyncThreadNumber;
this.locks = new ReentrantLock[lookupAsyncThreadNumber];
for (int i = 0; i < lookupAsyncThreadNumber; i++) {
locks[i] = new ReentrantLock();
}
levels.addDropFileCallback(this);
}

/**
 * Convenience constructor that forwards every argument unchanged to the full constructor and
 * passes {@code 1} as {@code lookupAsyncThreadNumber}, so a single lock is allocated.
 */
public LookupLevels(
Levels levels,
Comparator<InternalRow> keyComparator,
RowType keyType,
ValueProcessor<T> valueProcessor,
IOFunction<DataFileMeta, RecordReader<KeyValue>> fileReaderFactory,
Function<String, File> localFileFactory,
LookupStoreFactory lookupStoreFactory,
Function<Long, BloomFilter.Builder> bfGenerator,
Cache<String, LookupFile> lookupFileCache) {
this(
levels,
keyComparator,
keyType,
valueProcessor,
fileReaderFactory,
localFileFactory,
lookupStoreFactory,
bfGenerator,
lookupFileCache,
// Default to a single lock when the caller does not specify a thread number.
1);
}

/** Returns the {@code Levels} instance this object was constructed with. */
public Levels getLevels() {
return levels;
}
Expand Down Expand Up @@ -126,29 +159,37 @@ private T lookup(InternalRow key, SortedRun level) throws IOException {

@Nullable
private T lookup(InternalRow key, DataFileMeta file) throws IOException {
LookupFile lookupFile = lookupFileCache.getIfPresent(file.fileName());
ReentrantLock lock = locks[Math.abs(file.hashCode()) % lookupAsyncThreadNumber];
try {
lock.lock();
LookupFile lookupFile = lookupFileCache.getIfPresent(file.fileName());

boolean newCreatedLookupFile = false;
if (lookupFile == null) {
lookupFile = createLookupFile(file);
newCreatedLookupFile = true;
}
boolean newCreatedLookupFile = false;
if (lookupFile == null) {
lookupFile = createLookupFile(file);
newCreatedLookupFile = true;
}

byte[] valueBytes;
try {
byte[] keyBytes = keySerializer.serializeToBytes(key);
valueBytes = lookupFile.get(keyBytes);
byte[] valueBytes;
try {
byte[] keyBytes = keySerializer.serializeToBytes(key);
valueBytes = lookupFile.get(keyBytes);
} finally {
if (newCreatedLookupFile) {
lookupFileCache.put(file.fileName(), lookupFile);
}
}
if (valueBytes == null) {
return null;
}

return valueProcessor.readFromDisk(
key, lookupFile.remoteFile().level(), valueBytes, file.fileName());
} finally {
if (newCreatedLookupFile) {
lookupFileCache.put(file.fileName(), lookupFile);
if (lock != null) {
lock.unlock();
}
}
if (valueBytes == null) {
return null;
}

return valueProcessor.readFromDisk(
key, lookupFile.remoteFile().level(), valueBytes, file.fileName());
}

private LookupFile createLookupFile(DataFileMeta file) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@

import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
Expand Down Expand Up @@ -85,7 +85,7 @@ public class LocalTableQuery implements TableQuery {

public LocalTableQuery(FileStoreTable table) {
this.options = table.coreOptions();
this.tableView = new HashMap<>();
this.tableView = new ConcurrentHashMap<>();
FileStore<?> tableStore = table.store();
if (!(tableStore instanceof KeyValueFileStore)) {
throw new UnsupportedOperationException(
Expand Down Expand Up @@ -128,7 +128,7 @@ public void refreshFiles(
List<DataFileMeta> beforeFiles,
List<DataFileMeta> dataFiles) {
LookupLevels<KeyValue> lookupLevels =
tableView.computeIfAbsent(partition, k -> new HashMap<>()).get(bucket);
tableView.computeIfAbsent(partition, k -> new ConcurrentHashMap<>()).get(bucket);
if (lookupLevels == null) {
Preconditions.checkArgument(
beforeFiles.isEmpty(),
Expand Down Expand Up @@ -180,16 +180,17 @@ private void newLookupLevels(BinaryRow partition, int bucket, List<DataFileMeta>
.getPathFile(),
lookupStoreFactory,
bfGenerator(options),
lookupFileCache);
lookupFileCache,
options.get(CoreOptions.LOOKUP_HASH_ASYNC_THREAD_NUMBER));

tableView.computeIfAbsent(partition, k -> new HashMap<>()).put(bucket, lookupLevels);
tableView
.computeIfAbsent(partition, k -> new ConcurrentHashMap<>())
.put(bucket, lookupLevels);
}

/** TODO remove synchronized and supports multiple thread to lookup. */
@Nullable
@Override
public synchronized InternalRow lookup(BinaryRow partition, int bucket, InternalRow key)
throws IOException {
public InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) throws IOException {
Map<Integer, LookupLevels<KeyValue>> buckets = tableView.get(partition);
if (buckets == null || buckets.isEmpty()) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,7 @@ private Collection<RowData> lookup(RowData keyRow) {
Thread.currentThread()
.setContextClassLoader(AsyncLookupFunctionWrapper.class.getClassLoader());
try {
synchronized (function) {
return function.lookup(keyRow);
}
return function.lookup(keyRow);
} catch (IOException e) {
throw new UncheckedIOException(e);
} finally {
Expand Down
Loading

0 comments on commit d7cb609

Please sign in to comment.