From 8199e337839301aaa4444815fd47d0396537d8a7 Mon Sep 17 00:00:00 2001 From: zouxxyy Date: Mon, 29 Jul 2024 15:59:10 +0800 Subject: [PATCH 1/3] [core] Make ObjectsCache thread safe --- .../org/apache/paimon/utils/ObjectsCache.java | 14 ++++++++---- .../apache/paimon/utils/ObjectsCacheTest.java | 22 +++++++++++++++++++ 2 files changed, 32 insertions(+), 4 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java index 40482c2f5569..e3abbe9757f3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java @@ -28,6 +28,7 @@ import org.apache.paimon.memory.MemorySegmentSource; import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; import java.io.EOFException; import java.io.IOException; @@ -36,11 +37,11 @@ import java.util.function.BiFunction; /** Cache records to {@link SegmentsCache} by compacted serializer. */ +@ThreadSafe public class ObjectsCache { private final SegmentsCache cache; private final ObjectSerializer serializer; - private final InternalRowSerializer rowSerializer; private final BiFunction> reader; public ObjectsCache( @@ -49,7 +50,6 @@ public ObjectsCache( BiFunction> reader) { this.cache = cache; this.serializer = serializer; - this.rowSerializer = new InternalRowSerializer(serializer.fieldTypes()); this.reader = reader; } @@ -59,7 +59,9 @@ public List read( Filter loadFilter, Filter readFilter) throws IOException { - Segments segments = cache.getSegments(key, k -> readSegments(k, fileSize, loadFilter)); + InternalRowSerializer rowSerializer = new InternalRowSerializer(serializer.fieldTypes()); + Segments segments = + cache.getSegments(key, k -> readSegments(k, fileSize, loadFilter, rowSerializer)); List entries = new ArrayList<>(); RandomAccessInputView view = new RandomAccessInputView( @@ -77,7 +79,11 @@ public List read( } } - private Segments readSegments(K key, @Nullable Long fileSize, Filter loadFilter) { + private Segments readSegments( + K key, + @Nullable Long fileSize, + Filter loadFilter, + InternalRowSerializer rowSerializer) { try (CloseableIterator iterator = reader.apply(key, fileSize)) { ArrayList segments = new ArrayList<>(); MemorySegmentSource segmentSource = diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java index 13271bd324a1..be9d2a48c528 100644 --- a/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java @@ -99,6 +99,28 @@ public void test() throws IOException { r -> r.getString(0).toString().endsWith("5"), Filter.alwaysTrue()); assertThat(values).isEmpty(); + + // test read concurrently + map.clear(); + for (int i = 0; i < 10; i++) { + map.put(String.valueOf(i), Collections.singletonList(String.valueOf(i))); + } + map.keySet().stream() + .parallel() + .forEach( + k -> { + try { + assertThat( + cache.read( + k, + null, + Filter.alwaysTrue(), + Filter.alwaysTrue())) + .containsExactly(k); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); } private static class StringSerializer extends ObjectSerializer { From b6d009423c75594853ffcb0d2ce3219a41f6cc3c Mon Sep 17 00:00:00 2001 From: zouxxyy Date: Mon, 29 Jul 2024 16:21:20 +0800 Subject: [PATCH 2/3] 1 --- .../src/main/java/org/apache/paimon/utils/ObjectsCache.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java index e3abbe9757f3..f081ee58deba 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java @@ -43,6 +43,7 @@ public class ObjectsCache { private final SegmentsCache cache; private final ObjectSerializer serializer; private final BiFunction> reader; + private final ThreadLocal threadLocalRowSerializer; public ObjectsCache( SegmentsCache cache, @@ -50,6 +51,8 @@ public ObjectsCache( BiFunction> reader) { this.cache = cache; this.serializer = serializer; + this.threadLocalRowSerializer = + ThreadLocal.withInitial(() -> new InternalRowSerializer(serializer.fieldTypes())); this.reader = reader; } @@ -59,7 +62,7 @@ public List read( Filter loadFilter, Filter readFilter) throws IOException { - InternalRowSerializer rowSerializer = new InternalRowSerializer(serializer.fieldTypes()); + InternalRowSerializer rowSerializer = threadLocalRowSerializer.get(); Segments segments = cache.getSegments(key, k -> readSegments(k, fileSize, loadFilter, rowSerializer)); List entries = new ArrayList<>(); From d44ec4d4b836d666314f1b6705814997bcd47f2c Mon Sep 17 00:00:00 2001 From: zouxxyy Date: Mon, 29 Jul 2024 16:31:01 +0800 Subject: [PATCH 3/3] 1 --- .../src/main/java/org/apache/paimon/utils/ObjectsCache.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java index f081ee58deba..f80037911381 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java @@ -42,8 +42,8 @@ public class ObjectsCache { private final SegmentsCache cache; private final ObjectSerializer serializer; - private final BiFunction> reader; private final ThreadLocal threadLocalRowSerializer; + private final BiFunction> reader; public ObjectsCache( SegmentsCache cache,