Commit 25c4195

resolve comment

liming30 committed Nov 1, 2024
1 parent 2be8070 commit 25c4195

Showing 5 changed files with 41 additions and 22 deletions.
@@ -447,6 +447,11 @@ public static List<ConfigOption<?>> getOptions() {
         return list;
     }
 
+    public static String generateCustomUid(
+            String uidPrefix, String tableName, String userDefinedSuffix) {
+        return String.format("%s_%s_%s", uidPrefix, tableName, userDefinedSuffix);
+    }
+
     /** The mode of lookup cache. */
     public enum LookupCacheMode {
         /** Auto mode, try to use partial mode. */
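Note: the new FlinkConnectorOptions.generateCustomUid helper centralizes the "%s_%s_%s" formatting that the call sites in the following files previously inlined. A minimal sketch of what it produces (the inputs "writer", "orders", and "v1" are hypothetical, purely for illustration):

public class GenerateCustomUidSketch {
    // Mirrors the helper added above.
    static String generateCustomUid(String uidPrefix, String tableName, String userDefinedSuffix) {
        return String.format("%s_%s_%s", uidPrefix, tableName, userDefinedSuffix);
    }

    public static void main(String[] args) {
        // Prints "writer_orders_v1": a stable, human-readable operator UID.
        System.out.println(generateCustomUid("writer", "orders", "v1"));
    }
}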
@@ -39,6 +39,7 @@

 import static org.apache.paimon.CoreOptions.createCommitUser;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_OPERATOR_UID_SUFFIX;
+import static org.apache.paimon.flink.FlinkConnectorOptions.generateCustomUid;
 import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
 
 /** Sink for dynamic bucket table. */
@@ -103,8 +104,7 @@ public DataStreamSink<?> build(DataStream<T> input, @Nullable Integer parallelism) {
         if (!StringUtils.isNullOrWhitespaceOnly(uidSuffix)) {
             bucketAssigned =
                     bucketAssigned.uid(
-                            String.format(
-                                    "%s_%s_%s",
+                            generateCustomUid(
                                     DYNAMIC_BUCKET_ASSIGNER_NAME, table.name(), uidSuffix));
         }
 
@@ -68,6 +68,7 @@
 import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_MANAGED_WRITER_BUFFER_MEMORY;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_OPERATOR_UID_SUFFIX;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_USE_MANAGED_MEMORY;
+import static org.apache.paimon.flink.FlinkConnectorOptions.generateCustomUid;
 import static org.apache.paimon.flink.utils.ManagedMemoryUtils.declareManagedMemory;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
@@ -232,7 +233,7 @@ public DataStream<Committable> doWrite(

         String uidSuffix = options.get(SINK_OPERATOR_UID_SUFFIX);
         if (options.get(SINK_OPERATOR_UID_SUFFIX) != null) {
-            written = written.uid(String.format("%s_%s_%s", WRITER_NAME, table.name(), uidSuffix));
+            written = written.uid(generateCustomUid(WRITER_NAME, table.name(), uidSuffix));
         }
 
         if (options.get(SINK_USE_MANAGED_MEMORY)) {
@@ -303,12 +304,12 @@ protected DataStreamSink<?> doCommit(DataStream<Committable> written, String commitUser) {
                         .setParallelism(1)
                         .setMaxParallelism(1);
         if (options.get(SINK_OPERATOR_UID_SUFFIX) != null) {
-            committed.uid(
-                    String.format(
-                            "%s_%s_%s",
-                            GLOBAL_COMMITTER_NAME,
-                            table.name(),
-                            options.get(SINK_OPERATOR_UID_SUFFIX)));
+            committed =
+                    committed.uid(
+                            generateCustomUid(
+                                    GLOBAL_COMMITTER_NAME,
+                                    table.name(),
+                                    options.get(SINK_OPERATOR_UID_SUFFIX)));
         }
         configureGlobalCommitter(
                 committed, options.get(SINK_COMMITTER_CPU), options.get(SINK_COMMITTER_MEMORY));
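Why these uid calls matter: Flink matches operator state from a savepoint by operator UID, so deriving stable UIDs from a user-supplied suffix (instead of relying on auto-generated hashes) is what lets the job graph change between runs without losing state. A minimal, hypothetical pipeline showing the pattern (illustrative names, not Paimon code):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StableUidSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3)
                .map(i -> i * 2)
                // A stable UID lets a restored savepoint re-match this operator's state
                // even if the surrounding job graph has changed.
                .uid("writer_orders_v1")
                .print()
                // DataStreamSink#uid returns the sink, so the result can be reassigned
                // and chained, as the committer change above now does.
                .uid("global_committer_orders_v1");
        env.execute("stable-uid-sketch");
    }
}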
@@ -65,6 +65,7 @@
 import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
 import static org.apache.paimon.CoreOptions.StreamingReadMode.FILE;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SOURCE_OPERATOR_UID_SUFFIX;
+import static org.apache.paimon.flink.FlinkConnectorOptions.generateCustomUid;
 import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.Preconditions.checkState;
@@ -218,9 +219,7 @@ private DataStream<RowData> toDataStream(Source<RowData, ?, ?> source) {
         if (!StringUtils.isNullOrWhitespaceOnly(uidSuffix)) {
             dataStream =
                     (DataStreamSource<RowData>)
-                            dataStream.uid(
-                                    String.format(
-                                            "%s_%s_%s", SOURCE_NAME, table.name(), uidSuffix));
+                            dataStream.uid(generateCustomUid(SOURCE_NAME, table.name(), uidSuffix));
         }
 
         if (parallelism != null) {
@@ -20,6 +20,7 @@

 import org.apache.paimon.Snapshot;
 import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.utils.Pair;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
@@ -48,8 +49,11 @@
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test case for flink source / sink restore from savepoint. */
+@SuppressWarnings("BusyWait")
 public class FlinkJobRecoveryITCase extends CatalogITCaseBase {
 
+    private static final String MINI_CLUSTER_FIELD = "miniCluster";
+
     @BeforeEach
     @Override
     public void before() throws IOException {
@@ -113,6 +117,7 @@ public void testRestoreFromSavepointWithJobGraphChange(BucketMode bucketMode) throws Exception {
                 afterRecoverCheckSql,
                 afterRecoverExpectedRows,
                 Collections.emptyList(),
+                Pair.of("target_table", "target_table"),
                 Collections.emptyMap());
     }
 

Expand Down Expand Up @@ -146,6 +151,7 @@ public void testRestoreFromSavepointWithIgnoreSourceState() throws Exception {
afterRecoverCheckSql,
afterRecoverExpectedRows,
Collections.emptyList(),
Pair.of("target_table", "target_table"),
Collections.singletonMap(
StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key(), "true"));
}
@@ -181,6 +187,7 @@ public void testRestoreFromSavepointWithIgnoreSinkState() throws Exception {
                 afterRecoverCheckSql,
                 afterRecoverExpectedRows,
                 Collections.singletonList(updateSql),
+                Pair.of("target_table", "target_table2"),
                 Collections.singletonMap(
                         StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key(), "true"));
     }
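The new Pair argument names the table checked before the savepoint (left) and after recovery (right); only the ignore-sink-state test above redirects to a second table. A small sketch of the convention, using only the Pair API visible in this diff:

import org.apache.paimon.utils.Pair;

public class TargetTablesSketch {
    public static void main(String[] args) {
        // Most tests keep the same table across the savepoint boundary...
        Pair<String, String> same = Pair.of("target_table", "target_table");
        // ...while the ignore-sink-state test writes elsewhere after recovery.
        Pair<String, String> redirected = Pair.of("target_table", "target_table2");
        System.out.println(same.getLeft() + " -> " + redirected.getRight());
    }
}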
@@ -193,12 +200,15 @@ private void testRecoverFromSavepoint(
             String afterRecoverCheckSql,
             List<Row> afterRecoverExpectedRows,
             List<String> updateSql,
+            Pair<String, String> targetTables,
             Map<String, String> recoverOptions)
             throws Exception {
 
+        //noinspection OptionalGetWithoutIsPresent
         JobClient jobClient = sEnv.executeSql(beforeRecoverSql).getJobClient().get();
         String checkpointPath =
-                triggerCheckpointAndWaitForWrites(jobClient, beforeRecoverExpectedRows.size());
+                triggerCheckpointAndWaitForWrites(
+                        jobClient, targetTables.getLeft(), beforeRecoverExpectedRows.size());
         jobClient.cancel().get();
 
         List<Row> rows = batchSql(beforeRecoverCheckSql);
@@ -217,8 +227,10 @@ private void testRecoverFromSavepoint(
             config.setString(entry.getKey(), entry.getValue());
         }
 
+        //noinspection OptionalGetWithoutIsPresent
         jobClient = sEnv.executeSql(afterRecoverSql).getJobClient().get();
-        triggerCheckpointAndWaitForWrites(jobClient, afterRecoverExpectedRows.size());
+        triggerCheckpointAndWaitForWrites(
+                jobClient, targetTables.getRight(), afterRecoverExpectedRows.size());
         jobClient.cancel().get();
 
         rows = batchSql(afterRecoverCheckSql);
@@ -268,16 +280,17 @@ private Snapshot waitForNewSnapshot(String tableName, long initialSnapshot)
     }
 
     @SuppressWarnings("unchecked")
-    private <T> T reflectGetField(Object instance, String fieldName)
+    private <T> T reflectGetMiniCluster(Object instance)
             throws NoSuchFieldException, IllegalAccessException {
-        Field field = instance.getClass().getDeclaredField(fieldName);
+        Field field = instance.getClass().getDeclaredField(MINI_CLUSTER_FIELD);
         field.setAccessible(true);
         return (T) field.get(instance);
     }
 
-    private String triggerCheckpointAndWaitForWrites(JobClient jobClient, long totalReocrds)
-            throws Exception {
-        MiniCluster miniCluster = reflectGetField(jobClient, "miniCluster");
+    private String triggerCheckpointAndWaitForWrites(
+            JobClient jobClient, String targetTable, long totalRecords) throws Exception {
+        //noinspection resource
+        MiniCluster miniCluster = reflectGetMiniCluster(jobClient);
         JobID jobID = jobClient.getJobID();
         JobStatus jobStatus = jobClient.getJobStatus().get();
         while (jobStatus == JobStatus.INITIALIZING || jobStatus == JobStatus.CREATED) {
@@ -309,10 +322,11 @@ private String triggerCheckpointAndWaitForWrites(JobClient jobClient, long total
         }
 
         String checkpointPath = miniCluster.triggerCheckpoint(jobID).get();
-        Snapshot snapshot = waitForNewSnapshot("target_table", -1L);
-        while (snapshot.totalRecordCount() < totalReocrds) {
+        Snapshot snapshot = waitForNewSnapshot(targetTable, -1L);
+        //noinspection DataFlowIssue
+        while (snapshot.totalRecordCount() < totalRecords) {
             checkpointPath = miniCluster.triggerCheckpoint(jobID).get();
-            snapshot = waitForNewSnapshot("target_table", snapshot.id());
+            snapshot = waitForNewSnapshot(targetTable, snapshot.id());
         }
 
         return checkpointPath;
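On the test plumbing: reflectGetMiniCluster reaches into the JobClient's private "miniCluster" field so the test can trigger checkpoints directly via MiniCluster. A standalone sketch of the same reflection pattern (Holder is a hypothetical stand-in for the real JobClient):

import java.lang.reflect.Field;

public class ReflectFieldSketch {
    // Read a private field by name, as the test does for "miniCluster".
    @SuppressWarnings("unchecked")
    static <T> T reflectGetField(Object instance, String fieldName)
            throws NoSuchFieldException, IllegalAccessException {
        Field field = instance.getClass().getDeclaredField(fieldName);
        field.setAccessible(true);
        return (T) field.get(instance);
    }

    // Hypothetical holder with a private field, standing in for the JobClient.
    static class Holder {
        private final String miniCluster = "mini-cluster-instance";
    }

    public static void main(String[] args) throws Exception {
        String value = reflectGetField(new Holder(), "miniCluster");
        System.out.println(value); // -> mini-cluster-instance
    }
}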
