From c13c0c94daf9f6abc34cb109a1a4fd15c8a47a04 Mon Sep 17 00:00:00 2001 From: Himadri Pal Date: Fri, 8 Mar 2024 23:29:09 -0800 Subject: [PATCH] Spark 3.4, 3.3 : Support output-spec-id in rewrite data files(#9901)(Backport #9803) Co-authored-by: hpal --- .../actions/RewriteDataFilesSparkAction.java | 4 +- .../actions/SparkBinPackDataRewriter.java | 3 +- .../actions/SparkShufflingDataRewriter.java | 22 ++- .../spark/actions/SparkSortDataRewriter.java | 5 + .../actions/SparkZOrderDataRewriter.java | 21 +++ .../actions/TestRewriteDataFilesAction.java | 126 ++++++++++++++++++ .../actions/RewriteDataFilesSparkAction.java | 3 +- .../actions/SparkBinPackDataRewriter.java | 3 +- .../actions/SparkShufflingDataRewriter.java | 20 ++- .../actions/SparkZOrderDataRewriter.java | 16 +++ .../actions/TestRewriteDataFilesAction.java | 125 +++++++++++++++++ 11 files changed, 337 insertions(+), 11 deletions(-) diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java index a5a69dea959e..ae547e206324 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java @@ -80,8 +80,8 @@ public class RewriteDataFilesSparkAction PARTIAL_PROGRESS_MAX_COMMITS, TARGET_FILE_SIZE_BYTES, USE_STARTING_SEQUENCE_NUMBER, - REWRITE_JOB_ORDER); - + REWRITE_JOB_ORDER, + OUTPUT_SPEC_ID); private static final RewriteDataFilesSparkAction.Result EMPTY_RESULT = ImmutableRewriteDataFiles.Result.builder().rewriteResults(ImmutableList.of()).build(); diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDataRewriter.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDataRewriter.java index 9a96f44ebda6..d256bf2794e2 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDataRewriter.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDataRewriter.java @@ -58,13 +58,14 @@ protected void doRewrite(String groupId, List group) { .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupId) .option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize()) .option(SparkWriteOptions.DISTRIBUTION_MODE, distributionMode(group).modeName()) + .option(SparkWriteOptions.OUTPUT_SPEC_ID, outputSpecId()) .mode("append") .save(groupId); } // invoke a shuffle if the original spec does not match the output spec private DistributionMode distributionMode(List group) { - boolean requiresRepartition = !group.get(0).spec().equals(table().spec()); + boolean requiresRepartition = !group.get(0).spec().equals(outputSpec()); return requiresRepartition ? 
DistributionMode.RANGE : DistributionMode.NONE; } } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java index 1add6383c618..63f2c88d6bae 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java @@ -22,6 +22,8 @@ import java.util.Map; import java.util.Set; import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; @@ -61,6 +63,18 @@ protected SparkShufflingDataRewriter(SparkSession spark, Table table) { super(spark, table); } + protected abstract org.apache.iceberg.SortOrder sortOrder(); + + /** + * Retrieves and returns the schema for the rewrite using the current table schema. + * + *
<p>
The schema with all columns required for correctly sorting the table. This may include + * additional computed columns which are not written to the table but are used for sorting. + */ + protected Schema sortSchema() { + return table().schema(); + } + protected abstract Dataset sortedDF(Dataset df, List group); @Override @@ -97,6 +111,7 @@ public void doRewrite(String groupId, List group) { .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupId) .option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize()) .option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false") + .option(SparkWriteOptions.OUTPUT_SPEC_ID, outputSpecId()) .mode("append") .save(groupId); } @@ -113,11 +128,12 @@ protected Dataset sort(Dataset df, org.apache.iceberg.SortOrder sortOr protected org.apache.iceberg.SortOrder outputSortOrder( List group, org.apache.iceberg.SortOrder sortOrder) { - boolean includePartitionColumns = !group.get(0).spec().equals(table().spec()); - if (includePartitionColumns) { + PartitionSpec spec = outputSpec(); + boolean requiresRepartitioning = !group.get(0).spec().equals(spec); + if (requiresRepartitioning) { // build in the requirement for partition sorting into our sort order // as the original spec for this group does not match the output spec - return SortOrderUtil.buildSortOrder(table(), sortOrder); + return SortOrderUtil.buildSortOrder(sortSchema(), spec, sortOrder()); } else { return sortOrder; } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortDataRewriter.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortDataRewriter.java index 4615f3cebc92..621e73bc60b0 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortDataRewriter.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortDataRewriter.java @@ -53,6 +53,11 @@ public String description() { return "SORT"; } + @Override + protected SortOrder sortOrder() { + return sortOrder; + } + @Override protected Dataset sortedDF(Dataset df, List group) { return sort(df, outputSortOrder(group, sortOrder)); diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderDataRewriter.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderDataRewriter.java index 138e126f0ad2..52b3f9e158b9 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderDataRewriter.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderDataRewriter.java @@ -30,6 +30,7 @@ import org.apache.iceberg.SortOrder; import org.apache.iceberg.Table; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.SparkUtil; @@ -103,6 +104,26 @@ public void init(Map options) { this.varLengthContribution = varLengthContribution(options); } + @Override + protected SortOrder sortOrder() { + return Z_SORT_ORDER; + } + + /** + * Overrides the sortSchema method to include columns from Z_SCHEMA. + * + *
<p>
This method generates a new Schema object which consists of columns from the original table + * schema and Z_SCHEMA. + */ + @Override + protected Schema sortSchema() { + return new Schema( + new ImmutableList.Builder() + .addAll(table().schema().columns()) + .addAll(Z_SCHEMA.columns()) + .build()); + } + @Override protected Dataset sortedDF(Dataset df, List group) { Dataset zValueDF = df.withColumn(Z_COLUMN, zValue(df)); diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java index f17e8c6ed89f..cfa1c9da951e 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java @@ -54,6 +54,7 @@ import org.apache.iceberg.FileFormat; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.MetadataTableType; +import org.apache.iceberg.PartitionData; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.RewriteJobOrder; import org.apache.iceberg.RowDelta; @@ -105,6 +106,7 @@ import org.apache.iceberg.util.StructLikeMap; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; +import org.assertj.core.api.Assertions; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -1384,6 +1386,130 @@ public void testRewriteJobOrderFilesDesc() { Assert.assertNotEquals("Number of files order should not be ascending", actual, expected); } + @Test + public void testBinPackRewriterWithSpecificUnparitionedOutputSpec() { + Table table = createTable(10); + shouldHaveFiles(table, 10); + int outputSpecId = table.spec().specId(); + table.updateSpec().addField(Expressions.truncate("c2", 2)).commit(); + + long dataSizeBefore = testDataSize(table); + long count = currentData().size(); + + RewriteDataFiles.Result result = + basicRewrite(table) + .option(RewriteDataFiles.OUTPUT_SPEC_ID, String.valueOf(outputSpecId)) + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") + .binPack() + .execute(); + + assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore); + assertThat(currentData().size()).isEqualTo(count); + shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId); + } + + @Test + public void testBinPackRewriterWithSpecificOutputSpec() { + Table table = createTable(10); + shouldHaveFiles(table, 10); + table.updateSpec().addField(Expressions.truncate("c2", 2)).commit(); + int outputSpecId = table.spec().specId(); + table.updateSpec().addField(Expressions.bucket("c3", 2)).commit(); + + long dataSizeBefore = testDataSize(table); + long count = currentData().size(); + + RewriteDataFiles.Result result = + basicRewrite(table) + .option(RewriteDataFiles.OUTPUT_SPEC_ID, String.valueOf(outputSpecId)) + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") + .binPack() + .execute(); + + assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore); + assertThat(currentData().size()).isEqualTo(count); + shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId); + } + + @Test + public void testBinpackRewriteWithInvalidOutputSpecId() { + Table table = createTable(10); + shouldHaveFiles(table, 10); + Assertions.assertThatThrownBy( + () -> + actions() + .rewriteDataFiles(table) + .option(RewriteDataFiles.OUTPUT_SPEC_ID, String.valueOf(1234)) + .binPack() + .execute()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Cannot use output spec id 1234 because 
the table does not contain a reference to this spec-id."); + } + + @Test + public void testSortRewriterWithSpecificOutputSpecId() { + Table table = createTable(10); + shouldHaveFiles(table, 10); + table.updateSpec().addField(Expressions.truncate("c2", 2)).commit(); + int outputSpecId = table.spec().specId(); + table.updateSpec().addField(Expressions.bucket("c3", 2)).commit(); + + long dataSizeBefore = testDataSize(table); + long count = currentData().size(); + + RewriteDataFiles.Result result = + basicRewrite(table) + .option(RewriteDataFiles.OUTPUT_SPEC_ID, String.valueOf(outputSpecId)) + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") + .sort(SortOrder.builderFor(table.schema()).asc("c2").asc("c3").build()) + .execute(); + + assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore); + assertThat(currentData().size()).isEqualTo(count); + shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId); + } + + @Test + public void testZOrderRewriteWithSpecificOutputSpecId() { + Table table = createTable(10); + shouldHaveFiles(table, 10); + table.updateSpec().addField(Expressions.truncate("c2", 2)).commit(); + int outputSpecId = table.spec().specId(); + table.updateSpec().addField(Expressions.bucket("c3", 2)).commit(); + + long dataSizeBefore = testDataSize(table); + long count = currentData().size(); + + RewriteDataFiles.Result result = + basicRewrite(table) + .option(RewriteDataFiles.OUTPUT_SPEC_ID, String.valueOf(outputSpecId)) + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") + .zOrder("c2", "c3") + .execute(); + + assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore); + assertThat(currentData().size()).isEqualTo(count); + shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId); + } + + protected void shouldRewriteDataFilesWithPartitionSpec(Table table, int outputSpecId) { + List rewrittenFiles = currentDataFiles(table); + assertThat(rewrittenFiles).allMatch(file -> file.specId() == outputSpecId); + assertThat(rewrittenFiles) + .allMatch( + file -> + ((PartitionData) file.partition()) + .getPartitionType() + .equals(table.specs().get(outputSpecId).partitionType())); + } + + protected List currentDataFiles(Table table) { + return Streams.stream(table.newScan().planFiles()) + .map(FileScanTask::file) + .collect(Collectors.toList()); + } + private Stream toGroupStream(Table table, RewriteDataFilesSparkAction rewrite) { rewrite.validateAndInitOptions(); StructLikeMap>> fileGroupsByPartition = diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java index 6b5628a1f4b5..7c516b96754a 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java @@ -81,7 +81,8 @@ public class RewriteDataFilesSparkAction PARTIAL_PROGRESS_MAX_COMMITS, TARGET_FILE_SIZE_BYTES, USE_STARTING_SEQUENCE_NUMBER, - REWRITE_JOB_ORDER); + REWRITE_JOB_ORDER, + OUTPUT_SPEC_ID); private static final RewriteDataFilesSparkAction.Result EMPTY_RESULT = ImmutableRewriteDataFiles.Result.builder().rewriteResults(ImmutableList.of()).build(); diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDataRewriter.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDataRewriter.java index 9a96f44ebda6..d256bf2794e2 100644 --- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDataRewriter.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDataRewriter.java @@ -58,13 +58,14 @@ protected void doRewrite(String groupId, List group) { .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupId) .option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize()) .option(SparkWriteOptions.DISTRIBUTION_MODE, distributionMode(group).modeName()) + .option(SparkWriteOptions.OUTPUT_SPEC_ID, outputSpecId()) .mode("append") .save(groupId); } // invoke a shuffle if the original spec does not match the output spec private DistributionMode distributionMode(List group) { - boolean requiresRepartition = !group.get(0).spec().equals(table().spec()); + boolean requiresRepartition = !group.get(0).spec().equals(outputSpec()); return requiresRepartition ? DistributionMode.RANGE : DistributionMode.NONE; } } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java index c9c962526eb3..ce572c6486cc 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java @@ -23,6 +23,8 @@ import java.util.Set; import java.util.function.Function; import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; @@ -86,6 +88,16 @@ protected SparkShufflingDataRewriter(SparkSession spark, Table table) { protected abstract org.apache.iceberg.SortOrder sortOrder(); + /** + * Retrieves and returns the schema for the rewrite using the current table schema. + * + *
<p>
The schema with all columns required for correctly sorting the table. This may include + * additional computed columns which are not written to the table but are used for sorting. + */ + protected Schema sortSchema() { + return table().schema(); + } + protected abstract Dataset sortedDF( Dataset df, Function, Dataset> sortFunc); @@ -122,6 +134,7 @@ public void doRewrite(String groupId, List group) { .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupId) .option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize()) .option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false") + .option(SparkWriteOptions.OUTPUT_SPEC_ID, outputSpecId()) .mode("append") .save(groupId); } @@ -152,11 +165,12 @@ private Dataset transformPlan(Dataset df, Function group) { - boolean includePartitionColumns = !group.get(0).spec().equals(table().spec()); - if (includePartitionColumns) { + PartitionSpec spec = outputSpec(); + boolean requiresRepartitioning = !group.get(0).spec().equals(spec); + if (requiresRepartitioning) { // build in the requirement for partition sorting into our sort order // as the original spec for this group does not match the output spec - return SortOrderUtil.buildSortOrder(table(), sortOrder()); + return SortOrderUtil.buildSortOrder(sortSchema(), spec, sortOrder()); } else { return sortOrder(); } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderDataRewriter.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderDataRewriter.java index 9a618661fe40..cc4fb78ebd18 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderDataRewriter.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderDataRewriter.java @@ -30,6 +30,7 @@ import org.apache.iceberg.SortOrder; import org.apache.iceberg.Table; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.SparkUtil; @@ -108,6 +109,21 @@ protected SortOrder sortOrder() { return Z_SORT_ORDER; } + /** + * Overrides the sortSchema method to include columns from Z_SCHEMA. + * + *
<p>
This method generates a new Schema object which consists of columns from the original table + * schema and Z_SCHEMA. + */ + @Override + protected Schema sortSchema() { + return new Schema( + new ImmutableList.Builder() + .addAll(table().schema().columns()) + .addAll(Z_SCHEMA.columns()) + .build()); + } + @Override protected Dataset sortedDF(Dataset df, Function, Dataset> sortFunc) { Dataset zValueDF = df.withColumn(Z_COLUMN, zValue(df)); diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java index bfffa65accac..0eea49532346 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java @@ -53,6 +53,7 @@ import org.apache.iceberg.FileFormat; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.MetadataTableType; +import org.apache.iceberg.PartitionData; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.RewriteJobOrder; import org.apache.iceberg.RowDelta; @@ -1426,6 +1427,130 @@ public void testRewriteJobOrderFilesDesc() { Assert.assertNotEquals("Number of files order should not be ascending", actual, expected); } + @Test + public void testBinPackRewriterWithSpecificUnparitionedOutputSpec() { + Table table = createTable(10); + shouldHaveFiles(table, 10); + int outputSpecId = table.spec().specId(); + table.updateSpec().addField(Expressions.truncate("c2", 2)).commit(); + + long dataSizeBefore = testDataSize(table); + long count = currentData().size(); + + RewriteDataFiles.Result result = + basicRewrite(table) + .option(RewriteDataFiles.OUTPUT_SPEC_ID, String.valueOf(outputSpecId)) + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") + .binPack() + .execute(); + + assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore); + assertThat(currentData().size()).isEqualTo(count); + shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId); + } + + @Test + public void testBinPackRewriterWithSpecificOutputSpec() { + Table table = createTable(10); + shouldHaveFiles(table, 10); + table.updateSpec().addField(Expressions.truncate("c2", 2)).commit(); + int outputSpecId = table.spec().specId(); + table.updateSpec().addField(Expressions.bucket("c3", 2)).commit(); + + long dataSizeBefore = testDataSize(table); + long count = currentData().size(); + + RewriteDataFiles.Result result = + basicRewrite(table) + .option(RewriteDataFiles.OUTPUT_SPEC_ID, String.valueOf(outputSpecId)) + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") + .binPack() + .execute(); + + assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore); + assertThat(currentData().size()).isEqualTo(count); + shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId); + } + + @Test + public void testBinpackRewriteWithInvalidOutputSpecId() { + Table table = createTable(10); + shouldHaveFiles(table, 10); + Assertions.assertThatThrownBy( + () -> + actions() + .rewriteDataFiles(table) + .option(RewriteDataFiles.OUTPUT_SPEC_ID, String.valueOf(1234)) + .binPack() + .execute()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Cannot use output spec id 1234 because the table does not contain a reference to this spec-id."); + } + + @Test + public void testSortRewriterWithSpecificOutputSpecId() { + Table table = createTable(10); + shouldHaveFiles(table, 10); + 
table.updateSpec().addField(Expressions.truncate("c2", 2)).commit(); + int outputSpecId = table.spec().specId(); + table.updateSpec().addField(Expressions.bucket("c3", 2)).commit(); + + long dataSizeBefore = testDataSize(table); + long count = currentData().size(); + + RewriteDataFiles.Result result = + basicRewrite(table) + .option(RewriteDataFiles.OUTPUT_SPEC_ID, String.valueOf(outputSpecId)) + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") + .sort(SortOrder.builderFor(table.schema()).asc("c2").asc("c3").build()) + .execute(); + + assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore); + assertThat(currentData().size()).isEqualTo(count); + shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId); + } + + @Test + public void testZOrderRewriteWithSpecificOutputSpecId() { + Table table = createTable(10); + shouldHaveFiles(table, 10); + table.updateSpec().addField(Expressions.truncate("c2", 2)).commit(); + int outputSpecId = table.spec().specId(); + table.updateSpec().addField(Expressions.bucket("c3", 2)).commit(); + + long dataSizeBefore = testDataSize(table); + long count = currentData().size(); + + RewriteDataFiles.Result result = + basicRewrite(table) + .option(RewriteDataFiles.OUTPUT_SPEC_ID, String.valueOf(outputSpecId)) + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") + .zOrder("c2", "c3") + .execute(); + + assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore); + assertThat(currentData().size()).isEqualTo(count); + shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId); + } + + protected void shouldRewriteDataFilesWithPartitionSpec(Table table, int outputSpecId) { + List rewrittenFiles = currentDataFiles(table); + assertThat(rewrittenFiles).allMatch(file -> file.specId() == outputSpecId); + assertThat(rewrittenFiles) + .allMatch( + file -> + ((PartitionData) file.partition()) + .getPartitionType() + .equals(table.specs().get(outputSpecId).partitionType())); + } + + protected List currentDataFiles(Table table) { + return Streams.stream(table.newScan().planFiles()) + .map(FileScanTask::file) + .collect(Collectors.toList()); + } + private Stream toGroupStream(Table table, RewriteDataFilesSparkAction rewrite) { rewrite.validateAndInitOptions(); StructLikeMap>> fileGroupsByPartition =
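
Usage note: the tests above set the new option through the action builder, and user code drives it the same way. A minimal sketch, not taken from the patch, assuming an existing org.apache.iceberg.Table variable named table and an int outputSpecId that is present in table.specs(); the entry points used are org.apache.iceberg.spark.actions.SparkActions and org.apache.iceberg.actions.RewriteDataFiles:

    // Rewrite data files so the rewritten files are written under a specific
    // partition spec id instead of the table's current default spec.
    // `table` and `outputSpecId` are placeholders for this sketch.
    RewriteDataFiles.Result result =
        SparkActions.get()
            .rewriteDataFiles(table)
            .option(RewriteDataFiles.OUTPUT_SPEC_ID, String.valueOf(outputSpecId))
            .binPack()
            .execute();

If the given id does not refer to a spec registered on the table, validation fails with the IllegalArgumentException exercised by testBinpackRewriteWithInvalidOutputSpecId above.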