[core] Support custom data file name prefix #4041

Merged · 5 commits · Sep 11, 2024
Changes from 3 commits
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
@@ -308,6 +308,12 @@
<td>Map</td>
<td>Define a different file format for different levels. You can add a conf like this: 'file.format.per.level' = '0:avro,3:parquet'. If the file format for a level is not provided, the default format set by `file.format` will be used.</td>
</tr>
<tr>
<td><h5>file.prefix</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Specify the file name prefix of data files.</td>
</tr>
<tr>
<td><h5>force-lookup</h5></td>
<td style="word-wrap: break-word;">false</td>
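
(Usage sketch, not part of this diff: the session-level conf line is copied from the Spark tests added at the end of this PR; the TBLPROPERTIES variant is an assumption that Spark table properties are forwarded to Paimon core options, the same way 'primary-key' is in those tests. `spark` is an existing SparkSession.)

    // Hypothetical usage sketch: not code from this PR.
    // Per-table: assumes TBLPROPERTIES map straight to Paimon options.
    spark.sql("CREATE TABLE T (a INT, b INT, c STRING) TBLPROPERTIES ('file.prefix'='test-')");
    // Per-session: the mechanism exercised by the new tests below.
    spark.conf().set("spark.paimon.file.prefix", "test-");
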
10 changes: 10 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -171,6 +171,12 @@ public class CoreOptions implements Serializable {
.withDescription(
"Default file compression zstd level. For higher compression rates, it can be configured to 9, but the read and write speed will significantly decrease.");

public static final ConfigOption<String> FILE_PREFIX =
Contributor: Can you also introduce changelog-file.prefix?

key("file.prefix")
Contributor: data-file.prefix
Default value is "data-".

.stringType()
.noDefaultValue()
.withDescription("Specify the file name prefix of data files.");

public static final ConfigOption<MemorySize> FILE_BLOCK_SIZE =
key("file.block-size")
.memoryType()
@@ -1448,6 +1454,10 @@ private static String normalizeFileFormat(String fileFormat) {
return fileFormat.toLowerCase();
}

public String filePrefix() {
return options.get(FILE_PREFIX);
}

public String fieldsDefaultFunc() {
return options.get(FIELDS_DEFAULT_AGG_FUNC);
}
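
(A minimal sketch, not in the diff, of the new getter's behavior; constructing CoreOptions from a Map mirrors `new CoreOptions(new HashMap<>())` in the tests further down. Since FILE_PREFIX declares no default value, filePrefix() returns null unless the option is set.)

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.paimon.CoreOptions;

    // Hypothetical sketch of reading the new option.
    Map<String, String> conf = new HashMap<>();
    conf.put("file.prefix", "test-");
    CoreOptions coreOptions = new CoreOptions(conf);
    String prefix = coreOptions.filePrefix(); // "test-"; null when the option is unset
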
@@ -102,7 +102,8 @@ public FileStorePathFactory pathFactory() {
options.path(),
partitionType,
options.partitionDefaultName(),
options.fileFormat().getFormatIdentifier());
options.fileFormat().getFormatIdentifier(),
options.filePrefix());
}

@Override
@@ -199,7 +199,8 @@ private Map<String, FileStorePathFactory> format2PathFactory() {
options.path(),
partitionType,
options.partitionDefaultName(),
format)));
format,
options.filePrefix())));
return pathFactoryMap;
}

@@ -41,16 +41,21 @@ public class DataFilePathFactory {

private final AtomicInteger pathCount;
private final String formatIdentifier;
private final String filePrefix;

public DataFilePathFactory(Path parent, String formatIdentifier) {
public DataFilePathFactory(Path parent, String formatIdentifier, String filePrefix) {
this.parent = parent;
this.uuid = UUID.randomUUID().toString();

this.pathCount = new AtomicInteger(0);
this.formatIdentifier = formatIdentifier;
this.filePrefix = filePrefix;
}

public Path newPath() {
if (filePrefix != null) {
return newPath(filePrefix);
}
return newPath(DATA_FILE_PREFIX);
}

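
(A standalone sketch of the resulting file names, assuming the existing newPath(prefix) helper builds names as "<prefix><uuid>-<count>.<format>"; that naming scheme is an assumption, not shown in this diff.)

    import java.util.UUID;
    import java.util.concurrent.atomic.AtomicInteger;

    // Hypothetical mirror of DataFilePathFactory's naming, under the assumption above.
    class PrefixedNameSketch {
        private final String uuid = UUID.randomUUID().toString();
        private final AtomicInteger pathCount = new AtomicInteger(0);

        String newName(String prefix, String format) {
            return prefix + uuid + "-" + pathCount.getAndIncrement() + "." + format;
        }
    }

    // newName("data-", "orc") -> data-<uuid>-0.orc  (default DATA_FILE_PREFIX)
    // newName("test-", "orc") -> test-<uuid>-1.orc  (custom file.prefix)
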
@@ -41,6 +41,7 @@ public class FileStorePathFactory {
private final String uuid;
private final InternalRowPartitionComputer partitionComputer;
private final String formatIdentifier;
private final String filePrefix;

private final AtomicInteger manifestFileCount;
private final AtomicInteger manifestListCount;
@@ -49,12 +50,17 @@ public class FileStorePathFactory {
private final AtomicInteger statsFileCount;

public FileStorePathFactory(
Path root, RowType partitionType, String defaultPartValue, String formatIdentifier) {
Path root,
RowType partitionType,
String defaultPartValue,
String formatIdentifier,
String filePrefix) {
this.root = root;
this.uuid = UUID.randomUUID().toString();

this.partitionComputer = getPartitionComputer(partitionType, defaultPartValue);
this.formatIdentifier = formatIdentifier;
this.filePrefix = filePrefix;

this.manifestFileCount = new AtomicInteger(0);
this.manifestListCount = new AtomicInteger(0);
@@ -97,7 +103,7 @@ public Path toManifestListPath(String manifestListName) {
}

public DataFilePathFactory createDataFilePathFactory(BinaryRow partition, int bucket) {
return new DataFilePathFactory(bucketPath(partition, bucket), formatIdentifier);
return new DataFilePathFactory(bucketPath(partition, bucket), formatIdentifier, filePrefix);
}

public Path bucketPath(BinaryRow partition, int bucket) {
@@ -520,7 +520,8 @@ private InternalRow row(int id, String name, String dt) {
private DataFilePathFactory createPathFactory() {
return new DataFilePathFactory(
new Path(tempDir + "/dt=" + PART + "/bucket-0"),
CoreOptions.FILE_FORMAT.defaultValue().toString());
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.FILE_PREFIX.defaultValue());
}

private AppendOnlyWriter createEmptyWriter(long targetFileSize) {
@@ -66,7 +66,10 @@ public void testFileSuffix(@TempDir java.nio.file.Path tempDir) throws Exception
assertThat(path.toString().endsWith(format)).isTrue();

DataFilePathFactory dataFilePathFactory =
new DataFilePathFactory(new Path(tempDir + "/dt=1/bucket-1"), format);
new DataFilePathFactory(
new Path(tempDir + "/dt=1/bucket-1"),
format,
CoreOptions.FILE_PREFIX.defaultValue());
FileFormat fileFormat = FileFormat.fromIdentifier(format, new Options());
LinkedList<DataFileMeta> toCompact = new LinkedList<>();
CoreOptions options = new CoreOptions(new HashMap<>());
@@ -36,7 +36,8 @@ public void testNoPartition() {
DataFilePathFactory pathFactory =
new DataFilePathFactory(
new Path(tempDir + "/bucket-123"),
CoreOptions.FILE_FORMAT.defaultValue().toString());
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.FILE_PREFIX.defaultValue());
String uuid = pathFactory.uuid();

for (int i = 0; i < 20; i++) {
@@ -60,7 +61,8 @@ public void testWithPartition() {
DataFilePathFactory pathFactory =
new DataFilePathFactory(
new Path(tempDir + "/dt=20211224/bucket-123"),
CoreOptions.FILE_FORMAT.defaultValue().toString());
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.FILE_PREFIX.defaultValue());
String uuid = pathFactory.uuid();

for (int i = 0; i < 20; i++) {
@@ -248,7 +248,8 @@ protected KeyValueFileWriterFactory createWriterFactory(String pathStr, String f
path,
RowType.of(),
CoreOptions.PARTITION_DEFAULT_NAME.defaultValue(),
format);
format,
CoreOptions.FILE_PREFIX.defaultValue());
int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024;
FileIO fileIO = FileIOFinder.find(path);
Options options = new Options();
@@ -262,7 +263,8 @@ protected KeyValueFileWriterFactory createWriterFactory(String pathStr, String f
path,
RowType.of(),
CoreOptions.PARTITION_DEFAULT_NAME.defaultValue(),
CoreOptions.FILE_FORMAT.defaultValue().toString()));
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.FILE_PREFIX.defaultValue()));

return KeyValueFileWriterFactory.builder(
fileIO,
@@ -73,7 +73,8 @@ public void initialize(String identifier) {
new Path(tempDir + "/bucket-0"),
CoreOptions.FILE_FORMAT
.defaultValue()
.toString())
.toString(),
CoreOptions.FILE_PREFIX.defaultValue())
.newPath(),
SCHEMA,
fileFormat
@@ -133,7 +133,8 @@ protected ManifestFile createManifestFile(String pathStr) {
path,
getPartitionType(),
"default",
CoreOptions.FILE_FORMAT.defaultValue()),
CoreOptions.FILE_FORMAT.defaultValue(),
CoreOptions.FILE_PREFIX.defaultValue()),
Long.MAX_VALUE,
null)
.create();
@@ -100,7 +100,8 @@ private ManifestFile createManifestFile(String pathStr) {
path,
DEFAULT_PART_TYPE,
"default",
CoreOptions.FILE_FORMAT.defaultValue().toString());
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.FILE_PREFIX.defaultValue());
int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024;
FileIO fileIO = FileIOFinder.find(path);
return new ManifestFile.Factory(
@@ -104,7 +104,8 @@ private ManifestList createManifestList(String pathStr) {
path,
TestKeyValueGenerator.DEFAULT_PART_TYPE,
"default",
CoreOptions.FILE_FORMAT.defaultValue().toString());
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.FILE_PREFIX.defaultValue());
return new ManifestList.Factory(FileIOFinder.find(path), avro, "zstd", pathFactory, null)
.create();
}
@@ -85,7 +85,8 @@ public void testCreateDataFilePathFactoryWithPartition() {
new DataType[] {new VarCharType(10), new IntType()},
new String[] {"dt", "hr"}),
"default",
CoreOptions.FILE_FORMAT.defaultValue().toString());
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.FILE_PREFIX.defaultValue());

assertPartition("20211224", 16, pathFactory, "/dt=20211224/hr=16");
assertPartition("20211224", null, pathFactory, "/dt=20211224/hr=default");
@@ -120,6 +121,7 @@ public static FileStorePathFactory createNonPartFactory(Path root) {
root,
RowType.builder().build(),
PARTITION_DEFAULT_NAME.defaultValue(),
CoreOptions.FILE_FORMAT.defaultValue().toString());
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.FILE_PREFIX.defaultValue());
}
}
@@ -105,7 +105,8 @@ public TestChangelogDataReadWrite(String root) {
tablePath,
RowType.of(new IntType()),
"default",
CoreOptions.FILE_FORMAT.defaultValue().toString());
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.FILE_PREFIX.defaultValue());
this.snapshotManager = new SnapshotManager(LocalFileIO.create(), new Path(root));
this.commitUser = UUID.randomUUID().toString();
}
@@ -128,7 +128,8 @@ protected void foreachIndexReader(Consumer<FileIndexReader> consumer)
tableRoot,
RowType.of(),
new CoreOptions(new Options()).partitionDefaultName(),
CoreOptions.FILE_FORMAT.defaultValue().toString());
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.FILE_PREFIX.defaultValue());

Table table = fileSystemCatalog.getTable(Identifier.create("db", "T"));
ReadBuilder readBuilder = table.newReadBuilder();
@@ -25,13 +25,15 @@
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.io.TempDir;

import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;

@@ -224,4 +226,66 @@ public void testReadWriteUnawareBucketTable() {
rows = spark.sql("SELECT max(bucket) FROM `T$FILES`").collectAsList();
assertThat(rows.toString()).isEqualTo("[[0]]");
}

@Test
public void testDefaultDataFilePrefix() {
spark.sql("CREATE TABLE T (a INT, b INT, c STRING)");

spark.sql("INSERT INTO T VALUES (1, 1, 'aa')");
spark.sql("INSERT INTO T VALUES (2, 2, 'bb')");
spark.sql("INSERT INTO T VALUES (3, 3, 'cc')");

List<Row> data = spark.sql("SELECT * FROM T").collectAsList();
assertThat(data.toString()).isEqualTo("[[1,1,aa], [2,2,bb], [3,3,cc]]");

List<Row> rows = spark.sql("select file_path from `T$files`").collectAsList();
List<String> fileNames =
rows.stream().map(x -> x.getString(0)).collect(Collectors.toList());
Assertions.assertEquals(3, fileNames.size());
for (String fileName : fileNames) {
Assertions.assertTrue(fileName.startsWith("data-"));
}
}

@Test
public void testDataFilePrefixForAppendOnlyTable() {
spark.sql("CREATE TABLE T (a INT, b INT, c STRING)");

spark.conf().set("spark.paimon.file.prefix", "test-");
spark.sql("INSERT INTO T VALUES (1, 1, 'aa')");
spark.sql("INSERT INTO T VALUES (2, 2, 'bb')");
spark.sql("INSERT INTO T VALUES (3, 3, 'cc')");

List<Row> data = spark.sql("SELECT * FROM T").collectAsList();
assertThat(data.toString()).isEqualTo("[[1,1,aa], [2,2,bb], [3,3,cc]]");

List<Row> rows = spark.sql("select file_path from `T$files`").collectAsList();
List<String> fileNames =
rows.stream().map(x -> x.getString(0)).collect(Collectors.toList());
Assertions.assertEquals(3, fileNames.size());
for (String fileName : fileNames) {
Assertions.assertTrue(fileName.startsWith("test-"));
}
}

@Test
public void testDataFilePrefixForPKTable() {
spark.sql("CREATE TABLE T (a INT, b INT, c STRING)" + " TBLPROPERTIES ('primary-key'='a')");

spark.conf().set("spark.paimon.file.prefix", "test-");
spark.sql("INSERT INTO T VALUES (1, 1, 'aa')");
spark.sql("INSERT INTO T VALUES (2, 2, 'bb')");
spark.sql("INSERT INTO T VALUES (1, 3, 'cc')");

List<Row> data = spark.sql("SELECT * FROM T order by a").collectAsList();
assertThat(data.toString()).isEqualTo("[[1,3,cc], [2,2,bb]]");

List<Row> rows = spark.sql("select file_path from `T$files`").collectAsList();
List<String> fileNames =
rows.stream().map(x -> x.getString(0)).collect(Collectors.toList());
Assertions.assertEquals(3, fileNames.size());
for (String fileName : fileNames) {
Assertions.assertTrue(fileName.startsWith("test-"));
}
}
}