Skip to content

Commit

Permalink
[core] Make ObjectsCache thread safe (#3836)
Browse files: browse the repository at this point in the history
  • Loading branch information
Zouxxyy authored Jul 29, 2024
1 parent 8f69480 commit acec234
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 4 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.paimon.memory.MemorySegmentSource;

import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

import java.io.EOFException;
import java.io.IOException;
Expand All @@ -36,11 +37,12 @@
import java.util.function.BiFunction;

/** Cache records to {@link SegmentsCache} by compacted serializer. */
@ThreadSafe
public class ObjectsCache<K, V> {

private final SegmentsCache<K> cache;
private final ObjectSerializer<V> serializer;
private final InternalRowSerializer rowSerializer;
private final ThreadLocal<InternalRowSerializer> threadLocalRowSerializer;
private final BiFunction<K, Long, CloseableIterator<InternalRow>> reader;

public ObjectsCache(
Expand All @@ -49,7 +51,8 @@ public ObjectsCache(
BiFunction<K, Long, CloseableIterator<InternalRow>> reader) {
this.cache = cache;
this.serializer = serializer;
this.rowSerializer = new InternalRowSerializer(serializer.fieldTypes());
this.threadLocalRowSerializer =
ThreadLocal.withInitial(() -> new InternalRowSerializer(serializer.fieldTypes()));
this.reader = reader;
}

Expand All @@ -59,7 +62,9 @@ public List<V> read(
Filter<InternalRow> loadFilter,
Filter<InternalRow> readFilter)
throws IOException {
Segments segments = cache.getSegments(key, k -> readSegments(k, fileSize, loadFilter));
InternalRowSerializer rowSerializer = threadLocalRowSerializer.get();
Segments segments =
cache.getSegments(key, k -> readSegments(k, fileSize, loadFilter, rowSerializer));
List<V> entries = new ArrayList<>();
RandomAccessInputView view =
new RandomAccessInputView(
Expand All @@ -77,7 +82,11 @@ public List<V> read(
}
}

private Segments readSegments(K key, @Nullable Long fileSize, Filter<InternalRow> loadFilter) {
private Segments readSegments(
K key,
@Nullable Long fileSize,
Filter<InternalRow> loadFilter,
InternalRowSerializer rowSerializer) {
try (CloseableIterator<InternalRow> iterator = reader.apply(key, fileSize)) {
ArrayList<MemorySegment> segments = new ArrayList<>();
MemorySegmentSource segmentSource =
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -99,6 +99,28 @@ public void test() throws IOException {
r -> r.getString(0).toString().endsWith("5"),
Filter.alwaysTrue());
assertThat(values).isEmpty();

// test read concurrently
map.clear();
for (int i = 0; i < 10; i++) {
map.put(String.valueOf(i), Collections.singletonList(String.valueOf(i)));
}
map.keySet().stream()
.parallel()
.forEach(
k -> {
try {
assertThat(
cache.read(
k,
null,
Filter.alwaysTrue(),
Filter.alwaysTrue()))
.containsExactly(k);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}

private static class StringSerializer extends ObjectSerializer<String> {
Expand Down

0 comments on commit acec234

Please sign in to comment.