[hive] Enable Format Table by default (#4461)
JingsongLi authored Nov 7, 2024
1 parent da3e795 commit 4cdaf66
Showing 20 changed files with 179 additions and 122 deletions.
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -74,6 +74,12 @@
<td>Integer</td>
<td>Configure the size of the connection pool.</td>
</tr>
<tr>
<td><h5>format-table.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to support format tables. A format table corresponds to a regular CSV, Parquet or ORC table, allowing read and write operations. However, these operations do not connect to the metastore; hence, newly added partitions will not be reflected in the metastore and must be added manually as separate partition operations.</td>
</tr>
<tr>
<td><h5>lineage-meta</h5></td>
<td style="word-wrap: break-word;">(none)</td>
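With the default now true, a Hive catalog resolves format tables unless explicitly opted out. Below is a minimal sketch of restoring the old behavior when building a catalog programmatically; the metastore URI and warehouse path are hypothetical, and Options.fromMap / CatalogFactory.createCatalog are the same entry points used elsewhere in this commit:

import java.util.HashMap;
import java.util.Map;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.options.Options;

public class DisableFormatTables {
    public static void main(String[] args) throws Exception {
        Map<String, String> conf = new HashMap<>();
        conf.put("metastore", "hive");
        conf.put("uri", "thrift://hms-host:9083"); // hypothetical HMS endpoint
        conf.put("warehouse", "hdfs:///paimon/warehouse"); // hypothetical path
        // This commit flips the default to 'true'; opt out explicitly if needed.
        conf.put("format-table.enabled", "false");
        try (Catalog catalog =
                CatalogFactory.createCatalog(CatalogContext.create(Options.fromMap(conf)))) {
            // plain Hive csv/parquet/orc tables no longer surface as FormatTable
        }
    }
}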
@@ -39,12 +39,6 @@
<td>String</td>
<td>Specify client cache key, multiple elements separated by commas.<br /><ul><li>"ugi": the Hadoop UserGroupInformation instance that represents the current user using the cache.</li></ul><ul><li>"user_name" similar to UGI but only includes the user's name determined by UserGroupInformation#getUserName.</li></ul><ul><li>"conf": name of an arbitrary configuration. The value of the configuration will be extracted from catalog properties and added to the cache key. A conf element should start with a "conf:" prefix which is followed by the configuration name. E.g. specifying "conf:a.b.c" will add "a.b.c" to the key, and so that configurations with different default catalog wouldn't share the same client pool. Multiple conf elements can be specified.</li></ul></td>
</tr>
<tr>
<td><h5>format-table.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to support format tables, format table corresponds to a regular Hive table, allowing read and write operations. However, during these processes, it does not connect to the metastore; hence, newly added partitions will not be reflected in the metastore and need to be manually added as separate partition operations.</td>
</tr>
<tr>
<td><h5>hadoop-conf-dir</h5></td>
<td style="word-wrap: break-word;">(none)</td>
@@ -156,4 +156,13 @@ public class CatalogOptions {
.booleanType()
.defaultValue(false)
.withDescription("Sync all table properties to hive metastore");

public static final ConfigOption<Boolean> FORMAT_TABLE_ENABLED =
ConfigOptions.key("format-table.enabled")
.booleanType()
.defaultValue(true)
.withDescription(
"Whether to support format tables, format table corresponds to a regular csv, parquet or orc table, allowing read and write operations. "
+ "However, during these processes, it does not connect to the metastore; hence, newly added partitions will not be reflected in"
+ " the metastore and need to be manually added as separate partition operations.");
}
@@ -25,8 +25,9 @@
public class FormatTableOptions {

public static final ConfigOption<String> FIELD_DELIMITER =
ConfigOptions.key("csv.field-delimiter")
ConfigOptions.key("field-delimiter")
.stringType()
.defaultValue(",")
.withDescription("Optional field delimiter character (',' by default)");
.withDescription(
"Optional field delimiter character for CSV (',' by default).");
}
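Note the key rename above: the delimiter is now a format-table-level option ("field-delimiter") instead of the CSV-prefixed "csv.field-delimiter". A hedged sketch of setting it when declaring a CSV format table, using the same Schema builder that the test further down employs (column names are illustrative):

Schema schema =
        Schema.newBuilder()
                .column("name", DataTypes.STRING())
                .column("age", DataTypes.INT())
                .option("type", "format-table")
                .option("file.format", "csv")
                .option("field-delimiter", ";") // renamed from "csv.field-delimiter"
                .build();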
@@ -25,6 +25,7 @@
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.FormatTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
@@ -885,4 +886,50 @@ public void testView() throws Exception {
assertThatThrownBy(() -> catalog.dropView(newIdentifier, false))
.isInstanceOf(Catalog.ViewNotExistException.class);
}

protected boolean supportsFormatTable() {
return false;
}

@Test
public void testFormatTable() throws Exception {
if (!supportsFormatTable()) {
return;
}

Identifier identifier = new Identifier("format_db", "my_format");
catalog.createDatabase(identifier.getDatabaseName(), false);

// create table
Schema schema =
Schema.newBuilder()
.column("str", DataTypes.STRING())
.column("int", DataTypes.INT())
.option("type", "format-table")
.option("file.format", "csv")
.build();
catalog.createTable(identifier, schema, false);
assertThat(catalog.listTables(identifier.getDatabaseName()))
.contains(identifier.getTableName());
assertThat(catalog.getTable(identifier)).isInstanceOf(FormatTable.class);

// alter table
SchemaChange schemaChange = SchemaChange.addColumn("new_col", DataTypes.STRING());
assertThatThrownBy(() -> catalog.alterTable(identifier, schemaChange, false))
.isInstanceOf(UnsupportedOperationException.class)
.hasMessage("Only data tables support alter table.");

// drop table
catalog.dropTable(identifier, false);
assertThatThrownBy(() -> catalog.getTable(identifier))
.isInstanceOf(Catalog.TableNotExistException.class);

// rename table
catalog.createTable(identifier, schema, false);
Identifier newIdentifier = new Identifier("format_db", "new_format");
catalog.renameTable(identifier, newIdentifier, false);
assertThatThrownBy(() -> catalog.getTable(identifier))
.isInstanceOf(Catalog.TableNotExistException.class);
assertThat(catalog.getTable(newIdentifier)).isInstanceOf(FormatTable.class);
}
}
@@ -768,7 +768,9 @@ public void alterTable(
throw new TableNotExistException(getName(), tablePath);
}

Preconditions.checkArgument(table instanceof FileStoreTable, "Can't alter system table.");
checkArgument(
        table instanceof FileStoreTable,
        "Only data tables can be altered, but got: " + table.getClass());
validateAlterTable(toCatalogTable(table), newTable);
Map<String, Integer> oldTableNonPhysicalColumnIndex =
FlinkCatalogPropertiesUtil.nonPhysicalColumns(
@@ -89,6 +89,7 @@ public static FlinkGenericCatalog createCatalog(
ClassLoader cl, Map<String, String> optionMap, String name, Catalog flinkCatalog) {
Options options = Options.fromMap(optionMap);
options.set(CatalogOptions.METASTORE, "hive");
options.set(CatalogOptions.FORMAT_TABLE_ENABLED, false);
FlinkCatalog paimon =
new FlinkCatalog(
org.apache.paimon.catalog.CatalogFactory.createCatalog(
@@ -20,7 +20,6 @@

import org.apache.paimon.table.FormatTable;

import org.apache.flink.connector.file.table.FileSystemTableFactory;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -30,17 +29,16 @@

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static org.apache.flink.connector.file.table.FileSystemConnectorOptions.PATH;
import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
import static org.apache.paimon.table.FormatTableOptions.FIELD_DELIMITER;

/** A {@link CatalogTable} to represent format table. */
public class FormatCatalogTable implements CatalogTable {
@@ -83,18 +81,17 @@ public CatalogTable copy(Map<String, String> map) {
public Map<String, String> getOptions() {
if (cachedOptions == null) {
cachedOptions = new HashMap<>();
FileSystemTableFactory fileSystemFactory = new FileSystemTableFactory();
Set<String> validOptions = new HashSet<>();
fileSystemFactory.requiredOptions().forEach(o -> validOptions.add(o.key()));
fileSystemFactory.optionalOptions().forEach(o -> validOptions.add(o.key()));
String format = table.format().name().toLowerCase();
table.options()
.forEach(
(k, v) -> {
if (validOptions.contains(k) || k.startsWith(format + ".")) {
cachedOptions.put(k, v);
}
});
Map<String, String> options = table.options();
options.forEach(
(k, v) -> {
if (k.startsWith(format + ".")) {
cachedOptions.put(k, v);
}
});
if (options.containsKey(FIELD_DELIMITER.key())) {
cachedOptions.put("csv.field-delimiter", options.get(FIELD_DELIMITER.key()));
}
cachedOptions.put(CONNECTOR.key(), "filesystem");
cachedOptions.put(PATH.key(), table.location());
cachedOptions.put(FORMAT.key(), format);
@@ -189,7 +189,8 @@ public void testCreateSystemDatabase() {
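The rewritten getOptions() above no longer consults Flink's FileSystemTableFactory for a key whitelist; it forwards only options prefixed with the table's format and maps the catalog-level "field-delimiter" key back to Flink's "csv.field-delimiter". A sketch of the expected output for a hypothetical CSV table (the path is illustrative):

// Given a CSV format table at hdfs:///warehouse/format_db.db/my_format
// with options {"field-delimiter": ";"}, getOptions() would return:
Map<String, String> expected = new HashMap<>();
expected.put("csv.field-delimiter", ";"); // translated from "field-delimiter"
expected.put("connector", "filesystem");
expected.put("path", "hdfs:///warehouse/format_db.db/my_format");
expected.put("format", "csv");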
public void testChangeTableInSystemDatabase() {
sql("USE sys");
assertThatCode(() -> sql("ALTER TABLE all_table_options SET ('bucket-num' = '5')"))
.hasRootCauseMessage("Can't alter system table.");
.rootCause()
.hasMessageContaining("Only data tables can be altered, but got: ");
}

@Test
@@ -44,6 +44,7 @@
import org.apache.paimon.table.CatalogTableType;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.FormatTable;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
@@ -97,20 +98,21 @@
import static org.apache.paimon.TableType.FORMAT_TABLE;
import static org.apache.paimon.hive.HiveCatalogLock.acquireTimeout;
import static org.apache.paimon.hive.HiveCatalogLock.checkMaxSleep;
import static org.apache.paimon.hive.HiveCatalogOptions.FORMAT_TABLE_ENABLED;
import static org.apache.paimon.hive.HiveCatalogOptions.HADOOP_CONF_DIR;
import static org.apache.paimon.hive.HiveCatalogOptions.HIVE_CONF_DIR;
import static org.apache.paimon.hive.HiveCatalogOptions.IDENTIFIER;
import static org.apache.paimon.hive.HiveCatalogOptions.LOCATION_IN_PROPERTIES;
import static org.apache.paimon.hive.HiveTableUtils.convertToFormatTable;
import static org.apache.paimon.options.CatalogOptions.ALLOW_UPPER_CASE;
import static org.apache.paimon.options.CatalogOptions.FORMAT_TABLE_ENABLED;
import static org.apache.paimon.options.CatalogOptions.SYNC_ALL_PROPERTIES;
import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;
import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
import static org.apache.paimon.table.FormatTableOptions.FIELD_DELIMITER;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.HadoopUtils.addHadoopConfIfFound;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly;

/** A catalog implementation for Hive. */
@@ -172,8 +174,8 @@ public HiveCatalog(
this.clients = new CachedClientPool(hiveConf, options, clientClassName);
}

private boolean formatTableEnabled() {
return options.get(FORMAT_TABLE_ENABLED);
private boolean formatTableDisabled() {
return !options.get(FORMAT_TABLE_ENABLED);
}

@Override
@@ -607,7 +609,7 @@ public org.apache.paimon.table.Table getDataOrFormatTable(Identifier identifier)
} catch (TableNotExistException ignore) {
}

if (!formatTableEnabled()) {
if (formatTableDisabled()) {
throw new TableNotExistException(identifier);
}

@@ -620,7 +622,7 @@ public void createFormatTable(Identifier identifier, Schema schema) {

@Override
public void createFormatTable(Identifier identifier, Schema schema) {
if (!formatTableEnabled()) {
if (formatTableDisabled()) {
throw new UnsupportedOperationException(
"Format table is not enabled for " + identifier);
}
@@ -641,7 +643,7 @@ public void createFormatTable(Identifier identifier, Schema schema) {
schema.comment());
try {
Path location = getTableLocation(identifier, null);
Table hiveTable = createHiveTable(identifier, newSchema, location);
Table hiveTable = createHiveFormatTable(identifier, newSchema, location);
clients.execute(client -> client.createTable(hiveTable));
} catch (Exception e) {
// we don't need to delete directories since HMS will roll back db and fs if it fails.
@@ -727,12 +729,10 @@ protected void createTableImpl(Identifier identifier, Schema schema) {
}

private Table createHiveTable(Identifier identifier, TableSchema tableSchema, Path location) {
checkArgument(Options.fromMap(tableSchema.options()).get(TYPE) != FORMAT_TABLE);

Map<String, String> tblProperties;
String provider = PAIMON_TABLE_TYPE_VALUE;
if (Options.fromMap(tableSchema.options()).get(TYPE) == FORMAT_TABLE) {
provider = tableSchema.options().get(FILE_FORMAT.key());
}
if (syncAllProperties() || !provider.equals(PAIMON_TABLE_TYPE_VALUE)) {
if (syncAllProperties()) {
tblProperties = new HashMap<>(tableSchema.options());

// add primary-key, partition-key to tblproperties
@@ -748,8 +748,32 @@ private Table createHiveTable(Identifier identifier, TableSchema tableSchema, Path location) {
}
}

Table table = newHmsTable(identifier, tblProperties, PAIMON_TABLE_TYPE_VALUE);
updateHmsTable(table, identifier, tableSchema, PAIMON_TABLE_TYPE_VALUE, location);
return table;
}

private Table createHiveFormatTable(
Identifier identifier, TableSchema tableSchema, Path location) {
Options options = Options.fromMap(tableSchema.options());
checkArgument(options.get(TYPE) == FORMAT_TABLE);

String provider = tableSchema.options().get(FILE_FORMAT.key());
checkNotNull(provider, FILE_FORMAT.key() + " should be configured.");
// validate that the configured format is supported
FormatTable.Format.valueOf(provider.toUpperCase());

Map<String, String> tblProperties = new HashMap<>();

Table table = newHmsTable(identifier, tblProperties, provider);
updateHmsTable(table, identifier, tableSchema, provider, location);

if (FormatTable.Format.CSV.toString().equalsIgnoreCase(provider)) {
table.getSd()
.getSerdeInfo()
.getParameters()
.put(FIELD_DELIM, options.get(FIELD_DELIMITER));
}
return table;
}
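createHiveFormatTable validates file.format before any metastore call by round-tripping it through the FormatTable.Format enum (assumed here to cover ORC, PARQUET and CSV), so an unsupported value fails fast. A small sketch of that behavior in isolation; "avro" is a deliberately unsupported example:

String provider = "avro"; // hypothetical, not a supported format table format
try {
    FormatTable.Format.valueOf(provider.toUpperCase());
} catch (IllegalArgumentException e) {
    // thrown before any HMS interaction; table creation is rejected up front
    System.out.println("unsupported format: " + provider);
}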

@@ -796,6 +820,11 @@ protected void renameTableImpl(Identifier fromTable, Identifier toTable) {
@Override
protected void alterTableImpl(Identifier identifier, List<SchemaChange> changes)
throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException {
Table table = getHmsTable(identifier);
if (!isPaimonTable(identifier, table)) {
throw new UnsupportedOperationException("Only data tables support alter table.");
}

final SchemaManager schemaManager = schemaManager(identifier, getTableLocation(identifier));
// first commit changes to underlying files
TableSchema schema = schemaManager.commitChanges(changes);
@@ -805,12 +834,6 @@ protected void alterTableImpl(Identifier identifier, List<SchemaChange> changes)
return;
}
try {
Table table =
clients.run(
client ->
client.getTable(
identifier.getDatabaseName(),
identifier.getTableName()));
alterTableToHms(table, identifier, schema);
} catch (Exception te) {
schemaManager.deleteSchema(schema.id());
@@ -85,14 +85,5 @@ public final class HiveCatalogOptions {
+ "E.g. specifying \"conf:a.b.c\" will add \"a.b.c\" to the key, and so that configurations with different default catalog wouldn't share the same client pool. Multiple conf elements can be specified."))
.build());

public static final ConfigOption<Boolean> FORMAT_TABLE_ENABLED =
ConfigOptions.key("format-table.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to support format tables, format table corresponds to a regular Hive table, allowing read and write operations. "
+ "However, during these processes, it does not connect to the metastore; hence, newly added partitions will not be reflected in"
+ " the metastore and need to be manually added as separate partition operations.");

private HiveCatalogOptions() {}
}