diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java index 40482c2f5569..f80037911381 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java @@ -28,6 +28,7 @@ import org.apache.paimon.memory.MemorySegmentSource; import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; import java.io.EOFException; import java.io.IOException; @@ -36,11 +37,12 @@ import java.util.function.BiFunction; /** Cache records to {@link SegmentsCache} by compacted serializer. */ +@ThreadSafe public class ObjectsCache { private final SegmentsCache cache; private final ObjectSerializer serializer; - private final InternalRowSerializer rowSerializer; + private final ThreadLocal threadLocalRowSerializer; private final BiFunction> reader; public ObjectsCache( @@ -49,7 +51,8 @@ public ObjectsCache( BiFunction> reader) { this.cache = cache; this.serializer = serializer; - this.rowSerializer = new InternalRowSerializer(serializer.fieldTypes()); + this.threadLocalRowSerializer = + ThreadLocal.withInitial(() -> new InternalRowSerializer(serializer.fieldTypes())); this.reader = reader; } @@ -59,7 +62,9 @@ public List read( Filter loadFilter, Filter readFilter) throws IOException { - Segments segments = cache.getSegments(key, k -> readSegments(k, fileSize, loadFilter)); + InternalRowSerializer rowSerializer = threadLocalRowSerializer.get(); + Segments segments = + cache.getSegments(key, k -> readSegments(k, fileSize, loadFilter, rowSerializer)); List entries = new ArrayList<>(); RandomAccessInputView view = new RandomAccessInputView( @@ -77,7 +82,11 @@ public List read( } } - private Segments readSegments(K key, @Nullable Long fileSize, Filter loadFilter) { + private Segments readSegments( + K key, + @Nullable Long fileSize, + Filter loadFilter, + InternalRowSerializer rowSerializer) { try (CloseableIterator iterator = reader.apply(key, fileSize)) { ArrayList segments = new ArrayList<>(); MemorySegmentSource segmentSource = diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java index 13271bd324a1..be9d2a48c528 100644 --- a/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java @@ -99,6 +99,28 @@ public void test() throws IOException { r -> r.getString(0).toString().endsWith("5"), Filter.alwaysTrue()); assertThat(values).isEmpty(); + + // test read concurrently + map.clear(); + for (int i = 0; i < 10; i++) { + map.put(String.valueOf(i), Collections.singletonList(String.valueOf(i))); + } + map.keySet().stream() + .parallel() + .forEach( + k -> { + try { + assertThat( + cache.read( + k, + null, + Filter.alwaysTrue(), + Filter.alwaysTrue())) + .containsExactly(k); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); } private static class StringSerializer extends ObjectSerializer {