Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[hive][spark] Support create table with specified location using hive catalog #3843

Merged
merged 7 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ public boolean allowUpperCase() {
return catalogOptions.getOptional(ALLOW_UPPER_CASE).orElse(true);
}

/**
 * Whether this catalog allows a user-specified table location (the {@code path} table option)
 * when creating a table. Defaults to {@code false}; subclasses that support external table
 * locations (e.g. the Hive catalog) override this to return {@code true}.
 */
protected boolean allowCustomTablePath() {
return false;
}

@Override
public void createDatabase(String name, boolean ignoreIfExists, Map<String, String> properties)
throws DatabaseAlreadyExistException {
Expand Down Expand Up @@ -272,6 +276,7 @@ public void createTable(Identifier identifier, Schema schema, boolean ignoreIfEx
validateIdentifierNameCaseInsensitive(identifier);
validateFieldNameCaseInsensitive(schema.rowType().getFieldNames());
validateAutoCreateClose(schema.options());
validateCustomTablePath(schema.options());

// check db exists
getDatabase(identifier.getDatabaseName());
Expand Down Expand Up @@ -590,6 +595,15 @@ private void validateAutoCreateClose(Map<String, String> options) {
CoreOptions.AUTO_CREATE.key(), Boolean.FALSE));
}

/**
 * Rejects a create-table request that carries an explicit {@code path} option when the
 * concrete catalog has not opted into custom table locations.
 *
 * @param options the table options supplied by the caller
 * @throws UnsupportedOperationException if a custom path is given but not supported
 */
private void validateCustomTablePath(Map<String, String> options) {
    if (allowCustomTablePath()) {
        // Catalog explicitly supports custom locations; nothing to check.
        return;
    }
    if (options.containsKey(CoreOptions.PATH.key())) {
        throw new UnsupportedOperationException(
                String.format(
                        "The current catalog %s does not support specifying the table path when creating a table.",
                        this.getClass().getSimpleName()));
    }
}

// =============================== Meta in File System =====================================

protected List<String> listDatabasesInFileSystem(Path warehouse) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,13 +341,4 @@ public Schema build() {
return new Schema(columns, partitionKeys, primaryKeys, options, comment);
}
}

/**
 * Builds a {@link Schema} that carries over every component (fields, partition keys,
 * primary keys, options, comment) of the given table schema unchanged.
 *
 * @param tableSchema the persisted table schema to convert
 * @return an equivalent {@code Schema}
 */
public static Schema fromTableSchema(TableSchema tableSchema) {
    Schema schema =
            new Schema(
                    tableSchema.fields(),
                    tableSchema.partitionKeys(),
                    tableSchema.primaryKeys(),
                    tableSchema.options(),
                    tableSchema.comment());
    return schema;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,21 @@

package org.apache.paimon.flink.clone;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;

import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
import org.apache.paimon.shade.guava30.com.google.common.collect.Iterables;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
Expand All @@ -37,6 +43,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
* Pick the files to be cloned of a table based on the input record. The record type it produce is
Expand Down Expand Up @@ -77,7 +84,7 @@ public void processElement(StreamRecord<Tuple2<String, String>> streamRecord) th
FileStoreTable sourceTable = (FileStoreTable) sourceCatalog.getTable(sourceIdentifier);
targetCatalog.createDatabase(targetIdentifier.getDatabaseName(), true);
targetCatalog.createTable(
targetIdentifier, Schema.fromTableSchema(sourceTable.schema()), true);
targetIdentifier, newSchemaFromTableSchema(sourceTable.schema()), true);

List<CloneFileInfo> result =
toCloneFileInfos(
Expand All @@ -95,6 +102,18 @@ public void processElement(StreamRecord<Tuple2<String, String>> streamRecord) th
}
}

private static Schema newSchemaFromTableSchema(TableSchema tableSchema) {
return new Schema(
ImmutableList.copyOf(tableSchema.fields()),
ImmutableList.copyOf(tableSchema.partitionKeys()),
ImmutableList.copyOf(tableSchema.primaryKeys()),
ImmutableMap.copyOf(
Iterables.filter(
tableSchema.options().entrySet(),
entry -> !Objects.equals(entry.getKey(), CoreOptions.PATH.key()))),
tableSchema.comment());
Zouxxyy marked this conversation as resolved.
Show resolved Hide resolved
}

