From 9dbc03556a9b0d3bd25bb3c74bf9e8417b2daf61 Mon Sep 17 00:00:00 2001 From: HeavenZH Date: Mon, 29 Jul 2024 18:03:01 +0800 Subject: [PATCH 1/3] [flink] Support the watermark function of auditlog table --- .../paimon/flink/SystemCatalogTable.java | 26 +++++++++++--- .../apache/paimon/flink/WatermarkITCase.java | 35 ++++++++++++++++++- 2 files changed, 56 insertions(+), 5 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java index 30a2b5b2eb81..d5d843d91bb1 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java @@ -19,17 +19,24 @@ package org.apache.paimon.flink; import org.apache.paimon.table.Table; +import org.apache.paimon.table.system.AuditLogTable; import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.WatermarkSpec; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.types.utils.TypeConversions; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK; +import static org.apache.flink.table.descriptors.Schema.SCHEMA; import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType; +import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.compoundKey; +import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.deserializeWatermarkSpec; /** A {@link CatalogTable} to represent system table. */ public class SystemCatalogTable implements CatalogTable { @@ -46,10 +53,21 @@ public Table table() { @Override public Schema getUnresolvedSchema() { - return Schema.newBuilder() - .fromRowDataType( - TypeConversions.fromLogicalToDataType(toLogicalType(table.rowType()))) - .build(); + Schema.Builder builder = Schema.newBuilder(); + builder.fromRowDataType( + TypeConversions.fromLogicalToDataType(toLogicalType(table.rowType()))); + if (table instanceof AuditLogTable) { + Map newOptions = new HashMap<>(table.options()); + if (newOptions.keySet().stream() + .anyMatch(key -> key.startsWith(compoundKey(SCHEMA, WATERMARK)))) { + WatermarkSpec watermarkSpec = deserializeWatermarkSpec(newOptions); + return builder.watermark( + watermarkSpec.getRowtimeAttribute(), + watermarkSpec.getWatermarkExpr()) + .build(); + } + } + return builder.build(); } @Override diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/WatermarkITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/WatermarkITCase.java index c213ef66f60a..6e1ae01ecacd 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/WatermarkITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/WatermarkITCase.java @@ -41,6 +41,39 @@ public void testWatermark() throws Exception { innerTestWatermark(); } + @Test + public void testAuditLogWatermark() throws Exception { + String[] options = + new String[] { + "'scan.watermark.idle-timeout'='1s'", + "'scan.watermark.alignment.group'='group'", + "'scan.watermark.alignment.update-interval'='2s'", + "'scan.watermark.alignment.max-drift'='1s'" + }; + sql( + "CREATE TABLE T (f0 INT, ts TIMESTAMP(3), WATERMARK FOR ts AS ts) WITH (" + + String.join(",", options) + + ")"); + + BlockingIterator select = + BlockingIterator.of( + streamSqlIter( + "SELECT window_start, window_end, SUM(f0) FROM TABLE(" + + "TUMBLE(TABLE T$audit_log, DESCRIPTOR(ts), INTERVAL '10' MINUTES))\n" + + " GROUP BY window_start, window_end;")); + + sql("INSERT INTO T VALUES (1, TIMESTAMP '2023-02-02 12:00:00')"); + sql("INSERT INTO T VALUES (1, TIMESTAMP '2023-02-02 12:10:01')"); + + assertThat(select.collect(1)) + .containsExactlyInAnyOrder( + Row.of( + LocalDateTime.parse("2023-02-02T12:00"), + LocalDateTime.parse("2023-02-02T12:10"), + 1)); + select.close(); + } + @Disabled // TODO unstable alignment may block watermark generation @Test public void testWatermarkAlignment() throws Exception { @@ -61,7 +94,7 @@ private void innerTestWatermark(String... options) throws Exception { BlockingIterator.of( streamSqlIter( "SELECT window_start, window_end, SUM(f0) FROM TABLE(" - + "TUMBLE(TABLE T, DESCRIPTOR(ts), INTERVAL '10' MINUTES))\n" + + "TUMBLE(TABLE T$audit_log, DESCRIPTOR(ts), INTERVAL '10' MINUTES))\n" + " GROUP BY window_start, window_end;")); sql("INSERT INTO T VALUES (1, TIMESTAMP '2023-02-02 12:00:00')"); From c81248f106afaf2e169769af82021627d057e195 Mon Sep 17 00:00:00 2001 From: HeavenZH Date: Wed, 31 Jul 2024 17:06:38 +0800 Subject: [PATCH 2/3] fix --- .../src/test/java/org/apache/paimon/flink/WatermarkITCase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/WatermarkITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/WatermarkITCase.java index 6e1ae01ecacd..f8558737fccc 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/WatermarkITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/WatermarkITCase.java @@ -94,7 +94,7 @@ private void innerTestWatermark(String... options) throws Exception { BlockingIterator.of( streamSqlIter( "SELECT window_start, window_end, SUM(f0) FROM TABLE(" - + "TUMBLE(TABLE T$audit_log, DESCRIPTOR(ts), INTERVAL '10' MINUTES))\n" + + "TUMBLE(TABLE T, DESCRIPTOR(ts), INTERVAL '10' MINUTES))\n" + " GROUP BY window_start, window_end;")); sql("INSERT INTO T VALUES (1, TIMESTAMP '2023-02-02 12:00:00')"); From a6056643afa0abfc02b8502542c70ecf7b4f3bc9 Mon Sep 17 00:00:00 2001 From: HeavenZH Date: Wed, 31 Jul 2024 19:22:52 +0800 Subject: [PATCH 3/3] fix --- .../src/test/java/org/apache/paimon/flink/WatermarkITCase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/WatermarkITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/WatermarkITCase.java index f8558737fccc..11aef106246a 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/WatermarkITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/WatermarkITCase.java @@ -63,7 +63,7 @@ public void testAuditLogWatermark() throws Exception { + " GROUP BY window_start, window_end;")); sql("INSERT INTO T VALUES (1, TIMESTAMP '2023-02-02 12:00:00')"); - sql("INSERT INTO T VALUES (1, TIMESTAMP '2023-02-02 12:10:01')"); + sql("INSERT INTO T VALUES (1, TIMESTAMP '2023-02-02 12:10:05')"); assertThat(select.collect(1)) .containsExactlyInAnyOrder(