Support creating tables with a user-defined location.
zhongyujiang committed Aug 4, 2024
1 parent f216ceb commit 7c5491b
Showing 5 changed files with 79 additions and 6 deletions.
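
For orientation, a minimal sketch of what the change enables, closely mirroring the new test below (testCreateExternalTableWithLocation). Catalog construction is elided; it assumes a HiveCatalog configured with table.type = external, and the variable names are illustrative:

// Declare the table location explicitly through the "path" option (CoreOptions.PATH).
Schema schema =
        new Schema(
                Lists.newArrayList(new DataField(0, "foo", DataTypes.INT())),
                Collections.emptyList(), // partition keys
                Collections.emptyList(), // primary keys
                ImmutableMap.of("path", "/tmp/my_external_table"),
                "");

// The table files are created under the user-supplied path rather than
// under the warehouse directory derived from the identifier.
externalCatalog.createTable(Identifier.create("default", "my_table"), schema, true);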
FileSystemCatalog.java
@@ -18,6 +18,7 @@

package org.apache.paimon.catalog;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.operation.Lock;
@@ -36,6 +37,7 @@
import java.util.concurrent.Callable;

import static org.apache.paimon.catalog.FileSystemCatalogOptions.CASE_SENSITIVE;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** A catalog implementation for {@link FileIO}. */
public class FileSystemCatalog extends AbstractCatalog {
@@ -117,6 +119,9 @@ protected void dropTableImpl(Identifier identifier) {

@Override
public void createTableImpl(Identifier identifier, Schema schema) {
checkArgument(
!schema.options().containsKey(CoreOptions.PATH.key()),
"The FileSystemCatalog does not support specifying location when creating a table.");
uncheck(() -> schemaManager(identifier).createTable(schema));
}

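The effect of the new guard can be sketched with an AssertJ-style assertion; this assumes an existing FileSystemCatalog instance and is illustrative rather than part of the change:

Schema schemaWithPath =
        new Schema(
                Lists.newArrayList(new DataField(0, "foo", DataTypes.INT())),
                Collections.emptyList(),
                Collections.emptyList(),
                ImmutableMap.of(CoreOptions.PATH.key(), "/some/explicit/location"),
                "");

// checkArgument fires before any table file is written.
assertThatThrownBy(
                () ->
                        fileSystemCatalog.createTable(
                                Identifier.create("default", "t"), schemaWithPath, false))
        .hasMessageContaining("does not support specifying location");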
JdbcCatalog.java
@@ -18,6 +18,7 @@

package org.apache.paimon.jdbc;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.CatalogLockContext;
@@ -57,6 +58,7 @@
import static org.apache.paimon.jdbc.JdbcUtils.execute;
import static org.apache.paimon.jdbc.JdbcUtils.insertProperties;
import static org.apache.paimon.jdbc.JdbcUtils.updateTable;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/* This file is based on source code from the Iceberg Project (http://iceberg.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
@@ -235,6 +237,9 @@ protected void dropTableImpl(Identifier identifier) {

@Override
protected void createTableImpl(Identifier identifier, Schema schema) {
checkArgument(
!schema.options().containsKey(CoreOptions.PATH.key()),
"The FileSystemCatalog does not support specifying location when creating a table.");
try {
// create table file
getSchemaManager(identifier).createTable(schema);
HiveCatalog.java
@@ -76,6 +76,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
@@ -454,7 +455,18 @@ protected void createTableImpl(Identifier identifier, Schema schema) {
// if changes on Hive fails there is no harm to perform the same changes to files again
TableSchema tableSchema;
try {
tableSchema = schemaManager(identifier).createTable(schema, usingExternalTable());
Path tableRoot;
if (schema.options().containsKey(CoreOptions.PATH.key())) {
checkArgument(
Objects.equals(createTableType(), TableType.EXTERNAL),
"The HiveCatalog only supports specifying location when creating an external table");
tableRoot = new Path(schema.options().get(CoreOptions.PATH.key()));
} else {
tableRoot = getDataTableLocation(identifier);
}

tableSchema =
schemaManager(identifier, tableRoot).createTable(schema, usingExternalTable());
} catch (Exception e) {
throw new RuntimeException(
"Failed to commit changes of table "
@@ -669,10 +681,7 @@ public String warehouse() {

private Table newHmsTable(Identifier identifier, Map<String, String> tableParameters) {
long currentTimeMillis = System.currentTimeMillis();
TableType tableType =
OptionsUtils.convertToEnum(
hiveConf.get(TABLE_TYPE.key(), TableType.MANAGED.toString()),
TableType.class);
TableType tableType = createTableType();
Table table =
new Table(
identifier.getTableName(),
@@ -697,6 +706,11 @@ private Table newHmsTable(Identifier identifier, Map<String, String> tableParameters) {
return table;
}

private TableType createTableType() {
return OptionsUtils.convertToEnum(
hiveConf.get(TABLE_TYPE.key(), TableType.MANAGED.toString()), TableType.class);
}

private void updateHmsTable(Table table, Identifier identifier, TableSchema schema) {
StorageDescriptor sd = table.getSd() != null ? table.getSd() : new StorageDescriptor();

@@ -754,7 +768,11 @@ private void updateHmsTable(Table table, Identifier identifier, TableSchema schema) {
}

// update location
locationHelper.specifyTableLocation(table, getDataTableLocation(identifier).toString());
String location =
schema.options().containsKey(CoreOptions.PATH.key())
? schema.options().get(CoreOptions.PATH.key())
: getDataTableLocation(identifier).toString();
locationHelper.specifyTableLocation(table, location);
}

private void updateHmsTablePars(Table table, TableSchema schema) {
@@ -778,6 +796,10 @@ private FieldSchema convertToFieldSchema(DataField dataField) {
}

private SchemaManager schemaManager(Identifier identifier) {
return schemaManager(identifier, getDataTableLocation(identifier));
}

private SchemaManager schemaManager(Identifier identifier, Path path) {
return new SchemaManager(
fileIO,
path,
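
The external-only restriction in createTableImpl above can be exercised the same way; a hedged sketch, assuming a HiveCatalog left at the default managed table.type and reusing schemaWithPath from the earlier sketch:

try {
    managedHiveCatalog.createTable(Identifier.create("default", "t"), schemaWithPath, false);
} catch (Exception e) {
    // The guard's IllegalArgumentException arrives wrapped in the RuntimeException
    // thrown by createTableImpl's catch block, so unwrap to the root cause.
    Throwable root = e;
    while (root.getCause() != null) {
        root = root.getCause();
    }
    // -> The HiveCatalog only supports specifying location when creating an external table.
    System.out.println(root.getMessage());
}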
HiveCatalogTest.java
@@ -18,9 +18,11 @@

package org.apache.paimon.hive;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.CatalogTestBase;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.client.ClientPool;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
@@ -29,6 +31,7 @@
import org.apache.paimon.utils.CommonTestUtils;
import org.apache.paimon.utils.HadoopUtils;

import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;

import org.apache.hadoop.hive.conf.HiveConf;
@@ -37,6 +40,7 @@
import org.apache.thrift.TException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.lang.reflect.Field;
import java.util.Arrays;
@@ -268,4 +272,36 @@ public void testAlterHiveTableParameters() {
fail("Test failed due to exception: " + e.getMessage());
}
}

@Test
public void testCreateExternalTableWithLocation(@TempDir java.nio.file.Path tempDir)
throws Exception {
HiveConf hiveConf = new HiveConf();
String jdoConnectionURL = "jdbc:derby:memory:" + UUID.randomUUID();
hiveConf.setVar(METASTORECONNECTURLKEY, jdoConnectionURL + ";create=true");
hiveConf.set(CatalogOptions.TABLE_TYPE.key(), "external");
String metastoreClientClass = "org.apache.hadoop.hive.metastore.HiveMetaStoreClient";
HiveCatalog externalWarehouseCatalog =
new HiveCatalog(fileIO, hiveConf, metastoreClientClass, warehouse);

String externalTablePath = tempDir.toString();

Schema schema =
new Schema(
Lists.newArrayList(new DataField(0, "foo", DataTypes.INT())),
Collections.emptyList(),
Collections.emptyList(),
ImmutableMap.of("path", externalTablePath),
"");

Identifier identifier = Identifier.create("default", "my_table");
externalWarehouseCatalog.createTable(identifier, schema, true);

org.apache.paimon.table.Table table = externalWarehouseCatalog.getTable(identifier);
assertThat(table.options())
.extracting(CoreOptions.PATH.key())
.isEqualTo("file:" + externalTablePath);

externalWarehouseCatalog.close();
}
}
SparkCatalog.java
@@ -392,6 +392,11 @@ private Schema toInitialSchema(
Map<String, String> normalizedProperties = mergeSQLConf(properties);
normalizedProperties.remove(PRIMARY_KEY_IDENTIFIER);
normalizedProperties.remove(TableCatalog.PROP_COMMENT);
if (normalizedProperties.containsKey(TableCatalog.PROP_LOCATION)) {
String path = normalizedProperties.remove(TableCatalog.PROP_LOCATION);
normalizedProperties.put(CoreOptions.PATH.key(), path);
}

String pkAsString = properties.get(PRIMARY_KEY_IDENTIFIER);
List<String> primaryKeys =
pkAsString == null
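A self-contained sketch of the property normalization above, using illustrative string constants in place of TableCatalog.PROP_LOCATION and CoreOptions.PATH.key() (assumed to be "location" and "path" respectively):

import java.util.HashMap;
import java.util.Map;

public class LocationOptionSketch {
    private static final String PROP_LOCATION = "location"; // stand-in for TableCatalog.PROP_LOCATION
    private static final String PATH_KEY = "path";          // stand-in for CoreOptions.PATH.key()

    public static void main(String[] args) {
        // What the Spark side hands over for: CREATE TABLE t ... LOCATION '/tmp/my_table'
        Map<String, String> normalizedProperties = new HashMap<>();
        normalizedProperties.put(PROP_LOCATION, "/tmp/my_table");

        // The same move toInitialSchema performs: LOCATION becomes Paimon's path option.
        if (normalizedProperties.containsKey(PROP_LOCATION)) {
            normalizedProperties.put(PATH_KEY, normalizedProperties.remove(PROP_LOCATION));
        }

        System.out.println(normalizedProperties); // {path=/tmp/my_table}
    }
}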
