diff --git a/iceberg/checkstyle/checkstyle.xml b/iceberg/checkstyle/checkstyle.xml
index 7a595e6cbe17..8f2cb2f9a1bb 100644
--- a/iceberg/checkstyle/checkstyle.xml
+++ b/iceberg/checkstyle/checkstyle.xml
@@ -423,7 +423,9 @@
-
+
+
+
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/data/CachingDeleteLoader.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/data/CachingDeleteLoader.java
new file mode 100644
index 000000000000..aeef6b38602c
--- /dev/null
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/data/CachingDeleteLoader.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data;
+
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ObjectCache;
+import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.io.InputFile;
+
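+/**
+ * Delete loader that stores the contents of loaded delete files in Hive's query-level
+ * ObjectCache (keyed by the query id), so the same delete files do not have to be
+ * re-read by every task of a query.
+ */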
+public class CachingDeleteLoader extends BaseDeleteLoader {
+ private final ObjectCache cache;
+
+ public CachingDeleteLoader(Function<DeleteFile, InputFile> loadInputFile, Configuration conf) {
+ super(loadInputFile);
+
+ String queryId = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_QUERY_ID);
+ this.cache = ObjectCacheFactory.getCache(conf, queryId, false);
+ }
+
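+ // If no ObjectCache could be obtained for this query, canCache() returns false and deletes are loaded without caching.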
+ @Override
+ protected boolean canCache(long size) {
+ return cache != null;
+ }
+
+ @Override
+ protected <V> V getOrLoad(String key, Supplier<V> valueSupplier, long valueSize) {
+ try {
+ return cache.retrieve(key, valueSupplier::get);
+ } catch (HiveException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveDeleteFilter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveDeleteFilter.java
index a87470ae4d95..8ed185e79960 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveDeleteFilter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveDeleteFilter.java
@@ -22,12 +22,15 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.NoSuchElementException;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
+import org.apache.iceberg.data.CachingDeleteLoader;
import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.data.DeleteLoader;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
@@ -40,11 +43,19 @@ public class HiveDeleteFilter extends DeleteFilter {
private final FileIO io;
private final HiveStructLike asStructLike;
+ private final Configuration conf;
- public HiveDeleteFilter(FileIO io, FileScanTask task, Schema tableSchema, Schema requestedSchema) {
+ public HiveDeleteFilter(FileIO io, FileScanTask task, Schema tableSchema, Schema requestedSchema,
+ Configuration conf) {
super((task.file()).path().toString(), task.deletes(), tableSchema, requestedSchema);
this.io = io;
this.asStructLike = new HiveStructLike(this.requiredSchema().asStruct());
+ this.conf = conf;
+ }
+
+ @Override
+ protected DeleteLoader newDeleteLoader() {
+ return new CachingDeleteLoader(this::loadInputFile, conf);
}
@Override
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
index c92b4617cbdd..dc13857d1969 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
@@ -88,7 +88,8 @@ public static CloseableIterable reader(Table table, Path path,
Schema requiredSchema = readSchema;
if (!task.deletes().isEmpty()) {
- deleteFilter = new HiveDeleteFilter(table.io(), task, table.schema(), prepareSchemaForDeleteFilter(readSchema));
+ deleteFilter = new HiveDeleteFilter(table.io(), task, table.schema(), prepareSchemaForDeleteFilter(readSchema),
+ context.getConfiguration());
requiredSchema = deleteFilter.requiredSchema();
// TODO: take requiredSchema and adjust readColumnIds below accordingly for equality delete cases
// and remove below limitation
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/AbstractIcebergRecordReader.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/AbstractIcebergRecordReader.java
index 53ee0d870f8b..e9a86f40e62f 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/AbstractIcebergRecordReader.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/AbstractIcebergRecordReader.java
@@ -39,9 +39,9 @@
public abstract class AbstractIcebergRecordReader<T> extends RecordReader<Void, T> {
private TaskAttemptContext context;
- private Configuration conf;
- private Table table;
- private Schema expectedSchema;
+ protected Configuration conf;
+ protected Table table;
+ protected Schema expectedSchema;
private String nameMapping;
private boolean reuseContainers;
private boolean caseSensitive;
@@ -106,26 +106,14 @@ public TaskAttemptContext getContext() {
return context;
}
- public Configuration getConf() {
- return conf;
- }
-
public boolean isReuseContainers() {
return reuseContainers;
}
- public Schema getExpectedSchema() {
- return expectedSchema;
- }
-
public String getNameMapping() {
return nameMapping;
}
- public Table getTable() {
- return table;
- }
-
public InputFormatConfig.InMemoryDataModel getInMemoryDataModel() {
return inMemoryDataModel;
}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergMergeRecordReader.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergMergeRecordReader.java
index 7871e8325d34..17808c77f7cc 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergMergeRecordReader.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergMergeRecordReader.java
@@ -55,11 +55,11 @@ public void initialize(InputSplit split, TaskAttemptContext newContext) {
}
private CloseableIterator<T> nextTask() {
- CloseableIterator<T> closeableIterator = openGeneric(mergeSplit.getContentFile(), getTable().schema()).iterator();
+ CloseableIterator<T> closeableIterator = openGeneric(mergeSplit.getContentFile(), table.schema()).iterator();
if (mergeSplit.getContentFile() instanceof DeleteFile) {
- Schema deleteSchema = IcebergAcidUtil.createSerdeSchemaForDelete(getTable().schema().columns());
+ Schema deleteSchema = IcebergAcidUtil.createSerdeSchemaForDelete(table.schema().columns());
return new IcebergAcidUtil.MergeTaskVirtualColumnAwareIterator<>(closeableIterator,
- deleteSchema, getConf(), mergeSplit.getContentFile(), getTable());
+ deleteSchema, conf, mergeSplit.getContentFile(), table);
} else {
return closeableIterator;
}
@@ -95,8 +95,8 @@ private CloseableIterable openGeneric(ContentFile contentFile, Schema readSch
} else if (contentFile instanceof DeleteFile) {
schema = new Schema(MetadataColumns.DELETE_FILE_PATH, MetadataColumns.DELETE_FILE_POS);
}
- InputFile inputFile = getTable().encryption().decrypt(EncryptedFiles.encryptedInput(
- getTable().io().newInputFile(contentFile.path().toString()),
+ InputFile inputFile = table.encryption().decrypt(EncryptedFiles.encryptedInput(
+ table.io().newInputFile(contentFile.path().toString()),
contentFile.keyMetadata()));
CloseableIterable iterable;
switch (contentFile.format()) {
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergRecordReader.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergRecordReader.java
index 43efb4fc552e..916082fe469d 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergRecordReader.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergRecordReader.java
@@ -21,7 +21,6 @@
import java.io.IOException;
import java.util.Collections;
-import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -44,7 +43,9 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.data.CachingDeleteLoader;
import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.data.DeleteLoader;
import org.apache.iceberg.data.GenericDeleteFilter;
import org.apache.iceberg.data.IdentityPartitionConverters;
import org.apache.iceberg.data.avro.DataReader;
@@ -62,6 +63,7 @@
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
@@ -89,7 +91,7 @@ public final class IcebergRecordReader extends AbstractIcebergRecordReader
}
}
- private Iterator<FileScanTask> tasks;
+ private Iterable<FileScanTask> tasks;
private CloseableIterator<T> currentIterator;
private T current;
@@ -98,32 +100,28 @@ public void initialize(InputSplit split, TaskAttemptContext newContext) {
// For now IcebergInputFormat does its own split planning and does not accept FileSplit instances
super.initialize(split, newContext);
CombinedScanTask task = ((IcebergSplit) split).task();
- this.tasks = task.files().iterator();
+ this.tasks = task.files();
this.currentIterator = nextTask();
}
private CloseableIterator<T> nextTask() {
- CloseableIterator<T> closeableIterator = open(tasks.next(), getExpectedSchema()).iterator();
- if (!isFetchVirtualColumns() || Utilities.getIsVectorized(getConf())) {
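+ // Chain the iterators of all file scan tasks in this split into a single iterator,
+ // so nextKeyValue() no longer has to switch between tasks itself.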
+ CloseableIterator<T> closeableIterator = CloseableIterable.concat(
+ Iterables.transform(tasks, task -> open(task, expectedSchema))).iterator();
+ if (!isFetchVirtualColumns() || Utilities.getIsVectorized(conf)) {
return closeableIterator;
}
- return new IcebergAcidUtil.VirtualColumnAwareIterator(closeableIterator, getExpectedSchema(),
- getConf(), getTable());
+ return new IcebergAcidUtil.VirtualColumnAwareIterator<>(closeableIterator,
+ expectedSchema, conf, table);
}
@Override
public boolean nextKeyValue() throws IOException {
- while (true) {
- if (currentIterator.hasNext()) {
- current = currentIterator.next();
- return true;
- } else if (tasks.hasNext()) {
- currentIterator.close();
- this.currentIterator = nextTask();
- } else {
- currentIterator.close();
- return false;
- }
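+ // currentIterator already covers every task of the split (see nextTask()), so a single check is enough.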
+ if (currentIterator.hasNext()) {
+ current = currentIterator.next();
+ return true;
+ } else {
+ currentIterator.close();
+ return false;
}
}
@@ -149,24 +147,24 @@ private CloseableIterable openVectorized(FileScanTask task, Schema readSchema
Expression residual = HiveIcebergInputFormat.residualForTask(task, getContext().getConfiguration());
// TODO: We have to take care of the EncryptionManager when LLAP and vectorization is used
- CloseableIterable<T> iterator = HIVE_VECTORIZED_READER_BUILDER.invoke(getTable(), path, task,
+ CloseableIterable<T> iterator = HIVE_VECTORIZED_READER_BUILDER.invoke(table, path, task,
idToConstant, getContext(), residual, readSchema);
return applyResidualFiltering(iterator, residual, readSchema);
}
- private CloseableIterable openGeneric(FileScanTask task, Schema readSchema) {
+ private CloseableIterable openGeneric(FileScanTask task, Schema readSchema) {
if (task.isDataTask()) {
// When querying metadata tables, the currentTask is a DataTask and the data has to
// be fetched from the task instead of reading it from files.
IcebergInternalRecordWrapper wrapper =
- new IcebergInternalRecordWrapper(getTable().schema().asStruct(), readSchema.asStruct());
- return (CloseableIterable) CloseableIterable.transform(((DataTask) task).rows(), row -> wrapper.wrap(row));
+ new IcebergInternalRecordWrapper(table.schema().asStruct(), readSchema.asStruct());
+ return CloseableIterable.transform(((DataTask) task).rows(), wrapper::wrap);
}
DataFile file = task.file();
- InputFile inputFile = getTable().encryption().decrypt(EncryptedFiles.encryptedInput(
- getTable().io().newInputFile(file.path().toString()),
+ InputFile inputFile = table.encryption().decrypt(EncryptedFiles.encryptedInput(
+ table.io().newInputFile(file.path().toString()),
file.keyMetadata()));
CloseableIterable iterable;
@@ -197,7 +195,12 @@ private CloseableIterable open(FileScanTask currentTask, Schema readSchema) {
case HIVE:
return openVectorized(currentTask, readSchema);
case GENERIC:
- DeleteFilter<Record> deletes = new GenericDeleteFilter(getTable().io(), currentTask, getTable().schema(), readSchema);
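+ // Override the delete loader so delete file contents are fetched through the ObjectCache-backed CachingDeleteLoader.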
+ DeleteFilter<Record> deletes = new GenericDeleteFilter(table.io(), currentTask, table.schema(), readSchema) {
+ @Override
+ protected DeleteLoader newDeleteLoader() {
+ return new CachingDeleteLoader(this::loadInputFile, conf);
+ }
+ };
Schema requiredSchema = deletes.requiredSchema();
return deletes.filter(openGeneric(currentTask, requiredSchema));
default:
@@ -277,13 +280,13 @@ private CloseableIterable newOrcIterable(InputFile inputFile, FileScanTask ta
private Map<Integer, ?> constantsMap(FileScanTask task, BiFunction<Type, Object, Object> converter) {
PartitionSpec spec = task.spec();
Set<Integer> idColumns = spec.identitySourceIds();
- Schema partitionSchema = TypeUtil.select(getExpectedSchema(), idColumns);
+ Schema partitionSchema = TypeUtil.select(expectedSchema, idColumns);
boolean projectsIdentityPartitionColumns = !partitionSchema.columns().isEmpty();
- if (getExpectedSchema().findField(MetadataColumns.PARTITION_COLUMN_ID) != null) {
- Types.StructType partitionType = Partitioning.partitionType(getTable());
+ if (expectedSchema.findField(MetadataColumns.PARTITION_COLUMN_ID) != null) {
+ Types.StructType partitionType = Partitioning.partitionType(table);
return PartitionUtil.constantsMap(task, partitionType, converter);
} else if (projectsIdentityPartitionColumns) {
- Types.StructType partitionType = Partitioning.partitionType(getTable());
+ Types.StructType partitionType = Partitioning.partitionType(table);
return PartitionUtil.constantsMap(task, partitionType, converter);
} else {
return Collections.emptyMap();
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/vector/TestHiveVectorizedReader.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/vector/TestHiveVectorizedReader.java
index fae2e207ba2a..0209c8237a4a 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/vector/TestHiveVectorizedReader.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/vector/TestHiveVectorizedReader.java
@@ -98,8 +98,9 @@ public void testRecordReaderShouldReuseFooter() throws IOException, InterruptedE
try (MockedStatic<ParquetFileReader> mockedParquetFileReader = Mockito.mockStatic(ParquetFileReader.class,
Mockito.CALLS_REAL_METHODS)) {
for (InputSplit split : splits) {
- try (RecordReader reader = inputFormat.createRecordReader(split, context)) {
- reader.initialize(split, context);
+ try (RecordReader r = inputFormat.createRecordReader(split, context)) {
+ r.initialize(split, context);
+ r.nextKeyValue();
}
}
mockedParquetFileReader.verify(times(1), () ->