Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
herefree committed Sep 3, 2024
1 parent 27b3246 commit 8050575
Show file tree
Hide file tree
Showing 2 changed files with 222 additions and 133 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action;

import org.apache.paimon.Snapshot;
import org.apache.paimon.table.FileStoreTable;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.bEnv;
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init;
import static org.assertj.core.api.Assertions.assertThat;

/** IT cases for {@link CreateTagFromTimestampAction}. */
public class CreateTagFromWatermarkActionITTest extends ActionITCaseBase {

@BeforeEach
public void setUp() {
init(warehouse);
}

@Test
public void testCreateTagsFromSnapshotsWatermark() throws Exception {
bEnv.executeSql(
"CREATE TABLE T ("
+ " k STRING,"
+ " dt STRING,"
+ " PRIMARY KEY (k, dt) NOT ENFORCED"
+ ") PARTITIONED BY (dt) WITH ("
+ " 'bucket' = '1'"
+ ")");

bEnv.executeSql("insert into T values('k1', '2024-01-02')").await();
// create snapshot 2 with watermark 1000.
bEnv.executeSql(
"insert into T/*+ OPTIONS('end-input.watermark'= '1000') */ values('k2', '2024-01-02')")
.await();
// create snapshot 3 with watermark 2000.
bEnv.executeSql(
"insert into T/*+ OPTIONS('end-input.watermark'= '2000') */ values('k3', '2024-01-02')")
.await();
FileStoreTable table = getFileStoreTable("T");

Snapshot snapshot2 = table.snapshotManager().snapshot(2);
long commitTime2 = snapshot2.timeMillis();
long watermark2 = snapshot2.watermark();

Snapshot snapshot3 = table.snapshotManager().snapshot(3);
long commitTime3 = snapshot3.timeMillis();
long watermark3 = snapshot3.watermark();
createAction(
CreateTagFromWatermarkAction.class,
"create_tag_from_watermark",
"--warehouse",
warehouse,
"--table",
database + ".T",
"--tag",
"tag2",
"--watermark",
Long.toString(watermark2 - 1))
.run();
assertThat(table.tagManager().tagExists("tag2")).isTrue();
assertThat(table.tagManager().taggedSnapshot("tag2").watermark()).isEqualTo(watermark2);
assertThat(table.tagManager().taggedSnapshot("tag2").timeMillis()).isEqualTo(commitTime2);

createAction(
CreateTagFromWatermarkAction.class,
"create_tag_from_watermark",
"--warehouse",
warehouse,
"--table",
database + ".T",
"--tag",
"tag3",
"--watermark",
Long.toString(watermark2 + 1))
.run();
assertThat(table.tagManager().tagExists("tag3")).isTrue();
assertThat(table.tagManager().taggedSnapshot("tag3").watermark()).isEqualTo(watermark3);
assertThat(table.tagManager().taggedSnapshot("tag3").timeMillis()).isEqualTo(commitTime3);
}

@Test
public void testCreateTagsFromTagsWatermark() throws Exception {
bEnv.executeSql(
"CREATE TABLE T ("
+ " k STRING,"
+ " dt STRING,"
+ " PRIMARY KEY (k, dt) NOT ENFORCED"
+ ") PARTITIONED BY (dt) WITH ("
+ " 'bucket' = '1'"
+ ")");

bEnv.executeSql(
"insert into T/*+ OPTIONS('end-input.watermark'= '1000') */ values('k2', '2024-01-02')")
.await();

bEnv.executeSql("CALL sys.create_tag('default.T', 'tag1', 1)").await();

// make snapshot-1 expire.
bEnv.executeSql(
"insert into T/*+ OPTIONS('end-input.watermark'= '2000',"
+ " 'snapshot.num-retained.max' = '1',"
+ " 'snapshot.num-retained.min' = '1') */"
+ " values('k2', '2024-01-02')")
.await();

FileStoreTable table = getFileStoreTable("T");

assertThat(table.snapshotManager().snapshotExists(1)).isFalse();

Snapshot tagSnapshot1 = table.tagManager().taggedSnapshot("tag1");

long tagsCommitTime = tagSnapshot1.timeMillis();
long tagsWatermark = tagSnapshot1.watermark();

Snapshot snapshot2 = table.snapshotManager().snapshot(2);
long commitTime2 = snapshot2.timeMillis();
long watermark2 = snapshot2.watermark();

assertThat(tagsWatermark == 1000).isTrue();
assertThat(watermark2 == 2000).isTrue();

createAction(
CreateTagFromWatermarkAction.class,
"create_tag_from_watermark",
"--warehouse",
warehouse,
"--table",
database + ".T",
"--tag",
"tag2",
"--watermark",
Long.toString(tagsWatermark - 1))
.run();
assertThat(table.tagManager().tagExists("tag2")).isTrue();
assertThat(table.tagManager().taggedSnapshot("tag2").watermark()).isEqualTo(tagsWatermark);
assertThat(table.tagManager().taggedSnapshot("tag2").timeMillis())
.isEqualTo(tagsCommitTime);

createAction(
CreateTagFromWatermarkAction.class,
"create_tag_from_watermark",
"--warehouse",
warehouse,
"--table",
database + ".T",
"--tag",
"tag3",
"--watermark",
Long.toString(watermark2 - 1))
.run();
assertThat(table.tagManager().tagExists("tag3")).isTrue();
assertThat(table.tagManager().taggedSnapshot("tag3").watermark()).isEqualTo(watermark2);
assertThat(table.tagManager().taggedSnapshot("tag3").timeMillis()).isEqualTo(commitTime2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,20 @@

import org.apache.paimon.Snapshot;
import org.apache.paimon.flink.CatalogITCaseBase;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.ActionFactory;
import org.apache.paimon.flink.action.CreateTagFromWatermarkAction;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.SnapshotNotExistException;

import org.apache.flink.types.Row;
import org.junit.jupiter.api.Test;

import java.util.concurrent.ThreadLocalRandom;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatException;

/** IT Case for {@link CreateTagFromWatermarkProcedure}. */
public class CreateTagFromWatermarkProcedureITCase extends CatalogITCaseBase {

@Test
public void testCreatTagsFromSnapshotsWatermark() throws Exception {
public void testCreateTagsFromSnapshotsWatermark() throws Exception {
sql(
"CREATE TABLE T ("
+ " k STRING,"
Expand Down Expand Up @@ -85,67 +80,30 @@ public void testCreatTagsFromSnapshotsWatermark() throws Exception {
assertThat(watermark1 == Long.MIN_VALUE).isTrue();
assertThat(watermark2 == 1000).isTrue();
assertThat(watermark3 == 2000).isTrue();
if (ThreadLocalRandom.current().nextBoolean()) {
assertThat(
sql(
"CALL sys.create_tag_from_watermark("
+ "`table` => 'default.T',"
+ "`tag` => 'tag2',"
+ "`watermark` => %s)",
watermark2 - 1)
.stream()
.map(Row::toString))
.containsExactlyInAnyOrder(
String.format("+I[tag2, 2, %s, %s]", commitTime2, watermark2));
} else {
createAction(
CreateTagFromWatermarkAction.class,
"create_tag_from_watermark",
"--warehouse",
path,
"--table",
"default.T",
"--tag",
"tag2",
"--watermark",
Long.toString(watermark2 - 1))
.run();
}
Snapshot snapshot = table.tagManager().taggedSnapshot("tag2");
assertThat(table.tagManager().tagExists("tag2")).isTrue();
assertThat(snapshot.watermark()).isEqualTo(watermark2);
assertThat(snapshot.timeMillis()).isEqualTo(commitTime2);

if (ThreadLocalRandom.current().nextBoolean()) {
assertThat(
sql(
"CALL sys.create_tag_from_watermark("
+ "`table` => 'default.T',"
+ "`tag` => 'tag3',"
+ "`watermark` => %s)",
watermark2 + 1)
.stream()
.map(Row::toString))
.containsExactlyInAnyOrder(
String.format("+I[tag3, 3, %s, %s]", commitTime3, watermark3));
} else {
createAction(
CreateTagFromWatermarkAction.class,
"create_tag_from_watermark",
"--warehouse",
path,
"--table",
"default.T",
"--tag",
"tag3",
"--watermark",
Long.toString(watermark2 + 1))
.run();
}

assertThat(table.tagManager().tagExists("tag3")).isTrue();
assertThat(table.tagManager().taggedSnapshot("tag3").watermark()).isEqualTo(watermark3);
assertThat(table.tagManager().taggedSnapshot("tag3").timeMillis()).isEqualTo(commitTime3);
assertThat(
sql(
"CALL sys.create_tag_from_watermark("
+ "`table` => 'default.T',"
+ "`tag` => 'tag2',"
+ "`watermark` => %s)",
watermark2 - 1)
.stream()
.map(Row::toString))
.containsExactlyInAnyOrder(
String.format("+I[tag2, 2, %s, %s]", commitTime2, watermark2));

assertThat(
sql(
"CALL sys.create_tag_from_watermark("
+ "`table` => 'default.T',"
+ "`tag` => 'tag3',"
+ "`watermark` => %s)",
watermark2 + 1)
.stream()
.map(Row::toString))
.containsExactlyInAnyOrder(
String.format("+I[tag3, 3, %s, %s]", commitTime3, watermark3));

assertThatException()
.isThrownBy(
Expand Down Expand Up @@ -202,72 +160,28 @@ public void testCreateTagsFromTagsWatermark() throws Exception {
assertThat(watermark2 == 2000).isTrue();

// create tag from tag1 that snapshot is 1.
if (ThreadLocalRandom.current().nextBoolean()) {
assertThat(
sql(
"CALL sys.create_tag_from_watermark("
+ "`table` => 'default.T',"
+ "`tag` => 'tag2',"
+ "`watermark` => %s)",
tagsWatermark - 1)
.stream()
.map(Row::toString))
.containsExactlyInAnyOrder(
String.format("+I[tag2, 1, %s, %s]", tagsCommitTime, tagsWatermark));
} else {
createAction(
CreateTagFromWatermarkAction.class,
"create_tag_from_watermark",
"--warehouse",
path,
"--table",
"default.T",
"--tag",
"tag2",
"--watermark",
Long.toString(tagsWatermark - 1))
.run();
}
assertThat(table.tagManager().tagExists("tag2")).isTrue();
assertThat(table.tagManager().taggedSnapshot("tag2").watermark()).isEqualTo(tagsWatermark);
assertThat(table.tagManager().taggedSnapshot("tag2").timeMillis())
.isEqualTo(tagsCommitTime);

if (ThreadLocalRandom.current().nextBoolean()) {
assertThat(
sql(
"CALL sys.create_tag_from_watermark("
+ "`table` => 'default.T',"
+ "`tag` => 'tag3',"
+ "`watermark` => %s)",
watermark2 - 1)
.stream()
.map(Row::toString))
.containsExactlyInAnyOrder(
String.format("+I[tag3, 2, %s, %s]", commitTime2, watermark2));
} else {
createAction(
CreateTagFromWatermarkAction.class,
"create_tag_from_watermark",
"--warehouse",
path,
"--table",
"default.T",
"--tag",
"tag3",
"--watermark",
Long.toString(watermark2 - 1))
.run();
}
assertThat(table.tagManager().tagExists("tag3")).isTrue();
assertThat(table.tagManager().taggedSnapshot("tag3").watermark()).isEqualTo(watermark2);
assertThat(table.tagManager().taggedSnapshot("tag3").timeMillis()).isEqualTo(commitTime2);
}

private <T extends ActionBase> T createAction(Class<T> clazz, String... args) {
return ActionFactory.createAction(args)
.filter(clazz::isInstance)
.map(clazz::cast)
.orElseThrow(() -> new RuntimeException("Failed to create action"));
assertThat(
sql(
"CALL sys.create_tag_from_watermark("
+ "`table` => 'default.T',"
+ "`tag` => 'tag2',"
+ "`watermark` => %s)",
tagsWatermark - 1)
.stream()
.map(Row::toString))
.containsExactlyInAnyOrder(
String.format("+I[tag2, 1, %s, %s]", tagsCommitTime, tagsWatermark));

assertThat(
sql(
"CALL sys.create_tag_from_watermark("
+ "`table` => 'default.T',"
+ "`tag` => 'tag3',"
+ "`watermark` => %s)",
watermark2 - 1)
.stream()
.map(Row::toString))
.containsExactlyInAnyOrder(
String.format("+I[tag3, 2, %s, %s]", commitTime2, watermark2));
}
}

0 comments on commit 8050575

Please sign in to comment.