Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] support write to the external path #4770

Open
wants to merge 21 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -1008,5 +1008,11 @@
<td>Integer</td>
<td>The bytes of types (CHAR, VARCHAR, BINARY, VARBINARY) devote to the zorder sort.</td>
</tr>
<tr>
<td><h5>data-file.external-path</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The path where the data of this table is currently written.</td>
</tr>
</tbody>
</table>
30 changes: 30 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,31 @@ public class CoreOptions implements Serializable {
+ "if there is no primary key, the full row will be used.")
.build());

public static final ConfigOption<String> DATA_FILE_EXTERNAL_PATH =
key("data-file.external-path")
.stringType()
.noDefaultValue()
.withDescription("The path where the data of this table is currently written.");

// todo, this path is the table schema path, the name will be changed in the later PR.
@ExcludeFromDocumentation("Internal use only")
public static final ConfigOption<String> PATH =
key("path")
.stringType()
.noDefaultValue()
.withDescription("The file path of this table in the filesystem.");

@ExcludeFromDocumentation("Internal use only")
public static final ConfigOption<String> TABLE_DATA_PATH =
key("table.data.path")
.stringType()
.noDefaultValue()
.withDescription(
"The data file path of this table in the filesystem. if "
+ DATA_FILE_EXTERNAL_PATH.key()
+ "is not set, it will be same with."
+ PATH.key());

public static final ConfigOption<String> BRANCH =
key("branch").stringType().defaultValue("main").withDescription("Specify branch name.");

Expand Down Expand Up @@ -1552,6 +1570,10 @@ public Path path() {
return path(options.toMap());
}

public Path dataPath() {
return dataPath(options.toMap());
}

public String branch() {
return branch(options.toMap());
}
Expand All @@ -1571,6 +1593,10 @@ public static Path path(Options options) {
return new Path(options.get(PATH));
}

public static Path dataPath(Map<String, String> options) {
return new Path(options.get(TABLE_DATA_PATH.key()));
}

public TableType type() {
return options.get(TYPE);
}
Expand Down Expand Up @@ -2364,6 +2390,10 @@ public boolean statsDenseStore() {
return options.get(METADATA_STATS_DENSE_STORE);
}

public String dataFileExternalPath() {
neuyilan marked this conversation as resolved.
Show resolved Hide resolved
return options.get(DATA_FILE_EXTERNAL_PATH);
}

