Skip to content

Commit

Permalink
fix minus
Browse files Browse the repository at this point in the history
  • Loading branch information
leaves12138 committed Dec 13, 2023
1 parent 2d300d7 commit ec60f1c
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,29 @@ public boolean forceCreatingSnapshot() {
public ManifestCommittable combine(
long checkpointId, long watermark, List<Committable> committables) {
ManifestCommittable manifestCommittable = new ManifestCommittable(checkpointId, watermark);
addFile(manifestCommittable, committables);
return manifestCommittable;
return combine(checkpointId, watermark, manifestCommittable, committables);
}

@Override
public ManifestCommittable combine(
long checkpointId,
long watermark,
ManifestCommittable oldCommitable,
ManifestCommittable manifestCommittable,
List<Committable> committables) {
addFile(oldCommitable, committables);
return oldCommitable;
for (Committable committable : committables) {
switch (committable.kind()) {
case FILE:
CommitMessage file = (CommitMessage) committable.wrappedCommittable();
manifestCommittable.addFileCommittable(file);
break;
case LOG_OFFSET:
LogOffsetCommittable offset =
(LogOffsetCommittable) committable.wrappedCommittable();
manifestCommittable.addLogOffset(offset.bucket(), offset.offset());
break;
}
}
return manifestCommittable;
}

@Override
Expand Down Expand Up @@ -109,22 +120,6 @@ public void close() throws Exception {
commit.close();
}

private void addFile(ManifestCommittable manifestCommittable, List<Committable> committables) {
for (Committable committable : committables) {
switch (committable.kind()) {
case FILE:
CommitMessage file = (CommitMessage) committable.wrappedCommittable();
manifestCommittable.addFileCommittable(file);
break;
case LOG_OFFSET:
LogOffsetCommittable offset =
(LogOffsetCommittable) committable.wrappedCommittable();
manifestCommittable.addLogOffset(offset.bucket(), offset.offset());
break;
}
}
}

private void calcNumBytesAndRecordsOut(List<ManifestCommittable> committables) {
if (committerMetrics == null) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ public WrappedManifestCommittable combine(
long checkpointId, long watermark, List<MultiTableCommittable> committables) {
WrappedManifestCommittable wrappedManifestCommittable =
new WrappedManifestCommittable(checkpointId, watermark);
combineFile(checkpointId, watermark, wrappedManifestCommittable, committables);
return wrappedManifestCommittable;
return combine(checkpointId, watermark, wrappedManifestCommittable, committables);
}

@Override
Expand All @@ -99,7 +98,25 @@ public WrappedManifestCommittable combine(
long watermark,
WrappedManifestCommittable wrappedManifestCommittable,
List<MultiTableCommittable> committables) {
combineFile(checkpointId, watermark, wrappedManifestCommittable, committables);
for (MultiTableCommittable committable : committables) {
ManifestCommittable manifestCommittable =
wrappedManifestCommittable.computeCommittableIfAbsent(
Identifier.create(committable.getDatabase(), committable.getTable()),
checkpointId,
watermark);

switch (committable.kind()) {
case FILE:
CommitMessage file = (CommitMessage) committable.wrappedCommittable();
manifestCommittable.addFileCommittable(file);
break;
case LOG_OFFSET:
LogOffsetCommittable offset =
(LogOffsetCommittable) committable.wrappedCommittable();
manifestCommittable.addLogOffset(offset.bucket(), offset.offset());
break;
}
}
return wrappedManifestCommittable;
}

Expand Down Expand Up @@ -168,32 +185,6 @@ public Map<Long, List<MultiTableCommittable>> groupByCheckpoint(
return grouped;
}

private void combineFile(
long checkpointId,
long watermark,
WrappedManifestCommittable wrappedManifestCommittable,
List<MultiTableCommittable> committables) {
for (MultiTableCommittable committable : committables) {
ManifestCommittable manifestCommittable =
wrappedManifestCommittable.computeCommittableIfAbsent(
Identifier.create(committable.getDatabase(), committable.getTable()),
checkpointId,
watermark);

switch (committable.kind()) {
case FILE:
CommitMessage file = (CommitMessage) committable.wrappedCommittable();
manifestCommittable.addFileCommittable(file);
break;
case LOG_OFFSET:
LogOffsetCommittable offset =
(LogOffsetCommittable) committable.wrappedCommittable();
manifestCommittable.addLogOffset(offset.bucket(), offset.offset());
break;
}
}
}

private StoreCommitter getStoreCommitter(Identifier tableId) {
StoreCommitter committer = tableCommitters.get(tableId);

Expand Down

0 comments on commit ec60f1c

Please sign in to comment.