diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 40b838a4a48d..55e21a35470e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -447,6 +447,11 @@ public static List<ConfigOption<?>> getOptions() {
         return list;
     }
 
+    public static String generateCustomUid(
+            String uidPrefix, String tableName, String userDefinedSuffix) {
+        return String.format("%s_%s_%s", uidPrefix, tableName, userDefinedSuffix);
+    }
+
     /** The mode of lookup cache. */
     public enum LookupCacheMode {
         /** Auto mode, try to use partial mode. */
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java
index ef4b6495649c..c2299a7e8699 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java
@@ -39,6 +39,7 @@
 
 import static org.apache.paimon.CoreOptions.createCommitUser;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_OPERATOR_UID_SUFFIX;
+import static org.apache.paimon.flink.FlinkConnectorOptions.generateCustomUid;
 import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
 
 /** Sink for dynamic bucket table. */
@@ -103,8 +104,7 @@ public DataStreamSink<?> build(DataStream<T> input, @Nullable Integer parallelis
         if (!StringUtils.isNullOrWhitespaceOnly(uidSuffix)) {
             bucketAssigned =
                     bucketAssigned.uid(
-                            String.format(
-                                    "%s_%s_%s",
+                            generateCustomUid(
                                     DYNAMIC_BUCKET_ASSIGNER_NAME, table.name(), uidSuffix));
         }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 76a0516c9942..59f2f4b1035f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -68,6 +68,7 @@
 import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_MANAGED_WRITER_BUFFER_MEMORY;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_OPERATOR_UID_SUFFIX;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_USE_MANAGED_MEMORY;
+import static org.apache.paimon.flink.FlinkConnectorOptions.generateCustomUid;
 import static org.apache.paimon.flink.utils.ManagedMemoryUtils.declareManagedMemory;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
@@ -232,7 +233,7 @@ public DataStream<Committable> doWrite(
 
         String uidSuffix = options.get(SINK_OPERATOR_UID_SUFFIX);
         if (options.get(SINK_OPERATOR_UID_SUFFIX) != null) {
-            written = written.uid(String.format("%s_%s_%s", WRITER_NAME, table.name(), uidSuffix));
+            written = written.uid(generateCustomUid(WRITER_NAME, table.name(), uidSuffix));
         }
 
         if (options.get(SINK_USE_MANAGED_MEMORY)) {
@@ -303,12 +304,12 @@ protected DataStreamSink<?> doCommit(DataStream<Committable> written, String com
                         .setParallelism(1)
                         .setMaxParallelism(1);
         if (options.get(SINK_OPERATOR_UID_SUFFIX) != null) {
-            committed.uid(
-                    String.format(
-                            "%s_%s_%s",
-                            GLOBAL_COMMITTER_NAME,
-                            table.name(),
-                            options.get(SINK_OPERATOR_UID_SUFFIX)));
+            committed =
+                    committed.uid(
+                            generateCustomUid(
+                                    GLOBAL_COMMITTER_NAME,
+                                    table.name(),
+                                    options.get(SINK_OPERATOR_UID_SUFFIX)));
         }
         configureGlobalCommitter(
                 committed, options.get(SINK_COMMITTER_CPU), options.get(SINK_COMMITTER_MEMORY));
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index 3d7f5c1f0ef9..ed94043c035d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -65,6 +65,7 @@
 import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
 import static org.apache.paimon.CoreOptions.StreamingReadMode.FILE;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SOURCE_OPERATOR_UID_SUFFIX;
+import static org.apache.paimon.flink.FlinkConnectorOptions.generateCustomUid;
 import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.Preconditions.checkState;
@@ -218,9 +219,7 @@ private DataStream<RowData> toDataStream(Source<RowData, ?, ?> source) {
         if (!StringUtils.isNullOrWhitespaceOnly(uidSuffix)) {
             dataStream =
                     (DataStreamSource<RowData>)
-                            dataStream.uid(
-                                    String.format(
-                                            "%s_%s_%s", SOURCE_NAME, table.name(), uidSuffix));
+                            dataStream.uid(generateCustomUid(SOURCE_NAME, table.name(), uidSuffix));
         }
 
         if (parallelism != null) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkJobRecoveryITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkJobRecoveryITCase.java
index 1176aed34fc6..c46c4c358922 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkJobRecoveryITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkJobRecoveryITCase.java
@@ -20,6 +20,7 @@
 
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.utils.Pair;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
@@ -48,8 +49,11 @@
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test case for flink source / sink restore from savepoint. */
+@SuppressWarnings("BusyWait")
 public class FlinkJobRecoveryITCase extends CatalogITCaseBase {
 
+    private static final String MINI_CLUSTER_FIELD = "miniCluster";
+
     @BeforeEach
     @Override
     public void before() throws IOException {
@@ -113,6 +117,7 @@ public void testRestoreFromSavepointWithJobGraphChange(BucketMode bucketMode) th
                 afterRecoverCheckSql,
                 afterRecoverExpectedRows,
                 Collections.emptyList(),
+                Pair.of("target_table", "target_table"),
                 Collections.emptyMap());
     }
 
@@ -146,6 +151,7 @@ public void testRestoreFromSavepointWithIgnoreSourceState() throws Exception {
                 afterRecoverCheckSql,
                 afterRecoverExpectedRows,
                 Collections.emptyList(),
+                Pair.of("target_table", "target_table"),
                 Collections.singletonMap(
                         StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key(), "true"));
     }
@@ -181,6 +187,7 @@ public void testRestoreFromSavepointWithIgnoreSinkState() throws Exception {
                 afterRecoverCheckSql,
                 afterRecoverExpectedRows,
                 Collections.singletonList(updateSql),
+                Pair.of("target_table", "target_table2"),
                 Collections.singletonMap(
                         StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key(), "true"));
     }
@@ -193,12 +200,15 @@ private void testRecoverFromSavepoint(
             String afterRecoverCheckSql,
             List<Row> afterRecoverExpectedRows,
             List<String> updateSql,
+            Pair<String, String> targetTables,
             Map<String, String> recoverOptions)
             throws Exception {
 
+        //noinspection OptionalGetWithoutIsPresent
         JobClient jobClient = sEnv.executeSql(beforeRecoverSql).getJobClient().get();
         String checkpointPath =
-                triggerCheckpointAndWaitForWrites(jobClient, beforeRecoverExpectedRows.size());
+                triggerCheckpointAndWaitForWrites(
+                        jobClient, targetTables.getLeft(), beforeRecoverExpectedRows.size());
         jobClient.cancel().get();
 
         List<Row> rows = batchSql(beforeRecoverCheckSql);
@@ -217,8 +227,10 @@ private void testRecoverFromSavepoint(
             config.setString(entry.getKey(), entry.getValue());
         }
 
+        //noinspection OptionalGetWithoutIsPresent
         jobClient = sEnv.executeSql(afterRecoverSql).getJobClient().get();
-        triggerCheckpointAndWaitForWrites(jobClient, afterRecoverExpectedRows.size());
+        triggerCheckpointAndWaitForWrites(
+                jobClient, targetTables.getRight(), afterRecoverExpectedRows.size());
         jobClient.cancel().get();
 
         rows = batchSql(afterRecoverCheckSql);
@@ -268,16 +280,17 @@ private Snapshot waitForNewSnapshot(String tableName, long initialSnapshot)
     }
 
     @SuppressWarnings("unchecked")
-    private <T> T reflectGetField(Object instance, String fieldName)
+    private <T> T reflectGetMiniCluster(Object instance)
             throws NoSuchFieldException, IllegalAccessException {
-        Field field = instance.getClass().getDeclaredField(fieldName);
+        Field field = instance.getClass().getDeclaredField(MINI_CLUSTER_FIELD);
         field.setAccessible(true);
         return (T) field.get(instance);
     }
 
-    private String triggerCheckpointAndWaitForWrites(JobClient jobClient, long totalReocrds)
-            throws Exception {
-        MiniCluster miniCluster = reflectGetField(jobClient, "miniCluster");
+    private String triggerCheckpointAndWaitForWrites(
+            JobClient jobClient, String targetTable, long totalRecords) throws Exception {
+        //noinspection resource
+        MiniCluster miniCluster = reflectGetMiniCluster(jobClient);
         JobID jobID = jobClient.getJobID();
         JobStatus jobStatus = jobClient.getJobStatus().get();
         while (jobStatus == JobStatus.INITIALIZING || jobStatus == JobStatus.CREATED) {
@@ -309,10 +322,11 @@ private String triggerCheckpointAndWaitForWrites(JobClient jobClient, long total
         }
 
         String checkpointPath = miniCluster.triggerCheckpoint(jobID).get();
-        Snapshot snapshot = waitForNewSnapshot("target_table", -1L);
-        while (snapshot.totalRecordCount() < totalReocrds) {
+        Snapshot snapshot = waitForNewSnapshot(targetTable, -1L);
+        //noinspection DataFlowIssue
+        while (snapshot.totalRecordCount() < totalRecords) {
             checkpointPath = miniCluster.triggerCheckpoint(jobID).get();
-            snapshot = waitForNewSnapshot("target_table", snapshot.id());
+            snapshot = waitForNewSnapshot(targetTable, snapshot.id());
         }
 
         return checkpointPath;
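
For context, and not part of the patch itself: a minimal standalone sketch of the "<prefix>_<table>_<suffix>" UID scheme that the new generateCustomUid helper centralizes across the source, writer, bucket assigner, and global committer operators. The prefix is whatever operator-name constant the call site passes (WRITER_NAME, SOURCE_NAME, etc.); the "Writer", "orders", and "pipeline_v2" values below are hypothetical stand-ins for that constant, the table name, and the configured sink.operator-uid.suffix.

    // Sketch mirroring FlinkConnectorOptions.generateCustomUid from the patch above.
    public class GenerateCustomUidExample {

        // Composes a stable operator UID as "<prefix>_<table>_<suffix>".
        static String generateCustomUid(
                String uidPrefix, String tableName, String userDefinedSuffix) {
            return String.format("%s_%s_%s", uidPrefix, tableName, userDefinedSuffix);
        }

        public static void main(String[] args) {
            // Hypothetical inputs; prints "Writer_orders_pipeline_v2".
            System.out.println(generateCustomUid("Writer", "orders", "pipeline_v2"));
        }
    }

Because every call site now goes through one helper, the UID format can only drift in one place; a job restored from a savepoint keeps matching operator state as long as the prefix, table name, and configured suffix are unchanged, which is exactly what FlinkJobRecoveryITCase exercises.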