diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index 31c3a28a823d..5da73341b857 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -518,20 +518,8 @@ private Table createHiveTable(Identifier identifier, TableSchema tableSchema) { if (syncAllProperties()) { tblProperties = new HashMap<>(tableSchema.options()); - // add primary-key, partition-key, bucket-id to tblproperties - if (!tableSchema.primaryKeys().isEmpty()) { - tblProperties.put( - CoreOptions.PRIMARY_KEY.key(), String.join(",", tableSchema.primaryKeys())); - } - if (!tableSchema.partitionKeys().isEmpty()) { - tblProperties.put( - CoreOptions.PARTITION.key(), String.join(",", tableSchema.partitionKeys())); - } - if (!tableSchema.bucketKeys().isEmpty()) { - tblProperties.put( - CoreOptions.BUCKET_KEY.key(), String.join(",", tableSchema.bucketKeys())); - } - tblProperties.put(CoreOptions.BUCKET.key(), String.valueOf(tableSchema.numBuckets())); + // add primary-key, partition-key to tblproperties + tblProperties.putAll(convertToPropertiesTableKey(tableSchema)); } else { tblProperties = convertToPropertiesPrefixKey(tableSchema.options(), HIVE_PREFIX); } @@ -691,7 +679,8 @@ public void repairTable(Identifier identifier) throws TableNotExistException { isPaimonTable(table), "Table %s is not a paimon table in hive metastore.", identifier.getFullName()); - if (!newTable.getSd().getCols().equals(table.getSd().getCols())) { + if (!newTable.getSd().getCols().equals(table.getSd().getCols()) + || !newTable.getParameters().equals(table.getParameters())) { alterTableToHms(table, identifier, tableSchema); } } catch (NoSuchObjectException e) { @@ -819,12 +808,30 @@ private void updateHmsTable(Table table, Identifier identifier, TableSchema sche 
private void updateHmsTablePars(Table table, TableSchema schema) { if (syncAllProperties()) { table.getParameters().putAll(schema.options()); + table.getParameters().putAll(convertToPropertiesTableKey(schema)); } else { table.getParameters() .putAll(convertToPropertiesPrefixKey(schema.options(), HIVE_PREFIX)); } } + private Map<String, String> convertToPropertiesTableKey(TableSchema tableSchema) { + Map<String, String> properties = new HashMap<>(); + if (!tableSchema.primaryKeys().isEmpty()) { + properties.put( + CoreOptions.PRIMARY_KEY.key(), String.join(",", tableSchema.primaryKeys())); + } + if (!tableSchema.partitionKeys().isEmpty()) { + properties.put( + CoreOptions.PARTITION.key(), String.join(",", tableSchema.partitionKeys())); + } + if (!tableSchema.bucketKeys().isEmpty()) { + properties.put( + CoreOptions.BUCKET_KEY.key(), String.join(",", tableSchema.bucketKeys())); + } + return properties; + } + @VisibleForTesting public IMetaStoreClient getHmsClient() { try { diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java index d9c38a670988..c103564db95f 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java @@ -75,6 +75,7 @@ import java.util.Objects; import java.util.UUID; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -382,7 +383,6 @@ public void testCreateCatalogWithSyncTblProperties() throws Exception { "CREATE TABLE t01 ( aa INT, bb STRING, cc STRING, PRIMARY KEY (cc, aa) NOT ENFORCED) PARTITIONED BY (cc) WITH ('file.format' = 'avro', 'bucket' = '3')") .await(); // assert contain properties - List descFormattedT01 = 
hiveShell.executeQuery("DESC FORMATTED t01"); assertThat( hiveShell .executeQuery("DESC FORMATTED t01") @@ -413,20 +413,6 @@ public void testCreateCatalogWithSyncTblProperties() throws Exception { .contains("\tbucket \t3 ")) .isTrue(); - tEnv.executeSql("ALTER TABLE t01 SET ( 'file.format' = 'parquet' )").await(); - assertThat( - hiveShell - .executeQuery("DESC FORMATTED t01") - .contains("\tfile.format \tparquet ")) - .isTrue(); - - tEnv.executeSql("ALTER TABLE t01 SET ('owner' = 'hive')").await(); - assertThat( - hiveShell - .executeQuery("DESC FORMATTED t01") - .contains("\towner \thive ")) - .isTrue(); - tEnv.executeSql( String.join( "\n", @@ -477,20 +463,108 @@ public void testCreateCatalogWithSyncTblProperties() throws Exception { .executeQuery("DESC FORMATTED t02") .contains("\tbucket \t3 ")) .isFalse(); + } + + @Test + public void testAlterTableWithSyncTblProperties() + throws ExecutionException, InterruptedException { + tEnv.executeSql( + String.join( + "\n", + "CREATE CATALOG paimon_catalog_03 WITH (", + " 'type' = 'paimon',", + " 'metastore' = 'hive',", + " 'uri' = '',", + " 'warehouse' = '" + path + "',", + " 'lock.enabled' = 'true',", + " 'table.type' = 'EXTERNAL'", + ")")) + .await(); + tEnv.executeSql("USE CATALOG paimon_catalog_03").await(); + tEnv.executeSql("USE test_db").await(); + tEnv.executeSql( + "CREATE TABLE t03 ( aa INT, bb STRING, cc STRING, PRIMARY KEY (cc, aa) NOT ENFORCED) PARTITIONED BY (cc) WITH ('file.format' = 'avro', 'bucket' = '3')") + .await(); - tEnv.executeSql("ALTER TABLE t02 SET ( 'file.format' = 'parquet' )").await(); + tEnv.executeSql("ALTER TABLE t03 SET ( 'file.format' = 'parquet' )").await(); assertThat( hiveShell - .executeQuery("DESC FORMATTED t02") + .executeQuery("DESC FORMATTED t03") .contains("\tfile.format \tparquet ")) .isFalse(); - tEnv.executeSql("ALTER TABLE t02 SET ('owner' = 'hive')").await(); assertThat( hiveShell - .executeQuery("DESC FORMATTED t02") - .contains("\towner \thive ")) + .executeQuery("DESC 
FORMATTED t03") + .contains("\tprimary-key \tcc,aa ")) + .isFalse(); + + assertThat( + hiveShell + .executeQuery("DESC FORMATTED t03") + .contains("\tpartition \tcc ")) .isFalse(); + + assertThat( + hiveShell + .executeQuery("DESC FORMATTED t03") + .contains("\tbucket \t3 ")) + .isFalse(); + + tEnv.executeSql( + String.join( + "\n", + "CREATE CATALOG paimon_catalog_03_syn WITH (", + " 'type' = 'paimon',", + " 'metastore' = 'hive',", + " 'uri' = '',", + " 'warehouse' = '" + path + "',", + " 'lock.enabled' = 'true',", + " 'table.type' = 'EXTERNAL',", + " 'sync-all-properties' = 'true'", + ")")) + .await(); + + tEnv.executeSql("USE CATALOG paimon_catalog_03_syn").await(); + tEnv.executeSql("USE test_db").await(); + + tEnv.executeSql("ALTER TABLE t03 SET ( 'file.format' = 'parquet' )").await(); + assertThat( + hiveShell + .executeQuery("DESC FORMATTED t03") + .contains("\tfile.format \tparquet ")) + .isTrue(); + + assertThat( + hiveShell + .executeQuery("DESC FORMATTED t03") + .contains("\tprimary-key \tcc,aa ")) + .isTrue(); + + assertThat( + hiveShell + .executeQuery("DESC FORMATTED t03") + .contains("\tpartition \tcc ")) + .isTrue(); + + assertThat( + hiveShell + .executeQuery("DESC FORMATTED t03") + .contains("\tbucket-key \taa ")) + .isTrue(); + + assertThat( + hiveShell + .executeQuery("DESC FORMATTED t03") + .contains("\tbucket \t3 ")) + .isTrue(); + + tEnv.executeSql("ALTER TABLE t03 SET ('owner' = 'test')").await(); + assertThat( + hiveShell + .executeQuery("DESC FORMATTED t03") + .contains("\towner \ttest ")) + .isTrue(); } @Test @@ -1615,6 +1689,96 @@ public void testRepairTableWithCustomLocation() throws Exception { .containsExactlyInAnyOrder("dt=2020-01-02/hh=09", "dt=2020-01-03/hh=10"); } + @Test + public void testRepairTableWithSyncTblProperties() + throws ExecutionException, InterruptedException { + tEnv.executeSql( + String.join( + "\n", + "CREATE CATALOG paimon_catalog_repair_03 WITH (", + " 'type' = 'paimon',", + " 'metastore' = 'hive',", + " 'uri' = 
'',", + " 'warehouse' = '" + path + "',", + " 'lock.enabled' = 'true',", + " 'table.type' = 'EXTERNAL'", + ")")) + .await(); + tEnv.executeSql("USE CATALOG paimon_catalog_repair_03").await(); + tEnv.executeSql("USE test_db").await(); + tEnv.executeSql( + "CREATE TABLE repair_t03 ( aa INT, bb STRING, cc STRING, PRIMARY KEY (cc, aa) NOT ENFORCED) PARTITIONED BY (cc) WITH ('file.format' = 'avro', 'bucket' = '3')") + .await(); + + tEnv.executeSql("CALL sys.repair('test_db.repair_t03')"); + + assertThat( + hiveShell + .executeQuery("DESC FORMATTED repair_t03") + .contains("\tfile.format \tavro ")) + .isFalse(); + + assertThat( + hiveShell + .executeQuery("DESC FORMATTED repair_t03") + .contains("\tprimary-key \tcc,aa ")) + .isFalse(); + + assertThat( + hiveShell + .executeQuery("DESC FORMATTED repair_t03") + .contains("\tpartition \tcc ")) + .isFalse(); + + assertThat( + hiveShell + .executeQuery("DESC FORMATTED repair_t03") + .contains("\tbucket \t3 ")) + .isFalse(); + + tEnv.executeSql( + String.join( + "\n", + "CREATE CATALOG paimon_catalog_repair_syn_03 WITH (", + " 'type' = 'paimon',", + " 'metastore' = 'hive',", + " 'uri' = '',", + " 'warehouse' = '" + path + "',", + " 'lock.enabled' = 'true',", + " 'table.type' = 'EXTERNAL',", + " 'sync-all-properties' = 'true'", + ")")) + .await(); + tEnv.executeSql("USE CATALOG paimon_catalog_repair_syn_03").await(); + hiveShell.execute("use test_db"); + + tEnv.executeSql("CALL sys.repair('test_db.repair_t03')"); + hiveShell.executeQuery("DESC FORMATTED repair_t03").stream().forEach(System.out::println); + assertThat( + hiveShell + .executeQuery("DESC FORMATTED repair_t03") + .contains("\tfile.format \tavro ")) + .isTrue(); + + assertThat( + hiveShell + .executeQuery("DESC FORMATTED repair_t03") + .contains("\tprimary-key \tcc,aa ")) + .isTrue(); + + assertThat( + hiveShell + .executeQuery("DESC FORMATTED repair_t03") + .contains("\tpartition \tcc ")) + .isTrue(); + + assertThat( + hiveShell + .executeQuery("DESC FORMATTED 
repair_t03") + .contains("\tbucket \t3 ")) + .isTrue(); + } + @Test public void testExpiredPartitionsSyncToMetastore() throws Exception { // Use flink to create a partitioned table and write data, hive read.