Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core]Support async lookup #4341

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1450,6 +1450,13 @@ public class CoreOptions implements Serializable {
.noDefaultValue()
.withDescription("The serialized refresh handler of materialized table.");

@ExcludeFromDocumentation("Internal use only")
public static final ConfigOption<Integer> LOOKUP_HASH_ASYNC_THREAD_NUMBER =
key("lookup.hash.async-thread-number")
.intType()
.defaultValue(16)
.withDescription("The thread number for lookup async for hash store.");

private final Options options;

public CoreOptions(Map<String, String> options) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,15 @@ public InternalRow copy(InternalRow from) {
}

@Override
public void serialize(InternalRow record, DataOutputView target) throws IOException {
public synchronized void serialize(InternalRow record, DataOutputView target)
throws IOException {
byte[] bytes = serializeToBytes(record);
VarLengthIntUtils.encodeInt(target, bytes.length);
target.write(bytes);
}

@Override
public InternalRow deserialize(DataInputView source) throws IOException {
public synchronized InternalRow deserialize(DataInputView source) throws IOException {
int len = VarLengthIntUtils.decodeInt(source);
byte[] bytes = new byte[len];
source.readFully(bytes);
Expand All @@ -132,7 +133,7 @@ public int hashCode() {
return Objects.hash(rowType);
}

public byte[] serializeToBytes(InternalRow record) {
public synchronized byte[] serializeToBytes(InternalRow record) {
if (rowWriter == null) {
rowWriter = new RowWriter(calculateBitSetInBytes(getters.length));
}
Expand All @@ -149,7 +150,7 @@ public byte[] serializeToBytes(InternalRow record) {
return rowWriter.copyBuffer();
}

public InternalRow deserialize(byte[] bytes) {
public synchronized InternalRow deserialize(byte[] bytes) {
if (rowReader == null) {
rowReader = new RowReader(calculateBitSetInBytes(getters.length));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalCause;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

/** Cache manager to cache bytes to paged {@link MemorySegment}s. */
public class CacheManager {
Expand All @@ -39,7 +40,7 @@ public class CacheManager {

private final Cache<CacheKey, CacheValue> cache;

private int fileReadCount;
private final AtomicInteger fileReadCount;

public CacheManager(MemorySize maxMemorySize) {
this.cache =
Expand All @@ -49,7 +50,7 @@ public CacheManager(MemorySize maxMemorySize) {
.removalListener(this::onRemoval)
.executor(Runnable::run)
.build();
this.fileReadCount = 0;
this.fileReadCount = new AtomicInteger(0);
}

@VisibleForTesting
Expand All @@ -61,7 +62,7 @@ public MemorySegment getPage(CacheKey key, CacheReader reader, CacheCallback cal
CacheValue value = cache.getIfPresent(key);
while (value == null || value.isClosed) {
try {
this.fileReadCount++;
this.fileReadCount.incrementAndGet();
value = new CacheValue(MemorySegment.wrap(reader.read(key)), callback);
} catch (IOException e) {
throw new RuntimeException(e);
Expand All @@ -87,7 +88,7 @@ private void onRemoval(CacheKey key, CacheValue value, RemovalCause cause) {
}

public int fileReadCount() {
return fileReadCount;
return fileReadCount.get();
}

private static class CacheValue {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ public final class ListDelimitedSerializer {
private final DataInputDeserializer dataInputView = new DataInputDeserializer();
private final DataOutputSerializer dataOutputView = new DataOutputSerializer(128);

public <T> List<T> deserializeList(byte[] valueBytes, Serializer<T> elementSerializer) {
public synchronized <T> List<T> deserializeList(
byte[] valueBytes, Serializer<T> elementSerializer) {
if (valueBytes == null) {
return null;
}
Expand All @@ -54,7 +55,7 @@ public <T> List<T> deserializeList(byte[] valueBytes, Serializer<T> elementSeria
return result;
}

public <T> byte[] serializeList(List<T> valueList, Serializer<T> elementSerializer)
public synchronized <T> byte[] serializeList(List<T> valueList, Serializer<T> elementSerializer)
throws IOException {

dataOutputView.clear();
Expand All @@ -74,7 +75,7 @@ public <T> byte[] serializeList(List<T> valueList, Serializer<T> elementSerializ
return dataOutputView.getCopyOfBuffer();
}

public byte[] serializeList(List<byte[]> valueList) throws IOException {
public synchronized byte[] serializeList(List<byte[]> valueList) throws IOException {

dataOutputView.clear();
boolean first = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,13 @@ public List<V> get(K key) throws IOException {
});
}

public byte[] serializeValue(V value) throws IOException {
public synchronized byte[] serializeValue(V value) throws IOException {
valueOutputView.clear();
valueSerializer.serialize(value, valueOutputView);
return valueOutputView.getCopyOfBuffer();
}

public byte[] serializeList(List<byte[]> valueList) throws IOException {
public synchronized byte[] serializeList(List<byte[]> valueList) throws IOException {
return listSerializer.serializeList(valueList);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ public List<V> get(K key) throws IOException {
}
cache.put(keyBytes, valueBytes);
}
return serializeValues(valueBytes);
}

private synchronized List<V> serializeValues(List<byte[]> valueBytes) throws IOException {
List<V> values = new ArrayList<>(valueBytes.size());
for (byte[] value : valueBytes) {
valueInputView.setBuffer(value);
Expand Down Expand Up @@ -93,7 +96,7 @@ public void add(K key, V value) throws IOException {
}
}

private byte[] invalidKeyAndGetKVBytes(K key, V value) throws IOException {
private synchronized byte[] invalidKeyAndGetKVBytes(K key, V value) throws IOException {
checkArgument(value != null);

keyOutView.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public RocksDBState(
.build();
}

public byte[] serializeKey(K key) throws IOException {
public synchronized byte[] serializeKey(K key) throws IOException {
keyOutView.clear();
keySerializer.serialize(key, keyOutView);
return keyOutView.getCopyOfBuffer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,12 @@ public void delete(K key) throws IOException {
}
}

public V deserializeValue(byte[] valueBytes) throws IOException {
public synchronized V deserializeValue(byte[] valueBytes) throws IOException {
valueInputView.setBuffer(valueBytes);
return valueSerializer.deserialize(valueInputView);
}

public byte[] serializeValue(V value) throws IOException {
public synchronized byte[] serializeValue(V value) throws IOException {
valueOutputView.clear();
valueSerializer.serialize(value, valueOutputView);
return valueOutputView.getCopyOfBuffer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

import static org.apache.paimon.utils.VarLengthIntUtils.MAX_VAR_LONG_SIZE;
Expand All @@ -67,6 +68,9 @@ public class LookupLevels<T> implements Levels.DropFileCallback, Closeable {
private final Cache<String, LookupFile> lookupFileCache;
private final Set<String> ownCachedFiles;

private final ReentrantLock[] locks;
private final int lookupAsyncThreadNumber;

public LookupLevels(
Levels levels,
Comparator<InternalRow> keyComparator,
Expand All @@ -76,7 +80,8 @@ public LookupLevels(
Function<String, File> localFileFactory,
LookupStoreFactory lookupStoreFactory,
Function<Long, BloomFilter.Builder> bfGenerator,
Cache<String, LookupFile> lookupFileCache) {
Cache<String, LookupFile> lookupFileCache,
int lookupAsyncThreadNumber) {
this.levels = levels;
this.keyComparator = keyComparator;
this.keySerializer = new RowCompactedSerializer(keyType);
Expand All @@ -87,9 +92,37 @@ public LookupLevels(
this.bfGenerator = bfGenerator;
this.lookupFileCache = lookupFileCache;
this.ownCachedFiles = new HashSet<>();
this.lookupAsyncThreadNumber = lookupAsyncThreadNumber;
this.locks = new ReentrantLock[lookupAsyncThreadNumber];
for (int i = 0; i < lookupAsyncThreadNumber; i++) {
locks[i] = new ReentrantLock();
}
levels.addDropFileCallback(this);
}

/**
 * Creates a {@code LookupLevels} without an explicit async lookup thread number.
 *
 * <p>Delegates to the full constructor, forwarding every argument unchanged and passing
 * {@code 1} as {@code lookupAsyncThreadNumber}. This keeps callers of the previous
 * nine-argument signature source-compatible.
 */
public LookupLevels(
Levels levels,
Comparator<InternalRow> keyComparator,
RowType keyType,
ValueProcessor<T> valueProcessor,
IOFunction<DataFileMeta, RecordReader<KeyValue>> fileReaderFactory,
Function<String, File> localFileFactory,
LookupStoreFactory lookupStoreFactory,
Function<Long, BloomFilter.Builder> bfGenerator,
Cache<String, LookupFile> lookupFileCache) {
this(
levels,
keyComparator,
keyType,
valueProcessor,
fileReaderFactory,
localFileFactory,
lookupStoreFactory,
bfGenerator,
lookupFileCache,
// lookupAsyncThreadNumber: the full constructor sizes its lock array from this value,
// so 1 yields a single lock shared by all file lookups.
1);
}

public Levels getLevels() {
return levels;
}
Expand Down Expand Up @@ -126,29 +159,37 @@ private T lookup(InternalRow key, SortedRun level) throws IOException {

@Nullable
private T lookup(InternalRow key, DataFileMeta file) throws IOException {
LookupFile lookupFile = lookupFileCache.getIfPresent(file.fileName());
ReentrantLock lock = locks[Math.abs(file.hashCode()) % lookupAsyncThreadNumber];
try {
lock.lock();
LookupFile lookupFile = lookupFileCache.getIfPresent(file.fileName());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just create a lock in LookupFile?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just create a lock in LookupFile?

I think that's OK: create one lock in LookupFile, so that lookups on the same DataFileMeta file use the same lock to ensure thread safety. Is that what you mean?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes


boolean newCreatedLookupFile = false;
if (lookupFile == null) {
lookupFile = createLookupFile(file);
newCreatedLookupFile = true;
}
boolean newCreatedLookupFile = false;
if (lookupFile == null) {
lookupFile = createLookupFile(file);
newCreatedLookupFile = true;
}

byte[] valueBytes;
try {
byte[] keyBytes = keySerializer.serializeToBytes(key);
valueBytes = lookupFile.get(keyBytes);
byte[] valueBytes;
try {
byte[] keyBytes = keySerializer.serializeToBytes(key);
valueBytes = lookupFile.get(keyBytes);
} finally {
if (newCreatedLookupFile) {
lookupFileCache.put(file.fileName(), lookupFile);
}
}
if (valueBytes == null) {
return null;
}

return valueProcessor.readFromDisk(
key, lookupFile.remoteFile().level(), valueBytes, file.fileName());
} finally {
if (newCreatedLookupFile) {
lookupFileCache.put(file.fileName(), lookupFile);
if (lock != null) {
lock.unlock();
}
}
if (valueBytes == null) {
return null;
}

return valueProcessor.readFromDisk(
key, lookupFile.remoteFile().level(), valueBytes, file.fileName());
}

private LookupFile createLookupFile(DataFileMeta file) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@

import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
Expand Down Expand Up @@ -85,7 +85,7 @@ public class LocalTableQuery implements TableQuery {

public LocalTableQuery(FileStoreTable table) {
this.options = table.coreOptions();
this.tableView = new HashMap<>();
this.tableView = new ConcurrentHashMap<>();
FileStore<?> tableStore = table.store();
if (!(tableStore instanceof KeyValueFileStore)) {
throw new UnsupportedOperationException(
Expand Down Expand Up @@ -128,7 +128,7 @@ public void refreshFiles(
List<DataFileMeta> beforeFiles,
List<DataFileMeta> dataFiles) {
LookupLevels<KeyValue> lookupLevels =
tableView.computeIfAbsent(partition, k -> new HashMap<>()).get(bucket);
tableView.computeIfAbsent(partition, k -> new ConcurrentHashMap<>()).get(bucket);
if (lookupLevels == null) {
Preconditions.checkArgument(
beforeFiles.isEmpty(),
Expand Down Expand Up @@ -180,16 +180,17 @@ private void newLookupLevels(BinaryRow partition, int bucket, List<DataFileMeta>
.getPathFile(),
lookupStoreFactory,
bfGenerator(options),
lookupFileCache);
lookupFileCache,
options.get(CoreOptions.LOOKUP_HASH_ASYNC_THREAD_NUMBER));

tableView.computeIfAbsent(partition, k -> new HashMap<>()).put(bucket, lookupLevels);
tableView
.computeIfAbsent(partition, k -> new ConcurrentHashMap<>())
.put(bucket, lookupLevels);
}

/** TODO remove synchronized and support multiple threads for lookup. */
@Nullable
@Override
public synchronized InternalRow lookup(BinaryRow partition, int bucket, InternalRow key)
throws IOException {
public InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) throws IOException {
Map<Integer, LookupLevels<KeyValue>> buckets = tableView.get(partition);
if (buckets == null || buckets.isEmpty()) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,7 @@ private Collection<RowData> lookup(RowData keyRow) {
Thread.currentThread()
.setContextClassLoader(AsyncLookupFunctionWrapper.class.getClassLoader());
try {
synchronized (function) {
return function.lookup(keyRow);
}
return function.lookup(keyRow);
} catch (IOException e) {
throw new UncheckedIOException(e);
} finally {
Expand Down
Loading
Loading