Spark 3.4, 3.3: Support output-spec-id in rewrite data files (apache#9901) (Backport apache#9803)

Co-authored-by: hpal <[email protected]>
himadripal and hpal authored Mar 9, 2024
1 parent b261edd commit c13c0c9
Showing 11 changed files with 337 additions and 11 deletions.
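
For orientation, here is a minimal usage sketch of the new option (not code from this commit): OUTPUT_SPEC_ID selects which of the table's registered partition specs the rewritten files are written into; the table handle and spec id below are placeholders.

// Hedged example: compact data files and write them with a specific, already
// registered partition spec instead of the table's current default spec.
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFiles;
import org.apache.iceberg.spark.actions.SparkActions;

class RewriteWithOutputSpecId {
  static RewriteDataFiles.Result rewrite(Table table, int outputSpecId) {
    return SparkActions.get()
        .rewriteDataFiles(table)
        // must reference a spec id present in table.specs(); otherwise validation fails
        .option(RewriteDataFiles.OUTPUT_SPEC_ID, String.valueOf(outputSpecId))
        .binPack()
        .execute();
  }
}
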
@@ -80,8 +80,8 @@ public class RewriteDataFilesSparkAction
PARTIAL_PROGRESS_MAX_COMMITS,
TARGET_FILE_SIZE_BYTES,
USE_STARTING_SEQUENCE_NUMBER,
REWRITE_JOB_ORDER);

REWRITE_JOB_ORDER,
OUTPUT_SPEC_ID);
private static final RewriteDataFilesSparkAction.Result EMPTY_RESULT =
ImmutableRewriteDataFiles.Result.builder().rewriteResults(ImmutableList.of()).build();

@@ -58,13 +58,14 @@ protected void doRewrite(String groupId, List<FileScanTask> group) {
.option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupId)
.option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
.option(SparkWriteOptions.DISTRIBUTION_MODE, distributionMode(group).modeName())
.option(SparkWriteOptions.OUTPUT_SPEC_ID, outputSpecId())
.mode("append")
.save(groupId);
}

// invoke a shuffle if the original spec does not match the output spec
private DistributionMode distributionMode(List<FileScanTask> group) {
boolean requiresRepartition = !group.get(0).spec().equals(table().spec());
boolean requiresRepartition = !group.get(0).spec().equals(outputSpec());
return requiresRepartition ? DistributionMode.RANGE : DistributionMode.NONE;
}
}
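
The distributionMode change above now compares each file group's original spec against the resolved output spec rather than table().spec(). A hedged sketch of how the requested spec is presumably resolved and validated in the shared rewriter base class (the actual outputSpecId() and outputSpec() helpers are not shown in this diff):

// Hedged sketch only; the real helpers live outside this diff and may differ.
protected PartitionSpec outputSpec() {
  PartitionSpec spec = table().specs().get(outputSpecId());
  Preconditions.checkArgument(
      spec != null,
      "Cannot use output spec id %s because the table does not contain a reference to this spec-id.",
      outputSpecId());
  return spec;
}
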
@@ -22,6 +22,8 @@
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
@@ -61,6 +63,18 @@ protected SparkShufflingDataRewriter(SparkSession spark, Table table) {
super(spark, table);
}

protected abstract org.apache.iceberg.SortOrder sortOrder();

/**
 * Returns the schema used when sorting the rewrite, defaulting to the current table schema.
 *
 * <p>The returned schema contains all columns required to correctly sort the table. It may
 * include additional computed columns that are not written to the table but are used for
 * sorting.
 */
protected Schema sortSchema() {
return table().schema();
}

protected abstract Dataset<Row> sortedDF(Dataset<Row> df, List<FileScanTask> group);

@Override
@@ -97,6 +111,7 @@ public void doRewrite(String groupId, List<FileScanTask> group) {
.option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupId)
.option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
.option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false")
.option(SparkWriteOptions.OUTPUT_SPEC_ID, outputSpecId())
.mode("append")
.save(groupId);
}
@@ -113,11 +128,12 @@ protected Dataset<Row> sort(Dataset<Row> df, org.apache.iceberg.SortOrder sortOr

protected org.apache.iceberg.SortOrder outputSortOrder(
List<FileScanTask> group, org.apache.iceberg.SortOrder sortOrder) {
boolean includePartitionColumns = !group.get(0).spec().equals(table().spec());
if (includePartitionColumns) {
PartitionSpec spec = outputSpec();
boolean requiresRepartitioning = !group.get(0).spec().equals(spec);
if (requiresRepartitioning) {
// build the requirement for partition sorting into our sort order,
// as the original spec for this group does not match the output spec
return SortOrderUtil.buildSortOrder(table(), sortOrder);
return SortOrderUtil.buildSortOrder(sortSchema(), spec, sortOrder());
} else {
return sortOrder;
}
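
When a group's original spec does not match the requested output spec, the rewritten rows have to be clustered by the output spec's partition values before they are written, so the partition fields are folded into the sort order via the three-argument buildSortOrder overload used above. A hedged, illustrative sketch (the table, spec id, and column name are placeholders):

// Hedged sketch, not code from this diff.
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.util.SortOrderUtil;

class OutputSortOrderSketch {
  static SortOrder writeOrder(Table table, int outputSpecId) {
    PartitionSpec outputSpec = table.specs().get(outputSpecId);  // e.g. truncate(c2, 2)
    SortOrder userOrder = SortOrder.builderFor(table.schema()).asc("c3").build();
    // expected to sort by the output spec's partition transforms first, then by c3
    return SortOrderUtil.buildSortOrder(table.schema(), outputSpec, userOrder);
  }
}
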
@@ -53,6 +53,11 @@ public String description() {
return "SORT";
}

@Override
protected SortOrder sortOrder() {
return sortOrder;
}

@Override
protected Dataset<Row> sortedDF(Dataset<Row> df, List<FileScanTask> group) {
return sort(df, outputSortOrder(group, sortOrder));
@@ -30,6 +30,7 @@
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkUtil;
@@ -103,6 +104,26 @@ public void init(Map<String, String> options) {
this.varLengthContribution = varLengthContribution(options);
}

@Override
protected SortOrder sortOrder() {
return Z_SORT_ORDER;
}

/**
 * Overrides {@link #sortSchema()} to include the computed columns from Z_SCHEMA.
 *
 * <p>This method builds a new Schema consisting of the columns of the original table schema plus
 * the Z_SCHEMA columns used for Z-ordering.
 */
@Override
protected Schema sortSchema() {
return new Schema(
new ImmutableList.Builder<Types.NestedField>()
.addAll(table().schema().columns())
.addAll(Z_SCHEMA.columns())
.build());
}

@Override
protected Dataset<Row> sortedDF(Dataset<Row> df, List<FileScanTask> group) {
Dataset<Row> zValueDF = df.withColumn(Z_COLUMN, zValue(df));
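
The sortSchema() override above appends the Z-order helper columns to the table schema so the computed z-value can participate in partition-aware sorting. A hedged sketch of the pre-existing helper fields it refers to in the Z-order rewriter (the column name and field id here are assumptions, not taken from this diff):

// Hedged sketch of fields assumed to exist elsewhere in the Z-order rewriter.
private static final String Z_COLUMN = "ICEZVALUE";
private static final Schema Z_SCHEMA =
    new Schema(Types.NestedField.required(0, Z_COLUMN, Types.BinaryType.get()));
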
@@ -54,6 +54,7 @@
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.PartitionData;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RewriteJobOrder;
import org.apache.iceberg.RowDelta;
@@ -105,6 +106,7 @@
import org.apache.iceberg.util.StructLikeMap;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
@@ -1384,6 +1386,130 @@ public void testRewriteJobOrderFilesDesc() {
Assert.assertNotEquals("Number of files order should not be ascending", actual, expected);
}

@Test
public void testBinPackRewriterWithSpecificUnpartitionedOutputSpec() {
Table table = createTable(10);
shouldHaveFiles(table, 10);
int outputSpecId = table.spec().specId();
table.updateSpec().addField(Expressions.truncate("c2", 2)).commit();

long dataSizeBefore = testDataSize(table);
long count = currentData().size();

RewriteDataFiles.Result result =
basicRewrite(table)
.option(RewriteDataFiles.OUTPUT_SPEC_ID, String.valueOf(outputSpecId))
.option(SizeBasedFileRewriter.REWRITE_ALL, "true")
.binPack()
.execute();

assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
assertThat(currentData().size()).isEqualTo(count);
shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId);
}

@Test
public void testBinPackRewriterWithSpecificOutputSpec() {
Table table = createTable(10);
shouldHaveFiles(table, 10);
table.updateSpec().addField(Expressions.truncate("c2", 2)).commit();
int outputSpecId = table.spec().specId();
table.updateSpec().addField(Expressions.bucket("c3", 2)).commit();

long dataSizeBefore = testDataSize(table);
long count = currentData().size();

RewriteDataFiles.Result result =
basicRewrite(table)
.option(RewriteDataFiles.OUTPUT_SPEC_ID, String.valueOf(outputSpecId))
.option(SizeBasedFileRewriter.REWRITE_ALL, "true")
.binPack()
.execute();

assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
assertThat(currentData().size()).isEqualTo(count);
shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId);
}

@Test
public void testBinpackRewriteWithInvalidOutputSpecId() {
Table table = createTable(10);
shouldHaveFiles(table, 10);
Assertions.assertThatThrownBy(
() ->
actions()
.rewriteDataFiles(table)
.option(RewriteDataFiles.OUTPUT_SPEC_ID, String.valueOf(1234))
.binPack()
.execute())
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(
"Cannot use output spec id 1234 because the table does not contain a reference to this spec-id.");
}

@Test
public void testSortRewriterWithSpecificOutputSpecId() {
Table table = createTable(10);
shouldHaveFiles(table, 10);
table.updateSpec().addField(Expressions.truncate("c2", 2)).commit();
int outputSpecId = table.spec().specId();
table.updateSpec().addField(Expressions.bucket("c3", 2)).commit();

long dataSizeBefore = testDataSize(table);
long count = currentData().size();

RewriteDataFiles.Result result =
basicRewrite(table)
.option(RewriteDataFiles.OUTPUT_SPEC_ID, String.valueOf(outputSpecId))
.option(SizeBasedFileRewriter.REWRITE_ALL, "true")
.sort(SortOrder.builderFor(table.schema()).asc("c2").asc("c3").build())
.execute();

assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
assertThat(currentData().size()).isEqualTo(count);
shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId);
}

@Test
public void testZOrderRewriteWithSpecificOutputSpecId() {
Table table = createTable(10);
shouldHaveFiles(table, 10);
table.updateSpec().addField(Expressions.truncate("c2", 2)).commit();
int outputSpecId = table.spec().specId();
table.updateSpec().addField(Expressions.bucket("c3", 2)).commit();

long dataSizeBefore = testDataSize(table);
long count = currentData().size();

RewriteDataFiles.Result result =
basicRewrite(table)
.option(RewriteDataFiles.OUTPUT_SPEC_ID, String.valueOf(outputSpecId))
.option(SizeBasedFileRewriter.REWRITE_ALL, "true")
.zOrder("c2", "c3")
.execute();

assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
assertThat(currentData().size()).isEqualTo(count);
shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId);
}

protected void shouldRewriteDataFilesWithPartitionSpec(Table table, int outputSpecId) {
List<DataFile> rewrittenFiles = currentDataFiles(table);
assertThat(rewrittenFiles).allMatch(file -> file.specId() == outputSpecId);
assertThat(rewrittenFiles)
.allMatch(
file ->
((PartitionData) file.partition())
.getPartitionType()
.equals(table.specs().get(outputSpecId).partitionType()));
}

protected List<DataFile> currentDataFiles(Table table) {
return Streams.stream(table.newScan().planFiles())
.map(FileScanTask::file)
.collect(Collectors.toList());
}

private Stream<RewriteFileGroup> toGroupStream(Table table, RewriteDataFilesSparkAction rewrite) {
rewrite.validateAndInitOptions();
StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition =
@@ -81,7 +81,8 @@ public class RewriteDataFilesSparkAction
PARTIAL_PROGRESS_MAX_COMMITS,
TARGET_FILE_SIZE_BYTES,
USE_STARTING_SEQUENCE_NUMBER,
REWRITE_JOB_ORDER);
REWRITE_JOB_ORDER,
OUTPUT_SPEC_ID);

private static final RewriteDataFilesSparkAction.Result EMPTY_RESULT =
ImmutableRewriteDataFiles.Result.builder().rewriteResults(ImmutableList.of()).build();
@@ -58,13 +58,14 @@ protected void doRewrite(String groupId, List<FileScanTask> group) {
.option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupId)
.option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
.option(SparkWriteOptions.DISTRIBUTION_MODE, distributionMode(group).modeName())
.option(SparkWriteOptions.OUTPUT_SPEC_ID, outputSpecId())
.mode("append")
.save(groupId);
}

// invoke a shuffle if the original spec does not match the output spec
private DistributionMode distributionMode(List<FileScanTask> group) {
boolean requiresRepartition = !group.get(0).spec().equals(table().spec());
boolean requiresRepartition = !group.get(0).spec().equals(outputSpec());
return requiresRepartition ? DistributionMode.RANGE : DistributionMode.NONE;
}
}
@@ -23,6 +23,8 @@
import java.util.Set;
import java.util.function.Function;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
@@ -86,6 +88,16 @@ protected SparkShufflingDataRewriter(SparkSession spark, Table table) {

protected abstract org.apache.iceberg.SortOrder sortOrder();

/**
 * Returns the schema used when sorting the rewrite, defaulting to the current table schema.
 *
 * <p>The returned schema contains all columns required to correctly sort the table. It may
 * include additional computed columns that are not written to the table but are used for
 * sorting.
 */
protected Schema sortSchema() {
return table().schema();
}

protected abstract Dataset<Row> sortedDF(
Dataset<Row> df, Function<Dataset<Row>, Dataset<Row>> sortFunc);

@@ -122,6 +134,7 @@ public void doRewrite(String groupId, List<FileScanTask> group) {
.option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupId)
.option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
.option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false")
.option(SparkWriteOptions.OUTPUT_SPEC_ID, outputSpecId())
.mode("append")
.save(groupId);
}
@@ -152,11 +165,12 @@ private Dataset<Row> transformPlan(Dataset<Row> df, Function<LogicalPlan, Logica
}

private org.apache.iceberg.SortOrder outputSortOrder(List<FileScanTask> group) {
boolean includePartitionColumns = !group.get(0).spec().equals(table().spec());
if (includePartitionColumns) {
PartitionSpec spec = outputSpec();
boolean requiresRepartitioning = !group.get(0).spec().equals(spec);
if (requiresRepartitioning) {
// build the requirement for partition sorting into our sort order,
// as the original spec for this group does not match the output spec
return SortOrderUtil.buildSortOrder(table(), sortOrder());
return SortOrderUtil.buildSortOrder(sortSchema(), spec, sortOrder());
} else {
return sortOrder();
}
@@ -30,6 +30,7 @@
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkUtil;
@@ -108,6 +109,21 @@ protected SortOrder sortOrder() {
return Z_SORT_ORDER;
}

/**
 * Overrides {@link #sortSchema()} to include the computed columns from Z_SCHEMA.
 *
 * <p>This method builds a new Schema consisting of the columns of the original table schema plus
 * the Z_SCHEMA columns used for Z-ordering.
 */
@Override
protected Schema sortSchema() {
return new Schema(
new ImmutableList.Builder<Types.NestedField>()
.addAll(table().schema().columns())
.addAll(Z_SCHEMA.columns())
.build());
}

@Override
protected Dataset<Row> sortedDF(Dataset<Row> df, Function<Dataset<Row>, Dataset<Row>> sortFunc) {
Dataset<Row> zValueDF = df.withColumn(Z_COLUMN, zValue(df));