Skip to content

Commit

Permalink
Fix commit log parsing of Delta tables with delete vector
Browse files Browse the repository at this point in the history
- Correct handling of delete vectors to avoid adding data file paths to both new and removed file
  sets in `FileDiff`
- Detect new file stats in logs that are not supported by XTable and log them. New stats do not
  break parsing of the stats JSON; for example, the `tightBounds` property that appears in Delta
  stats when deletion vectors are enabled.
  • Loading branch information
ashvina committed Dec 18, 2024
1 parent a360aff commit f676788
Show file tree
Hide file tree
Showing 6 changed files with 218 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import org.apache.spark.sql.delta.Snapshot;
import org.apache.spark.sql.delta.actions.AddFile;
import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor;
import org.apache.spark.sql.delta.actions.RemoveFile;

import org.apache.xtable.exception.NotSupportedException;
Expand Down Expand Up @@ -106,4 +107,24 @@ static String getFullPathToFile(Snapshot snapshot, String dataFilePath) {
}
return tableBasePath + Path.SEPARATOR + dataFilePath;
}

/**
 * Resolves which data file an {@link AddFile} action's deletion vector belongs to. At present the
 * deletion vector is represented solely by the full path of the data file it is attached to; the
 * contents of the descriptor itself are not inspected.
 *
 * @param snapshot the commit snapshot, passed through to {@code getFullPathToFile}
 * @param addFile the add file action to inspect
 * @return the full path of the data file carrying a deletion vector, or {@code null} when the
 *     action has no deletion vector
 */
