Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy committed Nov 19, 2024
1 parent 06fbb5e commit aa64890
Show file tree
Hide file tree
Showing 10 changed files with 155 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ public boolean allowUpperCase() {
return catalogOptions.getOptional(ALLOW_UPPER_CASE).orElse(true);
}

/**
 * Whether this catalog supports external tables.
 *
 * <p>The base implementation returns {@code false}; in that case creating a table whose
 * options contain the path key is rejected during create-table validation. Catalogs that
 * support external tables override this method to return {@code true}.
 *
 * @return {@code false} in this base implementation
 */
protected boolean externalTableEnabled() {
return false;
}

@Override
public void createDatabase(String name, boolean ignoreIfExists, Map<String, String> properties)
throws DatabaseAlreadyExistException {
Expand Down Expand Up @@ -249,20 +253,20 @@ public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
throws TableNotExistException {
checkNotBranch(identifier, "dropTable");
checkNotSystemTable(identifier, "dropTable");

Table table;
try {
getTable(identifier);
table = getTable(identifier);
} catch (TableNotExistException e) {
if (ignoreIfNotExists) {
return;
}
throw new TableNotExistException(identifier);
}

dropTableImpl(identifier);
dropTableImpl(identifier, table.options());
}

protected abstract void dropTableImpl(Identifier identifier);
protected abstract void dropTableImpl(Identifier identifier, Map<String, String> options);

@Override
public void createTable(Identifier identifier, Schema schema, boolean ignoreIfExists)
Expand All @@ -272,6 +276,7 @@ public void createTable(Identifier identifier, Schema schema, boolean ignoreIfEx
validateIdentifierNameCaseInsensitive(identifier);
validateFieldNameCaseInsensitive(schema.rowType().getFieldNames());
validateAutoCreateClose(schema.options());
validateExternalTableSupport(schema.options());

// check db exists
getDatabase(identifier.getDatabaseName());
Expand Down Expand Up @@ -590,6 +595,15 @@ private void validateAutoCreateClose(Map<String, String> options) {
CoreOptions.AUTO_CREATE.key(), Boolean.FALSE));
}

/**
 * Rejects table options that carry an explicit path when this catalog does not support
 * external tables.
 *
 * @param options the table options supplied when creating the table
 * @throws UnsupportedOperationException if {@code options} contains the path key and
 *     {@link #externalTableEnabled()} returns {@code false}
 */
private void validateExternalTableSupport(Map<String, String> options) {
    boolean pathSpecified = options.containsKey(CoreOptions.PATH.key());
    // Nothing to validate when no path is given; the catalog capability is only consulted
    // once a path is actually present.
    if (!pathSpecified || externalTableEnabled()) {
        return;
    }

    String message =
            String.format(
                    "The current catalog %s does not support external tables, so specifying the path is not allowed when creating a table.",
                    this.getClass().getSimpleName());
    throw new UnsupportedOperationException(message);
}

// =============================== Meta in File System =====================================

protected List<String> listDatabasesInFileSystem(Path warehouse) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public interface Catalog extends AutoCloseable {

String COMMENT_PROP = "comment";
String OWNER_PROP = "owner";
String EXTERNAL_PROP = "external";
String DB_LOCATION_PROP = "location";
String NUM_ROWS_PROP = "numRows";
String NUM_FILES_PROP = "numFiles";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExis
}

