From 245cbad9e11c765ec099014ba8f6b310d55f43f0 Mon Sep 17 00:00:00 2001 From: Jingsong Lee Date: Wed, 29 May 2024 14:06:24 +0800 Subject: [PATCH] [core] Introduce VectoredReadable to SeekableInputStream (#3369) --- LICENSE | 3 + .../paimon/benchmark/TableReadBenchmark.java | 25 +- .../org/apache/paimon/fs/FileIOUtils.java | 7 + .../java/org/apache/paimon/fs/FileRange.java | 83 ++ .../apache/paimon/fs/VectoredReadUtils.java | 247 ++++++ .../apache/paimon/fs/VectoredReadable.java | 79 ++ .../apache/paimon/fs/local/LocalFileIO.java | 14 +- .../apache/paimon/utils/BlockingExecutor.java | 51 ++ .../org/apache/paimon/utils/ThreadUtils.java | 36 + .../paimon/fs/VectoredReadUtilsTest.java | 130 +++ .../apache/orc/impl/RecordReaderUtils.java | 805 ++++++++++++++++++ .../format/fs/FSDataWrappedInputStream.java | 84 ++ .../format/fs/HadoopReadOnlyFileSystem.java | 59 -- 13 files changed, 1562 insertions(+), 61 deletions(-) create mode 100644 paimon-common/src/main/java/org/apache/paimon/fs/FileRange.java create mode 100644 paimon-common/src/main/java/org/apache/paimon/fs/VectoredReadUtils.java create mode 100644 paimon-common/src/main/java/org/apache/paimon/fs/VectoredReadable.java create mode 100644 paimon-common/src/main/java/org/apache/paimon/utils/BlockingExecutor.java create mode 100644 paimon-common/src/test/java/org/apache/paimon/fs/VectoredReadUtilsTest.java create mode 100644 paimon-format/src/main/java/org/apache/orc/impl/RecordReaderUtils.java create mode 100644 paimon-format/src/main/java/org/apache/paimon/format/fs/FSDataWrappedInputStream.java diff --git a/LICENSE b/LICENSE index 813296254542..8f291d3a0a3c 100644 --- a/LICENSE +++ b/LICENSE @@ -209,6 +209,8 @@ Apache Software Foundation License 2.0 -------------------------------------- paimon-common/src/main/java/org/apache/paimon/fs/Path.java +paimon-common/src/main/java/org/apache/paimon/fs/FileRange.java +paimon-common/src/main/java/org/apache/paimon/fs/VectoredReadUtils.java from http://hadoop.apache.org/ version 2.10.2 paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreWriter.java @@ -258,6 +260,7 @@ from https://hive.apache.org/ version 3.1.0 paimon-format/src/main/java/org/apache/orc/impl/PhysicalFsWriter.java paimon-format/src/main/java/org/apache/orc/impl/WriterImpl.java paimon-format/src/main/java/org/apache/orc/impl/ZstdCodec.java +paimon-format/src/main/java/org/apache/orc/impl/RecordReaderUtils.java paimon-format/src/main/java/org/apache/orc/CompressionKind.java paimon-format/src/main/java/org/apache/orc/OrcConf.java paimon-format/src/main/java/org/apache/orc/OrcFile.java diff --git a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableReadBenchmark.java b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableReadBenchmark.java index 4047b0d571d8..5ceaa95f42cc 100644 --- a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableReadBenchmark.java +++ b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableReadBenchmark.java @@ -31,6 +31,8 @@ import org.junit.jupiter.api.Test; +import javax.annotation.Nullable; + import java.util.Collections; import java.util.List; import java.util.Map; @@ -78,6 +80,20 @@ public void testAvroRead() throws Exception { */ } + @Test + public void testOrcReadProjection() throws Exception { + innerTestProjection( + Collections.singletonMap("orc", prepareData(orc(), "orc")), + new int[] {0, 5, 10, 14}); + /* + * OpenJDK 64-Bit Server VM 
1.8.0_292-b10 on Mac OS X 10.16 + * Apple M1 Pro + * read: Best/Avg Time(ms) Row Rate(K/s) Per Row(ns) Relative + * ------------------------------------------------------------------------------------------------ + * OPERATORTEST_read_read-orc 716 / 728 4187.4 238.8 1.0X + */ + } + private Options orc() { Options options = new Options(); options.set(CoreOptions.FILE_FORMAT, CoreOptions.FILE_FORMAT_ORC); @@ -97,6 +113,10 @@ private Options avro() { } private void innerTest(Map tables) { + innerTestProjection(tables, null); + } + + private void innerTestProjection(Map tables, @Nullable int[] projection) { int readTime = 3; Benchmark benchmark = new Benchmark("read", readTime * rowCount) @@ -115,7 +135,10 @@ private void innerTest(Map tables) { try { for (Split split : splits) { RecordReader reader = - table.newReadBuilder().newRead().createReader(split); + table.newReadBuilder() + .withProjection(projection) + .newRead() + .createReader(split); reader.forEachRemaining(row -> readCount.incrementAndGet()); } System.out.printf("Finish read %d rows.\n", readCount.get()); diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIOUtils.java b/paimon-common/src/main/java/org/apache/paimon/fs/FileIOUtils.java index f7637c822327..556453424e31 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIOUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIOUtils.java @@ -21,10 +21,17 @@ import org.apache.paimon.catalog.CatalogContext; import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.apache.paimon.utils.ThreadUtils.newDaemonThreadFactory; /** Utils for {@link FileIO}. */ public class FileIOUtils { + public static final ExecutorService IO_THREAD_POOL = + Executors.newCachedThreadPool(newDaemonThreadFactory("IO-THREAD-POOL")); + public static FileIOLoader checkAccess(FileIOLoader fileIO, Path path, CatalogContext config) throws IOException { if (fileIO == null) { diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileRange.java b/paimon-common/src/main/java/org/apache/paimon/fs/FileRange.java new file mode 100644 index 000000000000..2c85ae50067c --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileRange.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.fs; + +import java.util.concurrent.CompletableFuture; + +/* This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** A byte range of a file. 
*/ +public interface FileRange { + + /** Get the starting offset of the range. */ + long getOffset(); + + /** Get the length of the range. */ + int getLength(); + + /** Get the future data for this range. */ + CompletableFuture getData(); + + /** + * Factory method to create a FileRange object. + * + * @param offset starting offset of the range. + * @param length length of the range. + * @return a new instance of FileRangeImpl. + */ + static FileRange createFileRange(long offset, int length) { + return new FileRangeImpl(offset, length); + } + + /** An implementation for {@link FileRange}. */ + class FileRangeImpl implements FileRange { + + private final long offset; + private final int length; + private final CompletableFuture reader; + + public FileRangeImpl(long offset, int length) { + this.offset = offset; + this.length = length; + this.reader = new CompletableFuture<>(); + } + + @Override + public String toString() { + return "range[" + offset + "," + (offset + length) + ")"; + } + + @Override + public long getOffset() { + return offset; + } + + @Override + public int getLength() { + return length; + } + + @Override + public CompletableFuture getData() { + return reader; + } + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/VectoredReadUtils.java b/paimon-common/src/main/java/org/apache/paimon/fs/VectoredReadUtils.java new file mode 100644 index 000000000000..de7974aca1a2 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/fs/VectoredReadUtils.java @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.fs; + +import org.apache.paimon.utils.BlockingExecutor; + +import java.io.EOFException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static java.util.Objects.requireNonNull; +import static org.apache.paimon.fs.FileIOUtils.IO_THREAD_POOL; +import static org.apache.paimon.fs.FileRange.createFileRange; +import static org.apache.paimon.utils.Preconditions.checkArgument; + +/* This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** Utils for {@link VectoredReadable}. 
*/ +public class VectoredReadUtils { + + public static void readVectored(VectoredReadable readable, List ranges) + throws IOException { + if (ranges.isEmpty()) { + return; + } + + List combinedRanges = + mergeSortedRanges(validateAndSortRanges(ranges), readable.minSeekForVectorReads()); + + int parallelism = readable.parallelismForVectorReads(); + BlockingExecutor executor = new BlockingExecutor(IO_THREAD_POOL, parallelism); + long batchSize = readable.batchSizeForVectorReads(); + for (CombinedRange combinedRange : combinedRanges) { + if (combinedRange.underlying.size() == 1) { + FileRange fileRange = combinedRange.underlying.get(0); + executor.submit(() -> readSingleRange(readable, fileRange)); + } else { + List splitBatches = combinedRange.splitBatches(batchSize, parallelism); + splitBatches.forEach( + range -> executor.submit(() -> readSingleRange(readable, range))); + List> futures = + splitBatches.stream().map(FileRange::getData).collect(Collectors.toList()); + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) + .thenAcceptAsync( + unused -> copyToFileRanges(combinedRange, futures), IO_THREAD_POOL); + } + } + } + + private static void readSingleRange(VectoredReadable readable, FileRange range) { + if (range.getLength() == 0) { + range.getData().complete(new byte[0]); + return; + } + try { + long position = range.getOffset(); + int length = range.getLength(); + byte[] buffer = new byte[length]; + readable.preadFully(position, buffer, 0, length); + range.getData().complete(buffer); + } catch (Exception ex) { + range.getData().completeExceptionally(ex); + } + } + + private static void copyToFileRanges( + CombinedRange combinedRange, List> futures) { + List segments = new ArrayList<>(futures.size()); + for (CompletableFuture future : futures) { + segments.add(future.join()); + } + long offset = combinedRange.offset; + for (FileRange fileRange : combinedRange.underlying) { + byte[] buffer = new byte[fileRange.getLength()]; + copyMultiBytesToBytes( + segments, + (int) (fileRange.getOffset() - offset), + buffer, + fileRange.getLength()); + fileRange.getData().complete(buffer); + } + } + + private static void copyMultiBytesToBytes( + List segments, int offset, byte[] bytes, int numBytes) { + int remainSize = numBytes; + for (byte[] segment : segments) { + int remain = segment.length - offset; + if (remain > 0) { + int nCopy = Math.min(remain, remainSize); + System.arraycopy(segment, offset, bytes, numBytes - remainSize, nCopy); + remainSize -= nCopy; + // next new segment. 
+ offset = 0; + if (remainSize == 0) { + return; + } + } else { + // remain is negative, let's advance to next segment + // now the offset = offset - segmentSize (-remain) + offset = -remain; + } + } + } + + private static List validateAndSortRanges( + final List input) throws EOFException { + requireNonNull(input, "Null input list"); + checkArgument(!input.isEmpty(), "Empty input list"); + final List sortedRanges; + + if (input.size() == 1) { + validateRangeRequest(input.get(0)); + sortedRanges = input; + } else { + sortedRanges = sortRanges(input); + FileRange prev = null; + for (final FileRange current : sortedRanges) { + validateRangeRequest(current); + if (prev != null) { + checkArgument( + current.getOffset() >= prev.getOffset() + prev.getLength(), + "Overlapping ranges %s and %s", + prev, + current); + } + prev = current; + } + } + return sortedRanges; + } + + private static void validateRangeRequest(FileRange range) throws EOFException { + requireNonNull(range, "range is null"); + checkArgument(range.getLength() >= 0, "length is negative in %s", range); + if (range.getOffset() < 0) { + throw new EOFException("position is negative in range " + range); + } + } + + private static List sortRanges(List input) { + List ret = new ArrayList<>(input); + ret.sort(Comparator.comparingLong(FileRange::getOffset)); + return ret; + } + + private static List mergeSortedRanges( + List sortedRanges, int minimumSeek) { + + CombinedRange current = null; + List result = new ArrayList<>(sortedRanges.size()); + + // now merge together the ones that merge + for (FileRange range : sortedRanges) { + long start = range.getOffset(); + long end = range.getOffset() + range.getLength(); + if (current == null || !current.merge(start, end, range, minimumSeek)) { + current = new CombinedRange(start, end, range); + result.add(current); + } + } + return result; + } + + private static class CombinedRange { + + private final List underlying = new ArrayList<>(); + private final long offset; + + private int length; + private long dataSize; + + public CombinedRange(long offset, long end, FileRange original) { + this.offset = offset; + this.length = (int) (end - offset); + append(original); + } + + private void append(final FileRange range) { + this.underlying.add(range); + dataSize += range.getLength(); + } + + public boolean merge(long otherOffset, long otherEnd, FileRange other, int minSeek) { + long end = offset + length; + long newEnd = Math.max(end, otherEnd); + if (otherOffset - end >= minSeek) { + return false; + } + this.length = (int) (newEnd - offset); + append(other); + return true; + } + + private List splitBatches(long batchSize, int parallelism) { + long expectedSize = Math.max(batchSize, (length / parallelism) + 1); + List splitBatches = new ArrayList<>(); + long offset = this.offset; + long end = offset + length; + + // split only when remain size exceeds twice the batchSize to avoid small File IO + long minRemain = Math.max(expectedSize, batchSize * 2); + + while (true) { + if (end < offset + minRemain) { + int currentLen = (int) (end - offset); + if (currentLen > 0) { + splitBatches.add(createFileRange(offset, currentLen)); + } + break; + } else { + splitBatches.add(createFileRange(offset, (int) expectedSize)); + offset += expectedSize; + } + } + return splitBatches; + } + + @Override + public String toString() { + return String.format( + "CombinedRange: range count=%d, data size=%,d", underlying.size(), dataSize); + } + } +} diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fs/VectoredReadable.java b/paimon-common/src/main/java/org/apache/paimon/fs/VectoredReadable.java new file mode 100644 index 000000000000..a3d3faace735 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/fs/VectoredReadable.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.fs; + +import java.io.EOFException; +import java.io.IOException; +import java.util.List; + +/** Stream that permits vectored reading. */ +public interface VectoredReadable { + + /** + * Read up to the specified number of bytes, from a given position within a file, and return the + * number of bytes read. This does not change the current offset of a file, and is thread-safe. + */ + int pread(long position, byte[] buffer, int offset, int length) throws IOException; + + /** + * Read the specified number of bytes fully, from a given position within a file. This does not + * change the current offset of a file, and is thread-safe. + */ + default void preadFully(long position, byte[] buffer, int offset, int length) + throws IOException { + int readBytes = 0; + while (readBytes < length) { + int readBytesCurr = pread(position, buffer, offset + readBytes, length - readBytes); + if (readBytesCurr < 0) { + throw new EOFException( + String.format( + "Input Stream closed before all bytes were read." + + " Expected %,d bytes but only read %,d bytes. Current position %,d", + length, readBytes, position)); + } + readBytes += readBytesCurr; + position += readBytesCurr; + } + } + + /** The smallest reasonable seek. */ + default int minSeekForVectorReads() { + return 256 * 1024; + } + + /** The batch size of data read by a single parallelism. */ + default int batchSizeForVectorReads() { + return 1024 * 1024; + } + + /** The read parallelism for vector reads. */ + default int parallelismForVectorReads() { + return 4; + } + + /** + * Read fully a list of file ranges asynchronously from this file. + * + *

As a result of the call, each range will have FileRange.setData(CompletableFuture) called + * with a future that when complete will have a ByteBuffer with the data from the file's range. + */ + default void readVectored(List ranges) throws IOException { + VectoredReadUtils.readVectored(this, ranges); + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java index 55d264f1c480..82d8145a2927 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java @@ -24,12 +24,14 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.fs.PositionOutputStream; import org.apache.paimon.fs.SeekableInputStream; +import org.apache.paimon.fs.VectoredReadable; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.file.AccessDeniedException; import java.nio.file.DirectoryNotEmptyException; @@ -233,7 +235,8 @@ public File toFile(Path path) { } /** Local {@link SeekableInputStream}. */ - public static class LocalSeekableInputStream extends SeekableInputStream { + public static class LocalSeekableInputStream extends SeekableInputStream + implements VectoredReadable { private final FileInputStream in; private final FileChannel channel; @@ -269,6 +272,15 @@ public int read(byte[] b, int off, int len) throws IOException { public void close() throws IOException { in.close(); } + + @Override + public int pread(long position, byte[] b, int off, int len) throws IOException { + if (len == 0) { + return 0; + } + + return channel.read(ByteBuffer.wrap(b, off, len), position); + } } /** Local {@link PositionOutputStream}. */ diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/BlockingExecutor.java b/paimon-common/src/main/java/org/apache/paimon/utils/BlockingExecutor.java new file mode 100644 index 000000000000..142312ef4155 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/utils/BlockingExecutor.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Semaphore; + +/** A executor wrapper to execute with {@link Semaphore}. 
*/ +public class BlockingExecutor { + + private final Semaphore semaphore; + private final ExecutorService executor; + + public BlockingExecutor(ExecutorService executor, int permits) { + this.semaphore = new Semaphore(permits, true); + this.executor = executor; + } + + public void submit(Runnable task) { + try { + semaphore.acquire(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + executor.submit( + () -> { + try { + task.run(); + } finally { + semaphore.release(); + } + }); + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/ThreadUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/ThreadUtils.java index 25779892b071..5479f0653dde 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/ThreadUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/ThreadUtils.java @@ -23,6 +23,8 @@ import java.lang.management.ManagementFactory; import java.lang.management.ThreadInfo; import java.util.Arrays; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; /** Utils for thread. */ @@ -54,4 +56,38 @@ public static boolean stackContains(String name) { } return false; } + + public static ThreadFactory newDaemonThreadFactory(final String prefix) { + final ThreadFactory namedFactory = getNamedThreadFactory(prefix); + return new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread t = namedFactory.newThread(r); + if (!t.isDaemon()) { + t.setDaemon(true); + } + if (t.getPriority() != Thread.NORM_PRIORITY) { + t.setPriority(Thread.NORM_PRIORITY); + } + return t; + } + }; + } + + private static ThreadFactory getNamedThreadFactory(final String prefix) { + SecurityManager s = System.getSecurityManager(); + final ThreadGroup threadGroup = + (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); + + return new ThreadFactory() { + private final AtomicInteger threadNumber = new AtomicInteger(1); + private final ThreadGroup group = threadGroup; + + @Override + public Thread newThread(Runnable r) { + final String name = prefix + "-t" + threadNumber.getAndIncrement(); + return new Thread(group, r, name); + } + }; + } } diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/VectoredReadUtilsTest.java b/paimon-common/src/test/java/org/apache/paimon/fs/VectoredReadUtilsTest.java new file mode 100644 index 000000000000..a3264b08e20c --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/fs/VectoredReadUtilsTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.fs; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +import static org.assertj.core.api.Assertions.assertThat; + +class VectoredReadUtilsTest { + + private final byte[] bytes; + private final VectoredReadable readable; + + public VectoredReadUtilsTest() { + this.bytes = new byte[1024 * 1024]; + ThreadLocalRandom random = ThreadLocalRandom.current(); + random.nextBytes(bytes); + this.readable = + new VectoredReadable() { + @Override + public int minSeekForVectorReads() { + return 100; + } + + @Override + public int batchSizeForVectorReads() { + return 1000; + } + + @Override + public int pread(long position, byte[] buffer, int offset, int length) + throws IOException { + boolean returnAll = random.nextBoolean(); + int len = returnAll ? length : random.nextInt(length) + 1; + System.arraycopy(bytes, (int) position, buffer, offset, len); + return len; + } + }; + } + + private void doTest(List ranges) throws Exception { + VectoredReadUtils.readVectored(readable, ranges); + for (FileRange range : ranges) { + byte[] expected = new byte[range.getLength()]; + System.arraycopy(bytes, (int) range.getOffset(), expected, 0, range.getLength()); + assertThat(range.getData().get()).isEqualTo(expected); + } + } + + @Test + public void testNormal() throws Exception { + // test empty + doTest(Collections.emptyList()); + + // test without merge + doTest( + Arrays.asList( + FileRange.createFileRange(0, 100), + FileRange.createFileRange(100, 200), + FileRange.createFileRange(500, 1000))); + + // test with merge + doTest( + Arrays.asList( + FileRange.createFileRange(0, 60), + FileRange.createFileRange(100, 90), + FileRange.createFileRange(300, 200))); + + // test with batchSize + doTest( + Arrays.asList( + FileRange.createFileRange(60, 800), + FileRange.createFileRange(1000, 500), + FileRange.createFileRange(1550, 600))); + + // test with align huge + doTest( + Arrays.asList( + FileRange.createFileRange(0, 5000), + FileRange.createFileRange(6000, 500), + FileRange.createFileRange(7000, 800))); + + // test with no align huge + doTest( + Arrays.asList( + FileRange.createFileRange(60, 5120), + FileRange.createFileRange(6020, 520), + FileRange.createFileRange(7300, 850))); + } + + @Test + public void testRandom() throws Exception { + List ranges = new ArrayList<>(); + ThreadLocalRandom random = ThreadLocalRandom.current(); + int lastEnd = 0; + for (int i = 0; i < random.nextInt(10); i++) { + int start = random.nextInt(102 * 1024) + lastEnd; + int len = random.nextInt(102 * 1024) + 1; + if (start + len > bytes.length) { + break; + } + ranges.add(FileRange.createFileRange(start, len)); + lastEnd = start + len; + } + doTest(ranges); + } +} diff --git a/paimon-format/src/main/java/org/apache/orc/impl/RecordReaderUtils.java b/paimon-format/src/main/java/org/apache/orc/impl/RecordReaderUtils.java new file mode 100644 index 000000000000..0ceaadb449d7 --- /dev/null +++ b/paimon-format/src/main/java/org/apache/orc/impl/RecordReaderUtils.java @@ -0,0 +1,805 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.impl; + +import org.apache.paimon.format.fs.FSDataWrappedInputStream; +import org.apache.paimon.fs.FileRange; +import org.apache.paimon.fs.SeekableInputStream; +import org.apache.paimon.fs.VectoredReadable; + +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.io.DiskRangeList; +import org.apache.orc.CompressionCodec; +import org.apache.orc.DataReader; +import org.apache.orc.OrcProto; +import org.apache.orc.StripeInformation; +import org.apache.orc.TypeDescription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.ExecutionException; +import java.util.function.Supplier; + +/* This file is based on source code from the ORC Project (http://orc.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** + * Stateless methods shared between RecordReaderImpl and EncodedReaderImpl. + * + *

NOTE: The file was copied and modified to support {@link VectoredReadable}. + */ +public class RecordReaderUtils { + + // for uncompressed streams, what is the most overlap with the following set + // of rows (long vint literal group). + static final int WORST_UNCOMPRESSED_SLOP = 2 + 8 * 512; + // the maximum number of values that need to be consumed from the run + static final int MAX_VALUES_LENGTH = RunLengthIntegerWriterV2.MAX_SCOPE; + // the maximum byte width for each value + static final int MAX_BYTE_WIDTH = + SerializationUtils.decodeBitWidth(SerializationUtils.FixedBitSizes.SIXTYFOUR.ordinal()) + / 8; + private static final HadoopShims SHIMS = HadoopShimsFactory.get(); + private static final Logger LOG = LoggerFactory.getLogger(RecordReaderUtils.class); + private static final int BYTE_STREAM_POSITIONS = 1; + private static final int RUN_LENGTH_BYTE_POSITIONS = BYTE_STREAM_POSITIONS + 1; + private static final int BITFIELD_POSITIONS = RUN_LENGTH_BYTE_POSITIONS + 1; + private static final int RUN_LENGTH_INT_POSITIONS = BYTE_STREAM_POSITIONS + 1; + + public static DataReader createDefaultDataReader(DataReaderProperties properties) { + return new DefaultDataReader(properties); + } + + /** + * Does region A overlap region B? The end points are inclusive on both sides. + * + * @param leftA A's left point + * @param rightA A's right point + * @param leftB B's left point + * @param rightB B's right point + * @return Does region A overlap region B? + */ + static boolean overlap(long leftA, long rightA, long leftB, long rightB) { + if (leftA <= leftB) { + return rightA >= leftB; + } + return rightB >= leftA; + } + + public static long estimateRgEndOffset( + boolean isCompressed, + int bufferSize, + boolean isLast, + long nextGroupOffset, + long streamLength) { + // Figure out the worst case last location + long slop = WORST_UNCOMPRESSED_SLOP; + // Stretch the slop by a factor to safely accommodate following compression blocks. + // We need to calculate the maximum number of blocks(stretchFactor) by bufferSize + // accordingly. + if (isCompressed) { + int stretchFactor = 2 + (MAX_VALUES_LENGTH * MAX_BYTE_WIDTH - 1) / bufferSize; + slop = (long) stretchFactor * (OutStream.HEADER_SIZE + bufferSize); + } + return isLast ? streamLength : Math.min(streamLength, nextGroupOffset + slop); + } + + /** + * Get the offset in the index positions for the column that the given stream starts. + * + * @param columnEncoding the encoding of the column + * @param columnType the type of the column + * @param streamType the kind of the stream + * @param isCompressed is the stream compressed? + * @param hasNulls does the column have a PRESENT stream? + * @return the number of positions that will be used for that stream + */ + public static int getIndexPosition( + OrcProto.ColumnEncoding.Kind columnEncoding, + TypeDescription.Category columnType, + OrcProto.Stream.Kind streamType, + boolean isCompressed, + boolean hasNulls) { + if (streamType == OrcProto.Stream.Kind.PRESENT) { + return 0; + } + int compressionValue = isCompressed ? 1 : 0; + int base = hasNulls ? 
(BITFIELD_POSITIONS + compressionValue) : 0; + switch (columnType) { + case BOOLEAN: + case BYTE: + case SHORT: + case INT: + case LONG: + case FLOAT: + case DOUBLE: + case DATE: + case STRUCT: + case MAP: + case LIST: + case UNION: + return base; + case CHAR: + case VARCHAR: + case STRING: + if (columnEncoding == OrcProto.ColumnEncoding.Kind.DICTIONARY + || columnEncoding == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) { + return base; + } else { + if (streamType == OrcProto.Stream.Kind.DATA) { + return base; + } else { + return base + BYTE_STREAM_POSITIONS + compressionValue; + } + } + case BINARY: + case DECIMAL: + if (streamType == OrcProto.Stream.Kind.DATA) { + return base; + } + return base + BYTE_STREAM_POSITIONS + compressionValue; + case TIMESTAMP: + case TIMESTAMP_INSTANT: + if (streamType == OrcProto.Stream.Kind.DATA) { + return base; + } + return base + RUN_LENGTH_INT_POSITIONS + compressionValue; + default: + throw new IllegalArgumentException("Unknown type " + columnType); + } + } + + /** + * Is this stream part of a dictionary? + * + * @return is this part of a dictionary? + */ + public static boolean isDictionary( + OrcProto.Stream.Kind kind, OrcProto.ColumnEncoding encoding) { + assert kind != OrcProto.Stream.Kind.DICTIONARY_COUNT; + OrcProto.ColumnEncoding.Kind encodingKind = encoding.getKind(); + return kind == OrcProto.Stream.Kind.DICTIONARY_DATA + || (kind == OrcProto.Stream.Kind.LENGTH + && (encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY + || encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2)); + } + + /** + * Build a string representation of a list of disk ranges. + * + * @param range ranges to stringify + * @return the resulting string + */ + public static String stringifyDiskRanges(DiskRangeList range) { + StringBuilder buffer = new StringBuilder(); + buffer.append("["); + boolean isFirst = true; + while (range != null) { + if (!isFirst) { + buffer.append(", {"); + } else { + buffer.append("{"); + } + isFirst = false; + buffer.append(range); + buffer.append("}"); + range = range.next; + } + buffer.append("]"); + return buffer.toString(); + } + + static long computeEnd(BufferChunk first, BufferChunk last) { + long end = 0; + for (BufferChunk ptr = first; ptr != last.next; ptr = (BufferChunk) ptr.next) { + end = Math.max(ptr.getEnd(), end); + } + return end; + } + + /** + * Zero-copy read the data from the file based on a list of ranges in a single read. + * + *

As a side note, the HDFS zero copy API really sucks from a user's point of view. + * + * @param file the file we're reading from + * @param zcr the zero copy shim + * @param first the first range to read + * @param last the last range to read + * @param allocateDirect if we need to allocate buffers, should we use direct + * @throws IOException + */ + static void zeroCopyReadRanges( + FSDataInputStream file, + HadoopShims.ZeroCopyReaderShim zcr, + BufferChunk first, + BufferChunk last, + boolean allocateDirect) + throws IOException { + // read all of the bytes that we need + final long offset = first.getOffset(); + int length = (int) (computeEnd(first, last) - offset); + file.seek(offset); + List bytes = new ArrayList<>(); + while (length > 0) { + ByteBuffer read = zcr.readBuffer(length, false); + bytes.add(read); + length -= read.remaining(); + } + long currentOffset = offset; + + // iterate and fill each range + BufferChunk current = first; + Iterator buffers = bytes.iterator(); + ByteBuffer currentBuffer = buffers.next(); + while (current != last.next) { + + // if we are past the start of the range, restart the iterator + if (current.getOffset() < offset) { + buffers = bytes.iterator(); + currentBuffer = buffers.next(); + currentOffset = offset; + } + + // walk through the buffers to find the start of the buffer + while (currentOffset + currentBuffer.remaining() <= current.getOffset()) { + currentOffset += currentBuffer.remaining(); + // We assume that buffers.hasNext is true because we know we read + // enough data to cover the last range. + currentBuffer = buffers.next(); + } + + // did we get the current range in a single read? + if (currentOffset + currentBuffer.remaining() >= current.getEnd()) { + ByteBuffer copy = currentBuffer.duplicate(); + copy.position((int) (current.getOffset() - currentOffset)); + copy.limit(copy.position() + current.getLength()); + current.setChunk(copy); + + } else { + // otherwise, build a single buffer that holds the entire range + ByteBuffer result = + allocateDirect + ? ByteBuffer.allocateDirect(current.getLength()) + : ByteBuffer.allocate(current.getLength()); + // we know that the range spans buffers + ByteBuffer copy = currentBuffer.duplicate(); + // skip over the front matter + copy.position((int) (current.getOffset() - currentOffset)); + result.put(copy); + // advance the buffer + currentOffset += currentBuffer.remaining(); + currentBuffer = buffers.next(); + while (result.hasRemaining()) { + if (result.remaining() > currentBuffer.remaining()) { + result.put(currentBuffer.duplicate()); + currentOffset += currentBuffer.remaining(); + currentBuffer = buffers.next(); + } else { + copy = currentBuffer.duplicate(); + copy.limit(result.remaining()); + result.put(copy); + } + } + result.flip(); + current.setChunk(result); + } + current = (BufferChunk) current.next; + } + } + + /** + * Find the list of ranges that should be read in a single read. The read will stop when there + * is a gap, one of the ranges already has data, or we have reached the maximum read size of + * 2^31. + * + * @param first the first range to read + * @return the last range to read + */ + static BufferChunk findSingleRead(BufferChunk first) { + return findSingleRead(first, 0); + } + + /** + * Find the list of ranges that should be read in a single read. The read will stop when there + * is a gap, one of the ranges already has data, or we have reached the maximum read size of + * 2^31. 
+ * + * @param first the first range to read + * @param minSeekSize minimum size for seek instead of read + * @return the last range to read + */ + private static BufferChunk findSingleRead(BufferChunk first, long minSeekSize) { + BufferChunk last = first; + long currentEnd = first.getEnd(); + while (last.next != null + && !last.next.hasData() + && last.next.getOffset() <= (currentEnd + minSeekSize) + && last.next.getEnd() - first.getOffset() < Integer.MAX_VALUE) { + last = (BufferChunk) last.next; + currentEnd = Math.max(currentEnd, last.getEnd()); + } + return last; + } + + /** + * Read the list of ranges from the file by updating each range in the list with a buffer that + * has the bytes from the file. + * + *

The ranges must be sorted, but may overlap or include holes. + * + * @param file the file to read + * @param zcr the zero copy shim + * @param list the disk ranges within the file to read + * @param doForceDirect allocate direct buffers + * @param minSeekSize the minimum gap to prefer seek vs read + * @param minSeekSizeTolerance allowed tolerance for extra bytes in memory as a result of + * minSeekSize + */ + private static void readDiskRanges( + FSDataInputStream file, + HadoopShims.ZeroCopyReaderShim zcr, + BufferChunkList list, + boolean doForceDirect, + int minSeekSize, + double minSeekSizeTolerance) + throws IOException { + BufferChunk current = list == null ? null : list.get(); + while (current != null) { + while (current.hasData()) { + current = (BufferChunk) current.next; + } + if (zcr != null) { + BufferChunk last = findSingleRead(current); + zeroCopyReadRanges(file, zcr, current, last, doForceDirect); + current = (BufferChunk) last.next; + } else { + ChunkReader chunkReader = ChunkReader.create(current, minSeekSize); + chunkReader.readRanges(file, doForceDirect, minSeekSizeTolerance); + current = (BufferChunk) chunkReader.to.next; + } + } + } + + /** Read the list of ranges from the file by updating each range in the list. */ + private static void readDiskRangesVectored( + VectoredReadable fileInputStream, BufferChunkList range, boolean doForceDirect) + throws IOException { + if (range == null) { + return; + } + + if (doForceDirect) { + throw new UnsupportedOperationException(); + } + + List fileRanges = new ArrayList<>(); + Map map = new HashMap<>(); + BufferChunk cur = range.get(); + while (cur != null) { + if (!cur.hasData()) { + FileRange fileRange = FileRange.createFileRange(cur.getOffset(), cur.getLength()); + fileRanges.add(fileRange); + map.put(fileRange, cur); + } + cur = (BufferChunk) cur.next; + } + fileInputStream.readVectored(fileRanges); + + for (FileRange r : fileRanges) { + cur = map.get(r); + try { + cur.setChunk(ByteBuffer.wrap(r.getData().get())); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + } + + static HadoopShims.ZeroCopyReaderShim createZeroCopyShim( + FSDataInputStream file, CompressionCodec codec, ByteBufferAllocatorPool pool) + throws IOException { + if ((codec == null + || ((codec instanceof DirectDecompressionCodec) + && ((DirectDecompressionCodec) codec).isAvailable()))) { + /* codec is null or is available */ + return SHIMS.getZeroCopyReader(file, pool); + } + return null; + } + + private static class DefaultDataReader implements DataReader { + private final Supplier fileSystemSupplier; + private final Path path; + private final boolean useZeroCopy; + private final int minSeekSize; + private final double minSeekSizeTolerance; + private FSDataInputStream file; + private ByteBufferAllocatorPool pool; + private HadoopShims.ZeroCopyReaderShim zcr = null; + private InStream.StreamOptions options; + private boolean isOpen = false; + + private DefaultDataReader(DataReaderProperties properties) { + this.fileSystemSupplier = properties.getFileSystemSupplier(); + this.path = properties.getPath(); + this.file = properties.getFile(); + this.useZeroCopy = properties.getZeroCopy(); + this.options = properties.getCompression(); + this.minSeekSize = properties.getMinSeekSize(); + this.minSeekSizeTolerance = properties.getMinSeekSizeTolerance(); + } + + @Override + public void open() throws IOException { + if (file == null) { + this.file = fileSystemSupplier.get().open(path); + } + if (useZeroCopy) { + // ZCR 
only uses codec for boolean checks. + pool = new ByteBufferAllocatorPool(); + zcr = RecordReaderUtils.createZeroCopyShim(file, options.getCodec(), pool); + } else { + zcr = null; + } + isOpen = true; + } + + @Override + public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException { + if (!isOpen) { + open(); + } + long offset = stripe.getOffset() + stripe.getIndexLength() + stripe.getDataLength(); + int tailLength = (int) stripe.getFooterLength(); + + // read the footer + ByteBuffer tailBuf = ByteBuffer.allocate(tailLength); + file.readFully(offset, tailBuf.array(), tailBuf.arrayOffset(), tailLength); + return OrcProto.StripeFooter.parseFrom( + InStream.createCodedInputStream( + InStream.create( + "footer", + new BufferChunk(tailBuf, 0), + 0, + tailLength, + options))); + } + + @Override + public BufferChunkList readFileData(BufferChunkList range, boolean doForceDirect) + throws IOException { + SeekableInputStream wrapped = + ((FSDataWrappedInputStream) file.getWrappedStream()).wrapped(); + if (zcr == null && wrapped instanceof VectoredReadable) { + RecordReaderUtils.readDiskRangesVectored( + (VectoredReadable) wrapped, range, doForceDirect); + } else { + RecordReaderUtils.readDiskRanges( + file, zcr, range, doForceDirect, minSeekSize, minSeekSizeTolerance); + } + + return range; + } + + @Override + public void close() throws IOException { + if (options.getCodec() != null) { + OrcCodecPool.returnCodec(options.getCodec().getKind(), options.getCodec()); + options.withCodec(null); + } + if (pool != null) { + pool.clear(); + } + // close both zcr and file + try (HadoopShims.ZeroCopyReaderShim myZcr = zcr) { + if (file != null) { + file.close(); + file = null; + } + } + } + + @Override + public boolean isTrackingDiskRanges() { + return zcr != null; + } + + @Override + public void releaseBuffer(ByteBuffer buffer) { + zcr.releaseBuffer(buffer); + } + + @Override + public DataReader clone() { + if (this.file != null) { + // We should really throw here, but that will cause failures in Hive. + // While Hive uses clone, just log a warning. + LOG.warn( + "Cloning an opened DataReader; the stream will be reused and closed twice"); + } + try { + DefaultDataReader clone = (DefaultDataReader) super.clone(); + if (options.getCodec() != null) { + // Make sure we don't share the same codec between two readers. + clone.options = options.clone(); + } + return clone; + } catch (CloneNotSupportedException e) { + throw new UnsupportedOperationException("uncloneable", e); + } + } + + @Override + public InStream.StreamOptions getCompressionOptions() { + return options; + } + } + + /** + * this is an implementation copied from ElasticByteBufferPool in hadoop-2, which lacks a + * clear()/clean() operation. + */ + public static final class ByteBufferAllocatorPool implements HadoopShims.ByteBufferPoolShim { + private final TreeMap buffers = new TreeMap<>(); + private final TreeMap directBuffers = new TreeMap<>(); + private long currentGeneration = 0; + + private TreeMap getBufferTree(boolean direct) { + return direct ? directBuffers : buffers; + } + + public void clear() { + buffers.clear(); + directBuffers.clear(); + } + + @Override + public ByteBuffer getBuffer(boolean direct, int length) { + TreeMap tree = getBufferTree(direct); + Map.Entry entry = tree.ceilingEntry(new Key(length, 0)); + if (entry == null) { + return direct ? 
ByteBuffer.allocateDirect(length) : ByteBuffer.allocate(length); + } + tree.remove(entry.getKey()); + return entry.getValue(); + } + + @Override + public void putBuffer(ByteBuffer buffer) { + TreeMap tree = getBufferTree(buffer.isDirect()); + Key key; + + // Buffers are indexed by (capacity, generation). + // If our key is not unique on the first try, try again + do { + key = new Key(buffer.capacity(), currentGeneration++); + } while (tree.putIfAbsent(key, buffer) != null); + } + + private static final class Key implements Comparable { + private final int capacity; + private final long insertionGeneration; + + Key(int capacity, long insertionGeneration) { + this.capacity = capacity; + this.insertionGeneration = insertionGeneration; + } + + @Override + public int compareTo(Key other) { + final int c = Integer.compare(capacity, other.capacity); + return (c != 0) ? c : Long.compare(insertionGeneration, other.insertionGeneration); + } + + @Override + public boolean equals(Object rhs) { + if (rhs instanceof Key) { + Key o = (Key) rhs; + return 0 == compareTo(o); + } + return false; + } + + @Override + public int hashCode() { + return new HashCodeBuilder() + .append(capacity) + .append(insertionGeneration) + .toHashCode(); + } + } + } + + static class ChunkReader { + private final BufferChunk from; + private final BufferChunk to; + private final int readBytes; + private final int reqBytes; + + private ChunkReader(BufferChunk from, BufferChunk to, int readSize, int reqBytes) { + this.from = from; + this.to = to; + this.readBytes = readSize; + this.reqBytes = reqBytes; + } + + static ChunkReader create(BufferChunk from, BufferChunk to) { + long f = Integer.MAX_VALUE; + long e = Integer.MIN_VALUE; + + long cf = Integer.MAX_VALUE; + long ef = Integer.MIN_VALUE; + int reqBytes = 0; + + BufferChunk current = from; + while (current != to.next) { + f = Math.min(f, current.getOffset()); + e = Math.max(e, current.getEnd()); + if (ef == Integer.MIN_VALUE || current.getOffset() <= ef) { + cf = Math.min(cf, current.getOffset()); + ef = Math.max(ef, current.getEnd()); + } else { + reqBytes += ef - cf; + cf = current.getOffset(); + ef = current.getEnd(); + } + current = (BufferChunk) current.next; + } + reqBytes += ef - cf; + return new ChunkReader(from, to, (int) (e - f), reqBytes); + } + + static ChunkReader create(BufferChunk from, int minSeekSize) { + BufferChunk to = findSingleRead(from, minSeekSize); + return create(from, to); + } + + double getExtraBytesFraction() { + return (readBytes - reqBytes) / ((double) reqBytes); + } + + public int getReadBytes() { + return readBytes; + } + + public int getReqBytes() { + return reqBytes; + } + + public BufferChunk getFrom() { + return from; + } + + public BufferChunk getTo() { + return to; + } + + void populateChunks(ByteBuffer bytes, boolean allocateDirect, double extraByteTolerance) { + if (getExtraBytesFraction() > extraByteTolerance) { + LOG.debug( + "ExtraBytesFraction = {}, ExtraByteTolerance = {}, reducing memory size", + getExtraBytesFraction(), + extraByteTolerance); + populateChunksReduceSize(bytes, allocateDirect); + } else { + LOG.debug( + "ExtraBytesFraction = {}, ExtraByteTolerance = {}, populating as is", + getExtraBytesFraction(), + extraByteTolerance); + populateChunksAsIs(bytes); + } + } + + void populateChunksAsIs(ByteBuffer bytes) { + // populate each BufferChunks with the data + BufferChunk current = from; + long offset = from.getOffset(); + while (current != to.next) { + ByteBuffer currentBytes = current == to ? 
bytes : bytes.duplicate(); + currentBytes.position((int) (current.getOffset() - offset)); + currentBytes.limit((int) (current.getEnd() - offset)); + current.setChunk(currentBytes); + current = (BufferChunk) current.next; + } + } + + void populateChunksReduceSize(ByteBuffer bytes, boolean allocateDirect) { + ByteBuffer newBuffer; + if (allocateDirect) { + newBuffer = ByteBuffer.allocateDirect(reqBytes); + newBuffer.position(reqBytes); + newBuffer.flip(); + } else { + byte[] newBytes = new byte[reqBytes]; + newBuffer = ByteBuffer.wrap(newBytes); + } + + final long offset = from.getOffset(); + int copyStart = 0; + int copyEnd; + int copyLength; + int skippedBytes = 0; + int srcPosition; + BufferChunk current = from; + while (current != to.next) { + // We can skip bytes as required, but no need to copy bytes that are already copied + srcPosition = (int) (current.getOffset() - offset); + skippedBytes += Math.max(0, srcPosition - copyStart); + copyStart = Math.max(copyStart, srcPosition); + copyEnd = (int) (current.getEnd() - offset); + copyLength = copyStart < copyEnd ? copyEnd - copyStart : 0; + newBuffer.put(bytes.array(), copyStart, copyLength); + copyStart += copyLength; + // Set up new ByteBuffer that wraps on the same backing array + ByteBuffer currentBytes = current == to ? newBuffer : newBuffer.duplicate(); + currentBytes.position(srcPosition - skippedBytes); + currentBytes.limit(currentBytes.position() + current.getLength()); + current.setChunk(currentBytes); + current = (BufferChunk) current.next; + } + } + + /** + * Read the data from the file based on a list of ranges in a single read. + * + * @param file the file to read from + * @param allocateDirect should we use direct buffers + */ + void readRanges(FSDataInputStream file, boolean allocateDirect, double extraByteTolerance) + throws IOException { + // assume that the chunks are sorted by offset + long offset = from.getOffset(); + int readSize = (int) (computeEnd(from, to) - offset); + byte[] buffer = new byte[readSize]; + try { + file.readFully(offset, buffer, 0, buffer.length); + } catch (IOException e) { + throw new IOException( + String.format("Failed while reading %s %d:%d", file, offset, buffer.length), + e); + } + + // get the data into a ByteBuffer + ByteBuffer bytes; + if (allocateDirect) { + bytes = ByteBuffer.allocateDirect(readSize); + bytes.put(buffer); + bytes.flip(); + } else { + bytes = ByteBuffer.wrap(buffer); + } + + // populate each BufferChunks with the data + populateChunks(bytes, allocateDirect, extraByteTolerance); + } + } +} diff --git a/paimon-format/src/main/java/org/apache/paimon/format/fs/FSDataWrappedInputStream.java b/paimon-format/src/main/java/org/apache/paimon/format/fs/FSDataWrappedInputStream.java new file mode 100644 index 000000000000..576fd8afedf6 --- /dev/null +++ b/paimon-format/src/main/java/org/apache/paimon/format/fs/FSDataWrappedInputStream.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format.fs; + +import org.apache.paimon.fs.SeekableInputStream; +import org.apache.paimon.utils.IOUtils; + +import org.apache.hadoop.fs.PositionedReadable; +import org.apache.hadoop.fs.Seekable; + +import java.io.IOException; +import java.io.InputStream; + +/** A {@link InputStream} to wrap {@link SeekableInputStream} for Paimon's input streams. */ +public class FSDataWrappedInputStream extends InputStream implements Seekable, PositionedReadable { + + private final SeekableInputStream seekableInputStream; + + public FSDataWrappedInputStream(SeekableInputStream seekableInputStream) { + this.seekableInputStream = seekableInputStream; + } + + public SeekableInputStream wrapped() { + return seekableInputStream; + } + + @Override + public int read() throws IOException { + return seekableInputStream.read(); + } + + @Override + public int read(long position, byte[] buffer, int offset, int length) throws IOException { + seekableInputStream.seek(position); + return seekableInputStream.read(buffer, offset, length); + } + + @Override + public void readFully(long position, byte[] buffer, int offset, int length) throws IOException { + seekableInputStream.seek(position); + IOUtils.readFully(seekableInputStream, buffer, offset, length); + } + + @Override + public void readFully(long position, byte[] buffer) throws IOException { + readFully(position, buffer, 0, buffer.length); + } + + @Override + public void seek(long pos) throws IOException { + seekableInputStream.seek(pos); + } + + @Override + public long getPos() throws IOException { + return seekableInputStream.getPos(); + } + + @Override + public boolean seekToNewSource(long targetPos) { + return false; + } + + @Override + public void close() throws IOException { + seekableInputStream.close(); + } +} diff --git a/paimon-format/src/main/java/org/apache/paimon/format/fs/HadoopReadOnlyFileSystem.java b/paimon-format/src/main/java/org/apache/paimon/format/fs/HadoopReadOnlyFileSystem.java index 1dcc0da008c7..57454368cfca 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/fs/HadoopReadOnlyFileSystem.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/fs/HadoopReadOnlyFileSystem.java @@ -19,21 +19,16 @@ package org.apache.paimon.format.fs; import org.apache.paimon.fs.FileIO; -import org.apache.paimon.fs.SeekableInputStream; -import org.apache.paimon.utils.IOUtils; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PositionedReadable; -import org.apache.hadoop.fs.Seekable; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.util.Progressable; import java.io.IOException; -import java.io.InputStream; import java.net.URI; /** A read only {@link FileSystem} that wraps an {@link FileIO}. 
*/ @@ -138,58 +133,4 @@ public Path getWorkingDirectory() { public boolean mkdirs(Path path, FsPermission fsPermission) { throw new UnsupportedOperationException(); } - - /** A {@link InputStream} to wrap {@link SeekableInputStream} for Paimon's input streams. */ - private static class FSDataWrappedInputStream extends InputStream - implements Seekable, PositionedReadable { - - private final SeekableInputStream seekableInputStream; - - private FSDataWrappedInputStream(SeekableInputStream seekableInputStream) { - this.seekableInputStream = seekableInputStream; - } - - @Override - public int read() throws IOException { - return seekableInputStream.read(); - } - - @Override - public int read(long position, byte[] buffer, int offset, int length) throws IOException { - seekableInputStream.seek(position); - return seekableInputStream.read(buffer, offset, length); - } - - @Override - public void readFully(long position, byte[] buffer, int offset, int length) - throws IOException { - seekableInputStream.seek(position); - IOUtils.readFully(seekableInputStream, buffer, offset, length); - } - - @Override - public void readFully(long position, byte[] buffer) throws IOException { - readFully(position, buffer, 0, buffer.length); - } - - @Override - public void seek(long pos) throws IOException { - seekableInputStream.seek(pos); - } - - @Override - public long getPos() throws IOException { - return seekableInputStream.getPos(); - } - - @Override - public boolean seekToNewSource(long targetPos) { - return false; - } - - @Override - public void close() throws IOException { - seekableInputStream.close(); - } - } }
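
Usage note (not part of the patch): a minimal sketch of how a caller might drive the vectored read API introduced above. The class name and the non-vectored fallback are illustrative assumptions; the calls it relies on (FileRange.createFileRange, VectoredReadable.readVectored, FileRange.getData, IOUtils.readFully) are the ones added or used in this change.

import org.apache.paimon.fs.FileRange;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.fs.VectoredReadable;
import org.apache.paimon.utils.IOUtils;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;

public class VectoredReadExample {

    /** Read two disjoint byte ranges from {@code in}, using the vectored path when available. */
    public static void readTwoRanges(SeekableInputStream in)
            throws IOException, InterruptedException, ExecutionException {
        // Ranges must not overlap; readVectored sorts them, merges neighbours closer than
        // minSeekForVectorReads(), and splits large merged ranges into batches read in parallel.
        List<FileRange> ranges =
                Arrays.asList(
                        FileRange.createFileRange(0, 1024),
                        FileRange.createFileRange(4096, 512));

        if (in instanceof VectoredReadable) {
            // Asynchronous positioned reads scheduled on the shared IO thread pool.
            ((VectoredReadable) in).readVectored(ranges);
        } else {
            // Fallback for streams without vectored support: plain seek + readFully per range.
            for (FileRange range : ranges) {
                byte[] buffer = new byte[range.getLength()];
                in.seek(range.getOffset());
                IOUtils.readFully(in, buffer, 0, buffer.length);
                range.getData().complete(buffer);
            }
        }

        for (FileRange range : ranges) {
            byte[] data = range.getData().get(); // blocks until this range has completed
            System.out.printf("%s -> %d bytes%n", range, data.length);
        }
    }
}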