
Commit

Spark 3.4: Add utility to load table state reliably (apache#11115)
dramaticlly authored Sep 16, 2024
1 parent 5ce7c30 commit d5b21d8
Showing 3 changed files with 33 additions and 12 deletions.
@@ -851,18 +851,29 @@ private static void deleteManifests(FileIO io, List<ManifestFile> manifests) {
         .run(item -> io.deleteFile(item.path()));
   }
 
+  public static Dataset<Row> loadTable(SparkSession spark, Table table, long snapshotId) {
+    SparkTable sparkTable = new SparkTable(table, snapshotId, false);
+    DataSourceV2Relation relation = createRelation(sparkTable, ImmutableMap.of());
+    return Dataset.ofRows(spark, relation);
+  }
+
   public static Dataset<Row> loadMetadataTable(
       SparkSession spark, Table table, MetadataTableType type) {
     return loadMetadataTable(spark, table, type, ImmutableMap.of());
   }
 
   public static Dataset<Row> loadMetadataTable(
       SparkSession spark, Table table, MetadataTableType type, Map<String, String> extraOptions) {
-    SparkTable metadataTable =
-        new SparkTable(MetadataTableUtils.createMetadataTableInstance(table, type), false);
+    Table metadataTable = MetadataTableUtils.createMetadataTableInstance(table, type);
+    SparkTable sparkMetadataTable = new SparkTable(metadataTable, false);
+    DataSourceV2Relation relation = createRelation(sparkMetadataTable, extraOptions);
+    return Dataset.ofRows(spark, relation);
+  }
+
+  private static DataSourceV2Relation createRelation(
+      SparkTable sparkTable, Map<String, String> extraOptions) {
     CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(extraOptions);
-    return Dataset.ofRows(
-        spark, DataSourceV2Relation.create(metadataTable, Some.empty(), Some.empty(), options));
+    return DataSourceV2Relation.create(sparkTable, Option.empty(), Option.empty(), options);
   }
 
   /**
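
Illustrative usage sketch (not part of the commit): how a caller might exercise the new SparkTableUtil.loadTable helper and the refactored loadMetadataTable, which now share the createRelation path. The HadoopCatalog, warehouse location, and db.events table name below are assumptions for the example only.

import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class LoadTableSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("load-table-sketch").getOrCreate();
    // Hypothetical catalog and table; any loaded Iceberg Table works the same way.
    HadoopCatalog catalog =
        new HadoopCatalog(spark.sparkContext().hadoopConfiguration(), "/tmp/warehouse");
    Table table = catalog.loadTable(TableIdentifier.of("db", "events"));

    // Pin the scan to the current snapshot of the already-loaded Table object,
    // with no second lookup by name (assumes the table has at least one snapshot).
    long snapshotId = table.currentSnapshot().snapshotId();
    Dataset<Row> data = SparkTableUtil.loadTable(spark, table, snapshotId);
    data.show();

    // Metadata tables go through the same createRelation helper after this change.
    Dataset<Row> snapshots =
        SparkTableUtil.loadMetadataTable(spark, table, MetadataTableType.SNAPSHOTS);
    snapshots.show();
  }
}
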
@@ -32,9 +32,10 @@
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkTableUtil;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.stats.ThetaSketchAgg;
@@ -73,13 +74,8 @@ private static Blob toBlob(Types.NestedField field, Sketch sketch, Snapshot snap
 
   private static Row computeNDVSketches(
       SparkSession spark, Table table, Snapshot snapshot, List<String> colNames) {
-    return spark
-        .read()
-        .format("iceberg")
-        .option(SparkReadOptions.SNAPSHOT_ID, snapshot.snapshotId())
-        .load(table.name())
-        .select(toAggColumns(colNames))
-        .first();
+    Dataset<Row> inputDF = SparkTableUtil.loadTable(spark, table, snapshot.snapshotId());
+    return inputDF.select(toAggColumns(colNames)).first();
   }
 
   private static Column[] toAggColumns(List<String> colNames) {
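
A minimal before/after sketch (not from the commit) of what this change does in computeNDVSketches: the removed code resolved the table a second time by name through Spark's catalog and pinned the snapshot with a read option, while the new code builds the DataFrame straight from the Table object the action already holds; the commit title suggests the name-based lookup was the reliability concern. The class and method names below are hypothetical; SparkReadOptions, SparkTableUtil, and the Spark reader API are real.

import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SnapshotScanSketch {
  // Old path: resolves the table again by name through Spark's catalog,
  // then pins the snapshot with a read option.
  static Dataset<Row> viaReader(SparkSession spark, Table table, Snapshot snapshot) {
    return spark
        .read()
        .format("iceberg")
        .option(SparkReadOptions.SNAPSHOT_ID, snapshot.snapshotId())
        .load(table.name());
  }

  // New path: reuse the already-loaded Table state, pinned to the same snapshot.
  static Dataset<Row> viaUtil(SparkSession spark, Table table, Snapshot snapshot) {
    return SparkTableUtil.loadTable(spark, table, snapshot.snapshotId());
  }
}
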
@@ -82,6 +82,20 @@ public TestComputeTableStatsAction(
     super(catalogName, implementation, config);
   }
 
+  @Test
+  public void testLoadingTableDirectly() {
+    sql("CREATE TABLE %s (id int, data string) USING iceberg", tableName);
+    sql("INSERT into %s values(1, 'abcd')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    SparkActions actions = SparkActions.get();
+    ComputeTableStats.Result results = actions.computeTableStats(table).execute();
+    StatisticsFile statisticsFile = results.statisticsFile();
+    assertThat(statisticsFile.fileSizeInBytes()).isNotEqualTo(0);
+    assertThat(statisticsFile.blobMetadata().size()).isEqualTo(2);
+  }
+
   @Test
   public void testComputeTableStatsAction() throws NoSuchTableException, ParseException {
     sql("CREATE TABLE %s (id int, data string) USING iceberg", tableName);
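
The new test above computes stats over both columns of the two-column table, hence the expectation of two blobs. As a hedged follow-up sketch (not part of the commit), the same action can be limited to specific columns via ComputeTableStats.columns, in which case one NDV blob per selected column would be expected; the helper class below is hypothetical.

import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.ComputeTableStats;
import org.apache.iceberg.spark.actions.SparkActions;

public class ComputeStatsForOneColumn {
  public static StatisticsFile compute(Table table) {
    // Restrict NDV sketch computation to the "id" column only.
    ComputeTableStats.Result result =
        SparkActions.get().computeTableStats(table).columns("id").execute();
    return result.statisticsFile();
  }
}
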
