Skip to content

Commit

Permalink
[core] Force creating snapshot for tag with auto process time creation
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Oct 30, 2023
1 parent e97f697 commit 313f8b2
Show file tree
Hide file tree
Showing 14 changed files with 248 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,44 @@
/** Manifest commit message. */
public class WrappedManifestCommittable {

private Map<Identifier, ManifestCommittable> manifestCommittables;
private final long checkpointId;

public WrappedManifestCommittable() {
private final long watermark;

private final Map<Identifier, ManifestCommittable> manifestCommittables;

/**
 * Creates an empty wrapper bound to the given checkpoint id and watermark.
 *
 * <p>The per-table committables are held in a sorted map ordered by database name and then by
 * object name, so iteration order does not depend on insertion order.
 *
 * @param checkpointId checkpoint id stored on this wrapper
 * @param watermark watermark stored on this wrapper
 */
public WrappedManifestCommittable(long checkpointId, long watermark) {
    Comparator<Identifier> byDatabaseThenObject =
            Comparator.comparing(Identifier::getDatabaseName)
                    .thenComparing(Identifier::getObjectName);
    this.manifestCommittables = new TreeMap<>(byDatabaseThenObject);
    this.checkpointId = checkpointId;
    this.watermark = watermark;
}

/** Returns the checkpoint id this wrapper was constructed with. */
public long checkpointId() {
    return this.checkpointId;
}

/** Returns the watermark this wrapper was constructed with. */
public long watermark() {
    return this.watermark;
}

/**
 * Returns the per-table committables keyed by table identifier.
 *
 * <p>This is the backing map itself, not a copy: changes made through the returned reference
 * are visible to this wrapper.
 */
public Map<Identifier, ManifestCommittable> manifestCommittables() {
    return this.manifestCommittables;
}

/**
 * Returns the committable registered for {@code identifier}, creating and registering a new
 * one when none exists yet.
 *
 * <p>A newly created committable uses the {@code checkpointId} and {@code watermark} arguments
 * of this call; the parameters shadow the wrapper's own fields of the same name.
 *
 * @param identifier table the committable belongs to
 * @param checkpointId checkpoint id for a newly created committable
 * @param watermark watermark for a newly created committable
 * @return the existing or freshly created committable
 */
public ManifestCommittable computeCommittableIfAbsent(
        Identifier identifier, long checkpointId, long watermark) {
    return this.manifestCommittables.computeIfAbsent(
            identifier, missingKey -> new ManifestCommittable(checkpointId, watermark));
}

/**
 * Registers {@code manifestCommittable} for {@code identifier}, replacing any committable
 * previously stored under that identifier.
 *
 * @param identifier table the committable belongs to
 * @param manifestCommittable committable to store
 */
public void putManifestCommittable(
        Identifier identifier, ManifestCommittable manifestCommittable) {
    this.manifestCommittables.put(identifier, manifestCommittable);
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -46,58 +75,25 @@ public boolean equals(Object o) {
return false;
}
WrappedManifestCommittable that = (WrappedManifestCommittable) o;

if (manifestCommittables.size() != that.manifestCommittables.size()) {
return false;
}

for (Map.Entry<Identifier, ManifestCommittable> entry : manifestCommittables.entrySet()) {
if (!Objects.equals(entry.getValue(), that.manifestCommittables.get(entry.getKey()))) {
return false;
}
}

return true;
return checkpointId == that.checkpointId
&& watermark == that.watermark
&& Objects.equals(manifestCommittables, that.manifestCommittables);
}

@Override
public int hashCode() {
return Objects.hash(manifestCommittables.values().toArray(new Object[0]));
return Objects.hash(checkpointId, watermark, manifestCommittables);
}

/**
 * Renders this wrapper as {@code WrappedManifestCommittable {manifestCommittables = {...}}}.
 *
 * <p>The closing brace is part of the template so that the rendered text is balanced; the
 * inner map rendering comes from {@code formatManifestCommittables()}.
 */
@Override
public String toString() {
    return String.format(
            "WrappedManifestCommittable {manifestCommittables = %s}",
            formatManifestCommittables());
}

/**
 * Renders the committable map as {@code {fullName=committable, fullName=committable}}, in the
 * map's iteration order. An empty map renders as {@code {}}.
 */
private String formatManifestCommittables() {
    StringBuilder text = new StringBuilder("{");
    boolean needsSeparator = false;
    for (Map.Entry<Identifier, ManifestCommittable> entry : manifestCommittables.entrySet()) {
        // Emit the separator before every entry except the first, instead of trimming a
        // trailing one afterwards.
        if (needsSeparator) {
            text.append(", ");
        }
        needsSeparator = true;
        text.append(entry.getKey().getFullName())
                .append("=")
                .append(entry.getValue().toString());
    }
    text.append("}");
    return text.toString();
}

/**
 * Looks up the committable stored for {@code identifier}; when there is none, a new
 * {@code ManifestCommittable} built from the given {@code checkpointId} and {@code watermark}
 * is stored and returned.
 *
 * @param identifier table the committable belongs to
 * @param checkpointId checkpoint id for a newly created committable
 * @param watermark watermark for a newly created committable
 * @return the existing or freshly created committable
 */
public ManifestCommittable computeCommittableIfAbsent(
        Identifier identifier, long checkpointId, long watermark) {
    return this.manifestCommittables.computeIfAbsent(
            identifier, absentKey -> new ManifestCommittable(checkpointId, watermark));
}

/**
 * Stores {@code manifestCommittable} under {@code identifier}.
 *
 * @param identifier table the committable belongs to
 * @param manifestCommittable committable to store
 * @return the result of the underlying {@code Map.put}: the committable previously stored for
 *     the identifier, or {@code null} when there was none
 */
public ManifestCommittable putManifestCommittable(
        Identifier identifier, ManifestCommittable manifestCommittable) {
    ManifestCommittable previous = manifestCommittables.put(identifier, manifestCommittable);
    return previous;
}

public Map<Identifier, ManifestCommittable> getManifestCommittables() {
return manifestCommittables;
return "WrappedManifestCommittable{"
+ "checkpointId="
+ checkpointId
+ ", watermark="
+ watermark
+ ", manifestCommittables="
+ manifestCommittables
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ public TableCommitImpl(
this.tableName = tableName;
}

/**
 * Reports whether snapshot creation should be forced for this commit.
 *
 * @return {@code false} when no tag auto-creation is configured ({@code tagAutoCreation} is
 *     {@code null}); otherwise whatever the tag auto-creation reports
 */
public boolean forceCreatingSnapshot() {
    if (tagAutoCreation == null) {
        return false;
    }
    return tagAutoCreation.forceCreatingSnapshot();
}

@Override
public TableCommitImpl withOverwrite(@Nullable Map<String, String> overwritePartitions) {
this.overwritePartition = overwritePartitions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ private TagAutoCreation(
}
}

/** Reports whether snapshot creation should be forced, as decided by the time extractor. */
public boolean forceCreatingSnapshot() {
    boolean forced = timeExtractor.forceCreatingSnapshot();
    return forced;
}

public void run() {
while (true) {
if (snapshotManager.snapshotExists(nextSnapshot)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public interface TagTimeExtractor {

Optional<LocalDateTime> extract(long timeMilli, @Nullable Long watermark);

/**
 * Whether snapshot creation should be forced when tags are derived with this extractor.
 *
 * <p>Of the implementations visible in this file, the process-time extractor returns
 * {@code true} and the watermark-based extractor returns {@code false}.
 */
boolean forceCreatingSnapshot();

/** Extract time from snapshot time millis. */
class ProcessTimeExtractor implements TagTimeExtractor {

Expand All @@ -43,6 +45,11 @@ public Optional<LocalDateTime> extract(long timeMilli, @Nullable Long watermark)
.atZone(ZoneId.systemDefault())
.toLocalDateTime());
}

/**
 * Always {@code true} for the process-time extractor.
 *
 * <p>NOTE(review): presumably because process-time tags are derived from snapshot time
 * millis, so a snapshot has to exist for a tag to be produced — confirm against the
 * tag auto-creation logic.
 */
@Override
public boolean forceCreatingSnapshot() {
return true;
}
}

/** Extract time from snapshot watermark. */
Expand All @@ -63,6 +70,11 @@ public Optional<LocalDateTime> extract(long timeMilli, @Nullable Long watermark)
return Optional.of(
Instant.ofEpochMilli(watermark).atZone(watermarkZoneId).toLocalDateTime());
}

/** Always {@code false}: the watermark-based extractor never asks for a forced snapshot. */
@Override
public boolean forceCreatingSnapshot() {
return false;
}
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ public void testEquals() {
ManifestCommittable committable1 = create();
ManifestCommittable committable2 = create();

WrappedManifestCommittable wrapped1 = new WrappedManifestCommittable();
WrappedManifestCommittable wrapped1 = new WrappedManifestCommittable(-1, -1);
wrapped1.putManifestCommittable(Identifier.create("db", "table1"), committable1);
wrapped1.putManifestCommittable(Identifier.create("db", "table2"), committable2);

// add manifest committables in reverse order
WrappedManifestCommittable wrapped2 = new WrappedManifestCommittable();
WrappedManifestCommittable wrapped2 = new WrappedManifestCommittable(-1, -1);
wrapped2.putManifestCommittable(Identifier.create("db", "table2"), committable2);
wrapped2.putManifestCommittable(Identifier.create("db", "table1"), committable1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
*/
public interface Committer<CommitT, GlobalCommitT> extends AutoCloseable {

/**
 * Whether this committer wants a snapshot to be created even when there is nothing to commit.
 *
 * <p>The committer operator's {@code commitUpToCheckpoint} consults this when a completed
 * checkpoint has no committables; if it returns {@code true}, an empty committable is built
 * and committed.
 */
boolean forceCreatingSnapshot();

/** Compute an aggregated committable from a list of committables. */
GlobalCommitT combine(long checkpointId, long watermark, List<CommitT> committables)
throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -154,8 +155,16 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
private void commitUpToCheckpoint(long checkpointId) throws Exception {
NavigableMap<Long, GlobalCommitT> headMap =
committablesPerCheckpoint.headMap(checkpointId, true);
committer.commit(committables(headMap));
List<GlobalCommitT> committables = committables(headMap);
committer.commit(committables);
headMap.clear();

if (committables.isEmpty()) {
if (committer.forceCreatingSnapshot()) {
GlobalCommitT commit = toCommittables(checkpointId, Collections.emptyList());
committer.commit(Collections.singletonList(commit));
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ public CommitterMetrics getCommitterMetrics() {
return committerMetrics;
}

/** Reports whether snapshot creation should be forced, as decided by the wrapped commit. */
@Override
public boolean forceCreatingSnapshot() {
    boolean forced = commit.forceCreatingSnapshot();
    return forced;
}

@Override
public ManifestCommittable combine(
long checkpointId, long watermark, List<Committable> committables) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,16 @@ public StoreMultiCommitter(
this.isCompactJob = isCompactJob;
}

/**
 * Always {@code true} for the multi-table committer.
 *
 * <p>NOTE(review): the per-table decision appears to be taken inside {@code commit}, which
 * asks each table's committer for its own {@code forceCreatingSnapshot()} — confirm.
 */
@Override
public boolean forceCreatingSnapshot() {
return true;
}

@Override
public WrappedManifestCommittable combine(
long checkpointId, long watermark, List<MultiTableCommittable> committables) {
WrappedManifestCommittable wrappedManifestCommittable = new WrappedManifestCommittable();
WrappedManifestCommittable wrappedManifestCommittable =
new WrappedManifestCommittable(checkpointId, watermark);
for (MultiTableCommittable committable : committables) {

ManifestCommittable manifestCommittable =
Expand All @@ -109,15 +115,29 @@ public WrappedManifestCommittable combine(
@Override
public void commit(List<WrappedManifestCommittable> committables)
throws IOException, InterruptedException {
if (committables.isEmpty()) {
return;
}

// key by table id
Map<Identifier, List<ManifestCommittable>> committableMap = groupByTable(committables);

for (Map.Entry<Identifier, List<ManifestCommittable>> entry : committableMap.entrySet()) {
Identifier tableId = entry.getKey();
List<ManifestCommittable> committableList = entry.getValue();
StoreCommitter committer = getStoreCommitter(tableId);
committer.commit(committableList);
committableMap.keySet().forEach(this::getStoreCommitter);

long checkpointId = committables.get(0).checkpointId();
long watermark = committables.get(0).watermark();
for (Map.Entry<Identifier, StoreCommitter> entry : tableCommitters.entrySet()) {
List<ManifestCommittable> committableList = committableMap.get(entry.getKey());
StoreCommitter committer = entry.getValue();
if (committableList != null) {
committer.commit(committableList);
} else {
// try best to commit empty snapshot, but tableCommitters may not contain all tables
if (committer.forceCreatingSnapshot()) {
ManifestCommittable combine =
committer.combine(checkpointId, watermark, Collections.emptyList());
committer.commit(Collections.singletonList(combine));
}
}
}
}

Expand All @@ -138,7 +158,7 @@ private Map<Identifier, List<ManifestCommittable>> groupByTable(
.flatMap(
wrapped -> {
Map<Identifier, ManifestCommittable> manifestCommittables =
wrapped.getManifestCommittables();
wrapped.manifestCommittables();
return manifestCommittables.entrySet().stream()
.map(entry -> Tuple2.of(entry.getKey(), entry.getValue()));
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,11 @@ public byte[] serialize(WrappedManifestCommittable wrapped) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);

view.writeLong(wrapped.checkpointId());
view.writeLong(wrapped.watermark());

// Serialize ManifestCommittable map inside WrappedManifestCommittable
Map<Identifier, ManifestCommittable> map = wrapped.getManifestCommittables();
Map<Identifier, ManifestCommittable> map = wrapped.manifestCommittables();
view.writeInt(map.size());
for (Map.Entry<Identifier, ManifestCommittable> entry : map.entrySet()) {
byte[] serializedKey = entry.getKey().getFullName().getBytes(StandardCharsets.UTF_8);
Expand Down Expand Up @@ -83,9 +86,13 @@ public WrappedManifestCommittable deserialize(int version, byte[] serialized)

DataInputDeserializer view = new DataInputDeserializer(serialized);

long checkpointId = view.readLong();
long watermark = view.readLong();

// Deserialize ManifestCommittable map inside WrappedManifestCommittable
int mapSize = view.readInt();
WrappedManifestCommittable wrappedManifestCommittable = new WrappedManifestCommittable();
WrappedManifestCommittable wrappedManifestCommittable =
new WrappedManifestCommittable(checkpointId, watermark);
for (int i = 0; i < mapSize; i++) {
int keyLength = view.readInt();
byte[] serializedKey = new byte[keyLength];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.paimon.flink.sink;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.VersionedSerializerWrapper;
Expand Down Expand Up @@ -332,6 +334,46 @@ public void testWatermarkCommit() throws Exception {
assertThat(table.snapshotManager().latestSnapshot().watermark()).isEqualTo(1024L);
}

/**
 * With the default table options, completing a checkpoint that received no elements must not
 * produce a snapshot.
 */
@Test
public void testEmptyCommit() throws Exception {
    FileStoreTable table = createFileStoreTable();
    OneInputStreamOperatorTestHarness<Committable, Committable> harness =
            createRecoverableTestHarness(table);
    harness.open();

    // Checkpoint 1 is taken and completed without any input element.
    harness.snapshot(1, 1);
    harness.notifyOfCompletedCheckpoint(1);

    assertThat(table.snapshotManager().latestSnapshot()).isNull();
}

/**
 * With process-time automatic tag creation enabled, every completed checkpoint must produce a
 * snapshot even though no elements were written: checkpoint 1 yields snapshot 1 and
 * checkpoint 2 yields snapshot 2.
 */
@Test
public void testEmptyCommitWithProcessTimeTag() throws Exception {
    FileStoreTable table =
            createFileStoreTable(
                    options ->
                            options.set(
                                    CoreOptions.TAG_AUTOMATIC_CREATION,
                                    CoreOptions.TagCreationMode.PROCESS_TIME));

    OneInputStreamOperatorTestHarness<Committable, Committable> harness =
            createRecoverableTestHarness(table);
    harness.open();

    // Two consecutive empty checkpoints; the latest snapshot id must follow the checkpoint.
    for (long checkpoint = 1; checkpoint <= 2; checkpoint++) {
        harness.snapshot(checkpoint, checkpoint);
        harness.notifyOfCompletedCheckpoint(checkpoint);

        Snapshot latest = table.snapshotManager().latestSnapshot();
        assertThat(latest).isNotNull();
        assertThat(latest.id()).isEqualTo(checkpoint);
    }
}

// ------------------------------------------------------------------------
// Metrics tests
// ------------------------------------------------------------------------
Expand Down
Loading

0 comments on commit 313f8b2

Please sign in to comment.