diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneSourceBuilder.java index a0f4ef33dee2..585c73cb952c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneSourceBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneSourceBuilder.java @@ -34,6 +34,7 @@ import java.util.Map; import static org.apache.paimon.utils.Preconditions.checkArgument; +import static org.apache.paimon.utils.Preconditions.checkState; /** * Pick the tables to be cloned based on the user input parameters. The record type of the build @@ -114,6 +115,8 @@ private DataStream> build(Catalog sourceCatalog) throws E database + "." + tableName, targetDatabase + "." + targetTableName)); } + checkState(!result.isEmpty(), "Didn't find any table in source catalog."); + if (LOG.isDebugEnabled()) { LOG.debug("The clone identifiers of source table and target table are: {}", result); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CloneActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CloneActionITCase.java index 71672551abcb..a55b01cc203b 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CloneActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CloneActionITCase.java @@ -32,6 +32,7 @@ import org.apache.flink.table.api.config.TableConfigOptions; import org.apache.flink.types.Row; import org.apache.flink.util.CloseableIterator; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -44,8 +45,10 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; +import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches; import static org.apache.paimon.utils.Preconditions.checkState; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** IT cases for {@link CloneAction}. */ public class CloneActionITCase extends ActionITCaseBase { @@ -640,6 +643,46 @@ public void testCloneTableWithExpiration(String invoker) throws Exception { .isEqualTo(Collections.singletonList("+I[1]")); } + // ------------------------------------------------------------------------ + // Negative Tests + // ------------------------------------------------------------------------ + + @Test + public void testEmptySourceCatalog() { + String sourceWarehouse = getTempDirPath("source-ware"); + + TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().parallelism(1).build(); + tEnv.executeSql( + "CREATE CATALOG sourcecat WITH (\n" + + " 'type' = 'paimon',\n" + + String.format(" 'warehouse' = '%s'\n", sourceWarehouse) + + ")"); + + String targetWarehouse = getTempDirPath("target-ware"); + + String[] args = + new String[] { + "clone", + "--warehouse", + sourceWarehouse, + "--target_warehouse", + targetWarehouse, + "--parallelism", + "1" + }; + CloneAction action = (CloneAction) ActionFactory.createAction(args).get(); + + StreamExecutionEnvironment env = + streamExecutionEnvironmentBuilder().streamingMode().allowRestart().build(); + action.withStreamExecutionEnvironment(env); + + assertThatThrownBy(action::run) + .satisfies( + anyCauseMatches( + IllegalStateException.class, + "Didn't find any table in source catalog.")); + } + // ------------------------------------------------------------------------ // Utils // ------------------------------------------------------------------------