diff --git a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java index 20d41409267e..77ac03c22b99 100644 --- a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java +++ b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java @@ -143,7 +143,7 @@ public class Snapshot { protected final long timeMillis; @JsonProperty(FIELD_LOG_OFFSETS) - @JsonInclude(JsonInclude.Include.NON_NULL) + @JsonInclude(JsonInclude.Include.NON_EMPTY) @Nullable protected final Map logOffsets; @@ -171,7 +171,7 @@ public class Snapshot { // null if there is no watermark in new committing, and the previous snapshot does not have a // watermark @JsonProperty(FIELD_WATERMARK) - @JsonInclude(JsonInclude.Include.NON_NULL) + @JsonInclude(value = JsonInclude.Include.CUSTOM, valueFilter = WatermarkFilter.class) @Nullable protected final Long watermark; @@ -516,4 +516,18 @@ public enum CommitKind { /** Collect statistics. */ ANALYZE } + + /** Json Filter for watermark field. */ + public static class WatermarkFilter { + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof Long)) { + return true; + } + + long watermark = (long) obj; + return watermark == Long.MIN_VALUE; + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java index 60f113761cff..af7947925765 100644 --- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java +++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java @@ -109,10 +109,14 @@ public boolean forceCreatingSnapshot() { return false; } + Long watermark = latestSnapshot.watermark(); + if (watermark == null) { + return false; + } + LocalDateTime snapshotTime = LocalDateTime.ofInstant( - Instant.ofEpochMilli(latestSnapshot.watermark()), - ZoneId.systemDefault()); + Instant.ofEpochMilli(watermark), ZoneId.systemDefault()); return isAfterOrEqual(LocalDateTime.now().minus(idlenessTimeout), snapshotTime); } else if (timeExtractor instanceof ProcessTimeExtractor) { diff --git a/paimon-core/src/test/java/org/apache/paimon/SnapshotTest.java b/paimon-core/src/test/java/org/apache/paimon/SnapshotTest.java index d8266c1c6d95..0a8c95cc9478 100644 --- a/paimon-core/src/test/java/org/apache/paimon/SnapshotTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/SnapshotTest.java @@ -20,6 +20,8 @@ import org.junit.jupiter.api.Test; +import static org.assertj.core.api.Assertions.assertThat; + class SnapshotTest { @Test @@ -41,4 +43,74 @@ public void testJsonIgnoreProperties() { + " \"unknownKey\" : 22222\n" + "}"); } + + @Test + public void testIgnoreLogOffset() { + Snapshot snapshot = + Snapshot.fromJson( + "{\n" + + " \"version\" : 3,\n" + + " \"id\" : 5,\n" + + " \"schemaId\" : 0,\n" + + " \"commitIdentifier\" : 0,\n" + + " \"commitKind\" : \"APPEND\",\n" + + " \"timeMillis\" : 1234,\n" + + " \"logOffsets\" : { }\n" + + "}"); + assertThat(snapshot.toJson()).doesNotContain("logOffsets"); + + snapshot = + Snapshot.fromJson( + "{\n" + + " \"version\" : 3,\n" + + " \"id\" : 5,\n" + + " \"schemaId\" : 0,\n" + + " \"commitIdentifier\" : 0,\n" + + " \"commitKind\" : \"APPEND\",\n" + + " \"timeMillis\" : 1234,\n" + + " \"logOffsets\" : {\"1\" : 2}\n" + + "}"); + assertThat(snapshot.toJson()).contains("logOffsets"); + } + + @Test + public void testIgnoreWatermark() { + Snapshot snapshot = + Snapshot.fromJson( + "{\n" + + " \"version\" : 3,\n" + + " \"id\" : 5,\n" + + " \"schemaId\" : 0,\n" + + " \"commitIdentifier\" : 0,\n" + + " \"commitKind\" : \"APPEND\",\n" + + " \"timeMillis\" : 1234,\n" + + " \"watermark\" : -9223372036854775808\n" + + "}"); + assertThat(snapshot.toJson()).doesNotContain("watermark"); + + snapshot = + Snapshot.fromJson( + "{\n" + + " \"version\" : 3,\n" + + " \"id\" : 5,\n" + + " \"schemaId\" : 0,\n" + + " \"commitIdentifier\" : 0,\n" + + " \"commitKind\" : \"APPEND\",\n" + + " \"timeMillis\" : 1234\n" + + "}"); + assertThat(snapshot.toJson()).doesNotContain("watermark"); + + snapshot = + Snapshot.fromJson( + "{\n" + + " \"version\" : 3,\n" + + " \"id\" : 5,\n" + + " \"schemaId\" : 0,\n" + + " \"commitIdentifier\" : 0,\n" + + " \"commitKind\" : \"APPEND\",\n" + + " \"timeMillis\" : 1234,\n" + + " \"watermark\" : 121312312\n" + + "}"); + assertThat(snapshot.toJson()).contains("watermark"); + } }