diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index dd5632c18b42..6e1e9bba076b 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -184,6 +184,7 @@ public class CoreOptions implements Serializable { .defaultValue("data-") .withDescription("Specify the file name prefix of data files."); + @Immutable public static final ConfigOption DATA_FILE_PATH_DIRECTORY = key("data-file.path-directory") .stringType() diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index a590ede6d2e6..975c1c0b7aed 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -95,6 +95,7 @@ import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE; import static org.apache.hadoop.hive.serde.serdeConstants.FIELD_DELIM; +import static org.apache.paimon.CoreOptions.DATA_FILE_PATH_DIRECTORY; import static org.apache.paimon.CoreOptions.FILE_FORMAT; import static org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_TIME; import static org.apache.paimon.CoreOptions.TYPE; @@ -771,22 +772,27 @@ protected void createTableImpl(Identifier identifier, Schema schema) { private Table createHiveTable( Identifier identifier, TableSchema tableSchema, Path location, boolean externalTable) { - checkArgument(Options.fromMap(tableSchema.options()).get(TYPE) != FORMAT_TABLE); + Map options = tableSchema.options(); + checkArgument(Options.fromMap(options).get(TYPE) != FORMAT_TABLE); Map tblProperties; if (syncAllProperties()) { - tblProperties = new HashMap<>(tableSchema.options()); - + tblProperties = new HashMap<>(options); // add primary-key, partition-key to tblproperties tblProperties.putAll(convertToPropertiesTableKey(tableSchema)); } else { - tblProperties = convertToPropertiesPrefixKey(tableSchema.options(), HIVE_PREFIX); - if (tableSchema.options().containsKey(PARTITION_EXPIRATION_TIME.key())) { + tblProperties = convertToPropertiesPrefixKey(options, HIVE_PREFIX); + if (options.containsKey(PARTITION_EXPIRATION_TIME.key())) { // This property will be stored in the 'table_params' table of the HMS database for // querying by other engines or products. tblProperties.put( PARTITION_EXPIRATION_TIME.key(), - tableSchema.options().get(PARTITION_EXPIRATION_TIME.key())); + options.get(PARTITION_EXPIRATION_TIME.key())); + } + if (options.containsKey(DATA_FILE_PATH_DIRECTORY.key())) { + tblProperties.put( + DATA_FILE_PATH_DIRECTORY.key(), + options.get(DATA_FILE_PATH_DIRECTORY.key())); } } diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java index f7be538c259d..0661988648f4 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java @@ -18,6 +18,7 @@ package org.apache.paimon.hive; +import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.client.ClientPool; import org.apache.paimon.hive.pool.CachedClientPool; @@ -33,6 +34,7 @@ import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.PartitionEventType; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; import org.apache.thrift.TException; import java.util.ArrayList; @@ -56,19 +58,25 @@ public class HiveMetastoreClient implements MetastoreClient { private final ClientPool clients; private final StorageDescriptor sd; + private final String dataFilePath; HiveMetastoreClient(Identifier identifier, ClientPool clients) throws TException, InterruptedException { this.identifier = identifier; this.clients = clients; - this.sd = - this.clients - .run( - client -> - client.getTable( - identifier.getDatabaseName(), - identifier.getTableName())) - .getSd(); + Table table = + this.clients.run( + client -> + client.getTable( + identifier.getDatabaseName(), identifier.getTableName())); + this.sd = table.getSd(); + this.dataFilePath = + table.getParameters().containsKey(CoreOptions.DATA_FILE_PATH_DIRECTORY.key()) + ? sd.getLocation() + + "/" + + table.getParameters() + .get(CoreOptions.DATA_FILE_PATH_DIRECTORY.key()) + : sd.getLocation(); } @Override @@ -185,7 +193,7 @@ private Partition toHivePartition( Partition hivePartition = new Partition(); StorageDescriptor newSd = new StorageDescriptor(sd); newSd.setLocation( - sd.getLocation() + "/" + PartitionPathUtils.generatePartitionPath(partitionSpec)); + dataFilePath + "/" + PartitionPathUtils.generatePartitionPath(partitionSpec)); hivePartition.setDbName(identifier.getDatabaseName()); hivePartition.setTableName(identifier.getTableName()); hivePartition.setValues(new ArrayList<>(partitionSpec.values())); diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala index 526e24250751..d51cdce34cb9 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala @@ -18,6 +18,7 @@ package org.apache.paimon.spark.sql +import org.apache.paimon.fs.Path import org.apache.paimon.hive.HiveMetastoreClient import org.apache.paimon.spark.PaimonHiveTestBase import org.apache.paimon.table.FileStoreTable @@ -296,43 +297,64 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase { test("Paimon DDL with hive catalog: sync partitions to HMS") { Seq(sparkCatalogName, paimonHiveCatalogName).foreach { catalogName => - val dbName = "default" - val tblName = "t" - spark.sql(s"USE $catalogName.$dbName") - withTable(tblName) { - spark.sql(s""" - |CREATE TABLE $tblName (id INT, pt INT) - |USING PAIMON - |TBLPROPERTIES ('metastore.partitioned-table' = 'true') - |PARTITIONED BY (pt) - |""".stripMargin) + Seq("", "data").foreach { + dataFilePathDir => + val dbName = "default" + val tblName = "t" + spark.sql(s"USE $catalogName.$dbName") + withTable(tblName) { + spark.sql(s""" + |CREATE TABLE $tblName (id INT, pt INT) + |USING PAIMON + |TBLPROPERTIES ( + |${if (dataFilePathDir.isEmpty) "" + else s"'data-file.path-directory' = '$dataFilePathDir',"} + |'metastore.partitioned-table' = 'true' + |) + |PARTITIONED BY (pt) + |""".stripMargin) + + val table = loadTable(dbName, tblName) + val metastoreClient = table + .catalogEnvironment() + .metastoreClientFactory() + .create() + .asInstanceOf[HiveMetastoreClient] + .client() + val fileIO = table.fileIO() + + def containsDir(root: Path, targets: Array[String]): Boolean = { + targets.forall(fileIO.listDirectories(root).map(_.getPath.getName).contains) + } + + spark.sql(s"INSERT INTO $tblName VALUES (1, 1), (2, 2), (3, 3)") + // check partitions in paimon + checkAnswer( + spark.sql(s"show partitions $tblName"), + Seq(Row("pt=1"), Row("pt=2"), Row("pt=3"))) + // check partitions in HMS + assert(metastoreClient.listPartitions(dbName, tblName, 100).size() == 3) + // check partitions in filesystem + if (dataFilePathDir.isEmpty) { + assert(containsDir(table.location(), Array("pt=1", "pt=2", "pt=3"))) + } else { + assert(!containsDir(table.location(), Array("pt=1", "pt=2", "pt=3"))) + assert( + containsDir(new Path(table.location(), "data"), Array("pt=1", "pt=2", "pt=3"))) + } - val metastoreClient = loadTable(dbName, tblName) - .catalogEnvironment() - .metastoreClientFactory() - .create() - .asInstanceOf[HiveMetastoreClient] - .client() - - spark.sql(s"INSERT INTO $tblName VALUES (1, 1), (2, 2), (3, 3)") - // check partitions in paimon - checkAnswer( - spark.sql(s"show partitions $tblName"), - Seq(Row("pt=1"), Row("pt=2"), Row("pt=3"))) - // check partitions in HMS - assert(metastoreClient.listPartitions(dbName, tblName, 100).size() == 3) - - spark.sql(s"INSERT INTO $tblName VALUES (4, 3), (5, 4)") - checkAnswer( - spark.sql(s"show partitions $tblName"), - Seq(Row("pt=1"), Row("pt=2"), Row("pt=3"), Row("pt=4"))) - assert(metastoreClient.listPartitions(dbName, tblName, 100).size() == 4) - - spark.sql(s"ALTER TABLE $tblName DROP PARTITION (pt=1)") - checkAnswer( - spark.sql(s"show partitions $tblName"), - Seq(Row("pt=2"), Row("pt=3"), Row("pt=4"))) - assert(metastoreClient.listPartitions(dbName, tblName, 100).size() == 3) + spark.sql(s"INSERT INTO $tblName VALUES (4, 3), (5, 4)") + checkAnswer( + spark.sql(s"show partitions $tblName"), + Seq(Row("pt=1"), Row("pt=2"), Row("pt=3"), Row("pt=4"))) + assert(metastoreClient.listPartitions(dbName, tblName, 100).size() == 4) + + spark.sql(s"ALTER TABLE $tblName DROP PARTITION (pt=1)") + checkAnswer( + spark.sql(s"show partitions $tblName"), + Seq(Row("pt=2"), Row("pt=3"), Row("pt=4"))) + assert(metastoreClient.listPartitions(dbName, tblName, 100).size() == 3) + } } } }