Skip to content

Commit

Permalink
[hive] Fix sync hms partition with data-file.path-directory (apache#4735)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy authored Dec 18, 2024
1 parent 0b684ca commit 99f0b23
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ public class CoreOptions implements Serializable {
.defaultValue("data-")
.withDescription("Specify the file name prefix of data files.");

@Immutable
public static final ConfigOption<String> DATA_FILE_PATH_DIRECTORY =
key("data-file.path-directory")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@

import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE;
import static org.apache.hadoop.hive.serde.serdeConstants.FIELD_DELIM;
import static org.apache.paimon.CoreOptions.DATA_FILE_PATH_DIRECTORY;
import static org.apache.paimon.CoreOptions.FILE_FORMAT;
import static org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_TIME;
import static org.apache.paimon.CoreOptions.TYPE;
Expand Down Expand Up @@ -771,22 +772,27 @@ protected void createTableImpl(Identifier identifier, Schema schema) {

private Table createHiveTable(
Identifier identifier, TableSchema tableSchema, Path location, boolean externalTable) {
checkArgument(Options.fromMap(tableSchema.options()).get(TYPE) != FORMAT_TABLE);
Map<String, String> options = tableSchema.options();
checkArgument(Options.fromMap(options).get(TYPE) != FORMAT_TABLE);

Map<String, String> tblProperties;
if (syncAllProperties()) {
tblProperties = new HashMap<>(tableSchema.options());

tblProperties = new HashMap<>(options);
// add primary-key, partition-key to tblproperties
tblProperties.putAll(convertToPropertiesTableKey(tableSchema));
} else {
tblProperties = convertToPropertiesPrefixKey(tableSchema.options(), HIVE_PREFIX);
if (tableSchema.options().containsKey(PARTITION_EXPIRATION_TIME.key())) {
tblProperties = convertToPropertiesPrefixKey(options, HIVE_PREFIX);
if (options.containsKey(PARTITION_EXPIRATION_TIME.key())) {
// This property will be stored in the 'table_params' table of the HMS database for
// querying by other engines or products.
tblProperties.put(
PARTITION_EXPIRATION_TIME.key(),
tableSchema.options().get(PARTITION_EXPIRATION_TIME.key()));
options.get(PARTITION_EXPIRATION_TIME.key()));
}
if (options.containsKey(DATA_FILE_PATH_DIRECTORY.key())) {
tblProperties.put(
DATA_FILE_PATH_DIRECTORY.key(),
options.get(DATA_FILE_PATH_DIRECTORY.key()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.hive;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.client.ClientPool;
import org.apache.paimon.hive.pool.CachedClientPool;
Expand All @@ -33,6 +34,7 @@
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.PartitionEventType;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.thrift.TException;

import java.util.ArrayList;
Expand All @@ -56,19 +58,25 @@ public class HiveMetastoreClient implements MetastoreClient {

private final ClientPool<IMetaStoreClient, TException> clients;
private final StorageDescriptor sd;
private final String dataFilePath;

HiveMetastoreClient(Identifier identifier, ClientPool<IMetaStoreClient, TException> clients)
throws TException, InterruptedException {
this.identifier = identifier;
this.clients = clients;
this.sd =
this.clients
.run(
client ->
client.getTable(
identifier.getDatabaseName(),
identifier.getTableName()))
.getSd();
Table table =
this.clients.run(
client ->
client.getTable(
identifier.getDatabaseName(), identifier.getTableName()));
this.sd = table.getSd();
this.dataFilePath =
table.getParameters().containsKey(CoreOptions.DATA_FILE_PATH_DIRECTORY.key())
? sd.getLocation()
+ "/"
+ table.getParameters()
.get(CoreOptions.DATA_FILE_PATH_DIRECTORY.key())
: sd.getLocation();
}

@Override
Expand Down Expand Up @@ -185,7 +193,7 @@ private Partition toHivePartition(
Partition hivePartition = new Partition();
StorageDescriptor newSd = new StorageDescriptor(sd);
newSd.setLocation(
sd.getLocation() + "/" + PartitionPathUtils.generatePartitionPath(partitionSpec));
dataFilePath + "/" + PartitionPathUtils.generatePartitionPath(partitionSpec));
hivePartition.setDbName(identifier.getDatabaseName());
hivePartition.setTableName(identifier.getTableName());
hivePartition.setValues(new ArrayList<>(partitionSpec.values()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.spark.sql

import org.apache.paimon.fs.Path
import org.apache.paimon.hive.HiveMetastoreClient
import org.apache.paimon.spark.PaimonHiveTestBase
import org.apache.paimon.table.FileStoreTable
Expand Down Expand Up @@ -296,43 +297,64 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
test("Paimon DDL with hive catalog: sync partitions to HMS") {
Seq(sparkCatalogName, paimonHiveCatalogName).foreach {
catalogName =>
val dbName = "default"
val tblName = "t"
spark.sql(s"USE $catalogName.$dbName")
withTable(tblName) {
spark.sql(s"""
|CREATE TABLE $tblName (id INT, pt INT)
|USING PAIMON
|TBLPROPERTIES ('metastore.partitioned-table' = 'true')
|PARTITIONED BY (pt)
|""".stripMargin)
Seq("", "data").foreach {
dataFilePathDir =>
val dbName = "default"
val tblName = "t"
spark.sql(s"USE $catalogName.$dbName")
withTable(tblName) {
spark.sql(s"""
|CREATE TABLE $tblName (id INT, pt INT)
|USING PAIMON
|TBLPROPERTIES (
|${if (dataFilePathDir.isEmpty) ""
else s"'data-file.path-directory' = '$dataFilePathDir',"}
|'metastore.partitioned-table' = 'true'
|)
|PARTITIONED BY (pt)
|""".stripMargin)

val table = loadTable(dbName, tblName)
val metastoreClient = table
.catalogEnvironment()
.metastoreClientFactory()
.create()
.asInstanceOf[HiveMetastoreClient]
.client()
val fileIO = table.fileIO()

def containsDir(root: Path, targets: Array[String]): Boolean = {
targets.forall(fileIO.listDirectories(root).map(_.getPath.getName).contains)
}

spark.sql(s"INSERT INTO $tblName VALUES (1, 1), (2, 2), (3, 3)")
// check partitions in paimon
checkAnswer(
spark.sql(s"show partitions $tblName"),
Seq(Row("pt=1"), Row("pt=2"), Row("pt=3")))
// check partitions in HMS
assert(metastoreClient.listPartitions(dbName, tblName, 100).size() == 3)
// check partitions in filesystem
if (dataFilePathDir.isEmpty) {
assert(containsDir(table.location(), Array("pt=1", "pt=2", "pt=3")))
} else {
assert(!containsDir(table.location(), Array("pt=1", "pt=2", "pt=3")))
assert(
containsDir(new Path(table.location(), "data"), Array("pt=1", "pt=2", "pt=3")))
}

val metastoreClient = loadTable(dbName, tblName)
.catalogEnvironment()
.metastoreClientFactory()
.create()
.asInstanceOf[HiveMetastoreClient]
.client()

spark.sql(s"INSERT INTO $tblName VALUES (1, 1), (2, 2), (3, 3)")
// check partitions in paimon
checkAnswer(
spark.sql(s"show partitions $tblName"),
Seq(Row("pt=1"), Row("pt=2"), Row("pt=3")))
// check partitions in HMS
assert(metastoreClient.listPartitions(dbName, tblName, 100).size() == 3)

spark.sql(s"INSERT INTO $tblName VALUES (4, 3), (5, 4)")
checkAnswer(
spark.sql(s"show partitions $tblName"),
Seq(Row("pt=1"), Row("pt=2"), Row("pt=3"), Row("pt=4")))
assert(metastoreClient.listPartitions(dbName, tblName, 100).size() == 4)

spark.sql(s"ALTER TABLE $tblName DROP PARTITION (pt=1)")
checkAnswer(
spark.sql(s"show partitions $tblName"),
Seq(Row("pt=2"), Row("pt=3"), Row("pt=4")))
assert(metastoreClient.listPartitions(dbName, tblName, 100).size() == 3)
spark.sql(s"INSERT INTO $tblName VALUES (4, 3), (5, 4)")
checkAnswer(
spark.sql(s"show partitions $tblName"),
Seq(Row("pt=1"), Row("pt=2"), Row("pt=3"), Row("pt=4")))
assert(metastoreClient.listPartitions(dbName, tblName, 100).size() == 4)

spark.sql(s"ALTER TABLE $tblName DROP PARTITION (pt=1)")
checkAnswer(
spark.sql(s"show partitions $tblName"),
Seq(Row("pt=2"), Row("pt=3"), Row("pt=4")))
assert(metastoreClient.listPartitions(dbName, tblName, 100).size() == 3)
}
}
}
}
Expand Down

0 comments on commit 99f0b23

Please sign in to comment.