diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java index 30a2b5b2eb81..d5d843d91bb1 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java @@ -19,17 +19,24 @@ package org.apache.paimon.flink; import org.apache.paimon.table.Table; +import org.apache.paimon.table.system.AuditLogTable; import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.WatermarkSpec; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.types.utils.TypeConversions; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK; +import static org.apache.flink.table.descriptors.Schema.SCHEMA; import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType; +import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.compoundKey; +import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.deserializeWatermarkSpec; /** A {@link CatalogTable} to represent system table. */ public class SystemCatalogTable implements CatalogTable { @@ -46,10 +53,21 @@ public Table table() { @Override public Schema getUnresolvedSchema() { - return Schema.newBuilder() - .fromRowDataType( - TypeConversions.fromLogicalToDataType(toLogicalType(table.rowType()))) - .build(); + Schema.Builder builder = Schema.newBuilder(); + builder.fromRowDataType( + TypeConversions.fromLogicalToDataType(toLogicalType(table.rowType()))); + if (table instanceof AuditLogTable) { + Map newOptions = new HashMap<>(table.options()); + if (newOptions.keySet().stream() + .anyMatch(key -> key.startsWith(compoundKey(SCHEMA, WATERMARK)))) { + WatermarkSpec watermarkSpec = deserializeWatermarkSpec(newOptions); + return builder.watermark( + watermarkSpec.getRowtimeAttribute(), + watermarkSpec.getWatermarkExpr()) + .build(); + } + } + return builder.build(); } @Override diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/WatermarkITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/WatermarkITCase.java index c213ef66f60a..11aef106246a 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/WatermarkITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/WatermarkITCase.java @@ -41,6 +41,39 @@ public void testWatermark() throws Exception { innerTestWatermark(); } + @Test + public void testAuditLogWatermark() throws Exception { + String[] options = + new String[] { + "'scan.watermark.idle-timeout'='1s'", + "'scan.watermark.alignment.group'='group'", + "'scan.watermark.alignment.update-interval'='2s'", + "'scan.watermark.alignment.max-drift'='1s'" + }; + sql( + "CREATE TABLE T (f0 INT, ts TIMESTAMP(3), WATERMARK FOR ts AS ts) WITH (" + + String.join(",", options) + + ")"); + + BlockingIterator select = + BlockingIterator.of( + streamSqlIter( + "SELECT window_start, window_end, SUM(f0) FROM TABLE(" + + "TUMBLE(TABLE T$audit_log, DESCRIPTOR(ts), INTERVAL '10' MINUTES))\n" + + " GROUP BY window_start, window_end;")); + + sql("INSERT INTO T VALUES (1, TIMESTAMP '2023-02-02 12:00:00')"); + sql("INSERT INTO T VALUES (1, TIMESTAMP '2023-02-02 12:10:05')"); + + assertThat(select.collect(1)) + .containsExactlyInAnyOrder( + Row.of( + LocalDateTime.parse("2023-02-02T12:00"), + LocalDateTime.parse("2023-02-02T12:10"), + 1)); + select.close(); + } + @Disabled // TODO unstable alignment may block watermark generation @Test public void testWatermarkAlignment() throws Exception {