Skip to content

Commit

Permalink
[core] Exclude watermark and logOffsets for normark table
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed May 14, 2024
1 parent 1b82252 commit d810eb8
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 4 deletions.
18 changes: 16 additions & 2 deletions paimon-core/src/main/java/org/apache/paimon/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public class Snapshot {
protected final long timeMillis;

@JsonProperty(FIELD_LOG_OFFSETS)
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonInclude(JsonInclude.Include.NON_EMPTY)
@Nullable
protected final Map<Integer, Long> logOffsets;

Expand Down Expand Up @@ -171,7 +171,7 @@ public class Snapshot {
// null if there is no watermark in new committing, and the previous snapshot does not have a
// watermark
@JsonProperty(FIELD_WATERMARK)
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonInclude(value = JsonInclude.Include.CUSTOM, valueFilter = WatermarkFilter.class)
@Nullable
protected final Long watermark;

Expand Down Expand Up @@ -516,4 +516,18 @@ public enum CommitKind {
/** Collect statistics. */
ANALYZE
}

/** Json Filter for watermark field. */
public static class WatermarkFilter {

@Override
public boolean equals(Object obj) {
if (!(obj instanceof Long)) {
return true;
}

long watermark = (long) obj;
return watermark == Long.MIN_VALUE;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,14 @@ public boolean forceCreatingSnapshot() {
return false;
}

Long watermark = latestSnapshot.watermark();
if (watermark == null) {
return false;
}

LocalDateTime snapshotTime =
LocalDateTime.ofInstant(
Instant.ofEpochMilli(latestSnapshot.watermark()),
ZoneId.systemDefault());
Instant.ofEpochMilli(watermark), ZoneId.systemDefault());

return isAfterOrEqual(LocalDateTime.now().minus(idlenessTimeout), snapshotTime);
} else if (timeExtractor instanceof ProcessTimeExtractor) {
Expand Down
60 changes: 60 additions & 0 deletions paimon-core/src/test/java/org/apache/paimon/SnapshotTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

class SnapshotTest {

@Test
Expand All @@ -41,4 +43,62 @@ public void testJsonIgnoreProperties() {
+ " \"unknownKey\" : 22222\n"
+ "}");
}

@Test
public void testIgnoreLogOffset() {
Snapshot snapshot =
Snapshot.fromJson(
"{\n"
+ " \"version\" : 3,\n"
+ " \"id\" : 5,\n"
+ " \"schemaId\" : 0,\n"
+ " \"commitIdentifier\" : 0,\n"
+ " \"commitKind\" : \"APPEND\",\n"
+ " \"timeMillis\" : 1234,\n"
+ " \"logOffsets\" : { }\n"
+ "}");
assertThat(snapshot.toJson()).doesNotContain("logOffsets");

snapshot =
Snapshot.fromJson(
"{\n"
+ " \"version\" : 3,\n"
+ " \"id\" : 5,\n"
+ " \"schemaId\" : 0,\n"
+ " \"commitIdentifier\" : 0,\n"
+ " \"commitKind\" : \"APPEND\",\n"
+ " \"timeMillis\" : 1234,\n"
+ " \"logOffsets\" : {\"1\" : 2}\n"
+ "}");
assertThat(snapshot.toJson()).contains("logOffsets");
}

@Test
public void testIgnoreWatermark() {
Snapshot snapshot =
Snapshot.fromJson(
"{\n"
+ " \"version\" : 3,\n"
+ " \"id\" : 5,\n"
+ " \"schemaId\" : 0,\n"
+ " \"commitIdentifier\" : 0,\n"
+ " \"commitKind\" : \"APPEND\",\n"
+ " \"timeMillis\" : 1234,\n"
+ " \"watermark\" : -9223372036854775808\n"
+ "}");
assertThat(snapshot.toJson()).doesNotContain("watermark");

snapshot =
Snapshot.fromJson(
"{\n"
+ " \"version\" : 3,\n"
+ " \"id\" : 5,\n"
+ " \"schemaId\" : 0,\n"
+ " \"commitIdentifier\" : 0,\n"
+ " \"commitKind\" : \"APPEND\",\n"
+ " \"timeMillis\" : 1234,\n"
+ " \"watermark\" : 121312312\n"
+ "}");
assertThat(snapshot.toJson()).contains("watermark");
}
}

0 comments on commit d810eb8

Please sign in to comment.