Skip to content

Commit

Permalink
[flink] support multiple writers writing to the same partition when u…
Browse files Browse the repository at this point in the history
…sing kafka as logSystem in unaware bucket mode. (#4516)
  • Loading branch information
liming30 authored Nov 13, 2024
1 parent 7eaf30f commit 30bf503
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,14 @@ public void addFileCommittable(CommitMessage commitMessage) {
commitMessages.add(commitMessage);
}

public void addLogOffset(int bucket, long offset) {
if (logOffsets.containsKey(bucket)) {
public void addLogOffset(int bucket, long offset, boolean allowDuplicate) {
if (!allowDuplicate && logOffsets.containsKey(bucket)) {
throw new RuntimeException(
String.format(
"bucket-%d appears multiple times, which is not possible.", bucket));
}
logOffsets.put(bucket, offset);
long newOffset = Math.max(logOffsets.getOrDefault(bucket, offset), offset);
logOffsets.put(bucket, newOffset);
}

public long identifier() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,8 @@ public List<Snapshot> commitData(
null,
Collections.emptyList(),
(commit, committable) -> {
logOffsets.forEach(committable::addLogOffset);
logOffsets.forEach(
(bucket, offset) -> committable.addLogOffset(bucket, offset, false));
commit.commit(committable, Collections.emptyMap());
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ private static void addFileCommittables(

if (!committable.logOffsets().containsKey(bucket)) {
int offset = ID.incrementAndGet();
committable.addLogOffset(bucket, offset);
committable.addLogOffset(bucket, offset, false);
assertThat(committable.logOffsets().get(bucket)).isEqualTo(offset);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.paimon.flink.sink.partition.PartitionListeners;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
Expand All @@ -44,6 +45,7 @@ public class StoreCommitter implements Committer<Committable, ManifestCommittabl
private final TableCommitImpl commit;
@Nullable private final CommitterMetrics committerMetrics;
private final PartitionListeners partitionListeners;
private final boolean allowLogOffsetDuplicate;

public StoreCommitter(FileStoreTable table, TableCommit commit, Context context) {
this.commit = (TableCommitImpl) commit;
Expand All @@ -60,6 +62,7 @@ public StoreCommitter(FileStoreTable table, TableCommit commit, Context context)
} catch (Exception e) {
throw new RuntimeException(e);
}
allowLogOffsetDuplicate = table.bucketMode() == BucketMode.BUCKET_UNAWARE;
}

@VisibleForTesting
Expand Down Expand Up @@ -94,7 +97,8 @@ public ManifestCommittable combine(
case LOG_OFFSET:
LogOffsetCommittable offset =
(LogOffsetCommittable) committable.wrappedCommittable();
manifestCommittable.addLogOffset(offset.bucket(), offset.offset());
manifestCommittable.addLogOffset(
offset.bucket(), offset.offset(), allowLogOffsetDuplicate);
break;
}
}
Expand Down Expand Up @@ -138,6 +142,10 @@ public void close() throws Exception {
partitionListeners.close();
}

public boolean allowLogOffsetDuplicate() {
return allowLogOffsetDuplicate;
}

private void calcNumBytesAndRecordsOut(List<ManifestCommittable> committables) {
if (committerMetrics == null) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,11 @@ public WrappedManifestCommittable combine(
WrappedManifestCommittable wrappedManifestCommittable,
List<MultiTableCommittable> committables) {
for (MultiTableCommittable committable : committables) {
Identifier identifier =
Identifier.create(committable.getDatabase(), committable.getTable());
ManifestCommittable manifestCommittable =
wrappedManifestCommittable.computeCommittableIfAbsent(
Identifier.create(committable.getDatabase(), committable.getTable()),
checkpointId,
watermark);
identifier, checkpointId, watermark);

switch (committable.kind()) {
case FILE:
Expand All @@ -106,7 +106,9 @@ public WrappedManifestCommittable combine(
case LOG_OFFSET:
LogOffsetCommittable offset =
(LogOffsetCommittable) committable.wrappedCommittable();
manifestCommittable.addLogOffset(offset.bucket(), offset.offset());
StoreCommitter committer = tableCommitters.get(identifier);
manifestCommittable.addLogOffset(
offset.bucket(), offset.offset(), committer.allowLogOffsetDuplicate());
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public static void addFileCommittables(

if (!committable.logOffsets().containsKey(bucket)) {
int offset = ID.incrementAndGet();
committable.addLogOffset(bucket, offset);
committable.addLogOffset(bucket, offset, false);
assertThat(committable.logOffsets().get(bucket)).isEqualTo(offset);
}
}
Expand Down

0 comments on commit 30bf503

Please sign in to comment.