From 532cd8f6e85618947375901f7955dce266fc04e2 Mon Sep 17 00:00:00 2001 From: Jason Zhang Date: Wed, 20 Mar 2024 17:49:15 +0800 Subject: [PATCH] [orc] ORC Support ZStandard compression using zstd-jni (#3056) --- paimon-format/pom.xml | 33 +- .../java/org/apache/orc/CompressionKind.java | 32 + .../src/main/java/org/apache/orc/OrcConf.java | 519 +++++++ .../src/main/java/org/apache/orc/OrcFile.java | 1336 +++++++++++++++++ .../org/apache/orc/impl/PhysicalFsWriter.java | 793 ++++++++++ .../java/org/apache/orc/impl/WriterImpl.java | 1063 +++++++++++++ .../java/org/apache/orc/impl/ZstdCodec.java | 287 ++++ .../paimon/format/orc/OrcFileFormat.java | 2 +- .../src/main/resources/META-INF/NOTICE | 6 +- .../paimon/format/orc/writer/OrcZstdTest.java | 166 ++ 10 files changed, 4232 insertions(+), 5 deletions(-) create mode 100644 paimon-format/src/main/java/org/apache/orc/CompressionKind.java create mode 100644 paimon-format/src/main/java/org/apache/orc/OrcConf.java create mode 100644 paimon-format/src/main/java/org/apache/orc/OrcFile.java create mode 100644 paimon-format/src/main/java/org/apache/orc/impl/PhysicalFsWriter.java create mode 100644 paimon-format/src/main/java/org/apache/orc/impl/WriterImpl.java create mode 100644 paimon-format/src/main/java/org/apache/orc/impl/ZstdCodec.java create mode 100644 paimon-format/src/test/java/org/apache/paimon/format/orc/writer/OrcZstdTest.java diff --git a/paimon-format/pom.xml b/paimon-format/pom.xml index b9e755996011..1b38aad99bf5 100644 --- a/paimon-format/pom.xml +++ b/paimon-format/pom.xml @@ -33,10 +33,13 @@ under the License. 1.13.1 - 1.8.3 + 1.9.2 2.5 1.6 3.12.0 + 1.5.5-11 + 2.8.1 + 3.17.3 @@ -53,6 +56,12 @@ under the License. ${snappy.version} + + com.github.luben + zstd-jni + ${zstd-jni.version} + + @@ -116,6 +125,24 @@ under the License. ${commons.lang3.version} + + org.apache.hive + hive-storage-api + ${storage-api.version} + + + org.apache.hadoop + hadoop-hdfs + + + + + + com.google.protobuf + protobuf-java + ${protobuf-java.version} + + @@ -150,6 +177,10 @@ under the License. org.xerial.snappy snappy-java + + zstd-jni + com.github.luben + diff --git a/paimon-format/src/main/java/org/apache/orc/CompressionKind.java b/paimon-format/src/main/java/org/apache/orc/CompressionKind.java new file mode 100644 index 000000000000..9841f5a2919e --- /dev/null +++ b/paimon-format/src/main/java/org/apache/orc/CompressionKind.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc; + +/** + * An enumeration that lists the generic compression algorithms that can be applied to ORC files. 
+ */ +public enum CompressionKind { + NONE, + ZLIB, + SNAPPY, + LZO, + LZ4, + ZSTD, + BROTLI +} diff --git a/paimon-format/src/main/java/org/apache/orc/OrcConf.java b/paimon-format/src/main/java/org/apache/orc/OrcConf.java new file mode 100644 index 000000000000..2e1507eedf82 --- /dev/null +++ b/paimon-format/src/main/java/org/apache/orc/OrcConf.java @@ -0,0 +1,519 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +/** Define the configuration properties that Orc understands. */ +public enum OrcConf { + STRIPE_SIZE( + "orc.stripe.size", + "hive.exec.orc.default.stripe.size", + 64L * 1024 * 1024, + "Define the default ORC stripe size, in bytes."), + STRIPE_ROW_COUNT( + "orc.stripe.row.count", + "orc.stripe.row.count", + Integer.MAX_VALUE, + "This value limit the row count in one stripe. \n" + + "The number of stripe rows can be controlled at \n" + + "(0, \"orc.stripe.row.count\" + max(batchSize, \"orc.rows.between.memory.checks\"))"), + BLOCK_SIZE( + "orc.block.size", + "hive.exec.orc.default.block.size", + 256L * 1024 * 1024, + "Define the default file system block size for ORC files."), + ENABLE_INDEXES( + "orc.create.index", + "orc.create.index", + true, + "Should the ORC writer create indexes as part of the file."), + ROW_INDEX_STRIDE( + "orc.row.index.stride", + "hive.exec.orc.default.row.index.stride", + 10000, + "Define the default ORC index stride in number of rows. (Stride is the\n" + + " number of rows an index entry represents.)"), + BUFFER_SIZE( + "orc.compress.size", + "hive.exec.orc.default.buffer.size", + 256 * 1024, + "Define the default ORC buffer size, in bytes."), + BASE_DELTA_RATIO( + "orc.base.delta.ratio", + "hive.exec.orc.base.delta.ratio", + 8, + "The ratio of base writer and delta writer in terms of STRIPE_SIZE and BUFFER_SIZE."), + BLOCK_PADDING( + "orc.block.padding", + "hive.exec.orc.default.block.padding", + true, + "Define whether stripes should be padded to the HDFS block boundaries."), + COMPRESS( + "orc.compress", + "hive.exec.orc.default.compress", + "ZLIB", + "Define the default compression codec for ORC file"), + WRITE_FORMAT( + "orc.write.format", + "hive.exec.orc.write.format", + "0.12", + "Define the version of the file to write. Possible values are 0.11 and\n" + + " 0.12. 
If this parameter is not defined, ORC will use the run\n" + + " length encoding (RLE) introduced in Hive 0.12."), + ENFORCE_COMPRESSION_BUFFER_SIZE( + "orc.buffer.size.enforce", + "hive.exec.orc.buffer.size.enforce", + false, + "Defines whether to enforce ORC compression buffer size."), + ENCODING_STRATEGY( + "orc.encoding.strategy", + "hive.exec.orc.encoding.strategy", + "SPEED", + "Define the encoding strategy to use while writing data. Changing this\n" + + "will only affect the light weight encoding for integers. This\n" + + "flag will not change the compression level of higher level\n" + + "compression codec (like ZLIB)."), + COMPRESSION_STRATEGY( + "orc.compression.strategy", + "hive.exec.orc.compression.strategy", + "SPEED", + "Define the compression strategy to use while writing data.\n" + + "This changes the compression level of higher level compression\n" + + "codec (like ZLIB)."), + COMPRESSION_ZSTD_LEVEL( + "orc.compression.zstd.level", + "hive.exec.orc.compression.zstd.level", + 1, + "Define the compression level to use with ZStandard codec " + + "while writing data. The valid range is 1~22"), + COMPRESSION_ZSTD_WINDOWLOG( + "orc.compression.zstd.windowlog", + "hive.exec.orc.compression.zstd.windowlog", + 0, + "Set the maximum allowed back-reference distance for " + + "ZStandard codec, expressed as power of 2."), + BLOCK_PADDING_TOLERANCE( + "orc.block.padding.tolerance", + "hive.exec.orc.block.padding.tolerance", + 0.05, + "Define the tolerance for block padding as a decimal fraction of\n" + + "stripe size (for example, the default value 0.05 is 5% of the\n" + + "stripe size). For the defaults of 64Mb ORC stripe and 256Mb HDFS\n" + + "blocks, the default block padding tolerance of 5% will\n" + + "reserve a maximum of 3.2Mb for padding within the 256Mb block.\n" + + "In that case, if the available size within the block is more than\n" + + "3.2Mb, a new smaller stripe will be inserted to fit within that\n" + + "space. This will make sure that no stripe written will block\n" + + " boundaries and cause remote reads within a node local task."), + BLOOM_FILTER_FPP( + "orc.bloom.filter.fpp", + "orc.default.bloom.fpp", + 0.01, + "Define the default false positive probability for bloom filters."), + USE_ZEROCOPY( + "orc.use.zerocopy", + "hive.exec.orc.zerocopy", + false, + "Use zerocopy reads with ORC. (This requires Hadoop 2.3 or later.)"), + SKIP_CORRUPT_DATA( + "orc.skip.corrupt.data", + "hive.exec.orc.skip.corrupt.data", + false, + "If ORC reader encounters corrupt data, this value will be used to\n" + + "determine whether to skip the corrupt data or throw exception.\n" + + "The default behavior is to throw exception."), + TOLERATE_MISSING_SCHEMA( + "orc.tolerate.missing.schema", + "hive.exec.orc.tolerate.missing.schema", + true, + "Writers earlier than HIVE-4243 may have inaccurate schema metadata.\n" + + "This setting will enable best effort schema evolution rather\n" + + "than rejecting mismatched schemas"), + MEMORY_POOL( + "orc.memory.pool", + "hive.exec.orc.memory.pool", + 0.5, + "Maximum fraction of heap that can be used by ORC file writers"), + DICTIONARY_KEY_SIZE_THRESHOLD( + "orc.dictionary.key.threshold", + "hive.exec.orc.dictionary.key.size.threshold", + 0.8, + "If the number of distinct keys in a dictionary is greater than this\n" + + "fraction of the total number of non-null rows, turn off \n" + + "dictionary encoding. 
Use 1 to always use dictionary encoding."), + ROW_INDEX_STRIDE_DICTIONARY_CHECK( + "orc.dictionary.early.check", + "hive.orc.row.index.stride.dictionary.check", + true, + "If enabled dictionary check will happen after first row index stride\n" + + "(default 10000 rows) else dictionary check will happen before\n" + + "writing first stripe. In both cases, the decision to use\n" + + "dictionary or not will be retained thereafter."), + DICTIONARY_IMPL( + "orc.dictionary.implementation", + "orc.dictionary.implementation", + "rbtree", + "the implementation for the dictionary used for string-type column encoding.\n" + + "The choices are:\n" + + " rbtree - use red-black tree as the implementation for the dictionary.\n" + + " hash - use hash table as the implementation for the dictionary."), + BLOOM_FILTER_COLUMNS( + "orc.bloom.filter.columns", + "orc.bloom.filter.columns", + "", + "List of columns to create bloom filters for when writing."), + BLOOM_FILTER_WRITE_VERSION( + "orc.bloom.filter.write.version", + "orc.bloom.filter.write.version", + OrcFile.BloomFilterVersion.UTF8.toString(), + "Which version of the bloom filters should we write.\n" + + "The choices are:\n" + + " original - writes two versions of the bloom filters for use by\n" + + " both old and new readers.\n" + + " utf8 - writes just the new bloom filters."), + IGNORE_NON_UTF8_BLOOM_FILTERS( + "orc.bloom.filter.ignore.non-utf8", + "orc.bloom.filter.ignore.non-utf8", + false, + "Should the reader ignore the obsolete non-UTF8 bloom filters."), + MAX_FILE_LENGTH( + "orc.max.file.length", + "orc.max.file.length", + Long.MAX_VALUE, + "The maximum size of the file to read for finding the file tail. This\n" + + "is primarily used for streaming ingest to read intermediate\n" + + "footers while the file is still open"), + MAPRED_INPUT_SCHEMA( + "orc.mapred.input.schema", + null, + null, + "The schema that the user desires to read. The values are\n" + + "interpreted using TypeDescription.fromString."), + MAPRED_SHUFFLE_KEY_SCHEMA( + "orc.mapred.map.output.key.schema", + null, + null, + "The schema of the MapReduce shuffle key. The values are\n" + + "interpreted using TypeDescription.fromString."), + MAPRED_SHUFFLE_VALUE_SCHEMA( + "orc.mapred.map.output.value.schema", + null, + null, + "The schema of the MapReduce shuffle value. The values are\n" + + "interpreted using TypeDescription.fromString."), + MAPRED_OUTPUT_SCHEMA( + "orc.mapred.output.schema", + null, + null, + "The schema that the user desires to write. The values are\n" + + "interpreted using TypeDescription.fromString."), + INCLUDE_COLUMNS( + "orc.include.columns", + "hive.io.file.readcolumn.ids", + null, + "The list of comma separated column ids that should be read with 0\n" + + "being the first column, 1 being the next, and so on. ."), + KRYO_SARG( + "orc.kryo.sarg", + "orc.kryo.sarg", + null, + "The kryo and base64 encoded SearchArgument for predicate pushdown."), + KRYO_SARG_BUFFER( + "orc.kryo.sarg.buffer", + null, + 8192, + "The kryo buffer size for SearchArgument for predicate pushdown."), + SARG_COLUMNS( + "orc.sarg.column.names", + "orc.sarg.column.names", + null, + "The list of column names for the SearchArgument."), + FORCE_POSITIONAL_EVOLUTION( + "orc.force.positional.evolution", + "orc.force.positional.evolution", + false, + "Require schema evolution to match the top level columns using position\n" + + "rather than column names. 
This provides backwards compatibility with\n" + + "Hive 2.1."), + FORCE_POSITIONAL_EVOLUTION_LEVEL( + "orc.force.positional.evolution.level", + "orc.force.positional.evolution.level", + 1, + "Require schema evolution to match the the defined no. of level columns using position\n" + + "rather than column names. This provides backwards compatibility with Hive 2.1."), + ROWS_BETWEEN_CHECKS( + "orc.rows.between.memory.checks", + "orc.rows.between.memory.checks", + 5000, + "How often should MemoryManager check the memory sizes? Measured in rows\n" + + "added to all of the writers. Valid range is [1,10000] and is primarily meant for" + + "testing. Setting this too low may negatively affect performance." + + " Use orc.stripe.row.count instead if the value larger than orc.stripe.row.count."), + OVERWRITE_OUTPUT_FILE( + "orc.overwrite.output.file", + "orc.overwrite.output.file", + false, + "A boolean flag to enable overwriting of the output file if it already exists.\n"), + IS_SCHEMA_EVOLUTION_CASE_SENSITIVE( + "orc.schema.evolution.case.sensitive", + "orc.schema.evolution.case.sensitive", + true, + "A boolean flag to determine if the comparision of field names " + + "in schema evolution is case sensitive .\n"), + ALLOW_SARG_TO_FILTER( + "orc.sarg.to.filter", + "orc.sarg.to.filter", + false, + "A boolean flag to determine if a SArg is allowed to become a filter"), + READER_USE_SELECTED( + "orc.filter.use.selected", + "orc.filter.use.selected", + false, + "A boolean flag to determine if the selected vector is supported by\n" + + "the reading application. If false, the output of the ORC reader " + + "must have the filter\n" + + "reapplied to avoid using unset values in the unselected rows.\n" + + "If unsure please leave this as false."), + ALLOW_PLUGIN_FILTER( + "orc.filter.plugin", + "orc.filter.plugin", + false, + "Enables the use of plugin filters during read. The plugin filters " + + "are discovered against the service " + + "org.apache.orc.filter.PluginFilterService, if multiple filters are " + + "determined, they are combined using AND. The order of application is " + + "non-deterministic and the filter functionality should not depend on the " + + "order of application."), + WRITE_VARIABLE_LENGTH_BLOCKS( + "orc.write.variable.length.blocks", + null, + false, + "A boolean flag as to whether the ORC writer should write variable length\n" + + "HDFS blocks."), + DIRECT_ENCODING_COLUMNS( + "orc.column.encoding.direct", + "orc.column.encoding.direct", + "", + "Comma-separated list of columns for which dictionary encoding is to be skipped."), + // some JVM doesn't allow array creation of size Integer.MAX_VALUE, so chunk size is slightly + // less than max int + ORC_MAX_DISK_RANGE_CHUNK_LIMIT( + "orc.max.disk.range.chunk.limit", + "hive.exec.orc.max.disk.range.chunk.limit", + Integer.MAX_VALUE - 1024, + "When reading stripes >2GB, specify max limit for the chunk size."), + ORC_MIN_DISK_SEEK_SIZE( + "orc.min.disk.seek.size", + "orc.min.disk.seek.size", + 0, + "When determining contiguous reads, gaps within this size are " + + "read contiguously and not seeked. Default value of zero disables this " + + "optimization"), + ORC_MIN_DISK_SEEK_SIZE_TOLERANCE( + "orc.min.disk.seek.size.tolerance", + "orc.min.disk.seek.size.tolerance", + 0.00, + "Define the tolerance for for extra bytes read as a result of " + + "orc.min.disk.seek.size. 
If the " + + "(bytesRead - bytesNeeded) / bytesNeeded is greater than this " + + "threshold then extra work is performed to drop the extra bytes from " + + "memory after the read."), + ENCRYPTION("orc.encrypt", "orc.encrypt", null, "The list of keys and columns to encrypt with"), + DATA_MASK("orc.mask", "orc.mask", null, "The masks to apply to the encrypted columns"), + KEY_PROVIDER( + "orc.key.provider", + "orc.key.provider", + "hadoop", + "The kind of KeyProvider to use for encryption."), + PROLEPTIC_GREGORIAN( + "orc.proleptic.gregorian", + "orc.proleptic.gregorian", + false, + "Should we read and write dates & times using the proleptic Gregorian calendar\n" + + "instead of the hybrid Julian Gregorian? Hive before 3.1 and Spark before 3.0\n" + + "used hybrid."), + PROLEPTIC_GREGORIAN_DEFAULT( + "orc.proleptic.gregorian.default", + "orc.proleptic.gregorian.default", + false, + "This value controls whether pre-ORC 27 files are using the hybrid or proleptic\n" + + "calendar. Only Hive 3.1 and the C++ library wrote using the proleptic, so hybrid\n" + + "is the default."), + ROW_BATCH_SIZE( + "orc.row.batch.size", + "orc.row.batch.size", + 1024, + "The number of rows to include in a orc vectorized reader batch. " + + "The value should be carefully chosen to minimize overhead and avoid OOMs in reading data."), + ROW_BATCH_CHILD_LIMIT( + "orc.row.child.limit", + "orc.row.child.limit", + 1024 * 32, + "The maximum number of child elements to buffer before " + + "the ORC row writer writes the batch to the file."); + + private final String attribute; + private final String hiveConfName; + private final Object defaultValue; + private final String description; + + OrcConf(String attribute, String hiveConfName, Object defaultValue, String description) { + this.attribute = attribute; + this.hiveConfName = hiveConfName; + this.defaultValue = defaultValue; + this.description = description; + } + + public String getAttribute() { + return attribute; + } + + public String getHiveConfName() { + return hiveConfName; + } + + public Object getDefaultValue() { + return defaultValue; + } + + public String getDescription() { + return description; + } + + private String lookupValue(Properties tbl, Configuration conf) { + String result = null; + if (tbl != null) { + result = tbl.getProperty(attribute); + } + if (result == null && conf != null) { + result = conf.get(attribute); + if (result == null && hiveConfName != null) { + result = conf.get(hiveConfName); + } + } + return result; + } + + public int getInt(Properties tbl, Configuration conf) { + String value = lookupValue(tbl, conf); + if (value != null) { + return Integer.parseInt(value); + } + return ((Number) defaultValue).intValue(); + } + + public int getInt(Configuration conf) { + return getInt(null, conf); + } + + /** + * @deprecated Use {@link #getInt(Configuration)} instead. This method was incorrectly added and + * shouldn't be used anymore. 
+ */ + @Deprecated + public void getInt(Configuration conf, int value) { + // noop + } + + public void setInt(Configuration conf, int value) { + conf.setInt(attribute, value); + } + + public long getLong(Properties tbl, Configuration conf) { + String value = lookupValue(tbl, conf); + if (value != null) { + return Long.parseLong(value); + } + return ((Number) defaultValue).longValue(); + } + + public long getLong(Configuration conf) { + return getLong(null, conf); + } + + public void setLong(Configuration conf, long value) { + conf.setLong(attribute, value); + } + + public String getString(Properties tbl, Configuration conf) { + String value = lookupValue(tbl, conf); + return value == null ? (String) defaultValue : value; + } + + public String getString(Configuration conf) { + return getString(null, conf); + } + + public List getStringAsList(Configuration conf) { + String value = getString(null, conf); + List confList = new ArrayList<>(); + if (StringUtils.isEmpty(value)) { + return confList; + } + for (String str : value.split(",")) { + String trimStr = StringUtils.trim(str); + if (StringUtils.isNotEmpty(trimStr)) { + confList.add(trimStr); + } + } + return confList; + } + + public void setString(Configuration conf, String value) { + conf.set(attribute, value); + } + + public boolean getBoolean(Properties tbl, Configuration conf) { + String value = lookupValue(tbl, conf); + if (value != null) { + return Boolean.parseBoolean(value); + } + return (Boolean) defaultValue; + } + + public boolean getBoolean(Configuration conf) { + return getBoolean(null, conf); + } + + public void setBoolean(Configuration conf, boolean value) { + conf.setBoolean(attribute, value); + } + + public double getDouble(Properties tbl, Configuration conf) { + String value = lookupValue(tbl, conf); + if (value != null) { + return Double.parseDouble(value); + } + return ((Number) defaultValue).doubleValue(); + } + + public double getDouble(Configuration conf) { + return getDouble(null, conf); + } + + public void setDouble(Configuration conf, double value) { + conf.setDouble(attribute, value); + } +} diff --git a/paimon-format/src/main/java/org/apache/orc/OrcFile.java b/paimon-format/src/main/java/org/apache/orc/OrcFile.java new file mode 100644 index 000000000000..a903ba9e7545 --- /dev/null +++ b/paimon-format/src/main/java/org/apache/orc/OrcFile.java @@ -0,0 +1,1336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.orc; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.orc.impl.HadoopShims; +import org.apache.orc.impl.HadoopShimsFactory; +import org.apache.orc.impl.KeyProvider; +import org.apache.orc.impl.MemoryManagerImpl; +import org.apache.orc.impl.OrcTail; +import org.apache.orc.impl.ReaderImpl; +import org.apache.orc.impl.WriterImpl; +import org.apache.orc.impl.WriterInternal; +import org.apache.orc.impl.writer.WriterImplV2; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** Contains factory methods to read or write ORC files. */ +public class OrcFile { + private static final Logger LOG = LoggerFactory.getLogger(OrcFile.class); + public static final String MAGIC = "ORC"; + + /** + * Create a version number for the ORC file format, so that we can add non-forward compatible + * changes in the future. To make it easier for users to understand the version numbers, we use + * the Hive release number that first wrote that version of ORC files. + * + *

Thus, if you add new encodings or other non-forward compatible changes to ORC files, which + * prevent the old reader from reading the new format, you should change these variables to + * reflect the next Hive release number. Non-forward compatible changes should never be added in + * patch releases. + * + *

Do not make any changes that break backwards compatibility, which would prevent the new + * reader from reading ORC files generated by any released version of Hive. + */ + public enum Version { + V_0_11("0.11", 0, 11), + V_0_12("0.12", 0, 12), + + /** + * Do not use this format except for testing. It will not be compatible with other versions + * of the software. While we iterate on the ORC 2.0 format, we will make incompatible format + * changes under this version without providing any forward or backward compatibility. + * + *

When 2.0 is released, this version identifier will be completely removed. + */ + UNSTABLE_PRE_2_0("UNSTABLE-PRE-2.0", 1, 9999), + + /** The generic identifier for all unknown versions. */ + FUTURE("future", Integer.MAX_VALUE, Integer.MAX_VALUE); + + public static final Version CURRENT = V_0_12; + + private final String name; + private final int major; + private final int minor; + + Version(String name, int major, int minor) { + this.name = name; + this.major = major; + this.minor = minor; + } + + public static Version byName(String name) { + for (Version version : values()) { + if (version.name.equals(name)) { + return version; + } + } + throw new IllegalArgumentException("Unknown ORC version " + name); + } + + /** Get the human readable name for the version. */ + public String getName() { + return name; + } + + /** Get the major version number. */ + public int getMajor() { + return major; + } + + /** Get the minor version number. */ + public int getMinor() { + return minor; + } + } + + /** WriterImplementation Enum. */ + public enum WriterImplementation { + /** ORC_JAVA. */ + ORC_JAVA(0), // ORC Java writer + /** ORC_CPP. */ + ORC_CPP(1), // ORC C++ writer + /** PRESTO. */ + PRESTO(2), // Presto writer + /** SCRITCHLEY_GO. */ + SCRITCHLEY_GO(3), // Go writer from https://github.com/scritchley/orc + /** TRINO. */ + TRINO(4), // Trino writer + /** CUDF. */ + CUDF(5), // CUDF writer + /** UNKNOWN. */ + UNKNOWN(Integer.MAX_VALUE); + + private final int id; + + WriterImplementation(int id) { + this.id = id; + } + + public int getId() { + return id; + } + + public static WriterImplementation from(int id) { + WriterImplementation[] values = values(); + if (id >= 0 && id < values.length - 1) { + return values[id]; + } + return UNKNOWN; + } + } + + /** + * Records the version of the writer in terms of which bugs have been fixed. When you fix bugs + * in the writer (or make substantial changes) that don't change the file format, add a new + * version here instead of Version. + * + *

The ids are assigned sequentially from 6 per a WriterImplementation so that readers that + * predate ORC-202 treat the other writers correctly. + */ + public enum WriterVersion { + // Java ORC Writer. + ORIGINAL(WriterImplementation.ORC_JAVA, 0), + HIVE_8732(WriterImplementation.ORC_JAVA, 1), + /** fixed stripe/file maximum statistics and string statistics to use utf8 for min/max. */ + HIVE_4243(WriterImplementation.ORC_JAVA, 2), // use real column names from Hive tables + HIVE_12055(WriterImplementation.ORC_JAVA, 3), // vectorized writer + HIVE_13083(WriterImplementation.ORC_JAVA, 4), // decimals write present stream correctly + ORC_101(WriterImplementation.ORC_JAVA, 5), // bloom filters use utf8 + ORC_135(WriterImplementation.ORC_JAVA, 6), // timestamp stats use utc + ORC_517(WriterImplementation.ORC_JAVA, 7), // decimal64 min/max are fixed + ORC_203(WriterImplementation.ORC_JAVA, 8), // trim long strings & record they were trimmed + ORC_14(WriterImplementation.ORC_JAVA, 9), // column encryption added + + // C++ ORC Writer + ORC_CPP_ORIGINAL(WriterImplementation.ORC_CPP, 6), + + // Presto Writer + PRESTO_ORIGINAL(WriterImplementation.PRESTO, 6), + + // Scritchley Go Writer + SCRITCHLEY_GO_ORIGINAL(WriterImplementation.SCRITCHLEY_GO, 6), + + // Trino Writer + TRINO_ORIGINAL(WriterImplementation.TRINO, 6), + + // CUDF Writer + CUDF_ORIGINAL(WriterImplementation.CUDF, 6), + + // Don't use any magic numbers here except for the below: + FUTURE(WriterImplementation.UNKNOWN, Integer.MAX_VALUE); // a version from a future writer + + private final int id; + private final WriterImplementation writer; + + public WriterImplementation getWriterImplementation() { + return writer; + } + + public int getId() { + return id; + } + + WriterVersion(WriterImplementation writer, int id) { + this.writer = writer; + this.id = id; + } + + private static final WriterVersion[][] values = + new WriterVersion[WriterImplementation.values().length][]; + + static { + for (WriterVersion v : WriterVersion.values()) { + WriterImplementation writer = v.writer; + if (writer != WriterImplementation.UNKNOWN) { + if (values[writer.id] == null) { + values[writer.id] = new WriterVersion[WriterVersion.values().length]; + } + if (values[writer.id][v.id] != null) { + throw new IllegalArgumentException("Duplicate WriterVersion id " + v); + } + values[writer.id][v.id] = v; + } + } + } + + /** + * Convert the integer from OrcProto.PostScript.writerVersion to the enumeration with + * unknown versions being mapped to FUTURE. + * + * @param writer the writer implementation + * @param val the serialized writer version + * @return the corresponding enumeration value + */ + public static WriterVersion from(WriterImplementation writer, int val) { + if (writer == WriterImplementation.UNKNOWN) { + return FUTURE; + } + if (writer != WriterImplementation.ORC_JAVA && val < 6) { + throw new IllegalArgumentException( + "ORC File with illegal version " + val + " for writer " + writer); + } + WriterVersion[] versions = values[writer.id]; + if (val < 0 || versions.length <= val) { + return FUTURE; + } + WriterVersion result = versions[val]; + return result == null ? FUTURE : result; + } + + /** + * Does this file include the given fix or come from a different writer? + * + * @param fix the required fix + * @return true if the required fix is present + */ + public boolean includes(WriterVersion fix) { + return writer != fix.writer || id >= fix.id; + } + } + + /** The WriterVersion for this version of the software. 
*/ + public static final WriterVersion CURRENT_WRITER = WriterVersion.ORC_14; + + /** EncodingStrategy Enum. */ + public enum EncodingStrategy { + /** SPEED. */ + SPEED, + /** COMPRESSION. */ + COMPRESSION + } + + /** CompressionStrategy Enum. */ + public enum CompressionStrategy { + /** SPEED. */ + SPEED, + /** COMPRESSION. */ + COMPRESSION + } + + // unused + protected OrcFile() {} + + /** Orc ReaderOptions. */ + public static class ReaderOptions { + private final Configuration conf; + private FileSystem filesystem; + private long maxLength = Long.MAX_VALUE; + private OrcTail orcTail; + private KeyProvider keyProvider; + // TODO: We can generalize FileMetadata interface. Make OrcTail implement FileMetadata + // interface + // and remove this class altogether. Both footer caching and llap caching just needs + // OrcTail. + // For now keeping this around to avoid complex surgery + private FileMetadata fileMetadata; + private boolean useUTCTimestamp; + private boolean useProlepticGregorian; + + public ReaderOptions(Configuration conf) { + this.conf = conf; + this.useProlepticGregorian = OrcConf.PROLEPTIC_GREGORIAN.getBoolean(conf); + } + + public ReaderOptions filesystem(FileSystem fs) { + this.filesystem = fs; + return this; + } + + public ReaderOptions maxLength(long val) { + maxLength = val; + return this; + } + + public ReaderOptions orcTail(OrcTail tail) { + this.orcTail = tail; + return this; + } + + /** + * Set the KeyProvider to override the default for getting keys. + * + * @param provider + * @return + */ + public ReaderOptions setKeyProvider(KeyProvider provider) { + this.keyProvider = provider; + return this; + } + + /** + * Should the reader convert dates and times to the proleptic Gregorian calendar? + * + * @param newValue should it use the proleptic Gregorian calendar? + * @return this + */ + public ReaderOptions convertToProlepticGregorian(boolean newValue) { + this.useProlepticGregorian = newValue; + return this; + } + + public Configuration getConfiguration() { + return conf; + } + + public FileSystem getFilesystem() { + return filesystem; + } + + public long getMaxLength() { + return maxLength; + } + + public OrcTail getOrcTail() { + return orcTail; + } + + public KeyProvider getKeyProvider() { + return keyProvider; + } + + /** @deprecated Use {@link #orcTail(OrcTail)} instead. */ + public ReaderOptions fileMetadata(final FileMetadata metadata) { + fileMetadata = metadata; + return this; + } + + public FileMetadata getFileMetadata() { + return fileMetadata; + } + + public ReaderOptions useUTCTimestamp(boolean value) { + useUTCTimestamp = value; + return this; + } + + public boolean getUseUTCTimestamp() { + return useUTCTimestamp; + } + + public boolean getConvertToProlepticGregorian() { + return useProlepticGregorian; + } + } + + public static ReaderOptions readerOptions(Configuration conf) { + return new ReaderOptions(conf); + } + + public static Reader createReader(Path path, ReaderOptions options) throws IOException { + return new ReaderImpl(path, options); + } + + /** WriterContext. */ + public interface WriterContext { + Writer getWriter(); + } + + /** WriterCallback. */ + public interface WriterCallback { + void preStripeWrite(WriterContext context) throws IOException; + + void preFooterWrite(WriterContext context) throws IOException; + } + + /** BloomFilterVersion. */ + public enum BloomFilterVersion { + // Include both the BLOOM_FILTER and BLOOM_FILTER_UTF8 streams to support + // both old and new readers. 
+ ORIGINAL("original"), + // Only include the BLOOM_FILTER_UTF8 streams that consistently use UTF8. + // See ORC-101 + UTF8("utf8"); + + private final String id; + + BloomFilterVersion(String id) { + this.id = id; + } + + @Override + public String toString() { + return id; + } + + public static BloomFilterVersion fromString(String s) { + for (BloomFilterVersion version : values()) { + if (version.id.equals(s)) { + return version; + } + } + throw new IllegalArgumentException("Unknown BloomFilterVersion " + s); + } + } + + /** ZstdCompressOptions. */ + public static class ZstdCompressOptions { + private int compressionZstdLevel; + private int compressionZstdWindowLog; + + public int getCompressionZstdLevel() { + return compressionZstdLevel; + } + + public void setCompressionZstdLevel(int compressionZstdLevel) { + this.compressionZstdLevel = compressionZstdLevel; + } + + public int getCompressionZstdWindowLog() { + return compressionZstdWindowLog; + } + + public void setCompressionZstdWindowLog(int compressionZstdWindowLog) { + this.compressionZstdWindowLog = compressionZstdWindowLog; + } + } + + /** Options for creating ORC file writers. */ + public static class WriterOptions implements Cloneable { + private final Configuration configuration; + private FileSystem fileSystemValue = null; + private TypeDescription schema = null; + private long stripeSizeValue; + private long stripeRowCountValue; + private long blockSizeValue; + private boolean buildIndex; + private int rowIndexStrideValue; + private int bufferSizeValue; + private boolean enforceBufferSize = false; + private boolean blockPaddingValue; + private CompressionKind compressValue; + private MemoryManager memoryManagerValue; + private Version versionValue; + private WriterCallback callback; + private EncodingStrategy encodingStrategy; + private CompressionStrategy compressionStrategy; + private ZstdCompressOptions zstdCompressOptions; + private double paddingTolerance; + private String bloomFilterColumns; + private double bloomFilterFpp; + private BloomFilterVersion bloomFilterVersion; + private PhysicalWriter physicalWriter; + private WriterVersion writerVersion = CURRENT_WRITER; + private boolean useUTCTimestamp; + private boolean overwrite; + private boolean writeVariableLengthBlocks; + private HadoopShims shims; + private String directEncodingColumns; + private String encryption; + private String masks; + private KeyProvider provider; + private boolean useProlepticGregorian; + private Map keyOverrides = new HashMap<>(); + + protected WriterOptions(Properties tableProperties, Configuration conf) { + configuration = conf; + memoryManagerValue = getStaticMemoryManager(conf); + overwrite = OrcConf.OVERWRITE_OUTPUT_FILE.getBoolean(tableProperties, conf); + stripeSizeValue = OrcConf.STRIPE_SIZE.getLong(tableProperties, conf); + stripeRowCountValue = OrcConf.STRIPE_ROW_COUNT.getLong(tableProperties, conf); + blockSizeValue = OrcConf.BLOCK_SIZE.getLong(tableProperties, conf); + buildIndex = OrcConf.ENABLE_INDEXES.getBoolean(tableProperties, conf); + rowIndexStrideValue = (int) OrcConf.ROW_INDEX_STRIDE.getLong(tableProperties, conf); + bufferSizeValue = (int) OrcConf.BUFFER_SIZE.getLong(tableProperties, conf); + blockPaddingValue = OrcConf.BLOCK_PADDING.getBoolean(tableProperties, conf); + compressValue = + CompressionKind.valueOf( + OrcConf.COMPRESS.getString(tableProperties, conf).toUpperCase()); + enforceBufferSize = + OrcConf.ENFORCE_COMPRESSION_BUFFER_SIZE.getBoolean(tableProperties, conf); + String versionName = 
OrcConf.WRITE_FORMAT.getString(tableProperties, conf); + versionValue = Version.byName(versionName); + String enString = OrcConf.ENCODING_STRATEGY.getString(tableProperties, conf); + encodingStrategy = EncodingStrategy.valueOf(enString); + + String compString = OrcConf.COMPRESSION_STRATEGY.getString(tableProperties, conf); + compressionStrategy = CompressionStrategy.valueOf(compString); + + zstdCompressOptions = new ZstdCompressOptions(); + zstdCompressOptions.setCompressionZstdLevel( + OrcConf.COMPRESSION_ZSTD_LEVEL.getInt(tableProperties, conf)); + zstdCompressOptions.setCompressionZstdWindowLog( + OrcConf.COMPRESSION_ZSTD_WINDOWLOG.getInt(tableProperties, conf)); + + paddingTolerance = OrcConf.BLOCK_PADDING_TOLERANCE.getDouble(tableProperties, conf); + + bloomFilterColumns = OrcConf.BLOOM_FILTER_COLUMNS.getString(tableProperties, conf); + bloomFilterFpp = OrcConf.BLOOM_FILTER_FPP.getDouble(tableProperties, conf); + bloomFilterVersion = + BloomFilterVersion.fromString( + OrcConf.BLOOM_FILTER_WRITE_VERSION.getString(tableProperties, conf)); + shims = HadoopShimsFactory.get(); + writeVariableLengthBlocks = + OrcConf.WRITE_VARIABLE_LENGTH_BLOCKS.getBoolean(tableProperties, conf); + directEncodingColumns = + OrcConf.DIRECT_ENCODING_COLUMNS.getString(tableProperties, conf); + useProlepticGregorian = OrcConf.PROLEPTIC_GREGORIAN.getBoolean(conf); + } + + /** @return a SHALLOW clone */ + @Override + public WriterOptions clone() { + try { + return (WriterOptions) super.clone(); + } catch (CloneNotSupportedException ex) { + throw new AssertionError("Expected super.clone() to work"); + } + } + + /** + * Provide the filesystem for the path, if the client has it available. If it is not + * provided, it will be found from the path. + */ + public WriterOptions fileSystem(FileSystem value) { + fileSystemValue = value; + return this; + } + + /** + * If the output file already exists, should it be overwritten? If it is not provided, write + * operation will fail if the file already exists. + */ + public WriterOptions overwrite(boolean value) { + overwrite = value; + return this; + } + + /** + * Set the stripe size for the file. The writer stores the contents of the stripe in memory + * until this memory limit is reached and the stripe is flushed to the HDFS file and the + * next stripe started. + */ + public WriterOptions stripeSize(long value) { + stripeSizeValue = value; + return this; + } + + /** + * Set the file system block size for the file. For optimal performance, set the block size + * to be multiple factors of stripe size. + */ + public WriterOptions blockSize(long value) { + blockSizeValue = value; + return this; + } + + /** + * Set the distance between entries in the row index. The minimum value is 1000 to prevent + * the index from overwhelming the data. If the stride is set to 0, no indexes will be + * included in the file. + */ + public WriterOptions rowIndexStride(int value) { + rowIndexStrideValue = value; + return this; + } + + /** + * The size of the memory buffers used for compressing and storing the stripe in memory. + * NOTE: ORC writer may choose to use smaller buffer size based on stripe size and number of + * columns for efficient stripe writing and memory utilization. To enforce writer to use the + * requested buffer size use enforceBufferSize(). + */ + public WriterOptions bufferSize(int value) { + bufferSizeValue = value; + return this; + } + + /** + * Enforce writer to use requested buffer size instead of estimating buffer size based on + * stripe size and number of columns. 
See bufferSize() method for more info. Default: false + */ + public WriterOptions enforceBufferSize() { + enforceBufferSize = true; + return this; + } + + /** + * Sets whether the HDFS blocks are padded to prevent stripes from straddling blocks. + * Padding improves locality and thus the speed of reading, but costs space. + */ + public WriterOptions blockPadding(boolean value) { + blockPaddingValue = value; + return this; + } + + /** Sets the encoding strategy that is used to encode the data. */ + public WriterOptions encodingStrategy(EncodingStrategy strategy) { + encodingStrategy = strategy; + return this; + } + + /** Sets the tolerance for block padding as a percentage of stripe size. */ + public WriterOptions paddingTolerance(double value) { + paddingTolerance = value; + return this; + } + + /** Comma separated values of column names for which bloom filter is to be created. */ + public WriterOptions bloomFilterColumns(String columns) { + bloomFilterColumns = columns; + return this; + } + + /** + * Specify the false positive probability for bloom filter. + * + * @param fpp - false positive probability + * @return this + */ + public WriterOptions bloomFilterFpp(double fpp) { + bloomFilterFpp = fpp; + return this; + } + + /** Sets the generic compression that is used to compress the data. */ + public WriterOptions compress(CompressionKind value) { + compressValue = value; + return this; + } + + /** + * Set the schema for the file. This is a required parameter. + * + * @param schema the schema for the file. + * @return this + */ + public WriterOptions setSchema(TypeDescription schema) { + this.schema = schema; + return this; + } + + /** Sets the version of the file that will be written. */ + public WriterOptions version(Version value) { + versionValue = value; + return this; + } + + /** + * Add a listener for when the stripe and file are about to be closed. + * + * @param callback the object to be called when the stripe is closed + * @return this + */ + public WriterOptions callback(WriterCallback callback) { + this.callback = callback; + return this; + } + + /** Set the version of the bloom filters to write. */ + public WriterOptions bloomFilterVersion(BloomFilterVersion version) { + this.bloomFilterVersion = version; + return this; + } + + /** + * Change the physical writer of the ORC file. + * + *

SHOULD ONLY BE USED BY LLAP. + * + * @param writer the writer to control the layout and persistence + * @return this + */ + public WriterOptions physicalWriter(PhysicalWriter writer) { + this.physicalWriter = writer; + return this; + } + + /** A public option to set the memory manager. */ + public WriterOptions memory(MemoryManager value) { + memoryManagerValue = value; + return this; + } + + /** + * Should the ORC file writer use HDFS variable length blocks, if they are available? + * + * @param value the new value + * @return this + */ + public WriterOptions writeVariableLengthBlocks(boolean value) { + writeVariableLengthBlocks = value; + return this; + } + + /** + * Set the HadoopShims to use. This is only for testing. + * + * @param value the new value + * @return this + */ + public WriterOptions setShims(HadoopShims value) { + this.shims = value; + return this; + } + + /** + * Manually set the writer version. This is an internal API. + * + * @param version the version to write + * @return this + */ + protected WriterOptions writerVersion(WriterVersion version) { + if (version == WriterVersion.FUTURE) { + throw new IllegalArgumentException("Can't write a future version."); + } + this.writerVersion = version; + return this; + } + + /** + * Manually set the time zone for the writer to utc. If not defined, system time zone is + * assumed. + */ + public WriterOptions useUTCTimestamp(boolean value) { + useUTCTimestamp = value; + return this; + } + + /** + * Set the comma-separated list of columns that should be direct encoded. + * + * @param value the value to set + * @return this + */ + public WriterOptions directEncodingColumns(String value) { + directEncodingColumns = value; + return this; + } + + /** + * Encrypt a set of columns with a key. + * + *

Format of the string is a key-list (see the example after the list). + * + *
    + *
  • key-list = key (';' key-list)? + *
  • key = key-name ':' field-list + *
  • field-list = field-name ( ',' field-list )? + *
  • field-name = number | field-part ('.' field-name)? + *
  • field-part = quoted string | simple name + *
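+ * Example (key and column names are illustrative, not taken from this patch): the value + * "pii_key:ssn,address;audit_key:payload" encrypts the ssn and address columns with the + * pii_key key and the payload column with the audit_key key.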
+ * + * @param value a key-list of which columns to encrypt + * @return this + */ + public WriterOptions encrypt(String value) { + encryption = value; + return this; + } + + /** + * Set the masks for the unencrypted data. + * + *

Format of the string is a mask-list (see the example after the list). + * + *
    + *
  • mask-list = mask (';' mask-list)? + *
  • mask = mask-name (',' parameter)* ':' field-list + *
  • field-list = field-name ( ',' field-list )? + *
  • field-name = number | field-part ('.' field-name)? + *
  • field-part = quoted string | simple name + *
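+ * Example (mask and column names are illustrative, not taken from this patch): the value + * "nullify:ssn,address" applies the nullify mask to the ssn and address columns and leaves + * all other columns unmasked.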
+ * + * @param value a list of the masks and column names + * @return this + */ + public WriterOptions masks(String value) { + masks = value; + return this; + } + + /** + * For users that need to override the current version of a key, this method allows them to + * define the version and algorithm for a given key. + * + *

This will mostly be used for ORC file merging where the writer has to use the same + * version of the key that the original files used. + * + * @param keyName the key name + * @param version the version of the key to use + * @param algorithm the algorithm for the given key version + * @return this + */ + public WriterOptions setKeyVersion( + String keyName, int version, EncryptionAlgorithm algorithm) { + HadoopShims.KeyMetadata meta = new HadoopShims.KeyMetadata(keyName, version, algorithm); + keyOverrides.put(keyName, meta); + return this; + } + + /** + * Set the key provider for column encryption. + * + * @param provider the object that holds the master secrets + * @return this + */ + public WriterOptions setKeyProvider(KeyProvider provider) { + this.provider = provider; + return this; + } + + /** + * Should the writer use the proleptic Gregorian calendar for times and dates. + * + * @param newValue true if we should use the proleptic calendar + * @return this + */ + public WriterOptions setProlepticGregorian(boolean newValue) { + this.useProlepticGregorian = newValue; + return this; + } + + public KeyProvider getKeyProvider() { + return provider; + } + + public boolean getBlockPadding() { + return blockPaddingValue; + } + + public long getBlockSize() { + return blockSizeValue; + } + + public String getBloomFilterColumns() { + return bloomFilterColumns; + } + + public boolean getOverwrite() { + return overwrite; + } + + public FileSystem getFileSystem() { + return fileSystemValue; + } + + public Configuration getConfiguration() { + return configuration; + } + + public TypeDescription getSchema() { + return schema; + } + + public long getStripeSize() { + return stripeSizeValue; + } + + public long getStripeRowCountValue() { + return stripeRowCountValue; + } + + public CompressionKind getCompress() { + return compressValue; + } + + public WriterCallback getCallback() { + return callback; + } + + public Version getVersion() { + return versionValue; + } + + public MemoryManager getMemoryManager() { + return memoryManagerValue; + } + + public int getBufferSize() { + return bufferSizeValue; + } + + public boolean isEnforceBufferSize() { + return enforceBufferSize; + } + + public int getRowIndexStride() { + return rowIndexStrideValue; + } + + public boolean isBuildIndex() { + return buildIndex; + } + + public CompressionStrategy getCompressionStrategy() { + return compressionStrategy; + } + + public EncodingStrategy getEncodingStrategy() { + return encodingStrategy; + } + + public ZstdCompressOptions getZstdCompressOptions() { + return zstdCompressOptions; + } + + public double getPaddingTolerance() { + return paddingTolerance; + } + + public double getBloomFilterFpp() { + return bloomFilterFpp; + } + + public BloomFilterVersion getBloomFilterVersion() { + return bloomFilterVersion; + } + + public PhysicalWriter getPhysicalWriter() { + return physicalWriter; + } + + public WriterVersion getWriterVersion() { + return writerVersion; + } + + public boolean getWriteVariableLengthBlocks() { + return writeVariableLengthBlocks; + } + + public HadoopShims getHadoopShims() { + return shims; + } + + public boolean getUseUTCTimestamp() { + return useUTCTimestamp; + } + + public String getDirectEncodingColumns() { + return directEncodingColumns; + } + + public String getEncryption() { + return encryption; + } + + public String getMasks() { + return masks; + } + + public Map getKeyOverrides() { + return keyOverrides; + } + + public boolean getProlepticGregorian() { + return useProlepticGregorian; + } + } + 
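For reference, a minimal usage sketch of the WriterOptions above with the new ZSTD codec (an editor's illustration, not part of the patch; the schema, output path, and compression level are hypothetical):

    // Assumes the usual imports: org.apache.hadoop.conf.Configuration, org.apache.hadoop.fs.Path,
    // org.apache.orc.*; run inside a method that declares throws IOException.
    Configuration conf = new Configuration();
    // Select the ZSTD codec and a zstd-jni compression level through the OrcConf keys defined above.
    OrcConf.COMPRESS.setString(conf, CompressionKind.ZSTD.name());
    OrcConf.COMPRESSION_ZSTD_LEVEL.setInt(conf, 3);
    TypeDescription schema = TypeDescription.fromString("struct<id:int,name:string>");
    Writer writer =
            OrcFile.createWriter(
                    new Path("/tmp/zstd-example.orc"),
                    OrcFile.writerOptions(conf).setSchema(schema).overwrite(true));
    writer.close();

With orc.compress set to ZSTD, the stripes produced by WriterImpl are compressed through the ZstdCodec added in this patch.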
+ /** + * Create a set of writer options based on a configuration. + * + * @param conf the configuration to use for values + * @return A WriterOptions object that can be modified + */ + public static WriterOptions writerOptions(Configuration conf) { + return new WriterOptions(null, conf); + } + + /** + * Create a set of write options based on a set of table properties and configuration. + * + * @param tableProperties the properties of the table + * @param conf the configuration of the query + * @return a WriterOptions object that can be modified + */ + public static WriterOptions writerOptions(Properties tableProperties, Configuration conf) { + return new WriterOptions(tableProperties, conf); + } + + private static MemoryManager memoryManager = null; + + private static synchronized MemoryManager getStaticMemoryManager(Configuration conf) { + if (memoryManager == null) { + memoryManager = new MemoryManagerImpl(conf); + } + return memoryManager; + } + + /** + * Create an ORC file writer. This is the public interface for creating writers going forward + * and new options will only be added to this method. + * + * @param path filename to write to + * @param opts the options + * @return a new ORC file writer + * @throws IOException + */ + public static Writer createWriter(Path path, WriterOptions opts) throws IOException { + FileSystem fs = + opts.getFileSystem() == null + ? path.getFileSystem(opts.getConfiguration()) + : opts.getFileSystem(); + switch (opts.getVersion()) { + case V_0_11: + case V_0_12: + return new WriterImpl(fs, path, opts); + case UNSTABLE_PRE_2_0: + return new WriterImplV2(fs, path, opts); + default: + throw new IllegalArgumentException("Unknown version " + opts.getVersion()); + } + } + + /** + * Do we understand the version in the reader? + * + * @param path the path of the file + * @param reader the ORC file reader + * @return is the version understood by this writer? 
+ */ + static boolean understandFormat(Path path, Reader reader) { + if (reader.getFileVersion() == Version.FUTURE) { + LOG.info("Can't merge {} because it has a future version.", path); + return false; + } + if (reader.getWriterVersion() == WriterVersion.FUTURE) { + LOG.info("Can't merge {} because it has a future writerVersion.", path); + return false; + } + return true; + } + + private static boolean sameKeys(EncryptionKey[] first, EncryptionKey[] next) { + if (first.length != next.length) { + return false; + } + for (int k = 0; k < first.length; ++k) { + if (!first[k].getKeyName().equals(next[k].getKeyName()) + || first[k].getKeyVersion() != next[k].getKeyVersion() + || first[k].getAlgorithm() != next[k].getAlgorithm()) { + return false; + } + } + return true; + } + + private static boolean sameMasks(DataMaskDescription[] first, DataMaskDescription[] next) { + if (first.length != next.length) { + return false; + } + for (int k = 0; k < first.length; ++k) { + if (!first[k].getName().equals(next[k].getName())) { + return false; + } + String[] firstParam = first[k].getParameters(); + String[] nextParam = next[k].getParameters(); + if (firstParam.length != nextParam.length) { + return false; + } + for (int p = 0; p < firstParam.length; ++p) { + if (!firstParam[p].equals(nextParam[p])) { + return false; + } + } + TypeDescription[] firstRoots = first[k].getColumns(); + TypeDescription[] nextRoots = next[k].getColumns(); + if (firstRoots.length != nextRoots.length) { + return false; + } + for (int r = 0; r < firstRoots.length; ++r) { + if (firstRoots[r].getId() != nextRoots[r].getId()) { + return false; + } + } + } + return true; + } + + private static boolean sameVariants(EncryptionVariant[] first, EncryptionVariant[] next) { + if (first.length != next.length) { + return false; + } + for (int k = 0; k < first.length; ++k) { + if ((first[k].getKeyDescription() == null) != (next[k].getKeyDescription() == null) + || !first[k].getKeyDescription() + .getKeyName() + .equals(next[k].getKeyDescription().getKeyName()) + || first[k].getRoot().getId() != next[k].getRoot().getId()) { + return false; + } + } + return true; + } + + /** + * Is the new reader compatible with the file that is being written? + * + * @param firstReader the first reader that others must match + * @param userMetadata the user metadata + * @param path the new path name for warning messages + * @param reader the new reader + * @return is the reader compatible with the previous ones? 
+ */ + static boolean readerIsCompatible( + Reader firstReader, Map userMetadata, Path path, Reader reader) { + // now we have to check compatibility + TypeDescription schema = firstReader.getSchema(); + if (!reader.getSchema().equals(schema)) { + LOG.info( + "Can't merge {} because of different schemas {} vs {}", + path, + reader.getSchema(), + schema); + return false; + } + CompressionKind compression = firstReader.getCompressionKind(); + if (reader.getCompressionKind() != compression) { + LOG.info( + "Can't merge {} because of different compression {} vs {}", + path, + reader.getCompressionKind(), + compression); + return false; + } + Version fileVersion = firstReader.getFileVersion(); + if (reader.getFileVersion() != fileVersion) { + LOG.info( + "Can't merge {} because of different file versions {} vs {}", + path, + reader.getFileVersion(), + fileVersion); + return false; + } + WriterVersion writerVersion = firstReader.getWriterVersion(); + if (reader.getWriterVersion() != writerVersion) { + LOG.info( + "Can't merge {} because of different writer versions {} vs {}", + path, + reader.getFileVersion(), + fileVersion); + return false; + } + int rowIndexStride = firstReader.getRowIndexStride(); + if (reader.getRowIndexStride() != rowIndexStride) { + LOG.info( + "Can't merge {} because of different row index strides {} vs {}", + path, + reader.getRowIndexStride(), + rowIndexStride); + return false; + } + for (String key : reader.getMetadataKeys()) { + ByteBuffer currentValue = userMetadata.get(key); + if (currentValue != null) { + ByteBuffer newValue = reader.getMetadataValue(key); + if (!newValue.equals(currentValue)) { + LOG.info("Can't merge {} because of different user metadata {}", path, key); + return false; + } + } + } + if (!sameKeys(firstReader.getColumnEncryptionKeys(), reader.getColumnEncryptionKeys())) { + LOG.info("Can't merge {} because it has different encryption keys", path); + return false; + } + if (!sameMasks(firstReader.getDataMasks(), reader.getDataMasks())) { + LOG.info("Can't merge {} because it has different encryption masks", path); + return false; + } + if (!sameVariants(firstReader.getEncryptionVariants(), reader.getEncryptionVariants())) { + LOG.info("Can't merge {} because it has different encryption variants", path); + return false; + } + if (firstReader.writerUsedProlepticGregorian() != reader.writerUsedProlepticGregorian()) { + LOG.info("Can't merge {} because it uses a different calendar", path); + return false; + } + return true; + } + + static void mergeMetadata(Map metadata, Reader reader) { + for (String key : reader.getMetadataKeys()) { + metadata.put(key, reader.getMetadataValue(key)); + } + } + + /** + * Merges multiple ORC files that all have the same schema to produce a single ORC file. The + * merge will reject files that aren't compatible with the merged file so the output list may be + * shorter than the input list. The stripes are copied as serialized byte buffers. The user + * metadata are merged and files that disagree on the value associated with a key will be + * rejected. 
+ * + * @param outputPath the output file + * @param options the options for writing with although the options related to the input files' + * encodings are overridden + * @param inputFiles the list of files to merge + * @return the list of files that were successfully merged + * @throws IOException + */ + public static List mergeFiles( + Path outputPath, WriterOptions options, List inputFiles) throws IOException { + Writer output = null; + final Configuration conf = options.getConfiguration(); + KeyProvider keyProvider = options.getKeyProvider(); + try { + byte[] buffer = new byte[0]; + Reader firstFile = null; + List result = new ArrayList<>(inputFiles.size()); + Map userMetadata = new HashMap<>(); + int bufferSize = 0; + + for (Path input : inputFiles) { + FileSystem fs = input.getFileSystem(conf); + Reader reader = + createReader( + input, + readerOptions(options.getConfiguration()) + .filesystem(fs) + .setKeyProvider(keyProvider)); + + if (!understandFormat(input, reader)) { + continue; + } else if (firstFile == null) { + // if this is the first file that we are including, grab the values + firstFile = reader; + bufferSize = reader.getCompressionSize(); + CompressionKind compression = reader.getCompressionKind(); + options.bufferSize(bufferSize) + .version(reader.getFileVersion()) + .writerVersion(reader.getWriterVersion()) + .compress(compression) + .rowIndexStride(reader.getRowIndexStride()) + .setSchema(reader.getSchema()); + if (compression != CompressionKind.NONE) { + options.enforceBufferSize().bufferSize(bufferSize); + } + mergeMetadata(userMetadata, reader); + // ensure that the merged file uses the same key versions + for (EncryptionKey key : reader.getColumnEncryptionKeys()) { + options.setKeyVersion( + key.getKeyName(), key.getKeyVersion(), key.getAlgorithm()); + } + output = createWriter(outputPath, options); + } else if (!readerIsCompatible(firstFile, userMetadata, input, reader)) { + continue; + } else { + mergeMetadata(userMetadata, reader); + if (bufferSize < reader.getCompressionSize()) { + bufferSize = reader.getCompressionSize(); + ((WriterInternal) output).increaseCompressionSize(bufferSize); + } + } + EncryptionVariant[] variants = reader.getEncryptionVariants(); + List[] completeList = new List[variants.length + 1]; + for (int v = 0; v < variants.length; ++v) { + completeList[v] = reader.getVariantStripeStatistics(variants[v]); + } + completeList[completeList.length - 1] = reader.getVariantStripeStatistics(null); + StripeStatistics[] stripeStats = new StripeStatistics[completeList.length]; + try (FSDataInputStream inputStream = ((ReaderImpl) reader).takeFile()) { + result.add(input); + + for (StripeInformation stripe : reader.getStripes()) { + int length = (int) stripe.getLength(); + if (buffer.length < length) { + buffer = new byte[length]; + } + long offset = stripe.getOffset(); + inputStream.readFully(offset, buffer, 0, length); + int stripeId = (int) stripe.getStripeId(); + for (int v = 0; v < completeList.length; ++v) { + stripeStats[v] = completeList[v].get(stripeId); + } + output.appendStripe(buffer, 0, length, stripe, stripeStats); + } + } + } + if (output != null) { + for (Map.Entry entry : userMetadata.entrySet()) { + output.addUserMetadata(entry.getKey(), entry.getValue()); + } + output.close(); + } + return result; + } catch (Throwable t) { + if (output != null) { + try { + output.close(); + } catch (Throwable ignore) { + // PASS + } + try { + FileSystem fs = + options.getFileSystem() == null + ? 
outputPath.getFileSystem(conf) + : options.getFileSystem(); + fs.delete(outputPath, false); + } catch (Throwable ignore) { + // PASS + } + } + throw new IOException("Problem merging files into " + outputPath, t); + } + } +} diff --git a/paimon-format/src/main/java/org/apache/orc/impl/PhysicalFsWriter.java b/paimon-format/src/main/java/org/apache/orc/impl/PhysicalFsWriter.java new file mode 100644 index 000000000000..f887e860a830 --- /dev/null +++ b/paimon-format/src/main/java/org/apache/orc/impl/PhysicalFsWriter.java @@ -0,0 +1,793 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.impl; + +import com.google.protobuf.ByteString; +import com.google.protobuf.CodedOutputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.orc.CompressionCodec; +import org.apache.orc.EncryptionVariant; +import org.apache.orc.OrcFile; +import org.apache.orc.OrcProto; +import org.apache.orc.PhysicalWriter; +import org.apache.orc.TypeDescription; +import org.apache.orc.impl.writer.StreamOptions; +import org.apache.orc.impl.writer.WriterEncryptionKey; +import org.apache.orc.impl.writer.WriterEncryptionVariant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +/** A orc PhysicalFsWriter. 
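+ *
+ * <p>Compression settings arrive through {@link OrcFile.WriterOptions}; for ZSTD the level and
+ * window log are copied from {@code opts.getZstdCompressOptions()} in the constructor below. A
+ * rough sketch of a caller (schema and output path are illustrative):
+ *
+ * <pre>{@code
+ * OrcFile.WriterOptions opts =
+ *         OrcFile.writerOptions(new Configuration())
+ *                 .setSchema(TypeDescription.fromString("struct<id:int,name:string>"))
+ *                 .compress(CompressionKind.ZSTD);
+ * Writer writer = OrcFile.createWriter(new Path("/tmp/example-zstd.orc"), opts);
+ * }</pre>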
*/ +public class PhysicalFsWriter implements PhysicalWriter { + private static final Logger LOG = LoggerFactory.getLogger(PhysicalFsWriter.class); + + private static final int HDFS_BUFFER_SIZE = 256 * 1024; + + private FSDataOutputStream rawWriter; + private final DirectStream rawStream; + + // the compressed metadata information outStream + private OutStream compressStream; + // a protobuf outStream around streamFactory + private CodedOutputStream codedCompressStream; + + private Path path; + private final HadoopShims shims; + private final long blockSize; + private final int maxPadding; + private final StreamOptions compress; + private final OrcFile.CompressionStrategy compressionStrategy; + private final boolean addBlockPadding; + private final boolean writeVariableLengthBlocks; + private final VariantTracker unencrypted; + + private long headerLength; + private long stripeStart; + // The position of the last time we wrote a short block, which becomes the + // natural blocks + private long blockOffset; + private int metadataLength; + private int stripeStatisticsLength = 0; + private int footerLength; + private int stripeNumber = 0; + + private final Map variants = new TreeMap<>(); + + public PhysicalFsWriter(FileSystem fs, Path path, OrcFile.WriterOptions opts) + throws IOException { + this(fs, path, opts, new WriterEncryptionVariant[0]); + } + + public PhysicalFsWriter( + FileSystem fs, + Path path, + OrcFile.WriterOptions opts, + WriterEncryptionVariant[] encryption) + throws IOException { + this( + fs.create( + path, + opts.getOverwrite(), + HDFS_BUFFER_SIZE, + fs.getDefaultReplication(path), + opts.getBlockSize()), + opts, + encryption); + this.path = path; + LOG.info( + "ORC writer created for path: {} with stripeSize: {} blockSize: {}" + + " compression: {}", + path, + opts.getStripeSize(), + blockSize, + compress); + } + + public PhysicalFsWriter( + FSDataOutputStream outputStream, + OrcFile.WriterOptions opts, + WriterEncryptionVariant[] encryption) + throws IOException { + this.rawWriter = outputStream; + long defaultStripeSize = opts.getStripeSize(); + this.addBlockPadding = opts.getBlockPadding(); + if (opts.isEnforceBufferSize()) { + this.compress = new StreamOptions(opts.getBufferSize()); + } else { + this.compress = + new StreamOptions( + WriterImpl.getEstimatedBufferSize( + defaultStripeSize, + opts.getSchema().getMaximumId() + 1, + opts.getBufferSize())); + } + CompressionCodec codec = OrcCodecPool.getCodec(opts.getCompress()); + if (codec != null) { + CompressionCodec.Options tempOptions = codec.getDefaultOptions(); + if (codec instanceof ZstdCodec + && codec.getDefaultOptions() instanceof ZstdCodec.ZstdOptions) { + ZstdCodec.ZstdOptions options = (ZstdCodec.ZstdOptions) codec.getDefaultOptions(); + OrcFile.ZstdCompressOptions zstdCompressOptions = opts.getZstdCompressOptions(); + if (zstdCompressOptions != null) { + options.setLevel(zstdCompressOptions.getCompressionZstdLevel()); + options.setWindowLog(zstdCompressOptions.getCompressionZstdWindowLog()); + } + } + compress.withCodec(codec, tempOptions); + } + this.compressionStrategy = opts.getCompressionStrategy(); + this.maxPadding = (int) (opts.getPaddingTolerance() * defaultStripeSize); + this.blockSize = opts.getBlockSize(); + blockOffset = 0; + unencrypted = new VariantTracker(opts.getSchema(), compress); + writeVariableLengthBlocks = opts.getWriteVariableLengthBlocks(); + shims = opts.getHadoopShims(); + rawStream = new DirectStream(rawWriter); + compressStream = new OutStream("stripe footer", compress, 
rawStream); + codedCompressStream = CodedOutputStream.newInstance(compressStream); + for (WriterEncryptionVariant variant : encryption) { + WriterEncryptionKey key = variant.getKeyDescription(); + StreamOptions encryptOptions = + new StreamOptions(unencrypted.options) + .withEncryption(key.getAlgorithm(), variant.getFileFooterKey()); + variants.put(variant, new VariantTracker(variant.getRoot(), encryptOptions)); + } + } + + /** + * Record the information about each column encryption variant. The unencrypted data and each + * encrypted column root are variants. + */ + protected static class VariantTracker { + // the streams that make up the current stripe + protected final Map streams = new TreeMap<>(); + private final int rootColumn; + private final int lastColumn; + protected final StreamOptions options; + // a list for each column covered by this variant + // the elements in the list correspond to each stripe in the file + protected final List[] stripeStats; + protected final List stripeStatsStreams = new ArrayList<>(); + protected final OrcProto.ColumnStatistics[] fileStats; + + VariantTracker(TypeDescription schema, StreamOptions options) { + rootColumn = schema.getId(); + lastColumn = schema.getMaximumId(); + this.options = options; + stripeStats = new List[schema.getMaximumId() - schema.getId() + 1]; + for (int i = 0; i < stripeStats.length; ++i) { + stripeStats[i] = new ArrayList<>(); + } + fileStats = new OrcProto.ColumnStatistics[stripeStats.length]; + } + + public BufferedStream createStream(StreamName name) { + BufferedStream result = new BufferedStream(); + streams.put(name, result); + return result; + } + + /** + * Place the streams in the appropriate area while updating the sizes with the number of + * bytes in the area. + * + * @param area the area to write + * @param sizes the sizes of the areas + * @return the list of stream descriptions to add + */ + public List placeStreams(StreamName.Area area, SizeCounters sizes) { + List result = new ArrayList<>(streams.size()); + for (Map.Entry stream : streams.entrySet()) { + StreamName name = stream.getKey(); + BufferedStream bytes = stream.getValue(); + if (name.getArea() == area && !bytes.isSuppressed) { + OrcProto.Stream.Builder builder = OrcProto.Stream.newBuilder(); + long size = bytes.getOutputSize(); + if (area == StreamName.Area.INDEX) { + sizes.index += size; + } else { + sizes.data += size; + } + builder.setColumn(name.getColumn()).setKind(name.getKind()).setLength(size); + result.add(builder.build()); + } + } + return result; + } + + /** + * Write the streams in the appropriate area. + * + * @param area the area to write + * @param raw the raw stream to write to + */ + public void writeStreams(StreamName.Area area, FSDataOutputStream raw) throws IOException { + for (Map.Entry stream : streams.entrySet()) { + if (stream.getKey().getArea() == area) { + stream.getValue().spillToDiskAndClear(raw); + } + } + } + + /** + * Computed the size of the given column on disk for this stripe. It excludes the index + * streams. 
+ * + * @param column a column id + * @return the total number of bytes + */ + public long getFileBytes(int column) { + long result = 0; + if (column >= rootColumn && column <= lastColumn) { + for (Map.Entry entry : streams.entrySet()) { + StreamName name = entry.getKey(); + if (name.getColumn() == column && name.getArea() != StreamName.Area.INDEX) { + result += entry.getValue().getOutputSize(); + } + } + } + return result; + } + } + + VariantTracker getVariant(EncryptionVariant column) { + if (column == null) { + return unencrypted; + } + return variants.get(column); + } + + /** + * Get the number of bytes for a file in a given column by finding all the streams (not + * suppressed) for a given column and returning the sum of their sizes. excludes index + * + * @param column column from which to get file size + * @return number of bytes for the given column + */ + @Override + public long getFileBytes(int column, WriterEncryptionVariant variant) { + return getVariant(variant).getFileBytes(column); + } + + @Override + public StreamOptions getStreamOptions() { + return unencrypted.options; + } + + private static final byte[] ZEROS = new byte[64 * 1024]; + + private static void writeZeros(OutputStream output, long remaining) throws IOException { + while (remaining > 0) { + long size = Math.min(ZEROS.length, remaining); + output.write(ZEROS, 0, (int) size); + remaining -= size; + } + } + + /** + * Do any required shortening of the HDFS block or padding to avoid stradling HDFS blocks. This + * is called before writing the current stripe. + * + * @param stripeSize the number of bytes in the current stripe + */ + private void padStripe(long stripeSize) throws IOException { + this.stripeStart = rawWriter.getPos(); + long previousBytesInBlock = (stripeStart - blockOffset) % blockSize; + // We only have options if this isn't the first stripe in the block + if (previousBytesInBlock > 0) { + if (previousBytesInBlock + stripeSize >= blockSize) { + // Try making a short block + if (writeVariableLengthBlocks && shims.endVariableLengthBlock(rawWriter)) { + blockOffset = stripeStart; + } else if (addBlockPadding) { + // if we cross the block boundary, figure out what we should do + long padding = blockSize - previousBytesInBlock; + if (padding <= maxPadding) { + writeZeros(rawWriter, padding); + stripeStart += padding; + } + } + } + } + } + + /** An output receiver that writes the ByteBuffers to the output stream as they are received. */ + private static class DirectStream implements OutputReceiver { + private final FSDataOutputStream output; + + DirectStream(FSDataOutputStream output) { + this.output = output; + } + + @Override + public void output(ByteBuffer buffer) throws IOException { + output.write( + buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining()); + } + + @Override + public void suppress() { + throw new UnsupportedOperationException("Can't suppress direct stream"); + } + } + + private void writeStripeFooter( + OrcProto.StripeFooter footer, + SizeCounters sizes, + OrcProto.StripeInformation.Builder dirEntry) + throws IOException { + footer.writeTo(codedCompressStream); + codedCompressStream.flush(); + compressStream.flush(); + dirEntry.setOffset(stripeStart); + dirEntry.setFooterLength(rawWriter.getPos() - stripeStart - sizes.total()); + } + + /** + * Write the saved encrypted stripe statistic in a variant out to the file. The streams that are + * written are added to the tracker.stripeStatsStreams. 
+ * + * @param output the file we are writing to + * @param stripeNumber the number of stripes in the file + * @param tracker the variant to write out + */ + static void writeEncryptedStripeStatistics( + DirectStream output, int stripeNumber, VariantTracker tracker) throws IOException { + StreamOptions options = new StreamOptions(tracker.options); + tracker.stripeStatsStreams.clear(); + for (int col = tracker.rootColumn; + col < tracker.rootColumn + tracker.stripeStats.length; + ++col) { + options.modifyIv( + CryptoUtils.modifyIvForStream( + col, OrcProto.Stream.Kind.STRIPE_STATISTICS, stripeNumber + 1)); + OutStream stream = new OutStream("stripe stats for " + col, options, output); + OrcProto.ColumnarStripeStatistics stats = + OrcProto.ColumnarStripeStatistics.newBuilder() + .addAllColStats(tracker.stripeStats[col - tracker.rootColumn]) + .build(); + long start = output.output.getPos(); + stats.writeTo(stream); + stream.flush(); + OrcProto.Stream description = + OrcProto.Stream.newBuilder() + .setColumn(col) + .setKind(OrcProto.Stream.Kind.STRIPE_STATISTICS) + .setLength(output.output.getPos() - start) + .build(); + tracker.stripeStatsStreams.add(description); + } + } + + /** + * Merge the saved unencrypted stripe statistics into the Metadata section of the footer. + * + * @param builder the Metadata section of the file + * @param stripeCount the number of stripes in the file + * @param stats the stripe statistics + */ + static void setUnencryptedStripeStatistics( + OrcProto.Metadata.Builder builder, + int stripeCount, + List[] stats) { + // Make the unencrypted stripe stats into lists of StripeStatistics. + builder.clearStripeStats(); + for (int s = 0; s < stripeCount; ++s) { + OrcProto.StripeStatistics.Builder stripeStats = OrcProto.StripeStatistics.newBuilder(); + for (List col : stats) { + stripeStats.addColStats(col.get(s)); + } + builder.addStripeStats(stripeStats.build()); + } + } + + static void setEncryptionStatistics( + OrcProto.Encryption.Builder encryption, + int stripeNumber, + Collection variants) + throws IOException { + int v = 0; + for (VariantTracker variant : variants) { + OrcProto.EncryptionVariant.Builder variantBuilder = encryption.getVariantsBuilder(v++); + + // Add the stripe statistics streams to the variant description. + variantBuilder.clearStripeStatistics(); + variantBuilder.addAllStripeStatistics(variant.stripeStatsStreams); + + // Serialize and encrypt the file statistics. 
+ OrcProto.FileStatistics.Builder file = OrcProto.FileStatistics.newBuilder(); + for (OrcProto.ColumnStatistics col : variant.fileStats) { + file.addColumn(col); + } + StreamOptions options = new StreamOptions(variant.options); + options.modifyIv( + CryptoUtils.modifyIvForStream( + variant.rootColumn, + OrcProto.Stream.Kind.FILE_STATISTICS, + stripeNumber + 1)); + BufferedStream buffer = new BufferedStream(); + OutStream stream = new OutStream("stats for " + variant, options, buffer); + file.build().writeTo(stream); + stream.flush(); + variantBuilder.setFileStatistics(buffer.getBytes()); + } + } + + @Override + public void writeFileMetadata(OrcProto.Metadata.Builder builder) throws IOException { + long stripeStatisticsStart = rawWriter.getPos(); + for (VariantTracker variant : variants.values()) { + writeEncryptedStripeStatistics(rawStream, stripeNumber, variant); + } + setUnencryptedStripeStatistics(builder, stripeNumber, unencrypted.stripeStats); + long metadataStart = rawWriter.getPos(); + builder.build().writeTo(codedCompressStream); + codedCompressStream.flush(); + compressStream.flush(); + this.stripeStatisticsLength = (int) (metadataStart - stripeStatisticsStart); + this.metadataLength = (int) (rawWriter.getPos() - metadataStart); + } + + static void addUnencryptedStatistics( + OrcProto.Footer.Builder builder, OrcProto.ColumnStatistics[] stats) { + for (OrcProto.ColumnStatistics stat : stats) { + builder.addStatistics(stat); + } + } + + @Override + public void writeFileFooter(OrcProto.Footer.Builder builder) throws IOException { + if (variants.size() > 0) { + OrcProto.Encryption.Builder encryption = builder.getEncryptionBuilder(); + setEncryptionStatistics(encryption, stripeNumber, variants.values()); + } + addUnencryptedStatistics(builder, unencrypted.fileStats); + long bodyLength = rawWriter.getPos() - metadataLength - stripeStatisticsLength; + builder.setContentLength(bodyLength); + builder.setHeaderLength(headerLength); + long startPosn = rawWriter.getPos(); + OrcProto.Footer footer = builder.build(); + footer.writeTo(codedCompressStream); + codedCompressStream.flush(); + compressStream.flush(); + this.footerLength = (int) (rawWriter.getPos() - startPosn); + } + + @Override + public long writePostScript(OrcProto.PostScript.Builder builder) throws IOException { + builder.setFooterLength(footerLength); + builder.setMetadataLength(metadataLength); + if (variants.size() > 0) { + builder.setStripeStatisticsLength(stripeStatisticsLength); + } + OrcProto.PostScript ps = builder.build(); + // need to write this uncompressed + long startPosn = rawWriter.getPos(); + ps.writeTo(rawWriter); + long length = rawWriter.getPos() - startPosn; + if (length > 255) { + throw new IllegalArgumentException("PostScript too large at " + length); + } + rawWriter.writeByte((int) length); + return rawWriter.getPos(); + } + + @Override + public void close() throws IOException { + // We don't use the codec directly but do give it out codec in getCompressionCodec; + // that is used in tests, for boolean checks, and in StreamFactory. Some of the changes that + // would get rid of this pattern require cross-project interface changes, so just return the + // codec for now. 
+ CompressionCodec codec = compress.getCodec(); + if (codec != null) { + OrcCodecPool.returnCodec(codec.getKind(), codec); + } + compress.withCodec(null, null); + rawWriter.close(); + rawWriter = null; + } + + @Override + public void flush() throws IOException { + rawWriter.hflush(); + } + + @Override + public void appendRawStripe(ByteBuffer buffer, OrcProto.StripeInformation.Builder dirEntry) + throws IOException { + long start = rawWriter.getPos(); + int length = buffer.remaining(); + long availBlockSpace = blockSize - (start % blockSize); + + // see if stripe can fit in the current hdfs block, else pad the remaining + // space in the block + if (length < blockSize && length > availBlockSpace && addBlockPadding) { + byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, availBlockSpace)]; + LOG.info("Padding ORC by {} bytes while merging", availBlockSpace); + start += availBlockSpace; + while (availBlockSpace > 0) { + int writeLen = (int) Math.min(availBlockSpace, pad.length); + rawWriter.write(pad, 0, writeLen); + availBlockSpace -= writeLen; + } + } + rawWriter.write(buffer.array(), buffer.arrayOffset() + buffer.position(), length); + dirEntry.setOffset(start); + stripeNumber += 1; + } + + /** + * This class is used to hold the contents of streams as they are buffered. The TreeWriters + * write to the outStream and the codec compresses the data as buffers fill up and stores them + * in the output list. When the stripe is being written, the whole stream is written to the + * file. + */ + static final class BufferedStream implements OutputReceiver { + private boolean isSuppressed = false; + private final List output = new ArrayList<>(); + + @Override + public void output(ByteBuffer buffer) { + if (!isSuppressed) { + output.add(buffer); + } + } + + @Override + public void suppress() { + isSuppressed = true; + output.clear(); + } + + /** + * Write any saved buffers to the OutputStream if needed, and clears all the buffers. + * + * @return true if the stream was written + */ + boolean spillToDiskAndClear(FSDataOutputStream raw) throws IOException { + if (!isSuppressed) { + for (ByteBuffer buffer : output) { + raw.write( + buffer.array(), + buffer.arrayOffset() + buffer.position(), + buffer.remaining()); + } + output.clear(); + return true; + } + isSuppressed = false; + return false; + } + + /** + * Get the buffer as a protobuf ByteString and clears the BufferedStream. + * + * @return the bytes + */ + ByteString getBytes() { + int len = output.size(); + if (len == 0) { + return ByteString.EMPTY; + } else { + ByteString result = ByteString.copyFrom(output.get(0)); + for (int i = 1; i < output.size(); ++i) { + result = result.concat(ByteString.copyFrom(output.get(i))); + } + output.clear(); + return result; + } + } + + /** + * Get the stream as a ByteBuffer and clear it. + * + * @return a single ByteBuffer with the contents of the stream + */ + ByteBuffer getByteBuffer() { + ByteBuffer result; + if (output.size() == 1) { + result = output.get(0); + } else { + result = ByteBuffer.allocate((int) getOutputSize()); + for (ByteBuffer buffer : output) { + result.put(buffer); + } + output.clear(); + result.flip(); + } + return result; + } + + /** + * Get the number of bytes that will be written to the output. + * + *
Assumes the stream writing into this receiver has already been flushed. + * + * @return number of bytes + */ + public long getOutputSize() { + long result = 0; + for (ByteBuffer buffer : output) { + result += buffer.remaining(); + } + return result; + } + } + + static class SizeCounters { + long index = 0; + long data = 0; + + long total() { + return index + data; + } + } + + void buildStreamList(OrcProto.StripeFooter.Builder footerBuilder, SizeCounters sizes) + throws IOException { + footerBuilder.addAllStreams(unencrypted.placeStreams(StreamName.Area.INDEX, sizes)); + final long unencryptedIndexSize = sizes.index; + int v = 0; + for (VariantTracker variant : variants.values()) { + OrcProto.StripeEncryptionVariant.Builder builder = + footerBuilder.getEncryptionBuilder(v++); + builder.addAllStreams(variant.placeStreams(StreamName.Area.INDEX, sizes)); + } + if (sizes.index != unencryptedIndexSize) { + // add a placeholder that covers the hole where the encrypted indexes are + footerBuilder.addStreams( + OrcProto.Stream.newBuilder() + .setKind(OrcProto.Stream.Kind.ENCRYPTED_INDEX) + .setLength(sizes.index - unencryptedIndexSize)); + } + footerBuilder.addAllStreams(unencrypted.placeStreams(StreamName.Area.DATA, sizes)); + final long unencryptedDataSize = sizes.data; + v = 0; + for (VariantTracker variant : variants.values()) { + OrcProto.StripeEncryptionVariant.Builder builder = + footerBuilder.getEncryptionBuilder(v++); + builder.addAllStreams(variant.placeStreams(StreamName.Area.DATA, sizes)); + } + if (sizes.data != unencryptedDataSize) { + // add a placeholder that covers the hole where the encrypted indexes are + footerBuilder.addStreams( + OrcProto.Stream.newBuilder() + .setKind(OrcProto.Stream.Kind.ENCRYPTED_DATA) + .setLength(sizes.data - unencryptedDataSize)); + } + } + + @Override + public void finalizeStripe( + OrcProto.StripeFooter.Builder footerBuilder, + OrcProto.StripeInformation.Builder dirEntry) + throws IOException { + SizeCounters sizes = new SizeCounters(); + buildStreamList(footerBuilder, sizes); + + OrcProto.StripeFooter footer = footerBuilder.build(); + + // Do we need to pad the file so the stripe doesn't straddle a block boundary? + padStripe(sizes.total() + footer.getSerializedSize()); + + // write the unencrypted index streams + unencrypted.writeStreams(StreamName.Area.INDEX, rawWriter); + // write the encrypted index streams + for (VariantTracker variant : variants.values()) { + variant.writeStreams(StreamName.Area.INDEX, rawWriter); + } + + // write the unencrypted data streams + unencrypted.writeStreams(StreamName.Area.DATA, rawWriter); + // write out the unencrypted data streams + for (VariantTracker variant : variants.values()) { + variant.writeStreams(StreamName.Area.DATA, rawWriter); + } + + // Write out the footer. 
+ writeStripeFooter(footer, sizes, dirEntry); + + // fill in the data sizes + dirEntry.setDataLength(sizes.data); + dirEntry.setIndexLength(sizes.index); + + stripeNumber += 1; + } + + @Override + public void writeHeader() throws IOException { + rawWriter.writeBytes(OrcFile.MAGIC); + headerLength = rawWriter.getPos(); + } + + @Override + public BufferedStream createDataStream(StreamName name) { + VariantTracker variant = getVariant(name.getEncryption()); + BufferedStream result = variant.streams.get(name); + if (result == null) { + result = new BufferedStream(); + variant.streams.put(name, result); + } + return result; + } + + private StreamOptions getOptions(OrcProto.Stream.Kind kind) { + return SerializationUtils.getCustomizedCodec(compress, compressionStrategy, kind); + } + + protected OutputStream createIndexStream(StreamName name) { + BufferedStream buffer = createDataStream(name); + VariantTracker tracker = getVariant(name.getEncryption()); + StreamOptions options = + SerializationUtils.getCustomizedCodec( + tracker.options, compressionStrategy, name.getKind()); + if (options.isEncrypted()) { + if (options == tracker.options) { + options = new StreamOptions(options); + } + options.modifyIv(CryptoUtils.modifyIvForStream(name, stripeNumber + 1)); + } + return new OutStream(name.toString(), options, buffer); + } + + @Override + public void writeIndex(StreamName name, OrcProto.RowIndex.Builder index) throws IOException { + OutputStream stream = createIndexStream(name); + index.build().writeTo(stream); + stream.flush(); + } + + @Override + public void writeBloomFilter(StreamName name, OrcProto.BloomFilterIndex.Builder bloom) + throws IOException { + OutputStream stream = createIndexStream(name); + bloom.build().writeTo(stream); + stream.flush(); + } + + @Override + public void writeStatistics(StreamName name, OrcProto.ColumnStatistics.Builder statistics) { + VariantTracker tracker = getVariant(name.getEncryption()); + if (name.getKind() == OrcProto.Stream.Kind.FILE_STATISTICS) { + tracker.fileStats[name.getColumn() - tracker.rootColumn] = statistics.build(); + } else { + tracker.stripeStats[name.getColumn() - tracker.rootColumn].add(statistics.build()); + } + } + + @Override + public String toString() { + if (path != null) { + return path.toString(); + } else { + return ByteString.EMPTY.toString(); + } + } +} diff --git a/paimon-format/src/main/java/org/apache/orc/impl/WriterImpl.java b/paimon-format/src/main/java/org/apache/orc/impl/WriterImpl.java new file mode 100644 index 000000000000..bbbc8f03172d --- /dev/null +++ b/paimon-format/src/main/java/org/apache/orc/impl/WriterImpl.java @@ -0,0 +1,1063 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.orc.impl; + +import com.github.luben.zstd.util.Native; +import com.google.protobuf.ByteString; +import io.airlift.compress.lz4.Lz4Compressor; +import io.airlift.compress.lz4.Lz4Decompressor; +import io.airlift.compress.lzo.LzoCompressor; +import io.airlift.compress.lzo.LzoDecompressor; +import io.airlift.compress.zstd.ZstdCompressor; +import io.airlift.compress.zstd.ZstdDecompressor; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.ColumnStatistics; +import org.apache.orc.CompressionCodec; +import org.apache.orc.CompressionKind; +import org.apache.orc.DataMask; +import org.apache.orc.MemoryManager; +import org.apache.orc.OrcConf; +import org.apache.orc.OrcFile; +import org.apache.orc.OrcProto; +import org.apache.orc.OrcUtils; +import org.apache.orc.PhysicalWriter; +import org.apache.orc.StripeInformation; +import org.apache.orc.StripeStatistics; +import org.apache.orc.TypeDescription; +import org.apache.orc.impl.writer.StreamOptions; +import org.apache.orc.impl.writer.TreeWriter; +import org.apache.orc.impl.writer.WriterContext; +import org.apache.orc.impl.writer.WriterEncryptionKey; +import org.apache.orc.impl.writer.WriterEncryptionVariant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.security.SecureRandom; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.TimeZone; +import java.util.TreeMap; + +/** + * An ORC file writer. The file is divided into stripes, which is the natural unit of work when + * reading. Each stripe is buffered in memory until the memory reaches the stripe size and then it + * is written out broken down by columns. Each column is written by a TreeWriter that is specific to + * that type of column. TreeWriters may have children TreeWriters that handle the sub-types. Each of + * the TreeWriters writes the column's data as a set of streams. + * + *
+ * <p>This class is unsynchronized like most Stream objects, so the creation of an OrcFile and
+ * all access to a single instance have to be from a single thread.
+ *
+ * <p>There are no known cases where these happen between different threads today.
+ *
+ * <p>
Caveat: the MemoryManager is created during WriterOptions create, that has to be confined to a + * single thread as well. + */ +public class WriterImpl implements WriterInternal, MemoryManager.Callback { + + private static final Logger LOG = LoggerFactory.getLogger(WriterImpl.class); + + private static final int MIN_ROW_INDEX_STRIDE = 1000; + + private final Path path; + private final long stripeSize; + private final long stripeRowCount; + private final int rowIndexStride; + private final TypeDescription schema; + private final PhysicalWriter physicalWriter; + private final OrcFile.WriterVersion writerVersion; + private final StreamOptions unencryptedOptions; + + private long rowCount = 0; + private long rowsInStripe = 0; + private long rawDataSize = 0; + private int rowsInIndex = 0; + private long lastFlushOffset = 0; + private int stripesAtLastFlush = -1; + private final List stripes = new ArrayList<>(); + private final Map userMetadata = new TreeMap<>(); + private final TreeWriter treeWriter; + private final boolean buildIndex; + private final MemoryManager memoryManager; + private long previousAllocation = -1; + private long memoryLimit; + private final long rowsPerCheck; + private long rowsSinceCheck = 0; + private final OrcFile.Version version; + private final Configuration conf; + private final OrcFile.WriterCallback callback; + private final OrcFile.WriterContext callbackContext; + private final OrcFile.EncodingStrategy encodingStrategy; + private final OrcFile.CompressionStrategy compressionStrategy; + private final boolean[] bloomFilterColumns; + private final double bloomFilterFpp; + private final OrcFile.BloomFilterVersion bloomFilterVersion; + private final boolean writeTimeZone; + private final boolean useUTCTimeZone; + private final double dictionaryKeySizeThreshold; + private final boolean[] directEncodingColumns; + private final List unencryptedEncodings = new ArrayList<>(); + + // the list of maskDescriptions, keys, and variants + private SortedMap maskDescriptions = new TreeMap<>(); + private SortedMap keys = new TreeMap<>(); + private final WriterEncryptionVariant[] encryption; + // the mapping of columns to maskDescriptions + private final MaskDescriptionImpl[] columnMaskDescriptions; + // the mapping of columns to EncryptionVariants + private final WriterEncryptionVariant[] columnEncryption; + private KeyProvider keyProvider; + // do we need to include the current encryption keys in the next stripe + // information + private boolean needKeyFlush; + private final boolean useProlepticGregorian; + private boolean isClose = false; + + public WriterImpl(FileSystem fs, Path path, OrcFile.WriterOptions opts) throws IOException { + this.path = path; + this.conf = opts.getConfiguration(); + // clone it so that we can annotate it with encryption + this.schema = opts.getSchema().clone(); + int numColumns = schema.getMaximumId() + 1; + if (!opts.isEnforceBufferSize()) { + opts.bufferSize( + getEstimatedBufferSize(opts.getStripeSize(), numColumns, opts.getBufferSize())); + } + + // Annotate the schema with the column encryption + schema.annotateEncryption(opts.getEncryption(), opts.getMasks()); + columnEncryption = new WriterEncryptionVariant[numColumns]; + columnMaskDescriptions = new MaskDescriptionImpl[numColumns]; + encryption = setupEncryption(opts.getKeyProvider(), schema, opts.getKeyOverrides()); + needKeyFlush = encryption.length > 0; + + this.directEncodingColumns = + OrcUtils.includeColumns(opts.getDirectEncodingColumns(), opts.getSchema()); + dictionaryKeySizeThreshold 
= OrcConf.DICTIONARY_KEY_SIZE_THRESHOLD.getDouble(conf); + + this.callback = opts.getCallback(); + if (callback != null) { + callbackContext = () -> WriterImpl.this; + } else { + callbackContext = null; + } + + this.useProlepticGregorian = opts.getProlepticGregorian(); + this.writeTimeZone = hasTimestamp(schema); + this.useUTCTimeZone = opts.getUseUTCTimestamp(); + + this.encodingStrategy = opts.getEncodingStrategy(); + this.compressionStrategy = opts.getCompressionStrategy(); + + if (opts.getRowIndexStride() >= 0) { + this.rowIndexStride = opts.getRowIndexStride(); + } else { + this.rowIndexStride = 0; + } + + // ORC-1343: We ignore `opts.isBuildIndex` due to the lack of reader support + this.buildIndex = rowIndexStride > 0; + if (buildIndex && rowIndexStride < MIN_ROW_INDEX_STRIDE) { + throw new IllegalArgumentException( + "Row stride must be at least " + MIN_ROW_INDEX_STRIDE); + } + + this.writerVersion = opts.getWriterVersion(); + this.version = opts.getVersion(); + if (version == OrcFile.Version.FUTURE) { + throw new IllegalArgumentException("Can not write in a unknown version."); + } else if (version == OrcFile.Version.UNSTABLE_PRE_2_0) { + LOG.warn( + "ORC files written in " + + version.getName() + + " will not be" + + " readable by other versions of the software. It is only for" + + " developer testing."); + } + + this.bloomFilterVersion = opts.getBloomFilterVersion(); + this.bloomFilterFpp = opts.getBloomFilterFpp(); + /* do not write bloom filters for ORC v11 */ + if (!buildIndex || version == OrcFile.Version.V_0_11) { + this.bloomFilterColumns = new boolean[schema.getMaximumId() + 1]; + } else { + this.bloomFilterColumns = OrcUtils.includeColumns(opts.getBloomFilterColumns(), schema); + } + + // ensure that we are able to handle callbacks before we register ourselves + rowsPerCheck = + Math.min(opts.getStripeRowCountValue(), OrcConf.ROWS_BETWEEN_CHECKS.getLong(conf)); + this.stripeRowCount = opts.getStripeRowCountValue(); + this.stripeSize = opts.getStripeSize(); + memoryLimit = stripeSize; + memoryManager = opts.getMemoryManager(); + memoryManager.addWriter(path, stripeSize, this); + + // Set up the physical writer + this.physicalWriter = + opts.getPhysicalWriter() == null + ? new PhysicalFsWriter(fs, path, opts, encryption) + : opts.getPhysicalWriter(); + physicalWriter.writeHeader(); + unencryptedOptions = physicalWriter.getStreamOptions(); + OutStream.assertBufferSizeValid(unencryptedOptions.getBufferSize()); + + treeWriter = TreeWriter.Factory.create(schema, null, new StreamFactory()); + + LOG.info( + "ORC writer created for path: {} with stripeSize: {} options: {}", + path, + stripeSize, + unencryptedOptions); + } + + // @VisibleForTesting + public static int getEstimatedBufferSize(long stripeSize, int numColumns, int bs) { + // The worst case is that there are 2 big streams per a column and + // we want to guarantee that each stream gets ~10 buffers. + // This keeps buffers small enough that we don't get really small stripe + // sizes. + int estBufferSize = (int) (stripeSize / (20L * numColumns)); + estBufferSize = getClosestBufferSize(estBufferSize); + return Math.min(estBufferSize, bs); + } + + @Override + public void increaseCompressionSize(int newSize) { + if (newSize > unencryptedOptions.getBufferSize()) { + unencryptedOptions.bufferSize(newSize); + } + } + + /** + * Given a buffer size, return the nearest superior power of 2. Min value is 4Kib, Max value is + * 256Kib. 
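+     *
+     * <p>For example, these values follow directly from the implementation below:
+     *
+     * <pre>{@code
+     * getClosestBufferSize(1_000);    // returns   4 * 1024, clamped to the 4 KiB minimum
+     * getClosestBufferSize(65_536);   // returns  64 * 1024, already a power of two
+     * getClosestBufferSize(100_000);  // returns 128 * 1024, rounded up to the next power of two
+     * getClosestBufferSize(500_000);  // returns 256 * 1024, clamped to the 256 KiB maximum
+     * }</pre>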
+ * + * @param size Proposed buffer size + * @return the suggested buffer size + */ + private static int getClosestBufferSize(int size) { + final int kb4 = 4 * 1024; + final int kb256 = 256 * 1024; + final int pow2 = size == 1 ? 1 : Integer.highestOneBit(size - 1) * 2; + return Math.min(kb256, Math.max(kb4, pow2)); + } + + static { + try { + if (!"java".equalsIgnoreCase(System.getProperty("orc.compression.zstd.impl"))) { + Native.load(); + } + } catch (UnsatisfiedLinkError | ExceptionInInitializerError e) { + LOG.warn( + "Unable to load zstd-jni library for your platform. " + + "Using builtin-java classes where applicable"); + } + } + + public static CompressionCodec createCodec(CompressionKind kind) { + switch (kind) { + case NONE: + return null; + case ZLIB: + return new ZlibCodec(); + case SNAPPY: + return new SnappyCodec(); + case LZO: + return new AircompressorCodec(kind, new LzoCompressor(), new LzoDecompressor()); + case LZ4: + return new AircompressorCodec(kind, new Lz4Compressor(), new Lz4Decompressor()); + case ZSTD: + if ("java".equalsIgnoreCase(System.getProperty("orc.compression.zstd.impl"))) { + return new AircompressorCodec( + kind, new ZstdCompressor(), new ZstdDecompressor()); + } + if (Native.isLoaded()) { + return new ZstdCodec(); + } else { + return new AircompressorCodec( + kind, new ZstdCompressor(), new ZstdDecompressor()); + } + default: + throw new IllegalArgumentException("Unknown compression codec: " + kind); + } + } + + @Override + public boolean checkMemory(double newScale) throws IOException { + memoryLimit = Math.round(stripeSize * newScale); + return checkMemory(); + } + + private boolean checkMemory() throws IOException { + if (rowsSinceCheck >= rowsPerCheck) { + rowsSinceCheck = 0; + long size = treeWriter.estimateMemory(); + if (LOG.isDebugEnabled()) { + LOG.debug( + "ORC writer " + + physicalWriter + + " size = " + + size + + " memoryLimit = " + + memoryLimit + + " rowsInStripe = " + + rowsInStripe + + " stripeRowCountLimit = " + + stripeRowCount); + } + if (size > memoryLimit || rowsInStripe >= stripeRowCount) { + flushStripe(); + return true; + } + } + return false; + } + + /** + * Interface from the Writer to the TreeWriters. This limits the visibility that the TreeWriters + * have into the Writer. + */ + private class StreamFactory implements WriterContext { + + /** + * Create a stream to store part of a column. + * + * @param name the name for the stream + * @return The output outStream that the section needs to be written to. + */ + @Override + public OutStream createStream(StreamName name) throws IOException { + StreamOptions options = + SerializationUtils.getCustomizedCodec( + unencryptedOptions, compressionStrategy, name.getKind()); + WriterEncryptionVariant encryption = (WriterEncryptionVariant) name.getEncryption(); + if (encryption != null) { + if (options == unencryptedOptions) { + options = new StreamOptions(options); + } + options.withEncryption( + encryption.getKeyDescription().getAlgorithm(), + encryption.getFileFooterKey()) + .modifyIv(CryptoUtils.modifyIvForStream(name, 1)); + } + return new OutStream(name, options, physicalWriter.createDataStream(name)); + } + + /** Get the stride rate of the row index. */ + @Override + public int getRowIndexStride() { + return rowIndexStride; + } + + /** + * Should be building the row index. + * + * @return true if we are building the index + */ + @Override + public boolean buildIndex() { + return buildIndex; + } + + /** + * Is the ORC file compressed? 
+ * + * @return are the streams compressed + */ + @Override + public boolean isCompressed() { + return unencryptedOptions.getCodec() != null; + } + + /** + * Get the encoding strategy to use. + * + * @return encoding strategy + */ + @Override + public OrcFile.EncodingStrategy getEncodingStrategy() { + return encodingStrategy; + } + + /** + * Get the bloom filter columns. + * + * @return bloom filter columns + */ + @Override + public boolean[] getBloomFilterColumns() { + return bloomFilterColumns; + } + + /** + * Get bloom filter false positive percentage. + * + * @return fpp + */ + @Override + public double getBloomFilterFPP() { + return bloomFilterFpp; + } + + /** + * Get the writer's configuration. + * + * @return configuration + */ + @Override + public Configuration getConfiguration() { + return conf; + } + + /** Get the version of the file to write. */ + @Override + public OrcFile.Version getVersion() { + return version; + } + + /** + * Get the PhysicalWriter. + * + * @return the file's physical writer. + */ + @Override + public PhysicalWriter getPhysicalWriter() { + return physicalWriter; + } + + @Override + public OrcFile.BloomFilterVersion getBloomFilterVersion() { + return bloomFilterVersion; + } + + @Override + public void writeIndex(StreamName name, OrcProto.RowIndex.Builder index) + throws IOException { + physicalWriter.writeIndex(name, index); + } + + @Override + public void writeBloomFilter(StreamName name, OrcProto.BloomFilterIndex.Builder bloom) + throws IOException { + physicalWriter.writeBloomFilter(name, bloom); + } + + @Override + public WriterEncryptionVariant getEncryption(int columnId) { + return columnId < columnEncryption.length ? columnEncryption[columnId] : null; + } + + @Override + public DataMask getUnencryptedMask(int columnId) { + if (columnMaskDescriptions != null) { + MaskDescriptionImpl descr = columnMaskDescriptions[columnId]; + if (descr != null) { + return DataMask.Factory.build( + descr, + schema.findSubtype(columnId), + (type) -> columnMaskDescriptions[type.getId()]); + } + } + return null; + } + + @Override + public void setEncoding( + int column, WriterEncryptionVariant encryption, OrcProto.ColumnEncoding encoding) { + if (encryption == null) { + unencryptedEncodings.add(encoding); + } else { + encryption.addEncoding(encoding); + } + } + + @Override + public void writeStatistics(StreamName name, OrcProto.ColumnStatistics.Builder stats) + throws IOException { + physicalWriter.writeStatistics(name, stats); + } + + @Override + public boolean getUseUTCTimestamp() { + return useUTCTimeZone; + } + + @Override + public double getDictionaryKeySizeThreshold(int columnId) { + return directEncodingColumns[columnId] ? 0.0 : dictionaryKeySizeThreshold; + } + + @Override + public boolean getProlepticGregorian() { + return useProlepticGregorian; + } + } + + private static void writeTypes(OrcProto.Footer.Builder builder, TypeDescription schema) { + builder.addAllTypes(OrcUtils.getOrcTypes(schema)); + } + + private void createRowIndexEntry() throws IOException { + treeWriter.createRowIndexEntry(); + rowsInIndex = 0; + } + + /** + * Write the encrypted keys into the StripeInformation along with the stripe id, so that the + * readers can decrypt the data. 
+ * + * @param dirEntry the entry to modify + */ + private void addEncryptedKeys(OrcProto.StripeInformation.Builder dirEntry) { + for (WriterEncryptionVariant variant : encryption) { + dirEntry.addEncryptedLocalKeys( + ByteString.copyFrom(variant.getMaterial().getEncryptedKey())); + } + dirEntry.setEncryptStripeId(1 + stripes.size()); + } + + private void flushStripe() throws IOException { + if (buildIndex && rowsInIndex != 0) { + createRowIndexEntry(); + } + if (rowsInStripe != 0) { + if (callback != null) { + callback.preStripeWrite(callbackContext); + } + // finalize the data for the stripe + int requiredIndexEntries = + rowIndexStride == 0 + ? 0 + : (int) ((rowsInStripe + rowIndexStride - 1) / rowIndexStride); + OrcProto.StripeFooter.Builder builder = OrcProto.StripeFooter.newBuilder(); + if (writeTimeZone) { + if (useUTCTimeZone) { + builder.setWriterTimezone("UTC"); + } else { + builder.setWriterTimezone(TimeZone.getDefault().getID()); + } + } + treeWriter.flushStreams(); + treeWriter.writeStripe(requiredIndexEntries); + // update the encodings + builder.addAllColumns(unencryptedEncodings); + unencryptedEncodings.clear(); + for (WriterEncryptionVariant writerEncryptionVariant : encryption) { + OrcProto.StripeEncryptionVariant.Builder encrypt = + OrcProto.StripeEncryptionVariant.newBuilder(); + encrypt.addAllEncoding(writerEncryptionVariant.getEncodings()); + writerEncryptionVariant.clearEncodings(); + builder.addEncryption(encrypt); + } + OrcProto.StripeInformation.Builder dirEntry = + OrcProto.StripeInformation.newBuilder().setNumberOfRows(rowsInStripe); + if (encryption.length > 0 && needKeyFlush) { + addEncryptedKeys(dirEntry); + needKeyFlush = false; + } + physicalWriter.finalizeStripe(builder, dirEntry); + + stripes.add(dirEntry.build()); + rowCount += rowsInStripe; + rowsInStripe = 0; + } + } + + private long computeRawDataSize() { + return treeWriter.getRawDataSize(); + } + + private OrcProto.CompressionKind writeCompressionKind(CompressionKind kind) { + switch (kind) { + case NONE: + return OrcProto.CompressionKind.NONE; + case ZLIB: + return OrcProto.CompressionKind.ZLIB; + case SNAPPY: + return OrcProto.CompressionKind.SNAPPY; + case LZO: + return OrcProto.CompressionKind.LZO; + case LZ4: + return OrcProto.CompressionKind.LZ4; + case ZSTD: + return OrcProto.CompressionKind.ZSTD; + default: + throw new IllegalArgumentException("Unknown compression " + kind); + } + } + + private void writeMetadata() throws IOException { + // The physical writer now has the stripe statistics, so we pass a + // new builder in here. 
+ physicalWriter.writeFileMetadata(OrcProto.Metadata.newBuilder()); + } + + private long writePostScript() throws IOException { + OrcProto.PostScript.Builder builder = + OrcProto.PostScript.newBuilder() + .setMagic(OrcFile.MAGIC) + .addVersion(version.getMajor()) + .addVersion(version.getMinor()) + .setWriterVersion(writerVersion.getId()); + CompressionCodec codec = unencryptedOptions.getCodec(); + if (codec == null) { + builder.setCompression(OrcProto.CompressionKind.NONE); + } else { + builder.setCompression(writeCompressionKind(codec.getKind())) + .setCompressionBlockSize(unencryptedOptions.getBufferSize()); + } + return physicalWriter.writePostScript(builder); + } + + private OrcProto.EncryptionKey.Builder writeEncryptionKey(WriterEncryptionKey key) { + OrcProto.EncryptionKey.Builder result = OrcProto.EncryptionKey.newBuilder(); + HadoopShims.KeyMetadata meta = key.getMetadata(); + result.setKeyName(meta.getKeyName()); + result.setKeyVersion(meta.getVersion()); + result.setAlgorithm( + OrcProto.EncryptionAlgorithm.valueOf(meta.getAlgorithm().getSerialization())); + return result; + } + + private OrcProto.EncryptionVariant.Builder writeEncryptionVariant( + WriterEncryptionVariant variant) { + OrcProto.EncryptionVariant.Builder result = OrcProto.EncryptionVariant.newBuilder(); + result.setRoot(variant.getRoot().getId()); + result.setKey(variant.getKeyDescription().getId()); + result.setEncryptedKey(ByteString.copyFrom(variant.getMaterial().getEncryptedKey())); + return result; + } + + private OrcProto.Encryption.Builder writeEncryptionFooter() { + OrcProto.Encryption.Builder encrypt = OrcProto.Encryption.newBuilder(); + for (MaskDescriptionImpl mask : maskDescriptions.values()) { + OrcProto.DataMask.Builder maskBuilder = OrcProto.DataMask.newBuilder(); + maskBuilder.setName(mask.getName()); + for (String param : mask.getParameters()) { + maskBuilder.addMaskParameters(param); + } + for (TypeDescription column : mask.getColumns()) { + maskBuilder.addColumns(column.getId()); + } + encrypt.addMask(maskBuilder); + } + for (WriterEncryptionKey key : keys.values()) { + encrypt.addKey(writeEncryptionKey(key)); + } + for (WriterEncryptionVariant variant : encryption) { + encrypt.addVariants(writeEncryptionVariant(variant)); + } + encrypt.setKeyProvider(OrcProto.KeyProviderKind.valueOf(keyProvider.getKind().getValue())); + return encrypt; + } + + private long writeFooter() throws IOException { + writeMetadata(); + OrcProto.Footer.Builder builder = OrcProto.Footer.newBuilder(); + builder.setNumberOfRows(rowCount); + builder.setRowIndexStride(rowIndexStride); + rawDataSize = computeRawDataSize(); + // serialize the types + writeTypes(builder, schema); + builder.setCalendar( + useProlepticGregorian + ? 
OrcProto.CalendarKind.PROLEPTIC_GREGORIAN + : OrcProto.CalendarKind.JULIAN_GREGORIAN); + // add the stripe information + for (OrcProto.StripeInformation stripe : stripes) { + builder.addStripes(stripe); + } + // add the column statistics + treeWriter.writeFileStatistics(); + // add all of the user metadata + for (Map.Entry entry : userMetadata.entrySet()) { + builder.addMetadata( + OrcProto.UserMetadataItem.newBuilder() + .setName(entry.getKey()) + .setValue(entry.getValue())); + } + if (encryption.length > 0) { + builder.setEncryption(writeEncryptionFooter()); + } + builder.setWriter(OrcFile.WriterImplementation.ORC_JAVA.getId()); + builder.setSoftwareVersion(OrcUtils.getOrcVersion()); + physicalWriter.writeFileFooter(builder); + return writePostScript(); + } + + @Override + public TypeDescription getSchema() { + return schema; + } + + @Override + public void addUserMetadata(String name, ByteBuffer value) { + userMetadata.put(name, ByteString.copyFrom(value)); + } + + @Override + public void addRowBatch(VectorizedRowBatch batch) throws IOException { + try { + // If this is the first set of rows in this stripe, tell the tree writers + // to prepare the stripe. + if (batch.size != 0 && rowsInStripe == 0) { + treeWriter.prepareStripe(stripes.size() + 1); + } + if (buildIndex) { + // Batch the writes up to the rowIndexStride so that we can get the + // right size indexes. + int posn = 0; + while (posn < batch.size) { + int chunkSize = Math.min(batch.size - posn, rowIndexStride - rowsInIndex); + if (batch.isSelectedInUse()) { + // find the longest chunk that is continuously selected from posn + for (int len = 1; len < chunkSize; ++len) { + if (batch.selected[posn + len] - batch.selected[posn] != len) { + chunkSize = len; + break; + } + } + treeWriter.writeRootBatch(batch, batch.selected[posn], chunkSize); + } else { + treeWriter.writeRootBatch(batch, posn, chunkSize); + } + posn += chunkSize; + rowsInIndex += chunkSize; + rowsInStripe += chunkSize; + if (rowsInIndex >= rowIndexStride) { + createRowIndexEntry(); + } + } + } else { + if (batch.isSelectedInUse()) { + int posn = 0; + while (posn < batch.size) { + int chunkSize = 1; + while (posn + chunkSize < batch.size) { + // find the longest chunk that is continuously selected from posn + if (batch.selected[posn + chunkSize] - batch.selected[posn] + != chunkSize) { + break; + } + ++chunkSize; + } + treeWriter.writeRootBatch(batch, batch.selected[posn], chunkSize); + posn += chunkSize; + } + } else { + treeWriter.writeRootBatch(batch, 0, batch.size); + } + rowsInStripe += batch.size; + } + rowsSinceCheck += batch.size; + previousAllocation = memoryManager.checkMemory(previousAllocation, this); + checkMemory(); + } catch (Throwable t) { + try { + close(); + } catch (Throwable ignore) { + // ignore + } + if (t instanceof IOException) { + throw (IOException) t; + } else { + throw new IOException("Problem adding row to " + path, t); + } + } + } + + @Override + public void close() throws IOException { + if (!isClose) { + try { + if (callback != null) { + callback.preFooterWrite(callbackContext); + } + // remove us from the memory manager so that we don't get any callbacks + memoryManager.removeWriter(path); + // actually close the file + flushStripe(); + lastFlushOffset = writeFooter(); + physicalWriter.close(); + } finally { + isClose = true; + } + } + } + + /** + * Raw data size will be compute when writing the file footer. Hence raw data size value will be + * available only after closing the writer. 
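+     *
+     * <p>A minimal sketch of the expected call order; {@code path}, {@code conf}, {@code schema}
+     * and {@code batch} are illustrative:
+     *
+     * <pre>{@code
+     * Writer writer = OrcFile.createWriter(path, OrcFile.writerOptions(conf).setSchema(schema));
+     * writer.addRowBatch(batch);              // rows are buffered and flushed as stripes
+     * writer.close();                         // the footer (and raw data size) is written here
+     * long rawSize = writer.getRawDataSize(); // only meaningful after close()
+     * }</pre>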
+ */ + @Override + public long getRawDataSize() { + return rawDataSize; + } + + /** + * Row count gets updated when flushing the stripes. To get accurate row count call this method + * after writer is closed. + */ + @Override + public long getNumberOfRows() { + return rowCount; + } + + @Override + public long writeIntermediateFooter() throws IOException { + // flush any buffered rows + flushStripe(); + // write a footer + if (stripesAtLastFlush != stripes.size()) { + if (callback != null) { + callback.preFooterWrite(callbackContext); + } + lastFlushOffset = writeFooter(); + stripesAtLastFlush = stripes.size(); + physicalWriter.flush(); + } + return lastFlushOffset; + } + + private static void checkArgument(boolean expression, String message) { + if (!expression) { + throw new IllegalArgumentException(message); + } + } + + @Override + public void appendStripe( + byte[] stripe, + int offset, + int length, + StripeInformation stripeInfo, + OrcProto.StripeStatistics stripeStatistics) + throws IOException { + appendStripe( + stripe, + offset, + length, + stripeInfo, + new StripeStatistics[] { + new StripeStatisticsImpl( + schema, stripeStatistics.getColStatsList(), false, false) + }); + } + + @Override + public void appendStripe( + byte[] stripe, + int offset, + int length, + StripeInformation stripeInfo, + StripeStatistics[] stripeStatistics) + throws IOException { + checkArgument(stripe != null, "Stripe must not be null"); + checkArgument( + length <= stripe.length, + "Specified length must not be greater specified array length"); + checkArgument(stripeInfo != null, "Stripe information must not be null"); + checkArgument(stripeStatistics != null, "Stripe statistics must not be null"); + + // If we have buffered rows, flush them + if (rowsInStripe > 0) { + flushStripe(); + } + rowsInStripe = stripeInfo.getNumberOfRows(); + // update stripe information + OrcProto.StripeInformation.Builder dirEntry = + OrcProto.StripeInformation.newBuilder() + .setNumberOfRows(rowsInStripe) + .setIndexLength(stripeInfo.getIndexLength()) + .setDataLength(stripeInfo.getDataLength()) + .setFooterLength(stripeInfo.getFooterLength()); + // If this is the first stripe of the original file, we need to copy the + // encryption information. + if (stripeInfo.hasEncryptionStripeId()) { + dirEntry.setEncryptStripeId(stripeInfo.getEncryptionStripeId()); + for (byte[] key : stripeInfo.getEncryptedLocalKeys()) { + dirEntry.addEncryptedLocalKeys(ByteString.copyFrom(key)); + } + } + physicalWriter.appendRawStripe(ByteBuffer.wrap(stripe, offset, length), dirEntry); + + // since we have already written the stripe, just update stripe statistics + treeWriter.addStripeStatistics(stripeStatistics); + + stripes.add(dirEntry.build()); + + // reset it after writing the stripe + rowCount += rowsInStripe; + rowsInStripe = 0; + needKeyFlush = encryption.length > 0; + } + + @Override + public void appendUserMetadata(List userMetadata) { + if (userMetadata != null) { + for (OrcProto.UserMetadataItem item : userMetadata) { + this.userMetadata.put(item.getName(), item.getValue()); + } + } + } + + @Override + public ColumnStatistics[] getStatistics() { + // get the column statistics + final ColumnStatistics[] result = new ColumnStatistics[schema.getMaximumId() + 1]; + // Get the file statistics, preferring the encrypted one. 
+ treeWriter.getCurrentStatistics(result); + return result; + } + + @Override + public List getStripes() throws IOException { + return Collections.unmodifiableList(OrcUtils.convertProtoStripesToStripes(stripes)); + } + + public CompressionCodec getCompressionCodec() { + return unencryptedOptions.getCodec(); + } + + private static boolean hasTimestamp(TypeDescription schema) { + if (schema.getCategory() == TypeDescription.Category.TIMESTAMP) { + return true; + } + List children = schema.getChildren(); + if (children != null) { + for (TypeDescription child : children) { + if (hasTimestamp(child)) { + return true; + } + } + } + return false; + } + + private WriterEncryptionKey getKey(String keyName, KeyProvider provider) throws IOException { + WriterEncryptionKey result = keys.get(keyName); + if (result == null) { + result = new WriterEncryptionKey(provider.getCurrentKeyVersion(keyName)); + keys.put(keyName, result); + } + return result; + } + + private MaskDescriptionImpl getMask(String maskString) { + // if it is already there, get the earlier object + MaskDescriptionImpl result = maskDescriptions.get(maskString); + if (result == null) { + result = ParserUtils.buildMaskDescription(maskString); + maskDescriptions.put(maskString, result); + } + return result; + } + + private int visitTypeTree(TypeDescription schema, boolean encrypted, KeyProvider provider) + throws IOException { + int result = 0; + String keyName = schema.getAttributeValue(TypeDescription.ENCRYPT_ATTRIBUTE); + String maskName = schema.getAttributeValue(TypeDescription.MASK_ATTRIBUTE); + if (keyName != null) { + if (provider == null) { + throw new IllegalArgumentException("Encryption requires a KeyProvider."); + } + if (encrypted) { + throw new IllegalArgumentException("Nested encryption type: " + schema); + } + encrypted = true; + result += 1; + WriterEncryptionKey key = getKey(keyName, provider); + HadoopShims.KeyMetadata metadata = key.getMetadata(); + WriterEncryptionVariant variant = + new WriterEncryptionVariant(key, schema, provider.createLocalKey(metadata)); + key.addRoot(variant); + } + if (encrypted && (keyName != null || maskName != null)) { + MaskDescriptionImpl mask = getMask(maskName == null ? "nullify" : maskName); + mask.addColumn(schema); + } + List children = schema.getChildren(); + if (children != null) { + for (TypeDescription child : children) { + result += visitTypeTree(child, encrypted, provider); + } + } + return result; + } + + /** + * Iterate through the encryption options given by the user and set up our data structures. + * + * @param provider the KeyProvider to use to generate keys + * @param schema the type tree that we search for annotations + * @param keyOverrides user specified key overrides + */ + private WriterEncryptionVariant[] setupEncryption( + KeyProvider provider, + TypeDescription schema, + Map keyOverrides) + throws IOException { + keyProvider = + provider != null ? provider : CryptoUtils.getKeyProvider(conf, new SecureRandom()); + // Load the overrides into the cache so that we use the required key versions. 
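+        // Overrides pin a key name to a caller-supplied key version, so getKey() will not fall
+        // back to the provider's current version for that key.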
+ for (HadoopShims.KeyMetadata key : keyOverrides.values()) { + keys.put(key.getKeyName(), new WriterEncryptionKey(key)); + } + int variantCount = visitTypeTree(schema, false, keyProvider); + + // Now that we have de-duped the keys and maskDescriptions, make the arrays + int nextId = 0; + if (variantCount > 0) { + for (MaskDescriptionImpl mask : maskDescriptions.values()) { + mask.setId(nextId++); + for (TypeDescription column : mask.getColumns()) { + this.columnMaskDescriptions[column.getId()] = mask; + } + } + } + nextId = 0; + int nextVariantId = 0; + WriterEncryptionVariant[] result = new WriterEncryptionVariant[variantCount]; + for (WriterEncryptionKey key : keys.values()) { + key.setId(nextId++); + key.sortRoots(); + for (WriterEncryptionVariant variant : key.getEncryptionRoots()) { + result[nextVariantId] = variant; + columnEncryption[variant.getRoot().getId()] = variant; + variant.setId(nextVariantId++); + } + } + return result; + } + + @Override + public long estimateMemory() { + return this.treeWriter.estimateMemory(); + } +} diff --git a/paimon-format/src/main/java/org/apache/orc/impl/ZstdCodec.java b/paimon-format/src/main/java/org/apache/orc/impl/ZstdCodec.java new file mode 100644 index 000000000000..2eeb723c4ac5 --- /dev/null +++ b/paimon-format/src/main/java/org/apache/orc/impl/ZstdCodec.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.impl; + +import com.github.luben.zstd.Zstd; +import com.github.luben.zstd.ZstdCompressCtx; +import org.apache.orc.CompressionCodec; +import org.apache.orc.CompressionKind; + +import java.io.IOException; +import java.nio.ByteBuffer; + +/** orc ZstdCodec. 
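+ * Compresses and decompresses ORC compression chunks with Zstandard via the zstd-jni library.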
*/ +public class ZstdCodec implements CompressionCodec, DirectDecompressionCodec { + private ZstdOptions zstdOptions = null; + private ZstdCompressCtx zstdCompressCtx = null; + + public ZstdCodec(int level, int windowLog) { + this.zstdOptions = new ZstdOptions(level, windowLog); + } + + public ZstdCodec() { + this(3, 0); + } + + public ZstdOptions getZstdOptions() { + return zstdOptions; + } + + // Thread local buffer + private static final ThreadLocal threadBuffer = ThreadLocal.withInitial(() -> null); + + protected static byte[] getBuffer(int size) { + byte[] result = threadBuffer.get(); + if (result == null || result.length < size || result.length > size * 2) { + result = new byte[size]; + threadBuffer.set(result); + } + return result; + } + + static class ZstdOptions implements Options { + private int level; + private int windowLog; + + ZstdOptions(int level, int windowLog) { + this.level = level; + this.windowLog = windowLog; + } + + @Override + public ZstdOptions copy() { + return new ZstdOptions(level, windowLog); + } + + @Override + public Options setSpeed(SpeedModifier newValue) { + return this; + } + + /** + * Sets the Zstandard long mode maximum back-reference distance, expressed as a power of 2. + * + *
+     * <p>The value must be between ZSTD_WINDOWLOG_MIN (10) and ZSTD_WINDOWLOG_MAX (30 and 31 on
+     * 32/64-bit architectures, respectively).
+     *
+     * <p>
A value of 0 is a special value indicating to use the default + * ZSTD_WINDOWLOG_LIMIT_DEFAULT of 27, which corresponds to back-reference window size of + * 128MiB. + * + * @param newValue The desired power-of-2 value back-reference distance. + * @return ZstdOptions + */ + public ZstdOptions setWindowLog(int newValue) { + if ((newValue < Zstd.windowLogMin() || newValue > Zstd.windowLogMax()) + && newValue != 0) { + throw new IllegalArgumentException( + String.format( + "Zstd compression window size should be in the range %d to %d," + + " or set to the default value of 0.", + Zstd.windowLogMin(), Zstd.windowLogMax())); + } + windowLog = newValue; + return this; + } + + /** + * Sets the Zstandard compression codec compression level directly using the integer + * setting. This value is typically between 0 and 22, with larger numbers indicating more + * aggressive compression and lower speed. + * + *
This method provides additional granularity beyond the setSpeed method so that users + * can select a specific level. + * + * @param newValue The level value of compression to set. + * @return ZstdOptions + */ + public ZstdOptions setLevel(int newValue) { + if (newValue < Zstd.minCompressionLevel() || newValue > Zstd.maxCompressionLevel()) { + throw new IllegalArgumentException( + String.format( + "Zstd compression level should be in the range %d to %d", + Zstd.minCompressionLevel(), Zstd.maxCompressionLevel())); + } + level = newValue; + return this; + } + + @Override + public ZstdOptions setData(DataKind newValue) { + return this; // We don't support setting DataKind in ZstdCodec. + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ZstdOptions that = (ZstdOptions) o; + + if (level != that.level) { + return false; + } + return windowLog == that.windowLog; + } + + @Override + public int hashCode() { + int result = level; + result = 31 * result + windowLog; + return result; + } + } + + private static final ZstdOptions DEFAULT_OPTIONS = new ZstdOptions(3, 0); + + @Override + public Options getDefaultOptions() { + return DEFAULT_OPTIONS; + } + + /** + * Compresses an input ByteBuffer into an output ByteBuffer using Zstandard compression. If the + * maximum bound of the number of output bytes exceeds the output ByteBuffer size, the remaining + * bytes are written to the overflow ByteBuffer. + * + * @param in the bytes to compress + * @param out the compressed bytes + * @param overflow put any additional bytes here + * @param options the options to control compression + * @return ZstdOptions + */ + @Override + public boolean compress(ByteBuffer in, ByteBuffer out, ByteBuffer overflow, Options options) + throws IOException { + int inBytes = in.remaining(); + // Skip with minimum ZStandard format size: + // https://datatracker.ietf.org/doc/html/rfc8878#name-zstandard-frames + // Magic Number (4 bytes) + Frame Header (2 bytes) + Data Block Header (3 bytes) + if (inBytes < 10) { + return false; + } + + ZstdOptions zso = (ZstdOptions) options; + + zstdCompressCtx = new ZstdCompressCtx(); + zstdCompressCtx.setLevel(zso.level); + zstdCompressCtx.setLong(zso.windowLog); + zstdCompressCtx.setChecksum(false); + + try { + byte[] compressed = getBuffer((int) Zstd.compressBound(inBytes)); + + int outBytes = + zstdCompressCtx.compressByteArray( + compressed, + 0, + compressed.length, + in.array(), + in.arrayOffset() + in.position(), + inBytes); + if (outBytes < inBytes) { + int remaining = out.remaining(); + if (remaining >= outBytes) { + System.arraycopy( + compressed, + 0, + out.array(), + out.arrayOffset() + out.position(), + outBytes); + out.position(out.position() + outBytes); + } else { + System.arraycopy( + compressed, + 0, + out.array(), + out.arrayOffset() + out.position(), + remaining); + out.position(out.limit()); + System.arraycopy( + compressed, + remaining, + overflow.array(), + overflow.arrayOffset(), + outBytes - remaining); + overflow.position(outBytes - remaining); + } + return true; + } else { + return false; + } + } finally { + zstdCompressCtx.close(); + } + } + + @Override + public void decompress(ByteBuffer in, ByteBuffer out) throws IOException { + if (in.isDirect() && out.isDirect()) { + directDecompress(in, out); + return; + } + + int srcOffset = in.arrayOffset() + in.position(); + int srcSize = in.remaining(); + int dstOffset = out.arrayOffset() + out.position(); + int 
dstSize = out.remaining() - dstOffset; + + long decompressOut = + Zstd.decompressByteArray( + out.array(), dstOffset, dstSize, in.array(), srcOffset, srcSize); + in.position(in.limit()); + out.position(dstOffset + (int) decompressOut); + out.flip(); + } + + @Override + public boolean isAvailable() { + return true; + } + + @Override + public void directDecompress(ByteBuffer in, ByteBuffer out) throws IOException { + Zstd.decompress(out, in); + out.flip(); + } + + @Override + public void reset() {} + + @Override + public void destroy() { + if (zstdCompressCtx != null) { + zstdCompressCtx.close(); + } + } + + @Override + public CompressionKind getKind() { + return CompressionKind.ZSTD; + } + + @Override + public void close() { + OrcCodecPool.returnCodec(CompressionKind.ZSTD, this); + } +} diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java index ac33991c3062..1a925c7250f3 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java @@ -79,7 +79,7 @@ public OrcFileFormat(FormatContext formatContext) { } @VisibleForTesting - Properties orcProperties() { + public Properties orcProperties() { return orcProperties; } diff --git a/paimon-format/src/main/resources/META-INF/NOTICE b/paimon-format/src/main/resources/META-INF/NOTICE index f6e607ef3502..dae8d5fec19a 100644 --- a/paimon-format/src/main/resources/META-INF/NOTICE +++ b/paimon-format/src/main/resources/META-INF/NOTICE @@ -6,8 +6,8 @@ The Apache Software Foundation (http://www.apache.org/). This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt) -- org.apache.orc:orc-core:1.8.3 -- org.apache.orc:orc-shims:1.8.3 +- org.apache.orc:orc-core:1.9.2 +- org.apache.orc:orc-shims:1.9.2 - org.apache.hive:hive-storage-api:2.8.1 - io.airlift:aircompressor:0.21 - commons-lang:commons-lang:2.6 @@ -32,5 +32,5 @@ You find it under licenses/LICENSE.protobuf, licenses/LICENSE.zstd-jni and licenses/LICENSE.threeten-extra - com.google.protobuf:protobuf-java:3.17.3 -- com.github.luben:zstd-jni:1.5.0-1 +- com.github.luben:zstd-jni:1.5.5-11 - org.threeten:threeten-extra:1.7.1 diff --git a/paimon-format/src/test/java/org/apache/paimon/format/orc/writer/OrcZstdTest.java b/paimon-format/src/test/java/org/apache/paimon/format/orc/writer/OrcZstdTest.java new file mode 100644 index 000000000000..d5f8f8bc7cc7 --- /dev/null +++ b/paimon-format/src/test/java/org/apache/paimon/format/orc/writer/OrcZstdTest.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.format.orc.writer; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.format.FileFormatFactory; +import org.apache.paimon.format.FormatWriter; +import org.apache.paimon.format.FormatWriterFactory; +import org.apache.paimon.format.orc.OrcFileFormat; +import org.apache.paimon.format.orc.OrcFileFormatFactory; +import org.apache.paimon.format.orc.OrcWriterFactory; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.PositionOutputStream; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.options.Options; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import com.github.luben.zstd.ZstdException; +import org.apache.hadoop.conf.Configuration; +import org.apache.orc.CompressionCodec; +import org.apache.orc.CompressionKind; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.impl.ZstdCodec; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Random; +import java.util.UUID; + +import static org.apache.paimon.format.orc.OrcFileFormatFactory.IDENTIFIER; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +class OrcZstdTest { + + @Test + void testWriteOrcWithZstd(@TempDir java.nio.file.Path tempDir) throws IOException { + Options options = new Options(); + options.set("compress", "zstd"); + options.set("stripe.size", "31457280"); + options.set("compression.zstd.level", "1"); + OrcFileFormat orc = + new OrcFileFormatFactory() + .create(new FileFormatFactory.FormatContext(options, 1024)); + Assertions.assertThat(orc).isInstanceOf(OrcFileFormat.class); + + Assertions.assertThat(orc.orcProperties().getProperty(IDENTIFIER + ".compress", "")) + .isEqualTo("zstd"); + Assertions.assertThat( + orc.orcProperties().getProperty(IDENTIFIER + ".compression.zstd.level", "")) + .isEqualTo("1"); + Assertions.assertThat(orc.orcProperties().getProperty(IDENTIFIER + ".stripe.size", "")) + .isEqualTo("31457280"); + + RowType rowType = + RowType.builder() + .field("a", DataTypes.INT()) + .field("b", DataTypes.STRING()) + .field("c", DataTypes.STRING()) + .field("d", DataTypes.STRING()) + .build(); + FormatWriterFactory writerFactory = orc.createWriterFactory(rowType); + Assertions.assertThat(writerFactory).isInstanceOf(OrcWriterFactory.class); + + Path path = new Path(tempDir.toUri().toString(), "1.orc"); + PositionOutputStream out = LocalFileIO.create().newOutputStream(path, true); + FormatWriter formatWriter = writerFactory.create(out, "zstd"); + + Assertions.assertThat(formatWriter).isInstanceOf(OrcBulkWriter.class); + + Options optionsWithLowLevel = new Options(); + optionsWithLowLevel.set("compress", "zstd"); + optionsWithLowLevel.set("stripe.size", "31457280"); + optionsWithLowLevel.set("compression.zstd.level", "1"); + + Random random = new Random(); + for (int i = 0; i < 1000; i++) { + GenericRow element = + GenericRow.of( + random.nextInt(), + BinaryString.fromString( + UUID.randomUUID().toString() + random.nextInt()), + BinaryString.fromString( + UUID.randomUUID().toString() + random.nextInt()), + BinaryString.fromString( + UUID.randomUUID().toString() + random.nextInt())); + formatWriter.addElement(element); + } + formatWriter.finish(); + 
OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(new Configuration()); + Reader reader = + OrcFile.createReader(new org.apache.hadoop.fs.Path(path.toString()), readerOptions); + Assertions.assertThat(reader.getNumberOfRows()).isEqualTo(1000); + Assertions.assertThat(reader.getCompressionKind()).isEqualTo(CompressionKind.ZSTD); + Assertions.assertThat(com.github.luben.zstd.util.Native.isLoaded()).isEqualTo(true); + } + + @Test + public void testCorrupt() throws Exception { + ByteBuffer buf = ByteBuffer.allocate(1000); + buf.put(new byte[] {127, 125, 1, 99, 98, 1}); + buf.flip(); + try (CompressionCodec codec = new ZstdCodec()) { + ByteBuffer out = ByteBuffer.allocate(1000); + codec.decompress(buf, out); + fail(); + } catch (ZstdException ioe) { + // EXPECTED + } + } + + @Test + public void testZstdDirectDecompress() { + ByteBuffer in = ByteBuffer.allocate(10000); + ByteBuffer out = ByteBuffer.allocate(10000); + ByteBuffer directOut = ByteBuffer.allocateDirect(10000); + ByteBuffer directResult = ByteBuffer.allocateDirect(10000); + for (int i = 0; i < 10000; i++) { + in.put((byte) i); + } + in.flip(); + try (ZstdCodec zstdCodec = new ZstdCodec()) { + // write bytes to heap buffer. + assertTrue(zstdCodec.compress(in, out, null, zstdCodec.getDefaultOptions())); + int position = out.position(); + out.flip(); + // copy heap buffer to direct buffer. + directOut.put(out.array()); + directOut.flip(); + directOut.limit(position); + + zstdCodec.decompress(directOut, directResult); + + // copy result from direct buffer to heap. + byte[] heapBytes = new byte[in.array().length]; + directResult.get(heapBytes, 0, directResult.limit()); + + assertArrayEquals(in.array(), heapBytes); + } catch (Exception e) { + fail(e); + } + } +}