public String extractDeletionVectorFile(Snapshot snapshot, AddFile addFile) {
  final DeletionVectorDescriptor descriptor = addFile.deletionVector();
  // A missing descriptor means this AddFile has no deletion vector; callers test for null.
  return descriptor == null ? null : getFullPathToFile(snapshot, addFile.path());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

Expand Down Expand Up @@ -99,11 +101,16 @@ public TableChange getTableChangeForCommit(Long versionNumber) {
Snapshot snapshotAtVersion = deltaLog.getSnapshotAt(versionNumber, Option.empty());
FileFormat fileFormat =
actionsConverter.convertToFileFormat(snapshotAtVersion.metadata().format().provider());
Set<InternalDataFile> addedFiles = new HashSet<>();
Set<InternalDataFile> removedFiles = new HashSet<>();

// All 3 of the following data structures use data file's absolute path as the key
Map<String, InternalDataFile> addedFiles = new HashMap<>();
Map<String, InternalDataFile> removedFiles = new HashMap<>();
// Set of data file paths for which deletion vectors exist.
Set<String> deletionVectors = new HashSet<>();

for (Action action : actionsForVersion) {
if (action instanceof AddFile) {
addedFiles.add(
InternalDataFile dataFile =
actionsConverter.convertAddActionToInternalDataFile(
(AddFile) action,
snapshotAtVersion,
Expand All @@ -112,19 +119,47 @@ public TableChange getTableChangeForCommit(Long versionNumber) {
tableAtVersion.getReadSchema().getFields(),
true,
DeltaPartitionExtractor.getInstance(),
DeltaStatsExtractor.getInstance()));
DeltaStatsExtractor.getInstance());
addedFiles.put(dataFile.getPhysicalPath(), dataFile);
String deleteVectorPath =
actionsConverter.extractDeletionVectorFile(snapshotAtVersion, (AddFile) action);
if (deleteVectorPath != null) {
deletionVectors.add(deleteVectorPath);
}
} else if (action instanceof RemoveFile) {
removedFiles.add(
InternalDataFile dataFile =
actionsConverter.convertRemoveActionToInternalDataFile(
(RemoveFile) action,
snapshotAtVersion,
fileFormat,
tableAtVersion.getPartitioningFields(),
DeltaPartitionExtractor.getInstance()));
DeltaPartitionExtractor.getInstance());
removedFiles.put(dataFile.getPhysicalPath(), dataFile);
}
}

// In Delta Lake if delete vector information is added for an existing data file, as a result of
// a delete operation, then a new RemoveFile action is added to the commit log to remove the old
// entry which is replaced by a new entry, AddFile with delete vector information. Since the
// same data file is removed and added, we need to remove it from the added and removed file
// maps which are used to track actual added and removed data files.
for (String deletionVector : deletionVectors) {
// validate that a Remove action is also added for the data file
if (removedFiles.containsKey(deletionVector)) {
addedFiles.remove(deletionVector);
removedFiles.remove(deletionVector);
} else {
log.warn(
"No Remove action found for the data file for which deletion vector is added {}. This is unexpected.",
deletionVector);
}
}

DataFilesDiff dataFilesDiff =
DataFilesDiff.builder().filesAdded(addedFiles).filesRemoved(removedFiles).build();
DataFilesDiff.builder()
.filesAdded(addedFiles.values())
.filesRemoved(removedFiles.values())
.build();
return TableChange.builder().tableAsOfChange(tableAtVersion).filesDiff(dataFilesDiff).build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,20 @@
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Value;
import lombok.extern.log4j.Log4j2;

import org.apache.commons.lang3.StringUtils;

import org.apache.spark.sql.delta.actions.AddFile;

import com.fasterxml.jackson.annotation.JsonAnySetter;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;

import org.apache.xtable.collectors.CustomCollectors;
import org.apache.xtable.model.exception.ParseException;
Expand All @@ -56,6 +61,7 @@
* DeltaStatsExtractor extracts column stats and also responsible for their serialization leveraging
* {@link DeltaValueConverter}.
*/
@Log4j2
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class DeltaStatsExtractor {
private static final Set<InternalType> FIELD_TYPES_WITH_STATS_SUPPORT =
Expand All @@ -74,9 +80,13 @@ public class DeltaStatsExtractor {

private static final DeltaStatsExtractor INSTANCE = new DeltaStatsExtractor();

private static final String PATH_DELIMITER = "\\.";
private static final ObjectMapper MAPPER = new ObjectMapper();

/* This set collects the names of all unrecognized Delta Lake stats. For instance, data file
stats in the presence of deletion vectors contain the 'tightBounds' stat, which is
currently not handled by XTable. */
private final Set<String> unsupportedStats = new HashSet<>();

/**
 * Returns the shared {@link DeltaStatsExtractor}. The class has a private no-args constructor, so
 * the instance held in {@code INSTANCE} is the only one handed out through this accessor.
 *
 * @return the shared extractor instance
 */
public static DeltaStatsExtractor getInstance() {
return INSTANCE;
}
Expand Down Expand Up @@ -182,6 +192,8 @@ public List<ColumnStat> getColumnStatsForFile(AddFile addFile, List<InternalFiel
// TODO: Additional work needed to track maps & arrays.
try {
DeltaStats deltaStats = MAPPER.readValue(addFile.stats(), DeltaStats.class);
collectUnsupportedStats(deltaStats.getAdditionalStats());

Map<String, Object> fieldPathToMaxValue = flattenStatMap(deltaStats.getMaxValues());
Map<String, Object> fieldPathToMinValue = flattenStatMap(deltaStats.getMinValues());
Map<String, Object> fieldPathToNullCount = flattenStatMap(deltaStats.getNullCount());
Expand Down Expand Up @@ -211,6 +223,20 @@ public List<ColumnStat> getColumnStatsForFile(AddFile addFile, List<InternalFiel
}
}

/**
 * Records the names of stats that were present in a file's stats JSON but are not handled by
 * this extractor. Each name is logged once, the first time it is seen, and is then remembered in
 * {@code unsupportedStats} so that later occurrences stay silent.
 *
 * @param additionalStats unrecognized stats keyed by name; may be {@code null} or empty, in which
 *     case nothing is recorded
 */
private void collectUnsupportedStats(Map<String, Object> additionalStats) {
  if (additionalStats == null) {
    return;
  }

  for (String statName : additionalStats.keySet()) {
    if (unsupportedStats.contains(statName)) {
      // Already reported earlier; avoid logging the same stat name again.
      continue;
    }
    log.info("Unrecognized/unsupported Delta data file stat: {}", statName);
    unsupportedStats.add(statName);
  }
}

/**
* Takes the input map which represents a json object and flattens it.
*
Expand Down Expand Up @@ -239,13 +265,34 @@ private Map<String, Object> flattenStatMap(Map<String, Object> statMap) {
return result;
}

/**
 * Exposes the names of the unsupported stats encountered so far while parsing Delta Lake stats.
 * The result is a read-only wrapper over the live set, not a copy.
 *
 * @return unmodifiable view of the unsupported stat names
 */
@VisibleForTesting
Set<String> getUnsupportedStats() {
  final Set<String> readOnlyView = Collections.unmodifiableSet(this.unsupportedStats);
  return readOnlyView;
}

/**
 * Java-side shape of the per-file stats JSON that is read with {@code MAPPER.readValue}. The four
 * named fields are the stats this extractor consumes; any other top-level JSON property is routed
 * to {@link #setAdditionalStat(String, Object)} and kept in {@code additionalStats}.
 */
@Builder
@Value
private static class DeltaStats {
// Number of records, as reported by the stats JSON.
long numRecords;
// Per-column minimum values; nested maps are flattened by the caller.
Map<String, Object> minValues;
// Per-column maximum values; nested maps are flattened by the caller.
Map<String, Object> maxValues;
// Per-column null counts; nested maps are flattened by the caller.
Map<String, Object> nullCount;

/* Catch-all for any additional stats that are not explicitly handled above. It is excluded
from Jackson's normal property handling via @JsonIgnore and is populated only through the
@JsonAnySetter method below. NOTE(review): @Getter(lazy = true) is presumably used so the
backing map is created on first access to getAdditionalStats() -- confirm against Lombok docs. */
@JsonIgnore
@Getter(lazy = true)
Map<String, Object> additionalStats = new HashMap<>();

/**
 * Receives a JSON property that has no matching declared field and stores it in the
 * additional-stats map.
 *
 * @param key name of the unrecognized stat
 * @param value value of the stat as deserialized by Jackson
 */
@JsonAnySetter
public void setAdditionalStat(String key, Object value) {
getAdditionalStats().put(key, value);
}
}

@Value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.nio.file.Path;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
Expand All @@ -42,7 +43,13 @@

import org.apache.xtable.GenericTable;
import org.apache.xtable.TestSparkDeltaTable;
import org.apache.xtable.ValidationTestHelper;
import org.apache.xtable.conversion.SourceTable;
import org.apache.xtable.model.CommitsBacklog;
import org.apache.xtable.model.InstantsForIncrementalSync;
import org.apache.xtable.model.InternalSnapshot;
import org.apache.xtable.model.TableChange;
import org.apache.xtable.model.storage.TableFormat;

public class ITDeltaDeleteVectorConvert {
@TempDir private static Path tempDir;
Expand Down Expand Up @@ -147,18 +154,32 @@ public void testInsertsUpsertsAndDeletes() {
allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
assertEquals(228L, testSparkDeltaTable.getNumRows());

// TODO conversion fails if delete vectors are enabled, this is because of missing handlers for
// deletion files.
// TODO pending for another PR
// SourceTable tableConfig =
// SourceTable.builder()
// .name(testSparkDeltaTable.getTableName())
// .basePath(testSparkDeltaTable.getBasePath())
// .formatName(TableFormat.DELTA)
// .build();
// DeltaConversionSource conversionSource =
// conversionSourceProvider.getConversionSourceInstance(tableConfig);
// InternalSnapshot internalSnapshot = conversionSource.getCurrentSnapshot();
SourceTable tableConfig =
SourceTable.builder()
.name(testSparkDeltaTable.getTableName())
.basePath(testSparkDeltaTable.getBasePath())
.formatName(TableFormat.DELTA)
.build();
DeltaConversionSource conversionSource =
conversionSourceProvider.getConversionSourceInstance(tableConfig);
InternalSnapshot internalSnapshot = conversionSource.getCurrentSnapshot();

// validateDeltaPartitioning(internalSnapshot);
ValidationTestHelper.validateSnapshot(
internalSnapshot, allActiveFiles.get(allActiveFiles.size() - 1));

// Get changes in incremental format.
InstantsForIncrementalSync instantsForIncrementalSync =
InstantsForIncrementalSync.builder()
.lastSyncInstant(Instant.ofEpochMilli(timestamp1))
.build();
CommitsBacklog<Long> commitsBacklog =
conversionSource.getCommitsBacklog(instantsForIncrementalSync);
for (Long version : commitsBacklog.getCommitsToProcess()) {
TableChange tableChange = conversionSource.getTableChangeForCommit(version);
allTableChanges.add(tableChange);
}
ValidationTestHelper.validateTableChanges(allActiveFiles, allTableChanges);
}

private void validateDeletedRecordCount(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.delta;

import java.net.URISyntaxException;

import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import org.apache.spark.sql.delta.DeltaLog;
import org.apache.spark.sql.delta.Snapshot;
import org.apache.spark.sql.delta.actions.AddFile;
import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor;

import scala.Option;

/** Unit tests for {@link DeltaActionsConverter}. */
class TestDeltaActionsConverter {

  /**
   * Checks both outcomes of {@code extractDeletionVectorFile}: {@code null} for an add action
   * without a deletion vector, and the data file path for an add action that carries one.
   */
  @Test
  void extractDeletionVector() throws URISyntaxException {
    final String dataFilePath = "https://container.blob.core.windows.net/tablepath/file_path";
    final int fileSize = 123;

    // Stub the snapshot -> delta log -> table base path chain up front.
    DeltaLog mockedLog = Mockito.mock(DeltaLog.class);
    Mockito.when(mockedLog.dataPath())
        .thenReturn(new Path("https://container.blob.core.windows.net/tablepath"));
    Snapshot mockedSnapshot = Mockito.mock(Snapshot.class);
    Mockito.when(mockedSnapshot.deltaLog()).thenReturn(mockedLog);

    DeltaActionsConverter converter = DeltaActionsConverter.getInstance();

    // Case 1: no deletion vector on the add action.
    AddFile plainAdd = newAddFile(dataFilePath, fileSize, null);
    Assertions.assertNull(converter.extractDeletionVectorFile(mockedSnapshot, plainAdd));

    // Case 2: a deletion vector is attached, so the data file path is expected back.
    DeletionVectorDescriptor descriptor =
        DeletionVectorDescriptor.onDiskWithAbsolutePath(
            dataFilePath, fileSize, 42, Option.empty(), Option.empty());
    AddFile addWithVector = newAddFile(dataFilePath, fileSize, descriptor);
    Assertions.assertEquals(
        dataFilePath, converter.extractDeletionVectorFile(mockedSnapshot, addWithVector));
  }

  /** Builds an {@link AddFile} with fixed filler values for everything except the given fields. */
  private static AddFile newAddFile(
      String path, int size, DeletionVectorDescriptor deletionVector) {
    long modificationTime = 234L;
    boolean dataChange = true;
    String stats = "";
    return new AddFile(
        path, null, size, modificationTime, dataChange, stats, null, deletionVector);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.apache.xtable.testutil.ColumnStatMapUtil.getColumnStats;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -150,10 +151,16 @@ void convertStatsToInternalRepresentation() throws IOException {
deltaStats.put("maxValues", maxValues);
deltaStats.put("nullCount", nullValues);
deltaStats.put("numRecords", 100);
deltaStats.put("tightBounds", Boolean.TRUE);
deltaStats.put("nonExisting", minValues);
String stats = MAPPER.writeValueAsString(deltaStats);
AddFile addFile = new AddFile("file://path/to/file", null, 0, 0, true, stats, null, null);
DeltaStatsExtractor extractor = DeltaStatsExtractor.getInstance();
List<ColumnStat> actual = extractor.getColumnStatsForFile(addFile, fields);
Set<String> unsupportedStats = extractor.getUnsupportedStats();
assertEquals(2, unsupportedStats.size());
assertTrue(unsupportedStats.contains("tightBounds"));
assertTrue(unsupportedStats.contains("nonExisting"));

List<ColumnStat> expected =
Arrays.asList(
Expand Down

0 comments on commit f676788

Please sign in to comment.