private List<CloneFileInfo> toCloneFileInfos(
List<Path> files,
Path sourceTableRoot,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.view.View;
import org.apache.paimon.view.ViewImpl;
Expand Down Expand Up @@ -83,7 +84,6 @@
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -210,6 +210,20 @@ public Path getTableLocation(Identifier identifier) {
return getTableLocation(identifier, table);
}

/**
 * Resolves the initial location for a table being created, and whether it is external.
 *
 * <p>If the table options carry an explicit {@code path}, that path is used and the table is
 * treated as external; otherwise the catalog-derived location is used and the external flag
 * follows the catalog-level setting.
 *
 * @param tableOptions options supplied with the create-table request
 * @param identifier the table identifier, used to derive the default location
 * @return pair of (table location, is-external-table)
 */
private Pair<Path, Boolean> initialTableLocation(
        Map<String, String> tableOptions, Identifier identifier) {
    if (tableOptions.containsKey(CoreOptions.PATH.key())) {
        // A user-specified path always makes the table external.
        return Pair.of(new Path(tableOptions.get(CoreOptions.PATH.key())), true);
    }
    return Pair.of(getTableLocation(identifier, null), usingExternalTable());
}

private Path getTableLocation(Identifier identifier, @Nullable Table table) {
try {
String databaseName = identifier.getDatabaseName();
Expand Down Expand Up @@ -634,8 +648,10 @@ public void createFormatTable(Identifier identifier, Schema schema) {
options,
schema.comment());
try {
Path location = getTableLocation(identifier, null);
Table hiveTable = createHiveFormatTable(identifier, newSchema, location);
Pair<Path, Boolean> pair = initialTableLocation(schema.options(), identifier);
Path location = pair.getLeft();
boolean externalTable = pair.getRight();
Table hiveTable = createHiveFormatTable(identifier, newSchema, location, externalTable);
clients.execute(client -> client.createTable(hiveTable));
} catch (Exception e) {
// we don't need to delete directories since HMS will roll back db and fs if failed.
Expand All @@ -654,18 +670,19 @@ private boolean usingExternalTable() {
@Override
protected void dropTableImpl(Identifier identifier) {
try {
boolean externalTable = isExternalTable(getHmsTable(identifier));
clients.execute(
client ->
client.dropTable(
identifier.getDatabaseName(),
identifier.getTableName(),
true,
!externalTable,
false,
true));

// When drop a Hive external table, only the hive metadata is deleted and the data files
// are not deleted.
if (usingExternalTable()) {
if (externalTable) {
return;
}

Expand All @@ -680,7 +697,7 @@ protected void dropTableImpl(Identifier identifier) {
} catch (Exception ee) {
LOG.error("Delete directory[{}] fail for table {}", path, identifier, ee);
}
} catch (TException e) {
} catch (TException | TableNotExistException e) {
throw new RuntimeException("Failed to drop table " + identifier.getFullName(), e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand All @@ -691,13 +708,12 @@ protected void dropTableImpl(Identifier identifier) {

@Override
protected void createTableImpl(Identifier identifier, Schema schema) {
// first commit changes to underlying files
// if changes on Hive fails there is no harm to perform the same changes to files again
Path location = getTableLocation(identifier, null);
Pair<Path, Boolean> pair = initialTableLocation(schema.options(), identifier);
Path location = pair.getLeft();
boolean externalTable = pair.getRight();
TableSchema tableSchema;
try {
tableSchema =
schemaManager(identifier, location).createTable(schema, usingExternalTable());
tableSchema = schemaManager(identifier, location).createTable(schema, externalTable);
} catch (Exception e) {
throw new RuntimeException(
"Failed to commit changes of table "
Expand All @@ -709,7 +725,9 @@ protected void createTableImpl(Identifier identifier, Schema schema) {
try {
clients.execute(
client ->
client.createTable(createHiveTable(identifier, tableSchema, location)));
client.createTable(
createHiveTable(
identifier, tableSchema, location, externalTable)));
} catch (Exception e) {
try {
fileIO.deleteDirectoryQuietly(location);
Expand All @@ -720,7 +738,8 @@ protected void createTableImpl(Identifier identifier, Schema schema) {
}
}

private Table createHiveTable(Identifier identifier, TableSchema tableSchema, Path location) {
private Table createHiveTable(
Identifier identifier, TableSchema tableSchema, Path location, boolean externalTable) {
checkArgument(Options.fromMap(tableSchema.options()).get(TYPE) != FORMAT_TABLE);

Map<String, String> tblProperties;
Expand All @@ -740,13 +759,14 @@ private Table createHiveTable(Identifier identifier, TableSchema tableSchema, Pa
}
}

Table table = newHmsTable(identifier, tblProperties, PAIMON_TABLE_TYPE_VALUE);
Table table =
newHmsTable(identifier, tblProperties, PAIMON_TABLE_TYPE_VALUE, externalTable);
updateHmsTable(table, identifier, tableSchema, PAIMON_TABLE_TYPE_VALUE, location);
return table;
}

private Table createHiveFormatTable(
Identifier identifier, TableSchema tableSchema, Path location) {
Identifier identifier, TableSchema tableSchema, Path location, boolean externalTable) {
Options options = Options.fromMap(tableSchema.options());
checkArgument(options.get(TYPE) == FORMAT_TABLE);

Expand All @@ -757,7 +777,7 @@ private Table createHiveFormatTable(

Map<String, String> tblProperties = new HashMap<>();

Table table = newHmsTable(identifier, tblProperties, provider);
Table table = newHmsTable(identifier, tblProperties, provider, externalTable);
updateHmsTable(table, identifier, tableSchema, provider, location);

if (FormatTable.Format.CSV.toString().equalsIgnoreCase(provider)) {
Expand Down Expand Up @@ -865,6 +885,11 @@ public boolean allowUpperCase() {
return catalogOptions.getOptional(ALLOW_UPPER_CASE).orElse(false);
}

@Override
protected boolean allowCustomTablePath() {
// Hive catalog supports creating a table at a user-specified location (external table).
return true;
}

/** Returns the value of the {@code SYNC_ALL_PROPERTIES} catalog option. */
public boolean syncAllProperties() {
return catalogOptions.get(SYNC_ALL_PROPERTIES);
}
Expand Down Expand Up @@ -921,10 +946,13 @@ public void repairTable(Identifier identifier) throws TableNotExistException {
TableSchema tableSchema =
tableSchemaInFileSystem(location, identifier.getBranchNameOrDefault())
.orElseThrow(() -> new TableNotExistException(identifier));
Table newTable = createHiveTable(identifier, tableSchema, location);

try {
Table newTable = null;
try {
Table table = getHmsTable(identifier);
newTable =
createHiveTable(identifier, tableSchema, location, isExternalTable(table));
checkArgument(
isPaimonTable(table),
"Table %s is not a paimon table in hive metastore.",
Expand All @@ -935,7 +963,13 @@ public void repairTable(Identifier identifier) throws TableNotExistException {
}
} catch (TableNotExistException e) {
// hive table does not exist.
clients.execute(client -> client.createTable(newTable));
if (newTable == null) {
newTable =
createHiveTable(
identifier, tableSchema, location, usingExternalTable());
}
Table finalNewTable = newTable;
clients.execute(client -> client.createTable(finalNewTable));
}

// repair partitions
Expand Down Expand Up @@ -1012,13 +1046,16 @@ public static boolean isView(Table table) {
return table != null && TableType.VIRTUAL_VIEW.name().equals(table.getTableType());
}

/**
 * Returns true when the given HMS table is an EXTERNAL table; a null table is never
 * considered external.
 */
private boolean isExternalTable(Table table) {
    if (table == null) {
        return false;
    }
    return TableType.EXTERNAL_TABLE.name().equals(table.getTableType());
}

private Table newHmsTable(
Identifier identifier, Map<String, String> tableParameters, String provider) {
Identifier identifier,
Map<String, String> tableParameters,
String provider,
boolean externalTable) {
long currentTimeMillis = System.currentTimeMillis();
CatalogTableType tableType =
OptionsUtils.convertToEnum(
hiveConf.get(TABLE_TYPE.key(), CatalogTableType.MANAGED.toString()),
CatalogTableType.class);
if (provider == null) {
provider = PAIMON_TABLE_TYPE_VALUE;
}
Expand All @@ -1036,7 +1073,9 @@ private Table newHmsTable(
tableParameters,
null,
null,
tableType.toString().toUpperCase(Locale.ROOT) + "_TABLE");
externalTable
? TableType.EXTERNAL_TABLE.name()
: TableType.MANAGED_TABLE.name());
table.getParameters().put(TABLE_TYPE_PROP, provider.toUpperCase());
if (PAIMON_TABLE_TYPE_VALUE.equalsIgnoreCase(provider)) {
table.getParameters()
Expand All @@ -1045,7 +1084,7 @@ private Table newHmsTable(
table.getParameters().put(FILE_FORMAT.key(), provider.toLowerCase());
table.getParameters().put(TYPE.key(), FORMAT_TABLE.toString());
}
if (CatalogTableType.EXTERNAL.equals(tableType)) {
if (externalTable) {
table.getParameters().put("EXTERNAL", "TRUE");
}
return table;
Expand Down
Loading
Loading