confList = new ArrayList<>();
+ if (StringUtils.isEmpty(value)) {
+ return confList;
+ }
+ for (String str : value.split(",")) {
+ String trimStr = StringUtils.trim(str);
+ if (StringUtils.isNotEmpty(trimStr)) {
+ confList.add(trimStr);
+ }
+ }
+ return confList;
+ }
+
+ public void setString(Configuration conf, String value) {
+ conf.set(attribute, value);
+ }
+
+ public boolean getBoolean(Properties tbl, Configuration conf) {
+ String value = lookupValue(tbl, conf);
+ if (value != null) {
+ return Boolean.parseBoolean(value);
+ }
+ return (Boolean) defaultValue;
+ }
+
+ public boolean getBoolean(Configuration conf) {
+ return getBoolean(null, conf);
+ }
+
+ public void setBoolean(Configuration conf, boolean value) {
+ conf.setBoolean(attribute, value);
+ }
+
+ public double getDouble(Properties tbl, Configuration conf) {
+ String value = lookupValue(tbl, conf);
+ if (value != null) {
+ return Double.parseDouble(value);
+ }
+ return ((Number) defaultValue).doubleValue();
+ }
+
+ public double getDouble(Configuration conf) {
+ return getDouble(null, conf);
+ }
+
+ public void setDouble(Configuration conf, double value) {
+ conf.setDouble(attribute, value);
+ }
+}
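A quick usage sketch of the typed accessors above (illustrative only; it assumes OrcConf constants such as BLOCK_PADDING and BLOCK_PADDING_TOLERANCE, which the rest of this patch references):

    Configuration conf = new Configuration();
    // read with fallback to the enum's defaultValue when the key is unset
    boolean padBlocks = OrcConf.BLOCK_PADDING.getBoolean(conf);
    double tolerance = OrcConf.BLOCK_PADDING_TOLERANCE.getDouble(conf);
    // write the underlying attribute key back into the Configuration
    OrcConf.BLOCK_PADDING.setBoolean(conf, false);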
diff --git a/paimon-format/src/main/java/org/apache/orc/OrcFile.java b/paimon-format/src/main/java/org/apache/orc/OrcFile.java
new file mode 100644
index 000000000000..a903ba9e7545
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/orc/OrcFile.java
@@ -0,0 +1,1336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.orc.impl.HadoopShims;
+import org.apache.orc.impl.HadoopShimsFactory;
+import org.apache.orc.impl.KeyProvider;
+import org.apache.orc.impl.MemoryManagerImpl;
+import org.apache.orc.impl.OrcTail;
+import org.apache.orc.impl.ReaderImpl;
+import org.apache.orc.impl.WriterImpl;
+import org.apache.orc.impl.WriterInternal;
+import org.apache.orc.impl.writer.WriterImplV2;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/** Contains factory methods to read or write ORC files. */
+public class OrcFile {
+ private static final Logger LOG = LoggerFactory.getLogger(OrcFile.class);
+ public static final String MAGIC = "ORC";
+
+ /**
+ * Create a version number for the ORC file format, so that we can add non-forward compatible
+ * changes in the future. To make it easier for users to understand the version numbers, we use
+ * the Hive release number that first wrote that version of ORC files.
+ *
+ * Thus, if you add new encodings or other non-forward compatible changes to ORC files, which
+ * prevent the old reader from reading the new format, you should change these variables to
+ * reflect the next Hive release number. Non-forward compatible changes should never be added in
+ * patch releases.
+ *
+ * Do not make any changes that break backwards compatibility, which would prevent the new
+ * reader from reading ORC files generated by any released version of Hive.
+ */
+ public enum Version {
+ V_0_11("0.11", 0, 11),
+ V_0_12("0.12", 0, 12),
+
+ /**
+ * Do not use this format except for testing. It will not be compatible with other versions
+ * of the software. While we iterate on the ORC 2.0 format, we will make incompatible format
+ * changes under this version without providing any forward or backward compatibility.
+ *
+ * When 2.0 is released, this version identifier will be completely removed.
+ */
+ UNSTABLE_PRE_2_0("UNSTABLE-PRE-2.0", 1, 9999),
+
+ /** The generic identifier for all unknown versions. */
+ FUTURE("future", Integer.MAX_VALUE, Integer.MAX_VALUE);
+
+ public static final Version CURRENT = V_0_12;
+
+ private final String name;
+ private final int major;
+ private final int minor;
+
+ Version(String name, int major, int minor) {
+ this.name = name;
+ this.major = major;
+ this.minor = minor;
+ }
+
+ public static Version byName(String name) {
+ for (Version version : values()) {
+ if (version.name.equals(name)) {
+ return version;
+ }
+ }
+ throw new IllegalArgumentException("Unknown ORC version " + name);
+ }
+
+ /** Get the human readable name for the version. */
+ public String getName() {
+ return name;
+ }
+
+ /** Get the major version number. */
+ public int getMajor() {
+ return major;
+ }
+
+ /** Get the minor version number. */
+ public int getMinor() {
+ return minor;
+ }
+ }
+
+ /** WriterImplementation Enum. */
+ public enum WriterImplementation {
+ /** ORC_JAVA. */
+ ORC_JAVA(0), // ORC Java writer
+ /** ORC_CPP. */
+ ORC_CPP(1), // ORC C++ writer
+ /** PRESTO. */
+ PRESTO(2), // Presto writer
+ /** SCRITCHLEY_GO. */
+ SCRITCHLEY_GO(3), // Go writer from https://github.com/scritchley/orc
+ /** TRINO. */
+ TRINO(4), // Trino writer
+ /** CUDF. */
+ CUDF(5), // CUDF writer
+ /** UNKNOWN. */
+ UNKNOWN(Integer.MAX_VALUE);
+
+ private final int id;
+
+ WriterImplementation(int id) {
+ this.id = id;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public static WriterImplementation from(int id) {
+ WriterImplementation[] values = values();
+ if (id >= 0 && id < values.length - 1) {
+ return values[id];
+ }
+ return UNKNOWN;
+ }
+ }
+
+ /**
+ * Records the version of the writer in terms of which bugs have been fixed. When you fix bugs
+ * in the writer (or make substantial changes) that don't change the file format, add a new
+ * version here instead of Version.
+ *
+ * The ids are assigned sequentially from 6 per WriterImplementation so that readers that
+ * predate ORC-202 treat the other writers correctly.
+ */
+ public enum WriterVersion {
+ // Java ORC Writer.
+ ORIGINAL(WriterImplementation.ORC_JAVA, 0),
+ HIVE_8732(WriterImplementation.ORC_JAVA, 1),
+ /** fixed stripe/file maximum statistics and string statistics to use utf8 for min/max. */
+ HIVE_4243(WriterImplementation.ORC_JAVA, 2), // use real column names from Hive tables
+ HIVE_12055(WriterImplementation.ORC_JAVA, 3), // vectorized writer
+ HIVE_13083(WriterImplementation.ORC_JAVA, 4), // decimals write present stream correctly
+ ORC_101(WriterImplementation.ORC_JAVA, 5), // bloom filters use utf8
+ ORC_135(WriterImplementation.ORC_JAVA, 6), // timestamp stats use utc
+ ORC_517(WriterImplementation.ORC_JAVA, 7), // decimal64 min/max are fixed
+ ORC_203(WriterImplementation.ORC_JAVA, 8), // trim long strings & record they were trimmed
+ ORC_14(WriterImplementation.ORC_JAVA, 9), // column encryption added
+
+ // C++ ORC Writer
+ ORC_CPP_ORIGINAL(WriterImplementation.ORC_CPP, 6),
+
+ // Presto Writer
+ PRESTO_ORIGINAL(WriterImplementation.PRESTO, 6),
+
+ // Scritchley Go Writer
+ SCRITCHLEY_GO_ORIGINAL(WriterImplementation.SCRITCHLEY_GO, 6),
+
+ // Trino Writer
+ TRINO_ORIGINAL(WriterImplementation.TRINO, 6),
+
+ // CUDF Writer
+ CUDF_ORIGINAL(WriterImplementation.CUDF, 6),
+
+ // Don't use any magic numbers here except for the below:
+ FUTURE(WriterImplementation.UNKNOWN, Integer.MAX_VALUE); // a version from a future writer
+
+ private final int id;
+ private final WriterImplementation writer;
+
+ public WriterImplementation getWriterImplementation() {
+ return writer;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ WriterVersion(WriterImplementation writer, int id) {
+ this.writer = writer;
+ this.id = id;
+ }
+
+ private static final WriterVersion[][] values =
+ new WriterVersion[WriterImplementation.values().length][];
+
+ static {
+ for (WriterVersion v : WriterVersion.values()) {
+ WriterImplementation writer = v.writer;
+ if (writer != WriterImplementation.UNKNOWN) {
+ if (values[writer.id] == null) {
+ values[writer.id] = new WriterVersion[WriterVersion.values().length];
+ }
+ if (values[writer.id][v.id] != null) {
+ throw new IllegalArgumentException("Duplicate WriterVersion id " + v);
+ }
+ values[writer.id][v.id] = v;
+ }
+ }
+ }
+
+ /**
+ * Convert the integer from OrcProto.PostScript.writerVersion to the enumeration with
+ * unknown versions being mapped to FUTURE.
+ *
+ * @param writer the writer implementation
+ * @param val the serialized writer version
+ * @return the corresponding enumeration value
+ */
+ public static WriterVersion from(WriterImplementation writer, int val) {
+ if (writer == WriterImplementation.UNKNOWN) {
+ return FUTURE;
+ }
+ if (writer != WriterImplementation.ORC_JAVA && val < 6) {
+ throw new IllegalArgumentException(
+ "ORC File with illegal version " + val + " for writer " + writer);
+ }
+ WriterVersion[] versions = values[writer.id];
+ if (val < 0 || versions.length <= val) {
+ return FUTURE;
+ }
+ WriterVersion result = versions[val];
+ return result == null ? FUTURE : result;
+ }
+
+ /**
+ * Does this file include the given fix or come from a different writer?
+ *
+ * @param fix the required fix
+ * @return true if the required fix is present
+ */
+ public boolean includes(WriterVersion fix) {
+ return writer != fix.writer || id >= fix.id;
+ }
+ }
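A small illustration (not part of the patch) of how these constants are typically consulted once the writer implementation and version id have been read from a file's PostScript:

    // unknown (implementation, id) pairs degrade to FUTURE instead of failing
    OrcFile.WriterVersion v =
            OrcFile.WriterVersion.from(OrcFile.WriterImplementation.ORC_JAVA, 5);
    // true: for the Java writer, id 5 corresponds to ORC_101 (utf8 bloom filters)
    boolean utf8BloomFilters = v.includes(OrcFile.WriterVersion.ORC_101);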
+
+ /** The WriterVersion for this version of the software. */
+ public static final WriterVersion CURRENT_WRITER = WriterVersion.ORC_14;
+
+ /** EncodingStrategy Enum. */
+ public enum EncodingStrategy {
+ /** SPEED. */
+ SPEED,
+ /** COMPRESSION. */
+ COMPRESSION
+ }
+
+ /** CompressionStrategy Enum. */
+ public enum CompressionStrategy {
+ /** SPEED. */
+ SPEED,
+ /** COMPRESSION. */
+ COMPRESSION
+ }
+
+ // unused
+ protected OrcFile() {}
+
+ /** Orc ReaderOptions. */
+ public static class ReaderOptions {
+ private final Configuration conf;
+ private FileSystem filesystem;
+ private long maxLength = Long.MAX_VALUE;
+ private OrcTail orcTail;
+ private KeyProvider keyProvider;
+ // TODO: We can generalize the FileMetadata interface. Make OrcTail implement FileMetadata
+ // and remove this class altogether. Both footer caching and llap caching just need OrcTail.
+ // For now keeping this around to avoid complex surgery.
+ private FileMetadata fileMetadata;
+ private boolean useUTCTimestamp;
+ private boolean useProlepticGregorian;
+
+ public ReaderOptions(Configuration conf) {
+ this.conf = conf;
+ this.useProlepticGregorian = OrcConf.PROLEPTIC_GREGORIAN.getBoolean(conf);
+ }
+
+ public ReaderOptions filesystem(FileSystem fs) {
+ this.filesystem = fs;
+ return this;
+ }
+
+ public ReaderOptions maxLength(long val) {
+ maxLength = val;
+ return this;
+ }
+
+ public ReaderOptions orcTail(OrcTail tail) {
+ this.orcTail = tail;
+ return this;
+ }
+
+ /**
+ * Set the KeyProvider to override the default for getting keys.
+ *
+ * @param provider the provider that supplies the encryption keys
+ * @return this
+ */
+ public ReaderOptions setKeyProvider(KeyProvider provider) {
+ this.keyProvider = provider;
+ return this;
+ }
+
+ /**
+ * Should the reader convert dates and times to the proleptic Gregorian calendar?
+ *
+ * @param newValue should it use the proleptic Gregorian calendar?
+ * @return this
+ */
+ public ReaderOptions convertToProlepticGregorian(boolean newValue) {
+ this.useProlepticGregorian = newValue;
+ return this;
+ }
+
+ public Configuration getConfiguration() {
+ return conf;
+ }
+
+ public FileSystem getFilesystem() {
+ return filesystem;
+ }
+
+ public long getMaxLength() {
+ return maxLength;
+ }
+
+ public OrcTail getOrcTail() {
+ return orcTail;
+ }
+
+ public KeyProvider getKeyProvider() {
+ return keyProvider;
+ }
+
+ /** @deprecated Use {@link #orcTail(OrcTail)} instead. */
+ public ReaderOptions fileMetadata(final FileMetadata metadata) {
+ fileMetadata = metadata;
+ return this;
+ }
+
+ public FileMetadata getFileMetadata() {
+ return fileMetadata;
+ }
+
+ public ReaderOptions useUTCTimestamp(boolean value) {
+ useUTCTimestamp = value;
+ return this;
+ }
+
+ public boolean getUseUTCTimestamp() {
+ return useUTCTimestamp;
+ }
+
+ public boolean getConvertToProlepticGregorian() {
+ return useProlepticGregorian;
+ }
+ }
+
+ public static ReaderOptions readerOptions(Configuration conf) {
+ return new ReaderOptions(conf);
+ }
+
+ public static Reader createReader(Path path, ReaderOptions options) throws IOException {
+ return new ReaderImpl(path, options);
+ }
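A hedged usage sketch of the reader factory above; the file path is a placeholder:

    Configuration conf = new Configuration();
    Reader reader =
            OrcFile.createReader(new Path("/tmp/example.orc"), OrcFile.readerOptions(conf));
    try {
        System.out.println("rows=" + reader.getNumberOfRows() + ", schema=" + reader.getSchema());
    } finally {
        reader.close();
    }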
+
+ /** WriterContext. */
+ public interface WriterContext {
+ Writer getWriter();
+ }
+
+ /** WriterCallback. */
+ public interface WriterCallback {
+ void preStripeWrite(WriterContext context) throws IOException;
+
+ void preFooterWrite(WriterContext context) throws IOException;
+ }
+
+ /** BloomFilterVersion. */
+ public enum BloomFilterVersion {
+ // Include both the BLOOM_FILTER and BLOOM_FILTER_UTF8 streams to support
+ // both old and new readers.
+ ORIGINAL("original"),
+ // Only include the BLOOM_FILTER_UTF8 streams that consistently use UTF8.
+ // See ORC-101
+ UTF8("utf8");
+
+ private final String id;
+
+ BloomFilterVersion(String id) {
+ this.id = id;
+ }
+
+ @Override
+ public String toString() {
+ return id;
+ }
+
+ public static BloomFilterVersion fromString(String s) {
+ for (BloomFilterVersion version : values()) {
+ if (version.id.equals(s)) {
+ return version;
+ }
+ }
+ throw new IllegalArgumentException("Unknown BloomFilterVersion " + s);
+ }
+ }
+
+ /** ZstdCompressOptions. */
+ public static class ZstdCompressOptions {
+ private int compressionZstdLevel;
+ private int compressionZstdWindowLog;
+
+ public int getCompressionZstdLevel() {
+ return compressionZstdLevel;
+ }
+
+ public void setCompressionZstdLevel(int compressionZstdLevel) {
+ this.compressionZstdLevel = compressionZstdLevel;
+ }
+
+ public int getCompressionZstdWindowLog() {
+ return compressionZstdWindowLog;
+ }
+
+ public void setCompressionZstdWindowLog(int compressionZstdWindowLog) {
+ this.compressionZstdWindowLog = compressionZstdWindowLog;
+ }
+ }
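An illustrative way to reach these options through WriterOptions (assuming a ZSTD entry exists in this build's CompressionKind):

    OrcFile.WriterOptions opts =
            OrcFile.writerOptions(new Configuration()).compress(CompressionKind.ZSTD);
    OrcFile.ZstdCompressOptions zstd = opts.getZstdCompressOptions();
    zstd.setCompressionZstdLevel(3);      // compression level handed to the zstd codec
    zstd.setCompressionZstdWindowLog(23); // log2 of the window size for long-range matching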
+
+ /** Options for creating ORC file writers. */
+ public static class WriterOptions implements Cloneable {
+ private final Configuration configuration;
+ private FileSystem fileSystemValue = null;
+ private TypeDescription schema = null;
+ private long stripeSizeValue;
+ private long stripeRowCountValue;
+ private long blockSizeValue;
+ private boolean buildIndex;
+ private int rowIndexStrideValue;
+ private int bufferSizeValue;
+ private boolean enforceBufferSize = false;
+ private boolean blockPaddingValue;
+ private CompressionKind compressValue;
+ private MemoryManager memoryManagerValue;
+ private Version versionValue;
+ private WriterCallback callback;
+ private EncodingStrategy encodingStrategy;
+ private CompressionStrategy compressionStrategy;
+ private ZstdCompressOptions zstdCompressOptions;
+ private double paddingTolerance;
+ private String bloomFilterColumns;
+ private double bloomFilterFpp;
+ private BloomFilterVersion bloomFilterVersion;
+ private PhysicalWriter physicalWriter;
+ private WriterVersion writerVersion = CURRENT_WRITER;
+ private boolean useUTCTimestamp;
+ private boolean overwrite;
+ private boolean writeVariableLengthBlocks;
+ private HadoopShims shims;
+ private String directEncodingColumns;
+ private String encryption;
+ private String masks;
+ private KeyProvider provider;
+ private boolean useProlepticGregorian;
+ private Map<String, HadoopShims.KeyMetadata> keyOverrides = new HashMap<>();
+
+ protected WriterOptions(Properties tableProperties, Configuration conf) {
+ configuration = conf;
+ memoryManagerValue = getStaticMemoryManager(conf);
+ overwrite = OrcConf.OVERWRITE_OUTPUT_FILE.getBoolean(tableProperties, conf);
+ stripeSizeValue = OrcConf.STRIPE_SIZE.getLong(tableProperties, conf);
+ stripeRowCountValue = OrcConf.STRIPE_ROW_COUNT.getLong(tableProperties, conf);
+ blockSizeValue = OrcConf.BLOCK_SIZE.getLong(tableProperties, conf);
+ buildIndex = OrcConf.ENABLE_INDEXES.getBoolean(tableProperties, conf);
+ rowIndexStrideValue = (int) OrcConf.ROW_INDEX_STRIDE.getLong(tableProperties, conf);
+ bufferSizeValue = (int) OrcConf.BUFFER_SIZE.getLong(tableProperties, conf);
+ blockPaddingValue = OrcConf.BLOCK_PADDING.getBoolean(tableProperties, conf);
+ compressValue =
+ CompressionKind.valueOf(
+ OrcConf.COMPRESS.getString(tableProperties, conf).toUpperCase());
+ enforceBufferSize =
+ OrcConf.ENFORCE_COMPRESSION_BUFFER_SIZE.getBoolean(tableProperties, conf);
+ String versionName = OrcConf.WRITE_FORMAT.getString(tableProperties, conf);
+ versionValue = Version.byName(versionName);
+ String enString = OrcConf.ENCODING_STRATEGY.getString(tableProperties, conf);
+ encodingStrategy = EncodingStrategy.valueOf(enString);
+
+ String compString = OrcConf.COMPRESSION_STRATEGY.getString(tableProperties, conf);
+ compressionStrategy = CompressionStrategy.valueOf(compString);
+
+ zstdCompressOptions = new ZstdCompressOptions();
+ zstdCompressOptions.setCompressionZstdLevel(
+ OrcConf.COMPRESSION_ZSTD_LEVEL.getInt(tableProperties, conf));
+ zstdCompressOptions.setCompressionZstdWindowLog(
+ OrcConf.COMPRESSION_ZSTD_WINDOWLOG.getInt(tableProperties, conf));
+
+ paddingTolerance = OrcConf.BLOCK_PADDING_TOLERANCE.getDouble(tableProperties, conf);
+
+ bloomFilterColumns = OrcConf.BLOOM_FILTER_COLUMNS.getString(tableProperties, conf);
+ bloomFilterFpp = OrcConf.BLOOM_FILTER_FPP.getDouble(tableProperties, conf);
+ bloomFilterVersion =
+ BloomFilterVersion.fromString(
+ OrcConf.BLOOM_FILTER_WRITE_VERSION.getString(tableProperties, conf));
+ shims = HadoopShimsFactory.get();
+ writeVariableLengthBlocks =
+ OrcConf.WRITE_VARIABLE_LENGTH_BLOCKS.getBoolean(tableProperties, conf);
+ directEncodingColumns =
+ OrcConf.DIRECT_ENCODING_COLUMNS.getString(tableProperties, conf);
+ useProlepticGregorian = OrcConf.PROLEPTIC_GREGORIAN.getBoolean(conf);
+ }
+
+ /** @return a SHALLOW clone */
+ @Override
+ public WriterOptions clone() {
+ try {
+ return (WriterOptions) super.clone();
+ } catch (CloneNotSupportedException ex) {
+ throw new AssertionError("Expected super.clone() to work");
+ }
+ }
+
+ /**
+ * Provide the filesystem for the path, if the client has it available. If it is not
+ * provided, it will be found from the path.
+ */
+ public WriterOptions fileSystem(FileSystem value) {
+ fileSystemValue = value;
+ return this;
+ }
+
+ /**
+ * If the output file already exists, should it be overwritten? If it is not provided, write
+ * operation will fail if the file already exists.
+ */
+ public WriterOptions overwrite(boolean value) {
+ overwrite = value;
+ return this;
+ }
+
+ /**
+ * Set the stripe size for the file. The writer stores the contents of the stripe in memory
+ * until this memory limit is reached and the stripe is flushed to the HDFS file and the
+ * next stripe started.
+ */
+ public WriterOptions stripeSize(long value) {
+ stripeSizeValue = value;
+ return this;
+ }
+
+ /**
+ * Set the file system block size for the file. For optimal performance, set the block size
+ * to be multiple factors of stripe size.
+ */
+ public WriterOptions blockSize(long value) {
+ blockSizeValue = value;
+ return this;
+ }
+
+ /**
+ * Set the distance between entries in the row index. The minimum value is 1000 to prevent
+ * the index from overwhelming the data. If the stride is set to 0, no indexes will be
+ * included in the file.
+ */
+ public WriterOptions rowIndexStride(int value) {
+ rowIndexStrideValue = value;
+ return this;
+ }
+
+ /**
+ * The size of the memory buffers used for compressing and storing the stripe in memory.
+ * NOTE: ORC writer may choose to use smaller buffer size based on stripe size and number of
+ * columns for efficient stripe writing and memory utilization. To enforce writer to use the
+ * requested buffer size use enforceBufferSize().
+ */
+ public WriterOptions bufferSize(int value) {
+ bufferSizeValue = value;
+ return this;
+ }
+
+ /**
+ * Enforce writer to use requested buffer size instead of estimating buffer size based on
+ * stripe size and number of columns. See bufferSize() method for more info. Default: false
+ */
+ public WriterOptions enforceBufferSize() {
+ enforceBufferSize = true;
+ return this;
+ }
+
+ /**
+ * Sets whether the HDFS blocks are padded to prevent stripes from straddling blocks.
+ * Padding improves locality and thus the speed of reading, but costs space.
+ */
+ public WriterOptions blockPadding(boolean value) {
+ blockPaddingValue = value;
+ return this;
+ }
+
+ /** Sets the encoding strategy that is used to encode the data. */
+ public WriterOptions encodingStrategy(EncodingStrategy strategy) {
+ encodingStrategy = strategy;
+ return this;
+ }
+
+ /** Sets the tolerance for block padding as a percentage of stripe size. */
+ public WriterOptions paddingTolerance(double value) {
+ paddingTolerance = value;
+ return this;
+ }
+
+ /** Comma separated values of column names for which bloom filter is to be created. */
+ public WriterOptions bloomFilterColumns(String columns) {
+ bloomFilterColumns = columns;
+ return this;
+ }
+
+ /**
+ * Specify the false positive probability for bloom filter.
+ *
+ * @param fpp - false positive probability
+ * @return this
+ */
+ public WriterOptions bloomFilterFpp(double fpp) {
+ bloomFilterFpp = fpp;
+ return this;
+ }
+
+ /** Sets the generic compression that is used to compress the data. */
+ public WriterOptions compress(CompressionKind value) {
+ compressValue = value;
+ return this;
+ }
+
+ /**
+ * Set the schema for the file. This is a required parameter.
+ *
+ * @param schema the schema for the file.
+ * @return this
+ */
+ public WriterOptions setSchema(TypeDescription schema) {
+ this.schema = schema;
+ return this;
+ }
+
+ /** Sets the version of the file that will be written. */
+ public WriterOptions version(Version value) {
+ versionValue = value;
+ return this;
+ }
+
+ /**
+ * Add a listener for when the stripe and file are about to be closed.
+ *
+ * @param callback the object to be called when the stripe is closed
+ * @return this
+ */
+ public WriterOptions callback(WriterCallback callback) {
+ this.callback = callback;
+ return this;
+ }
+
+ /** Set the version of the bloom filters to write. */
+ public WriterOptions bloomFilterVersion(BloomFilterVersion version) {
+ this.bloomFilterVersion = version;
+ return this;
+ }
+
+ /**
+ * Change the physical writer of the ORC file.
+ *
+ * SHOULD ONLY BE USED BY LLAP.
+ *
+ * @param writer the writer to control the layout and persistence
+ * @return this
+ */
+ public WriterOptions physicalWriter(PhysicalWriter writer) {
+ this.physicalWriter = writer;
+ return this;
+ }
+
+ /** A public option to set the memory manager. */
+ public WriterOptions memory(MemoryManager value) {
+ memoryManagerValue = value;
+ return this;
+ }
+
+ /**
+ * Should the ORC file writer use HDFS variable length blocks, if they are available?
+ *
+ * @param value the new value
+ * @return this
+ */
+ public WriterOptions writeVariableLengthBlocks(boolean value) {
+ writeVariableLengthBlocks = value;
+ return this;
+ }
+
+ /**
+ * Set the HadoopShims to use. This is only for testing.
+ *
+ * @param value the new value
+ * @return this
+ */
+ public WriterOptions setShims(HadoopShims value) {
+ this.shims = value;
+ return this;
+ }
+
+ /**
+ * Manually set the writer version. This is an internal API.
+ *
+ * @param version the version to write
+ * @return this
+ */
+ protected WriterOptions writerVersion(WriterVersion version) {
+ if (version == WriterVersion.FUTURE) {
+ throw new IllegalArgumentException("Can't write a future version.");
+ }
+ this.writerVersion = version;
+ return this;
+ }
+
+ /**
+ * Manually set the time zone for the writer to utc. If not defined, system time zone is
+ * assumed.
+ */
+ public WriterOptions useUTCTimestamp(boolean value) {
+ useUTCTimestamp = value;
+ return this;
+ }
+
+ /**
+ * Set the comma-separated list of columns that should be direct encoded.
+ *
+ * @param value the value to set
+ * @return this
+ */
+ public WriterOptions directEncodingColumns(String value) {
+ directEncodingColumns = value;
+ return this;
+ }
+
+ /**
+ * Encrypt a set of columns with a key.
+ *
+ * Format of the string is a key-list.
+ *
+ * - key-list = key (';' key-list)?
+ * - key = key-name ':' field-list
+ * - field-list = field-name ( ',' field-list )?
+ * - field-name = number | field-part ('.' field-name)?
+ * - field-part = quoted string | simple name
+ *
+ * @param value a key-list of which columns to encrypt
+ * @return this
+ */
+ public WriterOptions encrypt(String value) {
+ encryption = value;
+ return this;
+ }
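An illustrative key-list that follows the grammar above (key and column names are hypothetical, and a KeyProvider that actually holds these keys would still have to be configured):

    OrcFile.WriterOptions opts =
            OrcFile.writerOptions(new Configuration())
                    // encrypt pii.ssn and pii.email with "pii_key", and credit with "fin_key"
                    .encrypt("pii_key:pii.ssn,pii.email;fin_key:credit");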
+
+ /**
+ * Set the masks for the unencrypted data.
+ *
+ * Format of the string is a mask-list.
+ *
+ * - mask-list = mask (';' mask-list)?
+ * - mask = mask-name (',' parameter)* ':' field-list
+ * - field-list = field-name ( ',' field-list )?
+ * - field-name = number | field-part ('.' field-name)?
+ * - field-part = quoted string | simple name
+ *
+ * @param value a list of the masks and column names
+ * @return this
+ */
+ public WriterOptions masks(String value) {
+ masks = value;
+ return this;
+ }
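Continuing the sketch above, a matching mask-list (mask names and the redact parameter are illustrative):

    // unencrypted readers see nulls for pii.ssn and a character-replaced form of pii.email
    opts.masks("nullify:pii.ssn;redact,Xxxxxxx:pii.email");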
+
+ /**
+ * For users that need to override the current version of a key, this method allows them to
+ * define the version and algorithm for a given key.
+ *
+ * This will mostly be used for ORC file merging where the writer has to use the same
+ * version of the key that the original files used.
+ *
+ * @param keyName the key name
+ * @param version the version of the key to use
+ * @param algorithm the algorithm for the given key version
+ * @return this
+ */
+ public WriterOptions setKeyVersion(
+ String keyName, int version, EncryptionAlgorithm algorithm) {
+ HadoopShims.KeyMetadata meta = new HadoopShims.KeyMetadata(keyName, version, algorithm);
+ keyOverrides.put(keyName, meta);
+ return this;
+ }
+
+ /**
+ * Set the key provider for column encryption.
+ *
+ * @param provider the object that holds the master secrets
+ * @return this
+ */
+ public WriterOptions setKeyProvider(KeyProvider provider) {
+ this.provider = provider;
+ return this;
+ }
+
+ /**
+ * Should the writer use the proleptic Gregorian calendar for times and dates.
+ *
+ * @param newValue true if we should use the proleptic calendar
+ * @return this
+ */
+ public WriterOptions setProlepticGregorian(boolean newValue) {
+ this.useProlepticGregorian = newValue;
+ return this;
+ }
+
+ public KeyProvider getKeyProvider() {
+ return provider;
+ }
+
+ public boolean getBlockPadding() {
+ return blockPaddingValue;
+ }
+
+ public long getBlockSize() {
+ return blockSizeValue;
+ }
+
+ public String getBloomFilterColumns() {
+ return bloomFilterColumns;
+ }
+
+ public boolean getOverwrite() {
+ return overwrite;
+ }
+
+ public FileSystem getFileSystem() {
+ return fileSystemValue;
+ }
+
+ public Configuration getConfiguration() {
+ return configuration;
+ }
+
+ public TypeDescription getSchema() {
+ return schema;
+ }
+
+ public long getStripeSize() {
+ return stripeSizeValue;
+ }
+
+ public long getStripeRowCountValue() {
+ return stripeRowCountValue;
+ }
+
+ public CompressionKind getCompress() {
+ return compressValue;
+ }
+
+ public WriterCallback getCallback() {
+ return callback;
+ }
+
+ public Version getVersion() {
+ return versionValue;
+ }
+
+ public MemoryManager getMemoryManager() {
+ return memoryManagerValue;
+ }
+
+ public int getBufferSize() {
+ return bufferSizeValue;
+ }
+
+ public boolean isEnforceBufferSize() {
+ return enforceBufferSize;
+ }
+
+ public int getRowIndexStride() {
+ return rowIndexStrideValue;
+ }
+
+ public boolean isBuildIndex() {
+ return buildIndex;
+ }
+
+ public CompressionStrategy getCompressionStrategy() {
+ return compressionStrategy;
+ }
+
+ public EncodingStrategy getEncodingStrategy() {
+ return encodingStrategy;
+ }
+
+ public ZstdCompressOptions getZstdCompressOptions() {
+ return zstdCompressOptions;
+ }
+
+ public double getPaddingTolerance() {
+ return paddingTolerance;
+ }
+
+ public double getBloomFilterFpp() {
+ return bloomFilterFpp;
+ }
+
+ public BloomFilterVersion getBloomFilterVersion() {
+ return bloomFilterVersion;
+ }
+
+ public PhysicalWriter getPhysicalWriter() {
+ return physicalWriter;
+ }
+
+ public WriterVersion getWriterVersion() {
+ return writerVersion;
+ }
+
+ public boolean getWriteVariableLengthBlocks() {
+ return writeVariableLengthBlocks;
+ }
+
+ public HadoopShims getHadoopShims() {
+ return shims;
+ }
+
+ public boolean getUseUTCTimestamp() {
+ return useUTCTimestamp;
+ }
+
+ public String getDirectEncodingColumns() {
+ return directEncodingColumns;
+ }
+
+ public String getEncryption() {
+ return encryption;
+ }
+
+ public String getMasks() {
+ return masks;
+ }
+
+ public Map<String, HadoopShims.KeyMetadata> getKeyOverrides() {
+ return keyOverrides;
+ }
+
+ public boolean getProlepticGregorian() {
+ return useProlepticGregorian;
+ }
+ }
+
+ /**
+ * Create a set of writer options based on a configuration.
+ *
+ * @param conf the configuration to use for values
+ * @return A WriterOptions object that can be modified
+ */
+ public static WriterOptions writerOptions(Configuration conf) {
+ return new WriterOptions(null, conf);
+ }
+
+ /**
+ * Create a set of write options based on a set of table properties and configuration.
+ *
+ * @param tableProperties the properties of the table
+ * @param conf the configuration of the query
+ * @return a WriterOptions object that can be modified
+ */
+ public static WriterOptions writerOptions(Properties tableProperties, Configuration conf) {
+ return new WriterOptions(tableProperties, conf);
+ }
+
+ private static MemoryManager memoryManager = null;
+
+ private static synchronized MemoryManager getStaticMemoryManager(Configuration conf) {
+ if (memoryManager == null) {
+ memoryManager = new MemoryManagerImpl(conf);
+ }
+ return memoryManager;
+ }
+
+ /**
+ * Create an ORC file writer. This is the public interface for creating writers going forward
+ * and new options will only be added to this method.
+ *
+ * @param path filename to write to
+ * @param opts the options
+ * @return a new ORC file writer
+ * @throws IOException
+ */
+ public static Writer createWriter(Path path, WriterOptions opts) throws IOException {
+ FileSystem fs =
+ opts.getFileSystem() == null
+ ? path.getFileSystem(opts.getConfiguration())
+ : opts.getFileSystem();
+ switch (opts.getVersion()) {
+ case V_0_11:
+ case V_0_12:
+ return new WriterImpl(fs, path, opts);
+ case UNSTABLE_PRE_2_0:
+ return new WriterImplV2(fs, path, opts);
+ default:
+ throw new IllegalArgumentException("Unknown version " + opts.getVersion());
+ }
+ }
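Putting the pieces together, a hedged end-to-end sketch of writing a small file through this factory (paths and values are placeholders; VectorizedRowBatch and the column vectors come from the Hive storage-api dependency, plus the usual java.nio.charset import):

    Configuration conf = new Configuration();
    TypeDescription schema = TypeDescription.fromString("struct<x:bigint,y:string>");
    Writer writer =
            OrcFile.createWriter(
                    new Path("/tmp/example.orc"),
                    OrcFile.writerOptions(conf)
                            .setSchema(schema)
                            .compress(CompressionKind.ZLIB)
                            .stripeSize(64L * 1024 * 1024));
    VectorizedRowBatch batch = schema.createRowBatch();
    LongColumnVector x = (LongColumnVector) batch.cols[0];
    BytesColumnVector y = (BytesColumnVector) batch.cols[1];
    for (int r = 0; r < 3; ++r) {
        int row = batch.size++;
        x.vector[row] = r;
        y.setVal(row, ("row-" + r).getBytes(StandardCharsets.UTF_8));
    }
    writer.addRowBatch(batch);
    writer.close();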
+
+ /**
+ * Do we understand the version in the reader?
+ *
+ * @param path the path of the file
+ * @param reader the ORC file reader
+ * @return is the version understood by this writer?
+ */
+ static boolean understandFormat(Path path, Reader reader) {
+ if (reader.getFileVersion() == Version.FUTURE) {
+ LOG.info("Can't merge {} because it has a future version.", path);
+ return false;
+ }
+ if (reader.getWriterVersion() == WriterVersion.FUTURE) {
+ LOG.info("Can't merge {} because it has a future writerVersion.", path);
+ return false;
+ }
+ return true;
+ }
+
+ private static boolean sameKeys(EncryptionKey[] first, EncryptionKey[] next) {
+ if (first.length != next.length) {
+ return false;
+ }
+ for (int k = 0; k < first.length; ++k) {
+ if (!first[k].getKeyName().equals(next[k].getKeyName())
+ || first[k].getKeyVersion() != next[k].getKeyVersion()
+ || first[k].getAlgorithm() != next[k].getAlgorithm()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private static boolean sameMasks(DataMaskDescription[] first, DataMaskDescription[] next) {
+ if (first.length != next.length) {
+ return false;
+ }
+ for (int k = 0; k < first.length; ++k) {
+ if (!first[k].getName().equals(next[k].getName())) {
+ return false;
+ }
+ String[] firstParam = first[k].getParameters();
+ String[] nextParam = next[k].getParameters();
+ if (firstParam.length != nextParam.length) {
+ return false;
+ }
+ for (int p = 0; p < firstParam.length; ++p) {
+ if (!firstParam[p].equals(nextParam[p])) {
+ return false;
+ }
+ }
+ TypeDescription[] firstRoots = first[k].getColumns();
+ TypeDescription[] nextRoots = next[k].getColumns();
+ if (firstRoots.length != nextRoots.length) {
+ return false;
+ }
+ for (int r = 0; r < firstRoots.length; ++r) {
+ if (firstRoots[r].getId() != nextRoots[r].getId()) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ private static boolean sameVariants(EncryptionVariant[] first, EncryptionVariant[] next) {
+ if (first.length != next.length) {
+ return false;
+ }
+ for (int k = 0; k < first.length; ++k) {
+ if ((first[k].getKeyDescription() == null) != (next[k].getKeyDescription() == null)
+ || !first[k].getKeyDescription()
+ .getKeyName()
+ .equals(next[k].getKeyDescription().getKeyName())
+ || first[k].getRoot().getId() != next[k].getRoot().getId()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Is the new reader compatible with the file that is being written?
+ *
+ * @param firstReader the first reader that others must match
+ * @param userMetadata the user metadata
+ * @param path the new path name for warning messages
+ * @param reader the new reader
+ * @return is the reader compatible with the previous ones?
+ */
+ static boolean readerIsCompatible(
+ Reader firstReader, Map<String, ByteBuffer> userMetadata, Path path, Reader reader) {
+ // now we have to check compatibility
+ TypeDescription schema = firstReader.getSchema();
+ if (!reader.getSchema().equals(schema)) {
+ LOG.info(
+ "Can't merge {} because of different schemas {} vs {}",
+ path,
+ reader.getSchema(),
+ schema);
+ return false;
+ }
+ CompressionKind compression = firstReader.getCompressionKind();
+ if (reader.getCompressionKind() != compression) {
+ LOG.info(
+ "Can't merge {} because of different compression {} vs {}",
+ path,
+ reader.getCompressionKind(),
+ compression);
+ return false;
+ }
+ Version fileVersion = firstReader.getFileVersion();
+ if (reader.getFileVersion() != fileVersion) {
+ LOG.info(
+ "Can't merge {} because of different file versions {} vs {}",
+ path,
+ reader.getFileVersion(),
+ fileVersion);
+ return false;
+ }
+ WriterVersion writerVersion = firstReader.getWriterVersion();
+ if (reader.getWriterVersion() != writerVersion) {
+ LOG.info(
+ "Can't merge {} because of different writer versions {} vs {}",
+ path,
+ reader.getWriterVersion(),
+ writerVersion);
+ return false;
+ }
+ int rowIndexStride = firstReader.getRowIndexStride();
+ if (reader.getRowIndexStride() != rowIndexStride) {
+ LOG.info(
+ "Can't merge {} because of different row index strides {} vs {}",
+ path,
+ reader.getRowIndexStride(),
+ rowIndexStride);
+ return false;
+ }
+ for (String key : reader.getMetadataKeys()) {
+ ByteBuffer currentValue = userMetadata.get(key);
+ if (currentValue != null) {
+ ByteBuffer newValue = reader.getMetadataValue(key);
+ if (!newValue.equals(currentValue)) {
+ LOG.info("Can't merge {} because of different user metadata {}", path, key);
+ return false;
+ }
+ }
+ }
+ if (!sameKeys(firstReader.getColumnEncryptionKeys(), reader.getColumnEncryptionKeys())) {
+ LOG.info("Can't merge {} because it has different encryption keys", path);
+ return false;
+ }
+ if (!sameMasks(firstReader.getDataMasks(), reader.getDataMasks())) {
+ LOG.info("Can't merge {} because it has different encryption masks", path);
+ return false;
+ }
+ if (!sameVariants(firstReader.getEncryptionVariants(), reader.getEncryptionVariants())) {
+ LOG.info("Can't merge {} because it has different encryption variants", path);
+ return false;
+ }
+ if (firstReader.writerUsedProlepticGregorian() != reader.writerUsedProlepticGregorian()) {
+ LOG.info("Can't merge {} because it uses a different calendar", path);
+ return false;
+ }
+ return true;
+ }
+
+ static void mergeMetadata(Map<String, ByteBuffer> metadata, Reader reader) {
+ for (String key : reader.getMetadataKeys()) {
+ metadata.put(key, reader.getMetadataValue(key));
+ }
+ }
+
+ /**
+ * Merges multiple ORC files that all have the same schema to produce a single ORC file. The
+ * merge will reject files that aren't compatible with the merged file so the output list may be
+ * shorter than the input list. The stripes are copied as serialized byte buffers. The user
+ * metadata are merged and files that disagree on the value associated with a key will be
+ * rejected.
+ *
+ * @param outputPath the output file
+ * @param options the options for writing with although the options related to the input files'
+ * encodings are overridden
+ * @param inputFiles the list of files to merge
+ * @return the list of files that were successfully merged
+ * @throws IOException
+ */
+ public static List<Path> mergeFiles(
+ Path outputPath, WriterOptions options, List<Path> inputFiles) throws IOException {
+ Writer output = null;
+ final Configuration conf = options.getConfiguration();
+ KeyProvider keyProvider = options.getKeyProvider();
+ try {
+ byte[] buffer = new byte[0];
+ Reader firstFile = null;
+ List<Path> result = new ArrayList<>(inputFiles.size());
+ Map<String, ByteBuffer> userMetadata = new HashMap<>();
+ int bufferSize = 0;
+
+ for (Path input : inputFiles) {
+ FileSystem fs = input.getFileSystem(conf);
+ Reader reader =
+ createReader(
+ input,
+ readerOptions(options.getConfiguration())
+ .filesystem(fs)
+ .setKeyProvider(keyProvider));
+
+ if (!understandFormat(input, reader)) {
+ continue;
+ } else if (firstFile == null) {
+ // if this is the first file that we are including, grab the values
+ firstFile = reader;
+ bufferSize = reader.getCompressionSize();
+ CompressionKind compression = reader.getCompressionKind();
+ options.bufferSize(bufferSize)
+ .version(reader.getFileVersion())
+ .writerVersion(reader.getWriterVersion())
+ .compress(compression)
+ .rowIndexStride(reader.getRowIndexStride())
+ .setSchema(reader.getSchema());
+ if (compression != CompressionKind.NONE) {
+ options.enforceBufferSize().bufferSize(bufferSize);
+ }
+ mergeMetadata(userMetadata, reader);
+ // ensure that the merged file uses the same key versions
+ for (EncryptionKey key : reader.getColumnEncryptionKeys()) {
+ options.setKeyVersion(
+ key.getKeyName(), key.getKeyVersion(), key.getAlgorithm());
+ }
+ output = createWriter(outputPath, options);
+ } else if (!readerIsCompatible(firstFile, userMetadata, input, reader)) {
+ continue;
+ } else {
+ mergeMetadata(userMetadata, reader);
+ if (bufferSize < reader.getCompressionSize()) {
+ bufferSize = reader.getCompressionSize();
+ ((WriterInternal) output).increaseCompressionSize(bufferSize);
+ }
+ }
+ EncryptionVariant[] variants = reader.getEncryptionVariants();
+ List<StripeStatistics>[] completeList = new List[variants.length + 1];
+ for (int v = 0; v < variants.length; ++v) {
+ completeList[v] = reader.getVariantStripeStatistics(variants[v]);
+ }
+ completeList[completeList.length - 1] = reader.getVariantStripeStatistics(null);
+ StripeStatistics[] stripeStats = new StripeStatistics[completeList.length];
+ try (FSDataInputStream inputStream = ((ReaderImpl) reader).takeFile()) {
+ result.add(input);
+
+ for (StripeInformation stripe : reader.getStripes()) {
+ int length = (int) stripe.getLength();
+ if (buffer.length < length) {
+ buffer = new byte[length];
+ }
+ long offset = stripe.getOffset();
+ inputStream.readFully(offset, buffer, 0, length);
+ int stripeId = (int) stripe.getStripeId();
+ for (int v = 0; v < completeList.length; ++v) {
+ stripeStats[v] = completeList[v].get(stripeId);
+ }
+ output.appendStripe(buffer, 0, length, stripe, stripeStats);
+ }
+ }
+ }
+ if (output != null) {
+ for (Map.Entry<String, ByteBuffer> entry : userMetadata.entrySet()) {
+ output.addUserMetadata(entry.getKey(), entry.getValue());
+ }
+ output.close();
+ }
+ return result;
+ } catch (Throwable t) {
+ if (output != null) {
+ try {
+ output.close();
+ } catch (Throwable ignore) {
+ // PASS
+ }
+ try {
+ FileSystem fs =
+ options.getFileSystem() == null
+ ? outputPath.getFileSystem(conf)
+ : options.getFileSystem();
+ fs.delete(outputPath, false);
+ } catch (Throwable ignore) {
+ // PASS
+ }
+ }
+ throw new IOException("Problem merging files into " + outputPath, t);
+ }
+ }
+}
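For reference, a minimal sketch of the merge entry point defined above (file names are placeholders):

    Configuration conf = new Configuration();
    List<Path> inputs = Arrays.asList(new Path("/tmp/a.orc"), new Path("/tmp/b.orc"));
    List<Path> merged =
            OrcFile.mergeFiles(new Path("/tmp/merged.orc"), OrcFile.writerOptions(conf), inputs);
    // 'merged' holds only the inputs whose schema, compression, versions, user metadata and
    // encryption settings were compatible with the first accepted file.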
diff --git a/paimon-format/src/main/java/org/apache/orc/impl/PhysicalFsWriter.java b/paimon-format/src/main/java/org/apache/orc/impl/PhysicalFsWriter.java
new file mode 100644
index 000000000000..f887e860a830
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/orc/impl/PhysicalFsWriter.java
@@ -0,0 +1,793 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.EncryptionVariant;
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcProto;
+import org.apache.orc.PhysicalWriter;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.writer.StreamOptions;
+import org.apache.orc.impl.writer.WriterEncryptionKey;
+import org.apache.orc.impl.writer.WriterEncryptionVariant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/** A orc PhysicalFsWriter. */
+public class PhysicalFsWriter implements PhysicalWriter {
+ private static final Logger LOG = LoggerFactory.getLogger(PhysicalFsWriter.class);
+
+ private static final int HDFS_BUFFER_SIZE = 256 * 1024;
+
+ private FSDataOutputStream rawWriter;
+ private final DirectStream rawStream;
+
+ // the compressed metadata information outStream
+ private OutStream compressStream;
+ // a protobuf outStream around streamFactory
+ private CodedOutputStream codedCompressStream;
+
+ private Path path;
+ private final HadoopShims shims;
+ private final long blockSize;
+ private final int maxPadding;
+ private final StreamOptions compress;
+ private final OrcFile.CompressionStrategy compressionStrategy;
+ private final boolean addBlockPadding;
+ private final boolean writeVariableLengthBlocks;
+ private final VariantTracker unencrypted;
+
+ private long headerLength;
+ private long stripeStart;
+ // The position of the last time we wrote a short block, which becomes the
+ // natural blocks
+ private long blockOffset;
+ private int metadataLength;
+ private int stripeStatisticsLength = 0;
+ private int footerLength;
+ private int stripeNumber = 0;
+
+ private final Map<WriterEncryptionVariant, VariantTracker> variants = new TreeMap<>();
+
+ public PhysicalFsWriter(FileSystem fs, Path path, OrcFile.WriterOptions opts)
+ throws IOException {
+ this(fs, path, opts, new WriterEncryptionVariant[0]);
+ }
+
+ public PhysicalFsWriter(
+ FileSystem fs,
+ Path path,
+ OrcFile.WriterOptions opts,
+ WriterEncryptionVariant[] encryption)
+ throws IOException {
+ this(
+ fs.create(
+ path,
+ opts.getOverwrite(),
+ HDFS_BUFFER_SIZE,
+ fs.getDefaultReplication(path),
+ opts.getBlockSize()),
+ opts,
+ encryption);
+ this.path = path;
+ LOG.info(
+ "ORC writer created for path: {} with stripeSize: {} blockSize: {}"
+ + " compression: {}",
+ path,
+ opts.getStripeSize(),
+ blockSize,
+ compress);
+ }
+
+ public PhysicalFsWriter(
+ FSDataOutputStream outputStream,
+ OrcFile.WriterOptions opts,
+ WriterEncryptionVariant[] encryption)
+ throws IOException {
+ this.rawWriter = outputStream;
+ long defaultStripeSize = opts.getStripeSize();
+ this.addBlockPadding = opts.getBlockPadding();
+ if (opts.isEnforceBufferSize()) {
+ this.compress = new StreamOptions(opts.getBufferSize());
+ } else {
+ this.compress =
+ new StreamOptions(
+ WriterImpl.getEstimatedBufferSize(
+ defaultStripeSize,
+ opts.getSchema().getMaximumId() + 1,
+ opts.getBufferSize()));
+ }
+ CompressionCodec codec = OrcCodecPool.getCodec(opts.getCompress());
+ if (codec != null) {
+ CompressionCodec.Options tempOptions = codec.getDefaultOptions();
+ if (codec instanceof ZstdCodec
+ && codec.getDefaultOptions() instanceof ZstdCodec.ZstdOptions) {
+ ZstdCodec.ZstdOptions options = (ZstdCodec.ZstdOptions) codec.getDefaultOptions();
+ OrcFile.ZstdCompressOptions zstdCompressOptions = opts.getZstdCompressOptions();
+ if (zstdCompressOptions != null) {
+ options.setLevel(zstdCompressOptions.getCompressionZstdLevel());
+ options.setWindowLog(zstdCompressOptions.getCompressionZstdWindowLog());
+ }
+ }
+ compress.withCodec(codec, tempOptions);
+ }
+ this.compressionStrategy = opts.getCompressionStrategy();
+ this.maxPadding = (int) (opts.getPaddingTolerance() * defaultStripeSize);
+ this.blockSize = opts.getBlockSize();
+ blockOffset = 0;
+ unencrypted = new VariantTracker(opts.getSchema(), compress);
+ writeVariableLengthBlocks = opts.getWriteVariableLengthBlocks();
+ shims = opts.getHadoopShims();
+ rawStream = new DirectStream(rawWriter);
+ compressStream = new OutStream("stripe footer", compress, rawStream);
+ codedCompressStream = CodedOutputStream.newInstance(compressStream);
+ for (WriterEncryptionVariant variant : encryption) {
+ WriterEncryptionKey key = variant.getKeyDescription();
+ StreamOptions encryptOptions =
+ new StreamOptions(unencrypted.options)
+ .withEncryption(key.getAlgorithm(), variant.getFileFooterKey());
+ variants.put(variant, new VariantTracker(variant.getRoot(), encryptOptions));
+ }
+ }
+
+ /**
+ * Record the information about each column encryption variant. The unencrypted data and each
+ * encrypted column root are variants.
+ */
+ protected static class VariantTracker {
+ // the streams that make up the current stripe
+ protected final Map<StreamName, BufferedStream> streams = new TreeMap<>();
+ private final int rootColumn;
+ private final int lastColumn;
+ protected final StreamOptions options;
+ // a list for each column covered by this variant
+ // the elements in the list correspond to each stripe in the file
+ protected final List<OrcProto.ColumnStatistics>[] stripeStats;
+ protected final List<OrcProto.Stream> stripeStatsStreams = new ArrayList<>();
+ protected final OrcProto.ColumnStatistics[] fileStats;
+
+ VariantTracker(TypeDescription schema, StreamOptions options) {
+ rootColumn = schema.getId();
+ lastColumn = schema.getMaximumId();
+ this.options = options;
+ stripeStats = new List[schema.getMaximumId() - schema.getId() + 1];
+ for (int i = 0; i < stripeStats.length; ++i) {
+ stripeStats[i] = new ArrayList<>();
+ }
+ fileStats = new OrcProto.ColumnStatistics[stripeStats.length];
+ }
+
+ public BufferedStream createStream(StreamName name) {
+ BufferedStream result = new BufferedStream();
+ streams.put(name, result);
+ return result;
+ }
+
+ /**
+ * Place the streams in the appropriate area while updating the sizes with the number of
+ * bytes in the area.
+ *
+ * @param area the area to write
+ * @param sizes the sizes of the areas
+ * @return the list of stream descriptions to add
+ */
+ public List<OrcProto.Stream> placeStreams(StreamName.Area area, SizeCounters sizes) {
+ List<OrcProto.Stream> result = new ArrayList<>(streams.size());
+ for (Map.Entry<StreamName, BufferedStream> stream : streams.entrySet()) {
+ StreamName name = stream.getKey();
+ BufferedStream bytes = stream.getValue();
+ if (name.getArea() == area && !bytes.isSuppressed) {
+ OrcProto.Stream.Builder builder = OrcProto.Stream.newBuilder();
+ long size = bytes.getOutputSize();
+ if (area == StreamName.Area.INDEX) {
+ sizes.index += size;
+ } else {
+ sizes.data += size;
+ }
+ builder.setColumn(name.getColumn()).setKind(name.getKind()).setLength(size);
+ result.add(builder.build());
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Write the streams in the appropriate area.
+ *
+ * @param area the area to write
+ * @param raw the raw stream to write to
+ */
+ public void writeStreams(StreamName.Area area, FSDataOutputStream raw) throws IOException {
+ for (Map.Entry<StreamName, BufferedStream> stream : streams.entrySet()) {
+ if (stream.getKey().getArea() == area) {
+ stream.getValue().spillToDiskAndClear(raw);
+ }
+ }
+ }
+
+ /**
+ * Compute the size of the given column on disk for this stripe. It excludes the index
+ * streams.
+ *
+ * @param column a column id
+ * @return the total number of bytes
+ */
+ public long getFileBytes(int column) {
+ long result = 0;
+ if (column >= rootColumn && column <= lastColumn) {
+ for (Map.Entry<StreamName, BufferedStream> entry : streams.entrySet()) {
+ StreamName name = entry.getKey();
+ if (name.getColumn() == column && name.getArea() != StreamName.Area.INDEX) {
+ result += entry.getValue().getOutputSize();
+ }
+ }
+ }
+ return result;
+ }
+ }
+
+ VariantTracker getVariant(EncryptionVariant column) {
+ if (column == null) {
+ return unencrypted;
+ }
+ return variants.get(column);
+ }
+
+ /**
+ * Get the number of bytes for a file in a given column by finding all the streams (not
+ * suppressed) for a given column and returning the sum of their sizes. Excludes index streams.
+ *
+ * @param column column from which to get file size
+ * @return number of bytes for the given column
+ */
+ @Override
+ public long getFileBytes(int column, WriterEncryptionVariant variant) {
+ return getVariant(variant).getFileBytes(column);
+ }
+
+ @Override
+ public StreamOptions getStreamOptions() {
+ return unencrypted.options;
+ }
+
+ private static final byte[] ZEROS = new byte[64 * 1024];
+
+ private static void writeZeros(OutputStream output, long remaining) throws IOException {
+ while (remaining > 0) {
+ long size = Math.min(ZEROS.length, remaining);
+ output.write(ZEROS, 0, (int) size);
+ remaining -= size;
+ }
+ }
+
+ /**
+ * Do any required shortening of the HDFS block or padding to avoid straddling HDFS blocks. This
+ * is called before writing the current stripe.
+ *
+ * @param stripeSize the number of bytes in the current stripe
+ */
+ private void padStripe(long stripeSize) throws IOException {
+ this.stripeStart = rawWriter.getPos();
+ long previousBytesInBlock = (stripeStart - blockOffset) % blockSize;
+ // We only have options if this isn't the first stripe in the block
+ if (previousBytesInBlock > 0) {
+ if (previousBytesInBlock + stripeSize >= blockSize) {
+ // Try making a short block
+ if (writeVariableLengthBlocks && shims.endVariableLengthBlock(rawWriter)) {
+ blockOffset = stripeStart;
+ } else if (addBlockPadding) {
+ // if we cross the block boundary, figure out what we should do
+ long padding = blockSize - previousBytesInBlock;
+ if (padding <= maxPadding) {
+ writeZeros(rawWriter, padding);
+ stripeStart += padding;
+ }
+ }
+ }
+ }
+ }
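A worked instance of the arithmetic above, with assumed sizes (illustrative numbers only):

    long blockSize = 256L << 20;     // assumed HDFS block size: 256 MiB
    long blockOffset = 0L;           // no short block has been written yet
    long stripeStart = 250L << 20;   // current file position
    long stripeSize = 20L << 20;     // bytes about to be flushed
    long previousBytesInBlock = (stripeStart - blockOffset) % blockSize; // 250 MiB
    // 250 MiB + 20 MiB >= 256 MiB, so the stripe would straddle the block boundary.
    // With block padding enabled and maxPadding >= 6 MiB, writeZeros pads 6 MiB so the
    // stripe starts exactly at the next block.
    long padding = blockSize - previousBytesInBlock; // 6 MiB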
+
+ /** An output receiver that writes the ByteBuffers to the output stream as they are received. */
+ private static class DirectStream implements OutputReceiver {
+ private final FSDataOutputStream output;
+
+ DirectStream(FSDataOutputStream output) {
+ this.output = output;
+ }
+
+ @Override
+ public void output(ByteBuffer buffer) throws IOException {
+ output.write(
+ buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
+ }
+
+ @Override
+ public void suppress() {
+ throw new UnsupportedOperationException("Can't suppress direct stream");
+ }
+ }
+
+ private void writeStripeFooter(
+ OrcProto.StripeFooter footer,
+ SizeCounters sizes,
+ OrcProto.StripeInformation.Builder dirEntry)
+ throws IOException {
+ footer.writeTo(codedCompressStream);
+ codedCompressStream.flush();
+ compressStream.flush();
+ dirEntry.setOffset(stripeStart);
+ dirEntry.setFooterLength(rawWriter.getPos() - stripeStart - sizes.total());
+ }
+
+ /**
+ * Write the saved encrypted stripe statistic in a variant out to the file. The streams that are
+ * written are added to the tracker.stripeStatsStreams.
+ *
+ * @param output the file we are writing to
+ * @param stripeNumber the number of stripes in the file
+ * @param tracker the variant to write out
+ */
+ static void writeEncryptedStripeStatistics(
+ DirectStream output, int stripeNumber, VariantTracker tracker) throws IOException {
+ StreamOptions options = new StreamOptions(tracker.options);
+ tracker.stripeStatsStreams.clear();
+ for (int col = tracker.rootColumn;
+ col < tracker.rootColumn + tracker.stripeStats.length;
+ ++col) {
+ options.modifyIv(
+ CryptoUtils.modifyIvForStream(
+ col, OrcProto.Stream.Kind.STRIPE_STATISTICS, stripeNumber + 1));
+ OutStream stream = new OutStream("stripe stats for " + col, options, output);
+ OrcProto.ColumnarStripeStatistics stats =
+ OrcProto.ColumnarStripeStatistics.newBuilder()
+ .addAllColStats(tracker.stripeStats[col - tracker.rootColumn])
+ .build();
+ long start = output.output.getPos();
+ stats.writeTo(stream);
+ stream.flush();
+ OrcProto.Stream description =
+ OrcProto.Stream.newBuilder()
+ .setColumn(col)
+ .setKind(OrcProto.Stream.Kind.STRIPE_STATISTICS)
+ .setLength(output.output.getPos() - start)
+ .build();
+ tracker.stripeStatsStreams.add(description);
+ }
+ }
+
+ /**
+ * Merge the saved unencrypted stripe statistics into the Metadata section of the footer.
+ *
+ * @param builder the Metadata section of the file
+ * @param stripeCount the number of stripes in the file
+ * @param stats the stripe statistics
+ */
+ static void setUnencryptedStripeStatistics(
+ OrcProto.Metadata.Builder builder,
+ int stripeCount,
+ List<OrcProto.ColumnStatistics>[] stats) {
+ // Make the unencrypted stripe stats into lists of StripeStatistics.
+ builder.clearStripeStats();
+ for (int s = 0; s < stripeCount; ++s) {
+ OrcProto.StripeStatistics.Builder stripeStats = OrcProto.StripeStatistics.newBuilder();
+ for (List<OrcProto.ColumnStatistics> col : stats) {
+ stripeStats.addColStats(col.get(s));
+ }
+ builder.addStripeStats(stripeStats.build());
+ }
+ }
+
+ static void setEncryptionStatistics(
+ OrcProto.Encryption.Builder encryption,
+ int stripeNumber,
+ Collection variants)
+ throws IOException {
+ int v = 0;
+ for (VariantTracker variant : variants) {
+ OrcProto.EncryptionVariant.Builder variantBuilder = encryption.getVariantsBuilder(v++);
+
+ // Add the stripe statistics streams to the variant description.
+ variantBuilder.clearStripeStatistics();
+ variantBuilder.addAllStripeStatistics(variant.stripeStatsStreams);
+
+ // Serialize and encrypt the file statistics.
+ OrcProto.FileStatistics.Builder file = OrcProto.FileStatistics.newBuilder();
+ for (OrcProto.ColumnStatistics col : variant.fileStats) {
+ file.addColumn(col);
+ }
+ StreamOptions options = new StreamOptions(variant.options);
+ options.modifyIv(
+ CryptoUtils.modifyIvForStream(
+ variant.rootColumn,
+ OrcProto.Stream.Kind.FILE_STATISTICS,
+ stripeNumber + 1));
+ BufferedStream buffer = new BufferedStream();
+ OutStream stream = new OutStream("stats for " + variant, options, buffer);
+ file.build().writeTo(stream);
+ stream.flush();
+ variantBuilder.setFileStatistics(buffer.getBytes());
+ }
+ }
+
+ @Override
+ public void writeFileMetadata(OrcProto.Metadata.Builder builder) throws IOException {
+ long stripeStatisticsStart = rawWriter.getPos();
+ for (VariantTracker variant : variants.values()) {
+ writeEncryptedStripeStatistics(rawStream, stripeNumber, variant);
+ }
+ setUnencryptedStripeStatistics(builder, stripeNumber, unencrypted.stripeStats);
+ long metadataStart = rawWriter.getPos();
+ builder.build().writeTo(codedCompressStream);
+ codedCompressStream.flush();
+ compressStream.flush();
+ this.stripeStatisticsLength = (int) (metadataStart - stripeStatisticsStart);
+ this.metadataLength = (int) (rawWriter.getPos() - metadataStart);
+ }
+
+ static void addUnencryptedStatistics(
+ OrcProto.Footer.Builder builder, OrcProto.ColumnStatistics[] stats) {
+ for (OrcProto.ColumnStatistics stat : stats) {
+ builder.addStatistics(stat);
+ }
+ }
+
+ @Override
+ public void writeFileFooter(OrcProto.Footer.Builder builder) throws IOException {
+ if (variants.size() > 0) {
+ OrcProto.Encryption.Builder encryption = builder.getEncryptionBuilder();
+ setEncryptionStatistics(encryption, stripeNumber, variants.values());
+ }
+ addUnencryptedStatistics(builder, unencrypted.fileStats);
+ long bodyLength = rawWriter.getPos() - metadataLength - stripeStatisticsLength;
+ builder.setContentLength(bodyLength);
+ builder.setHeaderLength(headerLength);
+ long startPosn = rawWriter.getPos();
+ OrcProto.Footer footer = builder.build();
+ footer.writeTo(codedCompressStream);
+ codedCompressStream.flush();
+ compressStream.flush();
+ this.footerLength = (int) (rawWriter.getPos() - startPosn);
+ }
+
+ @Override
+ public long writePostScript(OrcProto.PostScript.Builder builder) throws IOException {
+ builder.setFooterLength(footerLength);
+ builder.setMetadataLength(metadataLength);
+ if (variants.size() > 0) {
+ builder.setStripeStatisticsLength(stripeStatisticsLength);
+ }
+ OrcProto.PostScript ps = builder.build();
+ // need to write this uncompressed
+ long startPosn = rawWriter.getPos();
+ ps.writeTo(rawWriter);
+ long length = rawWriter.getPos() - startPosn;
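+ // Readers locate the PostScript by reading the last byte of the file as its length,
+ // so the serialized PostScript must fit in a single unsigned byte.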
+ if (length > 255) {
+ throw new IllegalArgumentException("PostScript too large at " + length);
+ }
+ rawWriter.writeByte((int) length);
+ return rawWriter.getPos();
+ }
+
+ @Override
+ public void close() throws IOException {
+ // We don't use the codec directly, but we do hand it out through getCompressionCodec;
+ // that is used in tests, for boolean checks, and in StreamFactory. Some of the changes that
+ // would get rid of this pattern require cross-project interface changes, so just return the
+ // codec for now.
+ CompressionCodec codec = compress.getCodec();
+ if (codec != null) {
+ OrcCodecPool.returnCodec(codec.getKind(), codec);
+ }
+ compress.withCodec(null, null);
+ rawWriter.close();
+ rawWriter = null;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ rawWriter.hflush();
+ }
+
+ @Override
+ public void appendRawStripe(ByteBuffer buffer, OrcProto.StripeInformation.Builder dirEntry)
+ throws IOException {
+ long start = rawWriter.getPos();
+ int length = buffer.remaining();
+ long availBlockSpace = blockSize - (start % blockSize);
+
+ // see if stripe can fit in the current hdfs block, else pad the remaining
+ // space in the block
+ if (length < blockSize && length > availBlockSpace && addBlockPadding) {
+ byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, availBlockSpace)];
+ LOG.info("Padding ORC by {} bytes while merging", availBlockSpace);
+ start += availBlockSpace;
+ while (availBlockSpace > 0) {
+ int writeLen = (int) Math.min(availBlockSpace, pad.length);
+ rawWriter.write(pad, 0, writeLen);
+ availBlockSpace -= writeLen;
+ }
+ }
+ rawWriter.write(buffer.array(), buffer.arrayOffset() + buffer.position(), length);
+ dirEntry.setOffset(start);
+ stripeNumber += 1;
+ }
+
+ /**
+ * This class is used to hold the contents of streams as they are buffered. The TreeWriters
+ * write to the outStream and the codec compresses the data as buffers fill up and stores them
+ * in the output list. When the stripe is being written, the whole stream is written to the
+ * file.
+ */
+ static final class BufferedStream implements OutputReceiver {
+ private boolean isSuppressed = false;
+ private final List<ByteBuffer> output = new ArrayList<>();
+
+ @Override
+ public void output(ByteBuffer buffer) {
+ if (!isSuppressed) {
+ output.add(buffer);
+ }
+ }
+
+ @Override
+ public void suppress() {
+ isSuppressed = true;
+ output.clear();
+ }
+
+ /**
+ * Writes any saved buffers to the OutputStream if needed and clears all the buffers.
+ *
+ * @return true if the stream was written
+ */
+ boolean spillToDiskAndClear(FSDataOutputStream raw) throws IOException {
+ if (!isSuppressed) {
+ for (ByteBuffer buffer : output) {
+ raw.write(
+ buffer.array(),
+ buffer.arrayOffset() + buffer.position(),
+ buffer.remaining());
+ }
+ output.clear();
+ return true;
+ }
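+ // A suppressed stream is dropped for this stripe only; reset the flag so the
+ // receiver can be reused for the next stripe.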
+ isSuppressed = false;
+ return false;
+ }
+
+ /**
+ * Gets the buffered bytes as a protobuf ByteString and clears the BufferedStream.
+ *
+ * @return the bytes
+ */
+ ByteString getBytes() {
+ int len = output.size();
+ if (len == 0) {
+ return ByteString.EMPTY;
+ } else {
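+ // ByteString.concat typically builds a rope rather than copying the operands,
+ // so assembling the result stays cheap even with many buffers.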
+ ByteString result = ByteString.copyFrom(output.get(0));
+ for (int i = 1; i < output.size(); ++i) {
+ result = result.concat(ByteString.copyFrom(output.get(i)));
+ }
+ output.clear();
+ return result;
+ }
+ }
+
+ /**
+ * Get the stream as a ByteBuffer and clear it.
+ *
+ * @return a single ByteBuffer with the contents of the stream
+ */
+ ByteBuffer getByteBuffer() {
+ ByteBuffer result;
+ if (output.size() == 1) {
+ result = output.get(0);
+ } else {
+ result = ByteBuffer.allocate((int) getOutputSize());
+ for (ByteBuffer buffer : output) {
+ result.put(buffer);
+ }
+ output.clear();
+ result.flip();
+ }
+ return result;
+ }
+
+ /**
+ * Get the number of bytes that will be written to the output.
+ *
+ * <p>Assumes the stream writing into this receiver has already been flushed.
+ *
+ * @return number of bytes
+ */
+ public long getOutputSize() {
+ long result = 0;
+ for (ByteBuffer buffer : output) {
+ result += buffer.remaining();
+ }
+ return result;
+ }
+ }
+
+ static class SizeCounters {
+ long index = 0;
+ long data = 0;
+
+ long total() {
+ return index + data;
+ }
+ }
+
+ void buildStreamList(OrcProto.StripeFooter.Builder footerBuilder, SizeCounters sizes)
+ throws IOException {
+ footerBuilder.addAllStreams(unencrypted.placeStreams(StreamName.Area.INDEX, sizes));
+ final long unencryptedIndexSize = sizes.index;
+ int v = 0;
+ for (VariantTracker variant : variants.values()) {
+ OrcProto.StripeEncryptionVariant.Builder builder =
+ footerBuilder.getEncryptionBuilder(v++);
+ builder.addAllStreams(variant.placeStreams(StreamName.Area.INDEX, sizes));
+ }
+ if (sizes.index != unencryptedIndexSize) {
+ // add a placeholder that covers the hole where the encrypted indexes are
+ footerBuilder.addStreams(
+ OrcProto.Stream.newBuilder()
+ .setKind(OrcProto.Stream.Kind.ENCRYPTED_INDEX)
+ .setLength(sizes.index - unencryptedIndexSize));
+ }
+ footerBuilder.addAllStreams(unencrypted.placeStreams(StreamName.Area.DATA, sizes));
+ final long unencryptedDataSize = sizes.data;
+ v = 0;
+ for (VariantTracker variant : variants.values()) {
+ OrcProto.StripeEncryptionVariant.Builder builder =
+ footerBuilder.getEncryptionBuilder(v++);
+ builder.addAllStreams(variant.placeStreams(StreamName.Area.DATA, sizes));
+ }
+ if (sizes.data != unencryptedDataSize) {
+ // add a placeholder that covers the hole where the encrypted data streams are
+ footerBuilder.addStreams(
+ OrcProto.Stream.newBuilder()
+ .setKind(OrcProto.Stream.Kind.ENCRYPTED_DATA)
+ .setLength(sizes.data - unencryptedDataSize));
+ }
+ }
+
+ @Override
+ public void finalizeStripe(
+ OrcProto.StripeFooter.Builder footerBuilder,
+ OrcProto.StripeInformation.Builder dirEntry)
+ throws IOException {
+ SizeCounters sizes = new SizeCounters();
+ buildStreamList(footerBuilder, sizes);
+
+ OrcProto.StripeFooter footer = footerBuilder.build();
+
+ // Do we need to pad the file so the stripe doesn't straddle a block boundary?
+ padStripe(sizes.total() + footer.getSerializedSize());
+
+ // write the unencrypted index streams
+ unencrypted.writeStreams(StreamName.Area.INDEX, rawWriter);
+ // write the encrypted index streams
+ for (VariantTracker variant : variants.values()) {
+ variant.writeStreams(StreamName.Area.INDEX, rawWriter);
+ }
+
+ // write the unencrypted data streams
+ unencrypted.writeStreams(StreamName.Area.DATA, rawWriter);
+ // write the encrypted data streams
+ for (VariantTracker variant : variants.values()) {
+ variant.writeStreams(StreamName.Area.DATA, rawWriter);
+ }
+
+ // Write out the footer.
+ writeStripeFooter(footer, sizes, dirEntry);
+
+ // fill in the data sizes
+ dirEntry.setDataLength(sizes.data);
+ dirEntry.setIndexLength(sizes.index);
+
+ stripeNumber += 1;
+ }
+
+ @Override
+ public void writeHeader() throws IOException {
+ rawWriter.writeBytes(OrcFile.MAGIC);
+ headerLength = rawWriter.getPos();
+ }
+
+ @Override
+ public BufferedStream createDataStream(StreamName name) {
+ VariantTracker variant = getVariant(name.getEncryption());
+ BufferedStream result = variant.streams.get(name);
+ if (result == null) {
+ result = new BufferedStream();
+ variant.streams.put(name, result);
+ }
+ return result;
+ }
+
+ private StreamOptions getOptions(OrcProto.Stream.Kind kind) {
+ return SerializationUtils.getCustomizedCodec(compress, compressionStrategy, kind);
+ }
+
+ protected OutputStream createIndexStream(StreamName name) {
+ BufferedStream buffer = createDataStream(name);
+ VariantTracker tracker = getVariant(name.getEncryption());
+ StreamOptions options =
+ SerializationUtils.getCustomizedCodec(
+ tracker.options, compressionStrategy, name.getKind());
+ if (options.isEncrypted()) {
+ if (options == tracker.options) {
+ options = new StreamOptions(options);
+ }
+ options.modifyIv(CryptoUtils.modifyIvForStream(name, stripeNumber + 1));
+ }
+ return new OutStream(name.toString(), options, buffer);
+ }
+
+ @Override
+ public void writeIndex(StreamName name, OrcProto.RowIndex.Builder index) throws IOException {
+ OutputStream stream = createIndexStream(name);
+ index.build().writeTo(stream);
+ stream.flush();
+ }
+
+ @Override
+ public void writeBloomFilter(StreamName name, OrcProto.BloomFilterIndex.Builder bloom)
+ throws IOException {
+ OutputStream stream = createIndexStream(name);
+ bloom.build().writeTo(stream);
+ stream.flush();
+ }
+
+ @Override
+ public void writeStatistics(StreamName name, OrcProto.ColumnStatistics.Builder statistics) {
+ VariantTracker tracker = getVariant(name.getEncryption());
+ if (name.getKind() == OrcProto.Stream.Kind.FILE_STATISTICS) {
+ tracker.fileStats[name.getColumn() - tracker.rootColumn] = statistics.build();
+ } else {
+ tracker.stripeStats[name.getColumn() - tracker.rootColumn].add(statistics.build());
+ }
+ }
+
+ @Override
+ public String toString() {
+ if (path != null) {
+ return path.toString();
+ } else {
+ return ByteString.EMPTY.toString();
+ }
+ }
+}
diff --git a/paimon-format/src/main/java/org/apache/orc/impl/WriterImpl.java b/paimon-format/src/main/java/org/apache/orc/impl/WriterImpl.java
new file mode 100644
index 000000000000..bbbc8f03172d
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/orc/impl/WriterImpl.java
@@ -0,0 +1,1063 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import com.github.luben.zstd.util.Native;
+import com.google.protobuf.ByteString;
+import io.airlift.compress.lz4.Lz4Compressor;
+import io.airlift.compress.lz4.Lz4Decompressor;
+import io.airlift.compress.lzo.LzoCompressor;
+import io.airlift.compress.lzo.LzoDecompressor;
+import io.airlift.compress.zstd.ZstdCompressor;
+import io.airlift.compress.zstd.ZstdDecompressor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.ColumnStatistics;
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.DataMask;
+import org.apache.orc.MemoryManager;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcProto;
+import org.apache.orc.OrcUtils;
+import org.apache.orc.PhysicalWriter;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.StripeStatistics;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.writer.StreamOptions;
+import org.apache.orc.impl.writer.TreeWriter;
+import org.apache.orc.impl.writer.WriterContext;
+import org.apache.orc.impl.writer.WriterEncryptionKey;
+import org.apache.orc.impl.writer.WriterEncryptionVariant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TimeZone;
+import java.util.TreeMap;
+
+/**
+ * An ORC file writer. The file is divided into stripes, which are the natural unit of work when
+ * reading. Each stripe is buffered in memory until the memory reaches the stripe size and then it
+ * is written out broken down by columns. Each column is written by a TreeWriter that is specific to
+ * that type of column. TreeWriters may have children TreeWriters that handle the sub-types. Each of
+ * the TreeWriters writes the column's data as a set of streams.
+ *
+ *
+ * <p>This class is unsynchronized like most Stream objects, so the creation of an OrcFile and
+ * all access to a single instance has to be from a single thread.
+ *
+ *
+ * <p>There are no known cases where these happen between different threads today.
+ *
+ *
+ * <p>Caveat: the MemoryManager is created during WriterOptions create, which has to be confined
+ * to a single thread as well.
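+ *
+ * <p>A minimal usage sketch, assuming a Hadoop {@code Configuration} named {@code conf}, a
+ * target {@code Path} named {@code path}, and a populated {@code VectorizedRowBatch} named
+ * {@code batch}:
+ *
+ * <pre>{@code
+ * TypeDescription schema = TypeDescription.fromString("struct<x:int,y:string>");
+ * Writer writer = OrcFile.createWriter(path,
+ *     OrcFile.writerOptions(conf).setSchema(schema).compress(CompressionKind.ZSTD));
+ * writer.addRowBatch(batch);
+ * writer.close();
+ * }</pre>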
+ */
+public class WriterImpl implements WriterInternal, MemoryManager.Callback {
+
+ private static final Logger LOG = LoggerFactory.getLogger(WriterImpl.class);
+
+ private static final int MIN_ROW_INDEX_STRIDE = 1000;
+
+ private final Path path;
+ private final long stripeSize;
+ private final long stripeRowCount;
+ private final int rowIndexStride;
+ private final TypeDescription schema;
+ private final PhysicalWriter physicalWriter;
+ private final OrcFile.WriterVersion writerVersion;
+ private final StreamOptions unencryptedOptions;
+
+ private long rowCount = 0;
+ private long rowsInStripe = 0;
+ private long rawDataSize = 0;
+ private int rowsInIndex = 0;
+ private long lastFlushOffset = 0;
+ private int stripesAtLastFlush = -1;
+ private final List<OrcProto.StripeInformation> stripes = new ArrayList<>();
+ private final Map<String, ByteString> userMetadata = new TreeMap<>();
+ private final TreeWriter treeWriter;
+ private final boolean buildIndex;
+ private final MemoryManager memoryManager;
+ private long previousAllocation = -1;
+ private long memoryLimit;
+ private final long rowsPerCheck;
+ private long rowsSinceCheck = 0;
+ private final OrcFile.Version version;
+ private final Configuration conf;
+ private final OrcFile.WriterCallback callback;
+ private final OrcFile.WriterContext callbackContext;
+ private final OrcFile.EncodingStrategy encodingStrategy;
+ private final OrcFile.CompressionStrategy compressionStrategy;
+ private final boolean[] bloomFilterColumns;
+ private final double bloomFilterFpp;
+ private final OrcFile.BloomFilterVersion bloomFilterVersion;
+ private final boolean writeTimeZone;
+ private final boolean useUTCTimeZone;
+ private final double dictionaryKeySizeThreshold;
+ private final boolean[] directEncodingColumns;
+ private final List<OrcProto.ColumnEncoding> unencryptedEncodings = new ArrayList<>();
+
+ // the list of maskDescriptions, keys, and variants
+ private SortedMap<String, MaskDescriptionImpl> maskDescriptions = new TreeMap<>();
+ private SortedMap<String, WriterEncryptionKey> keys = new TreeMap<>();
+ private final WriterEncryptionVariant[] encryption;
+ // the mapping of columns to maskDescriptions
+ private final MaskDescriptionImpl[] columnMaskDescriptions;
+ // the mapping of columns to EncryptionVariants
+ private final WriterEncryptionVariant[] columnEncryption;
+ private KeyProvider keyProvider;
+ // do we need to include the current encryption keys in the next stripe
+ // information
+ private boolean needKeyFlush;
+ private final boolean useProlepticGregorian;
+ private boolean isClose = false;
+
+ public WriterImpl(FileSystem fs, Path path, OrcFile.WriterOptions opts) throws IOException {
+ this.path = path;
+ this.conf = opts.getConfiguration();
+ // clone it so that we can annotate it with encryption
+ this.schema = opts.getSchema().clone();
+ int numColumns = schema.getMaximumId() + 1;
+ if (!opts.isEnforceBufferSize()) {
+ opts.bufferSize(
+ getEstimatedBufferSize(opts.getStripeSize(), numColumns, opts.getBufferSize()));
+ }
+
+ // Annotate the schema with the column encryption
+ schema.annotateEncryption(opts.getEncryption(), opts.getMasks());
+ columnEncryption = new WriterEncryptionVariant[numColumns];
+ columnMaskDescriptions = new MaskDescriptionImpl[numColumns];
+ encryption = setupEncryption(opts.getKeyProvider(), schema, opts.getKeyOverrides());
+ needKeyFlush = encryption.length > 0;
+
+ this.directEncodingColumns =
+ OrcUtils.includeColumns(opts.getDirectEncodingColumns(), opts.getSchema());
+ dictionaryKeySizeThreshold = OrcConf.DICTIONARY_KEY_SIZE_THRESHOLD.getDouble(conf);
+
+ this.callback = opts.getCallback();
+ if (callback != null) {
+ callbackContext = () -> WriterImpl.this;
+ } else {
+ callbackContext = null;
+ }
+
+ this.useProlepticGregorian = opts.getProlepticGregorian();
+ this.writeTimeZone = hasTimestamp(schema);
+ this.useUTCTimeZone = opts.getUseUTCTimestamp();
+
+ this.encodingStrategy = opts.getEncodingStrategy();
+ this.compressionStrategy = opts.getCompressionStrategy();
+
+ if (opts.getRowIndexStride() >= 0) {
+ this.rowIndexStride = opts.getRowIndexStride();
+ } else {
+ this.rowIndexStride = 0;
+ }
+
+ // ORC-1343: We ignore `opts.isBuildIndex` due to the lack of reader support
+ this.buildIndex = rowIndexStride > 0;
+ if (buildIndex && rowIndexStride < MIN_ROW_INDEX_STRIDE) {
+ throw new IllegalArgumentException(
+ "Row stride must be at least " + MIN_ROW_INDEX_STRIDE);
+ }
+
+ this.writerVersion = opts.getWriterVersion();
+ this.version = opts.getVersion();
+ if (version == OrcFile.Version.FUTURE) {
+ throw new IllegalArgumentException("Can not write in a unknown version.");
+ } else if (version == OrcFile.Version.UNSTABLE_PRE_2_0) {
+ LOG.warn(
+ "ORC files written in "
+ + version.getName()
+ + " will not be"
+ + " readable by other versions of the software. It is only for"
+ + " developer testing.");
+ }
+
+ this.bloomFilterVersion = opts.getBloomFilterVersion();
+ this.bloomFilterFpp = opts.getBloomFilterFpp();
+ /* do not write bloom filters for ORC v11 */
+ if (!buildIndex || version == OrcFile.Version.V_0_11) {
+ this.bloomFilterColumns = new boolean[schema.getMaximumId() + 1];
+ } else {
+ this.bloomFilterColumns = OrcUtils.includeColumns(opts.getBloomFilterColumns(), schema);
+ }
+
+ // ensure that we are able to handle callbacks before we register ourselves
+ rowsPerCheck =
+ Math.min(opts.getStripeRowCountValue(), OrcConf.ROWS_BETWEEN_CHECKS.getLong(conf));
+ this.stripeRowCount = opts.getStripeRowCountValue();
+ this.stripeSize = opts.getStripeSize();
+ memoryLimit = stripeSize;
+ memoryManager = opts.getMemoryManager();
+ memoryManager.addWriter(path, stripeSize, this);
+
+ // Set up the physical writer
+ this.physicalWriter =
+ opts.getPhysicalWriter() == null
+ ? new PhysicalFsWriter(fs, path, opts, encryption)
+ : opts.getPhysicalWriter();
+ physicalWriter.writeHeader();
+ unencryptedOptions = physicalWriter.getStreamOptions();
+ OutStream.assertBufferSizeValid(unencryptedOptions.getBufferSize());
+
+ treeWriter = TreeWriter.Factory.create(schema, null, new StreamFactory());
+
+ LOG.info(
+ "ORC writer created for path: {} with stripeSize: {} options: {}",
+ path,
+ stripeSize,
+ unencryptedOptions);
+ }
+
+ // @VisibleForTesting
+ public static int getEstimatedBufferSize(long stripeSize, int numColumns, int bs) {
+ // The worst case is that there are 2 big streams per column and
+ // we want to guarantee that each stream gets ~10 buffers.
+ // This keeps buffers small enough that we don't get really small stripe
+ // sizes.
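+ // For example, a 64 MiB stripe with 50 columns gives 64 MiB / 1000 ~= 66 KiB,
+ // which getClosestBufferSize rounds up to 128 KiB before capping at bs.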
+ int estBufferSize = (int) (stripeSize / (20L * numColumns));
+ estBufferSize = getClosestBufferSize(estBufferSize);
+ return Math.min(estBufferSize, bs);
+ }
+
+ @Override
+ public void increaseCompressionSize(int newSize) {
+ if (newSize > unencryptedOptions.getBufferSize()) {
+ unencryptedOptions.bufferSize(newSize);
+ }
+ }
+
+ /**
+ * Given a buffer size, return the nearest power of 2 at or above it. The minimum value is
+ * 4 KiB and the maximum value is 256 KiB.
+ *
+ * @param size Proposed buffer size
+ * @return the suggested buffer size
+ */
+ private static int getClosestBufferSize(int size) {
+ final int kb4 = 4 * 1024;
+ final int kb256 = 256 * 1024;
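+ // Integer.highestOneBit(size - 1) * 2 rounds size up to the next power of two
+ // (exact powers of two map to themselves); the result is then clamped to [4 KiB, 256 KiB].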
+ final int pow2 = size == 1 ? 1 : Integer.highestOneBit(size - 1) * 2;
+ return Math.min(kb256, Math.max(kb4, pow2));
+ }
+
+ static {
+ try {
+ if (!"java".equalsIgnoreCase(System.getProperty("orc.compression.zstd.impl"))) {
+ Native.load();
+ }
+ } catch (UnsatisfiedLinkError | ExceptionInInitializerError e) {
+ LOG.warn(
+ "Unable to load zstd-jni library for your platform. "
+ + "Using builtin-java classes where applicable");
+ }
+ }
+
+ public static CompressionCodec createCodec(CompressionKind kind) {
+ switch (kind) {
+ case NONE:
+ return null;
+ case ZLIB:
+ return new ZlibCodec();
+ case SNAPPY:
+ return new SnappyCodec();
+ case LZO:
+ return new AircompressorCodec(kind, new LzoCompressor(), new LzoDecompressor());
+ case LZ4:
+ return new AircompressorCodec(kind, new Lz4Compressor(), new Lz4Decompressor());
+ case ZSTD:
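+ // Prefer the zstd-jni native codec; fall back to the pure-Java aircompressor
+ // implementation when orc.compression.zstd.impl=java is set or the native
+ // library could not be loaded.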
+ if ("java".equalsIgnoreCase(System.getProperty("orc.compression.zstd.impl"))) {
+ return new AircompressorCodec(
+ kind, new ZstdCompressor(), new ZstdDecompressor());
+ }
+ if (Native.isLoaded()) {
+ return new ZstdCodec();
+ } else {
+ return new AircompressorCodec(
+ kind, new ZstdCompressor(), new ZstdDecompressor());
+ }
+ default:
+ throw new IllegalArgumentException("Unknown compression codec: " + kind);
+ }
+ }
+
+ @Override
+ public boolean checkMemory(double newScale) throws IOException {
+ memoryLimit = Math.round(stripeSize * newScale);
+ return checkMemory();
+ }
+
+ private boolean checkMemory() throws IOException {
+ if (rowsSinceCheck >= rowsPerCheck) {
+ rowsSinceCheck = 0;
+ long size = treeWriter.estimateMemory();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "ORC writer "
+ + physicalWriter
+ + " size = "
+ + size
+ + " memoryLimit = "
+ + memoryLimit
+ + " rowsInStripe = "
+ + rowsInStripe
+ + " stripeRowCountLimit = "
+ + stripeRowCount);
+ }
+ if (size > memoryLimit || rowsInStripe >= stripeRowCount) {
+ flushStripe();
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Interface from the Writer to the TreeWriters. This limits the visibility that the TreeWriters
+ * have into the Writer.
+ */
+ private class StreamFactory implements WriterContext {
+
+ /**
+ * Create a stream to store part of a column.
+ *
+ * @param name the name for the stream
+ * @return The OutStream that the section needs to be written to.
+ */
+ @Override
+ public OutStream createStream(StreamName name) throws IOException {
+ StreamOptions options =
+ SerializationUtils.getCustomizedCodec(
+ unencryptedOptions, compressionStrategy, name.getKind());
+ WriterEncryptionVariant encryption = (WriterEncryptionVariant) name.getEncryption();
+ if (encryption != null) {
+ if (options == unencryptedOptions) {
+ options = new StreamOptions(options);
+ }
+ options.withEncryption(
+ encryption.getKeyDescription().getAlgorithm(),
+ encryption.getFileFooterKey())
+ .modifyIv(CryptoUtils.modifyIvForStream(name, 1));
+ }
+ return new OutStream(name, options, physicalWriter.createDataStream(name));
+ }
+
+ /** Get the stride rate of the row index. */
+ @Override
+ public int getRowIndexStride() {
+ return rowIndexStride;
+ }
+
+ /**
+ * Should we be building the row index?
+ *
+ * @return true if we are building the index
+ */
+ @Override
+ public boolean buildIndex() {
+ return buildIndex;
+ }
+
+ /**
+ * Is the ORC file compressed?
+ *
+ * @return are the streams compressed
+ */
+ @Override
+ public boolean isCompressed() {
+ return unencryptedOptions.getCodec() != null;
+ }
+
+ /**
+ * Get the encoding strategy to use.
+ *
+ * @return encoding strategy
+ */
+ @Override
+ public OrcFile.EncodingStrategy getEncodingStrategy() {
+ return encodingStrategy;
+ }
+
+ /**
+ * Get the bloom filter columns.
+ *
+ * @return bloom filter columns
+ */
+ @Override
+ public boolean[] getBloomFilterColumns() {
+ return bloomFilterColumns;
+ }
+
+ /**
+ * Get bloom filter false positive percentage.
+ *
+ * @return fpp
+ */
+ @Override
+ public double getBloomFilterFPP() {
+ return bloomFilterFpp;
+ }
+
+ /**
+ * Get the writer's configuration.
+ *
+ * @return configuration
+ */
+ @Override
+ public Configuration getConfiguration() {
+ return conf;
+ }
+
+ /** Get the version of the file to write. */
+ @Override
+ public OrcFile.Version getVersion() {
+ return version;
+ }
+
+ /**
+ * Get the PhysicalWriter.
+ *
+ * @return the file's physical writer.
+ */
+ @Override
+ public PhysicalWriter getPhysicalWriter() {
+ return physicalWriter;
+ }
+
+ @Override
+ public OrcFile.BloomFilterVersion getBloomFilterVersion() {
+ return bloomFilterVersion;
+ }
+
+ @Override
+ public void writeIndex(StreamName name, OrcProto.RowIndex.Builder index)
+ throws IOException {
+ physicalWriter.writeIndex(name, index);
+ }
+
+ @Override
+ public void writeBloomFilter(StreamName name, OrcProto.BloomFilterIndex.Builder bloom)
+ throws IOException {
+ physicalWriter.writeBloomFilter(name, bloom);
+ }
+
+ @Override
+ public WriterEncryptionVariant getEncryption(int columnId) {
+ return columnId < columnEncryption.length ? columnEncryption[columnId] : null;
+ }
+
+ @Override
+ public DataMask getUnencryptedMask(int columnId) {
+ if (columnMaskDescriptions != null) {
+ MaskDescriptionImpl descr = columnMaskDescriptions[columnId];
+ if (descr != null) {
+ return DataMask.Factory.build(
+ descr,
+ schema.findSubtype(columnId),
+ (type) -> columnMaskDescriptions[type.getId()]);
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public void setEncoding(
+ int column, WriterEncryptionVariant encryption, OrcProto.ColumnEncoding encoding) {
+ if (encryption == null) {
+ unencryptedEncodings.add(encoding);
+ } else {
+ encryption.addEncoding(encoding);
+ }
+ }
+
+ @Override
+ public void writeStatistics(StreamName name, OrcProto.ColumnStatistics.Builder stats)
+ throws IOException {
+ physicalWriter.writeStatistics(name, stats);
+ }
+
+ @Override
+ public boolean getUseUTCTimestamp() {
+ return useUTCTimeZone;
+ }
+
+ @Override
+ public double getDictionaryKeySizeThreshold(int columnId) {
+ return directEncodingColumns[columnId] ? 0.0 : dictionaryKeySizeThreshold;
+ }
+
+ @Override
+ public boolean getProlepticGregorian() {
+ return useProlepticGregorian;
+ }
+ }
+
+ private static void writeTypes(OrcProto.Footer.Builder builder, TypeDescription schema) {
+ builder.addAllTypes(OrcUtils.getOrcTypes(schema));
+ }
+
+ private void createRowIndexEntry() throws IOException {
+ treeWriter.createRowIndexEntry();
+ rowsInIndex = 0;
+ }
+
+ /**
+ * Write the encrypted keys into the StripeInformation along with the stripe id, so that the
+ * readers can decrypt the data.
+ *
+ * @param dirEntry the entry to modify
+ */
+ private void addEncryptedKeys(OrcProto.StripeInformation.Builder dirEntry) {
+ for (WriterEncryptionVariant variant : encryption) {
+ dirEntry.addEncryptedLocalKeys(
+ ByteString.copyFrom(variant.getMaterial().getEncryptedKey()));
+ }
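+ // Stripe ids are 1-based, and this stripe has not been added to the list yet.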
+ dirEntry.setEncryptStripeId(1 + stripes.size());
+ }
+
+ private void flushStripe() throws IOException {
+ if (buildIndex && rowsInIndex != 0) {
+ createRowIndexEntry();
+ }
+ if (rowsInStripe != 0) {
+ if (callback != null) {
+ callback.preStripeWrite(callbackContext);
+ }
+ // finalize the data for the stripe
+ int requiredIndexEntries =
+ rowIndexStride == 0
+ ? 0
+ : (int) ((rowsInStripe + rowIndexStride - 1) / rowIndexStride);
+ OrcProto.StripeFooter.Builder builder = OrcProto.StripeFooter.newBuilder();
+ if (writeTimeZone) {
+ if (useUTCTimeZone) {
+ builder.setWriterTimezone("UTC");
+ } else {
+ builder.setWriterTimezone(TimeZone.getDefault().getID());
+ }
+ }
+ treeWriter.flushStreams();
+ treeWriter.writeStripe(requiredIndexEntries);
+ // update the encodings
+ builder.addAllColumns(unencryptedEncodings);
+ unencryptedEncodings.clear();
+ for (WriterEncryptionVariant writerEncryptionVariant : encryption) {
+ OrcProto.StripeEncryptionVariant.Builder encrypt =
+ OrcProto.StripeEncryptionVariant.newBuilder();
+ encrypt.addAllEncoding(writerEncryptionVariant.getEncodings());
+ writerEncryptionVariant.clearEncodings();
+ builder.addEncryption(encrypt);
+ }
+ OrcProto.StripeInformation.Builder dirEntry =
+ OrcProto.StripeInformation.newBuilder().setNumberOfRows(rowsInStripe);
+ if (encryption.length > 0 && needKeyFlush) {
+ addEncryptedKeys(dirEntry);
+ needKeyFlush = false;
+ }
+ physicalWriter.finalizeStripe(builder, dirEntry);
+
+ stripes.add(dirEntry.build());
+ rowCount += rowsInStripe;
+ rowsInStripe = 0;
+ }
+ }
+
+ private long computeRawDataSize() {
+ return treeWriter.getRawDataSize();
+ }
+
+ private OrcProto.CompressionKind writeCompressionKind(CompressionKind kind) {
+ switch (kind) {
+ case NONE:
+ return OrcProto.CompressionKind.NONE;
+ case ZLIB:
+ return OrcProto.CompressionKind.ZLIB;
+ case SNAPPY:
+ return OrcProto.CompressionKind.SNAPPY;
+ case LZO:
+ return OrcProto.CompressionKind.LZO;
+ case LZ4:
+ return OrcProto.CompressionKind.LZ4;
+ case ZSTD:
+ return OrcProto.CompressionKind.ZSTD;
+ default:
+ throw new IllegalArgumentException("Unknown compression " + kind);
+ }
+ }
+
+ private void writeMetadata() throws IOException {
+ // The physical writer now has the stripe statistics, so we pass a
+ // new builder in here.
+ physicalWriter.writeFileMetadata(OrcProto.Metadata.newBuilder());
+ }
+
+ private long writePostScript() throws IOException {
+ OrcProto.PostScript.Builder builder =
+ OrcProto.PostScript.newBuilder()
+ .setMagic(OrcFile.MAGIC)
+ .addVersion(version.getMajor())
+ .addVersion(version.getMinor())
+ .setWriterVersion(writerVersion.getId());
+ CompressionCodec codec = unencryptedOptions.getCodec();
+ if (codec == null) {
+ builder.setCompression(OrcProto.CompressionKind.NONE);
+ } else {
+ builder.setCompression(writeCompressionKind(codec.getKind()))
+ .setCompressionBlockSize(unencryptedOptions.getBufferSize());
+ }
+ return physicalWriter.writePostScript(builder);
+ }
+
+ private OrcProto.EncryptionKey.Builder writeEncryptionKey(WriterEncryptionKey key) {
+ OrcProto.EncryptionKey.Builder result = OrcProto.EncryptionKey.newBuilder();
+ HadoopShims.KeyMetadata meta = key.getMetadata();
+ result.setKeyName(meta.getKeyName());
+ result.setKeyVersion(meta.getVersion());
+ result.setAlgorithm(
+ OrcProto.EncryptionAlgorithm.valueOf(meta.getAlgorithm().getSerialization()));
+ return result;
+ }
+
+ private OrcProto.EncryptionVariant.Builder writeEncryptionVariant(
+ WriterEncryptionVariant variant) {
+ OrcProto.EncryptionVariant.Builder result = OrcProto.EncryptionVariant.newBuilder();
+ result.setRoot(variant.getRoot().getId());
+ result.setKey(variant.getKeyDescription().getId());
+ result.setEncryptedKey(ByteString.copyFrom(variant.getMaterial().getEncryptedKey()));
+ return result;
+ }
+
+ private OrcProto.Encryption.Builder writeEncryptionFooter() {
+ OrcProto.Encryption.Builder encrypt = OrcProto.Encryption.newBuilder();
+ for (MaskDescriptionImpl mask : maskDescriptions.values()) {
+ OrcProto.DataMask.Builder maskBuilder = OrcProto.DataMask.newBuilder();
+ maskBuilder.setName(mask.getName());
+ for (String param : mask.getParameters()) {
+ maskBuilder.addMaskParameters(param);
+ }
+ for (TypeDescription column : mask.getColumns()) {
+ maskBuilder.addColumns(column.getId());
+ }
+ encrypt.addMask(maskBuilder);
+ }
+ for (WriterEncryptionKey key : keys.values()) {
+ encrypt.addKey(writeEncryptionKey(key));
+ }
+ for (WriterEncryptionVariant variant : encryption) {
+ encrypt.addVariants(writeEncryptionVariant(variant));
+ }
+ encrypt.setKeyProvider(OrcProto.KeyProviderKind.valueOf(keyProvider.getKind().getValue()));
+ return encrypt;
+ }
+
+ private long writeFooter() throws IOException {
+ writeMetadata();
+ OrcProto.Footer.Builder builder = OrcProto.Footer.newBuilder();
+ builder.setNumberOfRows(rowCount);
+ builder.setRowIndexStride(rowIndexStride);
+ rawDataSize = computeRawDataSize();
+ // serialize the types
+ writeTypes(builder, schema);
+ builder.setCalendar(
+ useProlepticGregorian
+ ? OrcProto.CalendarKind.PROLEPTIC_GREGORIAN
+ : OrcProto.CalendarKind.JULIAN_GREGORIAN);
+ // add the stripe information
+ for (OrcProto.StripeInformation stripe : stripes) {
+ builder.addStripes(stripe);
+ }
+ // add the column statistics
+ treeWriter.writeFileStatistics();
+ // add all of the user metadata
+ for (Map.Entry<String, ByteString> entry : userMetadata.entrySet()) {
+ builder.addMetadata(
+ OrcProto.UserMetadataItem.newBuilder()
+ .setName(entry.getKey())
+ .setValue(entry.getValue()));
+ }
+ if (encryption.length > 0) {
+ builder.setEncryption(writeEncryptionFooter());
+ }
+ builder.setWriter(OrcFile.WriterImplementation.ORC_JAVA.getId());
+ builder.setSoftwareVersion(OrcUtils.getOrcVersion());
+ physicalWriter.writeFileFooter(builder);
+ return writePostScript();
+ }
+
+ @Override
+ public TypeDescription getSchema() {
+ return schema;
+ }
+
+ @Override
+ public void addUserMetadata(String name, ByteBuffer value) {
+ userMetadata.put(name, ByteString.copyFrom(value));
+ }
+
+ @Override
+ public void addRowBatch(VectorizedRowBatch batch) throws IOException {
+ try {
+ // If this is the first set of rows in this stripe, tell the tree writers
+ // to prepare the stripe.
+ if (batch.size != 0 && rowsInStripe == 0) {
+ treeWriter.prepareStripe(stripes.size() + 1);
+ }
+ if (buildIndex) {
+ // Batch the writes up to the rowIndexStride so that we can get the
+ // right size indexes.
+ int posn = 0;
+ while (posn < batch.size) {
+ int chunkSize = Math.min(batch.size - posn, rowIndexStride - rowsInIndex);
+ if (batch.isSelectedInUse()) {
+ // find the longest chunk that is continuously selected from posn
+ for (int len = 1; len < chunkSize; ++len) {
+ if (batch.selected[posn + len] - batch.selected[posn] != len) {
+ chunkSize = len;
+ break;
+ }
+ }
+ treeWriter.writeRootBatch(batch, batch.selected[posn], chunkSize);
+ } else {
+ treeWriter.writeRootBatch(batch, posn, chunkSize);
+ }
+ posn += chunkSize;
+ rowsInIndex += chunkSize;
+ rowsInStripe += chunkSize;
+ if (rowsInIndex >= rowIndexStride) {
+ createRowIndexEntry();
+ }
+ }
+ } else {
+ if (batch.isSelectedInUse()) {
+ int posn = 0;
+ while (posn < batch.size) {
+ int chunkSize = 1;
+ while (posn + chunkSize < batch.size) {
+ // find the longest chunk that is continuously selected from posn
+ if (batch.selected[posn + chunkSize] - batch.selected[posn]
+ != chunkSize) {
+ break;
+ }
+ ++chunkSize;
+ }
+ treeWriter.writeRootBatch(batch, batch.selected[posn], chunkSize);
+ posn += chunkSize;
+ }
+ } else {
+ treeWriter.writeRootBatch(batch, 0, batch.size);
+ }
+ rowsInStripe += batch.size;
+ }
+ rowsSinceCheck += batch.size;
+ previousAllocation = memoryManager.checkMemory(previousAllocation, this);
+ checkMemory();
+ } catch (Throwable t) {
+ try {
+ close();
+ } catch (Throwable ignore) {
+ // ignore
+ }
+ if (t instanceof IOException) {
+ throw (IOException) t;
+ } else {
+ throw new IOException("Problem adding row to " + path, t);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!isClose) {
+ try {
+ if (callback != null) {
+ callback.preFooterWrite(callbackContext);
+ }
+ // remove us from the memory manager so that we don't get any callbacks
+ memoryManager.removeWriter(path);
+ // actually close the file
+ flushStripe();
+ lastFlushOffset = writeFooter();
+ physicalWriter.close();
+ } finally {
+ isClose = true;
+ }
+ }
+ }
+
+ /**
+ * Raw data size is computed when the file footer is written, so the value is only available
+ * after the writer has been closed.
+ */
+ @Override
+ public long getRawDataSize() {
+ return rawDataSize;
+ }
+
+ /**
+ * Row count is updated when the stripes are flushed. For an accurate row count, call this
+ * method after the writer is closed.
+ */
+ @Override
+ public long getNumberOfRows() {
+ return rowCount;
+ }
+
+ @Override
+ public long writeIntermediateFooter() throws IOException {
+ // flush any buffered rows
+ flushStripe();
+ // write a footer
+ if (stripesAtLastFlush != stripes.size()) {
+ if (callback != null) {
+ callback.preFooterWrite(callbackContext);
+ }
+ lastFlushOffset = writeFooter();
+ stripesAtLastFlush = stripes.size();
+ physicalWriter.flush();
+ }
+ return lastFlushOffset;
+ }
+
+ private static void checkArgument(boolean expression, String message) {
+ if (!expression) {
+ throw new IllegalArgumentException(message);
+ }
+ }
+
+ @Override
+ public void appendStripe(
+ byte[] stripe,
+ int offset,
+ int length,
+ StripeInformation stripeInfo,
+ OrcProto.StripeStatistics stripeStatistics)
+ throws IOException {
+ appendStripe(
+ stripe,
+ offset,
+ length,
+ stripeInfo,
+ new StripeStatistics[] {
+ new StripeStatisticsImpl(
+ schema, stripeStatistics.getColStatsList(), false, false)
+ });
+ }
+
+ @Override
+ public void appendStripe(
+ byte[] stripe,
+ int offset,
+ int length,
+ StripeInformation stripeInfo,
+ StripeStatistics[] stripeStatistics)
+ throws IOException {
+ checkArgument(stripe != null, "Stripe must not be null");
+ checkArgument(
+ length <= stripe.length,
+ "Specified length must not be greater specified array length");
+ checkArgument(stripeInfo != null, "Stripe information must not be null");
+ checkArgument(stripeStatistics != null, "Stripe statistics must not be null");
+
+ // If we have buffered rows, flush them
+ if (rowsInStripe > 0) {
+ flushStripe();
+ }
+ rowsInStripe = stripeInfo.getNumberOfRows();
+ // update stripe information
+ OrcProto.StripeInformation.Builder dirEntry =
+ OrcProto.StripeInformation.newBuilder()
+ .setNumberOfRows(rowsInStripe)
+ .setIndexLength(stripeInfo.getIndexLength())
+ .setDataLength(stripeInfo.getDataLength())
+ .setFooterLength(stripeInfo.getFooterLength());
+ // If this is the first stripe of the original file, we need to copy the
+ // encryption information.
+ if (stripeInfo.hasEncryptionStripeId()) {
+ dirEntry.setEncryptStripeId(stripeInfo.getEncryptionStripeId());
+ for (byte[] key : stripeInfo.getEncryptedLocalKeys()) {
+ dirEntry.addEncryptedLocalKeys(ByteString.copyFrom(key));
+ }
+ }
+ physicalWriter.appendRawStripe(ByteBuffer.wrap(stripe, offset, length), dirEntry);
+
+ // since we have already written the stripe, just update stripe statistics
+ treeWriter.addStripeStatistics(stripeStatistics);
+
+ stripes.add(dirEntry.build());
+
+ // reset it after writing the stripe
+ rowCount += rowsInStripe;
+ rowsInStripe = 0;
+ needKeyFlush = encryption.length > 0;
+ }
+
+ @Override
+ public void appendUserMetadata(List<OrcProto.UserMetadataItem> userMetadata) {
+ if (userMetadata != null) {
+ for (OrcProto.UserMetadataItem item : userMetadata) {
+ this.userMetadata.put(item.getName(), item.getValue());
+ }
+ }
+ }
+
+ @Override
+ public ColumnStatistics[] getStatistics() {
+ // get the column statistics
+ final ColumnStatistics[] result = new ColumnStatistics[schema.getMaximumId() + 1];
+ // Get the file statistics, preferring the encrypted one.
+ treeWriter.getCurrentStatistics(result);
+ return result;
+ }
+
+ @Override
+ public List<StripeInformation> getStripes() throws IOException {
+ return Collections.unmodifiableList(OrcUtils.convertProtoStripesToStripes(stripes));
+ }
+
+ public CompressionCodec getCompressionCodec() {
+ return unencryptedOptions.getCodec();
+ }
+
+ private static boolean hasTimestamp(TypeDescription schema) {
+ if (schema.getCategory() == TypeDescription.Category.TIMESTAMP) {
+ return true;
+ }
+ List<TypeDescription> children = schema.getChildren();
+ if (children != null) {
+ for (TypeDescription child : children) {
+ if (hasTimestamp(child)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ private WriterEncryptionKey getKey(String keyName, KeyProvider provider) throws IOException {
+ WriterEncryptionKey result = keys.get(keyName);
+ if (result == null) {
+ result = new WriterEncryptionKey(provider.getCurrentKeyVersion(keyName));
+ keys.put(keyName, result);
+ }
+ return result;
+ }
+
+ private MaskDescriptionImpl getMask(String maskString) {
+ // if it is already there, get the earlier object
+ MaskDescriptionImpl result = maskDescriptions.get(maskString);
+ if (result == null) {
+ result = ParserUtils.buildMaskDescription(maskString);
+ maskDescriptions.put(maskString, result);
+ }
+ return result;
+ }
+
+ private int visitTypeTree(TypeDescription schema, boolean encrypted, KeyProvider provider)
+ throws IOException {
+ int result = 0;
+ String keyName = schema.getAttributeValue(TypeDescription.ENCRYPT_ATTRIBUTE);
+ String maskName = schema.getAttributeValue(TypeDescription.MASK_ATTRIBUTE);
+ if (keyName != null) {
+ if (provider == null) {
+ throw new IllegalArgumentException("Encryption requires a KeyProvider.");
+ }
+ if (encrypted) {
+ throw new IllegalArgumentException("Nested encryption type: " + schema);
+ }
+ encrypted = true;
+ result += 1;
+ WriterEncryptionKey key = getKey(keyName, provider);
+ HadoopShims.KeyMetadata metadata = key.getMetadata();
+ WriterEncryptionVariant variant =
+ new WriterEncryptionVariant(key, schema, provider.createLocalKey(metadata));
+ key.addRoot(variant);
+ }
+ if (encrypted && (keyName != null || maskName != null)) {
+ MaskDescriptionImpl mask = getMask(maskName == null ? "nullify" : maskName);
+ mask.addColumn(schema);
+ }
+ List<TypeDescription> children = schema.getChildren();
+ if (children != null) {
+ for (TypeDescription child : children) {
+ result += visitTypeTree(child, encrypted, provider);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Iterate through the encryption options given by the user and set up our data structures.
+ *
+ * @param provider the KeyProvider to use to generate keys
+ * @param schema the type tree that we search for annotations
+ * @param keyOverrides user specified key overrides
+ */
+ private WriterEncryptionVariant[] setupEncryption(
+ KeyProvider provider,
+ TypeDescription schema,
+ Map<String, HadoopShims.KeyMetadata> keyOverrides)
+ throws IOException {
+ keyProvider =
+ provider != null ? provider : CryptoUtils.getKeyProvider(conf, new SecureRandom());
+ // Load the overrides into the cache so that we use the required key versions.
+ for (HadoopShims.KeyMetadata key : keyOverrides.values()) {
+ keys.put(key.getKeyName(), new WriterEncryptionKey(key));
+ }
+ int variantCount = visitTypeTree(schema, false, keyProvider);
+
+ // Now that we have de-duped the keys and maskDescriptions, make the arrays
+ int nextId = 0;
+ if (variantCount > 0) {
+ for (MaskDescriptionImpl mask : maskDescriptions.values()) {
+ mask.setId(nextId++);
+ for (TypeDescription column : mask.getColumns()) {
+ this.columnMaskDescriptions[column.getId()] = mask;
+ }
+ }
+ }
+ nextId = 0;
+ int nextVariantId = 0;
+ WriterEncryptionVariant[] result = new WriterEncryptionVariant[variantCount];
+ for (WriterEncryptionKey key : keys.values()) {
+ key.setId(nextId++);
+ key.sortRoots();
+ for (WriterEncryptionVariant variant : key.getEncryptionRoots()) {
+ result[nextVariantId] = variant;
+ columnEncryption[variant.getRoot().getId()] = variant;
+ variant.setId(nextVariantId++);
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public long estimateMemory() {
+ return this.treeWriter.estimateMemory();
+ }
+}
diff --git a/paimon-format/src/main/java/org/apache/orc/impl/ZstdCodec.java b/paimon-format/src/main/java/org/apache/orc/impl/ZstdCodec.java
new file mode 100644
index 000000000000..2eeb723c4ac5
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/orc/impl/ZstdCodec.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import com.github.luben.zstd.Zstd;
+import com.github.luben.zstd.ZstdCompressCtx;
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.CompressionKind;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/** orc ZstdCodec. */
+public class ZstdCodec implements CompressionCodec, DirectDecompressionCodec {
+ private ZstdOptions zstdOptions = null;
+ private ZstdCompressCtx zstdCompressCtx = null;
+
+ public ZstdCodec(int level, int windowLog) {
+ this.zstdOptions = new ZstdOptions(level, windowLog);
+ }
+
+ public ZstdCodec() {
+ this(3, 0);
+ }
+
+ public ZstdOptions getZstdOptions() {
+ return zstdOptions;
+ }
+
+ // Thread local buffer
+ private static final ThreadLocal<byte[]> threadBuffer = ThreadLocal.withInitial(() -> null);
+
+ protected static byte[] getBuffer(int size) {
+ byte[] result = threadBuffer.get();
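+ // Reuse the cached buffer only when it is at least `size` and at most twice `size`;
+ // otherwise allocate a fresh one so we neither thrash on growth nor pin an
+ // oversized array in the thread local.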
+ if (result == null || result.length < size || result.length > size * 2) {
+ result = new byte[size];
+ threadBuffer.set(result);
+ }
+ return result;
+ }
+
+ static class ZstdOptions implements Options {
+ private int level;
+ private int windowLog;
+
+ ZstdOptions(int level, int windowLog) {
+ this.level = level;
+ this.windowLog = windowLog;
+ }
+
+ @Override
+ public ZstdOptions copy() {
+ return new ZstdOptions(level, windowLog);
+ }
+
+ @Override
+ public Options setSpeed(SpeedModifier newValue) {
+ return this;
+ }
+
+ /**
+ * Sets the Zstandard long mode maximum back-reference distance, expressed as a power of 2.
+ *
+ * <p>The value must be between ZSTD_WINDOWLOG_MIN (10) and ZSTD_WINDOWLOG_MAX (30 and 31 on
+ * 32/64-bit architectures, respectively).
+ *
+ *
+ * <p>A value of 0 is a special value indicating to use the default
+ * ZSTD_WINDOWLOG_LIMIT_DEFAULT of 27, which corresponds to a back-reference window size of
+ * 128 MiB.
+ *
+ * @param newValue The desired back-reference distance, expressed as a power of 2.
+ * @return ZstdOptions
+ */
+ public ZstdOptions setWindowLog(int newValue) {
+ if ((newValue < Zstd.windowLogMin() || newValue > Zstd.windowLogMax())
+ && newValue != 0) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Zstd compression window size should be in the range %d to %d,"
+ + " or set to the default value of 0.",
+ Zstd.windowLogMin(), Zstd.windowLogMax()));
+ }
+ windowLog = newValue;
+ return this;
+ }
+
+ /**
+ * Sets the Zstandard compression codec compression level directly using the integer
+ * setting. This value is typically between 0 and 22, with larger numbers indicating more
+ * aggressive compression and lower speed.
+ *
+ *
+ * <p>This method provides additional granularity beyond the setSpeed method so that users
+ * can select a specific level.
+ *
+ * @param newValue The level value of compression to set.
+ * @return ZstdOptions
+ */
+ public ZstdOptions setLevel(int newValue) {
+ if (newValue < Zstd.minCompressionLevel() || newValue > Zstd.maxCompressionLevel()) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Zstd compression level should be in the range %d to %d",
+ Zstd.minCompressionLevel(), Zstd.maxCompressionLevel()));
+ }
+ level = newValue;
+ return this;
+ }
+
+ @Override
+ public ZstdOptions setData(DataKind newValue) {
+ return this; // We don't support setting DataKind in ZstdCodec.
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ ZstdOptions that = (ZstdOptions) o;
+
+ if (level != that.level) {
+ return false;
+ }
+ return windowLog == that.windowLog;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = level;
+ result = 31 * result + windowLog;
+ return result;
+ }
+ }
+
+ private static final ZstdOptions DEFAULT_OPTIONS = new ZstdOptions(3, 0);
+
+ @Override
+ public Options getDefaultOptions() {
+ return DEFAULT_OPTIONS;
+ }
+
+ /**
+ * Compresses an input ByteBuffer into an output ByteBuffer using Zstandard compression. If the
+ * compressed bytes do not fit in the output ByteBuffer, the remaining bytes are written to the
+ * overflow ByteBuffer.
+ *
+ * @param in the bytes to compress
+ * @param out the compressed bytes
+ * @param overflow put any additional bytes here
+ * @param options the options to control compression
+ * @return true if the output is smaller than the input
+ */
+ @Override
+ public boolean compress(ByteBuffer in, ByteBuffer out, ByteBuffer overflow, Options options)
+ throws IOException {
+ int inBytes = in.remaining();
+ // Skip with minimum ZStandard format size:
+ // https://datatracker.ietf.org/doc/html/rfc8878#name-zstandard-frames
+ // Magic Number (4 bytes) + Frame Header (2 bytes) + Data Block Header (3 bytes)
+ if (inBytes < 10) {
+ return false;
+ }
+
+ ZstdOptions zso = (ZstdOptions) options;
+
+ zstdCompressCtx = new ZstdCompressCtx();
+ zstdCompressCtx.setLevel(zso.level);
+ zstdCompressCtx.setLong(zso.windowLog);
+ zstdCompressCtx.setChecksum(false);
+
+ try {
+ byte[] compressed = getBuffer((int) Zstd.compressBound(inBytes));
+
+ int outBytes =
+ zstdCompressCtx.compressByteArray(
+ compressed,
+ 0,
+ compressed.length,
+ in.array(),
+ in.arrayOffset() + in.position(),
+ inBytes);
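+ // Only keep the compressed form when it is actually smaller; returning false
+ // tells the caller to write the original bytes uncompressed instead.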
+ if (outBytes < inBytes) {
+ int remaining = out.remaining();
+ if (remaining >= outBytes) {
+ System.arraycopy(
+ compressed,
+ 0,
+ out.array(),
+ out.arrayOffset() + out.position(),
+ outBytes);
+ out.position(out.position() + outBytes);
+ } else {
+ System.arraycopy(
+ compressed,
+ 0,
+ out.array(),
+ out.arrayOffset() + out.position(),
+ remaining);
+ out.position(out.limit());
+ System.arraycopy(
+ compressed,
+ remaining,
+ overflow.array(),
+ overflow.arrayOffset(),
+ outBytes - remaining);
+ overflow.position(outBytes - remaining);
+ }
+ return true;
+ } else {
+ return false;
+ }
+ } finally {
+ zstdCompressCtx.close();
+ }
+ }
+
+ @Override
+ public void decompress(ByteBuffer in, ByteBuffer out) throws IOException {
+ if (in.isDirect() && out.isDirect()) {
+ directDecompress(in, out);
+ return;
+ }
+
+ int srcOffset = in.arrayOffset() + in.position();
+ int srcSize = in.remaining();
+ int dstOffset = out.arrayOffset() + out.position();
+ int dstSize = out.remaining() - dstOffset;
+
+ long decompressOut =
+ Zstd.decompressByteArray(
+ out.array(), dstOffset, dstSize, in.array(), srcOffset, srcSize);
+ in.position(in.limit());
+ out.position(dstOffset + (int) decompressOut);
+ out.flip();
+ }
+
+ @Override
+ public boolean isAvailable() {
+ return true;
+ }
+
+ @Override
+ public void directDecompress(ByteBuffer in, ByteBuffer out) throws IOException {
+ Zstd.decompress(out, in);
+ out.flip();
+ }
+
+ @Override
+ public void reset() {}
+
+ @Override
+ public void destroy() {
+ if (zstdCompressCtx != null) {
+ zstdCompressCtx.close();
+ }
+ }
+
+ @Override
+ public CompressionKind getKind() {
+ return CompressionKind.ZSTD;
+ }
+
+ @Override
+ public void close() {
+ OrcCodecPool.returnCodec(CompressionKind.ZSTD, this);
+ }
+}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
index ac33991c3062..1a925c7250f3 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
@@ -79,7 +79,7 @@ public OrcFileFormat(FormatContext formatContext) {
}
@VisibleForTesting
- Properties orcProperties() {
+ public Properties orcProperties() {
return orcProperties;
}
diff --git a/paimon-format/src/main/resources/META-INF/NOTICE b/paimon-format/src/main/resources/META-INF/NOTICE
index f6e607ef3502..dae8d5fec19a 100644
--- a/paimon-format/src/main/resources/META-INF/NOTICE
+++ b/paimon-format/src/main/resources/META-INF/NOTICE
@@ -6,8 +6,8 @@ The Apache Software Foundation (http://www.apache.org/).
This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
-- org.apache.orc:orc-core:1.8.3
-- org.apache.orc:orc-shims:1.8.3
+- org.apache.orc:orc-core:1.9.2
+- org.apache.orc:orc-shims:1.9.2
- org.apache.hive:hive-storage-api:2.8.1
- io.airlift:aircompressor:0.21
- commons-lang:commons-lang:2.6
@@ -32,5 +32,5 @@ You find it under licenses/LICENSE.protobuf, licenses/LICENSE.zstd-jni
and licenses/LICENSE.threeten-extra
- com.google.protobuf:protobuf-java:3.17.3
-- com.github.luben:zstd-jni:1.5.0-1
+- com.github.luben:zstd-jni:1.5.5-11
- org.threeten:threeten-extra:1.7.1
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/orc/writer/OrcZstdTest.java b/paimon-format/src/test/java/org/apache/paimon/format/orc/writer/OrcZstdTest.java
new file mode 100644
index 000000000000..d5f8f8bc7cc7
--- /dev/null
+++ b/paimon-format/src/test/java/org/apache/paimon/format/orc/writer/OrcZstdTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.orc.writer;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.format.FileFormatFactory;
+import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.format.orc.OrcFileFormat;
+import org.apache.paimon.format.orc.OrcFileFormatFactory;
+import org.apache.paimon.format.orc.OrcWriterFactory;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import com.github.luben.zstd.ZstdException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.impl.ZstdCodec;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.apache.paimon.format.orc.OrcFileFormatFactory.IDENTIFIER;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+class OrcZstdTest {
+
+ @Test
+ void testWriteOrcWithZstd(@TempDir java.nio.file.Path tempDir) throws IOException {
+ Options options = new Options();
+ options.set("compress", "zstd");
+ options.set("stripe.size", "31457280");
+ options.set("compression.zstd.level", "1");
+ OrcFileFormat orc =
+ new OrcFileFormatFactory()
+ .create(new FileFormatFactory.FormatContext(options, 1024));
+ Assertions.assertThat(orc).isInstanceOf(OrcFileFormat.class);
+
+ Assertions.assertThat(orc.orcProperties().getProperty(IDENTIFIER + ".compress", ""))
+ .isEqualTo("zstd");
+ Assertions.assertThat(
+ orc.orcProperties().getProperty(IDENTIFIER + ".compression.zstd.level", ""))
+ .isEqualTo("1");
+ Assertions.assertThat(orc.orcProperties().getProperty(IDENTIFIER + ".stripe.size", ""))
+ .isEqualTo("31457280");
+
+ RowType rowType =
+ RowType.builder()
+ .field("a", DataTypes.INT())
+ .field("b", DataTypes.STRING())
+ .field("c", DataTypes.STRING())
+ .field("d", DataTypes.STRING())
+ .build();
+ FormatWriterFactory writerFactory = orc.createWriterFactory(rowType);
+ Assertions.assertThat(writerFactory).isInstanceOf(OrcWriterFactory.class);
+
+ Path path = new Path(tempDir.toUri().toString(), "1.orc");
+ PositionOutputStream out = LocalFileIO.create().newOutputStream(path, true);
+ FormatWriter formatWriter = writerFactory.create(out, "zstd");
+
+ Assertions.assertThat(formatWriter).isInstanceOf(OrcBulkWriter.class);
+
+ Options optionsWithLowLevel = new Options();
+ optionsWithLowLevel.set("compress", "zstd");
+ optionsWithLowLevel.set("stripe.size", "31457280");
+ optionsWithLowLevel.set("compression.zstd.level", "1");
+
+ Random random = new Random();
+ for (int i = 0; i < 1000; i++) {
+ GenericRow element =
+ GenericRow.of(
+ random.nextInt(),
+ BinaryString.fromString(
+ UUID.randomUUID().toString() + random.nextInt()),
+ BinaryString.fromString(
+ UUID.randomUUID().toString() + random.nextInt()),
+ BinaryString.fromString(
+ UUID.randomUUID().toString() + random.nextInt()));
+ formatWriter.addElement(element);
+ }
+ formatWriter.finish();
+ OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(new Configuration());
+ Reader reader =
+ OrcFile.createReader(new org.apache.hadoop.fs.Path(path.toString()), readerOptions);
+ Assertions.assertThat(reader.getNumberOfRows()).isEqualTo(1000);
+ Assertions.assertThat(reader.getCompressionKind()).isEqualTo(CompressionKind.ZSTD);
+ Assertions.assertThat(com.github.luben.zstd.util.Native.isLoaded()).isEqualTo(true);
+ }
+
+ @Test
+ public void testCorrupt() throws Exception {
+ ByteBuffer buf = ByteBuffer.allocate(1000);
+ buf.put(new byte[] {127, 125, 1, 99, 98, 1});
+ buf.flip();
+ try (CompressionCodec codec = new ZstdCodec()) {
+ ByteBuffer out = ByteBuffer.allocate(1000);
+ codec.decompress(buf, out);
+ fail();
+ } catch (ZstdException ioe) {
+ // EXPECTED
+ }
+ }
+
+ @Test
+ public void testZstdDirectDecompress() {
+ ByteBuffer in = ByteBuffer.allocate(10000);
+ ByteBuffer out = ByteBuffer.allocate(10000);
+ ByteBuffer directOut = ByteBuffer.allocateDirect(10000);
+ ByteBuffer directResult = ByteBuffer.allocateDirect(10000);
+ for (int i = 0; i < 10000; i++) {
+ in.put((byte) i);
+ }
+ in.flip();
+ try (ZstdCodec zstdCodec = new ZstdCodec()) {
+ // write bytes to heap buffer.
+ assertTrue(zstdCodec.compress(in, out, null, zstdCodec.getDefaultOptions()));
+ int position = out.position();
+ out.flip();
+ // copy heap buffer to direct buffer.
+ directOut.put(out.array());
+ directOut.flip();
+ directOut.limit(position);
+
+ zstdCodec.decompress(directOut, directResult);
+
+ // copy result from direct buffer to heap.
+ byte[] heapBytes = new byte[in.array().length];
+ directResult.get(heapBytes, 0, directResult.limit());
+
+ assertArrayEquals(in.array(), heapBytes);
+ } catch (Exception e) {
+ fail(e);
+ }
+ }
+}