From 6e7a6efeba6269ea72a487e621bebc9ef054bb22 Mon Sep 17 00:00:00 2001 From: herefree <841043203@qq.com> Date: Thu, 29 Aug 2024 21:16:17 +0800 Subject: [PATCH] add CreateTagFromWatermarkAction --- .../CreateTagFromTimestampActionFactory.java | 4 +- .../action/CreateTagFromWatermarkAction.java | 56 ++++++ .../CreateTagFromWatermarkActionFactory.java | 71 +++++++ .../org.apache.paimon.factories.Factory | 1 + ...CreateTagFromWatermarkProcedureITCase.java | 180 +++++++++++++----- 5 files changed, 263 insertions(+), 49 deletions(-) create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagFromWatermarkAction.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagFromWatermarkActionFactory.java diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagFromTimestampActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagFromTimestampActionFactory.java index f70b41da81160..2d2fae73925de 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagFromTimestampActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagFromTimestampActionFactory.java @@ -47,10 +47,10 @@ public Optional create(MultipleParameterToolAdapter params) { String timeRetained = params.get(TIME_RETAINED); Map catalogConfig = optionalConfigMap(params, CATALOG_CONF); - CreateTagFromTimestampAction migrateFileAction = + CreateTagFromTimestampAction createTagFromTimestampAction = new CreateTagFromTimestampAction( warehouse, table, tag, timestamp, timeRetained, catalogConfig); - return Optional.of(migrateFileAction); + return Optional.of(createTagFromTimestampAction); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagFromWatermarkAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagFromWatermarkAction.java new file mode 100644 index 0000000000000..8afa54082f29d --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagFromWatermarkAction.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action; + +import org.apache.paimon.flink.procedure.CreateTagFromWatermarkProcedure; + +import org.apache.flink.table.procedure.DefaultProcedureContext; + +import java.util.Map; + +/** Create tag from watermark action for Flink. */ +public class CreateTagFromWatermarkAction extends ActionBase { + private final String table; + private final String tag; + private final Long watermark; + private final String timeRetained; + + public CreateTagFromWatermarkAction( + String warehouse, + String table, + String tag, + Long watermark, + String timeRetained, + Map catalogConfig) { + super(warehouse, catalogConfig); + this.table = table; + this.tag = tag; + this.watermark = watermark; + this.timeRetained = timeRetained; + } + + @Override + public void run() throws Exception { + CreateTagFromWatermarkProcedure createTagFromWatermarkProcedure = + new CreateTagFromWatermarkProcedure(); + createTagFromWatermarkProcedure.withCatalog(catalog); + createTagFromWatermarkProcedure.call( + new DefaultProcedureContext(env), table, tag, watermark, timeRetained); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagFromWatermarkActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagFromWatermarkActionFactory.java new file mode 100644 index 0000000000000..1fb86bde7ebbd --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagFromWatermarkActionFactory.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action; + +import java.util.Map; +import java.util.Optional; + +/** Factory to create {@link CreateTagFromWatermarkAction}. */ +public class CreateTagFromWatermarkActionFactory implements ActionFactory { + + public static final String IDENTIFIER = "create_tag_from_watermark"; + + private static final String TABLE = "table"; + + private static final String TAG = "tag"; + + private static final String WATERMARK = "watermark"; + + private static final String TIME_RETAINED = "time_retained"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + @Override + public Optional create(MultipleParameterToolAdapter params) { + String warehouse = params.get(WAREHOUSE); + String table = params.get(TABLE); + String tag = params.get(TAG); + Long watermark = Long.parseLong(params.get(WATERMARK)); + String timeRetained = params.get(TIME_RETAINED); + Map catalogConfig = optionalConfigMap(params, CATALOG_CONF); + + CreateTagFromWatermarkAction createTagFromWatermarkAction = + new CreateTagFromWatermarkAction( + warehouse, table, tag, watermark, timeRetained, catalogConfig); + return Optional.of(createTagFromWatermarkAction); + } + + @Override + public void printHelp() { + System.out.println("Action \"create_tag_from_watermark\" create tag from watermark."); + System.out.println(); + + System.out.println("Syntax:"); + System.out.println( + " create_tag_from_watermark --warehouse " + + "--table " + + "--tag " + + "--watermark " + + "[--timeRetained ] " + + "[--options =,=,...]"); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index d268add737ded..b72b8d5cb2f9b 100644 --- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -23,6 +23,7 @@ org.apache.paimon.flink.action.MergeIntoActionFactory org.apache.paimon.flink.action.RollbackToActionFactory org.apache.paimon.flink.action.CreateTagActionFactory org.apache.paimon.flink.action.CreateTagFromTimestampActionFactory +org.apache.paimon.flink.action.CreateTagFromWatermarkActionFactory org.apache.paimon.flink.action.DeleteTagActionFactory org.apache.paimon.flink.action.ResetConsumerActionFactory org.apache.paimon.flink.action.MigrateTableActionFactory diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CreateTagFromWatermarkProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CreateTagFromWatermarkProcedureITCase.java index 7d3dd0398843b..a807b667f8b72 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CreateTagFromWatermarkProcedureITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CreateTagFromWatermarkProcedureITCase.java @@ -20,12 +20,17 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.flink.CatalogITCaseBase; +import org.apache.paimon.flink.action.ActionBase; +import org.apache.paimon.flink.action.ActionFactory; +import org.apache.paimon.flink.action.CreateTagFromWatermarkAction; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.utils.SnapshotNotExistException; import org.apache.flink.types.Row; import org.junit.jupiter.api.Test; +import java.util.concurrent.ThreadLocalRandom; + import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatException; @@ -80,30 +85,67 @@ public void testCreatTagsFromSnapshotsWatermark() throws Exception { assertThat(watermark1 == Long.MIN_VALUE).isTrue(); assertThat(watermark2 == 1000).isTrue(); assertThat(watermark3 == 2000).isTrue(); + if (ThreadLocalRandom.current().nextBoolean()) { + assertThat( + sql( + "CALL sys.create_tag_from_watermark(" + + "`table` => 'default.T'," + + "`tag` => 'tag2'," + + "`watermark` => %s)", + watermark2 - 1) + .stream() + .map(Row::toString)) + .containsExactlyInAnyOrder( + String.format("+I[tag2, 2, %s, %s]", commitTime2, watermark2)); + } else { + createAction( + CreateTagFromWatermarkAction.class, + "create_tag_from_watermark", + "--warehouse", + path, + "--table", + "default.T", + "--tag", + "tag2", + "--watermark", + Long.toString(watermark2 - 1)) + .run(); + } + Snapshot snapshot = table.tagManager().taggedSnapshot("tag2"); + assertThat(table.tagManager().tagExists("tag2")).isTrue(); + assertThat(snapshot.watermark()).isEqualTo(watermark2); + assertThat(snapshot.timeMillis()).isEqualTo(commitTime2); - assertThat( - sql( - "CALL sys.create_tag_from_watermark(" - + "`table` => 'default.T'," - + "`tag` => 'tag2'," - + "`watermark` => %s)", - watermark2 - 1) - .stream() - .map(Row::toString)) - .containsExactlyInAnyOrder( - String.format("+I[tag2, 2, %s, %s]", commitTime2, watermark2)); - - assertThat( - sql( - "CALL sys.create_tag_from_watermark(" - + "`table` => 'default.T'," - + "`tag` => 'tag3'," - + "`watermark` => %s)", - watermark2 + 1) - .stream() - .map(Row::toString)) - .containsExactlyInAnyOrder( - String.format("+I[tag3, 3, %s, %s]", commitTime3, watermark3)); + if (ThreadLocalRandom.current().nextBoolean()) { + assertThat( + sql( + "CALL sys.create_tag_from_watermark(" + + "`table` => 'default.T'," + + "`tag` => 'tag3'," + + "`watermark` => %s)", + watermark2 + 1) + .stream() + .map(Row::toString)) + .containsExactlyInAnyOrder( + String.format("+I[tag3, 3, %s, %s]", commitTime3, watermark3)); + } else { + createAction( + CreateTagFromWatermarkAction.class, + "create_tag_from_watermark", + "--warehouse", + path, + "--table", + "default.T", + "--tag", + "tag3", + "--watermark", + Long.toString(watermark2 + 1)) + .run(); + } + + assertThat(table.tagManager().tagExists("tag3")).isTrue(); + assertThat(table.tagManager().taggedSnapshot("tag3").watermark()).isEqualTo(watermark3); + assertThat(table.tagManager().taggedSnapshot("tag3").timeMillis()).isEqualTo(commitTime3); assertThatException() .isThrownBy( @@ -121,7 +163,7 @@ public void testCreatTagsFromSnapshotsWatermark() throws Exception { } @Test - public void testCreatTagsFromTagsWatermark() throws Exception { + public void testCreateTagsFromTagsWatermark() throws Exception { sql( "CREATE TABLE T (" + " k STRING," @@ -160,28 +202,72 @@ public void testCreatTagsFromTagsWatermark() throws Exception { assertThat(watermark2 == 2000).isTrue(); // create tag from tag1 that snapshot is 1. - assertThat( - sql( - "CALL sys.create_tag_from_watermark(" - + "`table` => 'default.T'," - + "`tag` => 'tag2'," - + "`watermark` => %s)", - tagsWatermark - 1) - .stream() - .map(Row::toString)) - .containsExactlyInAnyOrder( - String.format("+I[tag2, 1, %s, %s]", tagsCommitTime, tagsWatermark)); - - assertThat( - sql( - "CALL sys.create_tag_from_watermark(" - + "`table` => 'default.T'," - + "`tag` => 'tag3'," - + "`watermark` => %s)", - watermark2 - 1) - .stream() - .map(Row::toString)) - .containsExactlyInAnyOrder( - String.format("+I[tag3, 2, %s, %s]", commitTime2, watermark2)); + if (ThreadLocalRandom.current().nextBoolean()) { + assertThat( + sql( + "CALL sys.create_tag_from_watermark(" + + "`table` => 'default.T'," + + "`tag` => 'tag2'," + + "`watermark` => %s)", + tagsWatermark - 1) + .stream() + .map(Row::toString)) + .containsExactlyInAnyOrder( + String.format("+I[tag2, 1, %s, %s]", tagsCommitTime, tagsWatermark)); + } else { + createAction( + CreateTagFromWatermarkAction.class, + "create_tag_from_watermark", + "--warehouse", + path, + "--table", + "default.T", + "--tag", + "tag2", + "--watermark", + Long.toString(tagsWatermark - 1)) + .run(); + } + assertThat(table.tagManager().tagExists("tag2")).isTrue(); + assertThat(table.tagManager().taggedSnapshot("tag2").watermark()).isEqualTo(tagsWatermark); + assertThat(table.tagManager().taggedSnapshot("tag2").timeMillis()) + .isEqualTo(tagsCommitTime); + + if (ThreadLocalRandom.current().nextBoolean()) { + assertThat( + sql( + "CALL sys.create_tag_from_watermark(" + + "`table` => 'default.T'," + + "`tag` => 'tag3'," + + "`watermark` => %s)", + watermark2 - 1) + .stream() + .map(Row::toString)) + .containsExactlyInAnyOrder( + String.format("+I[tag3, 2, %s, %s]", commitTime2, watermark2)); + } else { + createAction( + CreateTagFromWatermarkAction.class, + "create_tag_from_watermark", + "--warehouse", + path, + "--table", + "default.T", + "--tag", + "tag3", + "--watermark", + Long.toString(watermark2 - 1)) + .run(); + } + assertThat(table.tagManager().tagExists("tag3")).isTrue(); + assertThat(table.tagManager().taggedSnapshot("tag3").watermark()).isEqualTo(watermark2); + assertThat(table.tagManager().taggedSnapshot("tag3").timeMillis()).isEqualTo(commitTime2); + } + + private T createAction(Class clazz, String... args) { + return ActionFactory.createAction(args) + .filter(clazz::isInstance) + .map(clazz::cast) + .orElseThrow(() -> new RuntimeException("Failed to create action")); } }