Skip to content

Commit

Permalink
Fix the review content
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzhuoyu committed Jan 25, 2024
1 parent 4b97ede commit 9dad8ab
Showing 1 changed file with 40 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ public void testFlinkWriteAndHiveRead() throws Exception {
"SELECT f0, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, hex(f11), hex(f12), f13, f14, f15, f15[0] as f15a, f16['key1'] as f16a, f16['key2'] as f16b, f17, f17.f0, f17.f1 FROM t ORDER BY f3"))
.isEqualTo(
Arrays.asList(
"true\t1\t1\t1\t1234567890123456789\t1.23\t3.14159\t1234.56\tABC\tv1\tHello, World!\t01\t010203\t2023-01-01\t2023-01-01 12:00:00.123\t[\"value1\",\"value2\",\"value3\"]\tvalue1\tvalue1\tvalue2\t{\"f0\":\"v1\",\"f1\":1}\tv1\t1",
"true\t\t\t\t1234567890123456789\t.23\t3.14159\t1234.56\tABC\tv1\tHello, World!\t01\t010203\t2023-01-01\t2023-01-01 12:00:00.123\t[\"value1\",\"value2\",\"value3\"]\tvalue1\tvalue1\tvalue2\t{\"f0\":\"v1\",\"f1\":1}\tv1\t",
"false\t2\t2\t2\t234567890123456789\t2.34\t2.111111\t2345.67\tDEF\tv2\tApache Paimon\t04\t040506\t2023-02-01\t2023-02-01 12:00:00.456\t[\"value4\",\"value5\",\"value6\"]\tvalue4\tvalue11\tvalue22\t{\"f0\":\"v2\",\"f1\":2}\tv2\t2"));

