diff --git a/docs/content/flink/action-jars.md b/docs/content/flink/action-jars.md index 002d5e683b69..de86d16861d4 100644 --- a/docs/content/flink/action-jars.md +++ b/docs/content/flink/action-jars.md @@ -283,3 +283,24 @@ For more information of drop_partition, see /path/to/paimon-flink-action-{{< version >}}.jar \ drop_partition --help ``` + +## Rewrite File Index + +Run the following command to submit a rewrite_file_index job for the table. + +```bash +/bin/flink run \ + /path/to/paimon-flink-action-{{< version >}}.jar \ + rewrite_file_index \ + --warehouse \ + --identifier \ + [--catalog_conf [--catalog_conf ...]] +``` + +For more information of rewrite_file_index, see + +```bash +/bin/flink run \ + /path/to/paimon-flink-action-{{< version >}}.jar \ + rewrite_file_index --help +``` diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RewriteFileIndexAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RewriteFileIndexAction.java new file mode 100644 index 000000000000..82dfe38bb7f1 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RewriteFileIndexAction.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action; + +import org.apache.paimon.flink.procedure.RewriteFileIndexProcedure; + +import org.apache.flink.table.procedure.DefaultProcedureContext; + +import java.util.Map; + +/** Rewrite-file-index action for Flink. */ +public class RewriteFileIndexAction extends ActionBase { + + private String identifier; + + public RewriteFileIndexAction( + String warehouse, String identifier, Map catalogConfig) { + super(warehouse, catalogConfig); + this.identifier = identifier; + } + + public void run() throws Exception { + RewriteFileIndexProcedure rewriteFileIndexProcedure = new RewriteFileIndexProcedure(); + rewriteFileIndexProcedure.withCatalog(catalog); + rewriteFileIndexProcedure.call(new DefaultProcedureContext(env), identifier); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RewriteFileIndexActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RewriteFileIndexActionFactory.java new file mode 100644 index 000000000000..910e7b8d9280 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RewriteFileIndexActionFactory.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action; + +import java.util.Map; +import java.util.Optional; + +/** Factory to create {@link RewriteFileIndexAction}. */ +public class RewriteFileIndexActionFactory implements ActionFactory { + + public static final String IDENTIFIER = "rewrite_file_index"; + + private static final String IDENTIFIER_KEY = "identifier"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + @Override + public Optional create(MultipleParameterToolAdapter params) { + String warehouse = params.get(WAREHOUSE); + Map catalogConfig = optionalConfigMap(params, CATALOG_CONF); + String identifier = params.get(IDENTIFIER_KEY); + + RewriteFileIndexAction action = + new RewriteFileIndexAction(warehouse, identifier, catalogConfig); + + return Optional.of(action); + } + + @Override + public void printHelp() { + System.out.println("Action \"rewrite_file_index\" Rewrite the file index for the table."); + System.out.println(); + + System.out.println("Syntax:"); + System.out.println( + " rewrite_file_index --warehouse --identifier "); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index e38d6ebd4c08..22a0b564ef92 100644 --- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -35,6 +35,7 @@ org.apache.paimon.flink.action.CreateBranchActionFactory org.apache.paimon.flink.action.DeleteBranchActionFactory org.apache.paimon.flink.action.FastForwardActionFactory org.apache.paimon.flink.action.RepairActionFactory +org.apache.paimon.flink.action.RewriteFileIndexActionFactory ### procedure factories org.apache.paimon.flink.procedure.CompactDatabaseProcedure diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RewriteFileIndexActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RewriteFileIndexActionITCase.java new file mode 100644 index 000000000000..d9513c17ef1b --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RewriteFileIndexActionITCase.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action; + +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.fileindex.FileIndexFormat; +import org.apache.paimon.fileindex.FileIndexReader; +import org.apache.paimon.fs.Path; +import org.apache.paimon.io.DataFilePathFactory; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.table.FileStoreTable; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.config.TableConfigOptions; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; + +/** IT cases for {@link RewriteFileIndexAction}. */ +public class RewriteFileIndexActionITCase extends ActionITCaseBase { + @Test + public void testFileIndexAddIndex() throws Exception { + + TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build(); + tEnv.executeSql( + String.format( + "CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse'='%s');", + warehouse)); + tEnv.useCatalog("PAIMON"); + + tEnv.executeSql("CREATE DATABASE IF NOT EXISTS test_db;").await(); + tEnv.executeSql("USE test_db").await(); + + tEnv.executeSql( + "CREATE TABLE T (" + + " k INT," + + " v STRING," + + " hh INT," + + " dt STRING" + + ") PARTITIONED BY (dt, hh) WITH (" + + " 'write-only' = 'true'," + + " 'bucket' = '-1'" + + ")"); + tEnv.executeSql( + "INSERT INTO T VALUES (1, '100', 15, '20221208'), (1, '100', 16, '20221208'), (1, '100', 15, '20221209')"); + tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true); + tEnv.executeSql("ALTER TABLE T SET ('file-index.bloom-filter.columns'='k,v')"); + + if (ThreadLocalRandom.current().nextBoolean()) { + StreamExecutionEnvironment env = + streamExecutionEnvironmentBuilder().streamingMode().build(); + createAction( + RewriteFileIndexAction.class, + "rewrite_file_index", + "--warehouse", + warehouse, + "--identifier", + "test_db.T") + .withStreamExecutionEnvironment(env) + .run(); + } else { + callProcedure("CALL sys.rewrite_file_index('test_db.T')"); + } + + FileStoreTable table = (FileStoreTable) catalog.getTable(new Identifier("test_db", "T")); + List list = table.store().newScan().plan().files(); + for (ManifestEntry entry : list) { + List extraFiles = + entry.file().extraFiles().stream() + .filter(s -> s.endsWith(DataFilePathFactory.INDEX_PATH_SUFFIX)) + .collect(Collectors.toList()); + + Assertions.assertThat(extraFiles.size()).isEqualTo(1); + + String file = extraFiles.get(0); + + Path indexFilePath = + table.store() + .pathFactory() + .createDataFilePathFactory(entry.partition(), entry.bucket()) + .toPath(file); + try (FileIndexFormat.Reader reader = + FileIndexFormat.createReader( + table.fileIO().newInputStream(indexFilePath), table.rowType())) { + Set readerSetK = reader.readColumnIndex("k"); + Assertions.assertThat(readerSetK.size()).isEqualTo(1); + + Predicate predicateK = new PredicateBuilder(table.rowType()).equal(0, 1); + for (FileIndexReader fileIndexReader : readerSetK) { + Assertions.assertThat(predicateK.visit(fileIndexReader).remain()).isTrue(); + } + + predicateK = new PredicateBuilder(table.rowType()).equal(0, 4); + for (FileIndexReader fileIndexReader : readerSetK) { + Assertions.assertThat(predicateK.visit(fileIndexReader).remain()).isFalse(); + } + + Set readerSetV = reader.readColumnIndex("v"); + Assertions.assertThat(readerSetV.size()).isEqualTo(1); + + Predicate predicateV = + new PredicateBuilder(table.rowType()) + .equal(1, BinaryString.fromString("100")); + for (FileIndexReader fileIndexReader : readerSetV) { + Assertions.assertThat(predicateV.visit(fileIndexReader).remain()).isTrue(); + } + + predicateV = + new PredicateBuilder(table.rowType()) + .equal(1, BinaryString.fromString("101")); + for (FileIndexReader fileIndexReader : readerSetV) { + Assertions.assertThat(predicateV.visit(fileIndexReader).remain()).isFalse(); + } + } + } + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedureITCase.java index d465287ffcad..c49f8a9a6ee9 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedureITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedureITCase.java @@ -43,80 +43,6 @@ /** IT Case for {@link RewriteFileIndexProcedure}. */ public class RewriteFileIndexProcedureITCase extends CatalogITCaseBase { - @Test - public void testFileIndexProcedureAddIndex() throws Exception { - sql( - "CREATE TABLE T (" - + " k INT," - + " v STRING," - + " hh INT," - + " dt STRING" - + ") PARTITIONED BY (dt, hh) WITH (" - + " 'write-only' = 'true'," - + " 'bucket' = '-1'" - + ")"); - - sql( - "INSERT INTO T VALUES (1, '100', 15, '20221208'), (1, '100', 16, '20221208'), (1, '100', 15, '20221209')"); - - tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true); - sql("ALTER TABLE T SET ('file-index.bloom-filter.columns'='k,v')"); - sql("CALL sys.rewrite_file_index('default.T')"); - - FileStoreTable table = paimonTable("T"); - List list = table.store().newScan().plan().files(); - - for (ManifestEntry entry : list) { - List extraFiles = - entry.file().extraFiles().stream() - .filter(s -> s.endsWith(DataFilePathFactory.INDEX_PATH_SUFFIX)) - .collect(Collectors.toList()); - - Assertions.assertThat(extraFiles.size()).isEqualTo(1); - - String file = extraFiles.get(0); - - Path indexFilePath = - table.store() - .pathFactory() - .createDataFilePathFactory(entry.partition(), entry.bucket()) - .toPath(file); - try (FileIndexFormat.Reader reader = - FileIndexFormat.createReader( - table.fileIO().newInputStream(indexFilePath), table.rowType())) { - Set readerSetK = reader.readColumnIndex("k"); - Assertions.assertThat(readerSetK.size()).isEqualTo(1); - - Predicate predicateK = new PredicateBuilder(table.rowType()).equal(0, 1); - for (FileIndexReader fileIndexReader : readerSetK) { - Assertions.assertThat(predicateK.visit(fileIndexReader).remain()).isTrue(); - } - - predicateK = new PredicateBuilder(table.rowType()).equal(0, 4); - for (FileIndexReader fileIndexReader : readerSetK) { - Assertions.assertThat(predicateK.visit(fileIndexReader).remain()).isFalse(); - } - - Set readerSetV = reader.readColumnIndex("v"); - Assertions.assertThat(readerSetV.size()).isEqualTo(1); - - Predicate predicateV = - new PredicateBuilder(table.rowType()) - .equal(1, BinaryString.fromString("100")); - for (FileIndexReader fileIndexReader : readerSetV) { - Assertions.assertThat(predicateV.visit(fileIndexReader).remain()).isTrue(); - } - - predicateV = - new PredicateBuilder(table.rowType()) - .equal(1, BinaryString.fromString("101")); - for (FileIndexReader fileIndexReader : readerSetV) { - Assertions.assertThat(predicateV.visit(fileIndexReader).remain()).isFalse(); - } - } - } - } - @Test public void testFileIndexProcedureSchemaEvolution() throws Exception { sql(