Skip to content

Commit

Permalink
[core] Fix partition table get index RowType error in IndexBootstrap (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
hawk9821 authored and wxplovecc committed Aug 6, 2024
1 parent 0487f65 commit 1fd9ec4
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ public static RowType bootstrapType(TableSchema schema) {
new ArrayList<>(
schema.projectedLogicalRowType(
Stream.concat(primaryKeys.stream(), partitionKeys.stream())
.distinct()
.collect(Collectors.toList()))
.getFields());
bootstrapFields.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Pair;

import org.junit.jupiter.api.Test;
Expand All @@ -43,6 +45,7 @@
import java.util.List;
import java.util.function.Consumer;

import static org.apache.paimon.crosspartition.IndexBootstrap.BUCKET_FIELD;
import static org.apache.paimon.crosspartition.IndexBootstrap.filterSplit;
import static org.apache.paimon.data.BinaryRow.EMPTY_ROW;
import static org.apache.paimon.stats.SimpleStats.EMPTY_STATS;
Expand All @@ -53,20 +56,7 @@ public class IndexBootstrapTest extends TableTestBase {

@Test
public void testBoostrap() throws Exception {
Identifier identifier = identifier("T");
Options options = new Options();
options.set(CoreOptions.BUCKET, -1);
Schema schema =
Schema.newBuilder()
.column("pt", DataTypes.INT())
.column("col", DataTypes.INT())
.column("pk", DataTypes.INT())
.primaryKey("pk")
.partitionKeys("pt")
.options(options.toMap())
.build();
catalog.createTable(identifier, schema, true);
Table table = catalog.getTable(identifier);
Table table = createTable();

write(
table,
Expand Down Expand Up @@ -106,6 +96,32 @@ public void testBoostrap() throws Exception {
Thread.sleep(1000);
}

private Table createTable() throws Exception {
Identifier identifier = identifier("T");
Options options = new Options();
options.set(CoreOptions.BUCKET, -1);
Schema schema =
Schema.newBuilder()
.column("pt", DataTypes.INT())
.column("col", DataTypes.INT())
.column("pk", DataTypes.INT())
.primaryKey("pk")
.partitionKeys("pt")
.options(options.toMap())
.build();
catalog.createTable(identifier, schema, true);
return catalog.getTable(identifier);
}

@Test
public void testBootstrapType() throws Exception {
FileStoreTable table = (FileStoreTable) createTable();
RowType indexRowType = IndexBootstrap.bootstrapType(table.schema());
assertThat(indexRowType.getFieldNames()).contains(BUCKET_FIELD);
assertThat(indexRowType.getFieldIndex(BUCKET_FIELD))
.isEqualTo(2); // the last field is bucket, which is not in table schema
}

@Test
public void testFilterSplit() {
assertThat(filterSplit(newSplit(newFile(100), newFile(200)), 50, 230)).isTrue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,21 @@

package org.apache.paimon.flink;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.crosspartition.IndexBootstrap;
import org.apache.paimon.fs.Path;
import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.types.Row;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.List;

import static org.apache.paimon.crosspartition.IndexBootstrap.BUCKET_FIELD;
import static org.assertj.core.api.Assertions.assertThat;

/** ITCase for batch file store. */
Expand Down Expand Up @@ -161,4 +170,27 @@ public void testLargeRecords() {
sql("insert into large_t select * from src");
assertThat(sql("select k, count(*) from large_t group by k having count(*) > 1")).isEmpty();
}

@Test
public void testBootstrapType() throws Exception {
Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(new Path(path)));
FileStoreTable t = (FileStoreTable) catalog.getTable(Identifier.create("default", "T"));
FileStoreTable partialUpdateT =
(FileStoreTable) catalog.getTable(Identifier.create("default", "partial_update_t"));
FileStoreTable firstRowT =
(FileStoreTable) catalog.getTable(Identifier.create("default", "first_row_t"));
assertThat(IndexBootstrap.bootstrapType(t.schema()).getFieldNames()).contains(BUCKET_FIELD);
assertThat(IndexBootstrap.bootstrapType(partialUpdateT.schema()).getFieldNames())
.contains(BUCKET_FIELD);
assertThat(IndexBootstrap.bootstrapType(firstRowT.schema()).getFieldNames())
.contains(BUCKET_FIELD);
assertThat(IndexBootstrap.bootstrapType(t.schema()).getFieldIndex(BUCKET_FIELD))
.isEqualTo(2);
assertThat(
IndexBootstrap.bootstrapType(partialUpdateT.schema())
.getFieldIndex(BUCKET_FIELD))
.isEqualTo(2);
assertThat(IndexBootstrap.bootstrapType(firstRowT.schema()).getFieldIndex(BUCKET_FIELD))
.isEqualTo(2);
}
}

0 comments on commit 1fd9ec4

Please sign in to comment.