Commit fadf650

update
Zouxxyy committed Nov 20, 2024
1 parent 84914af commit fadf650
Showing 9 changed files with 155 additions and 44 deletions.
@@ -133,6 +133,10 @@ public boolean allowUpperCase() {
return catalogOptions.getOptional(ALLOW_UPPER_CASE).orElse(true);
}

protected boolean externalTableEnabled() {
return false;
}

@Override
public void createDatabase(String name, boolean ignoreIfExists, Map<String, String> properties)
throws DatabaseAlreadyExistException {
@@ -272,6 +276,7 @@ public void createTable(Identifier identifier, Schema schema, boolean ignoreIfEx
validateIdentifierNameCaseInsensitive(identifier);
validateFieldNameCaseInsensitive(schema.rowType().getFieldNames());
validateAutoCreateClose(schema.options());
validateExternalTableSupport(schema.options());

// check db exists
getDatabase(identifier.getDatabaseName());
@@ -590,6 +595,15 @@ private void validateAutoCreateClose(Map<String, String> options) {
CoreOptions.AUTO_CREATE.key(), Boolean.FALSE));
}

private void validateExternalTableSupport(Map<String, String> options) {
if (!externalTableEnabled() && options.containsKey(CoreOptions.PATH.key())) {
throw new UnsupportedOperationException(
String.format(
"The current catalog %s does not support external tables, so specifying the path is not allowed when creating a table.",
this.getClass().getSimpleName()));
}
}

// =============================== Meta in File System =====================================

protected List<String> listDatabasesInFileSystem(Path warehouse) throws IOException {
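By default the base catalog rejects a user-supplied 'path', so external tables are opt-in per catalog implementation. A minimal sketch of a subclass opting in (the class name is hypothetical; the override is the hook added above, and the remaining abstract catalog methods are elided):

// Hypothetical catalog that allows CREATE TABLE with an explicit 'path' option.
public class MyExternalCatalog extends AbstractCatalog {

    @Override
    protected boolean externalTableEnabled() {
        // With this override, validateExternalTableSupport() accepts schemas
        // that carry CoreOptions.PATH instead of throwing.
        return true;
    }

    // ... constructor and the remaining abstract methods elided for brevity.
}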
@@ -83,7 +83,6 @@
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -210,6 +209,12 @@ public Path getTableLocation(Identifier identifier) {
return getTableLocation(identifier, table);
}

private Path initialTableLocation(Map<String, String> tableOptions, Identifier identifier) {
return tableOptions.containsKey(CoreOptions.PATH.key())
? new Path(tableOptions.get(CoreOptions.PATH.key()))
: getTableLocation(identifier, null);
}

private Path getTableLocation(Identifier identifier, @Nullable Table table) {
try {
String databaseName = identifier.getDatabaseName();
Expand Down Expand Up @@ -634,8 +639,11 @@ public void createFormatTable(Identifier identifier, Schema schema) {
options,
schema.comment());
try {
Path location = getTableLocation(identifier, null);
Table hiveTable = createHiveFormatTable(identifier, newSchema, location);
Map<String, String> tableOptions = schema.options();
boolean externalTable =
options.containsKey(CoreOptions.PATH.key()) || usingExternalTable();
Path location = initialTableLocation(tableOptions, identifier);
Table hiveTable = createHiveFormatTable(identifier, newSchema, location, externalTable);
clients.execute(client -> client.createTable(hiveTable));
} catch (Exception e) {
// we don't need to delete directories since HMS will roll back db and fs if failed.
@@ -654,18 +662,19 @@ private boolean usingExternalTable() {
@Override
protected void dropTableImpl(Identifier identifier) {
try {
boolean externalTable = isExternalTable(getHmsTable(identifier));
clients.execute(
client ->
client.dropTable(
identifier.getDatabaseName(),
identifier.getTableName(),
true,
!externalTable,
false,
true));

// When dropping a Hive external table, only the Hive metadata is deleted; the data
// files are not deleted.
if (usingExternalTable()) {
if (externalTable) {
return;
}

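In the drop path above, the third boolean handed to the metastore client is Hive's deleteData flag, so passing !externalTable is what keeps the files of external tables on disk. The same call with the flags spelled out (parameter names follow the Hive IMetaStoreClient convention and are an assumption here):

// deleteData = !externalTable: HMS removes the table directory only for managed tables.
client.dropTable(
        identifier.getDatabaseName(),
        identifier.getTableName(),
        /* deleteData */ !externalTable,
        /* ignoreUnknownTab */ false,
        /* ifPurge */ true);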
@@ -680,7 +689,7 @@ protected void dropTableImpl(Identifier identifier) {
} catch (Exception ee) {
LOG.error("Delete directory[{}] fail for table {}", path, identifier, ee);
}
} catch (TException e) {
} catch (TException | TableNotExistException e) {
throw new RuntimeException("Failed to drop table " + identifier.getFullName(), e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -691,13 +700,12 @@ protected void dropTableImpl(Identifier identifier) {

@Override
protected void createTableImpl(Identifier identifier, Schema schema) {
// first commit changes to underlying files
// if changes on Hive fails there is no harm to perform the same changes to files again
Path location = getTableLocation(identifier, null);
Map<String, String> tableOptions = schema.options();
boolean externalTable = tableOptions.containsKey(CoreOptions.PATH.key()) || usingExternalTable();
Path location = initialTableLocation(tableOptions, identifier);
TableSchema tableSchema;
try {
tableSchema =
schemaManager(identifier, location).createTable(schema, usingExternalTable());
tableSchema = schemaManager(identifier, location).createTable(schema, externalTable);
} catch (Exception e) {
throw new RuntimeException(
"Failed to commit changes of table "
@@ -709,7 +717,9 @@ protected void createTableImpl(Identifier identifier, Schema schema) {
try {
clients.execute(
client ->
client.createTable(createHiveTable(identifier, tableSchema, location)));
client.createTable(
createHiveTable(
identifier, tableSchema, location, externalTable)));
} catch (Exception e) {
try {
fileIO.deleteDirectoryQuietly(location);
@@ -720,7 +730,8 @@ protected void createTableImpl(Identifier identifier, Schema schema) {
}
}

private Table createHiveTable(Identifier identifier, TableSchema tableSchema, Path location) {
private Table createHiveTable(
Identifier identifier, TableSchema tableSchema, Path location, boolean externalTable) {
checkArgument(Options.fromMap(tableSchema.options()).get(TYPE) != FORMAT_TABLE);

Map<String, String> tblProperties;
@@ -740,13 +751,14 @@ private Table createHiveTable(Identifier identifier, TableSchema tableSchema, Pa
}
}

Table table = newHmsTable(identifier, tblProperties, PAIMON_TABLE_TYPE_VALUE);
Table table =
newHmsTable(identifier, tblProperties, PAIMON_TABLE_TYPE_VALUE, externalTable);
updateHmsTable(table, identifier, tableSchema, PAIMON_TABLE_TYPE_VALUE, location);
return table;
}

private Table createHiveFormatTable(
Identifier identifier, TableSchema tableSchema, Path location) {
Identifier identifier, TableSchema tableSchema, Path location, boolean externalTable) {
Options options = Options.fromMap(tableSchema.options());
checkArgument(options.get(TYPE) == FORMAT_TABLE);

@@ -757,7 +769,7 @@ private Table createHiveFormatTable(

Map<String, String> tblProperties = new HashMap<>();

Table table = newHmsTable(identifier, tblProperties, provider);
Table table = newHmsTable(identifier, tblProperties, provider, externalTable);
updateHmsTable(table, identifier, tableSchema, provider, location);

if (FormatTable.Format.CSV.toString().equalsIgnoreCase(provider)) {
@@ -865,6 +877,11 @@ public boolean allowUpperCase() {
return catalogOptions.getOptional(ALLOW_UPPER_CASE).orElse(false);
}

@Override
protected boolean externalTableEnabled() {
return true;
}

public boolean syncAllProperties() {
return catalogOptions.get(SYNC_ALL_PROPERTIES);
}
Expand Down Expand Up @@ -921,10 +938,13 @@ public void repairTable(Identifier identifier) throws TableNotExistException {
TableSchema tableSchema =
tableSchemaInFileSystem(location, identifier.getBranchNameOrDefault())
.orElseThrow(() -> new TableNotExistException(identifier));
Table newTable = createHiveTable(identifier, tableSchema, location);

try {
Table newTable = null;
try {
Table table = getHmsTable(identifier);
newTable =
createHiveTable(identifier, tableSchema, location, isExternalTable(table));
checkArgument(
isPaimonTable(table),
"Table %s is not a paimon table in hive metastore.",
@@ -935,7 +955,8 @@ public void repairTable(Identifier identifier) throws TableNotExistException {
}
} catch (TableNotExistException e) {
// hive table does not exist.
clients.execute(client -> client.createTable(newTable));
Table finalNewTable = newTable;
clients.execute(client -> client.createTable(finalNewTable));
}

// repair partitions
@@ -1012,13 +1033,17 @@ public static boolean isView(Table table) {
return table != null && TableType.VIRTUAL_VIEW.name().equals(table.getTableType());
}

private boolean isExternalTable(Table table) {
return (table != null && TableType.EXTERNAL_TABLE.name().equals(table.getTableType()))
|| usingExternalTable();
}

private Table newHmsTable(
Identifier identifier, Map<String, String> tableParameters, String provider) {
Identifier identifier,
Map<String, String> tableParameters,
String provider,
boolean externalTable) {
long currentTimeMillis = System.currentTimeMillis();
CatalogTableType tableType =
OptionsUtils.convertToEnum(
hiveConf.get(TABLE_TYPE.key(), CatalogTableType.MANAGED.toString()),
CatalogTableType.class);
if (provider == null) {
provider = PAIMON_TABLE_TYPE_VALUE;
}
@@ -1036,7 +1061,9 @@ private Table newHmsTable(
tableParameters,
null,
null,
tableType.toString().toUpperCase(Locale.ROOT) + "_TABLE");
externalTable
? TableType.EXTERNAL_TABLE.name()
: TableType.MANAGED_TABLE.name());
table.getParameters().put(TABLE_TYPE_PROP, provider.toUpperCase());
if (PAIMON_TABLE_TYPE_VALUE.equalsIgnoreCase(provider)) {
table.getParameters()
@@ -1045,7 +1072,7 @@ private Table newHmsTable(
table.getParameters().put(FILE_FORMAT.key(), provider.toLowerCase());
table.getParameters().put(TYPE.key(), FORMAT_TABLE.toString());
}
if (CatalogTableType.EXTERNAL.equals(tableType)) {
if (externalTable) {
table.getParameters().put("EXTERNAL", "TRUE");
}
return table;
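Taken together, the HiveCatalog changes register a table as EXTERNAL_TABLE with the EXTERNAL=TRUE parameter whenever the schema options carry 'path' (or the catalog itself is configured for external tables via usingExternalTable()), and dropping it leaves the data files in place. A hedged usage sketch against the catalog API; the identifier and HDFS location are hypothetical, and exception handling is omitted:

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataTypes;

// Supplying 'path' makes the table external: newHmsTable() sets the HMS
// tableType to EXTERNAL_TABLE and adds the EXTERNAL=TRUE parameter.
Schema schema =
        Schema.newBuilder()
                .column("id", DataTypes.INT())
                .option(CoreOptions.PATH.key(), "hdfs:///data/ext/orders")
                .build();
catalog.createTable(Identifier.create("default", "orders"), schema, false);

// Dropping removes only the HMS metadata; files under hdfs:///data/ext/orders remain.
catalog.dropTable(Identifier.create("default", "orders"), false);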
@@ -18,8 +18,8 @@

package org.apache.paimon.hive;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogTestBase;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.client.ClientPool;
@@ -415,7 +415,6 @@ private Schema toInitialSchema(
String path = normalizedProperties.remove(TableCatalog.PROP_LOCATION);
normalizedProperties.put(CoreOptions.PATH.key(), path);
}

String pkAsString = properties.get(PRIMARY_KEY_IDENTIFIER);
List<String> primaryKeys =
pkAsString == null
@@ -64,6 +64,9 @@ case class SparkTable(table: Table)
if (table.comment.isPresent) {
properties.put(TableCatalog.PROP_COMMENT, table.comment.get)
}
if (properties.containsKey(CoreOptions.PATH.key())) {
properties.put(TableCatalog.PROP_LOCATION, properties.get(CoreOptions.PATH.key()))
}
properties
case _ => Collections.emptyMap()
}
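The SparkTable hunk above republishes Paimon's 'path' option under Spark's standard location property so Spark-side tooling (for example DESCRIBE TABLE EXTENDED) can surface the storage location. The same logic in equivalent Java, assuming Spark's TableCatalog.PROP_LOCATION constant:

// Mirror Paimon's 'path' option into Spark's 'location' table property.
if (properties.containsKey(CoreOptions.PATH.key())) {
    properties.put(TableCatalog.PROP_LOCATION, properties.get(CoreOptions.PATH.key()));
}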
@@ -190,8 +190,7 @@ public void testCreateTableAs() {
spark.sql("INSERT INTO partitionedTable VALUES(1,'aaa','bbb')");
spark.sql(
"CREATE TABLE partitionedTableAs PARTITIONED BY (a) AS SELECT * FROM partitionedTable");

String tablePath = new Path(warehousePath, "default.db/partitionedTableAs").toString();
Path tablePath = new Path(warehousePath, "default.db/partitionedTableAs");
assertThat(spark.sql("SHOW CREATE TABLE partitionedTableAs").collectAsList().toString())
.isEqualTo(
String.format(
@@ -221,8 +220,7 @@ public void testCreateTableAs() {
spark.sql("INSERT INTO testTable VALUES(1,'a','b')");
spark.sql(
"CREATE TABLE testTableAs TBLPROPERTIES ('file.format' = 'parquet') AS SELECT * FROM testTable");

String testTableAsPath = new Path(warehousePath, "default.db/testTableAs").toString();
tablePath = new Path(warehousePath, "default.db/testTableAs");
assertThat(spark.sql("SHOW CREATE TABLE testTableAs").collectAsList().toString())
.isEqualTo(
String.format(
Expand All @@ -234,8 +232,8 @@ public void testCreateTableAs() {
+ "]]",
showCreateString(
"testTableAs", "a BIGINT", "b VARCHAR(10)", "c CHAR(10)"),
testTableAsPath,
testTableAsPath));
tablePath,
tablePath));
List<Row> resultProp = spark.sql("SELECT * FROM testTableAs").collectAsList();

assertThat(resultProp.stream().map(Row::toString))
@@ -253,8 +251,7 @@ public void testCreateTableAs() {
+ "COMMENT 'table comment'");
spark.sql("INSERT INTO t_pk VALUES(1,'aaa','bbb')");
spark.sql("CREATE TABLE t_pk_as TBLPROPERTIES ('primary-key' = 'a') AS SELECT * FROM t_pk");

String tPkAsPath = new Path(warehousePath, "default.db/t_pk_as").toString();
tablePath = new Path(warehousePath, "default.db/t_pk_as");
assertThat(spark.sql("SHOW CREATE TABLE t_pk_as").collectAsList().toString())
.isEqualTo(
String.format(
Expand All @@ -263,8 +260,8 @@ public void testCreateTableAs() {
+ "TBLPROPERTIES (\n 'path' = '%s',\n 'primary-key' = 'a')\n]]",
showCreateString(
"t_pk_as", "a BIGINT NOT NULL", "b STRING", "c STRING"),
tPkAsPath,
tPkAsPath));
tablePath,
tablePath));
List<Row> resultPk = spark.sql("SELECT * FROM t_pk_as").collectAsList();

assertThat(resultPk.stream().map(Row::toString)).containsExactlyInAnyOrder("[1,aaa,bbb]");
@@ -283,8 +280,7 @@ public void testCreateTableAs() {
spark.sql("INSERT INTO t_all VALUES(1,2,'bbb','2020-01-01','12')");
spark.sql(
"CREATE TABLE t_all_as PARTITIONED BY (dt) TBLPROPERTIES ('primary-key' = 'dt,hh') AS SELECT * FROM t_all");

String tAllAsPath = new Path(warehousePath, "default.db/t_all_as").toString();
tablePath = new Path(warehousePath, "default.db/t_all_as");
assertThat(spark.sql("SHOW CREATE TABLE t_all_as").collectAsList().toString())
.isEqualTo(
String.format(
Expand All @@ -302,8 +298,8 @@ public void testCreateTableAs() {
"behavior STRING",
"dt STRING NOT NULL",
"hh STRING NOT NULL"),
tAllAsPath,
tAllAsPath));
tablePath,
tablePath));
List<Row> resultAll = spark.sql("SELECT * FROM t_all_as").collectAsList();
assertThat(resultAll.stream().map(Row::toString))
.containsExactlyInAnyOrder("[1,2,bbb,2020-01-01,12]");
@@ -380,8 +376,7 @@ public void testShowCreateTable() {
+ " 'k1' = 'v1'\n"
+ ")");

String tablePath = new Path(warehousePath, "default.db/tbl").toString();

Path tablePath = new Path(warehousePath, "default.db/tbl");
assertThat(spark.sql("SHOW CREATE TABLE tbl").collectAsList().toString())
.isEqualTo(
String.format(
@@ -18,7 +18,6 @@

package org.apache.paimon.spark

import org.apache.paimon.Snapshot
import org.apache.paimon.hive.TestHiveMetastore

import org.apache.hadoop.conf.Configuration
@@ -546,4 +546,28 @@ abstract class DDLTestBase extends PaimonSparkTestBase {
}
}
}

test("Paimon DDL: create and drop external / managed table") {
withTempDir {
tbLocation =>
withTable("external_tbl", "managed_tbl") {
// create external table
val error = intercept[UnsupportedOperationException] {
sql(
s"CREATE TABLE external_tbl (id INT) USING paimon LOCATION '${tbLocation.getCanonicalPath}'")
}.getMessage
assert(error.contains("does not support external tables"))

// create managed table
sql("CREATE TABLE managed_tbl (id INT) USING paimon")
val table = loadTable("managed_tbl")
val fileIO = table.fileIO()
val tableLocation = table.location()

// drop managed table
sql("DROP TABLE managed_tbl")
assert(!fileIO.exists(tableLocation))
}
}
}
}
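The test above runs against a catalog that keeps the default externalTableEnabled() == false, so the LOCATION clause is rejected outright. Against a Hive-backed Paimon catalog the same DDL should go through, and per the dropTableImpl change the files outlive the table. A hedged Spark SQL counterpart, with a hypothetical path and session setup omitted:

// With a Paimon HiveCatalog (externalTableEnabled() returns true):
spark.sql("CREATE TABLE ext_tbl (id INT) USING paimon LOCATION '/tmp/ext_tbl'");
spark.sql("DROP TABLE ext_tbl"); // HMS metadata removed; files under /tmp/ext_tbl remain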
