diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java
index fbee89f4e..40b822dfb 100644
--- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java
+++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java
@@ -28,6 +28,7 @@
 import org.apache.spark.sql.delta.Snapshot;
 import org.apache.spark.sql.delta.actions.AddFile;
+import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor;
 import org.apache.spark.sql.delta.actions.RemoveFile;
 
 import org.apache.xtable.exception.NotSupportedException;
@@ -106,4 +107,24 @@ static String getFullPathToFile(Snapshot snapshot, String dataFilePath) {
     }
     return tableBasePath + Path.SEPARATOR + dataFilePath;
   }
+
+  /**
+   * Extracts the representation of the deletion vector information corresponding to an AddFile
+   * action. Currently, this method extracts and returns the path to the data file for which
+   * deletion vector data is present.
+   *
+   * @param snapshot the commit snapshot
+   * @param addFile the add file action
+   * @return the deletion vector representation (path of data file), or null if no deletion vector
+   *     is present
+   */
+  public String extractDeletionVectorFile(Snapshot snapshot, AddFile addFile) {
+    DeletionVectorDescriptor deletionVector = addFile.deletionVector();
+    if (deletionVector == null) {
+      return null;
+    }
+
+    String dataFilePath = addFile.path();
+    return getFullPathToFile(snapshot, dataFilePath);
+  }
 }
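A minimal usage sketch of the helper added above, assuming only what the patch itself shows (DeltaActionsConverter.getInstance() plus the new extractDeletionVectorFile method); the wrapper class below is illustrative and not part of the patch:

import org.apache.spark.sql.delta.Snapshot;
import org.apache.spark.sql.delta.actions.AddFile;

import org.apache.xtable.delta.DeltaActionsConverter;

// Illustrative wrapper, not part of the patch.
class DeletionVectorProbe {
  private final DeltaActionsConverter converter = DeltaActionsConverter.getInstance();

  // extractDeletionVectorFile resolves the data file's absolute path against the snapshot's
  // table base path and returns null when the AddFile carries no deletion vector descriptor.
  boolean hasDeletionVector(Snapshot snapshot, AddFile addFile) {
    return converter.extractDeletionVectorFile(snapshot, addFile) != null;
  }
}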
diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
index 19ecc02c6..a5937b022 100644
--- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
+++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
@@ -21,8 +21,10 @@
 import java.sql.Timestamp;
 import java.time.Instant;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
@@ -99,11 +101,16 @@ public TableChange getTableChangeForCommit(Long versionNumber) {
     Snapshot snapshotAtVersion = deltaLog.getSnapshotAt(versionNumber, Option.empty());
     FileFormat fileFormat =
         actionsConverter.convertToFileFormat(snapshotAtVersion.metadata().format().provider());
-    Set<InternalDataFile> addedFiles = new HashSet<>();
-    Set<InternalDataFile> removedFiles = new HashSet<>();
+
+    // All 3 of the following data structures use the data file's absolute path as the key
+    Map<String, InternalDataFile> addedFiles = new HashMap<>();
+    Map<String, InternalDataFile> removedFiles = new HashMap<>();
+    // Set of data file paths for which deletion vectors exist.
+    Set<String> deletionVectors = new HashSet<>();
+
     for (Action action : actionsForVersion) {
       if (action instanceof AddFile) {
-        addedFiles.add(
+        InternalDataFile dataFile =
             actionsConverter.convertAddActionToInternalDataFile(
                 (AddFile) action,
                 snapshotAtVersion,
@@ -112,19 +119,47 @@ public TableChange getTableChangeForCommit(Long versionNumber) {
                 tableAtVersion.getReadSchema().getFields(),
                 true,
                 DeltaPartitionExtractor.getInstance(),
-                DeltaStatsExtractor.getInstance()));
+                DeltaStatsExtractor.getInstance());
+        addedFiles.put(dataFile.getPhysicalPath(), dataFile);
+        String deleteVectorPath =
+            actionsConverter.extractDeletionVectorFile(snapshotAtVersion, (AddFile) action);
+        if (deleteVectorPath != null) {
+          deletionVectors.add(deleteVectorPath);
+        }
       } else if (action instanceof RemoveFile) {
-        removedFiles.add(
+        InternalDataFile dataFile =
             actionsConverter.convertRemoveActionToInternalDataFile(
                 (RemoveFile) action,
                 snapshotAtVersion,
                 fileFormat,
                 tableAtVersion.getPartitioningFields(),
-                DeltaPartitionExtractor.getInstance()));
+                DeltaPartitionExtractor.getInstance());
+        removedFiles.put(dataFile.getPhysicalPath(), dataFile);
       }
     }
+
+    // In Delta Lake, if deletion vector information is added for an existing data file as a
+    // result of a delete operation, a new RemoveFile action is added to the commit log to remove
+    // the old entry, which is replaced by a new AddFile entry carrying the deletion vector
+    // information. Since the same data file is removed and added, it needs to be dropped from the
+    // added and removed file maps, which track the data files that were actually added and removed.
+    for (String deletionVector : deletionVectors) {
+      // validate that a RemoveFile action was also added for the data file
+      if (removedFiles.containsKey(deletionVector)) {
+        addedFiles.remove(deletionVector);
+        removedFiles.remove(deletionVector);
+      } else {
+        log.warn(
+            "No RemoveFile action found for data file {} for which a deletion vector was added. This is unexpected.",
+            deletionVector);
+      }
+    }
+
     DataFilesDiff dataFilesDiff =
-        DataFilesDiff.builder().filesAdded(addedFiles).filesRemoved(removedFiles).build();
+        DataFilesDiff.builder()
+            .filesAdded(addedFiles.values())
+            .filesRemoved(removedFiles.values())
+            .build();
     return TableChange.builder().tableAsOfChange(tableAtVersion).filesDiff(dataFilesDiff).build();
   }
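The reconciliation in getTableChangeForCommit can be pictured in isolation: a delete that merely attaches a deletion vector re-emits the same data-file path as both a RemoveFile and an AddFile, so dropping matched pairs leaves only genuine file-level changes. A self-contained sketch with plain collections standing in for InternalDataFile (paths and values below are made up for illustration):

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Stand-alone illustration of the pairing rule; not the DeltaConversionSource code itself.
class DeletionVectorPairingSketch {
  public static void main(String[] args) {
    Map<String, String> added = new HashMap<>();
    Map<String, String> removed = new HashMap<>();
    Set<String> deletionVectors = new HashSet<>();

    // A commit that only attaches a deletion vector to f1 re-adds and removes the same path,
    // while f2 is a brand-new data file.
    added.put("s3://bucket/table/f1.parquet", "AddFile with deletion vector");
    removed.put("s3://bucket/table/f1.parquet", "RemoveFile for the old entry");
    deletionVectors.add("s3://bucket/table/f1.parquet");
    added.put("s3://bucket/table/f2.parquet", "AddFile");

    for (String path : deletionVectors) {
      if (removed.containsKey(path)) {
        added.remove(path);
        removed.remove(path);
      }
    }
    // Only f2 remains as an actual addition; nothing was really removed.
    System.out.println("added=" + added.keySet() + " removed=" + removed.keySet());
  }
}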
diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaStatsExtractor.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaStatsExtractor.java
index a6f74cee0..75ecce331 100644
--- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaStatsExtractor.java
+++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaStatsExtractor.java
@@ -34,15 +34,20 @@
 import lombok.AccessLevel;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
+import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.Value;
+import lombok.extern.log4j.Log4j2;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.spark.sql.delta.actions.AddFile;
 
+import com.fasterxml.jackson.annotation.JsonAnySetter;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.xtable.collectors.CustomCollectors;
 import org.apache.xtable.model.exception.ParseException;
@@ -56,6 +61,7 @@
  * DeltaStatsExtractor extracts column stats and also responsible for their serialization leveraging
  * {@link DeltaValueConverter}.
  */
+@Log4j2
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
 public class DeltaStatsExtractor {
   private static final Set FIELD_TYPES_WITH_STATS_SUPPORT =
@@ -74,9 +80,13 @@ public class DeltaStatsExtractor {
   private static final DeltaStatsExtractor INSTANCE = new DeltaStatsExtractor();
 
-  private static final String PATH_DELIMITER = "\\.";
   private static final ObjectMapper MAPPER = new ObjectMapper();
 
+  /* this data structure collects the names of all unrecognized Delta Lake stats. For instance,
+  data file stats in the presence of deletion vectors contain a 'tightBounds' stat, which is
+  currently not handled by XTable */
+  private final Set<String> unsupportedStats = new HashSet<>();
+
   public static DeltaStatsExtractor getInstance() {
     return INSTANCE;
   }
@@ -182,6 +192,8 @@ public List<ColumnStat> getColumnStatsForFile(AddFile addFile, List<InternalField> fields) {
+      collectUnsupportedStats(deltaStats.getAdditionalStats());
+
       Map<String, Object> fieldPathToMaxValue = flattenStatMap(deltaStats.getMaxValues());
       Map<String, Object> fieldPathToMinValue = flattenStatMap(deltaStats.getMinValues());
       Map<String, Object> fieldPathToNullCount = flattenStatMap(deltaStats.getNullCount());
@@ -211,6 +223,20 @@ public List<ColumnStat> getColumnStatsForFile(AddFile addFile, List<InternalField> fields) {
+
+  private void collectUnsupportedStats(Map<String, Object> additionalStats) {
+    if (additionalStats == null || additionalStats.isEmpty()) {
+      return;
+    }
+
+    additionalStats.keySet().stream()
+        .filter(key -> !unsupportedStats.contains(key))
+        .forEach(
+            key -> {
+              log.info("Unrecognized/unsupported Delta data file stat: {}", key);
+              unsupportedStats.add(key);
+            });
+  }
+
   /**
    * Takes the input map which represents a json object and flattens it.
    *
@@ -239,6 +265,17 @@ private Map<String, Object> flattenStatMap(Map<String, Object> statMap) {
     return result;
   }
 
+  /**
+   * Returns the names of all unsupported stats that have been discovered during the parsing of
+   * Delta Lake stats.
+   *
+   * @return set of unsupported stats
+   */
+  @VisibleForTesting
+  Set<String> getUnsupportedStats() {
+    return Collections.unmodifiableSet(unsupportedStats);
+  }
+
   @Builder
   @Value
   private static class DeltaStats {
@@ -246,6 +283,16 @@ private static class DeltaStats {
     Map<String, Object> minValues;
     Map<String, Object> maxValues;
     Map<String, Object> nullCount;
+
+    /* this is a catch-all for any additional stats that are not explicitly handled */
+    @JsonIgnore
+    @Getter(lazy = true)
+    Map<String, Object> additionalStats = new HashMap<>();
+
+    @JsonAnySetter
+    public void setAdditionalStat(String key, Object value) {
+      getAdditionalStats().put(key, value);
+    }
   }
 
   @Value
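The stats catch-all relies on Jackson's @JsonAnySetter: any JSON property without a matching member is routed to the annotated setter instead of failing deserialization, which is how 'tightBounds' from deletion-vector-enabled files ends up collected and logged once. A stripped-down sketch of that pattern (the demo class is hypothetical; only the stat names come from the patch):

import java.util.HashMap;
import java.util.Map;

import com.fasterxml.jackson.annotation.JsonAnySetter;
import com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical demo class; mirrors the @JsonAnySetter catch-all used by DeltaStats.
public class AnySetterSketch {
  // Known stats get real members; everything else (e.g. "tightBounds") lands in the setter.
  public long numRecords;
  private final Map<String, Object> additionalStats = new HashMap<>();

  @JsonAnySetter
  public void setAdditionalStat(String key, Object value) {
    additionalStats.put(key, value);
  }

  public static void main(String[] args) throws Exception {
    String json = "{\"numRecords\":100,\"tightBounds\":true}";
    AnySetterSketch parsed = new ObjectMapper().readValue(json, AnySetterSketch.class);
    // additionalStats now holds {tightBounds=true}, which an extractor can log once per key.
    System.out.println(parsed.additionalStats);
  }
}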
diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java
index d1d33bf82..ed02893e3 100644
--- a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java
+++ b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java
@@ -21,6 +21,7 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import java.nio.file.Path;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -42,7 +43,13 @@
 import org.apache.xtable.GenericTable;
 import org.apache.xtable.TestSparkDeltaTable;
+import org.apache.xtable.ValidationTestHelper;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.model.CommitsBacklog;
+import org.apache.xtable.model.InstantsForIncrementalSync;
+import org.apache.xtable.model.InternalSnapshot;
 import org.apache.xtable.model.TableChange;
+import org.apache.xtable.model.storage.TableFormat;
 
 public class ITDeltaDeleteVectorConvert {
   @TempDir private static Path tempDir;
@@ -147,18 +154,32 @@ public void testInsertsUpsertsAndDeletes() {
     allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
     assertEquals(228L, testSparkDeltaTable.getNumRows());
 
-    // TODO conversion fails if delete vectors are enabled, this is because of missing handlers for
-    // deletion files.
-    // TODO pending for another PR
-    // SourceTable tableConfig =
-    //     SourceTable.builder()
-    //         .name(testSparkDeltaTable.getTableName())
-    //         .basePath(testSparkDeltaTable.getBasePath())
-    //         .formatName(TableFormat.DELTA)
-    //         .build();
-    // DeltaConversionSource conversionSource =
-    //     conversionSourceProvider.getConversionSourceInstance(tableConfig);
-    // InternalSnapshot internalSnapshot = conversionSource.getCurrentSnapshot();
+    SourceTable tableConfig =
+        SourceTable.builder()
+            .name(testSparkDeltaTable.getTableName())
+            .basePath(testSparkDeltaTable.getBasePath())
+            .formatName(TableFormat.DELTA)
+            .build();
+    DeltaConversionSource conversionSource =
+        conversionSourceProvider.getConversionSourceInstance(tableConfig);
+    InternalSnapshot internalSnapshot = conversionSource.getCurrentSnapshot();
+
+    // validateDeltaPartitioning(internalSnapshot);
+    ValidationTestHelper.validateSnapshot(
+        internalSnapshot, allActiveFiles.get(allActiveFiles.size() - 1));
+
+    // Get changes in incremental format.
+    InstantsForIncrementalSync instantsForIncrementalSync =
+        InstantsForIncrementalSync.builder()
+            .lastSyncInstant(Instant.ofEpochMilli(timestamp1))
+            .build();
+    CommitsBacklog<Long> commitsBacklog =
+        conversionSource.getCommitsBacklog(instantsForIncrementalSync);
+    for (Long version : commitsBacklog.getCommitsToProcess()) {
+      TableChange tableChange = conversionSource.getTableChangeForCommit(version);
+      allTableChanges.add(tableChange);
+    }
+    ValidationTestHelper.validateTableChanges(allActiveFiles, allTableChanges);
   }
 
   private void validateDeletedRecordCount(
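The incremental path exercised by the updated test reduces to the loop below, condensed from the calls above; the Long type parameter on CommitsBacklog is an assumption based on the test iterating over Long versions:

import java.time.Instant;

import org.apache.xtable.delta.DeltaConversionSource;
import org.apache.xtable.model.CommitsBacklog;
import org.apache.xtable.model.InstantsForIncrementalSync;
import org.apache.xtable.model.TableChange;

// Condensed sketch of the integration test's incremental-sync calls; not production code.
class IncrementalSyncSketch {
  static void consumeChangesSince(DeltaConversionSource conversionSource, long lastSyncTimeMillis) {
    InstantsForIncrementalSync instants =
        InstantsForIncrementalSync.builder()
            .lastSyncInstant(Instant.ofEpochMilli(lastSyncTimeMillis))
            .build();
    // CommitsBacklog<Long> is assumed: each pending Delta version number becomes one TableChange,
    // including commits that merely attach deletion vectors to existing data files.
    CommitsBacklog<Long> commitsBacklog = conversionSource.getCommitsBacklog(instants);
    for (Long version : commitsBacklog.getCommitsToProcess()) {
      TableChange tableChange = conversionSource.getTableChangeForCommit(version);
      // hand tableChange to the target-format sync here
    }
  }
}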
diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java
new file mode 100644
index 000000000..e62e93414
--- /dev/null
+++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.delta;
+
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import org.apache.spark.sql.delta.DeltaLog;
+import org.apache.spark.sql.delta.Snapshot;
+import org.apache.spark.sql.delta.actions.AddFile;
+import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor;
+
+import scala.Option;
+
+class TestDeltaActionsConverter {
+
+  @Test
+  void extractDeletionVector() throws URISyntaxException {
+    DeltaActionsConverter actionsConverter = DeltaActionsConverter.getInstance();
+
+    int size = 123;
+    long time = 234L;
+    boolean dataChange = true;
+    String stats = "";
+    String filePath = "https://container.blob.core.windows.net/tablepath/file_path";
+    Snapshot snapshot = Mockito.mock(Snapshot.class);
+    DeltaLog deltaLog = Mockito.mock(DeltaLog.class);
+
+    DeletionVectorDescriptor deletionVector = null;
+    AddFile addFileAction =
+        new AddFile(filePath, null, size, time, dataChange, stats, null, deletionVector);
+    Assertions.assertNull(actionsConverter.extractDeletionVectorFile(snapshot, addFileAction));
+
+    deletionVector =
+        DeletionVectorDescriptor.onDiskWithAbsolutePath(
+            filePath, size, 42, Option.empty(), Option.empty());
+
+    addFileAction =
+        new AddFile(filePath, null, size, time, dataChange, stats, null, deletionVector);
+
+    Mockito.when(snapshot.deltaLog()).thenReturn(deltaLog);
+    Mockito.when(deltaLog.dataPath())
+        .thenReturn(new Path("https://container.blob.core.windows.net/tablepath"));
+    Assertions.assertEquals(
+        filePath, actionsConverter.extractDeletionVectorFile(snapshot, addFileAction));
+  }
+}
diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaStatsExtractor.java b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaStatsExtractor.java
index dc313b674..db685883b 100644
--- a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaStatsExtractor.java
+++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaStatsExtractor.java
@@ -20,6 +20,7 @@
 import static org.apache.xtable.testutil.ColumnStatMapUtil.getColumnStats;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -150,10 +151,16 @@ void convertStatsToInternalRepresentation() throws IOException {
     deltaStats.put("maxValues", maxValues);
     deltaStats.put("nullCount", nullValues);
     deltaStats.put("numRecords", 100);
+    deltaStats.put("tightBounds", Boolean.TRUE);
+    deltaStats.put("nonExisting", minValues);
     String stats = MAPPER.writeValueAsString(deltaStats);
     AddFile addFile = new AddFile("file://path/to/file", null, 0, 0, true, stats, null, null);
     DeltaStatsExtractor extractor = DeltaStatsExtractor.getInstance();
     List<ColumnStat> actual = extractor.getColumnStatsForFile(addFile, fields);
+    Set<String> unsupportedStats = extractor.getUnsupportedStats();
+    assertEquals(2, unsupportedStats.size());
+    assertTrue(unsupportedStats.contains("tightBounds"));
+    assertTrue(unsupportedStats.contains("nonExisting"));
     List<ColumnStat> expected = Arrays.asList(