assertThatThrownBy(
Expand Down Expand Up @@ -404,14 +404,14 @@ public void testHiveCreateAndFlinkInsertRead() throws Exception {
public void testCreateTableAs() throws Exception {
tEnv.executeSql("CREATE TABLE t (a INT)").await();
tEnv.executeSql("INSERT INTO t VALUES(1)").await();
tEnv.executeSql("CREATE TABLE t1 AS SELECT * FROM t").await();
tEnv.executeSql("CREATE TABLE t AS SELECT * FROM t").await();
List<Row> result =
collect(
"SELECT schema_id, fields, partition_keys, "
+ "primary_keys, options, `comment` FROM t1$schemas s");
assertThat(result.toString())
.isEqualTo("[+I[0, [{\"id\":0,\"name\":\"a\",\"type\":\"INT\"}], [], [], {}, ]]");
List<Row> data = collect("SELECT * FROM t1");
List<Row> data = collect("SELECT * FROM t");
assertThat(data).contains(Row.of(1));

// change option
Expand Down Expand Up @@ -560,64 +560,64 @@ public void testCreateTableAs() throws Exception {

@Test
public void testRenameTable() throws Exception {
tEnv.executeSql("CREATE TABLE t1 (a INT)").await();
tEnv.executeSql("CREATE TABLE t (a INT)").await();
tEnv.executeSql("CREATE TABLE t2 (a INT)").await();
tEnv.executeSql("INSERT INTO t1 SELECT 1").await();
tEnv.executeSql("INSERT INTO t SELECT 1").await();
// the source table do not exist.
assertThatThrownBy(() -> tEnv.executeSql("ALTER TABLE t3 RENAME TO t4"))
.hasMessage(
"Table `my_hive`.`test_db`.`t3` doesn't exist or is a temporary table.");

// the target table has existed.
assertThatThrownBy(() -> tEnv.executeSql("ALTER TABLE t1 RENAME TO t2"))
assertThatThrownBy(() -> tEnv.executeSql("ALTER TABLE t RENAME TO t2"))
.hasMessage(
"Could not execute ALTER TABLE my_hive.test_db.t1 RENAME TO my_hive.test_db.t2");
"Could not execute ALTER TABLE my_hive.test_db.t RENAME TO my_hive.test_db.t2");

// the target table name has upper case.
assertThatThrownBy(() -> tEnv.executeSql("ALTER TABLE t1 RENAME TO T1"))
.hasMessage("Table name [T1] cannot contain upper case in the catalog.");
assertThatThrownBy(() -> tEnv.executeSql("ALTER TABLE t RENAME TO t"))
.hasMessage("Table name [t] cannot contain upper case in the catalog.");

tEnv.executeSql("ALTER TABLE t1 RENAME TO t3").await();
tEnv.executeSql("ALTER TABLE t RENAME TO t3").await();

// hive read
List<String> tables = hiveShell.executeQuery("SHOW TABLES");
assertThat(tables.contains("t3")).isTrue();
assertThat(tables.contains("t1")).isFalse();
assertThat(tables.contains("t")).isFalse();
List<String> data = hiveShell.executeQuery("SELECT * FROM t3");
assertThat(data).containsExactlyInAnyOrder("1");

// flink read
List<Row> tablesFromFlink = collect("SHOW TABLES");
assertThat(tablesFromFlink).contains(Row.of("t3"));
assertThat(tablesFromFlink).doesNotContain(Row.of("t1"));
assertThat(tablesFromFlink).doesNotContain(Row.of("t"));

List<Row> dataFromFlink = collect("SELECT * FROM t3");
assertThat(dataFromFlink).contains(Row.of(1));
}

@Test
public void testAlterTable() throws Exception {
tEnv.executeSql("CREATE TABLE t1 (a INT, b STRING, c TIMESTAMP(3))").await();
tEnv.executeSql("CREATE TABLE t (a INT, b STRING, c TIMESTAMP(3))").await();
List<String> result =
collect("DESC t1").stream().map(Objects::toString).collect(Collectors.toList());
collect("DESC t").stream().map(Objects::toString).collect(Collectors.toList());
assertThat(result)
.containsExactly(
"+I[a, INT, true, null, null, null]",
"+I[b, STRING, true, null, null, null]",
"+I[c, TIMESTAMP(3), true, null, null, null]");

// Dropping Columns
assertThatCode(() -> tEnv.executeSql("ALTER TABLE t1 DROP b")).doesNotThrowAnyException();
result = collect("DESC t1").stream().map(Objects::toString).collect(Collectors.toList());
assertThatCode(() -> tEnv.executeSql("ALTER TABLE t DROP b")).doesNotThrowAnyException();
result = collect("DESC t").stream().map(Objects::toString).collect(Collectors.toList());
assertThat(result)
.containsExactly(
"+I[a, INT, true, null, null, null]",
"+I[c, TIMESTAMP(3), true, null, null, null]");

// Adding New Columns
assertThatCode(() -> tEnv.executeSql("ALTER TABLE t1 ADD (d BIGINT, e CHAR(5))"))
assertThatCode(() -> tEnv.executeSql("ALTER TABLE t ADD (d BIGINT, e CHAR(5))"))
.doesNotThrowAnyException();
result = collect("DESC t1").stream().map(Objects::toString).collect(Collectors.toList());
result = collect("DESC t").stream().map(Objects::toString).collect(Collectors.toList());
assertThat(result)
.containsExactly(
"+I[a, INT, true, null, null, null]",
Expand All @@ -626,9 +626,9 @@ public void testAlterTable() throws Exception {
"+I[e, CHAR(5), true, null, null, null]");

// Adding Column Position
assertThatCode(() -> tEnv.executeSql("ALTER TABLE t1 ADD f INT AFTER a"))
assertThatCode(() -> tEnv.executeSql("ALTER TABLE t ADD f INT AFTER a"))
.doesNotThrowAnyException();
result = collect("DESC t1").stream().map(Objects::toString).collect(Collectors.toList());
result = collect("DESC t").stream().map(Objects::toString).collect(Collectors.toList());
assertThat(result)
.containsExactly(
"+I[a, INT, true, null, null, null]",
Expand All @@ -638,9 +638,9 @@ public void testAlterTable() throws Exception {
"+I[e, CHAR(5), true, null, null, null]");

// Changing Column Position
assertThatCode(() -> tEnv.executeSql("ALTER TABLE t1 MODIFY f INT AFTER e"))
assertThatCode(() -> tEnv.executeSql("ALTER TABLE t MODIFY f INT AFTER e"))
.doesNotThrowAnyException();
result = collect("DESC t1").stream().map(Objects::toString).collect(Collectors.toList());
result = collect("DESC t").stream().map(Objects::toString).collect(Collectors.toList());
assertThat(result)
.containsExactly(
"+I[a, INT, true, null, null, null]",
Expand All @@ -650,9 +650,9 @@ public void testAlterTable() throws Exception {
"+I[f, INT, true, null, null, null]");

// Renaming Column Name
assertThatCode(() -> tEnv.executeSql("ALTER TABLE t1 RENAME a TO g"))
assertThatCode(() -> tEnv.executeSql("ALTER TABLE t RENAME a TO g"))
.doesNotThrowAnyException();
result = collect("DESC t1").stream().map(Objects::toString).collect(Collectors.toList());
result = collect("DESC t").stream().map(Objects::toString).collect(Collectors.toList());
assertThat(result)
.containsExactly(
"+I[g, INT, true, null, null, null]",
Expand All @@ -662,9 +662,9 @@ public void testAlterTable() throws Exception {
"+I[f, INT, true, null, null, null]");

// Changing Column Type
assertThatCode(() -> tEnv.executeSql("ALTER TABLE t1 MODIFY d DOUBLE"))
assertThatCode(() -> tEnv.executeSql("ALTER TABLE t MODIFY d DOUBLE"))
.doesNotThrowAnyException();
result = collect("DESC t1").stream().map(Objects::toString).collect(Collectors.toList());
result = collect("DESC t").stream().map(Objects::toString).collect(Collectors.toList());
assertThat(result)
.containsExactly(
"+I[g, INT, true, null, null, null]",
Expand All @@ -674,9 +674,9 @@ public void testAlterTable() throws Exception {
"+I[f, INT, true, null, null, null]");

// Changing Column Comment
assertThatCode(() -> tEnv.executeSql("ALTER TABLE t1 MODIFY g INT COMMENT 'test comment'"))
assertThatCode(() -> tEnv.executeSql("ALTER TABLE t MODIFY g INT COMMENT 'test comment'"))
.doesNotThrowAnyException();
result = collect("DESC t1").stream().map(Objects::toString).collect(Collectors.toList());
result = collect("DESC t").stream().map(Objects::toString).collect(Collectors.toList());
assertThat(result)
.containsExactly(
"+I[g, INT, true, null, null, null, test comment]",
Expand Down Expand Up @@ -940,41 +940,33 @@ public void testHistoryPartitionsCascadeToUpdate() throws Exception {
tEnv.executeSql(
String.join(
"\n",
"CREATE TABLE t1 (",
"CREATE TABLE t (",
" k INT,",
" v BIGINT,",
" PRIMARY KEY (k) NOT ENFORCED",
") WITH (",
" 'bucket' = '2',",
" 'metastore.tag-to-partition' = 'dt'",
")"));
tEnv.executeSql("INSERT INTO t1 VALUES (1, 10), (2, 20)").await();

// TODO modify to CALL after Flink 1.18
Table table =
((DataCatalogTable)
tEnv.getCatalog(tEnv.getCurrentCatalog())
.get()
.getTable(new ObjectPath(tEnv.getCurrentDatabase(), "t1")))
.table();
table.createTag("2023-10-16", 1);

assertThat(hiveShell.executeQuery("SHOW PARTITIONS t1"))
tEnv.executeSql("INSERT INTO t VALUES (1, 10), (2, 20)").await();
tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-16', 1)");

assertThat(hiveShell.executeQuery("SHOW PARTITIONS t"))
.containsExactlyInAnyOrder("dt=2023-10-16");

assertThat(hiveShell.executeQuery("SELECT k, v FROM t1 WHERE dt='2023-10-16'"))
assertThat(hiveShell.executeQuery("SELECT k, v FROM t WHERE dt='2023-10-16'"))
.containsExactlyInAnyOrder("1\t10", "2\t20");

assertThat(hiveShell.executeQuery("SELECT * FROM t1 WHERE dt='2023-10-16'"))
assertThat(hiveShell.executeQuery("SELECT * FROM t WHERE dt='2023-10-16'"))
.containsExactlyInAnyOrder("1\t10\t2023-10-16", "2\t20\t2023-10-16");

tEnv.executeSql("INSERT INTO t1 VALUES (3, 30), (4, 40)").await();
table.createTag("2023-10-17", 2);
tEnv.executeSql("INSERT INTO t VALUES (3, 30), (4, 40)").await();
tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-17', 2)");

tEnv.executeSql("ALTER TABLE t1 ADD z INT");
tEnv.executeSql("INSERT INTO t1 VALUES (3, 30, 5), (4, 40, 6)").await();
tEnv.executeSql("ALTER TABLE t ADD z INT");
tEnv.executeSql("INSERT INTO t VALUES (3, 30, 5), (4, 40, 6)").await();

assertThat(hiveShell.executeQuery("SELECT * FROM t1 WHERE dt='2023-10-16'"))
assertThat(hiveShell.executeQuery("SELECT * FROM t WHERE dt='2023-10-16'"))
.containsExactlyInAnyOrder("1\t10\tNULL\t2023-10-16", "2\t20\tNULL\t2023-10-16");
}

Expand Down

0 comments on commit 9dad8ab

Please sign in to comment.