Skip to content

Commit

Permalink
[fix] revert and fix by another way
Browse files Browse the repository at this point in the history
  • Loading branch information
leaves12138 committed Dec 13, 2023
1 parent 5705fbd commit 2d300d7
Show file tree
Hide file tree
Showing 5 changed files with 210 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ public interface Committer<CommitT, GlobalCommitT> extends AutoCloseable {
GlobalCommitT combine(long checkpointId, long watermark, List<CommitT> committables)
throws IOException;

GlobalCommitT combine(
long checkpointId, long watermark, GlobalCommitT t, List<CommitT> committables);

/** Commits the given {@link GlobalCommitT}. */
void commit(List<GlobalCommitT> globalCommittables) throws IOException, InterruptedException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,16 +193,24 @@ private void pollInputs() throws Exception {
for (Map.Entry<Long, List<CommitT>> entry : grouped.entrySet()) {
Long cp = entry.getKey();
List<CommitT> committables = entry.getValue();
if (committablesPerCheckpoint.containsKey(cp)) {
if (cp != null && cp == Long.MAX_VALUE && committablesPerCheckpoint.containsKey(cp)) {
GlobalCommitT commitT =
committer.combine(
cp,
currentWatermark,
committablesPerCheckpoint.get(cp),
committables);
committablesPerCheckpoint.put(cp, commitT);
} else if (committablesPerCheckpoint.containsKey(cp)) {
throw new RuntimeException(
String.format(
"Repeatedly commit the same checkpoint files. \n"
+ "The previous files is %s, \n"
+ "and the subsequent files is %s",
committablesPerCheckpoint.get(cp), committables));
} else {
committablesPerCheckpoint.put(cp, toCommittables(cp, committables));
}

committablesPerCheckpoint.put(cp, toCommittables(cp, committables));
}

this.inputs.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,22 +69,20 @@ public boolean forceCreatingSnapshot() {
public ManifestCommittable combine(
long checkpointId, long watermark, List<Committable> committables) {
ManifestCommittable manifestCommittable = new ManifestCommittable(checkpointId, watermark);
for (Committable committable : committables) {
switch (committable.kind()) {
case FILE:
CommitMessage file = (CommitMessage) committable.wrappedCommittable();
manifestCommittable.addFileCommittable(file);
break;
case LOG_OFFSET:
LogOffsetCommittable offset =
(LogOffsetCommittable) committable.wrappedCommittable();
manifestCommittable.addLogOffset(offset.bucket(), offset.offset());
break;
}
}
addFile(manifestCommittable, committables);
return manifestCommittable;
}

@Override
public ManifestCommittable combine(
long checkpointId,
long watermark,
ManifestCommittable oldCommitable,
List<Committable> committables) {
addFile(oldCommitable, committables);
return oldCommitable;
}

@Override
public void commit(List<ManifestCommittable> committables)
throws IOException, InterruptedException {
Expand All @@ -111,6 +109,22 @@ public void close() throws Exception {
commit.close();
}

private void addFile(ManifestCommittable manifestCommittable, List<Committable> committables) {
for (Committable committable : committables) {
switch (committable.kind()) {
case FILE:
CommitMessage file = (CommitMessage) committable.wrappedCommittable();
manifestCommittable.addFileCommittable(file);
break;
case LOG_OFFSET:
LogOffsetCommittable offset =
(LogOffsetCommittable) committable.wrappedCommittable();
manifestCommittable.addLogOffset(offset.bucket(), offset.offset());
break;
}
}
}

private void calcNumBytesAndRecordsOut(List<ManifestCommittable> committables) {
if (committerMetrics == null) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,26 +89,17 @@ public WrappedManifestCommittable combine(
long checkpointId, long watermark, List<MultiTableCommittable> committables) {
WrappedManifestCommittable wrappedManifestCommittable =
new WrappedManifestCommittable(checkpointId, watermark);
for (MultiTableCommittable committable : committables) {

ManifestCommittable manifestCommittable =
wrappedManifestCommittable.computeCommittableIfAbsent(
Identifier.create(committable.getDatabase(), committable.getTable()),
checkpointId,
watermark);
combineFile(checkpointId, watermark, wrappedManifestCommittable, committables);
return wrappedManifestCommittable;
}

switch (committable.kind()) {
case FILE:
CommitMessage file = (CommitMessage) committable.wrappedCommittable();
manifestCommittable.addFileCommittable(file);
break;
case LOG_OFFSET:
LogOffsetCommittable offset =
(LogOffsetCommittable) committable.wrappedCommittable();
manifestCommittable.addLogOffset(offset.bucket(), offset.offset());
break;
}
}
@Override
public WrappedManifestCommittable combine(
long checkpointId,
long watermark,
WrappedManifestCommittable wrappedManifestCommittable,
List<MultiTableCommittable> committables) {
combineFile(checkpointId, watermark, wrappedManifestCommittable, committables);
return wrappedManifestCommittable;
}

Expand Down Expand Up @@ -177,6 +168,32 @@ public Map<Long, List<MultiTableCommittable>> groupByCheckpoint(
return grouped;
}

private void combineFile(
long checkpointId,
long watermark,
WrappedManifestCommittable wrappedManifestCommittable,
List<MultiTableCommittable> committables) {
for (MultiTableCommittable committable : committables) {
ManifestCommittable manifestCommittable =
wrappedManifestCommittable.computeCommittableIfAbsent(
Identifier.create(committable.getDatabase(), committable.getTable()),
checkpointId,
watermark);

switch (committable.kind()) {
case FILE:
CommitMessage file = (CommitMessage) committable.wrappedCommittable();
manifestCommittable.addFileCommittable(file);
break;
case LOG_OFFSET:
LogOffsetCommittable offset =
(LogOffsetCommittable) committable.wrappedCommittable();
manifestCommittable.addLogOffset(offset.bucket(), offset.offset());
break;
}
}
}

private StoreCommitter getStoreCommitter(Identifier tableId) {
StoreCommitter committer = tableCommitters.get(tableId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@
import org.apache.paimon.flink.VersionedSerializerWrapper;
import org.apache.paimon.flink.utils.MetricUtils;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.NewFilesIncrement;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestCommittableSerializer;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
Expand Down Expand Up @@ -278,6 +281,136 @@ public void testRestoreCommitUser() throws Exception {
Assertions.assertThat(actual).hasSameElementsAs(Lists.newArrayList(commitUser));
}

@Test
public void testCommitInputEnd() throws Exception {
FileStoreTable table = createFileStoreTable();
String commitUser = UUID.randomUUID().toString();
OneInputStreamOperator<Committable, Committable> operator =
createCommitterOperator(table, commitUser, new NoopCommittableStateManager());
OneInputStreamOperatorTestHarness<Committable, Committable> testHarness =
createTestHarness(operator);
testHarness.open();
Assertions.assertThatCode(
() -> {
long time = System.currentTimeMillis();
long cp = 0L;
testHarness.processElement(
new Committable(
Long.MAX_VALUE,
Committable.Kind.FILE,
new CommitMessageImpl(
BinaryRow.EMPTY_ROW,
0,
new NewFilesIncrement(
Collections.emptyList(),
Collections.emptyList()),
new CompactIncrement(
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList()))),
cp);
testHarness.snapshot(cp++, time++);
testHarness.processElement(
new Committable(
Long.MAX_VALUE,
Committable.Kind.FILE,
new CommitMessageImpl(
BinaryRow.EMPTY_ROW,
0,
new NewFilesIncrement(
Collections.emptyList(),
Collections.emptyList()),
new CompactIncrement(
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList()))),
cp);
testHarness.processElement(
new Committable(
Long.MAX_VALUE,
Committable.Kind.FILE,
new CommitMessageImpl(
BinaryRow.EMPTY_ROW,
0,
new NewFilesIncrement(
Collections.emptyList(),
Collections.emptyList()),
new CompactIncrement(
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList()))),
cp);
testHarness.snapshot(cp++, time++);
testHarness.snapshot(cp, time);
})
.doesNotThrowAnyException();

Assertions.assertThat(
((ManifestCommittable)
((CommitterOperator) operator)
.committablesPerCheckpoint.get(Long.MAX_VALUE))
.fileCommittables()
.size())
.isEqualTo(3);

Assertions.assertThatCode(
() -> {
long time = System.currentTimeMillis();
long cp = 0L;
testHarness.processElement(
new Committable(
0L,
Committable.Kind.FILE,
new CommitMessageImpl(
BinaryRow.EMPTY_ROW,
0,
new NewFilesIncrement(
Collections.emptyList(),
Collections.emptyList()),
new CompactIncrement(
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList()))),
cp);
testHarness.snapshot(cp++, time++);
testHarness.processElement(
new Committable(
0L,
Committable.Kind.FILE,
new CommitMessageImpl(
BinaryRow.EMPTY_ROW,
0,
new NewFilesIncrement(
Collections.emptyList(),
Collections.emptyList()),
new CompactIncrement(
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList()))),
cp);
testHarness.processElement(
new Committable(
Long.MAX_VALUE,
Committable.Kind.FILE,
new CommitMessageImpl(
BinaryRow.EMPTY_ROW,
0,
new NewFilesIncrement(
Collections.emptyList(),
Collections.emptyList()),
new CompactIncrement(
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList()))),
cp);
testHarness.snapshot(cp++, time++);
testHarness.snapshot(cp, time);
})
.hasRootCauseInstanceOf(RuntimeException.class)
.cause()
.hasMessageContaining("Repeatedly commit the same checkpoint files.");
}

private static OperatorSubtaskState writeAndSnapshot(
FileStoreTable table,
String commitUser,
Expand Down

0 comments on commit 2d300d7

Please sign in to comment.