public boolean dataFileThinMode() {
return options.get(DATA_FILE_THIN_MODE);
}
Expand Down
11 changes: 11 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/fs/Path.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ public class Path implements Comparable<Path>, Serializable {
/** A hierarchical URI. */
private URI uri;

private boolean isExternalPath;
neuyilan marked this conversation as resolved.
Show resolved Hide resolved

/**
* Create a new Path based on the child path resolved against the parent path.
*
Expand Down Expand Up @@ -385,4 +387,13 @@ public int hashCode() {
public int compareTo(Path that) {
return this.uri.compareTo(that.uri);
}

public Path setExternalPath(boolean externalPath) {
this.isExternalPath = externalPath;
return this;
}

public boolean isExternalPath() {
return isExternalPath;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ protected FileStorePathFactory pathFactory(String format) {
options.legacyPartitionName(),
options.fileSuffixIncludeCompression(),
options.fileCompression(),
options.dataFilePathDirectory());
options.dataFilePathDirectory(),
options.dataPath());
}

@Override
Expand Down
22 changes: 22 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,28 @@ public DataFileMeta copy(List<String> newExtraFiles) {
externalPath);
}

public DataFileMeta copy(String newExternalPath) {
return new DataFileMeta(
fileName,
fileSize,
rowCount,
minKey,
maxKey,
keyStats,
valueStats,
minSequenceNumber,
maxSequenceNumber,
schemaId,
level,
extraFiles,
creationTime,
deleteRowCount,
embeddedIndex,
fileSource,
valueStatsCols,
newExternalPath);
}

public DataFileMeta copy(byte[] newEmbeddedIndex) {
return new DataFileMeta(
fileName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,16 @@ public class DataFilePathFactory {
private final String changelogFilePrefix;
private final boolean fileSuffixIncludeCompression;
private final String fileCompression;
private final boolean isExternalPath;

public DataFilePathFactory(
Path parent,
String formatIdentifier,
String dataFilePrefix,
String changelogFilePrefix,
boolean fileSuffixIncludeCompression,
String fileCompression) {
String fileCompression,
boolean isExternalPath) {
this.parent = parent;
this.uuid = UUID.randomUUID().toString();
this.pathCount = new AtomicInteger(0);
Expand All @@ -59,6 +61,7 @@ public DataFilePathFactory(
this.changelogFilePrefix = changelogFilePrefix;
this.fileSuffixIncludeCompression = fileSuffixIncludeCompression;
this.fileCompression = fileCompression;
this.isExternalPath = isExternalPath;
}

public Path newPath() {
Expand All @@ -74,7 +77,7 @@ public String newChangelogFileName() {
}

public Path newPath(String prefix) {
return new Path(parent, newFileName(prefix));
return new Path(parent, newFileName(prefix)).setExternalPath(isExternalPath);
}

private String newFileName(String prefix) {
Expand All @@ -88,21 +91,27 @@ private String newFileName(String prefix) {
}

public Path toPath(DataFileMeta file) {
return file.externalPath().map(Path::new).orElse(new Path(parent, file.fileName()));
return file.externalPath()
.map(Path::new)
.orElse(new Path(parent, file.fileName()))
.setExternalPath(file.externalPath().isPresent());
}

public Path toPath(FileEntry file) {
return Optional.ofNullable(file.externalPath())
.map(Path::new)
.orElse(new Path(parent, file.fileName()));
.orElse(new Path(parent, file.fileName()))
.setExternalPath(Optional.ofNullable(file.externalPath()).isPresent());
}

public Path toAlignedPath(String fileName, DataFileMeta aligned) {
return new Path(aligned.externalPathDir().map(Path::new).orElse(parent), fileName);
return new Path(aligned.externalPathDir().map(Path::new).orElse(parent), fileName)
.setExternalPath(aligned.externalPathDir().isPresent());
}

public static Path dataFileToFileIndexPath(Path dataFilePath) {
return new Path(dataFilePath.getParent(), dataFilePath.getName() + INDEX_PATH_SUFFIX);
return new Path(dataFilePath.getParent(), dataFilePath.getName() + INDEX_PATH_SUFFIX)
.setExternalPath(dataFilePath.isExternalPath());
}

public static Path createNewFileIndexFilePath(Path filePath) {
Expand All @@ -121,7 +130,9 @@ public static Path createNewFileIndexFilePath(Path filePath) {
}
}
return new Path(
filePath.getParent(), fileName.substring(0, dot) + "-" + 1 + INDEX_PATH_SUFFIX);
filePath.getParent(),
fileName.substring(0, dot) + "-" + 1 + INDEX_PATH_SUFFIX)
.setExternalPath(filePath.isExternalPath());
}

public static String formatIdentifier(String fileName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ public DataFileMeta result() throws IOException {
? DataFileIndexWriter.EMPTY_RESULT
: dataFileIndexWriter.result();

String externalPath = path.isExternalPath() ? path.toString() : null;
return new DataFileMeta(
path.getName(),
fileIO.getFileSize(path),
Expand All @@ -196,7 +197,7 @@ public DataFileMeta result() throws IOException {
indexResult.embeddedIndexBytes(),
fileSource,
valueStatsPair.getKey(),
null);
externalPath);
}

abstract Pair<SimpleColStats[], SimpleColStats[]> fetchKeyValueStats(SimpleColStats[] rowStats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ public DataFileMeta result() throws IOException {
dataFileIndexWriter == null
? DataFileIndexWriter.EMPTY_RESULT
: dataFileIndexWriter.result();
String externalPath = path.isExternalPath() ? path.toString() : null;
return DataFileMeta.forAppend(
path.getName(),
fileIO.getFileSize(path),
Expand All @@ -125,6 +126,6 @@ public DataFileMeta result() throws IOException {
indexResult.embeddedIndexBytes(),
fileSource,
statsPair.getKey(),
null);
externalPath);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,12 @@ public RollingFileWriter<ManifestEntry, ManifestFileMeta> createRollingWriter()
suggestedFileSize);
}

private class ManifestEntryWriter extends SingleFileWriter<ManifestEntry, ManifestFileMeta> {
public ManifestEntryWriter createManifestEntryWriter(Path manifestPath) {
return new ManifestEntryWriter(writerFactory, manifestPath, compression);
}

/** Writer for {@link ManifestEntry}. */
public class ManifestEntryWriter extends SingleFileWriter<ManifestEntry, ManifestFileMeta> {

private final SimpleStatsCollector partitionStatsCollector;
private final SimpleStatsConverter partitionStatsSerializer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import java.util.function.BiConsumer;

import static org.apache.paimon.CoreOptions.PATH;
import static org.apache.paimon.CoreOptions.TABLE_DATA_PATH;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Abstract {@link FileStoreTable}. */
Expand All @@ -98,6 +99,7 @@ abstract class AbstractFileStoreTable implements FileStoreTable {

protected final FileIO fileIO;
protected final Path path;
protected final Path tableDataPath;
protected final TableSchema tableSchema;
protected final CatalogEnvironment catalogEnvironment;

Expand All @@ -109,7 +111,8 @@ protected AbstractFileStoreTable(
FileIO fileIO,
Path path,
TableSchema tableSchema,
CatalogEnvironment catalogEnvironment) {
CatalogEnvironment catalogEnvironment,
Path tableDataPath) {
this.fileIO = fileIO;
this.path = path;
if (!tableSchema.options().containsKey(PATH.key())) {
Expand All @@ -118,8 +121,16 @@ protected AbstractFileStoreTable(
newOptions.put(PATH.key(), path.toString());
tableSchema = tableSchema.copy(newOptions);
}

if (!tableSchema.options().containsKey(TABLE_DATA_PATH.key())) {
Map<String, String> newOptions = new HashMap<>(tableSchema.options());
newOptions.put(TABLE_DATA_PATH.key(), tableDataPath.toString());
tableSchema = tableSchema.copy(newOptions);
}

this.tableSchema = tableSchema;
this.catalogEnvironment = catalogEnvironment;
this.tableDataPath = tableDataPath;
}

public String currentBranch() {
Expand Down Expand Up @@ -336,6 +347,9 @@ private FileStoreTable copyInternal(Map<String, String> dynamicOptions, boolean
// set path always
newOptions.set(PATH, path.toString());

// set tableDataPath always
newOptions.set(TABLE_DATA_PATH, tableDataPath.toString());

// set dynamic options with default values
CoreOptions.setDefaultValues(newOptions);

Expand Down Expand Up @@ -372,9 +386,9 @@ public FileStoreTable copy(TableSchema newTableSchema) {
AbstractFileStoreTable copied =
newTableSchema.primaryKeys().isEmpty()
? new AppendOnlyFileStoreTable(
fileIO, path, newTableSchema, catalogEnvironment)
fileIO, path, newTableSchema, catalogEnvironment, tableDataPath)
: new PrimaryKeyFileStoreTable(
fileIO, path, newTableSchema, catalogEnvironment);
fileIO, path, newTableSchema, catalogEnvironment, tableDataPath);
if (snapshotCache != null) {
copied.setSnapshotCache(snapshotCache);
}
Expand Down Expand Up @@ -407,6 +421,11 @@ public Path location() {
return path;
}

@Override
public Path dataLocation() {
return tableDataPath;
}

@Override
public TableSchema schema() {
return tableSchema;
Expand Down Expand Up @@ -738,7 +757,12 @@ public FileStoreTable switchToBranch(String branchName) {
branchOptions.set(CoreOptions.BRANCH, targetBranch);
branchSchema = branchSchema.copy(branchOptions.toMap());
return FileStoreTableFactory.create(
fileIO(), location(), branchSchema, new Options(), catalogEnvironment());
fileIO(),
location(),
branchSchema,
new Options(),
catalogEnvironment(),
tableDataPath);
}

private RollbackHelper rollbackHelper() {
Expand All @@ -764,6 +788,8 @@ public boolean equals(Object o) {
return false;
}
AbstractFileStoreTable that = (AbstractFileStoreTable) o;
return Objects.equals(path, that.path) && Objects.equals(tableSchema, that.tableSchema);
return Objects.equals(path, that.path)
&& Objects.equals(tableSchema, that.tableSchema)
&& Objects.equals(tableDataPath, that.tableDataPath);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,16 @@ class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
private transient AppendOnlyFileStore lazyStore;

AppendOnlyFileStoreTable(FileIO fileIO, Path path, TableSchema tableSchema) {
this(fileIO, path, tableSchema, CatalogEnvironment.empty());
this(fileIO, path, tableSchema, CatalogEnvironment.empty(), path);
}

AppendOnlyFileStoreTable(
FileIO fileIO,
Path path,
TableSchema tableSchema,
CatalogEnvironment catalogEnvironment) {
super(fileIO, path, tableSchema, catalogEnvironment);
CatalogEnvironment catalogEnvironment,
Path tableDataPath) {
super(fileIO, path, tableSchema, catalogEnvironment, tableDataPath);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,7 @@ public interface DataTable extends InnerTable {

Path location();

Path dataLocation();

FileIO fileIO();
}
Loading
Loading