Skip to content

Commit

Permalink
[AMORO-2401] Support reading encrypted iceberg data files (apache#2402)
Browse files (browse the repository at this point in the history)
Support reading encrypted Iceberg data files.
  • Loading branch information
liaoyt authored Mar 8, 2024
1 parent 168688b commit a294917
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.orc.GenericOrcReader;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptedInputFile;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.orc.ORC;
Expand All @@ -46,10 +49,13 @@ public class BaseIcebergPosDeleteReader {
POS_DELETE_SCHEMA.accessorForField(MetadataColumns.DELETE_FILE_POS.fieldId());

protected final ArcticFileIO fileIO;
protected final EncryptionManager encryptionManager;
protected final List<DeleteFile> posDeleteFiles;

public BaseIcebergPosDeleteReader(ArcticFileIO fileIO, List<DeleteFile> posDeleteFiles) {
public BaseIcebergPosDeleteReader(
ArcticFileIO fileIO, EncryptionManager encryptionManager, List<DeleteFile> posDeleteFiles) {
this.fileIO = fileIO;
this.encryptionManager = encryptionManager;
this.posDeleteFiles = posDeleteFiles;
}

Expand All @@ -67,7 +73,11 @@ public Long readPos(Record record) {
}

private CloseableIterable<Record> readDelete(DeleteFile deleteFile) {
InputFile input = fileIO.newInputFile(deleteFile.path().toString());
EncryptedInputFile encryptedInput =
EncryptedFiles.encryptedInput(
fileIO.newInputFile(deleteFile.path().toString()), deleteFile.keyMetadata());
InputFile input = encryptionManager.decrypt(encryptedInput);

switch (deleteFile.format()) {
case PARQUET:
Parquet.ReadBuilder builder =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public boolean isFilterEqDelete() {
return filterEqDelete;
}

protected abstract InputFile getInputFile(String location);
protected abstract InputFile getInputFile(ContentFile<?> contentFile);

protected abstract ArcticFileIO getArcticFileIo();

Expand Down Expand Up @@ -387,7 +387,7 @@ private CloseableIterable<Record> openPosDeletes(DeleteFile file) {
}

private CloseableIterable<Record> openFile(ContentFile<?> contentFile, Schema deleteSchema) {
InputFile input = getInputFile(contentFile.path().toString());
InputFile input = getInputFile(contentFile);
switch (contentFile.format()) {
case AVRO:
return Avro.read(input)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.netease.arctic.optimizing.RewriteFilesInput;
import com.netease.arctic.scan.CombinedIcebergScanTask;
import com.netease.arctic.utils.map.StructLikeCollections;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionSpec;
Expand All @@ -32,6 +33,9 @@
import org.apache.iceberg.data.avro.DataReader;
import org.apache.iceberg.data.orc.GenericOrcReader;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptedInputFile;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.parquet.Parquet;
Expand Down Expand Up @@ -61,6 +65,7 @@ public class GenericCombinedIcebergDataReader implements OptimizingDataReader {
protected final String nameMapping;
protected final boolean caseSensitive;
protected final ArcticFileIO fileIO;
protected final EncryptionManager encryptionManager;
protected final BiFunction<Type, Object, Object> convertConstant;
protected final boolean reuseContainer;
protected CombinedDeleteFilter<Record> deleteFilter;
Expand All @@ -73,6 +78,7 @@ public GenericCombinedIcebergDataReader(
ArcticFileIO fileIO,
Schema tableSchema,
PartitionSpec spec,
EncryptionManager encryptionManager,
String nameMapping,
boolean caseSensitive,
BiFunction<Type, Object, Object> convertConstant,
Expand All @@ -81,6 +87,7 @@ public GenericCombinedIcebergDataReader(
RewriteFilesInput rewriteFilesInput) {
this.tableSchema = tableSchema;
this.spec = spec;
this.encryptionManager = encryptionManager;
this.nameMapping = nameMapping;
this.caseSensitive = caseSensitive;
this.fileIO = fileIO;
Expand Down Expand Up @@ -164,7 +171,10 @@ private CloseableIterable<Record> openFile(

private CloseableIterable<Record> openFile(
DataFile dataFile, Schema fileProjection, Map<Integer, ?> idToConstant) {
InputFile input = fileIO.newInputFile(dataFile.path().toString());
EncryptedInputFile encryptedInput =
EncryptedFiles.encryptedInput(
fileIO.newInputFile(dataFile.path().toString()), dataFile.keyMetadata());
InputFile input = encryptionManager.decrypt(encryptedInput);

switch (dataFile.format()) {
case AVRO:
Expand Down Expand Up @@ -296,8 +306,11 @@ public GenericDeleteFilter(
}

@Override
protected InputFile getInputFile(String location) {
return fileIO.newInputFile(location);
protected InputFile getInputFile(ContentFile<?> contentFile) {
EncryptedInputFile encryptedInput =
EncryptedFiles.encryptedInput(
fileIO.newInputFile(contentFile.path().toString()), contentFile.keyMetadata());
return encryptionManager.decrypt(encryptedInput);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ protected OptimizingDataReader dataReader() {
io,
table.schema(),
table.spec(),
table.asUnkeyedTable().encryption(),
table.properties().get(TableProperties.DEFAULT_NAME_MAPPING),
false,
IdentityPartitionConverters::convertConstant,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ public void readAllData() throws IOException {
getArcticTable().io(),
getArcticTable().schema(),
getArcticTable().spec(),
getArcticTable().asUnkeyedTable().encryption(),
null,
false,
IdentityPartitionConverters::convertConstant,
Expand All @@ -212,6 +213,7 @@ public void readAllDataNegate() throws IOException {
getArcticTable().io(),
getArcticTable().schema(),
getArcticTable().spec(),
getArcticTable().asUnkeyedTable().encryption(),
null,
false,
IdentityPartitionConverters::convertConstant,
Expand All @@ -235,6 +237,7 @@ public void readOnlyData() throws IOException {
getArcticTable().io(),
getArcticTable().schema(),
getArcticTable().spec(),
getArcticTable().asUnkeyedTable().encryption(),
null,
false,
IdentityPartitionConverters::convertConstant,
Expand All @@ -254,6 +257,7 @@ public void readOnlyDataNegate() throws IOException {
getArcticTable().io(),
getArcticTable().schema(),
getArcticTable().spec(),
getArcticTable().asUnkeyedTable().encryption(),
null,
false,
IdentityPartitionConverters::convertConstant,
Expand All @@ -274,6 +278,7 @@ public void readDataEnableFilterEqDelete() throws IOException {
getArcticTable().io(),
getArcticTable().schema(),
getArcticTable().spec(),
getArcticTable().asUnkeyedTable().encryption(),
null,
false,
IdentityPartitionConverters::convertConstant,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ public void valid() throws IOException {
table.io(),
table.schema(),
table.spec(),
table.encryption(),
null,
false,
IdentityPartitionConverters::convertConstant,
Expand Down Expand Up @@ -189,6 +190,7 @@ public void readDataEnableFilterEqDelete() throws IOException {
table.io(),
table.schema(),
table.spec(),
table.encryption(),
null,
false,
IdentityPartitionConverters::convertConstant,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ public void testReadPosDelete() {
BaseIcebergPosDeleteReader baseIcebergPosDeleteReader =
new BaseIcebergPosDeleteReader(
getArcticTable().asKeyedTable().io(),
getArcticTable().asKeyedTable().baseTable().encryption(),
Collections.singletonList(deleteFileOfPositionDelete));
ImmutableList.Builder<Record> builder = ImmutableList.builder();
baseIcebergPosDeleteReader.readDeletes().forEach(record -> builder.add(record.copy()));
Expand Down

0 comments on commit a294917

Please sign in to comment.