Add data file root location in DataFileMeta #4751

Open · wants to merge 5 commits into base: master
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
@@ -1008,5 +1008,11 @@
<td>Integer</td>
<td>The bytes of types (CHAR, VARCHAR, BINARY, VARBINARY) devote to the zorder sort.</td>
</tr>
<tr>
<td><h5>data-file.external-path</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The location where the data of this table is currently written.</td>
</tr>
</tbody>
</table>
29 changes: 29 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -131,6 +131,20 @@ public class CoreOptions implements Serializable {
.noDefaultValue()
.withDescription("The file path of this table in the filesystem.");

@ExcludeFromDocumentation("Internal use only")
public static final ConfigOption<String> WAREHOUSE_ROOT_PATH =
Contributor: Can you not introduce any option in this PR? Just modify the data file meta.

Member Author: ok.

key("warehouse.root-path")
.stringType()
.noDefaultValue()
.withDescription("The file path of the warehouse in the filesystem.");

public static final ConfigOption<String> DATA_FILE_EXTERNAL_PATH =
key("data-file.external-path")
.stringType()
.noDefaultValue()
.withDescription(
"The location where the data of this table is currently written.");

public static final ConfigOption<String> BRANCH =
key("branch").stringType().defaultValue("main").withDescription("Specify branch name.");

@@ -2368,6 +2382,21 @@ public boolean dataFileThinMode() {
return options.get(DATA_FILE_THIN_MODE);
}

public String getDataFileExternalPath() {
return options.get(DATA_FILE_EXTERNAL_PATH);
}

public String getWarehouseRootPath() {
return options.get(WAREHOUSE_ROOT_PATH);
}

public String getDataRootLocation() {
if (getDataFileExternalPath() == null || getDataFileExternalPath().isEmpty()) {
return getWarehouseRootPath();
}
return getDataFileExternalPath();
}
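The resolution order implemented by `getDataRootLocation()` above — an explicit `data-file.external-path` wins, and a null or empty value falls back to the warehouse root — can be sketched in isolation as follows. The class and method names here are illustrative only, not the actual Paimon API:

```java
// Illustrative sketch of the fallback in CoreOptions#getDataRootLocation.
// DataRootResolver is a hypothetical stand-in, not a Paimon class.
class DataRootResolver {

    /**
     * Returns the external path when one is configured; otherwise falls
     * back to the warehouse root path.
     */
    static String resolve(String externalPath, String warehouseRootPath) {
        if (externalPath == null || externalPath.isEmpty()) {
            return warehouseRootPath;
        }
        return externalPath;
    }
}
```

With this ordering, tables that never set `data-file.external-path` keep writing under the warehouse root, so existing deployments are unaffected.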

/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
62 changes: 48 additions & 14 deletions paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -82,7 +82,8 @@ public class DataFileMeta {
new DataField(
16,
"_VALUE_STATS_COLS",
DataTypes.ARRAY(DataTypes.STRING().notNull()))));
DataTypes.ARRAY(DataTypes.STRING().notNull())),
new DataField(17, "_DATA_ROOT_LOCATION", newStringType(true))));

public static final BinaryRow EMPTY_MIN_KEY = EMPTY_ROW;
public static final BinaryRow EMPTY_MAX_KEY = EMPTY_ROW;
@@ -120,6 +121,13 @@ public class DataFileMeta {

private final @Nullable List<String> valueStatsCols;

/**
* The data root location that the file resides in. If null, the file is in the default
* warehouse path. When {@link CoreOptions#DATA_FILE_PATH_DIRECTORY} is set, newly written
* files will be persisted in {@link CoreOptions#DATA_FILE_PATH_DIRECTORY}.
*/
private final @Nullable String dataRootLocation;
Contributor (@JingsongLi, Dec 22, 2024): We need to introduce a new version for CommitMessage and DataSplit. You can refer to #4322.

Member Author: I see that since version 0.9 [1][2], the versions of CommitMessage and DataSplit have been changed. Do I need to make another change? I think it only needs to be changed once in version 1.0.

[1] https://github.com/apache/paimon/blob/release-0.9/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
[2] https://github.com/apache/paimon/blob/release-0.9/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java

Contributor: Keep compatibility even if it is just in 1.0-SNAPSHOT; for example, create a DataFileMeta10LegacySerializer for the previous DataFileMetaSerializer.

Member Author: Understood. If it's for compatibility with 1.0-SNAPSHOT, it's worth doing this.
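The compatibility concern discussed here comes down to one property: a row serialized before this change has one fewer field, so a version-aware reader must supply null for the missing trailing slot. A minimal sketch of that idea, with entirely hypothetical names (the real change would add a DataFileMeta10LegacySerializer alongside DataFileMetaSerializer):

```java
import java.util.Arrays;

// Hypothetical sketch of version-tolerant deserialization. Rows written by an
// older serializer lack the trailing dataRootLocation slot; padding them with
// null lets new readers treat the field as "not set".
class LegacyRowReader {

    // Fields 0..17 in the new schema; index 17 is _DATA_ROOT_LOCATION.
    static final int CURRENT_FIELD_COUNT = 18;

    /** Extends a legacy row to the current width; new slots default to null. */
    static Object[] upgrade(Object[] legacyRow) {
        // Arrays.copyOf pads the extension with null, which is exactly the
        // desired default for the newly added nullable field.
        return Arrays.copyOf(legacyRow, CURRENT_FIELD_COUNT);
    }
}
```

Keeping the new field nullable and last is what makes this padding approach safe: every existing field keeps its index, and only the appended slot needs a default.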

Contributor: Rename to externalPath?

Member Author: No problem. I want to express that this path is the same root path as the warehouse, not the full path. But it doesn't affect the modification of DataFileMeta this time; I'll change it to externalPath first.

Contributor: Maybe it is better to just use the full path?


public static DataFileMeta forAppend(
String fileName,
long fileSize,
@@ -149,7 +157,8 @@ public static DataFileMeta forAppend(
0L,
embeddedIndex,
fileSource,
valueStatsCols);
valueStatsCols,
null);
}

public DataFileMeta(
@@ -186,7 +195,8 @@ public DataFileMeta(
deleteRowCount,
embeddedIndex,
fileSource,
valueStatsCols);
valueStatsCols,
null);
}

public DataFileMeta(
@@ -222,7 +232,8 @@ public DataFileMeta(
deleteRowCount,
embeddedIndex,
fileSource,
valueStatsCols);
valueStatsCols,
null);
}

public DataFileMeta(
@@ -242,7 +253,8 @@ public DataFileMeta(
@Nullable Long deleteRowCount,
@Nullable byte[] embeddedIndex,
@Nullable FileSource fileSource,
@Nullable List<String> valueStatsCols) {
@Nullable List<String> valueStatsCols,
@Nullable String dataRootLocation) {
this.fileName = fileName;
this.fileSize = fileSize;

@@ -264,6 +276,7 @@ public DataFileMeta(
this.deleteRowCount = deleteRowCount;
this.fileSource = fileSource;
this.valueStatsCols = valueStatsCols;
this.dataRootLocation = dataRootLocation;
}

public String fileName() {
@@ -357,6 +370,19 @@ public String fileFormat() {
return split[split.length - 1];
}

@Nullable
public String getDataRootLocationString() {
return dataRootLocation;
}

@Nullable
public Path getDataRootLocation() {
if (dataRootLocation == null) {
return null;
}
return new Path(dataRootLocation);
}

public Optional<FileSource> fileSource() {
return Optional.ofNullable(fileSource);
}
@@ -385,7 +411,8 @@ public DataFileMeta upgrade(int newLevel) {
deleteRowCount,
embeddedIndex,
fileSource,
valueStatsCols);
valueStatsCols,
dataRootLocation);
}

public DataFileMeta rename(String newFileName) {
@@ -406,7 +433,8 @@ public DataFileMeta rename(String newFileName) {
deleteRowCount,
embeddedIndex,
fileSource,
valueStatsCols);
valueStatsCols,
dataRootLocation);
}

public DataFileMeta copyWithoutStats() {
@@ -427,7 +455,8 @@ public DataFileMeta copyWithoutStats() {
deleteRowCount,
embeddedIndex,
fileSource,
Collections.emptyList());
Collections.emptyList(),
dataRootLocation);
}

public List<Path> collectFiles(DataFilePathFactory pathFactory) {
@@ -455,7 +484,8 @@ public DataFileMeta copy(List<String> newExtraFiles) {
deleteRowCount,
embeddedIndex,
fileSource,
valueStatsCols);
valueStatsCols,
dataRootLocation);
}

public DataFileMeta copy(byte[] newEmbeddedIndex) {
@@ -476,7 +506,8 @@ public DataFileMeta copy(byte[] newEmbeddedIndex) {
deleteRowCount,
newEmbeddedIndex,
fileSource,
valueStatsCols);
valueStatsCols,
dataRootLocation);
}

@Override
@@ -504,7 +535,8 @@ public boolean equals(Object o) {
&& Objects.equals(creationTime, that.creationTime)
&& Objects.equals(deleteRowCount, that.deleteRowCount)
&& Objects.equals(fileSource, that.fileSource)
&& Objects.equals(valueStatsCols, that.valueStatsCols);
&& Objects.equals(valueStatsCols, that.valueStatsCols)
&& Objects.equals(dataRootLocation, that.dataRootLocation);
}

@Override
@@ -526,7 +558,8 @@ public int hashCode() {
creationTime,
deleteRowCount,
fileSource,
valueStatsCols);
valueStatsCols,
dataRootLocation);
}

@Override
@@ -536,7 +569,7 @@ public String toString() {
+ "minKey: %s, maxKey: %s, keyStats: %s, valueStats: %s, "
+ "minSequenceNumber: %d, maxSequenceNumber: %d, "
+ "schemaId: %d, level: %d, extraFiles: %s, creationTime: %s, "
+ "deleteRowCount: %d, fileSource: %s, valueStatsCols: %s}",
+ "deleteRowCount: %d, fileSource: %s, valueStatsCols: %s, dataRootLocation: %s}",
fileName,
fileSize,
rowCount,
Expand All @@ -553,7 +586,8 @@ public String toString() {
creationTime,
deleteRowCount,
fileSource,
valueStatsCols);
valueStatsCols,
dataRootLocation);
}

public static long getMaxSequenceNumber(List<DataFileMeta> fileMetas) {
@@ -133,6 +133,7 @@ public DataFileMeta deserialize(DataInputView in) throws IOException {
row.isNullAt(13) ? null : row.getLong(13),
row.isNullAt(14) ? null : row.getBinary(14),
null,
null,
null);
}
}
@@ -139,6 +139,7 @@ public DataFileMeta deserialize(DataInputView in) throws IOException {
row.isNullAt(13) ? null : row.getLong(13),
row.isNullAt(14) ? null : row.getBinary(14),
row.isNullAt(15) ? null : FileSource.fromByteValue(row.getByte(15)),
null,
null);
}
}
@@ -58,7 +58,8 @@ public InternalRow toRow(DataFileMeta meta) {
meta.deleteRowCount().orElse(null),
meta.embeddedIndex(),
meta.fileSource().map(FileSource::toByteValue).orElse(null),
toStringArrayData(meta.valueStatsCols()));
toStringArrayData(meta.valueStatsCols()),
BinaryString.fromString(meta.getDataRootLocationString()));
}

@Override
@@ -80,6 +81,7 @@ public DataFileMeta fromRow(InternalRow row) {
row.isNullAt(13) ? null : row.getLong(13),
row.isNullAt(14) ? null : row.getBinary(14),
row.isNullAt(15) ? null : FileSource.fromByteValue(row.getByte(15)),
row.isNullAt(16) ? null : fromStringArrayData(row.getArray(16)));
row.isNullAt(16) ? null : fromStringArrayData(row.getArray(16)),
row.isNullAt(17) ? null : row.getString(17).toString());
}
}
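The `row.isNullAt(17) ? null : row.getString(17).toString()` guard in `fromRow` above is the pattern for reading a nullable column appended at the end of a schema. A self-contained illustration, using a plain array as a hypothetical stand-in for Paimon's InternalRow:

```java
// Minimal stand-in for an InternalRow-style accessor, illustrating the
// null-guarded read of the appended field at index 17. Not the Paimon API.
class NullableFieldRead {

    /** Reads a string-valued slot, returning null rather than dereferencing a null cell. */
    static String readString(Object[] row, int pos) {
        // Mirrors `row.isNullAt(pos) ? null : row.getString(pos).toString()`:
        // always test for null before materializing the value.
        return row[pos] == null ? null : row[pos].toString();
    }
}
```

Rows written before the field existed (or padded by a legacy deserializer) simply carry null at index 17, so the same read path serves old and new data.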
@@ -155,6 +155,7 @@ public DataFileMeta fromRow(InternalRow row) {
null,
null,
null,
null,
null);
}
}
@@ -160,6 +160,7 @@ private static DataFileMeta newFile(long timeMillis) {
0L,
null,
FileSource.APPEND,
null,
null);
}

@@ -57,6 +57,7 @@ public static DataFileMeta newFile(long minSeq, long maxSeq) {
maxSeq - minSeq + 1,
null,
FileSource.APPEND,
null,
null);
}

@@ -75,7 +75,8 @@ public void testProduction() throws IOException {
11L,
new byte[] {1, 2, 4},
FileSource.COMPACT,
Arrays.asList("field1", "field2", "field3"));
Arrays.asList("field1", "field2", "field3"),
null);
List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);

LinkedHashMap<String, DeletionVectorMeta> dvMetas = new LinkedHashMap<>();
@@ -136,7 +137,8 @@ public void testCompatibilityToVersion4() throws IOException {
11L,
new byte[] {1, 2, 4},
FileSource.COMPACT,
Arrays.asList("field1", "field2", "field3"));
Arrays.asList("field1", "field2", "field3"),
"hdfs://localhost:9000/path/to/file");
List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);

LinkedHashMap<String, DeletionVectorMeta> dvMetas = new LinkedHashMap<>();
@@ -206,6 +208,7 @@ public void testCompatibilityToVersion3() throws IOException {
11L,
new byte[] {1, 2, 4},
FileSource.COMPACT,
null,
null);
List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);

@@ -276,6 +279,7 @@ public void testCompatibilityToVersion2() throws IOException {
11L,
new byte[] {1, 2, 4},
null,
null,
null);
List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);

@@ -346,6 +350,7 @@ public void testCompatibilityToVersion2PaimonV07() throws IOException {
null,
null,
null,
null,
null);
List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);

@@ -95,6 +95,7 @@ protected ManifestEntry makeEntry(
0L, // not used
embeddedIndex, // not used
FileSource.APPEND,
null,
null));
}

@@ -184,6 +184,7 @@ private DataFileMeta makeInterval(int left, int right) {
0L,
null,
FileSource.APPEND,
null,
null);
}

@@ -214,6 +214,7 @@ public void testExpireExtraFiles() throws IOException {
0L,
null,
FileSource.APPEND,
null,
null);
ManifestEntry add = new ManifestEntry(FileKind.ADD, partition, 0, 1, dataFile);
ManifestEntry delete = new ManifestEntry(FileKind.DELETE, partition, 0, 1, dataFile);
@@ -139,7 +139,8 @@ public void testSerializerNormal() throws Exception {
11L,
new byte[] {1, 2, 4},
FileSource.COMPACT,
Arrays.asList("field1", "field2", "field3"));
Arrays.asList("field1", "field2", "field3"),
null);
List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);

DeletionFile deletionFile = new DeletionFile("deletion_file", 100, 22, 33L);
@@ -194,6 +195,7 @@ public void testSerializerCompatibleV1() throws Exception {
11L,
new byte[] {1, 2, 4},
null,
null,
null);
List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);

@@ -254,6 +256,7 @@ public void testSerializerCompatibleV2() throws Exception {
11L,
new byte[] {1, 2, 4},
FileSource.COMPACT,
null,
null);
List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);

@@ -314,7 +317,8 @@ public void testSerializerCompatibleV3() throws Exception {
11L,
new byte[] {1, 2, 4},
FileSource.COMPACT,
Arrays.asList("field1", "field2", "field3"));
Arrays.asList("field1", "field2", "field3"),
"hdfs:///path/to/warehouse");
List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);

DeletionFile deletionFile = new DeletionFile("deletion_file", 100, 22, null);
Binary file modified paimon-core/src/test/resources/compatibility/datasplit-v3
Binary file not shown.