Skip to content

Commit

Permalink
[core] Introduce Format table to support csv,orc,parquet tables (apac…
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Aug 26, 2024
1 parent 7ef5091 commit c5a950e
Show file tree
Hide file tree
Showing 15 changed files with 993 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@
<td>String</td>
<td>Specify client cache key, multiple elements separated by commas.<br /><ul><li>"ugi": the Hadoop UserGroupInformation instance that represents the current user using the cache.</li></ul><ul><li>"user_name" similar to UGI but only includes the user's name determined by UserGroupInformation#getUserName.</li></ul><ul><li>"conf": name of an arbitrary configuration. The value of the configuration will be extracted from catalog properties and added to the cache key. A conf element should start with a "conf:" prefix which is followed by the configuration name. E.g. specifying "conf:a.b.c" will add "a.b.c" to the key, and so that configurations with different default catalog wouldn't share the same client pool. Multiple conf elements can be specified.</li></ul></td>
</tr>
<tr>
<td><h5>format-table.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to support format tables. A format table corresponds to a regular Hive table and allows read and write operations. However, these operations do not connect to the metastore; hence, newly added partitions will not be reflected in the metastore and need to be added manually as separate partition operations.</td>
</tr>
<tr>
<td><h5>hadoop-conf-dir</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.FormatTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.system.SystemTableLoader;
Expand Down Expand Up @@ -336,7 +337,11 @@ public Table getTable(Identifier identifier) throws TableNotExistException {
}
return table;
} else {
return getDataTable(identifier);
try {
return getDataTable(identifier);
} catch (TableNotExistException e) {
return getFormatTable(identifier);
}
}
}

Expand All @@ -355,6 +360,17 @@ private FileStoreTable getDataTable(Identifier identifier) throws TableNotExistE
lineageMetaFactory));
}

/**
* Return a {@link FormatTable} identified by the given {@link Identifier}.
*
* <p>This implementation performs no lookup and always throws {@link
* Catalog.TableNotExistException}. {@code getTable} falls back to this method when the data
* table lookup itself fails with {@link Catalog.TableNotExistException}, so with this
* implementation the caller still observes "table does not exist".
*
* <p>NOTE(review): presumably catalogs that can resolve format tables override this method —
* confirm against the concrete catalog implementations.
*
* @param identifier identifier of the table to look up
* @return the requested format table (never returned by this implementation)
* @throws Catalog.TableNotExistException if the target does not exist; always thrown by this
*     implementation
*/
public FormatTable getFormatTable(Identifier identifier) throws Catalog.TableNotExistException {
// No format table lookup here: unconditionally report the table as not existing.
throw new Catalog.TableNotExistException(identifier);
}

/**
* Get the warehouse path for the specified database. If a catalog would like to provide an
* individual path for each database, this method can be overridden in that catalog.
Expand Down
317 changes: 317 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,317 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.table;

import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.Public;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.stats.Statistics;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.SimpleFileReader;

import javax.annotation.Nullable;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;

/**
 * A file format table refers to a directory that contains multiple files of the same format, where
 * operations on this table allow for reading or writing to these files, facilitating the retrieval
 * of existing data and the addition of new files.
 *
 * <p>A partitioned file format table follows the standard Hive layout: partitions are discovered
 * and inferred based on the directory structure.
 *
 * <p>The default methods declared at the bottom of this interface cover the snapshot, manifest,
 * tag, branch, expiration and read/write builder operations of {@link Table}. Except for {@link
 * #statistics()}, which returns an empty {@link Optional}, they all throw {@link
 * UnsupportedOperationException}.
 *
 * @since 0.9.0
 */
@Public
public interface FormatTable extends Table {

    /** Directory location in file system. */
    String location();

    /** Format of this table. */
    Format format();

    /**
     * Creates a copy of this table with the given dynamic options applied.
     *
     * @param dynamicOptions options to apply to the copy
     * @return the copied format table
     */
    @Override
    FormatTable copy(Map<String, String> dynamicOptions);

    /** Currently supported formats. */
    enum Format {
        ORC,
        PARQUET,
        CSV
    }

    /** Create a new builder for {@link FormatTable}. */
    static FormatTable.Builder builder() {
        return new FormatTable.Builder();
    }

    /**
     * Builder for {@link FormatTable}.
     *
     * <p>The builder performs no validation itself; every value is handed to {@link
     * FormatTableImpl} as-is when {@link #build()} is called. Partition keys and options that were
     * never set are treated as empty by {@link FormatTableImpl}.
     */
    class Builder {

        private Identifier identifier;
        private RowType rowType;
        private List<String> partitionKeys;
        private String location;
        private FormatTable.Format format;
        private Map<String, String> options;
        @Nullable private String comment;

        /** Sets the identifier used to derive {@link Table#name()} and {@link Table#fullName()}. */
        public FormatTable.Builder identifier(Identifier identifier) {
            this.identifier = identifier;
            return this;
        }

        /** Sets the row type of the table. */
        public FormatTable.Builder rowType(RowType rowType) {
            this.rowType = rowType;
            return this;
        }

        /** Sets the partition keys of the table. */
        public FormatTable.Builder partitionKeys(List<String> partitionKeys) {
            this.partitionKeys = partitionKeys;
            return this;
        }

        /** Sets the directory location of the table in the file system. */
        public FormatTable.Builder location(String location) {
            this.location = location;
            return this;
        }

        /** Sets the file format of the table. */
        public FormatTable.Builder format(FormatTable.Format format) {
            this.format = format;
            return this;
        }

        /** Sets the options of the table. */
        public FormatTable.Builder options(Map<String, String> options) {
            this.options = options;
            return this;
        }

