[flink] Fix multi-add conflict when two batch jobs are writing into the same partition at the same time (#3699)
tsreaper authored Jul 9, 2024
1 parent d59cefa commit 0b5929f
Showing 7 changed files with 94 additions and 29 deletions.
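The root cause, as the comments added in this diff explain: when a failed batch job restarts from the committer, endInput runs again and re-commits the same commit messages, so the same data files are added twice and the job fails with a multi-add conflict. The fix makes the endInput path idempotent: committables whose snapshot already exists are filtered out instead of re-committed. Below is a self-contained sketch of that filter-then-commit idea; CommitStore and all names in it are hypothetical, not Paimon's API.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch of idempotent replay: drop identifiers whose snapshot
// already exists, then commit only the remainder (in increasing order).
class ReplayFilterSketch {
    interface CommitStore {
        Set<Long> alreadyCommitted(List<Long> identifiers);

        void commit(List<Long> identifiers, boolean checkAppendFiles);
    }

    static int filterAndCommit(
            CommitStore store, List<Long> identifiers, boolean checkAppendFiles) {
        Set<Long> done = store.alreadyCommitted(identifiers);
        List<Long> retry =
                identifiers.stream()
                        .filter(id -> !done.contains(id))
                        .sorted(Comparator.naturalOrder()) // identifiers must increase
                        .collect(Collectors.toList());
        if (!retry.isEmpty()) {
            store.commit(retry, checkAppendFiles);
        }
        return retry.size();
    }
}
```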
@@ -134,6 +134,9 @@ public boolean forceCreatingSnapshot() {
         if (this.forceCreatingSnapshot) {
             return true;
         }
+        if (overwritePartition != null) {
+            return true;
+        }
         return tagAutoManager != null
                 && tagAutoManager.getTagAutoCreation() != null
                 && tagAutoManager.getTagAutoCreation().forceCreatingSnapshot();
@@ -230,6 +233,11 @@ public void commitMultiple(List<ManifestCommittable> committables, boolean check
     }
 
     public int filterAndCommitMultiple(List<ManifestCommittable> committables) {
+        return filterAndCommitMultiple(committables, true);
+    }
+
+    public int filterAndCommitMultiple(
+            List<ManifestCommittable> committables, boolean checkAppendFiles) {
         Set<Long> retryIdentifiers =
                 commit.filterCommitted(
                         committables.stream()
@@ -250,9 +258,9 @@ public int filterAndCommitMultiple(List<ManifestCommittable> committables) {
                         // identifier must be in increasing order
                         .sorted(Comparator.comparingLong(ManifestCommittable::identifier))
                         .collect(Collectors.toList());
-        if (retryCommittables.size() > 0) {
+        if (!retryCommittables.isEmpty()) {
             checkFilesExistence(retryCommittables);
-            commitMultiple(retryCommittables, true);
+            commitMultiple(retryCommittables, checkAppendFiles);
         }
         return retryCommittables.size();
     }
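Existing callers of the one-argument filterAndCommitMultiple keep today's behavior (checkAppendFiles = true); only the endInput replay path passes false. A hedged usage sketch, assuming this hunk belongs to Paimon's TableCommitImpl (the recovery wrapper and its imports are illustrative):

```java
import java.util.List;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.table.sink.TableCommitImpl; // assumed location of this hunk

class EndInputReplaySketch {
    // On endInput replay, a missing snapshot implies its append files were never
    // committed, so the duplicate-append-file check can be skipped.
    static int replayEndInput(TableCommitImpl commit, List<ManifestCommittable> committables) {
        return commit.filterAndCommitMultiple(committables, false);
    }
}
```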
@@ -53,7 +53,12 @@ GlobalCommitT combine(
      * Filter out all {@link GlobalCommitT} which have committed, and commit the remaining {@link
      * GlobalCommitT}.
      */
-    int filterAndCommit(List<GlobalCommitT> globalCommittables) throws IOException;
+    int filterAndCommit(List<GlobalCommitT> globalCommittables, boolean checkAppendFiles)
+            throws IOException;
+
+    default int filterAndCommit(List<GlobalCommitT> globalCommittables) throws IOException {
+        return filterAndCommit(globalCommittables, true);
+    }
 
     Map<Long, List<CommitT>> groupByCheckpoint(Collection<CommitT> committables);

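Changing the Committer interface this way keeps every existing implementation and call site source-compatible: the new two-argument filterAndCommit is abstract, while the old one-argument signature becomes a default method that delegates with checkAppendFiles = true. A minimal self-contained illustration of the pattern:

```java
import java.io.IOException;
import java.util.List;

// The same compatibility pattern in isolation: the new overload is abstract,
// and the old signature defaults to the previous behavior.
interface CommitterSketch<T> {
    int filterAndCommit(List<T> committables, boolean checkAppendFiles) throws IOException;

    default int filterAndCommit(List<T> committables) throws IOException {
        return filterAndCommit(committables, true);
    }
}
```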
@@ -45,6 +45,7 @@ public class CommitterOperator<CommitT, GlobalCommitT> extends AbstractStreamOpe
         implements OneInputStreamOperator<CommitT, CommitT>, BoundedOneInput {
 
     private static final long serialVersionUID = 1L;
+    private static final long END_INPUT_CHECKPOINT_ID = Long.MAX_VALUE;
 
     /** Record all the inputs until commit. */
     private final Deque<CommitT> inputs = new ArrayDeque<>();
@@ -188,28 +189,38 @@ public void endInput() throws Exception {
         }
 
         pollInputs();
-        commitUpToCheckpoint(Long.MAX_VALUE);
+        commitUpToCheckpoint(END_INPUT_CHECKPOINT_ID);
     }
 
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
         super.notifyCheckpointComplete(checkpointId);
-        commitUpToCheckpoint(endInput ? Long.MAX_VALUE : checkpointId);
+        commitUpToCheckpoint(endInput ? END_INPUT_CHECKPOINT_ID : checkpointId);
     }
 
     private void commitUpToCheckpoint(long checkpointId) throws Exception {
         NavigableMap<Long, GlobalCommitT> headMap =
                 committablesPerCheckpoint.headMap(checkpointId, true);
         List<GlobalCommitT> committables = committables(headMap);
-        committer.commit(committables);
-        headMap.clear();
-        if (committables.isEmpty() && committer.forceCreatingSnapshot()) {
-            committables =
-                    Collections.singletonList(
-                            toCommittables(checkpointId, Collections.emptyList()));
-        }
+
+        if (committables.isEmpty()) {
+            if (committer.forceCreatingSnapshot()) {
+                GlobalCommitT commit = toCommittables(checkpointId, Collections.emptyList());
+                committer.commit(Collections.singletonList(commit));
+            }
+        } else if (checkpointId == END_INPUT_CHECKPOINT_ID) {
+            // In new versions of Flink, if a batch job fails, it might restart from some operator
+            // in the middle.
+            // If the job is restarted from the commit operator, endInput will be called again, and
+            // the same commit messages will be committed again.
+            // So when `endInput` is called, we must check if the corresponding snapshot exists.
+            // However, if the snapshot does not exist, then append files must be new files. So
+            // there is no need to check for duplicated append files.
+            committer.filterAndCommit(committables, false);
+        } else {
+            committer.commit(committables);
+        }
+        headMap.clear();
     }
 
     @Override
@@ -240,12 +251,14 @@ private void pollInputs() throws Exception {
             List<CommitT> committables = entry.getValue();
             // To prevent the asynchronous completion of tasks with multiple concurrent bounded
             // stream inputs, which leads to some tasks passing a Committable with cp =
-            // Long.MAX_VALUE during the endInput method call of the current checkpoint, while other
-            // tasks pass a Committable with Long.MAX_VALUE during other checkpoints hence causing
-            // an error here, we have a special handling for Committables with Long.MAX_VALUE:
-            // instead of throwing an error, we merge them.
-            if (cp != null && cp == Long.MAX_VALUE && committablesPerCheckpoint.containsKey(cp)) {
-                // Merge the Long.MAX_VALUE committables here.
+            // END_INPUT_CHECKPOINT_ID during the endInput method call of the current checkpoint,
+            // while other tasks pass a Committable with END_INPUT_CHECKPOINT_ID during other
+            // checkpoints hence causing an error here, we have a special handling for Committables
+            // with END_INPUT_CHECKPOINT_ID: instead of throwing an error, we merge them.
+            if (cp != null
+                    && cp == END_INPUT_CHECKPOINT_ID
+                    && committablesPerCheckpoint.containsKey(cp)) {
+                // Merge the END_INPUT_CHECKPOINT_ID committables here.
                 GlobalCommitT commitT =
                         committer.combine(
                                 cp,
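The merge handling in pollInputs exists because multiple bounded upstream tasks can finish during different checkpoints, so several batches of committables may arrive keyed by END_INPUT_CHECKPOINT_ID. A self-contained sketch of the merge-instead-of-fail behavior, with hypothetical types standing in for the operator's state:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical stand-in for committablesPerCheckpoint: a duplicate checkpoint id
// is an error, except END_INPUT_CHECKPOINT_ID, which is merged because bounded
// inputs may finish during different checkpoints.
class EndInputMergeSketch {
    static final long END_INPUT_CHECKPOINT_ID = Long.MAX_VALUE;

    final NavigableMap<Long, List<String>> committablesPerCheckpoint = new TreeMap<>();

    void add(long checkpointId, List<String> committables) {
        List<String> existing = committablesPerCheckpoint.get(checkpointId);
        if (existing == null) {
            committablesPerCheckpoint.put(checkpointId, new ArrayList<>(committables));
        } else if (checkpointId == END_INPUT_CHECKPOINT_ID) {
            existing.addAll(committables); // merge instead of throwing
        } else {
            throw new IllegalStateException("Duplicate checkpoint " + checkpointId);
        }
    }
}
```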
@@ -117,8 +117,9 @@ public void commit(List<ManifestCommittable> committables)
     }
 
     @Override
-    public int filterAndCommit(List<ManifestCommittable> globalCommittables) {
-        int committed = commit.filterAndCommitMultiple(globalCommittables);
+    public int filterAndCommit(
+            List<ManifestCommittable> globalCommittables, boolean checkAppendFiles) {
+        int committed = commit.filterAndCommitMultiple(globalCommittables, checkAppendFiles);
         if (partitionMarkDone != null) {
             partitionMarkDone.notifyCommittable(globalCommittables);
         }
@@ -143,12 +143,15 @@ public void commit(List<WrappedManifestCommittable> committables)
     }
 
     @Override
-    public int filterAndCommit(List<WrappedManifestCommittable> globalCommittables)
+    public int filterAndCommit(
+            List<WrappedManifestCommittable> globalCommittables, boolean checkAppendFiles)
             throws IOException {
         int result = 0;
         for (Map.Entry<Identifier, List<ManifestCommittable>> entry :
                 groupByTable(globalCommittables).entrySet()) {
-            result += getStoreCommitter(entry.getKey()).filterAndCommit(entry.getValue());
+            result +=
+                    getStoreCommitter(entry.getKey())
+                            .filterAndCommit(entry.getValue(), checkAppendFiles);
         }
         return result;
     }
@@ -293,6 +293,42 @@ private void innerTestChangelogProducing(List<String> options) throws Exception
         it.close();
     }
 
+    @Test
+    public void testBatchJobWithConflictAndRestart() throws Exception {
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().allowRestart(10).build();
+        tEnv.executeSql(
+                "CREATE CATALOG mycat WITH ( 'type' = 'paimon', 'warehouse' = '"
+                        + getTempDirPath()
+                        + "' )");
+        tEnv.executeSql("USE CATALOG mycat");
+        tEnv.executeSql(
+                "CREATE TABLE t ( k INT, v INT, PRIMARY KEY (k) NOT ENFORCED ) "
+                        // force compaction for each commit
+                        + "WITH ( 'bucket' = '2', 'full-compaction.delta-commits' = '1' )");
+        // write some basic records
+        tEnv.executeSql("INSERT INTO t VALUES (1, 10), (2, 20), (3, 30)").await();
+
+        // two batch jobs compact at the same time;
+        // let writer's parallelism > 1, so it cannot be chained with committer
+        TableResult result1 =
+                tEnv.executeSql(
+                        "INSERT INTO t /*+ OPTIONS('sink.parallelism' = '2') */ VALUES (1, 11), (2, 21), (3, 31)");
+        TableResult result2 =
+                tEnv.executeSql(
+                        "INSERT INTO t /*+ OPTIONS('sink.parallelism' = '2') */ VALUES (1, 12), (2, 22), (3, 32)");
+
+        result1.await();
+        result2.await();
+
+        try (CloseableIterator<Row> it = tEnv.executeSql("SELECT * FROM t").collect()) {
+            for (int i = 0; i < 3; i++) {
+                assertThat(it).hasNext();
+                Row row = it.next();
+                assertThat(row.getField(1)).isNotEqualTo((int) row.getField(0) * 10);
+            }
+        }
+    }
+
     // ------------------------------------------------------------------------
     // Random Tests
     // ------------------------------------------------------------------------
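Why the test's assertion is enough: each key k starts at v = 10 * k, and both concurrent jobs overwrite it (with 10 * k + 1 and 10 * k + 2). If the conflict and the restarts are handled correctly, both INSERTs succeed and every row ends up with one of the two new values; a row still holding its original value would indicate a lost commit. A condensed, hypothetical form of that invariant (the rows list stands for the collected query result):

```java
import java.util.List;
import org.apache.flink.types.Row;

class ConflictInvariantSketch {
    // After both jobs commit, every key must hold a value written by one of the
    // two concurrent jobs (10*k + 1 or 10*k + 2), never the original 10*k.
    static void check(List<Row> rows) {
        for (Row row : rows) {
            int k = (int) row.getField(0);
            int v = (int) row.getField(1);
            if (v != k * 10 + 1 && v != k * 10 + 2) {
                throw new AssertionError("key " + k + " kept a stale value: " + v);
            }
        }
    }
}
```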
@@ -118,7 +118,7 @@ protected static class TableEnvironmentBuilder {
         private boolean streamingMode = true;
         private Integer parallelism = null;
         private Integer checkpointIntervalMs = null;
-        private boolean allowRestart = false;
+        private int numRestarts = 0;
         private Configuration conf = new Configuration();
 
         public TableEnvironmentBuilder batchMode() {
@@ -142,12 +142,11 @@ public TableEnvironmentBuilder checkpointIntervalMs(int checkpointIntervalMs) {
         }
 
         public TableEnvironmentBuilder allowRestart() {
-            this.allowRestart = true;
-            return this;
+            return allowRestart(Integer.MAX_VALUE);
         }
 
-        public TableEnvironmentBuilder allowRestart(boolean allowRestart) {
-            this.allowRestart = allowRestart;
+        public TableEnvironmentBuilder allowRestart(int numRestarts) {
+            this.numRestarts = numRestarts;
             return this;
         }

@@ -192,15 +191,15 @@ public TableEnvironment build() {
                             parallelism);
         }
 
-        if (allowRestart) {
+        if (numRestarts > 0) {
             tEnv.getConfig()
                     .getConfiguration()
                     .set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
             tEnv.getConfig()
                     .getConfiguration()
                     .set(
                             RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS,
-                            Integer.MAX_VALUE);
+                            numRestarts);
             tEnv.getConfig()
                     .getConfiguration()
                     .set(
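For reference, allowRestart(n) now corresponds to the following standalone Flink configuration. RestartStrategyOptions is Flink's real API; the delay value is an assumed example, since the third .set(...) call is cut off in this view:

```java
import java.time.Duration;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestartStrategyOptions;

class RestartConfigSketch {
    // Fixed-delay restart strategy with a bounded number of attempts; the delay
    // below is an assumed example value.
    static Configuration restartConfig(int numRestarts) {
        Configuration conf = new Configuration();
        conf.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
        conf.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, numRestarts);
        conf.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ofSeconds(1));
        return conf;
    }
}
```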
