Patch summary: IndexBootstrap.bootstrapType now applies distinct() to the
concatenated primary-key and partition-key names before projecting the schema
(presumably so a column that is both a primary key and a partition key is
projected only once - confirm against the schema rules). Tests are added that
assert BUCKET_FIELD is present in the bootstrap row type at field index 2.

NOTE(review): this patch was recovered from a copy whose line breaks had been
collapsed. Hunk line counts were re-checked against the hunk headers, but the
leading indentation is reconstructed (4-space indent, 8-space continuation);
apply with whitespace tolerance if it does not match the target tree.

diff --git a/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java b/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java
index ec8244a2aeb46..cc64d9549f98b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java
+++ b/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java
@@ -142,6 +142,7 @@ public static RowType bootstrapType(TableSchema schema) {
                 new ArrayList<>(
                         schema.projectedLogicalRowType(
                                         Stream.concat(primaryKeys.stream(), partitionKeys.stream())
+                                                .distinct()
                                                 .collect(Collectors.toList()))
                                 .getFields());
         bootstrapFields.add(
diff --git a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
index bbb1abfd35591..d9d8e1f69bcfe 100644
--- a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
@@ -27,10 +27,12 @@
 import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.TableTestBase;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Pair;
 
 import org.junit.jupiter.api.Test;
@@ -43,6 +45,7 @@
 import java.util.List;
 import java.util.function.Consumer;
 
+import static org.apache.paimon.crosspartition.IndexBootstrap.BUCKET_FIELD;
 import static org.apache.paimon.crosspartition.IndexBootstrap.filterSplit;
 import static org.apache.paimon.data.BinaryRow.EMPTY_ROW;
 import static org.apache.paimon.stats.SimpleStats.EMPTY_STATS;
@@ -53,20 +56,7 @@ public class IndexBootstrapTest extends TableTestBase {
 
     @Test
     public void testBoostrap() throws Exception {
-        Identifier identifier = identifier("T");
-        Options options = new Options();
-        options.set(CoreOptions.BUCKET, -1);
-        Schema schema =
-                Schema.newBuilder()
-                        .column("pt", DataTypes.INT())
-                        .column("col", DataTypes.INT())
-                        .column("pk", DataTypes.INT())
-                        .primaryKey("pk")
-                        .partitionKeys("pt")
-                        .options(options.toMap())
-                        .build();
-        catalog.createTable(identifier, schema, true);
-        Table table = catalog.getTable(identifier);
+        Table table = createTable();
 
         write(
                 table,
@@ -106,6 +96,32 @@ public void testBoostrap() throws Exception {
         Thread.sleep(1000);
     }
 
+    private Table createTable() throws Exception {
+        Identifier identifier = identifier("T");
+        Options options = new Options();
+        options.set(CoreOptions.BUCKET, -1);
+        Schema schema =
+                Schema.newBuilder()
+                        .column("pt", DataTypes.INT())
+                        .column("col", DataTypes.INT())
+                        .column("pk", DataTypes.INT())
+                        .primaryKey("pk")
+                        .partitionKeys("pt")
+                        .options(options.toMap())
+                        .build();
+        catalog.createTable(identifier, schema, true);
+        return catalog.getTable(identifier);
+    }
+
+    @Test
+    public void testBootstrapType() throws Exception {
+        FileStoreTable table = (FileStoreTable) createTable();
+        RowType indexRowType = IndexBootstrap.bootstrapType(table.schema());
+        assertThat(indexRowType.getFieldNames()).contains(BUCKET_FIELD);
+        assertThat(indexRowType.getFieldIndex(BUCKET_FIELD))
+                .isEqualTo(2); // the last field is bucket, which is not in table schema
+    }
+
     @Test
     public void testFilterSplit() {
         assertThat(filterSplit(newSplit(newFile(100), newFile(200)), 50, 230)).isTrue();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java
index c908efc4efed5..aaf28b3de7295 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java
@@ -18,12 +18,21 @@
 
 package org.apache.paimon.flink;
 
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.crosspartition.IndexBootstrap;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.table.FileStoreTable;
+
 import org.apache.flink.types.Row;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.List;
 
+import static org.apache.paimon.crosspartition.IndexBootstrap.BUCKET_FIELD;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** ITCase for batch file store. */
@@ -161,4 +170,27 @@ public void testLargeRecords() {
         sql("insert into large_t select * from src");
         assertThat(sql("select k, count(*) from large_t group by k having count(*) > 1")).isEmpty();
     }
+
+    @Test
+    public void testBootstrapType() throws Exception {
+        Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(new Path(path)));
+        FileStoreTable t = (FileStoreTable) catalog.getTable(Identifier.create("default", "T"));
+        FileStoreTable partialUpdateT =
+                (FileStoreTable) catalog.getTable(Identifier.create("default", "partial_update_t"));
+        FileStoreTable firstRowT =
+                (FileStoreTable) catalog.getTable(Identifier.create("default", "first_row_t"));
+        assertThat(IndexBootstrap.bootstrapType(t.schema()).getFieldNames()).contains(BUCKET_FIELD);
+        assertThat(IndexBootstrap.bootstrapType(partialUpdateT.schema()).getFieldNames())
+                .contains(BUCKET_FIELD);
+        assertThat(IndexBootstrap.bootstrapType(firstRowT.schema()).getFieldNames())
+                .contains(BUCKET_FIELD);
+        assertThat(IndexBootstrap.bootstrapType(t.schema()).getFieldIndex(BUCKET_FIELD))
+                .isEqualTo(2);
+        assertThat(
+                        IndexBootstrap.bootstrapType(partialUpdateT.schema())
+                                .getFieldIndex(BUCKET_FIELD))
+                .isEqualTo(2);
+        assertThat(IndexBootstrap.bootstrapType(firstRowT.schema()).getFieldIndex(BUCKET_FIELD))
+                .isEqualTo(2);
+    }
 }