        /** Sets the optional comment of the table. */
        public FormatTable.Builder comment(@Nullable String comment) {
            this.comment = comment;
            return this;
        }

        /** Builds a {@link FormatTable} from the values collected so far. */
        public FormatTable build() {
            return new FormatTable.FormatTableImpl(
                    identifier, rowType, partitionKeys, location, format, options, comment);
        }
    }

    /** An implementation for {@link FormatTable}. */
    class FormatTableImpl implements FormatTable {

        private final Identifier identifier;
        private final RowType rowType;
        private final List<String> partitionKeys;
        private final String location;
        private final Format format;
        private final Map<String, String> options;
        @Nullable private final String comment;

        /**
         * Creates a format table.
         *
         * @param identifier identifier used for {@link #name()} and {@link #fullName()}
         * @param rowType row type of the table
         * @param partitionKeys partition keys; {@code null} is treated as "no partition keys"
         * @param location directory location in the file system
         * @param format file format of the table
         * @param options table options; {@code null} is treated as "no options"
         * @param comment optional table comment
         */
        public FormatTableImpl(
                Identifier identifier,
                RowType rowType,
                List<String> partitionKeys,
                String location,
                Format format,
                Map<String, String> options,
                @Nullable String comment) {
            this.identifier = identifier;
            this.rowType = rowType;
            // A builder that never received partition keys passes null here. Normalize to an
            // empty list so partitionKeys() never returns null, consistent with primaryKeys().
            this.partitionKeys =
                    partitionKeys == null ? Collections.<String>emptyList() : partitionKeys;
            this.location = location;
            this.format = format;
            // Same for options: without this, copy() would fail with a NullPointerException in
            // new HashMap<>(options) for tables built without options.
            this.options = options == null ? Collections.<String, String>emptyMap() : options;
            this.comment = comment;
        }

        @Override
        public String name() {
            return identifier.getTableName();
        }

        @Override
        public String fullName() {
            return identifier.getFullName();
        }

        @Override
        public RowType rowType() {
            return rowType;
        }

        @Override
        public List<String> partitionKeys() {
            return partitionKeys;
        }

        /** Always returns an empty list. */
        @Override
        public List<String> primaryKeys() {
            return Collections.emptyList();
        }

        @Override
        public String location() {
            return location;
        }

        @Override
        public Format format() {
            return format;
        }

        @Override
        public Map<String, String> options() {
            return options;
        }

        @Override
        public Optional<String> comment() {
            return Optional.ofNullable(comment);
        }

        /**
         * Returns a new table whose options are this table's options overlaid with {@code
         * dynamicOptions}; all other attributes are carried over unchanged. This table's own
         * option map is not modified.
         *
         * @param dynamicOptions options to put on top of the existing ones; {@code null} is
         *     treated as "no additional options"
         * @return the copied table
         */
        @Override
        public FormatTable copy(Map<String, String> dynamicOptions) {
            Map<String, String> newOptions = new HashMap<>(options);
            if (dynamicOptions != null) {
                newOptions.putAll(dynamicOptions);
            }
            return new FormatTableImpl(
                    identifier, rowType, partitionKeys, location, format, newOptions, comment);
        }
    }

    // ===================== Unsupported ===============================
    // Apart from statistics(), every default below throws UnsupportedOperationException.

    /** Always returns an empty {@link Optional}. */
    @Override
    default Optional<Statistics> statistics() {
        return Optional.empty();
    }

    @Override
    default OptionalLong latestSnapshotId() {
        throw new UnsupportedOperationException();
    }

    @Override
    default Snapshot snapshot(long snapshotId) {
        throw new UnsupportedOperationException();
    }

    @Override
    default SimpleFileReader<ManifestFileMeta> manifestListReader() {
        throw new UnsupportedOperationException();
    }

    @Override
    default SimpleFileReader<ManifestEntry> manifestFileReader() {
        throw new UnsupportedOperationException();
    }

    @Override
    default void rollbackTo(long snapshotId) {
        throw new UnsupportedOperationException();
    }

    @Override
    default void createTag(String tagName, long fromSnapshotId) {
        throw new UnsupportedOperationException();
    }

    @Override
    default void createTag(String tagName, long fromSnapshotId, Duration timeRetained) {
        throw new UnsupportedOperationException();
    }

    @Override
    default void createTag(String tagName) {
        throw new UnsupportedOperationException();
    }

    @Override
    default void createTag(String tagName, Duration timeRetained) {
        throw new UnsupportedOperationException();
    }

    @Override
    default void deleteTag(String tagName) {
        throw new UnsupportedOperationException();
    }

    @Override
    default void rollbackTo(String tagName) {
        throw new UnsupportedOperationException();
    }

    @Override
    default void createBranch(String branchName) {
        throw new UnsupportedOperationException();
    }

    @Override
    default void createBranch(String branchName, String tagName) {
        throw new UnsupportedOperationException();
    }

    @Override
    default void deleteBranch(String branchName) {
        throw new UnsupportedOperationException();
    }

    @Override
    default void fastForward(String branchName) {
        throw new UnsupportedOperationException();
    }

    @Override
    default ExpireSnapshots newExpireSnapshots() {
        throw new UnsupportedOperationException();
    }

    @Override
    default ExpireSnapshots newExpireChangelog() {
        throw new UnsupportedOperationException();
    }

    @Override
    default ReadBuilder newReadBuilder() {
        throw new UnsupportedOperationException();
    }

    @Override
    default BatchWriteBuilder newBatchWriteBuilder() {
        throw new UnsupportedOperationException();
    }

    @Override
    default StreamWriteBuilder newStreamWriteBuilder() {
        throw new UnsupportedOperationException();
    }
}
Loading

0 comments on commit c5a950e

Please sign in to comment.