Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] Support Table Multi-Location Management #4760

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -1008,5 +1008,11 @@
<td>Integer</td>
<td>The bytes of types (CHAR, VARCHAR, BINARY, VARBINARY) devote to the zorder sort.</td>
</tr>
<tr>
<td><h5>data-file.external-path</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The path where the data of this table is currently written.</td>
</tr>
</tbody>
</table>
52 changes: 44 additions & 8 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,32 @@ public class CoreOptions implements Serializable {
+ "if there is no primary key, the full row will be used.")
.build());

/**
 * User-facing string option {@code data-file.external-path}. It has no default value, so it is
 * absent unless explicitly configured.
 */
public static final ConfigOption<String> DATA_FILE_EXTERNAL_PATH =
key("data-file.external-path")
.stringType()
.noDefaultValue()
.withDescription("The path where the data of this table is currently written.");

// Internal option stored under the key "path"; it has no default value.
@ExcludeFromDocumentation("Internal use only")
public static final ConfigOption<String> PATH =
public static final ConfigOption<String> TABLE_SCHEMA_PATH =
key("path")
.stringType()
.noDefaultValue()
.withDescription("The file path of this table in the filesystem.");
.withDescription(
"The schema file path of this table in the filesystem. If "
+ DATA_FILE_EXTERNAL_PATH.key()
+ " is not set, the data file path will be the same as the schema file path.");

/**
 * Internal option stored under the key {@code table.data.path}; it has no default value. The
 * description is assembled from the keys of {@link #DATA_FILE_EXTERNAL_PATH} and
 * {@link #TABLE_SCHEMA_PATH}, so separators around the concatenated keys matter.
 */
@ExcludeFromDocumentation("Internal use only")
public static final ConfigOption<String> TABLE_DATA_PATH =
        key("table.data.path")
                .stringType()
                .noDefaultValue()
                .withDescription(
                        "The data file path of this table in the filesystem. If "
                                + DATA_FILE_EXTERNAL_PATH.key()
                                + " is not set, it will be the same as "
                                + TABLE_SCHEMA_PATH.key()
                                + ".");

public static final ConfigOption<String> BRANCH =
key("branch").stringType().defaultValue("main").withDescription("Specify branch name.");
Expand Down Expand Up @@ -1548,8 +1568,12 @@ public int bucket() {
return options.get(BUCKET);
}

