diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CreateTagFromWatermarkActionITTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CreateTagFromWatermarkActionITTest.java new file mode 100644 index 000000000000..b20974e7f44e --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CreateTagFromWatermarkActionITTest.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.table.FileStoreTable; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.bEnv; +import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init; +import static org.assertj.core.api.Assertions.assertThat; + +/** IT cases for {@link CreateTagFromTimestampAction}. */ +public class CreateTagFromWatermarkActionITTest extends ActionITCaseBase { + + @BeforeEach + public void setUp() { + init(warehouse); + } + + @Test + public void testCreateTagsFromSnapshotsWatermark() throws Exception { + bEnv.executeSql( + "CREATE TABLE T (" + + " k STRING," + + " dt STRING," + + " PRIMARY KEY (k, dt) NOT ENFORCED" + + ") PARTITIONED BY (dt) WITH (" + + " 'bucket' = '1'" + + ")"); + + bEnv.executeSql("insert into T values('k1', '2024-01-02')").await(); + // create snapshot 2 with watermark 1000. + bEnv.executeSql( + "insert into T/*+ OPTIONS('end-input.watermark'= '1000') */ values('k2', '2024-01-02')") + .await(); + // create snapshot 3 with watermark 2000. + bEnv.executeSql( + "insert into T/*+ OPTIONS('end-input.watermark'= '2000') */ values('k3', '2024-01-02')") + .await(); + FileStoreTable table = getFileStoreTable("T"); + + Snapshot snapshot2 = table.snapshotManager().snapshot(2); + long commitTime2 = snapshot2.timeMillis(); + long watermark2 = snapshot2.watermark(); + + Snapshot snapshot3 = table.snapshotManager().snapshot(3); + long commitTime3 = snapshot3.timeMillis(); + long watermark3 = snapshot3.watermark(); + createAction( + CreateTagFromWatermarkAction.class, + "create_tag_from_watermark", + "--warehouse", + warehouse, + "--table", + database + ".T", + "--tag", + "tag2", + "--watermark", + Long.toString(watermark2 - 1)) + .run(); + assertThat(table.tagManager().tagExists("tag2")).isTrue(); + assertThat(table.tagManager().taggedSnapshot("tag2").watermark()).isEqualTo(watermark2); + assertThat(table.tagManager().taggedSnapshot("tag2").timeMillis()).isEqualTo(commitTime2); + + createAction( + CreateTagFromWatermarkAction.class, + "create_tag_from_watermark", + "--warehouse", + warehouse, + "--table", + database + ".T", + "--tag", + "tag3", + "--watermark", + Long.toString(watermark2 + 1)) + .run(); + assertThat(table.tagManager().tagExists("tag3")).isTrue(); + assertThat(table.tagManager().taggedSnapshot("tag3").watermark()).isEqualTo(watermark3); + assertThat(table.tagManager().taggedSnapshot("tag3").timeMillis()).isEqualTo(commitTime3); + } + + @Test + public void testCreateTagsFromTagsWatermark() throws Exception { + bEnv.executeSql( + "CREATE TABLE T (" + + " k STRING," + + " dt STRING," + + " PRIMARY KEY (k, dt) NOT ENFORCED" + + ") PARTITIONED BY (dt) WITH (" + + " 'bucket' = '1'" + + ")"); + + bEnv.executeSql( + "insert into T/*+ OPTIONS('end-input.watermark'= '1000') */ values('k2', '2024-01-02')") + .await(); + + bEnv.executeSql("CALL sys.create_tag('default.T', 'tag1', 1)").await(); + + // make snapshot-1 expire. + bEnv.executeSql( + "insert into T/*+ OPTIONS('end-input.watermark'= '2000'," + + " 'snapshot.num-retained.max' = '1'," + + " 'snapshot.num-retained.min' = '1') */" + + " values('k2', '2024-01-02')") + .await(); + + FileStoreTable table = getFileStoreTable("T"); + + assertThat(table.snapshotManager().snapshotExists(1)).isFalse(); + + Snapshot tagSnapshot1 = table.tagManager().taggedSnapshot("tag1"); + + long tagsCommitTime = tagSnapshot1.timeMillis(); + long tagsWatermark = tagSnapshot1.watermark(); + + Snapshot snapshot2 = table.snapshotManager().snapshot(2); + long commitTime2 = snapshot2.timeMillis(); + long watermark2 = snapshot2.watermark(); + + assertThat(tagsWatermark == 1000).isTrue(); + assertThat(watermark2 == 2000).isTrue(); + + createAction( + CreateTagFromWatermarkAction.class, + "create_tag_from_watermark", + "--warehouse", + warehouse, + "--table", + database + ".T", + "--tag", + "tag2", + "--watermark", + Long.toString(tagsWatermark - 1)) + .run(); + assertThat(table.tagManager().tagExists("tag2")).isTrue(); + assertThat(table.tagManager().taggedSnapshot("tag2").watermark()).isEqualTo(tagsWatermark); + assertThat(table.tagManager().taggedSnapshot("tag2").timeMillis()) + .isEqualTo(tagsCommitTime); + + createAction( + CreateTagFromWatermarkAction.class, + "create_tag_from_watermark", + "--warehouse", + warehouse, + "--table", + database + ".T", + "--tag", + "tag3", + "--watermark", + Long.toString(watermark2 - 1)) + .run(); + assertThat(table.tagManager().tagExists("tag3")).isTrue(); + assertThat(table.tagManager().taggedSnapshot("tag3").watermark()).isEqualTo(watermark2); + assertThat(table.tagManager().taggedSnapshot("tag3").timeMillis()).isEqualTo(commitTime2); + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CreateTagFromWatermarkProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CreateTagFromWatermarkProcedureITCase.java index a807b667f8b7..8e659e75c832 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CreateTagFromWatermarkProcedureITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CreateTagFromWatermarkProcedureITCase.java @@ -20,17 +20,12 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.flink.CatalogITCaseBase; -import org.apache.paimon.flink.action.ActionBase; -import org.apache.paimon.flink.action.ActionFactory; -import org.apache.paimon.flink.action.CreateTagFromWatermarkAction; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.utils.SnapshotNotExistException; import org.apache.flink.types.Row; import org.junit.jupiter.api.Test; -import java.util.concurrent.ThreadLocalRandom; - import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatException; @@ -38,7 +33,7 @@ public class CreateTagFromWatermarkProcedureITCase extends CatalogITCaseBase { @Test - public void testCreatTagsFromSnapshotsWatermark() throws Exception { + public void testCreateTagsFromSnapshotsWatermark() throws Exception { sql( "CREATE TABLE T (" + " k STRING," @@ -85,67 +80,30 @@ public void testCreatTagsFromSnapshotsWatermark() throws Exception { assertThat(watermark1 == Long.MIN_VALUE).isTrue(); assertThat(watermark2 == 1000).isTrue(); assertThat(watermark3 == 2000).isTrue(); - if (ThreadLocalRandom.current().nextBoolean()) { - assertThat( - sql( - "CALL sys.create_tag_from_watermark(" - + "`table` => 'default.T'," - + "`tag` => 'tag2'," - + "`watermark` => %s)", - watermark2 - 1) - .stream() - .map(Row::toString)) - .containsExactlyInAnyOrder( - String.format("+I[tag2, 2, %s, %s]", commitTime2, watermark2)); - } else { - createAction( - CreateTagFromWatermarkAction.class, - "create_tag_from_watermark", - "--warehouse", - path, - "--table", - "default.T", - "--tag", - "tag2", - "--watermark", - Long.toString(watermark2 - 1)) - .run(); - } - Snapshot snapshot = table.tagManager().taggedSnapshot("tag2"); - assertThat(table.tagManager().tagExists("tag2")).isTrue(); - assertThat(snapshot.watermark()).isEqualTo(watermark2); - assertThat(snapshot.timeMillis()).isEqualTo(commitTime2); - - if (ThreadLocalRandom.current().nextBoolean()) { - assertThat( - sql( - "CALL sys.create_tag_from_watermark(" - + "`table` => 'default.T'," - + "`tag` => 'tag3'," - + "`watermark` => %s)", - watermark2 + 1) - .stream() - .map(Row::toString)) - .containsExactlyInAnyOrder( - String.format("+I[tag3, 3, %s, %s]", commitTime3, watermark3)); - } else { - createAction( - CreateTagFromWatermarkAction.class, - "create_tag_from_watermark", - "--warehouse", - path, - "--table", - "default.T", - "--tag", - "tag3", - "--watermark", - Long.toString(watermark2 + 1)) - .run(); - } - assertThat(table.tagManager().tagExists("tag3")).isTrue(); - assertThat(table.tagManager().taggedSnapshot("tag3").watermark()).isEqualTo(watermark3); - assertThat(table.tagManager().taggedSnapshot("tag3").timeMillis()).isEqualTo(commitTime3); + assertThat( + sql( + "CALL sys.create_tag_from_watermark(" + + "`table` => 'default.T'," + + "`tag` => 'tag2'," + + "`watermark` => %s)", + watermark2 - 1) + .stream() + .map(Row::toString)) + .containsExactlyInAnyOrder( + String.format("+I[tag2, 2, %s, %s]", commitTime2, watermark2)); + + assertThat( + sql( + "CALL sys.create_tag_from_watermark(" + + "`table` => 'default.T'," + + "`tag` => 'tag3'," + + "`watermark` => %s)", + watermark2 + 1) + .stream() + .map(Row::toString)) + .containsExactlyInAnyOrder( + String.format("+I[tag3, 3, %s, %s]", commitTime3, watermark3)); assertThatException() .isThrownBy( @@ -202,72 +160,28 @@ public void testCreateTagsFromTagsWatermark() throws Exception { assertThat(watermark2 == 2000).isTrue(); // create tag from tag1 that snapshot is 1. - if (ThreadLocalRandom.current().nextBoolean()) { - assertThat( - sql( - "CALL sys.create_tag_from_watermark(" - + "`table` => 'default.T'," - + "`tag` => 'tag2'," - + "`watermark` => %s)", - tagsWatermark - 1) - .stream() - .map(Row::toString)) - .containsExactlyInAnyOrder( - String.format("+I[tag2, 1, %s, %s]", tagsCommitTime, tagsWatermark)); - } else { - createAction( - CreateTagFromWatermarkAction.class, - "create_tag_from_watermark", - "--warehouse", - path, - "--table", - "default.T", - "--tag", - "tag2", - "--watermark", - Long.toString(tagsWatermark - 1)) - .run(); - } - assertThat(table.tagManager().tagExists("tag2")).isTrue(); - assertThat(table.tagManager().taggedSnapshot("tag2").watermark()).isEqualTo(tagsWatermark); - assertThat(table.tagManager().taggedSnapshot("tag2").timeMillis()) - .isEqualTo(tagsCommitTime); - - if (ThreadLocalRandom.current().nextBoolean()) { - assertThat( - sql( - "CALL sys.create_tag_from_watermark(" - + "`table` => 'default.T'," - + "`tag` => 'tag3'," - + "`watermark` => %s)", - watermark2 - 1) - .stream() - .map(Row::toString)) - .containsExactlyInAnyOrder( - String.format("+I[tag3, 2, %s, %s]", commitTime2, watermark2)); - } else { - createAction( - CreateTagFromWatermarkAction.class, - "create_tag_from_watermark", - "--warehouse", - path, - "--table", - "default.T", - "--tag", - "tag3", - "--watermark", - Long.toString(watermark2 - 1)) - .run(); - } - assertThat(table.tagManager().tagExists("tag3")).isTrue(); - assertThat(table.tagManager().taggedSnapshot("tag3").watermark()).isEqualTo(watermark2); - assertThat(table.tagManager().taggedSnapshot("tag3").timeMillis()).isEqualTo(commitTime2); - } - - private T createAction(Class clazz, String... args) { - return ActionFactory.createAction(args) - .filter(clazz::isInstance) - .map(clazz::cast) - .orElseThrow(() -> new RuntimeException("Failed to create action")); + assertThat( + sql( + "CALL sys.create_tag_from_watermark(" + + "`table` => 'default.T'," + + "`tag` => 'tag2'," + + "`watermark` => %s)", + tagsWatermark - 1) + .stream() + .map(Row::toString)) + .containsExactlyInAnyOrder( + String.format("+I[tag2, 1, %s, %s]", tagsCommitTime, tagsWatermark)); + + assertThat( + sql( + "CALL sys.create_tag_from_watermark(" + + "`table` => 'default.T'," + + "`tag` => 'tag3'," + + "`watermark` => %s)", + watermark2 - 1) + .stream() + .map(Row::toString)) + .containsExactlyInAnyOrder( + String.format("+I[tag3, 2, %s, %s]", commitTime2, watermark2)); } }