From ca40f9879b109128f10fa58cc34a676fc309bcae Mon Sep 17 00:00:00 2001 From: Jingsong Date: Tue, 14 May 2024 16:47:21 +0800 Subject: [PATCH 1/2] [core] Exclude watermark and logOffsets for normark table --- .../main/java/org/apache/paimon/Snapshot.java | 18 +++++- .../apache/paimon/tag/TagAutoCreation.java | 8 ++- .../java/org/apache/paimon/SnapshotTest.java | 60 +++++++++++++++++++ 3 files changed, 82 insertions(+), 4 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java index 20d41409267e..77ac03c22b99 100644 --- a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java +++ b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java @@ -143,7 +143,7 @@ public class Snapshot { protected final long timeMillis; @JsonProperty(FIELD_LOG_OFFSETS) - @JsonInclude(JsonInclude.Include.NON_NULL) + @JsonInclude(JsonInclude.Include.NON_EMPTY) @Nullable protected final Map logOffsets; @@ -171,7 +171,7 @@ public class Snapshot { // null if there is no watermark in new committing, and the previous snapshot does not have a // watermark @JsonProperty(FIELD_WATERMARK) - @JsonInclude(JsonInclude.Include.NON_NULL) + @JsonInclude(value = JsonInclude.Include.CUSTOM, valueFilter = WatermarkFilter.class) @Nullable protected final Long watermark; @@ -516,4 +516,18 @@ public enum CommitKind { /** Collect statistics. */ ANALYZE } + + /** Json Filter for watermark field. */ + public static class WatermarkFilter { + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof Long)) { + return true; + } + + long watermark = (long) obj; + return watermark == Long.MIN_VALUE; + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java index 60f113761cff..af7947925765 100644 --- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java +++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java @@ -109,10 +109,14 @@ public boolean forceCreatingSnapshot() { return false; } + Long watermark = latestSnapshot.watermark(); + if (watermark == null) { + return false; + } + LocalDateTime snapshotTime = LocalDateTime.ofInstant( - Instant.ofEpochMilli(latestSnapshot.watermark()), - ZoneId.systemDefault()); + Instant.ofEpochMilli(watermark), ZoneId.systemDefault()); return isAfterOrEqual(LocalDateTime.now().minus(idlenessTimeout), snapshotTime); } else if (timeExtractor instanceof ProcessTimeExtractor) { diff --git a/paimon-core/src/test/java/org/apache/paimon/SnapshotTest.java b/paimon-core/src/test/java/org/apache/paimon/SnapshotTest.java index d8266c1c6d95..7fdaa3d4b2ff 100644 --- a/paimon-core/src/test/java/org/apache/paimon/SnapshotTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/SnapshotTest.java @@ -20,6 +20,8 @@ import org.junit.jupiter.api.Test; +import static org.assertj.core.api.Assertions.assertThat; + class SnapshotTest { @Test @@ -41,4 +43,62 @@ public void testJsonIgnoreProperties() { + " \"unknownKey\" : 22222\n" + "}"); } + + @Test + public void testIgnoreLogOffset() { + Snapshot snapshot = + Snapshot.fromJson( + "{\n" + + " \"version\" : 3,\n" + + " \"id\" : 5,\n" + + " \"schemaId\" : 0,\n" + + " \"commitIdentifier\" : 0,\n" + + " \"commitKind\" : \"APPEND\",\n" + + " \"timeMillis\" : 1234,\n" + + " \"logOffsets\" : { }\n" + + "}"); + assertThat(snapshot.toJson()).doesNotContain("logOffsets"); + + snapshot = + Snapshot.fromJson( + "{\n" + + " \"version\" : 3,\n" + + " \"id\" : 5,\n" + + " \"schemaId\" : 0,\n" + + " \"commitIdentifier\" : 0,\n" + + " \"commitKind\" : \"APPEND\",\n" + + " \"timeMillis\" : 1234,\n" + + " \"logOffsets\" : {\"1\" : 2}\n" + + "}"); + assertThat(snapshot.toJson()).contains("logOffsets"); + } + + @Test + public void testIgnoreWatermark() { + Snapshot snapshot = + Snapshot.fromJson( + "{\n" + + " \"version\" : 3,\n" + + " \"id\" : 5,\n" + + " \"schemaId\" : 0,\n" + + " \"commitIdentifier\" : 0,\n" + + " \"commitKind\" : \"APPEND\",\n" + + " \"timeMillis\" : 1234,\n" + + " \"watermark\" : -9223372036854775808\n" + + "}"); + assertThat(snapshot.toJson()).doesNotContain("watermark"); + + snapshot = + Snapshot.fromJson( + "{\n" + + " \"version\" : 3,\n" + + " \"id\" : 5,\n" + + " \"schemaId\" : 0,\n" + + " \"commitIdentifier\" : 0,\n" + + " \"commitKind\" : \"APPEND\",\n" + + " \"timeMillis\" : 1234,\n" + + " \"watermark\" : 121312312\n" + + "}"); + assertThat(snapshot.toJson()).contains("watermark"); + } } From d3121a50cf795b875558be42ef61cacd79a2d423 Mon Sep 17 00:00:00 2001 From: Jingsong Date: Tue, 14 May 2024 16:50:39 +0800 Subject: [PATCH 2/2] add test --- .../test/java/org/apache/paimon/SnapshotTest.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/paimon-core/src/test/java/org/apache/paimon/SnapshotTest.java b/paimon-core/src/test/java/org/apache/paimon/SnapshotTest.java index 7fdaa3d4b2ff..0a8c95cc9478 100644 --- a/paimon-core/src/test/java/org/apache/paimon/SnapshotTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/SnapshotTest.java @@ -88,6 +88,18 @@ public void testIgnoreWatermark() { + "}"); assertThat(snapshot.toJson()).doesNotContain("watermark"); + snapshot = + Snapshot.fromJson( + "{\n" + + " \"version\" : 3,\n" + + " \"id\" : 5,\n" + + " \"schemaId\" : 0,\n" + + " \"commitIdentifier\" : 0,\n" + + " \"commitKind\" : \"APPEND\",\n" + + " \"timeMillis\" : 1234\n" + + "}"); + assertThat(snapshot.toJson()).doesNotContain("watermark"); + snapshot = Snapshot.fromJson( "{\n"