Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy committed Nov 20, 2024
1 parent 84914af commit f37017e
Show file tree
Hide file tree
Showing 12 changed files with 163 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ public boolean allowUpperCase() {
return catalogOptions.getOptional(ALLOW_UPPER_CASE).orElse(true);
}

/**
 * Whether this catalog supports external tables (i.e. a user-specified table path).
 *
 * <p>Defaults to {@code false}; subclasses that can manage external tables override this to
 * return {@code true}.
 */
protected boolean externalTableEnabled() {
return false;
}

@Override
public void createDatabase(String name, boolean ignoreIfExists, Map<String, String> properties)
throws DatabaseAlreadyExistException {
Expand Down Expand Up @@ -249,20 +253,20 @@ public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
throws TableNotExistException {
checkNotBranch(identifier, "dropTable");
checkNotSystemTable(identifier, "dropTable");

Table table;
try {
getTable(identifier);
table = getTable(identifier);
} catch (TableNotExistException e) {
if (ignoreIfNotExists) {
return;
}
throw new TableNotExistException(identifier);
}

dropTableImpl(identifier);
dropTableImpl(identifier, table.options());
}

protected abstract void dropTableImpl(Identifier identifier);
protected abstract void dropTableImpl(Identifier identifier, Map<String, String> options);

@Override
public void createTable(Identifier identifier, Schema schema, boolean ignoreIfExists)
Expand All @@ -272,6 +276,7 @@ public void createTable(Identifier identifier, Schema schema, boolean ignoreIfEx
validateIdentifierNameCaseInsensitive(identifier);
validateFieldNameCaseInsensitive(schema.rowType().getFieldNames());
validateAutoCreateClose(schema.options());
validateExternalTableSupport(schema.options());

// check db exists
getDatabase(identifier.getDatabaseName());
Expand Down Expand Up @@ -590,6 +595,16 @@ private void validateAutoCreateClose(Map<String, String> options) {
CoreOptions.AUTO_CREATE.key(), Boolean.FALSE));
}

/**
 * Rejects creation of an external table (options declare {@code external=true} together with an
 * explicit path) when this catalog does not support external tables.
 *
 * @param options table options supplied at creation time
 * @throws UnsupportedOperationException if an external table with a path is requested but
 *     {@link #externalTableEnabled()} is {@code false}
 */
private void validateExternalTableSupport(Map<String, String> options) {
    boolean wantsExternal = "true".equals(options.getOrDefault(Catalog.EXTERNAL_PROP, "false"));
    boolean hasExplicitPath = options.containsKey(CoreOptions.PATH.key());
    if (wantsExternal && hasExplicitPath && !externalTableEnabled()) {
        throw new UnsupportedOperationException(
                String.format(
                        "The current catalog %s does not support external tables, so specifying the path is not allowed when creating a table.",
                        this.getClass().getSimpleName()));
    }
}

// =============================== Meta in File System =====================================

protected List<String> listDatabasesInFileSystem(Path warehouse) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public interface Catalog extends AutoCloseable {

String COMMENT_PROP = "comment";
String OWNER_PROP = "owner";
String EXTERNAL_PROP = "external";
String DB_LOCATION_PROP = "location";
String NUM_ROWS_PROP = "numRows";
String NUM_FILES_PROP = "numFiles";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExis
}

