Fix commit log parsing of Delta tables with delete vector #596

Merged: 1 commit, Dec 18, 2024
@@ -28,6 +28,7 @@

import org.apache.spark.sql.delta.Snapshot;
import org.apache.spark.sql.delta.actions.AddFile;
import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor;
import org.apache.spark.sql.delta.actions.RemoveFile;

import org.apache.xtable.exception.NotSupportedException;
@@ -106,4 +107,24 @@ static String getFullPathToFile(Snapshot snapshot, String dataFilePath) {
}
return tableBasePath + Path.SEPARATOR + dataFilePath;
}

/**
 * Extracts the representation of the deletion vector information corresponding to an AddFile
 * action. Currently this method returns the full path to the data file for which deletion
 * vector data is present, not the path of the deletion vector file itself.
 *
 * @param snapshot the commit snapshot
 * @param addFile the add file action
 * @return the deletion vector representation (path of the data file), or null if no deletion
 *     vector is present
 */
public String extractDeletionVectorFile(Snapshot snapshot, AddFile addFile) {
DeletionVectorDescriptor deletionVector = addFile.deletionVector();
if (deletionVector == null) {
return null;
}

String dataFilePath = addFile.path();
return getFullPathToFile(snapshot, dataFilePath);
}
}
@@ -21,8 +21,10 @@
import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

@@ -99,11 +101,16 @@ public TableChange getTableChangeForCommit(Long versionNumber) {
Snapshot snapshotAtVersion = deltaLog.getSnapshotAt(versionNumber, Option.empty());
FileFormat fileFormat =
actionsConverter.convertToFileFormat(snapshotAtVersion.metadata().format().provider());
Set<InternalDataFile> addedFiles = new HashSet<>();
Set<InternalDataFile> removedFiles = new HashSet<>();

// All three of the following data structures use the data file's absolute path as the key
Map<String, InternalDataFile> addedFiles = new HashMap<>();
Map<String, InternalDataFile> removedFiles = new HashMap<>();
// Set of data file paths for which deletion vectors exist.
Set<String> deletionVectors = new HashSet<>();

for (Action action : actionsForVersion) {
if (action instanceof AddFile) {
addedFiles.add(
InternalDataFile dataFile =
actionsConverter.convertAddActionToInternalDataFile(
(AddFile) action,
snapshotAtVersion,
@@ -112,19 +119,47 @@ public TableChange getTableChangeForCommit(Long versionNumber) {
tableAtVersion.getReadSchema().getFields(),
true,
DeltaPartitionExtractor.getInstance(),
DeltaStatsExtractor.getInstance()));
DeltaStatsExtractor.getInstance());
addedFiles.put(dataFile.getPhysicalPath(), dataFile);
String deleteVectorPath =
actionsConverter.extractDeletionVectorFile(snapshotAtVersion, (AddFile) action);
if (deleteVectorPath != null) {
deletionVectors.add(deleteVectorPath);
}
} else if (action instanceof RemoveFile) {
removedFiles.add(
InternalDataFile dataFile =
actionsConverter.convertRemoveActionToInternalDataFile(
(RemoveFile) action,
snapshotAtVersion,
fileFormat,
tableAtVersion.getPartitioningFields(),
DeltaPartitionExtractor.getInstance()));
DeltaPartitionExtractor.getInstance());
removedFiles.put(dataFile.getPhysicalPath(), dataFile);
}
}

// In Delta Lake, when deletion vector information is added for an existing data file as a
// result of a delete operation, a RemoveFile action is written to the commit log to drop the
// old entry, and a new AddFile entry carrying the deletion vector information replaces it.
// Since the same data file is removed and re-added, it must be dropped from both the added
// and removed file maps, which track the data files that were actually added and removed.
for (String deletionVector : deletionVectors) {
// validate that a Remove action is also added for the data file
if (removedFiles.containsKey(deletionVector)) {
addedFiles.remove(deletionVector);
removedFiles.remove(deletionVector);
} else {
log.warn(
    "No Remove action found for data file {} for which a deletion vector was added. This is unexpected.",
    deletionVector);
}
}

Review thread on the warning above:

Contributor: Should we error out in this case? Just wondering if this will lead to data consistency issues.

Contributor Author: I don't think XTable should terminate translation if a RemoveFile action corresponding to an AddFile-with-deletion-vector action is not present, mainly because I'm not certain whether adding a file for the first time along with an associated deletion vector violates the Delta spec. I believe the Delta API would allow it, but I need to validate this. It may be allowed to support use cases like a long-running transaction that first adds a file and then removes some entries from it, but I'm not sure. If the spec allows it, XTable should not terminate.
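For context on the Remove-plus-Add pairing discussed above, here is a hedged sketch of the relevant _delta_log entries. The paths and values are illustrative and the actions are abridged to the fields that matter here: deleting rows from an existing file writes a RemoveFile and an AddFile for the same data file path, with the new AddFile carrying the deletionVector descriptor.

{"remove": {"path": "part-00000-abc.parquet", "deletionTimestamp": 1734500000000, "dataChange": true}}
{"add": {"path": "part-00000-abc.parquet", "partitionValues": {}, "size": 1024,
         "modificationTime": 1734500000000, "dataChange": true,
         "deletionVector": {"storageType": "p",
                            "pathOrInlineDv": "s3://bucket/table/deletion_vector_xyz.bin",
                            "sizeInBytes": 42, "cardinality": 1}}}

Because both actions share the data file path, the reconciliation loop above can key the added and removed maps by that path and drop the pair, leaving only genuinely added or removed files in the diff.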

DataFilesDiff dataFilesDiff =
DataFilesDiff.builder().filesAdded(addedFiles).filesRemoved(removedFiles).build();
DataFilesDiff.builder()
.filesAdded(addedFiles.values())
.filesRemoved(removedFiles.values())
.build();
return TableChange.builder().tableAsOfChange(tableAtVersion).filesDiff(dataFilesDiff).build();
}

@@ -34,15 +34,20 @@
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Value;
import lombok.extern.log4j.Log4j2;

import org.apache.commons.lang3.StringUtils;

import org.apache.spark.sql.delta.actions.AddFile;

import com.fasterxml.jackson.annotation.JsonAnySetter;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;

import org.apache.xtable.collectors.CustomCollectors;
import org.apache.xtable.model.exception.ParseException;
@@ -56,6 +61,7 @@
* DeltaStatsExtractor extracts column stats and is also responsible for their serialization,
* leveraging {@link DeltaValueConverter}.
*/
@Log4j2
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class DeltaStatsExtractor {
private static final Set<InternalType> FIELD_TYPES_WITH_STATS_SUPPORT =
@@ -74,9 +80,13 @@ public class DeltaStatsExtractor {

private static final DeltaStatsExtractor INSTANCE = new DeltaStatsExtractor();

private static final String PATH_DELIMITER = "\\.";
private static final ObjectMapper MAPPER = new ObjectMapper();

/* This set collects the names of all unrecognized Delta Lake stats. For instance, data file
stats in the presence of deletion vectors contain a 'tightBounds' stat, which is currently
not handled by XTable */
private final Set<String> unsupportedStats = new HashSet<>();

public static DeltaStatsExtractor getInstance() {
return INSTANCE;
}
@@ -182,6 +192,8 @@ public List<ColumnStat> getColumnStatsForFile(AddFile addFile, List<InternalFiel
// TODO: Additional work needed to track maps & arrays.
try {
DeltaStats deltaStats = MAPPER.readValue(addFile.stats(), DeltaStats.class);
collectUnsupportedStats(deltaStats.getAdditionalStats());

Map<String, Object> fieldPathToMaxValue = flattenStatMap(deltaStats.getMaxValues());
Map<String, Object> fieldPathToMinValue = flattenStatMap(deltaStats.getMinValues());
Map<String, Object> fieldPathToNullCount = flattenStatMap(deltaStats.getNullCount());
@@ -211,6 +223,20 @@
}
}

private void collectUnsupportedStats(Map<String, Object> additionalStats) {
if (additionalStats == null || additionalStats.isEmpty()) {
return;
}

additionalStats.keySet().stream()
.filter(key -> !unsupportedStats.contains(key))
.forEach(
key -> {
log.info("Unrecognized/unsupported Delta data file stat: {}", key);
unsupportedStats.add(key);
});
}

/**
* Takes an input map that represents a JSON object and flattens it.
*
@@ -239,13 +265,34 @@ private Map<String, Object> flattenStatMap(Map<String, Object> statMap) {
return result;
}
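The body of flattenStatMap is collapsed in this diff. As a rough sketch of the behavior the Javadoc describes, assuming dot-delimited key paths consistent with the PATH_DELIMITER constant above (this is not the PR's actual implementation):

import java.util.HashMap;
import java.util.Map;

class FlattenSketch {
  // {"a": {"b": 1}, "c": 2}  ->  {"a.b": 1, "c": 2}
  @SuppressWarnings("unchecked")
  static Map<String, Object> flatten(String prefix, Map<String, Object> in, Map<String, Object> out) {
    for (Map.Entry<String, Object> entry : in.entrySet()) {
      String key = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
      if (entry.getValue() instanceof Map) {
        flatten(key, (Map<String, Object>) entry.getValue(), out); // recurse into nested JSON objects
      } else {
        out.put(key, entry.getValue()); // leaf stat value, keyed by its full dot path
      }
    }
    return out;
  }
}

Flattening the min/max/null-count maps this way lines nested column stats up with the dot paths that the extractor splits on via PATH_DELIMITER.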

/**
* Returns the names of all unsupported stats that have been discovered during the parsing of
* Delta Lake stats.
*
* @return set of unsupported stats
*/
@VisibleForTesting
Set<String> getUnsupportedStats() {
return Collections.unmodifiableSet(unsupportedStats);
}

@Builder
@Value
private static class DeltaStats {
long numRecords;
Map<String, Object> minValues;
Map<String, Object> maxValues;
Map<String, Object> nullCount;

/* this is a catch-all for any additional stats that are not explicitly handled */
@JsonIgnore
@Getter(lazy = true)
Map<String, Object> additionalStats = new HashMap<>();

@JsonAnySetter
public void setAdditionalStat(String key, Object value) {
getAdditionalStats().put(key, value);
}
}
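The @JsonAnySetter hook on DeltaStats above is what routes unrecognized stats such as tightBounds into the additionalStats map instead of breaking deserialization. A minimal, self-contained sketch of the pattern (hypothetical class name, not part of this PR):

import java.util.HashMap;
import java.util.Map;

import com.fasterxml.jackson.annotation.JsonAnySetter;
import com.fasterxml.jackson.databind.ObjectMapper;

public class StatsSketch {
  public long numRecords; // recognized property, bound directly by Jackson
  public final Map<String, Object> extras = new HashMap<>();

  @JsonAnySetter // invoked once for every JSON key with no matching property
  public void set(String key, Object value) {
    extras.put(key, value);
  }

  public static void main(String[] args) throws Exception {
    StatsSketch stats =
        new ObjectMapper()
            .readValue("{\"numRecords\":100,\"tightBounds\":true}", StatsSketch.class);
    System.out.println(stats.extras); // prints {tightBounds=true}
  }
}

In the PR itself the same idea is combined with Lombok's @Getter(lazy = true), so the catch-all map is only materialized once an unknown stat actually appears.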

@@ -21,6 +21,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.nio.file.Path;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@@ -42,7 +43,13 @@

import org.apache.xtable.GenericTable;
import org.apache.xtable.TestSparkDeltaTable;
import org.apache.xtable.ValidationTestHelper;
import org.apache.xtable.conversion.SourceTable;
import org.apache.xtable.model.CommitsBacklog;
import org.apache.xtable.model.InstantsForIncrementalSync;
import org.apache.xtable.model.InternalSnapshot;
import org.apache.xtable.model.TableChange;
import org.apache.xtable.model.storage.TableFormat;

public class ITDeltaDeleteVectorConvert {
@TempDir private static Path tempDir;
@@ -147,18 +154,32 @@ public void testInsertsUpsertsAndDeletes() {
allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
assertEquals(228L, testSparkDeltaTable.getNumRows());

// TODO conversion fails if delete vectors are enabled, this is because of missing handlers for
// deletion files.
// TODO pending for another PR
// SourceTable tableConfig =
// SourceTable.builder()
// .name(testSparkDeltaTable.getTableName())
// .basePath(testSparkDeltaTable.getBasePath())
// .formatName(TableFormat.DELTA)
// .build();
// DeltaConversionSource conversionSource =
// conversionSourceProvider.getConversionSourceInstance(tableConfig);
// InternalSnapshot internalSnapshot = conversionSource.getCurrentSnapshot();
SourceTable tableConfig =
SourceTable.builder()
.name(testSparkDeltaTable.getTableName())
.basePath(testSparkDeltaTable.getBasePath())
.formatName(TableFormat.DELTA)
.build();
DeltaConversionSource conversionSource =
conversionSourceProvider.getConversionSourceInstance(tableConfig);
InternalSnapshot internalSnapshot = conversionSource.getCurrentSnapshot();

// validateDeltaPartitioning(internalSnapshot);
ValidationTestHelper.validateSnapshot(
internalSnapshot, allActiveFiles.get(allActiveFiles.size() - 1));

// Get changes in incremental format.
InstantsForIncrementalSync instantsForIncrementalSync =
InstantsForIncrementalSync.builder()
.lastSyncInstant(Instant.ofEpochMilli(timestamp1))
.build();
CommitsBacklog<Long> commitsBacklog =
conversionSource.getCommitsBacklog(instantsForIncrementalSync);
for (Long version : commitsBacklog.getCommitsToProcess()) {
TableChange tableChange = conversionSource.getTableChangeForCommit(version);
allTableChanges.add(tableChange);
}
ValidationTestHelper.validateTableChanges(allActiveFiles, allTableChanges);
}
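Not shown in this diff is the test table setup. Deletion vectors only come into play once the table opts in; a hedged sketch of that setup (SparkSession usage and table name are assumptions, the property name is from the Delta docs):

import org.apache.spark.sql.SparkSession;

// Deletion vectors must be enabled on a Delta table before delete operations start
// producing AddFile actions that carry a deletionVector descriptor.
SparkSession spark = SparkSession.builder().master("local[1]").getOrCreate();
spark.sql("ALTER TABLE test_table SET TBLPROPERTIES ('delta.enableDeletionVectors' = 'true')");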

private void validateDeletedRecordCount(
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.delta;

import java.net.URISyntaxException;

import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import org.apache.spark.sql.delta.DeltaLog;
import org.apache.spark.sql.delta.Snapshot;
import org.apache.spark.sql.delta.actions.AddFile;
import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor;

import scala.Option;

class TestDeltaActionsConverter {

@Test
void extractDeletionVector() throws URISyntaxException {
DeltaActionsConverter actionsConverter = DeltaActionsConverter.getInstance();

int size = 123;
long time = 234L;
boolean dataChange = true;
String stats = "";
String filePath = "https://container.blob.core.windows.net/tablepath/file_path";
Snapshot snapshot = Mockito.mock(Snapshot.class);
DeltaLog deltaLog = Mockito.mock(DeltaLog.class);

DeletionVectorDescriptor deletionVector = null;
AddFile addFileAction =
new AddFile(filePath, null, size, time, dataChange, stats, null, deletionVector);
Assertions.assertNull(actionsConverter.extractDeletionVectorFile(snapshot, addFileAction));

deletionVector =
DeletionVectorDescriptor.onDiskWithAbsolutePath(
filePath, size, 42, Option.empty(), Option.empty());

addFileAction =
new AddFile(filePath, null, size, time, dataChange, stats, null, deletionVector);

Mockito.when(snapshot.deltaLog()).thenReturn(deltaLog);
Mockito.when(deltaLog.dataPath())
.thenReturn(new Path("https://container.blob.core.windows.net/tablepath"));
Assertions.assertEquals(
filePath, actionsConverter.extractDeletionVectorFile(snapshot, addFileAction));
}
}
@@ -20,6 +20,7 @@

import static org.apache.xtable.testutil.ColumnStatMapUtil.getColumnStats;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.Arrays;
@@ -150,10 +151,16 @@ void convertStatsToInternalRepresentation() throws IOException {
deltaStats.put("maxValues", maxValues);
deltaStats.put("nullCount", nullValues);
deltaStats.put("numRecords", 100);
deltaStats.put("tightBounds", Boolean.TRUE);
deltaStats.put("nonExisting", minValues);
String stats = MAPPER.writeValueAsString(deltaStats);
AddFile addFile = new AddFile("file://path/to/file", null, 0, 0, true, stats, null, null);
DeltaStatsExtractor extractor = DeltaStatsExtractor.getInstance();
List<ColumnStat> actual = extractor.getColumnStatsForFile(addFile, fields);
Set<String> unsupportedStats = extractor.getUnsupportedStats();
assertEquals(2, unsupportedStats.size());
assertTrue(unsupportedStats.contains("tightBounds"));
assertTrue(unsupportedStats.contains("nonExisting"));

List<ColumnStat> expected =
Arrays.asList(