From c8857711dcdef7007b034e60de59115c383ed409 Mon Sep 17 00:00:00 2001 From: zhangjun Date: Sun, 17 Sep 2023 00:32:32 +0800 Subject: [PATCH] introduce encryption interface --- .../generated/catalog_configuration.html | 12 ++ .../generated/core_configuration.html | 18 ++ .../java/org/apache/paimon/CoreOptions.java | 28 +++ .../apache/paimon/encryption/KmsClient.java | 35 ++++ .../org/apache/paimon/format/FileFormat.java | 16 +- .../paimon/format/FileFormatFactory.java | 131 +++++++++++++- .../apache/paimon/options/CatalogOptions.java | 72 ++++++++ .../apache/paimon/utils/EncryptionUtils.java | 95 ++++++++++ .../apache/paimon/encryption/MemoryKMS.java | 65 +++++++ .../org.apache.paimon.encryption.KmsClient | 16 ++ .../paimon/encryption/EncryptionManager.java | 54 ++++++ .../encryption/EnvelopeEncryptionManager.java | 164 ++++++++++++++++++ .../apache/paimon/encryption/KeyMetadata.java | 99 +++++++++++ .../encryption/KeyMetadataSerializer.java | 46 +++++ .../PlaintextEncryptionManager.java | 46 +++++ ...apache.paimon.encryption.EncryptionManager | 17 ++ .../encryption/KeyMetadataSerializerTest.java | 40 +++++ .../format/orc/OrcFileFormatFactory.java | 10 +- .../parquet/ParquetFileFormatFactory.java | 10 +- .../paimon/format/orc/OrcFileFormatTest.java | 23 ++- .../format/orc/OrcFormatReadWriteTest.java | 7 +- .../paimon/format/orc/writer/OrcZstdTest.java | 10 +- .../format/parquet/ParquetFileFormatTest.java | 26 ++- .../parquet/ParquetFormatReadWriteTest.java | 8 +- 24 files changed, 1018 insertions(+), 30 deletions(-) create mode 100644 paimon-common/src/main/java/org/apache/paimon/encryption/KmsClient.java create mode 100644 paimon-common/src/main/java/org/apache/paimon/utils/EncryptionUtils.java create mode 100644 paimon-common/src/test/java/org/apache/paimon/encryption/MemoryKMS.java create mode 100644 paimon-common/src/test/resources/META-INF/services/org.apache.paimon.encryption.KmsClient create mode 100644 paimon-core/src/main/java/org/apache/paimon/encryption/EncryptionManager.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/encryption/EnvelopeEncryptionManager.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/encryption/KeyMetadata.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/encryption/KeyMetadataSerializer.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/encryption/PlaintextEncryptionManager.java create mode 100644 paimon-core/src/main/resources/META-INF/services/org.apache.paimon.encryption.EncryptionManager create mode 100644 paimon-core/src/test/java/org/apache/paimon/encryption/KeyMetadataSerializerTest.java diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html b/docs/layouts/shortcodes/generated/catalog_configuration.html index cab6e731e851..e0d55b2dbac3 100644 --- a/docs/layouts/shortcodes/generated/catalog_configuration.html +++ b/docs/layouts/shortcodes/generated/catalog_configuration.html @@ -92,5 +92,17 @@ String The warehouse root path of catalog. + +
encryption.mechanism
+ plaintext + Enum + Encryption mechanism for paimon, the default value is plaintext, which means it is not encrypted.

Possible values: + + +
encryption.kms-client
+ (none) + Enum + The kms client for encryption, if the user has enabled encryption, the kms client must be specified.

