Commit

HIVE-28452: Iceberg: Cache delete files on executors (Denys Kuzmenko, reviewed by Butao Zhang)

Closes apache#5397
deniskuzZ authored Aug 23, 2024
1 parent 407ae67 commit 90391c6
Showing 8 changed files with 115 additions and 54 deletions.
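
Before the per-file diffs, the shape of the change in one place: Hive's delete filters now override DeleteFilter.newDeleteLoader() so that delete-file content is materialized through Hive's query-scoped executor ObjectCache via the new CachingDeleteLoader (added below). A minimal sketch of that wiring, using only names that appear in the diffs; the surrounding io, task, tableSchema, requestedSchema and conf variables are assumed to come from the enclosing reader:

    // Sketch only: mirrors the pattern used in IcebergRecordReader and HiveDeleteFilter below.
    DeleteFilter<Record> deletes = new GenericDeleteFilter(io, task, tableSchema, requestedSchema) {
      @Override
      protected DeleteLoader newDeleteLoader() {
        // Loaded delete content is keyed and reused through the per-query ObjectCache.
        return new CachingDeleteLoader(this::loadInputFile, conf);
      }
    };
    Schema requiredSchema = deletes.requiredSchema();
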
4 changes: 3 additions & 1 deletion iceberg/checkstyle/checkstyle.xml
@@ -423,7 +423,9 @@
<property name="processJavadoc" value="true"/>
</module>
<module name="UpperEll"/> <!-- Java Style Guide: Numeric Literals -->
<module name="VisibilityModifier"/> <!-- Java Coding Guidelines: Minimize mutability -->
<module name="VisibilityModifier"> <!-- Java Coding Guidelines: Minimize mutability -->
<property name="protectedAllowed" value="true"/>
</module>
<module name="WhitespaceAfter"/> <!-- Java Style Guide: Horizontal whitespace -->
<module name="WhitespaceAround"> <!-- Java Style Guide: Horizontal whitespace -->
<property name="allowEmptyConstructors" value="true"/>
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.data;

import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.ObjectCache;
import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.io.InputFile;

public class CachingDeleteLoader extends BaseDeleteLoader {
private final ObjectCache cache;

public CachingDeleteLoader(Function<DeleteFile, InputFile> loadInputFile, Configuration conf) {
super(loadInputFile);

String queryId = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_QUERY_ID);
this.cache = ObjectCacheFactory.getCache(conf, queryId, false);
}

@Override
protected boolean canCache(long size) {
return cache != null;
}

@Override
protected <V> V getOrLoad(String key, Supplier<V> valueSupplier, long valueSize) {
try {
return cache.retrieve(key, valueSupplier::get);
} catch (HiveException e) {
throw new RuntimeException(e);
}
}
}
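
Two notes on the class above. The ObjectCache it retrieves is keyed by HIVE_QUERY_ID, so cached delete content lives for the duration of a query on a given executor; and when ObjectCacheFactory.getCache() yields no cache, canCache() returns false and the base loader is expected to fall back to loading without caching (my reading of BaseDeleteLoader, not stated in this diff). A hypothetical standalone construction, just to show what the class needs as input; the query id string and the io (FileIO) variable are made up for illustration:

    Configuration conf = new Configuration();
    conf.set(HiveConf.ConfVars.HIVE_QUERY_ID.varname, "hive_query_example_1");  // scopes the cache
    DeleteLoader loader = new CachingDeleteLoader(
        deleteFile -> io.newInputFile(deleteFile.path().toString()),  // resolve DeleteFile -> InputFile via FileIO
        conf);
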
@@ -22,12 +22,15 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.NoSuchElementException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.data.CachingDeleteLoader;
import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.data.DeleteLoader;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
@@ -40,11 +43,19 @@ public class HiveDeleteFilter extends DeleteFilter<HiveRow> {

private final FileIO io;
private final HiveStructLike asStructLike;
private final Configuration conf;

public HiveDeleteFilter(FileIO io, FileScanTask task, Schema tableSchema, Schema requestedSchema) {
public HiveDeleteFilter(FileIO io, FileScanTask task, Schema tableSchema, Schema requestedSchema,
Configuration conf) {
super((task.file()).path().toString(), task.deletes(), tableSchema, requestedSchema);
this.io = io;
this.asStructLike = new HiveStructLike(this.requiredSchema().asStruct());
this.conf = conf;
}

@Override
protected DeleteLoader newDeleteLoader() {
return new CachingDeleteLoader(this::loadInputFile, conf);
}

@Override
@@ -88,7 +88,8 @@ public static CloseableIterable<HiveBatchContext> reader(Table table, Path path,
Schema requiredSchema = readSchema;

if (!task.deletes().isEmpty()) {
deleteFilter = new HiveDeleteFilter(table.io(), task, table.schema(), prepareSchemaForDeleteFilter(readSchema));
deleteFilter = new HiveDeleteFilter(table.io(), task, table.schema(), prepareSchemaForDeleteFilter(readSchema),
context.getConfiguration());
requiredSchema = deleteFilter.requiredSchema();
// TODO: take requiredSchema and adjust readColumnIds below accordingly for equality delete cases
// and remove below limitation
@@ -39,9 +39,9 @@
public abstract class AbstractIcebergRecordReader<T> extends RecordReader<Void, T> {

private TaskAttemptContext context;
private Configuration conf;
private Table table;
private Schema expectedSchema;
protected Configuration conf;
protected Table table;
protected Schema expectedSchema;
private String nameMapping;
private boolean reuseContainers;
private boolean caseSensitive;
@@ -106,26 +106,14 @@ public TaskAttemptContext getContext() {
return context;
}

public Configuration getConf() {
return conf;
}

public boolean isReuseContainers() {
return reuseContainers;
}

public Schema getExpectedSchema() {
return expectedSchema;
}

public String getNameMapping() {
return nameMapping;
}

public Table getTable() {
return table;
}

public InputFormatConfig.InMemoryDataModel getInMemoryDataModel() {
return inMemoryDataModel;
}
@@ -55,11 +55,11 @@ public void initialize(InputSplit split, TaskAttemptContext newContext) {
}

private CloseableIterator<T> nextTask() {
CloseableIterator<T> closeableIterator = openGeneric(mergeSplit.getContentFile(), getTable().schema()).iterator();
CloseableIterator<T> closeableIterator = openGeneric(mergeSplit.getContentFile(), table.schema()).iterator();
if (mergeSplit.getContentFile() instanceof DeleteFile) {
Schema deleteSchema = IcebergAcidUtil.createSerdeSchemaForDelete(getTable().schema().columns());
Schema deleteSchema = IcebergAcidUtil.createSerdeSchemaForDelete(table.schema().columns());
return new IcebergAcidUtil.MergeTaskVirtualColumnAwareIterator<>(closeableIterator,
deleteSchema, getConf(), mergeSplit.getContentFile(), getTable());
deleteSchema, conf, mergeSplit.getContentFile(), table);
} else {
return closeableIterator;
}
@@ -95,8 +95,8 @@ private CloseableIterable<T> openGeneric(ContentFile contentFile, Schema readSch
} else if (contentFile instanceof DeleteFile) {
schema = new Schema(MetadataColumns.DELETE_FILE_PATH, MetadataColumns.DELETE_FILE_POS);
}
InputFile inputFile = getTable().encryption().decrypt(EncryptedFiles.encryptedInput(
getTable().io().newInputFile(contentFile.path().toString()),
InputFile inputFile = table.encryption().decrypt(EncryptedFiles.encryptedInput(
table.io().newInputFile(contentFile.path().toString()),
contentFile.keyMetadata()));
CloseableIterable<T> iterable;
switch (contentFile.format()) {
@@ -21,7 +21,6 @@

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -44,7 +43,9 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.data.CachingDeleteLoader;
import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.data.DeleteLoader;
import org.apache.iceberg.data.GenericDeleteFilter;
import org.apache.iceberg.data.IdentityPartitionConverters;
import org.apache.iceberg.data.avro.DataReader;
@@ -62,6 +63,7 @@
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
@@ -89,7 +91,7 @@ public final class IcebergRecordReader<T> extends AbstractIcebergRecordReader<T>
}
}

private Iterator<FileScanTask> tasks;
private Iterable<FileScanTask> tasks;
private CloseableIterator<T> currentIterator;
private T current;

@@ -98,32 +100,28 @@ public void initialize(InputSplit split, TaskAttemptContext newContext) {
// For now IcebergInputFormat does its own split planning and does not accept FileSplit instances
super.initialize(split, newContext);
CombinedScanTask task = ((IcebergSplit) split).task();
this.tasks = task.files().iterator();
this.tasks = task.files();
this.currentIterator = nextTask();
}

private CloseableIterator<T> nextTask() {
CloseableIterator<T> closeableIterator = open(tasks.next(), getExpectedSchema()).iterator();
if (!isFetchVirtualColumns() || Utilities.getIsVectorized(getConf())) {
CloseableIterator<T> closeableIterator = CloseableIterable.concat(
Iterables.transform(tasks, task -> open(task, expectedSchema))).iterator();
if (!isFetchVirtualColumns() || Utilities.getIsVectorized(conf)) {
return closeableIterator;
}
return new IcebergAcidUtil.VirtualColumnAwareIterator<T>(closeableIterator, getExpectedSchema(),
getConf(), getTable());
return new IcebergAcidUtil.VirtualColumnAwareIterator<>(closeableIterator,
expectedSchema, conf, table);
}

@Override
public boolean nextKeyValue() throws IOException {
while (true) {
if (currentIterator.hasNext()) {
current = currentIterator.next();
return true;
} else if (tasks.hasNext()) {
currentIterator.close();
this.currentIterator = nextTask();
} else {
currentIterator.close();
return false;
}
if (currentIterator.hasNext()) {
current = currentIterator.next();
return true;
} else {
currentIterator.close();
return false;
}
}

@@ -149,24 +147,24 @@ private CloseableIterable<T> openVectorized(FileScanTask task, Schema readSchema
Expression residual = HiveIcebergInputFormat.residualForTask(task, getContext().getConfiguration());

// TODO: We have to take care of the EncryptionManager when LLAP and vectorization is used
CloseableIterable<T> iterator = HIVE_VECTORIZED_READER_BUILDER.invoke(getTable(), path, task,
CloseableIterable<T> iterator = HIVE_VECTORIZED_READER_BUILDER.invoke(table, path, task,
idToConstant, getContext(), residual, readSchema);

return applyResidualFiltering(iterator, residual, readSchema);
}

private CloseableIterable<T> openGeneric(FileScanTask task, Schema readSchema) {
private CloseableIterable openGeneric(FileScanTask task, Schema readSchema) {
if (task.isDataTask()) {
// When querying metadata tables, the currentTask is a DataTask and the data has to
// be fetched from the task instead of reading it from files.
IcebergInternalRecordWrapper wrapper =
new IcebergInternalRecordWrapper(getTable().schema().asStruct(), readSchema.asStruct());
return (CloseableIterable) CloseableIterable.transform(((DataTask) task).rows(), row -> wrapper.wrap(row));
new IcebergInternalRecordWrapper(table.schema().asStruct(), readSchema.asStruct());
return CloseableIterable.transform(((DataTask) task).rows(), wrapper::wrap);
}

DataFile file = task.file();
InputFile inputFile = getTable().encryption().decrypt(EncryptedFiles.encryptedInput(
getTable().io().newInputFile(file.path().toString()),
InputFile inputFile = table.encryption().decrypt(EncryptedFiles.encryptedInput(
table.io().newInputFile(file.path().toString()),
file.keyMetadata()));

CloseableIterable<T> iterable;
@@ -197,7 +195,12 @@ private CloseableIterable<T> open(FileScanTask currentTask, Schema readSchema) {
case HIVE:
return openVectorized(currentTask, readSchema);
case GENERIC:
DeleteFilter deletes = new GenericDeleteFilter(getTable().io(), currentTask, getTable().schema(), readSchema);
DeleteFilter deletes = new GenericDeleteFilter(table.io(), currentTask, table.schema(), readSchema) {
@Override
protected DeleteLoader newDeleteLoader() {
return new CachingDeleteLoader(this::loadInputFile, conf);
}
};
Schema requiredSchema = deletes.requiredSchema();
return deletes.filter(openGeneric(currentTask, requiredSchema));
default:
@@ -277,13 +280,13 @@ private CloseableIterable<T> newOrcIterable(InputFile inputFile, FileScanTask ta
private Map<Integer, ?> constantsMap(FileScanTask task, BiFunction<Type, Object, Object> converter) {
PartitionSpec spec = task.spec();
Set<Integer> idColumns = spec.identitySourceIds();
Schema partitionSchema = TypeUtil.select(getExpectedSchema(), idColumns);
Schema partitionSchema = TypeUtil.select(expectedSchema, idColumns);
boolean projectsIdentityPartitionColumns = !partitionSchema.columns().isEmpty();
if (getExpectedSchema().findField(MetadataColumns.PARTITION_COLUMN_ID) != null) {
Types.StructType partitionType = Partitioning.partitionType(getTable());
if (expectedSchema.findField(MetadataColumns.PARTITION_COLUMN_ID) != null) {
Types.StructType partitionType = Partitioning.partitionType(table);
return PartitionUtil.constantsMap(task, partitionType, converter);
} else if (projectsIdentityPartitionColumns) {
Types.StructType partitionType = Partitioning.partitionType(getTable());
Types.StructType partitionType = Partitioning.partitionType(table);
return PartitionUtil.constantsMap(task, partitionType, converter);
} else {
return Collections.emptyMap();
@@ -98,8 +98,9 @@ public void testRecordReaderShouldReuseFooter() throws IOException, InterruptedE
try (MockedStatic<ParquetFileReader> mockedParquetFileReader = Mockito.mockStatic(ParquetFileReader.class,
Mockito.CALLS_REAL_METHODS)) {
for (InputSplit split : splits) {
try (RecordReader<Void, ?> reader = inputFormat.createRecordReader(split, context)) {
reader.initialize(split, context);
try (RecordReader<Void, ?> r = inputFormat.createRecordReader(split, context)) {
r.initialize(split, context);
r.nextKeyValue();
}
}
mockedParquetFileReader.verify(times(1), () ->
