diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html b/docs/layouts/shortcodes/generated/catalog_configuration.html
index cab6e731e851..e0d55b2dbac3 100644
--- a/docs/layouts/shortcodes/generated/catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -92,5 +92,17 @@
String |
The warehouse root path of catalog. |
+
+ encryption.mechanism |
+ plaintext |
+ Enum |
+ Encryption mechanism for paimon, the default value is plaintext, which means it is not encrypted.
Possible values:- "plaintext": Do not encrypt the data files.
- "envelope": Encrypt data file using envelope encryption mechanism.
|
+
+
+ encryption.kms-client |
+ (none) |
+ Enum |
+ The kms client for encryption, if the user has enabled encryption, the kms client must be specified.
Possible values:- "memory": Use memory kms for encryption, this is only for test, can not be used in production environment.
- "hadoop": Use hadoop kms for encryption, parameters prefixed with `hadoop.security.` will be used to build hadoop kms client, the `hadoop.security.key.provider.path` is required. The hadoop parameters can be configured from catalog or environment,please refer to `https://paimon.apache.org/docs/master/filesystems/hdfs/#hdfs-configuration`.
|
+
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 5959b6b4f096..ce1a72ff661b 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -726,5 +726,23 @@
Integer |
The bytes of types (CHAR, VARCHAR, BINARY, VARBINARY) devote to the zorder sort. |
+
+ encryption.table.key-id |
+ (none) |
+ String |
+ Specify the master key id for encryption. |
+
+
+ encryption.columns |
+ (none) |
+ String |
+ Specify the partial columns to be encrypted, separated by commas. If this parameter is not specified, all columns will be encrypted. |
+
+
+ encryption.algorithm |
+ (none) |
+ String |
+ Encryption algorithm for encrypting data files, for parquet format, the value can be `AES_GCM_V1` or `AES_GCM_CTR_V1`, and the default value is `AES_GCM_CTR_V1`. And can not specify algorithm for orc format now. |
+
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index cbe5f45c5b50..daa09e6276b7 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1076,6 +1076,26 @@ public class CoreOptions implements Serializable {
+ "If the data size allocated for the sorting task is uneven,which may lead to performance bottlenecks, "
+ "the config can be set to size.");
+ public static final ConfigOption ENCRYPTION_TABLE_KEY_ID =
+ key("encryption.table.key-id")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Specify the master key id for encryption.");
+
+ public static final ConfigOption ENCRYPTION_COLUMNS =
+ key("encryption.columns")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Specify the partial columns to be encrypted, separated by commas. If this parameter is not specified, all columns will be encrypted.");
+
+ public static final ConfigOption ENCRYPTION_ALGORITHM =
+ key("encryption.algorithm")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Encryption algorithm for encrypting data files, for parquet format, the value can be `AES_GCM_V1` or `AES_GCM_CTR_V1`, and the default value is `AES_GCM_CTR_V1`. And can not specify algorithm for orc format now.");
+
public static final ConfigOption SORT_COMPACTION_SAMPLE_MAGNIFICATION =
key("sort-compaction.local-sample.magnification")
.intType()
@@ -1690,6 +1710,14 @@ public boolean deletionVectorsEnabled() {
return options.get(DELETION_VECTORS_ENABLED);
}
+ public String encryptionAlgorithm() {
+ return options.get(ENCRYPTION_ALGORITHM);
+ }
+
+ public String encryptionColumns() {
+ return options.get(ENCRYPTION_COLUMNS);
+ }
+
/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
diff --git a/paimon-common/src/main/java/org/apache/paimon/encryption/KmsClient.java b/paimon-common/src/main/java/org/apache/paimon/encryption/KmsClient.java
new file mode 100644
index 000000000000..47101e1ef867
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/encryption/KmsClient.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.encryption;
+
+import org.apache.paimon.options.Options;
+
+import java.io.Closeable;
+import java.io.Serializable;
+
+/** KMS client interface, provide common operations for KMS. */
+public interface KmsClient extends Serializable, Closeable {
+ void configure(Options options);
+
+ String identifier();
+
+ byte[] wrapKey(byte[] unWrappedKey, String masterKeyId);
+
+ byte[] unwrapKey(byte[] wrappedKey, String masterKeyId);
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java
index d3d8bb114914..a16e72b4a359 100644
--- a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java
+++ b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java
@@ -76,7 +76,12 @@ public Optional createStatsExtractor(
@VisibleForTesting
public static FileFormat fromIdentifier(String identifier, Options options) {
- return fromIdentifier(identifier, new FormatContext(options, 1024));
+ FormatContext formatContext =
+ FileFormatFactory.formatContextBuilder()
+ .formatOptions(options)
+ .readBatchSize(1024)
+ .build();
+ return fromIdentifier(identifier, formatContext);
}
/** Create a {@link FileFormat} from format identifier and format options. */
@@ -105,8 +110,11 @@ private static Optional fromIdentifier(
public static FileFormat getFileFormat(Options options, String formatIdentifier) {
int readBatchSize = options.get(CoreOptions.READ_BATCH_SIZE);
- return FileFormat.fromIdentifier(
- formatIdentifier,
- new FormatContext(options.removePrefix(formatIdentifier + "."), readBatchSize));
+ FormatContext formatContext =
+ FileFormatFactory.formatContextBuilder()
+ .formatOptions(options.removePrefix(formatIdentifier + "."))
+ .readBatchSize(readBatchSize)
+ .build();
+ return FileFormat.fromIdentifier(formatIdentifier, formatContext);
}
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FileFormatFactory.java b/paimon-common/src/main/java/org/apache/paimon/format/FileFormatFactory.java
index 71e10a242391..210b6e4d4db5 100644
--- a/paimon-common/src/main/java/org/apache/paimon/format/FileFormatFactory.java
+++ b/paimon-common/src/main/java/org/apache/paimon/format/FileFormatFactory.java
@@ -27,14 +27,34 @@ public interface FileFormatFactory {
FileFormat create(FormatContext formatContext);
- /** the format context. */
+ /** The format context for reader and writer. */
class FormatContext {
private final Options formatOptions;
private final int readBatchSize;
+ private final String compression;
+ private final String encryptionTableKeyId;
+ private final byte[] plaintextDataKey;
+ private final byte[] dataAADPrefix;
+ private final String encryptionAlgorithm;
+ private final String encryptionColumns;
- public FormatContext(Options formatOptions, int readBatchSize) {
+ private FormatContext(
+ Options formatOptions,
+ int readBatchSize,
+ String compression,
+ String encryptionTableKeyId,
+ byte[] plaintextDataKey,
+ byte[] dataAADPrefix,
+ String encryptionAlgorithm,
+ String encryptionColumns) {
this.formatOptions = formatOptions;
this.readBatchSize = readBatchSize;
+ this.compression = compression;
+ this.encryptionTableKeyId = encryptionTableKeyId;
+ this.plaintextDataKey = plaintextDataKey;
+ this.dataAADPrefix = dataAADPrefix;
+ this.encryptionAlgorithm = encryptionAlgorithm;
+ this.encryptionColumns = encryptionColumns;
}
public Options formatOptions() {
@@ -44,5 +64,112 @@ public Options formatOptions() {
public int readBatchSize() {
return readBatchSize;
}
+
+ public String compression() {
+ return compression;
+ }
+
+ public String encryptionTableId() {
+ return encryptionTableKeyId;
+ }
+
+ public byte[] plaintextDataKey() {
+ return plaintextDataKey;
+ }
+
+ public byte[] dataAADPrefix() {
+ return dataAADPrefix;
+ }
+
+ public String encryptionAlgorithm() {
+ return encryptionAlgorithm;
+ }
+
+ public String encryptionColumns() {
+ return encryptionColumns;
+ }
+
+ public FormatContext newDataKey(byte[] dataKey) {
+ return new FormatContext(
+ formatOptions,
+ readBatchSize,
+ compression,
+ encryptionTableKeyId,
+ dataKey,
+ dataAADPrefix,
+ encryptionAlgorithm,
+ encryptionColumns);
+ }
+ }
+
+ /** Format context builder. */
+ class FormatContextBuilder {
+
+ private Options formatOptions;
+ private int readBatchSize;
+ private String compression;
+ private String encryptionTableId;
+ private byte[] plaintextDataKey;
+ private byte[] dataAADPrefix;
+ private String encryptionAlgorithm;
+ private String encryptionColumns;
+
+ private FormatContextBuilder() {}
+
+ public FormatContextBuilder formatOptions(Options formatOptions) {
+ this.formatOptions = formatOptions;
+ return this;
+ }
+
+ public FormatContextBuilder readBatchSize(int readBatchSize) {
+ this.readBatchSize = readBatchSize;
+ return this;
+ }
+
+ public FormatContextBuilder compression(String compression) {
+ this.compression = compression;
+ return this;
+ }
+
+ public FormatContextBuilder withEncryptionTableId(String keyId) {
+ this.encryptionTableId = keyId;
+ return this;
+ }
+
+ public FormatContextBuilder withPlaintextDataKey(byte[] plaintextDataKey) {
+ this.plaintextDataKey = plaintextDataKey;
+ return this;
+ }
+
+ public FormatContextBuilder withAADPrefix(byte[] dataAADPrefix) {
+ this.dataAADPrefix = dataAADPrefix;
+ return this;
+ }
+
+ public FormatContextBuilder withEncryptionAlgorithm(String encryptionAlgorithm) {
+ this.encryptionAlgorithm = encryptionAlgorithm;
+ return this;
+ }
+
+ public FormatContextBuilder withEncryptionColumns(String encryptionColumns) {
+ this.encryptionColumns = encryptionColumns;
+ return this;
+ }
+
+ public FormatContext build() {
+ return new FormatContext(
+ formatOptions,
+ readBatchSize,
+ compression,
+ encryptionTableId,
+ plaintextDataKey,
+ dataAADPrefix,
+ encryptionAlgorithm,
+ encryptionColumns);
+ }
+ }
+
+ static FormatContextBuilder formatContextBuilder() {
+ return new FormatContextBuilder();
}
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
index f00a35a75094..5f1722297eee 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
@@ -18,13 +18,17 @@
package org.apache.paimon.options;
+import org.apache.paimon.options.description.DescribedEnum;
import org.apache.paimon.options.description.Description;
+import org.apache.paimon.options.description.InlineElement;
import org.apache.paimon.options.description.TextElement;
import org.apache.paimon.table.TableType;
import java.time.Duration;
+import static org.apache.paimon.options.CatalogOptions.EncryptionMechanism.PLAINTEXT;
import static org.apache.paimon.options.ConfigOptions.key;
+import static org.apache.paimon.options.description.TextElement.text;
/** Options for catalog. */
public class CatalogOptions {
@@ -110,4 +114,72 @@ public class CatalogOptions {
TextElement.text(
"\"custom\": You can implement LineageMetaFactory and LineageMeta to store lineage information in customized storage."))
.build());
+
+ public static final ConfigOption ENCRYPTION_MECHANISM =
+ key("encryption.mechanism")
+ .enumType(EncryptionMechanism.class)
+ .defaultValue(PLAINTEXT)
+ .withDescription(
+ "Encryption mechanism for paimon, the default value is plaintext, which means it is not encrypted.");
+
+ public static final ConfigOption ENCRYPTION_KMS_CLIENT =
+ key("encryption.kms-client")
+ .enumType(EncryptionKmsClient.class)
+ .noDefaultValue()
+ .withDescription(
+ "The kms client for encryption, if the user has enabled encryption, the kms client must be specified.");
+
+ /** The encryption mechanism for paimon. */
+ public enum EncryptionMechanism implements DescribedEnum {
+ PLAINTEXT("plaintext", "Do not encrypt the data files."),
+ ENVELOPE("envelope", "Encrypt data file using envelope encryption mechanism.");
+
+ private final String value;
+ private final String description;
+
+ EncryptionMechanism(String value, String description) {
+ this.value = value;
+ this.description = description;
+ }
+
+ @Override
+ public String toString() {
+ return value;
+ }
+
+ @Override
+ public InlineElement getDescription() {
+ return text(description);
+ }
+ }
+
+ /** The kms client for encryption. */
+ public enum EncryptionKmsClient implements DescribedEnum {
+ MEMORY(
+ "memory",
+ "Use memory kms for encryption, this is only for test, can not be used in production environment."),
+ HADOOP(
+ "hadoop",
+ "Use hadoop kms for encryption, parameters prefixed with `hadoop.security.` will be used to build hadoop kms client, "
+ + "the `hadoop.security.key.provider.path` is required. The hadoop parameters can be configured from catalog or environment,"
+ + "please refer to `https://paimon.apache.org/docs/master/filesystems/hdfs/#hdfs-configuration`.");
+
+ private final String value;
+ private final String description;
+
+ EncryptionKmsClient(String value, String description) {
+ this.value = value;
+ this.description = description;
+ }
+
+ @Override
+ public String toString() {
+ return value;
+ }
+
+ @Override
+ public InlineElement getDescription() {
+ return text(description);
+ }
+ }
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/EncryptionUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/EncryptionUtils.java
new file mode 100644
index 000000000000..e10883acb304
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/EncryptionUtils.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import javax.crypto.BadPaddingException;
+import javax.crypto.Cipher;
+import javax.crypto.IllegalBlockSizeException;
+import javax.crypto.spec.GCMParameterSpec;
+import javax.crypto.spec.SecretKeySpec;
+
+import java.security.GeneralSecurityException;
+
+/** The encryption utils. */
+public class EncryptionUtils {
+
+ private static final int NONCE_LENGTH = 12;
+ private static final int GCM_TAG_LENGTH = 16;
+ private static final int GCM_TAG_LENGTH_BITS = 8 * GCM_TAG_LENGTH;
+
+ public static byte[] encryptKeyLocally(byte[] plaintext, byte[] masterKey) {
+ AesGcmEncryptor aesGcmEncryptor = new AesGcmEncryptor(masterKey);
+ return aesGcmEncryptor.encrypt(plaintext);
+ }
+
+ public static byte[] decryptKeyLocally(byte[] encryptedKey, byte[] masterKey) {
+ AesGcmDecryptor aesGcmDecryptor = new AesGcmDecryptor(masterKey);
+ return aesGcmDecryptor.encrypt(encryptedKey);
+ }
+
+ private static class AesGcmEncryptor {
+
+ private final Cipher cipher;
+
+ public AesGcmEncryptor(byte[] masterKey) {
+ try {
+ cipher = Cipher.getInstance("AES/GCM/NoPadding");
+ SecretKeySpec secretKeySpec = new SecretKeySpec(masterKey, "AES");
+ byte[] nonce = new byte[NONCE_LENGTH];
+ GCMParameterSpec spec = new GCMParameterSpec(GCM_TAG_LENGTH_BITS, nonce);
+ cipher.init(Cipher.ENCRYPT_MODE, secretKeySpec, spec);
+ } catch (GeneralSecurityException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public byte[] encrypt(byte[] plaintext) {
+ try {
+ return cipher.doFinal(plaintext);
+ } catch (IllegalBlockSizeException | BadPaddingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private static class AesGcmDecryptor {
+
+ private final Cipher cipher;
+
+ public AesGcmDecryptor(byte[] masterKey) {
+ try {
+ cipher = Cipher.getInstance("AES/GCM/NoPadding");
+ SecretKeySpec secretKeySpec = new SecretKeySpec(masterKey, "AES");
+ byte[] nonce = new byte[NONCE_LENGTH];
+ GCMParameterSpec spec = new GCMParameterSpec(GCM_TAG_LENGTH_BITS, nonce);
+ cipher.init(Cipher.DECRYPT_MODE, secretKeySpec, spec);
+ } catch (GeneralSecurityException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public byte[] encrypt(byte[] plaintext) {
+ try {
+ return cipher.doFinal(plaintext);
+ } catch (IllegalBlockSizeException | BadPaddingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
diff --git a/paimon-common/src/test/java/org/apache/paimon/encryption/MemoryKMS.java b/paimon-common/src/test/java/org/apache/paimon/encryption/MemoryKMS.java
new file mode 100644
index 000000000000..a0ae0e7a86c9
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/encryption/MemoryKMS.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.encryption;
+
+import org.apache.paimon.options.Options;
+import org.apache.paimon.utils.EncryptionUtils;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** This is only for test, not for production. */
+public class MemoryKMS implements KmsClient {
+
+ private static final String IDENTIFIER = "memory";
+
+ private final Map keyMap = new ConcurrentHashMap<>();
+
+ @Override
+ public void configure(Options options) {}
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public byte[] wrapKey(byte[] dataKey, String masterKeyId) {
+ byte[] masterKey = keyMap.get(masterKeyId);
+ if (null == masterKey) {
+ throw new RuntimeException("Key not found: " + masterKeyId);
+ }
+ return EncryptionUtils.encryptKeyLocally(dataKey, masterKey);
+ }
+
+ @Override
+ public byte[] unwrapKey(byte[] wrappedKey, String masterKeyId) {
+ byte[] masterKey = keyMap.get(masterKeyId);
+ if (null == masterKey) {
+ throw new RuntimeException("Key not found: " + masterKeyId);
+ }
+ return EncryptionUtils.decryptKeyLocally(wrappedKey, masterKey);
+ }
+
+ @Override
+ public void close() throws IOException {
+ keyMap.clear();
+ }
+}
diff --git a/paimon-common/src/test/resources/META-INF/services/org.apache.paimon.encryption.KmsClient b/paimon-common/src/test/resources/META-INF/services/org.apache.paimon.encryption.KmsClient
new file mode 100644
index 000000000000..f53f0777ae74
--- /dev/null
+++ b/paimon-common/src/test/resources/META-INF/services/org.apache.paimon.encryption.KmsClient
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License
+
+org.apache.paimon.encryption.MemoryKMS
\ No newline at end of file
diff --git a/paimon-core/src/main/java/org/apache/paimon/encryption/EncryptionManager.java b/paimon-core/src/main/java/org/apache/paimon/encryption/EncryptionManager.java
new file mode 100644
index 000000000000..7a22df7cad51
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/encryption/EncryptionManager.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.encryption;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.options.Options;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.ServiceLoader;
+
+/** Module for encrypting and decrypting table data files, it must be serializable. */
+public interface EncryptionManager extends Serializable {
+
+ String identifier();
+
+ void configure(Options options);
+
+ FileIO decrypt(EnvelopeEncryptionManager.EncryptedFileIO encryptedFileIO);
+
+ EnvelopeEncryptionManager.EncryptedFileIO encrypt(FileIO fileIO);
+
+ default Map discoverKmsClient() {
+ Iterator iterator =
+ ServiceLoader.load(KmsClient.class, EncryptionManager.class.getClassLoader())
+ .iterator();
+
+ Map map = new HashMap<>();
+ while (iterator.hasNext()) {
+ KmsClient kmsClient = iterator.next();
+ map.put(kmsClient.identifier(), kmsClient);
+ }
+
+ return map;
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/encryption/EnvelopeEncryptionManager.java b/paimon-core/src/main/java/org/apache/paimon/encryption/EnvelopeEncryptionManager.java
new file mode 100644
index 000000000000..1514a10a1330
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/encryption/EnvelopeEncryptionManager.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.encryption;
+
+import org.apache.paimon.format.FileFormatFactory;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.utils.EncryptionUtils;
+
+import java.security.SecureRandom;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Encrypt data file using envelope encryption mechanism. */
+public class EnvelopeEncryptionManager implements EncryptionManager {
+
+ private static final String IDENTIFIER = "envelope";
+ private static final int KEY_ENCRYPTION_KEY_LENGTH = 16;
+ private static final SecureRandom RANDOM = new SecureRandom();
+ // kekId -> kek
+ private static final Map KEY_MAP = new HashMap<>();
+ private static KmsClient kmsClient;
+
+ @Override
+ public void configure(Options options) {
+ Map map = discoverKmsClient();
+ kmsClient = map.get(options.get(CatalogOptions.ENCRYPTION_MECHANISM.key()));
+ kmsClient.configure(options);
+ }
+
+ @Override
+ public FileIO decrypt(EncryptedFileIO encryptedFileIO) {
+ return encryptedFileIO.fileIO();
+ }
+
+ @Override
+ public EncryptedFileIO encrypt(FileIO fileIO) {
+ return new EncryptedFileIO(fileIO);
+ }
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ /** Wrapped encrypted FileIO. */
+ public static class EncryptedFileIO {
+ private final FileIO fileIO;
+ private KeyMetadata keyMetadata;
+ private FileFormatFactory.FormatContext formatContext;
+
+ public EncryptedFileIO(FileIO fileIO) {
+ this.fileIO = fileIO;
+ }
+
+ public FileIO fileIO() {
+ return fileIO;
+ }
+
+ /**
+ * when read file, we load the keyMetadata from manifest file, and set it.
+ *
+ * @param keyMetadata the key metadata
+ */
+ public void keyMetadata(KeyMetadata keyMetadata) {
+ this.keyMetadata = keyMetadata;
+ }
+
+ /**
+ * when write the KeyMetadata to manifest file, we generate the KeyMetadata from
+ * FormatContext.
+ *
+ * @return the KeyMetadata
+ */
+ public KeyMetadata keyMetadata() {
+ if (null == keyMetadata) {
+
+ if (formatContext != null && formatContext.plaintextDataKey() != null) {
+ byte[] kekId = formatContext.dataAADPrefix();
+
+ byte[] kekBytes = new byte[KEY_ENCRYPTION_KEY_LENGTH];
+ RANDOM.nextBytes(kekBytes);
+
+ byte[] wrappedDEK =
+ EncryptionUtils.encryptKeyLocally(
+ formatContext.plaintextDataKey(), kekBytes);
+
+ KeyEncryptionKey kek =
+ KEY_MAP.computeIfAbsent(
+ kekId,
+ bytes -> {
+ byte[] wrappedKEK =
+ kmsClient.wrapKey(
+ kekBytes,
+ formatContext.encryptionTableId());
+ return new KeyEncryptionKey(kekBytes, wrappedKEK);
+ });
+
+ this.keyMetadata =
+ new KeyMetadata(
+ kekId, kek.kek(), wrappedDEK, formatContext.dataAADPrefix());
+ }
+ }
+ return keyMetadata;
+ }
+
+ public void formatContext(FileFormatFactory.FormatContext formatContext) {
+ this.formatContext = formatContext;
+ }
+
+ public FileFormatFactory.FormatContext formatContext() {
+ // when read file, decrypt the plaintext data key from keyMetadata.
+ if (null != keyMetadata && null == formatContext.plaintextDataKey()) {
+ KeyEncryptionKey kek =
+ KEY_MAP.computeIfAbsent(
+ keyMetadata.kekID(),
+ kekIdentifier -> {
+ byte[] kekBytes =
+ kmsClient.unwrapKey(
+ keyMetadata.wrappedKEK(),
+ formatContext.encryptionTableId());
+ return new KeyEncryptionKey(kekBytes, keyMetadata.wrappedKEK());
+ });
+
+ byte[] unWrappedDEK =
+ EncryptionUtils.decryptKeyLocally(keyMetadata.wrappedDEK(), kek.kek());
+ formatContext = formatContext.newDataKey(unWrappedDEK);
+ }
+
+ return formatContext;
+ }
+ }
+
+ static class KeyEncryptionKey {
+ private final byte[] kek;
+ private final byte[] wrappedKEK;
+
+ public KeyEncryptionKey(byte[] kek, byte[] wrappedKEK) {
+ this.kek = kek;
+ this.wrappedKEK = wrappedKEK;
+ }
+
+ public byte[] kek() {
+ return kek;
+ }
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/encryption/KeyMetadata.java b/paimon-core/src/main/java/org/apache/paimon/encryption/KeyMetadata.java
new file mode 100644
index 000000000000..143f948cd796
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/encryption/KeyMetadata.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.encryption;
+
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.paimon.utils.SerializationUtils.newBytesType;
+
+/** Wrap information about the key. */
+public class KeyMetadata {
+
+ private final byte[] kekID; // kek id, used to cache kek
+ private final byte[] wrappedKEK; // encrypted kek
+ private final byte[] wrappedDEK; // encrypted data key
+ private final byte[] aadPrefix; // aad prefix, only used for parquet format
+
+ private KeyMetadata() {
+ this(new byte[0], new byte[0], new byte[0], new byte[0]);
+ }
+
+ public KeyMetadata(byte[] kekID, byte[] wrappedKEK, byte[] wrappedDEK, byte[] aadPrefix) {
+ this.kekID = kekID;
+ this.wrappedKEK = wrappedKEK;
+ this.wrappedDEK = wrappedDEK;
+ this.aadPrefix = aadPrefix;
+ }
+
+ public static KeyMetadata emptyKeyMetadata() {
+ return new KeyMetadata();
+ }
+
+ public static RowType schema() {
+ List fields = new ArrayList<>();
+ fields.add(new DataField(1, "_KEK_ID", newBytesType(false)));
+ fields.add(new DataField(1, "_WRAPPED_KEK", newBytesType(false)));
+ fields.add(new DataField(1, "_WRAPPED_DEK", newBytesType(false)));
+ fields.add(new DataField(2, "_AAD_PREFIX", newBytesType(false)));
+ return new RowType(fields);
+ }
+
+ public byte[] aadPrefix() {
+ return aadPrefix;
+ }
+
+ public byte[] kekID() {
+ return kekID;
+ }
+
+ public byte[] wrappedKEK() {
+ return wrappedKEK;
+ }
+
+ public byte[] wrappedDEK() {
+ return wrappedDEK;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof KeyMetadata)) {
+ return false;
+ }
+ KeyMetadata that = (KeyMetadata) o;
+ return Arrays.equals(kekID, that.kekID)
+ && Arrays.equals(wrappedKEK, that.wrappedKEK)
+ && Arrays.equals(wrappedDEK, that.wrappedDEK)
+ && Arrays.equals(aadPrefix, that.aadPrefix);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ Arrays.hashCode(kekID),
+ Arrays.hashCode(wrappedKEK),
+ Arrays.hashCode(wrappedDEK),
+ Arrays.hashCode(aadPrefix));
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/encryption/KeyMetadataSerializer.java b/paimon-core/src/main/java/org/apache/paimon/encryption/KeyMetadataSerializer.java
new file mode 100644
index 000000000000..984f59426d61
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/encryption/KeyMetadataSerializer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.encryption;
+
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.utils.ObjectSerializer;
+
+/** Serializer for {@link KeyMetadata}. */
+public class KeyMetadataSerializer extends ObjectSerializer {
+
+ public KeyMetadataSerializer() {
+ super(KeyMetadata.schema());
+ }
+
+ @Override
+ public InternalRow toRow(KeyMetadata record) {
+ return GenericRow.of(
+ record.kekID(), record.wrappedKEK(), record.wrappedDEK(), record.aadPrefix());
+ }
+
+ @Override
+ public KeyMetadata fromRow(InternalRow rowData) {
+ return new KeyMetadata(
+ rowData.getBinary(0),
+ rowData.getBinary(1),
+ rowData.getBinary(2),
+ rowData.getBinary(3));
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/encryption/PlaintextEncryptionManager.java b/paimon-core/src/main/java/org/apache/paimon/encryption/PlaintextEncryptionManager.java
new file mode 100644
index 000000000000..0270b4a364de
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/encryption/PlaintextEncryptionManager.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.encryption;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.options.Options;
+
+/** Do not encrypt the data files. */
+public class PlaintextEncryptionManager implements EncryptionManager {
+
+ public static final String IDENTIFIER = "plaintext";
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public void configure(Options options) {}
+
+ @Override
+ public FileIO decrypt(EnvelopeEncryptionManager.EncryptedFileIO encryptedFileIO) {
+ return encryptedFileIO.fileIO();
+ }
+
+ @Override
+ public EnvelopeEncryptionManager.EncryptedFileIO encrypt(FileIO fileIO) {
+ return new EnvelopeEncryptionManager.EncryptedFileIO(fileIO);
+ }
+}
diff --git a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.encryption.EncryptionManager b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.encryption.EncryptionManager
new file mode 100644
index 000000000000..d8a843eea72a
--- /dev/null
+++ b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.encryption.EncryptionManager
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License
+
+org.apache.paimon.encryption.PlaintextEncryptionManager
+org.apache.paimon.encryption.EnvelopeEncryptionManager
\ No newline at end of file
diff --git a/paimon-core/src/test/java/org/apache/paimon/encryption/KeyMetadataSerializerTest.java b/paimon-core/src/test/java/org/apache/paimon/encryption/KeyMetadataSerializerTest.java
new file mode 100644
index 000000000000..d69665180246
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/encryption/KeyMetadataSerializerTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.encryption;
+
+import org.apache.paimon.utils.ObjectSerializer;
+import org.apache.paimon.utils.ObjectSerializerTestBase;
+
+/** Tests for {@link KeyMetadataSerializer}. */
+public class KeyMetadataSerializerTest extends ObjectSerializerTestBase {
+
+ @Override
+ protected ObjectSerializer serializer() {
+ return new KeyMetadataSerializer();
+ }
+
+ @Override
+ protected KeyMetadata object() {
+ return new KeyMetadata(
+ "kekID".getBytes(),
+ "wrappedKEK".getBytes(),
+ "wrappedDEK".getBytes(),
+ "aadPrefix".getBytes());
+ }
+}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormatFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormatFactory.java
index d9927d66f4d1..a31d935deaef 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormatFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormatFactory.java
@@ -35,10 +35,12 @@ public String identifier() {
@Override
public OrcFileFormat create(FormatContext formatContext) {
- return new OrcFileFormat(
- new FormatContext(
- supplyDefaultOptions(formatContext.formatOptions()),
- formatContext.readBatchSize()));
+ FormatContext context =
+ FileFormatFactory.formatContextBuilder()
+ .formatOptions(supplyDefaultOptions(formatContext.formatOptions()))
+ .readBatchSize(formatContext.readBatchSize())
+ .build();
+ return new OrcFileFormat(context);
}
private Options supplyDefaultOptions(Options options) {
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormatFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormatFactory.java
index da1cfab74237..8f003af150fc 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormatFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormatFactory.java
@@ -37,10 +37,12 @@ public String identifier() {
@Override
public ParquetFileFormat create(FormatContext formatContext) {
- return new ParquetFileFormat(
- new FormatContext(
- supplyDefaultOptions(formatContext.formatOptions()),
- formatContext.readBatchSize()));
+ FormatContext context =
+ FileFormatFactory.formatContextBuilder()
+ .formatOptions(supplyDefaultOptions(formatContext.formatOptions()))
+ .readBatchSize(formatContext.readBatchSize())
+ .build();
+ return new ParquetFileFormat(context);
}
private Options supplyDefaultOptions(Options options) {
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFileFormatTest.java b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFileFormatTest.java
index fa80ec11da0d..9588146c3c8e 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFileFormatTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFileFormatTest.java
@@ -18,6 +18,7 @@
package org.apache.paimon.format.orc;
+import org.apache.paimon.format.FileFormatFactory;
import org.apache.paimon.format.FileFormatFactory.FormatContext;
import org.apache.paimon.options.Options;
import org.apache.paimon.types.DataField;
@@ -39,7 +40,12 @@ public class OrcFileFormatTest {
public void testAbsent() {
Options options = new Options();
options.setString("haha", "1");
- OrcFileFormat orc = new OrcFileFormatFactory().create(new FormatContext(options, 1024));
+ FormatContext context =
+ FileFormatFactory.formatContextBuilder()
+ .formatOptions(options)
+ .readBatchSize(1024)
+ .build();
+ OrcFileFormat orc = new OrcFileFormatFactory().create(context);
assertThat(orc.orcProperties().getProperty(IDENTIFIER + ".haha", "")).isEqualTo("1");
assertThat(orc.orcProperties().getProperty(IDENTIFIER + ".compress", "")).isEqualTo("lz4");
}
@@ -49,15 +55,24 @@ public void testPresent() {
Options options = new Options();
options.setString("haha", "1");
options.setString("compress", "zlib");
- OrcFileFormat orc = new OrcFileFormatFactory().create(new FormatContext(options, 1024));
+ FormatContext context =
+ FileFormatFactory.formatContextBuilder()
+ .formatOptions(options)
+ .readBatchSize(1024)
+ .build();
+ OrcFileFormat orc = new OrcFileFormatFactory().create(context);
assertThat(orc.orcProperties().getProperty(IDENTIFIER + ".haha", "")).isEqualTo("1");
assertThat(orc.orcProperties().getProperty(IDENTIFIER + ".compress", "")).isEqualTo("zlib");
}
@Test
public void testSupportedDataTypes() {
- OrcFileFormat orc =
- new OrcFileFormatFactory().create(new FormatContext(new Options(), 1024));
+ FormatContext context =
+ FileFormatFactory.formatContextBuilder()
+ .formatOptions(new Options())
+ .readBatchSize(1024)
+ .build();
+ OrcFileFormat orc = new OrcFileFormatFactory().create(context);
int index = 0;
List dataFields = new ArrayList();
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFormatReadWriteTest.java b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFormatReadWriteTest.java
index 3e68058622ed..18b3422061aa 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFormatReadWriteTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFormatReadWriteTest.java
@@ -32,6 +32,11 @@ protected OrcFormatReadWriteTest() {
@Override
protected FileFormat fileFormat() {
- return new OrcFileFormat(new FileFormatFactory.FormatContext(new Options(), 1024));
+ FileFormatFactory.FormatContext formatContext =
+ FileFormatFactory.formatContextBuilder()
+ .formatOptions(new Options())
+ .readBatchSize(1024)
+ .build();
+ return new OrcFileFormat(formatContext);
}
}
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/orc/writer/OrcZstdTest.java b/paimon-format/src/test/java/org/apache/paimon/format/orc/writer/OrcZstdTest.java
index 0bf90b38a4a7..831264860afb 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/orc/writer/OrcZstdTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/orc/writer/OrcZstdTest.java
@@ -62,9 +62,13 @@ void testWriteOrcWithZstd(@TempDir java.nio.file.Path tempDir) throws IOExceptio
options.set("compress", "zstd");
options.set("stripe.size", "31457280");
options.set("compression.zstd.level", "1");
- OrcFileFormat orc =
- new OrcFileFormatFactory()
- .create(new FileFormatFactory.FormatContext(options, 1024));
+
+ FileFormatFactory.FormatContext formatContext =
+ FileFormatFactory.formatContextBuilder()
+ .formatOptions(options)
+ .readBatchSize(1024)
+ .build();
+ OrcFileFormat orc = new OrcFileFormatFactory().create(formatContext);
Assertions.assertThat(orc).isInstanceOf(OrcFileFormat.class);
Assertions.assertThat(orc.orcProperties().getProperty(IDENTIFIER + ".compress", ""))
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFileFormatTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFileFormatTest.java
index 7b687ec27e4f..b4302217d187 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFileFormatTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFileFormatTest.java
@@ -18,7 +18,7 @@
package org.apache.paimon.format.parquet;
-import org.apache.paimon.format.FileFormatFactory.FormatContext;
+import org.apache.paimon.format.FileFormatFactory;
import org.apache.paimon.format.parquet.writer.RowDataParquetBuilder;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.ConfigOptions;
@@ -46,8 +46,12 @@ public class ParquetFileFormatTest {
@Test
public void testAbsent() {
Options options = new Options();
- ParquetFileFormat parquet =
- new ParquetFileFormatFactory().create(new FormatContext(options, 1024));
+ FileFormatFactory.FormatContext formatContext =
+ FileFormatFactory.formatContextBuilder()
+ .formatOptions(options)
+ .readBatchSize(1024)
+ .build();
+ ParquetFileFormat parquet = new ParquetFileFormatFactory().create(formatContext);
assertThat(parquet.formatOptions().getString(KEY1)).isEqualTo("absent");
}
@@ -55,8 +59,12 @@ public void testAbsent() {
public void testPresent() {
Options options = new Options();
options.setString(KEY1.key(), "v1");
- ParquetFileFormat parquet =
- new ParquetFileFormatFactory().create(new FormatContext(options, 1024));
+ FileFormatFactory.FormatContext formatContext =
+ FileFormatFactory.formatContextBuilder()
+ .formatOptions(options)
+ .readBatchSize(1024)
+ .build();
+ ParquetFileFormat parquet = new ParquetFileFormatFactory().create(formatContext);
assertThat(parquet.formatOptions().getString(KEY1)).isEqualTo("v1");
}
@@ -85,8 +93,12 @@ public void testFileCompressionHigherPreference() {
@Test
public void testSupportedDataFields() {
- ParquetFileFormat parquet =
- new ParquetFileFormatFactory().create(new FormatContext(new Options(), 1024));
+ FileFormatFactory.FormatContext formatContext =
+ FileFormatFactory.formatContextBuilder()
+ .formatOptions(new Options())
+ .readBatchSize(1024)
+ .build();
+ ParquetFileFormat parquet = new ParquetFileFormatFactory().create(formatContext);
int index = 0;
List dataFields = new ArrayList();
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java
index 4cfcffe220f8..b623cbc5eaf4 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java
@@ -32,6 +32,12 @@ protected ParquetFormatReadWriteTest() {
@Override
protected FileFormat fileFormat() {
- return new ParquetFileFormat(new FileFormatFactory.FormatContext(new Options(), 1024));
+ FileFormatFactory.FormatContext formatContext =
+ FileFormatFactory.formatContextBuilder()
+ .formatOptions(new Options())
+ .readBatchSize(1024)
+ .build();
+
+ return new ParquetFileFormat(formatContext);
}
}