Possible values: + diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 5959b6b4f096..ce1a72ff661b 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -726,5 +726,23 @@ Integer The bytes of types (CHAR, VARCHAR, BINARY, VARBINARY) devote to the zorder sort. + +
encryption.table.key-id
+ (none) + String + Specify the master key id for encryption. + + +
encryption.columns
+ (none) + String + Specify the partial columns to be encrypted, separated by commas. If this parameter is not specified, all columns will be encrypted. + + +
encryption.algorithm
+ (none) + String + Encryption algorithm for encrypting data files, for parquet format, the value can be `AES_GCM_V1` or `AES_GCM_CTR_V1`, and the default value is `AES_GCM_CTR_V1`. And can not specify algorithm for orc format now. + diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index cbe5f45c5b50..daa09e6276b7 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -1076,6 +1076,26 @@ public class CoreOptions implements Serializable { + "If the data size allocated for the sorting task is uneven,which may lead to performance bottlenecks, " + "the config can be set to size."); + public static final ConfigOption ENCRYPTION_TABLE_KEY_ID = + key("encryption.table.key-id") + .stringType() + .noDefaultValue() + .withDescription("Specify the master key id for encryption."); + + public static final ConfigOption ENCRYPTION_COLUMNS = + key("encryption.columns") + .stringType() + .noDefaultValue() + .withDescription( + "Specify the partial columns to be encrypted, separated by commas. If this parameter is not specified, all columns will be encrypted."); + + public static final ConfigOption ENCRYPTION_ALGORITHM = + key("encryption.algorithm") + .stringType() + .noDefaultValue() + .withDescription( + "Encryption algorithm for encrypting data files, for parquet format, the value can be `AES_GCM_V1` or `AES_GCM_CTR_V1`, and the default value is `AES_GCM_CTR_V1`. And can not specify algorithm for orc format now."); + public static final ConfigOption SORT_COMPACTION_SAMPLE_MAGNIFICATION = key("sort-compaction.local-sample.magnification") .intType() @@ -1690,6 +1710,14 @@ public boolean deletionVectorsEnabled() { return options.get(DELETION_VECTORS_ENABLED); } + public String encryptionAlgorithm() { + return options.get(ENCRYPTION_ALGORITHM); + } + + public String encryptionColumns() { + return options.get(ENCRYPTION_COLUMNS); + } + /** Specifies the merge engine for table with primary key. */ public enum MergeEngine implements DescribedEnum { DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."), diff --git a/paimon-common/src/main/java/org/apache/paimon/encryption/KmsClient.java b/paimon-common/src/main/java/org/apache/paimon/encryption/KmsClient.java new file mode 100644 index 000000000000..47101e1ef867 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/encryption/KmsClient.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.encryption; + +import org.apache.paimon.options.Options; + +import java.io.Closeable; +import java.io.Serializable; + +/** KMS client interface, provide common operations for KMS. */ +public interface KmsClient extends Serializable, Closeable { + void configure(Options options); + + String identifier(); + + byte[] wrapKey(byte[] unWrappedKey, String masterKeyId); + + byte[] unwrapKey(byte[] wrappedKey, String masterKeyId); +} diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java index d3d8bb114914..a16e72b4a359 100644 --- a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java +++ b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java @@ -76,7 +76,12 @@ public Optional createStatsExtractor( @VisibleForTesting public static FileFormat fromIdentifier(String identifier, Options options) { - return fromIdentifier(identifier, new FormatContext(options, 1024)); + FormatContext formatContext = + FileFormatFactory.formatContextBuilder() + .formatOptions(options) + .readBatchSize(1024) + .build(); + return fromIdentifier(identifier, formatContext); } /** Create a {@link FileFormat} from format identifier and format options. */ @@ -105,8 +110,11 @@ private static Optional fromIdentifier( public static FileFormat getFileFormat(Options options, String formatIdentifier) { int readBatchSize = options.get(CoreOptions.READ_BATCH_SIZE); - return FileFormat.fromIdentifier( - formatIdentifier, - new FormatContext(options.removePrefix(formatIdentifier + "."), readBatchSize)); + FormatContext formatContext = + FileFormatFactory.formatContextBuilder() + .formatOptions(options.removePrefix(formatIdentifier + ".")) + .readBatchSize(readBatchSize) + .build(); + return FileFormat.fromIdentifier(formatIdentifier, formatContext); } } diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FileFormatFactory.java b/paimon-common/src/main/java/org/apache/paimon/format/FileFormatFactory.java index 71e10a242391..210b6e4d4db5 100644 --- a/paimon-common/src/main/java/org/apache/paimon/format/FileFormatFactory.java +++ b/paimon-common/src/main/java/org/apache/paimon/format/FileFormatFactory.java @@ -27,14 +27,34 @@ public interface FileFormatFactory { FileFormat create(FormatContext formatContext); - /** the format context. */ + /** The format context for reader and writer. */ class FormatContext { private final Options formatOptions; private final int readBatchSize; + private final String compression; + private final String encryptionTableKeyId; + private final byte[] plaintextDataKey; + private final byte[] dataAADPrefix; + private final String encryptionAlgorithm; + private final String encryptionColumns; - public FormatContext(Options formatOptions, int readBatchSize) { + private FormatContext( + Options formatOptions, + int readBatchSize, + String compression, + String encryptionTableKeyId, + byte[] plaintextDataKey, + byte[] dataAADPrefix, + String encryptionAlgorithm, + String encryptionColumns) { this.formatOptions = formatOptions; this.readBatchSize = readBatchSize; + this.compression = compression; + this.encryptionTableKeyId = encryptionTableKeyId; + this.plaintextDataKey = plaintextDataKey; + this.dataAADPrefix = dataAADPrefix; + this.encryptionAlgorithm = encryptionAlgorithm; + this.encryptionColumns = encryptionColumns; } public Options formatOptions() { @@ -44,5 +64,112 @@ public Options formatOptions() { public int readBatchSize() { return readBatchSize; } + + public String compression() { + return compression; + } + + public String encryptionTableId() { + return encryptionTableKeyId; + } + + public byte[] plaintextDataKey() { + return plaintextDataKey; + } + + public byte[] dataAADPrefix() { + return dataAADPrefix; + } + + public String encryptionAlgorithm() { + return encryptionAlgorithm; + } + + public String encryptionColumns() { + return encryptionColumns; + } + + public FormatContext newDataKey(byte[] dataKey) { + return new FormatContext( + formatOptions, + readBatchSize, + compression, + encryptionTableKeyId, + dataKey, + dataAADPrefix, + encryptionAlgorithm, + encryptionColumns); + } + } + + /** Format context builder. */ + class FormatContextBuilder { + + private Options formatOptions; + private int readBatchSize; + private String compression; + private String encryptionTableId; + private byte[] plaintextDataKey; + private byte[] dataAADPrefix; + private String encryptionAlgorithm; + private String encryptionColumns; + + private FormatContextBuilder() {} + + public FormatContextBuilder formatOptions(Options formatOptions) { + this.formatOptions = formatOptions; + return this; + } + + public FormatContextBuilder readBatchSize(int readBatchSize) { + this.readBatchSize = readBatchSize; + return this; + } + + public FormatContextBuilder compression(String compression) { + this.compression = compression; + return this; + } + + public FormatContextBuilder withEncryptionTableId(String keyId) { + this.encryptionTableId = keyId; + return this; + } + + public FormatContextBuilder withPlaintextDataKey(byte[] plaintextDataKey) { + this.plaintextDataKey = plaintextDataKey; + return this; + } + + public FormatContextBuilder withAADPrefix(byte[] dataAADPrefix) { + this.dataAADPrefix = dataAADPrefix; + return this; + } + + public FormatContextBuilder withEncryptionAlgorithm(String encryptionAlgorithm) { + this.encryptionAlgorithm = encryptionAlgorithm; + return this; + } + + public FormatContextBuilder withEncryptionColumns(String encryptionColumns) { + this.encryptionColumns = encryptionColumns; + return this; + } + + public FormatContext build() { + return new FormatContext( + formatOptions, + readBatchSize, + compression, + encryptionTableId, + plaintextDataKey, + dataAADPrefix, + encryptionAlgorithm, + encryptionColumns); + } + } + + static FormatContextBuilder formatContextBuilder() { + return new FormatContextBuilder(); } } diff --git a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java index f00a35a75094..5f1722297eee 100644 --- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java @@ -18,13 +18,17 @@ package org.apache.paimon.options; +import org.apache.paimon.options.description.DescribedEnum; import org.apache.paimon.options.description.Description; +import org.apache.paimon.options.description.InlineElement; import org.apache.paimon.options.description.TextElement; import org.apache.paimon.table.TableType; import java.time.Duration; +import static org.apache.paimon.options.CatalogOptions.EncryptionMechanism.PLAINTEXT; import static org.apache.paimon.options.ConfigOptions.key; +import static org.apache.paimon.options.description.TextElement.text; /** Options for catalog. */ public class CatalogOptions { @@ -110,4 +114,72 @@ public class CatalogOptions { TextElement.text( "\"custom\": You can implement LineageMetaFactory and LineageMeta to store lineage information in customized storage.")) .build()); + + public static final ConfigOption ENCRYPTION_MECHANISM = + key("encryption.mechanism") + .enumType(EncryptionMechanism.class) + .defaultValue(PLAINTEXT) + .withDescription( + "Encryption mechanism for paimon, the default value is plaintext, which means it is not encrypted."); + + public static final ConfigOption ENCRYPTION_KMS_CLIENT = + key("encryption.kms-client") + .enumType(EncryptionKmsClient.class) + .noDefaultValue() + .withDescription( + "The kms client for encryption, if the user has enabled encryption, the kms client must be specified."); + + /** The encryption mechanism for paimon. */ + public enum EncryptionMechanism implements DescribedEnum { + PLAINTEXT("plaintext", "Do not encrypt the data files."), + ENVELOPE("envelope", "Encrypt data file using envelope encryption mechanism."); + + private final String value; + private final String description; + + EncryptionMechanism(String value, String description) { + this.value = value; + this.description = description; + } + + @Override + public String toString() { + return value; + } + + @Override + public InlineElement getDescription() { + return text(description); + } + } + + /** The kms client for encryption. */ + public enum EncryptionKmsClient implements DescribedEnum { + MEMORY( + "memory", + "Use memory kms for encryption, this is only for test, can not be used in production environment."), + HADOOP( + "hadoop", + "Use hadoop kms for encryption, parameters prefixed with `hadoop.security.` will be used to build hadoop kms client, " + + "the `hadoop.security.key.provider.path` is required. The hadoop parameters can be configured from catalog or environment," + + "please refer to `https://paimon.apache.org/docs/master/filesystems/hdfs/#hdfs-configuration`."); + + private final String value; + private final String description; + + EncryptionKmsClient(String value, String description) { + this.value = value; + this.description = description; + } + + @Override + public String toString() { + return value; + } + + @Override + public InlineElement getDescription() { + return text(description); + } + } } diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/EncryptionUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/EncryptionUtils.java new file mode 100644 index 000000000000..e10883acb304 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/utils/EncryptionUtils.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import javax.crypto.BadPaddingException; +import javax.crypto.Cipher; +import javax.crypto.IllegalBlockSizeException; +import javax.crypto.spec.GCMParameterSpec; +import javax.crypto.spec.SecretKeySpec; + +import java.security.GeneralSecurityException; + +/** The encryption utils. */ +public class EncryptionUtils { + + private static final int NONCE_LENGTH = 12; + private static final int GCM_TAG_LENGTH = 16; + private static final int GCM_TAG_LENGTH_BITS = 8 * GCM_TAG_LENGTH; + + public static byte[] encryptKeyLocally(byte[] plaintext, byte[] masterKey) { + AesGcmEncryptor aesGcmEncryptor = new AesGcmEncryptor(masterKey); + return aesGcmEncryptor.encrypt(plaintext); + } + + public static byte[] decryptKeyLocally(byte[] encryptedKey, byte[] masterKey) { + AesGcmDecryptor aesGcmDecryptor = new AesGcmDecryptor(masterKey); + return aesGcmDecryptor.encrypt(encryptedKey); + } + + private static class AesGcmEncryptor { + + private final Cipher cipher; + + public AesGcmEncryptor(byte[] masterKey) { + try { + cipher = Cipher.getInstance("AES/GCM/NoPadding"); + SecretKeySpec secretKeySpec = new SecretKeySpec(masterKey, "AES"); + byte[] nonce = new byte[NONCE_LENGTH]; + GCMParameterSpec spec = new GCMParameterSpec(GCM_TAG_LENGTH_BITS, nonce); + cipher.init(Cipher.ENCRYPT_MODE, secretKeySpec, spec); + } catch (GeneralSecurityException e) { + throw new RuntimeException(e); + } + } + + public byte[] encrypt(byte[] plaintext) { + try { + return cipher.doFinal(plaintext); + } catch (IllegalBlockSizeException | BadPaddingException e) { + throw new RuntimeException(e); + } + } + } + + private static class AesGcmDecryptor { + + private final Cipher cipher; + + public AesGcmDecryptor(byte[] masterKey) { + try { + cipher = Cipher.getInstance("AES/GCM/NoPadding"); + SecretKeySpec secretKeySpec = new SecretKeySpec(masterKey, "AES"); + byte[] nonce = new byte[NONCE_LENGTH]; + GCMParameterSpec spec = new GCMParameterSpec(GCM_TAG_LENGTH_BITS, nonce); + cipher.init(Cipher.DECRYPT_MODE, secretKeySpec, spec); + } catch (GeneralSecurityException e) { + throw new RuntimeException(e); + } + } + + public byte[] encrypt(byte[] plaintext) { + try { + return cipher.doFinal(plaintext); + } catch (IllegalBlockSizeException | BadPaddingException e) { + throw new RuntimeException(e); + } + } + } +} diff --git a/paimon-common/src/test/java/org/apache/paimon/encryption/MemoryKMS.java b/paimon-common/src/test/java/org/apache/paimon/encryption/MemoryKMS.java new file mode 100644 index 000000000000..a0ae0e7a86c9 --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/encryption/MemoryKMS.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.encryption; + +import org.apache.paimon.options.Options; +import org.apache.paimon.utils.EncryptionUtils; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** This is only for test, not for production. */ +public class MemoryKMS implements KmsClient { + + private static final String IDENTIFIER = "memory"; + + private final Map keyMap = new ConcurrentHashMap<>(); + + @Override + public void configure(Options options) {} + + @Override + public String identifier() { + return IDENTIFIER; + } + + @Override + public byte[] wrapKey(byte[] dataKey, String masterKeyId) { + byte[] masterKey = keyMap.get(masterKeyId); + if (null == masterKey) { + throw new RuntimeException("Key not found: " + masterKeyId); + } + return EncryptionUtils.encryptKeyLocally(dataKey, masterKey); + } + + @Override + public byte[] unwrapKey(byte[] wrappedKey, String masterKeyId) { + byte[] masterKey = keyMap.get(masterKeyId); + if (null == masterKey) { + throw new RuntimeException("Key not found: " + masterKeyId); + } + return EncryptionUtils.decryptKeyLocally(wrappedKey, masterKey); + } + + @Override + public void close() throws IOException { + keyMap.clear(); + } +} diff --git a/paimon-common/src/test/resources/META-INF/services/org.apache.paimon.encryption.KmsClient b/paimon-common/src/test/resources/META-INF/services/org.apache.paimon.encryption.KmsClient new file mode 100644 index 000000000000..f53f0777ae74 --- /dev/null +++ b/paimon-common/src/test/resources/META-INF/services/org.apache.paimon.encryption.KmsClient @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License + +org.apache.paimon.encryption.MemoryKMS \ No newline at end of file diff --git a/paimon-core/src/main/java/org/apache/paimon/encryption/EncryptionManager.java b/paimon-core/src/main/java/org/apache/paimon/encryption/EncryptionManager.java new file mode 100644 index 000000000000..7a22df7cad51 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/encryption/EncryptionManager.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.encryption; + +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.options.Options; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.ServiceLoader; + +/** Module for encrypting and decrypting table data files, it must be serializable. */ +public interface EncryptionManager extends Serializable { + + String identifier(); + + void configure(Options options); + + FileIO decrypt(EnvelopeEncryptionManager.EncryptedFileIO encryptedFileIO); + + EnvelopeEncryptionManager.EncryptedFileIO encrypt(FileIO fileIO); + + default Map discoverKmsClient() { + Iterator iterator = + ServiceLoader.load(KmsClient.class, EncryptionManager.class.getClassLoader()) + .iterator(); + + Map map = new HashMap<>(); + while (iterator.hasNext()) { + KmsClient kmsClient = iterator.next(); + map.put(kmsClient.identifier(), kmsClient); + } + + return map; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/encryption/EnvelopeEncryptionManager.java b/paimon-core/src/main/java/org/apache/paimon/encryption/EnvelopeEncryptionManager.java new file mode 100644 index 000000000000..1514a10a1330 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/encryption/EnvelopeEncryptionManager.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.encryption; + +import org.apache.paimon.format.FileFormatFactory; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.options.CatalogOptions; +import org.apache.paimon.options.Options; +import org.apache.paimon.utils.EncryptionUtils; + +import java.security.SecureRandom; +import java.util.HashMap; +import java.util.Map; + +/** Encrypt data file using envelope encryption mechanism. */ +public class EnvelopeEncryptionManager implements EncryptionManager { + + private static final String IDENTIFIER = "envelope"; + private static final int KEY_ENCRYPTION_KEY_LENGTH = 16; + private static final SecureRandom RANDOM = new SecureRandom(); + // kekId -> kek + private static final Map KEY_MAP = new HashMap<>(); + private static KmsClient kmsClient; + + @Override + public void configure(Options options) { + Map map = discoverKmsClient(); + kmsClient = map.get(options.get(CatalogOptions.ENCRYPTION_MECHANISM.key())); + kmsClient.configure(options); + } + + @Override + public FileIO decrypt(EncryptedFileIO encryptedFileIO) { + return encryptedFileIO.fileIO(); + } + + @Override + public EncryptedFileIO encrypt(FileIO fileIO) { + return new EncryptedFileIO(fileIO); + } + + @Override + public String identifier() { + return IDENTIFIER; + } + + /** Wrapped encrypted FileIO. */ + public static class EncryptedFileIO { + private final FileIO fileIO; + private KeyMetadata keyMetadata; + private FileFormatFactory.FormatContext formatContext; + + public EncryptedFileIO(FileIO fileIO) { + this.fileIO = fileIO; + } + + public FileIO fileIO() { + return fileIO; + } + + /** + * when read file, we load the keyMetadata from manifest file, and set it. + * + * @param keyMetadata the key metadata + */ + public void keyMetadata(KeyMetadata keyMetadata) { + this.keyMetadata = keyMetadata; + } + + /** + * when write the KeyMetadata to manifest file, we generate the KeyMetadata from + * FormatContext. + * + * @return the KeyMetadata + */ + public KeyMetadata keyMetadata() { + if (null == keyMetadata) { + + if (formatContext != null && formatContext.plaintextDataKey() != null) { + byte[] kekId = formatContext.dataAADPrefix(); + + byte[] kekBytes = new byte[KEY_ENCRYPTION_KEY_LENGTH]; + RANDOM.nextBytes(kekBytes); + + byte[] wrappedDEK = + EncryptionUtils.encryptKeyLocally( + formatContext.plaintextDataKey(), kekBytes); + + KeyEncryptionKey kek = + KEY_MAP.computeIfAbsent( + kekId, + bytes -> { + byte[] wrappedKEK = + kmsClient.wrapKey( + kekBytes, + formatContext.encryptionTableId()); + return new KeyEncryptionKey(kekBytes, wrappedKEK); + }); + + this.keyMetadata = + new KeyMetadata( + kekId, kek.kek(), wrappedDEK, formatContext.dataAADPrefix()); + } + } + return keyMetadata; + } + + public void formatContext(FileFormatFactory.FormatContext formatContext) { + this.formatContext = formatContext; + } + + public FileFormatFactory.FormatContext formatContext() { + // when read file, decrypt the plaintext data key from keyMetadata. + if (null != keyMetadata && null == formatContext.plaintextDataKey()) { + KeyEncryptionKey kek = + KEY_MAP.computeIfAbsent( + keyMetadata.kekID(), + kekIdentifier -> { + byte[] kekBytes = + kmsClient.unwrapKey( + keyMetadata.wrappedKEK(), + formatContext.encryptionTableId()); + return new KeyEncryptionKey(kekBytes, keyMetadata.wrappedKEK()); + }); + + byte[] unWrappedDEK = + EncryptionUtils.decryptKeyLocally(keyMetadata.wrappedDEK(), kek.kek()); + formatContext = formatContext.newDataKey(unWrappedDEK); + } + + return formatContext; + } + } + + static class KeyEncryptionKey { + private final byte[] kek; + private final byte[] wrappedKEK; + + public KeyEncryptionKey(byte[] kek, byte[] wrappedKEK) { + this.kek = kek; + this.wrappedKEK = wrappedKEK; + } + + public byte[] kek() { + return kek; + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/encryption/KeyMetadata.java b/paimon-core/src/main/java/org/apache/paimon/encryption/KeyMetadata.java new file mode 100644 index 000000000000..143f948cd796 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/encryption/KeyMetadata.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.encryption; + +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.RowType; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +import static org.apache.paimon.utils.SerializationUtils.newBytesType; + +/** Wrap information about the key. */ +public class KeyMetadata { + + private final byte[] kekID; // kek id, used to cache kek + private final byte[] wrappedKEK; // encrypted kek + private final byte[] wrappedDEK; // encrypted data key + private final byte[] aadPrefix; // aad prefix, only used for parquet format + + private KeyMetadata() { + this(new byte[0], new byte[0], new byte[0], new byte[0]); + } + + public KeyMetadata(byte[] kekID, byte[] wrappedKEK, byte[] wrappedDEK, byte[] aadPrefix) { + this.kekID = kekID; + this.wrappedKEK = wrappedKEK; + this.wrappedDEK = wrappedDEK; + this.aadPrefix = aadPrefix; + } + + public static KeyMetadata emptyKeyMetadata() { + return new KeyMetadata(); + } + + public static RowType schema() { + List fields = new ArrayList<>(); + fields.add(new DataField(1, "_KEK_ID", newBytesType(false))); + fields.add(new DataField(1, "_WRAPPED_KEK", newBytesType(false))); + fields.add(new DataField(1, "_WRAPPED_DEK", newBytesType(false))); + fields.add(new DataField(2, "_AAD_PREFIX", newBytesType(false))); + return new RowType(fields); + } + + public byte[] aadPrefix() { + return aadPrefix; + } + + public byte[] kekID() { + return kekID; + } + + public byte[] wrappedKEK() { + return wrappedKEK; + } + + public byte[] wrappedDEK() { + return wrappedDEK; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof KeyMetadata)) { + return false; + } + KeyMetadata that = (KeyMetadata) o; + return Arrays.equals(kekID, that.kekID) + && Arrays.equals(wrappedKEK, that.wrappedKEK) + && Arrays.equals(wrappedDEK, that.wrappedDEK) + && Arrays.equals(aadPrefix, that.aadPrefix); + } + + @Override + public int hashCode() { + return Objects.hash( + Arrays.hashCode(kekID), + Arrays.hashCode(wrappedKEK), + Arrays.hashCode(wrappedDEK), + Arrays.hashCode(aadPrefix)); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/encryption/KeyMetadataSerializer.java b/paimon-core/src/main/java/org/apache/paimon/encryption/KeyMetadataSerializer.java new file mode 100644 index 000000000000..984f59426d61 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/encryption/KeyMetadataSerializer.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.encryption; + +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.utils.ObjectSerializer; + +/** Serializer for {@link KeyMetadata}. */ +public class KeyMetadataSerializer extends ObjectSerializer { + + public KeyMetadataSerializer() { + super(KeyMetadata.schema()); + } + + @Override + public InternalRow toRow(KeyMetadata record) { + return GenericRow.of( + record.kekID(), record.wrappedKEK(), record.wrappedDEK(), record.aadPrefix()); + } + + @Override + public KeyMetadata fromRow(InternalRow rowData) { + return new KeyMetadata( + rowData.getBinary(0), + rowData.getBinary(1), + rowData.getBinary(2), + rowData.getBinary(3)); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/encryption/PlaintextEncryptionManager.java b/paimon-core/src/main/java/org/apache/paimon/encryption/PlaintextEncryptionManager.java new file mode 100644 index 000000000000..0270b4a364de --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/encryption/PlaintextEncryptionManager.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.encryption; + +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.options.Options; + +/** Do not encrypt the data files. */ +public class PlaintextEncryptionManager implements EncryptionManager { + + public static final String IDENTIFIER = "plaintext"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + @Override + public void configure(Options options) {} + + @Override + public FileIO decrypt(EnvelopeEncryptionManager.EncryptedFileIO encryptedFileIO) { + return encryptedFileIO.fileIO(); + } + + @Override + public EnvelopeEncryptionManager.EncryptedFileIO encrypt(FileIO fileIO) { + return new EnvelopeEncryptionManager.EncryptedFileIO(fileIO); + } +} diff --git a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.encryption.EncryptionManager b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.encryption.EncryptionManager new file mode 100644 index 000000000000..d8a843eea72a --- /dev/null +++ b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.encryption.EncryptionManager @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License + +org.apache.paimon.encryption.PlaintextEncryptionManager +org.apache.paimon.encryption.EnvelopeEncryptionManager \ No newline at end of file diff --git a/paimon-core/src/test/java/org/apache/paimon/encryption/KeyMetadataSerializerTest.java b/paimon-core/src/test/java/org/apache/paimon/encryption/KeyMetadataSerializerTest.java new file mode 100644 index 000000000000..d69665180246 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/encryption/KeyMetadataSerializerTest.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.encryption; + +import org.apache.paimon.utils.ObjectSerializer; +import org.apache.paimon.utils.ObjectSerializerTestBase; + +/** Tests for {@link KeyMetadataSerializer}. */ +public class KeyMetadataSerializerTest extends ObjectSerializerTestBase { + + @Override + protected ObjectSerializer serializer() { + return new KeyMetadataSerializer(); + } + + @Override + protected KeyMetadata object() { + return new KeyMetadata( + "kekID".getBytes(), + "wrappedKEK".getBytes(), + "wrappedDEK".getBytes(), + "aadPrefix".getBytes()); + } +} diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormatFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormatFactory.java index d9927d66f4d1..a31d935deaef 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormatFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormatFactory.java @@ -35,10 +35,12 @@ public String identifier() { @Override public OrcFileFormat create(FormatContext formatContext) { - return new OrcFileFormat( - new FormatContext( - supplyDefaultOptions(formatContext.formatOptions()), - formatContext.readBatchSize())); + FormatContext context = + FileFormatFactory.formatContextBuilder() + .formatOptions(supplyDefaultOptions(formatContext.formatOptions())) + .readBatchSize(formatContext.readBatchSize()) + .build(); + return new OrcFileFormat(context); } private Options supplyDefaultOptions(Options options) { diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormatFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormatFactory.java index da1cfab74237..8f003af150fc 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormatFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormatFactory.java @@ -37,10 +37,12 @@ public String identifier() { @Override public ParquetFileFormat create(FormatContext formatContext) { - return new ParquetFileFormat( - new FormatContext( - supplyDefaultOptions(formatContext.formatOptions()), - formatContext.readBatchSize())); + FormatContext context = + FileFormatFactory.formatContextBuilder() + .formatOptions(supplyDefaultOptions(formatContext.formatOptions())) + .readBatchSize(formatContext.readBatchSize()) + .build(); + return new ParquetFileFormat(context); } private Options supplyDefaultOptions(Options options) { diff --git a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFileFormatTest.java b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFileFormatTest.java index fa80ec11da0d..9588146c3c8e 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFileFormatTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFileFormatTest.java @@ -18,6 +18,7 @@ package org.apache.paimon.format.orc; +import org.apache.paimon.format.FileFormatFactory; import org.apache.paimon.format.FileFormatFactory.FormatContext; import org.apache.paimon.options.Options; import org.apache.paimon.types.DataField; @@ -39,7 +40,12 @@ public class OrcFileFormatTest { public void testAbsent() { Options options = new Options(); options.setString("haha", "1"); - OrcFileFormat orc = new OrcFileFormatFactory().create(new FormatContext(options, 1024)); + FormatContext context = + FileFormatFactory.formatContextBuilder() + .formatOptions(options) + .readBatchSize(1024) + .build(); + OrcFileFormat orc = new OrcFileFormatFactory().create(context); assertThat(orc.orcProperties().getProperty(IDENTIFIER + ".haha", "")).isEqualTo("1"); assertThat(orc.orcProperties().getProperty(IDENTIFIER + ".compress", "")).isEqualTo("lz4"); } @@ -49,15 +55,24 @@ public void testPresent() { Options options = new Options(); options.setString("haha", "1"); options.setString("compress", "zlib"); - OrcFileFormat orc = new OrcFileFormatFactory().create(new FormatContext(options, 1024)); + FormatContext context = + FileFormatFactory.formatContextBuilder() + .formatOptions(options) + .readBatchSize(1024) + .build(); + OrcFileFormat orc = new OrcFileFormatFactory().create(context); assertThat(orc.orcProperties().getProperty(IDENTIFIER + ".haha", "")).isEqualTo("1"); assertThat(orc.orcProperties().getProperty(IDENTIFIER + ".compress", "")).isEqualTo("zlib"); } @Test public void testSupportedDataTypes() { - OrcFileFormat orc = - new OrcFileFormatFactory().create(new FormatContext(new Options(), 1024)); + FormatContext context = + FileFormatFactory.formatContextBuilder() + .formatOptions(new Options()) + .readBatchSize(1024) + .build(); + OrcFileFormat orc = new OrcFileFormatFactory().create(context); int index = 0; List dataFields = new ArrayList(); diff --git a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFormatReadWriteTest.java b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFormatReadWriteTest.java index 3e68058622ed..18b3422061aa 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFormatReadWriteTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFormatReadWriteTest.java @@ -32,6 +32,11 @@ protected OrcFormatReadWriteTest() { @Override protected FileFormat fileFormat() { - return new OrcFileFormat(new FileFormatFactory.FormatContext(new Options(), 1024)); + FileFormatFactory.FormatContext formatContext = + FileFormatFactory.formatContextBuilder() + .formatOptions(new Options()) + .readBatchSize(1024) + .build(); + return new OrcFileFormat(formatContext); } } diff --git a/paimon-format/src/test/java/org/apache/paimon/format/orc/writer/OrcZstdTest.java b/paimon-format/src/test/java/org/apache/paimon/format/orc/writer/OrcZstdTest.java index 0bf90b38a4a7..831264860afb 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/orc/writer/OrcZstdTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/orc/writer/OrcZstdTest.java @@ -62,9 +62,13 @@ void testWriteOrcWithZstd(@TempDir java.nio.file.Path tempDir) throws IOExceptio options.set("compress", "zstd"); options.set("stripe.size", "31457280"); options.set("compression.zstd.level", "1"); - OrcFileFormat orc = - new OrcFileFormatFactory() - .create(new FileFormatFactory.FormatContext(options, 1024)); + + FileFormatFactory.FormatContext formatContext = + FileFormatFactory.formatContextBuilder() + .formatOptions(options) + .readBatchSize(1024) + .build(); + OrcFileFormat orc = new OrcFileFormatFactory().create(formatContext); Assertions.assertThat(orc).isInstanceOf(OrcFileFormat.class); Assertions.assertThat(orc.orcProperties().getProperty(IDENTIFIER + ".compress", "")) diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFileFormatTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFileFormatTest.java index 7b687ec27e4f..b4302217d187 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFileFormatTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFileFormatTest.java @@ -18,7 +18,7 @@ package org.apache.paimon.format.parquet; -import org.apache.paimon.format.FileFormatFactory.FormatContext; +import org.apache.paimon.format.FileFormatFactory; import org.apache.paimon.format.parquet.writer.RowDataParquetBuilder; import org.apache.paimon.options.ConfigOption; import org.apache.paimon.options.ConfigOptions; @@ -46,8 +46,12 @@ public class ParquetFileFormatTest { @Test public void testAbsent() { Options options = new Options(); - ParquetFileFormat parquet = - new ParquetFileFormatFactory().create(new FormatContext(options, 1024)); + FileFormatFactory.FormatContext formatContext = + FileFormatFactory.formatContextBuilder() + .formatOptions(options) + .readBatchSize(1024) + .build(); + ParquetFileFormat parquet = new ParquetFileFormatFactory().create(formatContext); assertThat(parquet.formatOptions().getString(KEY1)).isEqualTo("absent"); } @@ -55,8 +59,12 @@ public void testAbsent() { public void testPresent() { Options options = new Options(); options.setString(KEY1.key(), "v1"); - ParquetFileFormat parquet = - new ParquetFileFormatFactory().create(new FormatContext(options, 1024)); + FileFormatFactory.FormatContext formatContext = + FileFormatFactory.formatContextBuilder() + .formatOptions(options) + .readBatchSize(1024) + .build(); + ParquetFileFormat parquet = new ParquetFileFormatFactory().create(formatContext); assertThat(parquet.formatOptions().getString(KEY1)).isEqualTo("v1"); } @@ -85,8 +93,12 @@ public void testFileCompressionHigherPreference() { @Test public void testSupportedDataFields() { - ParquetFileFormat parquet = - new ParquetFileFormatFactory().create(new FormatContext(new Options(), 1024)); + FileFormatFactory.FormatContext formatContext = + FileFormatFactory.formatContextBuilder() + .formatOptions(new Options()) + .readBatchSize(1024) + .build(); + ParquetFileFormat parquet = new ParquetFileFormatFactory().create(formatContext); int index = 0; List dataFields = new ArrayList(); diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java index 4cfcffe220f8..b623cbc5eaf4 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java @@ -32,6 +32,12 @@ protected ParquetFormatReadWriteTest() { @Override protected FileFormat fileFormat() { - return new ParquetFileFormat(new FileFormatFactory.FormatContext(new Options(), 1024)); + FileFormatFactory.FormatContext formatContext = + FileFormatFactory.formatContextBuilder() + .formatOptions(new Options()) + .readBatchSize(1024) + .build(); + + return new ParquetFileFormat(formatContext); } }