Skip to content

Commit

Permalink
integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
hdygxsj authored and JingsongLi committed Dec 11, 2023
1 parent cde6026 commit df962f3
Showing 1 changed file with 44 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -32,10 +33,14 @@
import java.util.Map;
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;

/** Integration test for {@link FlinkExternalCatalog}. */
public class FlinkExternalCatalogITCase {

protected TableEnvironment tEnv;
private TableEnvironment tEnv;
private String catalog = "PAIMON_EXTERNAL";
private String catalog2 = "PAIMON_EXTERNAL2";

@TempDir public static java.nio.file.Path path;

Expand All @@ -47,7 +52,7 @@ public void setup() {
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
env.setParallelism(1);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
String catalog = "PAIMON_EXTERNAL";

Map<String, String> options = new HashMap<>();
options.put("type", "paimon-external");
options.put("warehouse", path.toUri().getPath());
Expand All @@ -62,21 +67,28 @@ public void setup() {
options.entrySet().stream()
.map(e -> String.format("'%s'='%s'", e.getKey(), e.getValue()))
.collect(Collectors.joining(","))));
tEnv.useCatalog(catalog);
tEnv.executeSql(
String.format(
"CREATE CATALOG %s WITH (" + "%s" + ")",
catalog2,
options.entrySet().stream()
.map(e -> String.format("'%s'='%s'", e.getKey(), e.getValue()))
.collect(Collectors.joining(","))));
}

@Test
public void testUseExternalTable() {
public void testMix() {
tEnv.useCatalog(catalog);
tEnv.executeSql(
"CREATE TABLE word_count (\n"
"CREATE TABLE word_count1 (\n"
+ " word STRING PRIMARY KEY NOT ENFORCED,\n"
+ " cnt BIGINT\n"
+ ") with (\n"
+ "\t'connector'='paimon'\n"
+ ")");
tEnv.executeSql(
String.format(
"CREATE TABLE file (\n"
"CREATE TABLE file1 (\n"
+ " word STRING PRIMARY KEY NOT ENFORCED,\n"
+ " cnt BIGINT\n"
+ ") with (\n"
Expand All @@ -85,21 +97,40 @@ public void testUseExternalTable() {
+ " 'format'='json' \n"
+ ")",
path2.toUri().getPath()));
tEnv.executeSql("insert into file select * from `word_count`");
tEnv.executeSql("insert into file1 select * from `word_count1`");
}

@Test
public void testUseSystemTable() {
tEnv.useCatalog(catalog);
tEnv.executeSql(
"CREATE TABLE word_count (\n"
"CREATE TABLE word_count2 (\n"
+ " word STRING PRIMARY KEY NOT ENFORCED,\n"
+ " cnt BIGINT\n"
+ ") with (\n"
+ "\t'connector'='paimon'\n"
+ ")");
tEnv.executeSql(
String.format(
"CREATE TABLE file (\n"
"CREATE TABLE file2 (\n"
+ " word STRING PRIMARY KEY NOT ENFORCED,\n"
+ " rowkind String,\n"
+ " cnt BIGINT\n"
+ ") with (\n"
+ " 'connector'='filesystem',\n"
+ " 'path'='file://%s', \n"
+ " 'format'='json' \n"
+ ")",
path2.toUri().getPath()));
tEnv.executeSql("insert into file2 select * from word_count2$audit_log");
}

@Test
public void testUseExternalTable() {
tEnv.useCatalog(catalog);
tEnv.executeSql(
String.format(
"CREATE TABLE file3 (\n"
+ " word STRING PRIMARY KEY NOT ENFORCED,\n"
+ " rowkind String,\n"
+ " cnt BIGINT\n"
Expand All @@ -109,6 +140,9 @@ public void testUseSystemTable() {
+ " 'format'='json' \n"
+ ")",
path2.toUri().getPath()));
tEnv.executeSql("insert into file select * from word_count$audit_log");
Table fileTable = tEnv.from("file3");
tEnv.useCatalog(catalog2);
Table fileTable2 = tEnv.from("file3");
assertThat(fileTable.getResolvedSchema()).isEqualTo(fileTable2.getResolvedSchema());
}
}

0 comments on commit df962f3

Please sign in to comment.