public Path path() {
return path(options.toMap());
public Path schemaPath() {
return schemaPath(options.toMap());
}

/**
 * Returns the data path of this table, resolved from a map view of this instance's options.
 *
 * @return the path built by {@link #dataPath(Map)}
 */
public Path dataPath() {
    Map<String, String> optionMap = options.toMap();
    return dataPath(optionMap);
}

public String branch() {
Expand All @@ -1563,12 +1587,20 @@ public static String branch(Map<String, String> options) {
return BRANCH.defaultValue();
}

public static Path path(Map<String, String> options) {
return new Path(options.get(PATH.key()));
public static Path schemaPath(Map<String, String> options) {
return new Path(options.get(TABLE_SCHEMA_PATH.key()));
}

/**
 * Builds a {@link Path} from the {@link #TABLE_SCHEMA_PATH} value of the given options.
 *
 * @param options options to read the schema path from
 * @return a path wrapping the configured value
 */
public static Path schemaPath(Options options) {
    // NOTE(review): the option has no default; behavior for an unset value depends on
    // how Path handles null - confirm callers always set it.
    String schemaLocation = options.get(TABLE_SCHEMA_PATH);
    return new Path(schemaLocation);
}

public static Path path(Options options) {
return new Path(options.get(PATH));
public static Path dataPath(Map<String, String> options) {
return new Path(options.get(TABLE_DATA_PATH.key()));
}

/**
 * Builds a {@link Path} from the {@link #TABLE_DATA_PATH} value of the given options.
 *
 * @param options options to read the data path from
 * @return a path wrapping the configured value
 */
public static Path dataPath(Options options) {
    // NOTE(review): the option has no default; behavior for an unset value depends on
    // how Path handles null - confirm callers always set it.
    String dataLocation = options.get(TABLE_DATA_PATH);
    return new Path(dataLocation);
}

public TableType type() {
Expand Down Expand Up @@ -2360,6 +2392,10 @@ public boolean asyncFileWrite() {
return options.get(ASYNC_FILE_WRITE);
}

/**
 * Returns the configured value of {@link #DATA_FILE_EXTERNAL_PATH}.
 *
 * @return the value read from this instance's options (the option declares no default)
 */
public String getDataFileExternalPath() {
    String externalPath = options.get(DATA_FILE_EXTERNAL_PATH);
    return externalPath;
}

public boolean statsDenseStore() {
return options.get(METADATA_STATS_DENSE_STORE);
}
Expand Down
140 changes: 140 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/fs/HybridFileIO.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.fs;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.options.Options;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * A hybrid implementation of {@link FileIO} that supports multiple file system schemes. It
 * dynamically selects the appropriate {@link FileIO} based on the URI scheme of the given path
 * and caches one delegate per scheme. Paths without a scheme share a single fallback delegate.
 *
 * <p>{@link #configure(CatalogContext)} must be called before any other method, since it
 * initializes both the options and the delegate cache.
 */
public class HybridFileIO implements FileIO {

    private static final long serialVersionUID = 1L;

    protected Options options;

    /** Delegates keyed by URI scheme; created on first use of each scheme. */
    private Map<String, FileIO> fileIOMap;

    /** Delegate used for paths whose URI has no scheme; created on first use. */
    private volatile FileIO fallbackFileIO;

    /**
     * Reports whether the configured {@code data-file.external-path} points to an object store.
     *
     * @return {@code true} only if the option is set and starts with {@code oss://} or {@code
     *     s3://}
     */
    @Override
    public boolean isObjectStore() {
        // Read the option once instead of once per prefix check.
        String externalPath = options.get(CoreOptions.DATA_FILE_EXTERNAL_PATH);
        return externalPath != null
                && (externalPath.startsWith("oss://") || externalPath.startsWith("s3://"));
    }

    /**
     * Stores the options of the given context and resets the per-scheme delegate cache.
     *
     * @param context catalog context whose options are used to create delegates
     */
    @Override
    public void configure(CatalogContext context) {
        this.options = context.options();
        this.fileIOMap = new ConcurrentHashMap<>();
    }

    @Override
    public SeekableInputStream newInputStream(Path path) throws IOException {
        return wrap(() -> fileIO(path).newInputStream(path));
    }

    @Override
    public PositionOutputStream newOutputStream(Path path, boolean overwrite) throws IOException {
        return wrap(() -> fileIO(path).newOutputStream(path, overwrite));
    }

    @Override
    public FileStatus getFileStatus(Path path) throws IOException {
        return wrap(() -> fileIO(path).getFileStatus(path));
    }

    @Override
    public FileStatus[] listStatus(Path path) throws IOException {
        return wrap(() -> fileIO(path).listStatus(path));
    }

    @Override
    public boolean exists(Path path) throws IOException {
        return wrap(() -> fileIO(path).exists(path));
    }

    @Override
    public boolean delete(Path path, boolean recursive) throws IOException {
        return wrap(() -> fileIO(path).delete(path, recursive));
    }

    @Override
    public boolean mkdirs(Path path) throws IOException {
        return wrap(() -> fileIO(path).mkdirs(path));
    }

    @Override
    public boolean rename(Path src, Path dst) throws IOException {
        // The delegate is chosen from the source path only.
        // NOTE(review): a destination on a different scheme is handed to the source's
        // delegate unchanged - confirm whether cross-scheme renames need explicit handling.
        return wrap(() -> fileIO(src).rename(src, dst));
    }

    /**
     * Returns the delegate responsible for the given path, creating and caching it on first
     * use.
     *
     * @param path path whose URI scheme selects the delegate
     * @return the cached or newly created delegate
     * @throws IOException if the delegate cannot be created
     */
    private FileIO fileIO(Path path) throws IOException {
        // Parse the URI once; the scheme is both the lookup key and the cache key.
        String scheme = path.toUri().getScheme();
        if (scheme == null) {
            // No scheme cannot be a map key here, so such paths use a dedicated field.
            FileIO fallback = fallbackFileIO;
            if (fallback == null) {
                synchronized (this) {
                    fallback = fallbackFileIO;
                    if (fallback == null) {
                        fallback = FileIO.get(path, CatalogContext.create(options));
                        fallbackFileIO = fallback;
                    }
                }
            }
            return fallback;
        }

        // Single lookup on the fast path; re-check under the lock so that only one
        // delegate is created per scheme.
        FileIO delegate = fileIOMap.get(scheme);
        if (delegate == null) {
            synchronized (this) {
                delegate = fileIOMap.get(scheme);
                if (delegate == null) {
                    delegate = FileIO.get(path, CatalogContext.create(options));
                    fileIOMap.put(scheme, delegate);
                }
            }
        }
        return delegate;
    }

    /**
     * Runs the given function with the thread context class loader set to the loader of this
     * class, restoring the previous loader afterwards even if the function throws.
     */
    private <T> T wrap(Func<T> func) throws IOException {
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(HybridFileIO.class.getClassLoader());
            return func.apply();
        } finally {
            Thread.currentThread().setContextClassLoader(cl);
        }
    }

    /** Apply function with wrapping classloader. */
    @FunctionalInterface
    protected interface Func<T> {
        T apply() throws IOException;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public FileStorePathFactory pathFactory() {

protected FileStorePathFactory pathFactory(String format) {
return new FileStorePathFactory(
options.path(),
options.schemaPath(),
partitionType,
options.partitionDefaultName(),
format,
Expand All @@ -119,12 +119,13 @@ protected FileStorePathFactory pathFactory(String format) {
options.legacyPartitionName(),
options.fileSuffixIncludeCompression(),
options.fileCompression(),
options.dataFilePathDirectory());
options.dataFilePathDirectory(),
options.dataPath());
}

@Override
public SnapshotManager snapshotManager() {
return new SnapshotManager(fileIO, options.path(), options.branch(), snapshotCache);
return new SnapshotManager(fileIO, options.schemaPath(), options.branch(), snapshotCache);
}

@Override
Expand Down Expand Up @@ -275,7 +276,7 @@ public ChangelogDeletion newChangelogDeletion() {

@Override
public TagManager newTagManager() {
return new TagManager(fileIO, options.path());
return new TagManager(fileIO, options.schemaPath());
}

@Override
Expand Down Expand Up @@ -344,7 +345,7 @@ public List<TagCallback> createTagCallbacks() {

@Override
public ServiceManager newServiceManager() {
return new ServiceManager(fileIO, options.path());
return new ServiceManager(fileIO, options.schemaPath());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ public void close() throws Exception {
for (DataFileMeta file : compactAfter) {
// appendOnlyCompactManager will rewrite the file and no file upgrade will occur, so we
// can directly delete the file in compactAfter.
fileIO.deleteQuietly(pathFactory.toPath(file.fileName()));
fileIO.deleteQuietly(pathFactory.toPath(file.fileName(), file.externalPath()));
}

sinkWriter.close();
Expand All @@ -271,7 +271,7 @@ public void toBufferedWriter() throws Exception {
} finally {
// remove small files
for (DataFileMeta file : files) {
fileIO.deleteQuietly(pathFactory.toPath(file.fileName()));
fileIO.deleteQuietly(pathFactory.toPath(file.fileName(), file.externalPath()));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ private void validateAutoCreateClose(Map<String, String> options) {
}

private void validateCustomTablePath(Map<String, String> options) {
if (!allowCustomTablePath() && options.containsKey(CoreOptions.PATH.key())) {
if (!allowCustomTablePath() && options.containsKey(CoreOptions.TABLE_SCHEMA_PATH.key())) {
throw new UnsupportedOperationException(
String.format(
"The current catalog %s does not support specifying the table path when creating a table.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ public AbstractIcebergCommitCallback(FileStoreTable table, String commitUser) {
table.coreOptions().toConfiguration().get(IcebergOptions.METADATA_ICEBERG_STORAGE);
switch (storageType) {
case TABLE_LOCATION:
this.pathFactory = new IcebergPathFactory(new Path(table.location(), "metadata"));
this.pathFactory =
new IcebergPathFactory(new Path(table.tableDataPath(), "metadata"));
break;
case HADOOP_CATALOG:
case HIVE_CATALOG:
Expand Down Expand Up @@ -139,11 +140,12 @@ public AbstractIcebergCommitCallback(FileStoreTable table, String commitUser) {

public static Path catalogTableMetadataPath(FileStoreTable table) {
Path icebergDBPath = catalogDatabasePath(table);
return new Path(icebergDBPath, String.format("%s/metadata", table.location().getName()));
return new Path(
icebergDBPath, String.format("%s/metadata", table.tableDataPath().getName()));
}

public static Path catalogDatabasePath(FileStoreTable table) {
Path dbPath = table.location().getParent();
Path dbPath = table.tableDataPath().getParent();
final String dbSuffix = ".db";
if (dbPath.getName().endsWith(dbSuffix)) {
String dbName =
Expand Down Expand Up @@ -250,7 +252,7 @@ private void createMetadataWithoutBase(long snapshotId) throws IOException {
IcebergMetadata metadata =
new IcebergMetadata(
tableUuid,
table.location().toString(),
table.tableDataPath().toString(),
snapshotId,
icebergSchema.highestFieldId(),
Collections.singletonList(icebergSchema),
Expand Down Expand Up @@ -448,7 +450,7 @@ private boolean collectFileChanges(
boolean isAddOnly = true;
for (ManifestEntry entry : manifestEntries) {
String path =
fileStorePathFactory.bucketPath(entry.partition(), entry.bucket())
fileStorePathFactory.dataBucketPath(entry.partition(), entry.bucket())
+ "/"
+ entry.fileName();
switch (entry.kind()) {
Expand Down Expand Up @@ -753,7 +755,7 @@ private void expireAllBefore(long snapshotId) throws IOException {

private class SchemaCache {

SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location());
SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.tableSchemaPath());
Map<Long, IcebergSchema> schemas = new HashMap<>();

private IcebergSchema get(long schemaId) {
Expand Down
Loading
Loading