Skip to content

Commit

Permalink
[hive] Repair table: support syncing all properties to HMS (#4047)
Browse files Browse the repository at this point in the history
  • Loading branch information
herefree authored Aug 26, 2024
1 parent 67032b3 commit ae6f2bc
Show file tree
Hide file tree
Showing 2 changed files with 206 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -518,20 +518,8 @@ private Table createHiveTable(Identifier identifier, TableSchema tableSchema) {
if (syncAllProperties()) {
tblProperties = new HashMap<>(tableSchema.options());

// add primary-key, partition-key, bucket-id to tblproperties
if (!tableSchema.primaryKeys().isEmpty()) {
tblProperties.put(
CoreOptions.PRIMARY_KEY.key(), String.join(",", tableSchema.primaryKeys()));
}
if (!tableSchema.partitionKeys().isEmpty()) {
tblProperties.put(
CoreOptions.PARTITION.key(), String.join(",", tableSchema.partitionKeys()));
}
if (!tableSchema.bucketKeys().isEmpty()) {
tblProperties.put(
CoreOptions.BUCKET_KEY.key(), String.join(",", tableSchema.bucketKeys()));
}
tblProperties.put(CoreOptions.BUCKET.key(), String.valueOf(tableSchema.numBuckets()));
// add primary-key, partition-key, bucket-key to tblproperties
tblProperties.putAll(convertToPropertiesTableKey(tableSchema));
} else {
tblProperties = convertToPropertiesPrefixKey(tableSchema.options(), HIVE_PREFIX);
}
Expand Down Expand Up @@ -691,7 +679,8 @@ public void repairTable(Identifier identifier) throws TableNotExistException {
isPaimonTable(table),
"Table %s is not a paimon table in hive metastore.",
identifier.getFullName());
if (!newTable.getSd().getCols().equals(table.getSd().getCols())) {
if (!newTable.getSd().getCols().equals(table.getSd().getCols())
|| !newTable.getParameters().equals(table.getParameters())) {
alterTableToHms(table, identifier, tableSchema);
}
} catch (NoSuchObjectException e) {
Expand Down Expand Up @@ -819,12 +808,30 @@ private void updateHmsTable(Table table, Identifier identifier, TableSchema sche
/**
 * Refreshes the HMS table parameters from the Paimon schema.
 *
 * <p>When {@code sync-all-properties} is enabled, every schema option plus the derived
 * table-key properties (primary key, partition keys, bucket key) are pushed to HMS;
 * otherwise only options carrying the Hive prefix are propagated.
 */
private void updateHmsTablePars(Table table, TableSchema schema) {
    Map<String, String> parameters = table.getParameters();
    if (!syncAllProperties()) {
        // Restricted mode: only explicitly Hive-prefixed options reach the metastore.
        parameters.putAll(convertToPropertiesPrefixKey(schema.options(), HIVE_PREFIX));
        return;
    }
    parameters.putAll(schema.options());
    parameters.putAll(convertToPropertiesTableKey(schema));
}

/**
 * Collects the Paimon table-key metadata (primary key, partition keys, bucket key and
 * bucket count) as plain properties so they can be synced into HMS TBLPROPERTIES.
 *
 * @param tableSchema schema of the Paimon table being synced
 * @return a mutable map containing only the keys that are actually defined
 */
private Map<String, String> convertToPropertiesTableKey(TableSchema tableSchema) {
    Map<String, String> properties = new HashMap<>();
    if (!tableSchema.primaryKeys().isEmpty()) {
        properties.put(
                CoreOptions.PRIMARY_KEY.key(), String.join(",", tableSchema.primaryKeys()));
    }
    if (!tableSchema.partitionKeys().isEmpty()) {
        properties.put(
                CoreOptions.PARTITION.key(), String.join(",", tableSchema.partitionKeys()));
    }
    if (!tableSchema.bucketKeys().isEmpty()) {
        properties.put(
                CoreOptions.BUCKET_KEY.key(), String.join(",", tableSchema.bucketKeys()));
    }
    // Before this helper was extracted, createHiveTable synced the bucket count
    // unconditionally. Keep that behavior so tables relying on the default bucket
    // count still get 'bucket' synced to HMS when sync-all-properties is enabled.
    properties.put(CoreOptions.BUCKET.key(), String.valueOf(tableSchema.numBuckets()));
    return properties;
}

@VisibleForTesting
public IMetaStoreClient getHmsClient() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -382,7 +383,6 @@ public void testCreateCatalogWithSyncTblProperties() throws Exception {
"CREATE TABLE t01 ( aa INT, bb STRING, cc STRING, PRIMARY KEY (cc, aa) NOT ENFORCED) PARTITIONED BY (cc) WITH ('file.format' = 'avro', 'bucket' = '3')")
.await();
// assert contain properties
List<String> descFormattedT01 = hiveShell.executeQuery("DESC FORMATTED t01");
assertThat(
hiveShell
.executeQuery("DESC FORMATTED t01")
Expand Down Expand Up @@ -413,20 +413,6 @@ public void testCreateCatalogWithSyncTblProperties() throws Exception {
.contains("\tbucket \t3 "))
.isTrue();

tEnv.executeSql("ALTER TABLE t01 SET ( 'file.format' = 'parquet' )").await();
assertThat(
hiveShell
.executeQuery("DESC FORMATTED t01")
.contains("\tfile.format \tparquet "))
.isTrue();

tEnv.executeSql("ALTER TABLE t01 SET ('owner' = 'hive')").await();
assertThat(
hiveShell
.executeQuery("DESC FORMATTED t01")
.contains("\towner \thive "))
.isTrue();

tEnv.executeSql(
String.join(
"\n",
Expand Down Expand Up @@ -477,20 +463,108 @@ public void testCreateCatalogWithSyncTblProperties() throws Exception {
.executeQuery("DESC FORMATTED t02")
.contains("\tbucket \t3 "))
.isFalse();
}

@Test
public void testAlterTableWithSyncTblProperties()
throws ExecutionException, InterruptedException {
tEnv.executeSql(
String.join(
"\n",
"CREATE CATALOG paimon_catalog_03 WITH (",
" 'type' = 'paimon',",
" 'metastore' = 'hive',",
" 'uri' = '',",
" 'warehouse' = '" + path + "',",
" 'lock.enabled' = 'true',",
" 'table.type' = 'EXTERNAL'",
")"))
.await();
tEnv.executeSql("USE CATALOG paimon_catalog_03").await();
tEnv.executeSql("USE test_db").await();
tEnv.executeSql(
"CREATE TABLE t03 ( aa INT, bb STRING, cc STRING, PRIMARY KEY (cc, aa) NOT ENFORCED) PARTITIONED BY (cc) WITH ('file.format' = 'avro', 'bucket' = '3')")
.await();

tEnv.executeSql("ALTER TABLE t02 SET ( 'file.format' = 'parquet' )").await();
tEnv.executeSql("ALTER TABLE t03 SET ( 'file.format' = 'parquet' )").await();
assertThat(
hiveShell
.executeQuery("DESC FORMATTED t02")
.executeQuery("DESC FORMATTED t03")
.contains("\tfile.format \tparquet "))
.isFalse();

tEnv.executeSql("ALTER TABLE t02 SET ('owner' = 'hive')").await();
assertThat(
hiveShell
.executeQuery("DESC FORMATTED t02")
.contains("\towner \thive "))
.executeQuery("DESC FORMATTED t03")
.contains("\tprimary-key \tcc,aa "))
.isFalse();

assertThat(
hiveShell
.executeQuery("DESC FORMATTED t03")
.contains("\tpartition \tcc "))
.isFalse();

assertThat(
hiveShell
.executeQuery("DESC FORMATTED t03")
.contains("\tbucket \t3 "))
.isFalse();

tEnv.executeSql(
String.join(
"\n",
"CREATE CATALOG paimon_catalog_03_syn WITH (",
" 'type' = 'paimon',",
" 'metastore' = 'hive',",
" 'uri' = '',",
" 'warehouse' = '" + path + "',",
" 'lock.enabled' = 'true',",
" 'table.type' = 'EXTERNAL',",
" 'sync-all-properties' = 'true'",
")"))
.await();

tEnv.executeSql("USE CATALOG paimon_catalog_03_syn").await();
tEnv.executeSql("USE test_db").await();

tEnv.executeSql("ALTER TABLE t03 SET ( 'file.format' = 'parquet' )").await();
assertThat(
hiveShell
.executeQuery("DESC FORMATTED t03")
.contains("\tfile.format \tparquet "))
.isTrue();

assertThat(
hiveShell
.executeQuery("DESC FORMATTED t03")
.contains("\tprimary-key \tcc,aa "))
.isTrue();

assertThat(
hiveShell
.executeQuery("DESC FORMATTED t03")
.contains("\tpartition \tcc "))
.isTrue();

assertThat(
hiveShell
.executeQuery("DESC FORMATTED t03")
.contains("\tbucket-key \taa "))
.isTrue();

assertThat(
hiveShell
.executeQuery("DESC FORMATTED t03")
.contains("\tbucket \t3 "))
.isTrue();

tEnv.executeSql("ALTER TABLE t03 SET ('owner' = 'test')").await();
assertThat(
hiveShell
.executeQuery("DESC FORMATTED t03")
.contains("\towner \ttest "))
.isTrue();
}

@Test
Expand Down Expand Up @@ -1615,6 +1689,96 @@ public void testRepairTableWithCustomLocation() throws Exception {
.containsExactlyInAnyOrder("dt=2020-01-02/hh=09", "dt=2020-01-03/hh=10");
}

@Test
public void testRepairTableWithSyncTblProperties()
        throws ExecutionException, InterruptedException {
    // Phase 1: catalog WITHOUT 'sync-all-properties'. Repairing the table must NOT
    // push Paimon options or derived table keys into the HMS table parameters.
    tEnv.executeSql(
                    String.join(
                            "\n",
                            "CREATE CATALOG paimon_catalog_repair_03 WITH (",
                            "  'type' = 'paimon',",
                            "  'metastore' = 'hive',",
                            "  'uri' = '',",
                            "  'warehouse' = '" + path + "',",
                            "  'lock.enabled' = 'true',",
                            "  'table.type' = 'EXTERNAL'",
                            ")"))
            .await();
    tEnv.executeSql("USE CATALOG paimon_catalog_repair_03").await();
    tEnv.executeSql("USE test_db").await();
    tEnv.executeSql(
                    "CREATE TABLE repair_t03 ( aa INT, bb STRING, cc STRING, PRIMARY KEY (cc, aa) NOT ENFORCED) PARTITIONED BY (cc) WITH ('file.format' = 'avro', 'bucket' = '3')")
            .await();

    // NOTE(review): sys.repair is not awaited here (matches the non-sync phase below);
    // confirm the procedure runs synchronously, otherwise these asserts may race.
    tEnv.executeSql("CALL sys.repair('test_db.repair_t03')");

    assertThat(
                    hiveShell
                            .executeQuery("DESC FORMATTED repair_t03")
                            .contains("\tfile.format        \tavro                "))
            .isFalse();

    assertThat(
                    hiveShell
                            .executeQuery("DESC FORMATTED repair_t03")
                            .contains("\tprimary-key        \tcc,aa               "))
            .isFalse();

    assertThat(
                    hiveShell
                            .executeQuery("DESC FORMATTED repair_t03")
                            .contains("\tpartition          \tcc                  "))
            .isFalse();

    assertThat(
                    hiveShell
                            .executeQuery("DESC FORMATTED repair_t03")
                            .contains("\tbucket             \t3                   "))
            .isFalse();

    // Phase 2: catalog WITH 'sync-all-properties'. Repairing the same table must now
    // sync every option plus the table keys (primary-key, partition, bucket) to HMS.
    tEnv.executeSql(
                    String.join(
                            "\n",
                            "CREATE CATALOG paimon_catalog_repair_syn_03 WITH (",
                            "  'type' = 'paimon',",
                            "  'metastore' = 'hive',",
                            "  'uri' = '',",
                            "  'warehouse' = '" + path + "',",
                            "  'lock.enabled' = 'true',",
                            "  'table.type' = 'EXTERNAL',",
                            "  'sync-all-properties' = 'true'",
                            ")"))
            .await();
    tEnv.executeSql("USE CATALOG paimon_catalog_repair_syn_03").await();
    hiveShell.execute("use test_db");

    tEnv.executeSql("CALL sys.repair('test_db.repair_t03')");
    // Leftover debug dump of DESC FORMATTED output removed — it only polluted test logs.
    assertThat(
                    hiveShell
                            .executeQuery("DESC FORMATTED repair_t03")
                            .contains("\tfile.format        \tavro                "))
            .isTrue();

    assertThat(
                    hiveShell
                            .executeQuery("DESC FORMATTED repair_t03")
                            .contains("\tprimary-key        \tcc,aa               "))
            .isTrue();

    assertThat(
                    hiveShell
                            .executeQuery("DESC FORMATTED repair_t03")
                            .contains("\tpartition          \tcc                  "))
            .isTrue();

    assertThat(
                    hiveShell
                            .executeQuery("DESC FORMATTED repair_t03")
                            .contains("\tbucket             \t3                   "))
            .isTrue();
}

@Test
public void testExpiredPartitionsSyncToMetastore() throws Exception {
// Use flink to create a partitioned table and write data, hive read.
Expand Down

0 comments on commit ae6f2bc

Please sign in to comment.