@Override
protected void dropTableImpl(Identifier identifier) {
protected void dropTableImpl(Identifier identifier, Map<String, String> options) {
Path path = getTableLocation(identifier);
uncheck(() -> fileIO.delete(path, true));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ protected List<String> listTablesImpl(String databaseName) {
}

@Override
protected void dropTableImpl(Identifier identifier) {
protected void dropTableImpl(Identifier identifier, Map<String, String> options) {
try {
int deletedRecords =
execute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -210,6 +209,12 @@ public Path getTableLocation(Identifier identifier) {
return getTableLocation(identifier, table);
}

/**
 * Resolves the location a new table should be created at: an explicit {@code path} option takes
 * precedence; otherwise the catalog-derived default location is used.
 *
 * @param tableOptions options supplied for the table being created
 * @param identifier the table identifier, used to derive the default location
 * @return the table location
 */
private Path initialTableLocation(Map<String, String> tableOptions, Identifier identifier) {
    if (tableOptions.containsKey(CoreOptions.PATH.key())) {
        return new Path(tableOptions.get(CoreOptions.PATH.key()));
    }
    return getTableLocation(identifier, null);
}

private Path getTableLocation(Identifier identifier, @Nullable Table table) {
try {
String databaseName = identifier.getDatabaseName();
Expand Down Expand Up @@ -634,16 +639,23 @@ public void createFormatTable(Identifier identifier, Schema schema) {
options,
schema.comment());
try {
Path location = getTableLocation(identifier, null);
Table hiveTable = createHiveFormatTable(identifier, newSchema, location);
Map<String, String> tableOptions = schema.options();
Path location = initialTableLocation(tableOptions, identifier);
Table hiveTable =
createHiveFormatTable(
identifier, newSchema, location, usingExternalTable(tableOptions));
clients.execute(client -> client.createTable(hiveTable));
} catch (Exception e) {
// we don't need to delete directories since HMS will roll back db and fs if failed.
throw new RuntimeException("Failed to create table " + identifier.getFullName(), e);
}
}

private boolean usingExternalTable() {
private boolean usingExternalTable(Map<String, String> tableOptions) {
if (tableOptions.containsKey(Catalog.EXTERNAL_PROP)) {
return tableOptions.get(Catalog.EXTERNAL_PROP).equals("true");
}

CatalogTableType tableType =
OptionsUtils.convertToEnum(
hiveConf.get(TABLE_TYPE.key(), CatalogTableType.MANAGED.toString()),
Expand All @@ -652,20 +664,21 @@ private boolean usingExternalTable() {
}

@Override
protected void dropTableImpl(Identifier identifier) {
protected void dropTableImpl(Identifier identifier, Map<String, String> options) {
try {
boolean externalTable = usingExternalTable(options);
clients.execute(
client ->
client.dropTable(
identifier.getDatabaseName(),
identifier.getTableName(),
true,
!externalTable,
false,
true));

// When drop a Hive external table, only the hive metadata is deleted and the data files
// are not deleted.
if (usingExternalTable()) {
if (externalTable) {
return;
}

Expand All @@ -691,13 +704,12 @@ protected void dropTableImpl(Identifier identifier) {

@Override
protected void createTableImpl(Identifier identifier, Schema schema) {
// first commit changes to underlying files
// if changes on Hive fails there is no harm to perform the same changes to files again
Path location = getTableLocation(identifier, null);
Map<String, String> tableOptions = schema.options();
Path location = initialTableLocation(tableOptions, identifier);
TableSchema tableSchema;
boolean externalTable = usingExternalTable(tableOptions);
try {
tableSchema =
schemaManager(identifier, location).createTable(schema, usingExternalTable());
tableSchema = schemaManager(identifier, location).createTable(schema, externalTable);
} catch (Exception e) {
throw new RuntimeException(
"Failed to commit changes of table "
Expand All @@ -709,7 +721,9 @@ protected void createTableImpl(Identifier identifier, Schema schema) {
try {
clients.execute(
client ->
client.createTable(createHiveTable(identifier, tableSchema, location)));
client.createTable(
createHiveTable(
identifier, tableSchema, location, externalTable)));
} catch (Exception e) {
try {
fileIO.deleteDirectoryQuietly(location);
Expand All @@ -720,7 +734,8 @@ protected void createTableImpl(Identifier identifier, Schema schema) {
}
}

private Table createHiveTable(Identifier identifier, TableSchema tableSchema, Path location) {
private Table createHiveTable(
Identifier identifier, TableSchema tableSchema, Path location, boolean externalTable) {
checkArgument(Options.fromMap(tableSchema.options()).get(TYPE) != FORMAT_TABLE);

Map<String, String> tblProperties;
Expand All @@ -740,13 +755,14 @@ private Table createHiveTable(Identifier identifier, TableSchema tableSchema, Pa
}
}

Table table = newHmsTable(identifier, tblProperties, PAIMON_TABLE_TYPE_VALUE);
Table table =
newHmsTable(identifier, tblProperties, PAIMON_TABLE_TYPE_VALUE, externalTable);
updateHmsTable(table, identifier, tableSchema, PAIMON_TABLE_TYPE_VALUE, location);
return table;
}

private Table createHiveFormatTable(
Identifier identifier, TableSchema tableSchema, Path location) {
Identifier identifier, TableSchema tableSchema, Path location, boolean externalTable) {
Options options = Options.fromMap(tableSchema.options());
checkArgument(options.get(TYPE) == FORMAT_TABLE);

Expand All @@ -757,7 +773,7 @@ private Table createHiveFormatTable(

Map<String, String> tblProperties = new HashMap<>();

Table table = newHmsTable(identifier, tblProperties, provider);
Table table = newHmsTable(identifier, tblProperties, provider, externalTable);
updateHmsTable(table, identifier, tableSchema, provider, location);

if (FormatTable.Format.CSV.toString().equalsIgnoreCase(provider)) {
Expand Down Expand Up @@ -865,6 +881,11 @@ public boolean allowUpperCase() {
return catalogOptions.getOptional(ALLOW_UPPER_CASE).orElse(false);
}

/**
 * This catalog supports external tables (tables created with a user-specified path), so the
 * base-class validation allows {@code external=true} plus an explicit path here.
 */
@Override
protected boolean externalTableEnabled() {
return true;
}

/** Returns whether the {@code SYNC_ALL_PROPERTIES} catalog option is enabled. */
public boolean syncAllProperties() {
return catalogOptions.get(SYNC_ALL_PROPERTIES);
}
Expand Down Expand Up @@ -921,7 +942,12 @@ public void repairTable(Identifier identifier) throws TableNotExistException {
TableSchema tableSchema =
tableSchemaInFileSystem(location, identifier.getBranchNameOrDefault())
.orElseThrow(() -> new TableNotExistException(identifier));
Table newTable = createHiveTable(identifier, tableSchema, location);
Table newTable =
createHiveTable(
identifier,
tableSchema,
location,
usingExternalTable(tableSchema.options()));
try {
try {
Table table = getHmsTable(identifier);
Expand Down Expand Up @@ -1013,12 +1039,11 @@ public static boolean isView(Table table) {
}

private Table newHmsTable(
Identifier identifier, Map<String, String> tableParameters, String provider) {
Identifier identifier,
Map<String, String> tableParameters,
String provider,
boolean externalTable) {
long currentTimeMillis = System.currentTimeMillis();
CatalogTableType tableType =
OptionsUtils.convertToEnum(
hiveConf.get(TABLE_TYPE.key(), CatalogTableType.MANAGED.toString()),
CatalogTableType.class);
if (provider == null) {
provider = PAIMON_TABLE_TYPE_VALUE;
}
Expand All @@ -1036,7 +1061,7 @@ private Table newHmsTable(
tableParameters,
null,
null,
tableType.toString().toUpperCase(Locale.ROOT) + "_TABLE");
externalTable ? "EXTERNAL_TABLE" : "MANAGED_TABLE");
table.getParameters().put(TABLE_TYPE_PROP, provider.toUpperCase());
if (PAIMON_TABLE_TYPE_VALUE.equalsIgnoreCase(provider)) {
table.getParameters()
Expand All @@ -1045,7 +1070,7 @@ private Table newHmsTable(
table.getParameters().put(FILE_FORMAT.key(), provider.toLowerCase());
table.getParameters().put(TYPE.key(), FORMAT_TABLE.toString());
}
if (CatalogTableType.EXTERNAL.equals(tableType)) {
if (externalTable) {
table.getParameters().put("EXTERNAL", "TRUE");
}
return table;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

package org.apache.paimon.hive;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogTestBase;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.client.ClientPool;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,8 +414,9 @@ private Schema toInitialSchema(
if (normalizedProperties.containsKey(TableCatalog.PROP_LOCATION)) {
String path = normalizedProperties.remove(TableCatalog.PROP_LOCATION);
normalizedProperties.put(CoreOptions.PATH.key(), path);
// For v2 table, as long as it has specified the location, treat it as external
normalizedProperties.put(Catalog.EXTERNAL_PROP, "true");
}

String pkAsString = properties.get(PRIMARY_KEY_IDENTIFIER);
List<String> primaryKeys =
pkAsString == null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ case class SparkTable(table: Table)
if (table.comment.isPresent) {
properties.put(TableCatalog.PROP_COMMENT, table.comment.get)
}
if (properties.containsKey(CoreOptions.PATH.key())) {
properties.put(TableCatalog.PROP_LOCATION, properties.get(CoreOptions.PATH.key()))
}
properties
case _ => Collections.emptyMap()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,7 @@ public void testCreateTableAs() {
spark.sql("INSERT INTO partitionedTable VALUES(1,'aaa','bbb')");
spark.sql(
"CREATE TABLE partitionedTableAs PARTITIONED BY (a) AS SELECT * FROM partitionedTable");

String tablePath = new Path(warehousePath, "default.db/partitionedTableAs").toString();
Path tablePath = new Path(warehousePath, "default.db/partitionedTableAs");
assertThat(spark.sql("SHOW CREATE TABLE partitionedTableAs").collectAsList().toString())
.isEqualTo(
String.format(
Expand Down Expand Up @@ -221,8 +220,7 @@ public void testCreateTableAs() {
spark.sql("INSERT INTO testTable VALUES(1,'a','b')");
spark.sql(
"CREATE TABLE testTableAs TBLPROPERTIES ('file.format' = 'parquet') AS SELECT * FROM testTable");

String testTableAsPath = new Path(warehousePath, "default.db/testTableAs").toString();
tablePath = new Path(warehousePath, "default.db/testTableAs");
assertThat(spark.sql("SHOW CREATE TABLE testTableAs").collectAsList().toString())
.isEqualTo(
String.format(
Expand All @@ -234,8 +232,8 @@ public void testCreateTableAs() {
+ "]]",
showCreateString(
"testTableAs", "a BIGINT", "b VARCHAR(10)", "c CHAR(10)"),
testTableAsPath,
testTableAsPath));
tablePath,
tablePath));
List<Row> resultProp = spark.sql("SELECT * FROM testTableAs").collectAsList();

assertThat(resultProp.stream().map(Row::toString))
Expand All @@ -253,8 +251,7 @@ public void testCreateTableAs() {
+ "COMMENT 'table comment'");
spark.sql("INSERT INTO t_pk VALUES(1,'aaa','bbb')");
spark.sql("CREATE TABLE t_pk_as TBLPROPERTIES ('primary-key' = 'a') AS SELECT * FROM t_pk");

String tPkAsPath = new Path(warehousePath, "default.db/t_pk_as").toString();
tablePath = new Path(warehousePath, "default.db/t_pk_as");
assertThat(spark.sql("SHOW CREATE TABLE t_pk_as").collectAsList().toString())
.isEqualTo(
String.format(
Expand All @@ -263,8 +260,8 @@ public void testCreateTableAs() {
+ "TBLPROPERTIES (\n 'path' = '%s',\n 'primary-key' = 'a')\n]]",
showCreateString(
"t_pk_as", "a BIGINT NOT NULL", "b STRING", "c STRING"),
tPkAsPath,
tPkAsPath));
tablePath,
tablePath));
List<Row> resultPk = spark.sql("SELECT * FROM t_pk_as").collectAsList();

assertThat(resultPk.stream().map(Row::toString)).containsExactlyInAnyOrder("[1,aaa,bbb]");
Expand All @@ -283,8 +280,7 @@ public void testCreateTableAs() {
spark.sql("INSERT INTO t_all VALUES(1,2,'bbb','2020-01-01','12')");
spark.sql(
"CREATE TABLE t_all_as PARTITIONED BY (dt) TBLPROPERTIES ('primary-key' = 'dt,hh') AS SELECT * FROM t_all");

String tAllAsPath = new Path(warehousePath, "default.db/t_all_as").toString();
tablePath = new Path(warehousePath, "default.db/t_all_as");
assertThat(spark.sql("SHOW CREATE TABLE t_all_as").collectAsList().toString())
.isEqualTo(
String.format(
Expand All @@ -302,8 +298,8 @@ public void testCreateTableAs() {
"behavior STRING",
"dt STRING NOT NULL",
"hh STRING NOT NULL"),
tAllAsPath,
tAllAsPath));
tablePath,
tablePath));
List<Row> resultAll = spark.sql("SELECT * FROM t_all_as").collectAsList();
assertThat(resultAll.stream().map(Row::toString))
.containsExactlyInAnyOrder("[1,2,bbb,2020-01-01,12]");
Expand Down Expand Up @@ -380,8 +376,7 @@ public void testShowCreateTable() {
+ " 'k1' = 'v1'\n"
+ ")");

String tablePath = new Path(warehousePath, "default.db/tbl").toString();

Path tablePath = new Path(warehousePath, "default.db/tbl");
assertThat(spark.sql("SHOW CREATE TABLE tbl").collectAsList().toString())
.isEqualTo(
String.format(
Expand Down
Loading

0 comments on commit f37017e

Please sign in to comment.