@Override
protected void dropTableImpl(Identifier identifier) {
protected void dropTableImpl(Identifier identifier, Map<String, String> options) {
Path path = getTableLocation(identifier);
uncheck(() -> fileIO.delete(path, true));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ protected List<String> listTablesImpl(String databaseName) {
}

@Override
protected void dropTableImpl(Identifier identifier) {
protected void dropTableImpl(Identifier identifier, Map<String, String> options) {
try {
int deletedRecords =
execute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -210,6 +209,12 @@ public Path getTableLocation(Identifier identifier) {
return getTableLocation(identifier, table);
}

/**
 * Resolves the location to use for a table that is about to be created.
 *
 * @param tableOptions the options of the table being created
 * @param identifier the identifier of the table being created
 * @return a {@link Path} built from the path option when {@code tableOptions} contains that
 *     key; otherwise the result of {@code getTableLocation(identifier, null)}
 */
private Path initialTableLocation(Map<String, String> tableOptions, Identifier identifier) {
    String pathKey = CoreOptions.PATH.key();
    if (tableOptions.containsKey(pathKey)) {
        // An explicitly configured path takes precedence over the catalog-derived location.
        return new Path(tableOptions.get(pathKey));
    }
    return getTableLocation(identifier, null);
}

private Path getTableLocation(Identifier identifier, @Nullable Table table) {
try {
String databaseName = identifier.getDatabaseName();
Expand Down Expand Up @@ -634,16 +639,23 @@ public void createFormatTable(Identifier identifier, Schema schema) {
options,
schema.comment());
try {
Path location = getTableLocation(identifier, null);
Table hiveTable = createHiveFormatTable(identifier, newSchema, location);
Map<String, String> tableOptions = schema.options();
Path location = initialTableLocation(tableOptions, identifier);
Table hiveTable =
createHiveFormatTable(
identifier, newSchema, location, usingExternalTable(tableOptions));
clients.execute(client -> client.createTable(hiveTable));
} catch (Exception e) {
// we don't need to delete directories since HMS will roll back db and fs if failed.
throw new RuntimeException("Failed to create table " + identifier.getFullName(), e);
}
}

private boolean usingExternalTable() {
private boolean usingExternalTable(Map<String, String> tableOptions) {
if (tableOptions.containsKey(Catalog.EXTERNAL_PROP)) {
return tableOptions.get(Catalog.EXTERNAL_PROP).equals("true");
}

CatalogTableType tableType =
OptionsUtils.convertToEnum(
hiveConf.get(TABLE_TYPE.key(), CatalogTableType.MANAGED.toString()),
Expand All @@ -652,20 +664,21 @@ private boolean usingExternalTable() {
}

@Override
protected void dropTableImpl(Identifier identifier) {
protected void dropTableImpl(Identifier identifier, Map<String, String> options) {
try {
boolean externalTable = usingExternalTable(options);
clients.execute(
client ->
client.dropTable(
identifier.getDatabaseName(),
identifier.getTableName(),
true,
!externalTable,
false,
true));

// When dropping a Hive external table, only the Hive metadata is deleted and the data files
// are not deleted.
if (usingExternalTable()) {
if (externalTable) {
return;
}

Expand All @@ -691,13 +704,12 @@ protected void dropTableImpl(Identifier identifier) {

@Override
protected void createTableImpl(Identifier identifier, Schema schema) {
// first commit changes to underlying files
// if changes on Hive fails there is no harm to perform the same changes to files again
Path location = getTableLocation(identifier, null);
Map<String, String> tableOptions = schema.options();
Path location = initialTableLocation(tableOptions, identifier);
TableSchema tableSchema;
boolean externalTable = usingExternalTable(tableOptions);
try {
tableSchema =
schemaManager(identifier, location).createTable(schema, usingExternalTable());
tableSchema = schemaManager(identifier, location).createTable(schema, externalTable);
} catch (Exception e) {
throw new RuntimeException(
"Failed to commit changes of table "
Expand All @@ -709,7 +721,9 @@ protected void createTableImpl(Identifier identifier, Schema schema) {
try {
clients.execute(
client ->
client.createTable(createHiveTable(identifier, tableSchema, location)));
client.createTable(
createHiveTable(
identifier, tableSchema, location, externalTable)));
} catch (Exception e) {
try {
fileIO.deleteDirectoryQuietly(location);
Expand All @@ -720,7 +734,8 @@ protected void createTableImpl(Identifier identifier, Schema schema) {
}
}

private Table createHiveTable(Identifier identifier, TableSchema tableSchema, Path location) {
private Table createHiveTable(
Identifier identifier, TableSchema tableSchema, Path location, boolean externalTable) {
checkArgument(Options.fromMap(tableSchema.options()).get(TYPE) != FORMAT_TABLE);

Map<String, String> tblProperties;
Expand All @@ -740,13 +755,14 @@ private Table createHiveTable(Identifier identifier, TableSchema tableSchema, Pa
}
}

Table table = newHmsTable(identifier, tblProperties, PAIMON_TABLE_TYPE_VALUE);
Table table =
newHmsTable(identifier, tblProperties, PAIMON_TABLE_TYPE_VALUE, externalTable);
updateHmsTable(table, identifier, tableSchema, PAIMON_TABLE_TYPE_VALUE, location);
return table;
}

private Table createHiveFormatTable(
Identifier identifier, TableSchema tableSchema, Path location) {
Identifier identifier, TableSchema tableSchema, Path location, boolean externalTable) {
Options options = Options.fromMap(tableSchema.options());
checkArgument(options.get(TYPE) == FORMAT_TABLE);

Expand All @@ -757,7 +773,7 @@ private Table createHiveFormatTable(

Map<String, String> tblProperties = new HashMap<>();

Table table = newHmsTable(identifier, tblProperties, provider);
Table table = newHmsTable(identifier, tblProperties, provider, externalTable);
updateHmsTable(table, identifier, tableSchema, provider, location);

if (FormatTable.Format.CSV.toString().equalsIgnoreCase(provider)) {
Expand Down Expand Up @@ -865,6 +881,11 @@ public boolean allowUpperCase() {
return catalogOptions.getOptional(ALLOW_UPPER_CASE).orElse(false);
}

/**
 * Reports that this catalog supports external tables.
 *
 * @return always {@code true}
 */
@Override
protected boolean externalTableEnabled() {
return true;
}

public boolean syncAllProperties() {
return catalogOptions.get(SYNC_ALL_PROPERTIES);
}
Expand Down Expand Up @@ -921,7 +942,12 @@ public void repairTable(Identifier identifier) throws TableNotExistException {
TableSchema tableSchema =
tableSchemaInFileSystem(location, identifier.getBranchNameOrDefault())
.orElseThrow(() -> new TableNotExistException(identifier));
Table newTable = createHiveTable(identifier, tableSchema, location);
Table newTable =
createHiveTable(
identifier,
tableSchema,
location,
usingExternalTable(tableSchema.options()));
try {
try {
Table table = getHmsTable(identifier);
Expand Down Expand Up @@ -1013,12 +1039,11 @@ public static boolean isView(Table table) {
}

private Table newHmsTable(
Identifier identifier, Map<String, String> tableParameters, String provider) {
Identifier identifier,
Map<String, String> tableParameters,
String provider,
boolean externalTable) {
long currentTimeMillis = System.currentTimeMillis();
CatalogTableType tableType =
OptionsUtils.convertToEnum(
hiveConf.get(TABLE_TYPE.key(), CatalogTableType.MANAGED.toString()),
CatalogTableType.class);
if (provider == null) {
provider = PAIMON_TABLE_TYPE_VALUE;
}
Expand All @@ -1036,7 +1061,7 @@ private Table newHmsTable(
tableParameters,
null,
null,
tableType.toString().toUpperCase(Locale.ROOT) + "_TABLE");
externalTable ? "EXTERNAL_TABLE" : "MANAGED_TABLE");
table.getParameters().put(TABLE_TYPE_PROP, provider.toUpperCase());
if (PAIMON_TABLE_TYPE_VALUE.equalsIgnoreCase(provider)) {
table.getParameters()
Expand All @@ -1045,7 +1070,7 @@ private Table newHmsTable(
table.getParameters().put(FILE_FORMAT.key(), provider.toLowerCase());
table.getParameters().put(TYPE.key(), FORMAT_TABLE.toString());
}
if (CatalogTableType.EXTERNAL.equals(tableType)) {
if (externalTable) {
table.getParameters().put("EXTERNAL", "TRUE");
}
return table;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,13 @@ private Schema toInitialSchema(
normalizedProperties.remove(PRIMARY_KEY_IDENTIFIER);
normalizedProperties.remove(TableCatalog.PROP_COMMENT);
String pkAsString = properties.get(PRIMARY_KEY_IDENTIFIER);
if (normalizedProperties.containsKey(TableCatalog.PROP_LOCATION)) {
normalizedProperties.put(
CoreOptions.PATH.key(), normalizedProperties.get(TableCatalog.PROP_LOCATION));
normalizedProperties.remove(TableCatalog.PROP_LOCATION);
// As long as the table has a location, treat it as external
normalizedProperties.put(Catalog.EXTERNAL_PROP, "true");
}
List<String> primaryKeys =
pkAsString == null
? Collections.emptyList()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ case class SparkTable(table: Table)
if (table.comment.isPresent) {
properties.put(TableCatalog.PROP_COMMENT, table.comment.get)
}
if (properties.containsKey(CoreOptions.PATH.key())) {
properties.put(TableCatalog.PROP_LOCATION, properties.get(CoreOptions.PATH.key()))
}
properties
case _ => Collections.emptyMap()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.paimon.spark

import org.apache.paimon.Snapshot
import org.apache.paimon.hive.TestHiveMetastore

import org.apache.hadoop.conf.Configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -546,4 +546,28 @@ abstract class DDLTestBase extends PaimonSparkTestBase {
}
}
}

// Verifies two things for the catalog under test:
// 1. CREATE TABLE with a LOCATION clause is rejected with an UnsupportedOperationException.
// 2. Dropping a managed table removes the table's location from the file system.
test("Paimon DDL: create and drop external / managed table") {
withTempDir {
tbLocation =>
withTable("external_tbl", "managed_tbl") {
// creating a table with an explicit LOCATION must fail, and the error message must say
// that external tables are not supported
val error = intercept[UnsupportedOperationException] {
sql(
s"CREATE TABLE external_tbl (id INT) USING paimon LOCATION '${tbLocation.getCanonicalPath}'")
}.getMessage
assert(error.contains("does not support external tables"))

// create managed table (no LOCATION clause)
sql("CREATE TABLE managed_tbl (id INT) USING paimon")
val table = loadTable("managed_tbl")
// capture the file IO and location before the drop so they can be checked afterwards
val fileIO = table.fileIO()
val tableLocation = table.location()

// drop managed table and check that its location no longer exists
sql("DROP TABLE managed_tbl")
assert(!fileIO.exists(tableLocation))
}
}
}
}
Loading

0 comments on commit aa64890

Please sign in to comment.