Skip to content

Commit

Permalink
Fix commit log parsing of Delta tables with delete vector
Browse files Browse the repository at this point in the history
- Add support for `tightBounds` property in Delta stats representation
- Correct handling of delete vectors to avoid adding data file paths to both new and removed file sets in `FileDiff`
  • Loading branch information
ashvina committed Dec 9, 2024
1 parent 3b67fae commit b190e66
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import org.apache.spark.sql.delta.Snapshot;
import org.apache.spark.sql.delta.actions.AddFile;
import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor;
import org.apache.spark.sql.delta.actions.RemoveFile;

import org.apache.xtable.exception.NotSupportedException;
Expand Down Expand Up @@ -106,4 +107,24 @@ static String getFullPathToFile(Snapshot snapshot, String dataFilePath) {
}
return tableBasePath + Path.SEPARATOR + dataFilePath;
}

/**
 * Resolves the deletion vector representation for an AddFile action. At present that
 * representation is the full path of the data file the deletion vector is attached to.
 *
 * @param snapshot the commit snapshot, used to resolve the full path of the data file
 * @param addFile the add file action to inspect
 * @return the full path of the data file when the action carries a deletion vector, otherwise
 *     {@code null}
 */
public String extractDeletionVectorFile(Snapshot snapshot, AddFile addFile) {
  // An AddFile without a deletion vector descriptor has nothing to report.
  boolean hasDeletionVector = addFile.deletionVector() != null;
  return hasDeletionVector ? getFullPathToFile(snapshot, addFile.path()) : null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

Expand Down Expand Up @@ -99,11 +101,16 @@ public TableChange getTableChangeForCommit(Long versionNumber) {
Snapshot snapshotAtVersion = deltaLog.getSnapshotAt(versionNumber, Option.empty());
FileFormat fileFormat =
actionsConverter.convertToFileFormat(snapshotAtVersion.metadata().format().provider());
Set<InternalDataFile> addedFiles = new HashSet<>();
Set<InternalDataFile> removedFiles = new HashSet<>();

// All 3 of the following collections use data file path as the key
Map<String, InternalDataFile> addedFiles = new HashMap<>();
Map<String, InternalDataFile> removedFiles = new HashMap<>();
// Set of data file paths for which deletion vectors exist.
Set<String> deletionVectors = new HashSet<>();

for (Action action : actionsForVersion) {
if (action instanceof AddFile) {
addedFiles.add(
InternalDataFile dataFile =
actionsConverter.convertAddActionToInternalDataFile(
(AddFile) action,
snapshotAtVersion,
Expand All @@ -112,19 +119,47 @@ public TableChange getTableChangeForCommit(Long versionNumber) {
tableAtVersion.getReadSchema().getFields(),
true,
DeltaPartitionExtractor.getInstance(),
DeltaStatsExtractor.getInstance()));
DeltaStatsExtractor.getInstance());
addedFiles.put(dataFile.getPhysicalPath(), dataFile);
String deleteVectorPath =
actionsConverter.extractDeletionVectorFile(snapshotAtVersion, (AddFile) action);
if (deleteVectorPath != null) {
deletionVectors.add(deleteVectorPath);
}
} else if (action instanceof RemoveFile) {
removedFiles.add(
InternalDataFile dataFile =
actionsConverter.convertRemoveActionToInternalDataFile(
(RemoveFile) action,
snapshotAtVersion,
fileFormat,
tableAtVersion.getPartitioningFields(),
DeltaPartitionExtractor.getInstance()));
DeltaPartitionExtractor.getInstance());
removedFiles.put(dataFile.getPhysicalPath(), dataFile);
}
}

// In Delta Lake if delete vector information is added for an existing data file, as a result of
// a delete operation, then a new RemoveFile action is added to the commit log to remove the old
// entry which is replaced by a new entry, AddFile with delete vector information. Since the
// same data file is removed and added, we need to remove it from the added and removed file
// maps which are used to track actual added and removed data files.
for (String deletionVector : deletionVectors) {
// validate that a Remove action is also added for the data file
if (removedFiles.containsKey(deletionVector)) {
addedFiles.remove(deletionVector);
removedFiles.remove(deletionVector);
} else {
log.warn(
"No Remove action found for the data file for which deletion vector is added {}. This is unexpected.",
deletionVector);
}
}

DataFilesDiff dataFilesDiff =
DataFilesDiff.builder().filesAdded(addedFiles).filesRemoved(removedFiles).build();
DataFilesDiff.builder()
.filesAdded(addedFiles.values())
.filesRemoved(removedFiles.values())
.build();
return TableChange.builder().tableAsOfChange(tableAtVersion).filesDiff(dataFilesDiff).build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ private static class DeltaStats {
Map<String, Object> minValues;
Map<String, Object> maxValues;
Map<String, Object> nullCount;
boolean tightBounds;
}

@Value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.nio.file.Path;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
Expand All @@ -42,7 +43,13 @@

import org.apache.xtable.GenericTable;
import org.apache.xtable.TestSparkDeltaTable;
import org.apache.xtable.ValidationTestHelper;
import org.apache.xtable.conversion.SourceTable;
import org.apache.xtable.model.CommitsBacklog;
import org.apache.xtable.model.InstantsForIncrementalSync;
import org.apache.xtable.model.InternalSnapshot;
import org.apache.xtable.model.TableChange;
import org.apache.xtable.model.storage.TableFormat;

public class ITDeltaDeleteVectorConvert {
@TempDir private static Path tempDir;
Expand Down Expand Up @@ -147,18 +154,32 @@ public void testInsertsUpsertsAndDeletes() {
allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
assertEquals(228L, testSparkDeltaTable.getNumRows());

// TODO conversion fails if delete vectors are enabled, this is because of missing handlers for
// deletion files.
// TODO pending for another PR
// SourceTable tableConfig =
// SourceTable.builder()
// .name(testSparkDeltaTable.getTableName())
// .basePath(testSparkDeltaTable.getBasePath())
// .formatName(TableFormat.DELTA)
// .build();
// DeltaConversionSource conversionSource =
// conversionSourceProvider.getConversionSourceInstance(tableConfig);
// InternalSnapshot internalSnapshot = conversionSource.getCurrentSnapshot();
SourceTable tableConfig =
SourceTable.builder()
.name(testSparkDeltaTable.getTableName())
.basePath(testSparkDeltaTable.getBasePath())
.formatName(TableFormat.DELTA)
.build();
DeltaConversionSource conversionSource =
conversionSourceProvider.getConversionSourceInstance(tableConfig);
InternalSnapshot internalSnapshot = conversionSource.getCurrentSnapshot();

// validateDeltaPartitioning(internalSnapshot);
ValidationTestHelper.validateSnapshot(
internalSnapshot, allActiveFiles.get(allActiveFiles.size() - 1));

// Get changes in incremental format.
InstantsForIncrementalSync instantsForIncrementalSync =
InstantsForIncrementalSync.builder()
.lastSyncInstant(Instant.ofEpochMilli(timestamp1))
.build();
CommitsBacklog<Long> commitsBacklog =
conversionSource.getCommitsBacklog(instantsForIncrementalSync);
for (Long version : commitsBacklog.getCommitsToProcess()) {
TableChange tableChange = conversionSource.getTableChangeForCommit(version);
allTableChanges.add(tableChange);
}
ValidationTestHelper.validateTableChanges(allActiveFiles, allTableChanges);
}

private void validateDeletedRecordCount(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.delta;

import java.net.URISyntaxException;

import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import org.apache.spark.sql.delta.DeltaLog;
import org.apache.spark.sql.delta.Snapshot;
import org.apache.spark.sql.delta.actions.AddFile;
import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor;

import scala.Option;

/** Unit tests for {@link DeltaActionsConverter}. */
class TestDeltaActionsConverter {

  /**
   * Checks both outcomes of {@code extractDeletionVectorFile}: {@code null} for an AddFile action
   * without a deletion vector, and the data file path for an AddFile action that carries one.
   */
  @Test
  void extractDeletionVector() throws URISyntaxException {
    final String dataFilePath = "file:///file_path";
    final int fileSize = 123;

    Snapshot mockSnapshot = Mockito.mock(Snapshot.class);
    DeltaLog mockDeltaLog = Mockito.mock(DeltaLog.class);
    DeltaActionsConverter converter = DeltaActionsConverter.getInstance();

    // Case 1: no deletion vector on the action, so nothing is extracted.
    AddFile plainAddFile = buildAddFile(dataFilePath, fileSize, null);
    Assertions.assertNull(converter.extractDeletionVectorFile(mockSnapshot, plainAddFile));

    // Case 2: a deletion vector is attached, so the data file path is returned.
    DeletionVectorDescriptor descriptor =
        DeletionVectorDescriptor.onDiskWithAbsolutePath(
            dataFilePath, fileSize, 42, Option.empty(), Option.empty());
    AddFile addFileWithVector = buildAddFile(dataFilePath, fileSize, descriptor);

    // Stub the snapshot's delta log and its data path before resolving the file path.
    Mockito.when(mockSnapshot.deltaLog()).thenReturn(mockDeltaLog);
    Mockito.when(mockDeltaLog.dataPath()).thenReturn(new Path("file:///"));

    String extractedPath = converter.extractDeletionVectorFile(mockSnapshot, addFileWithVector);
    Assertions.assertEquals(dataFilePath, extractedPath);
  }

  /**
   * Creates an AddFile action for the given path and size with fixed values for the remaining
   * arguments, optionally carrying a deletion vector descriptor.
   */
  private static AddFile buildAddFile(
      String path, int size, DeletionVectorDescriptor deletionVector) {
    long time = 234L;
    boolean dataChange = true;
    String stats = "";
    return new AddFile(path, null, size, time, dataChange, stats, null, deletionVector);
  }
}

0 comments on commit b190e66

Please sign in to comment.