Skip to content

Commit

Permalink
add conf
Browse files Browse the repository at this point in the history
  • Loading branch information
askwang committed Oct 31, 2024
1 parent 01bedc0 commit 080b3eb
Show file tree
Hide file tree
Showing 18 changed files with 98 additions and 34 deletions.
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,12 @@
<td>Map</td>
<td>Define different file format for different level, you can add the conf like this: 'file.format.per.level' = '0:avro,3:parquet', if the file format for level is not provided, the default format which set by `file.format` will be used.</td>
</tr>
<tr>
<td><h5>file.suffix.include.compression</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to add file compression type in the file name of data file and changelog file.</td>
</tr>
<tr>
<td><h5>force-lookup</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down
11 changes: 11 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,13 @@ public class CoreOptions implements Serializable {
.defaultValue("changelog-")
.withDescription("Specify the file name prefix of changelog files.");

public static final ConfigOption<Boolean> FILE_SUFFIX_INCLUDE_COMPRESSION =
key("file.suffix.include.compression")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to add file compression type in the file name of data file and changelog file.");

public static final ConfigOption<MemorySize> FILE_BLOCK_SIZE =
key("file.block-size")
.memoryType()
Expand Down Expand Up @@ -1598,6 +1605,10 @@ public String changelogFilePrefix() {
return options.get(CHANGELOG_FILE_PREFIX);
}

public boolean fileSuffixIncludeCompression() {
return options.get(FILE_SUFFIX_INCLUDE_COMPRESSION);
}

public String fieldsDefaultFunc() {
return options.get(FIELDS_DEFAULT_AGG_FUNC);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ public FileStorePathFactory pathFactory() {
options.dataFilePrefix(),
options.changelogFilePrefix(),
options.legacyPartitionName(),
options.fileSuffixIncludeCompression(),
options.fileCompression());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ private Map<String, FileStorePathFactory> format2PathFactory() {
options.dataFilePrefix(),
options.changelogFilePrefix(),
options.legacyPartitionName(),
options.fileSuffixIncludeCompression(),
options.fileCompression())));
return pathFactoryMap;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,23 @@ public class DataFilePathFactory {
private final String formatIdentifier;
private final String dataFilePrefix;
private final String changelogFilePrefix;
private final boolean fileSuffixIncludeCompression;
private final String fileCompression;

public DataFilePathFactory(
Path parent,
String formatIdentifier,
String dataFilePrefix,
String changelogFilePrefix,
boolean fileSuffixIncludeCompression,
String fileCompression) {
this.parent = parent;
this.uuid = UUID.randomUUID().toString();
this.pathCount = new AtomicInteger(0);
this.formatIdentifier = formatIdentifier;
this.dataFilePrefix = dataFilePrefix;
this.changelogFilePrefix = changelogFilePrefix;
this.fileSuffixIncludeCompression = fileSuffixIncludeCompression;
this.fileCompression = fileCompression;
}

Expand All @@ -65,7 +68,12 @@ public Path newChangelogPath() {
}

private Path newPath(String prefix) {
String extension = "." + fileCompression + "." + formatIdentifier;
String extension;
if (fileSuffixIncludeCompression) {
extension = "." + fileCompression + "." + formatIdentifier;
} else {
extension = "." + formatIdentifier;
}
String name = prefix + uuid + "-" + pathCount.getAndIncrement() + extension;
return new Path(parent, name);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class FileStorePathFactory {
private final String formatIdentifier;
private final String dataFilePrefix;
private final String changelogFilePrefix;
private final boolean fileSuffixIncludeCompression;
private final String fileCompression;

private final AtomicInteger manifestFileCount;
Expand All @@ -59,6 +60,7 @@ public FileStorePathFactory(
String dataFilePrefix,
String changelogFilePrefix,
boolean legacyPartitionName,
boolean fileSuffixIncludeCompression,
String fileCompression) {
this.root = root;
this.uuid = UUID.randomUUID().toString();
Expand All @@ -68,6 +70,7 @@ public FileStorePathFactory(
this.formatIdentifier = formatIdentifier;
this.dataFilePrefix = dataFilePrefix;
this.changelogFilePrefix = changelogFilePrefix;
this.fileSuffixIncludeCompression = fileSuffixIncludeCompression;
this.fileCompression = fileCompression;

this.manifestFileCount = new AtomicInteger(0);
Expand Down Expand Up @@ -117,6 +120,7 @@ public DataFilePathFactory createDataFilePathFactory(BinaryRow partition, int bu
formatIdentifier,
dataFilePrefix,
changelogFilePrefix,
fileSuffixIncludeCompression,
fileCompression);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,7 @@ private DataFilePathFactory createPathFactory() {
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
CoreOptions.FILE_COMPRESSION.defaultValue());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public void testFileSuffix(@TempDir java.nio.file.Path tempDir) throws Exception
format,
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
CoreOptions.FILE_COMPRESSION.defaultValue());
FileFormat fileFormat = FileFormat.fromIdentifier(format, new Options());
LinkedList<DataFileMeta> toCompact = new LinkedList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public void testNoPartition() {
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
CoreOptions.FILE_COMPRESSION.defaultValue());
String uuid = pathFactory.uuid();

Expand All @@ -52,8 +53,6 @@ public void testNoPartition() {
+ "-"
+ i
+ "."
+ CoreOptions.FILE_COMPRESSION.defaultValue()
+ "."
+ CoreOptions.FILE_FORMAT.defaultValue()));
}
assertThat(pathFactory.toPath("my-data-file-name"))
Expand All @@ -68,6 +67,7 @@ public void testWithPartition() {
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
CoreOptions.FILE_COMPRESSION.defaultValue());
String uuid = pathFactory.uuid();

Expand All @@ -81,8 +81,6 @@ public void testWithPartition() {
+ "-"
+ i
+ "."
+ CoreOptions.FILE_COMPRESSION.defaultValue()
+ "."
+ CoreOptions.FILE_FORMAT.defaultValue()));
}
assertThat(pathFactory.toPath("my-data-file-name"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ protected KeyValueFileWriterFactory createWriterFactory(String pathStr, String f
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
CoreOptions.FILE_COMPRESSION.defaultValue());
int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024;
FileIO fileIO = FileIOFinder.find(path);
Expand All @@ -248,6 +249,7 @@ protected KeyValueFileWriterFactory createWriterFactory(String pathStr, String f
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
CoreOptions.FILE_COMPRESSION.defaultValue()));

return KeyValueFileWriterFactory.builder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ public void initialize(String identifier, boolean statsDenseStore) {
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX
.defaultValue(),
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION
.defaultValue(),
CoreOptions.FILE_COMPRESSION.defaultValue())
.newPath(),
SCHEMA,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ protected ManifestFile createManifestFile(String pathStr) {
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
CoreOptions.FILE_COMPRESSION.defaultValue()),
Long.MAX_VALUE,
null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ private ManifestFile createManifestFile(String pathStr) {
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
CoreOptions.FILE_COMPRESSION.defaultValue());
int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024;
FileIO fileIO = FileIOFinder.find(path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ private ManifestList createManifestList(String pathStr) {
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
CoreOptions.FILE_COMPRESSION.defaultValue());
return new ManifestList.Factory(FileIOFinder.find(path), avro, "zstd", pathFactory, null)
.create();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public void testCreateDataFilePathFactoryWithPartition() {
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
CoreOptions.FILE_COMPRESSION.defaultValue());

assertPartition("20211224", 16, pathFactory, "/dt=20211224/hr=16");
Expand Down Expand Up @@ -128,6 +129,7 @@ public static FileStorePathFactory createNonPartFactory(Path root) {
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
CoreOptions.FILE_COMPRESSION.defaultValue());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public TestChangelogDataReadWrite(String root) {
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
CoreOptions.FILE_COMPRESSION.defaultValue());
this.snapshotManager = new SnapshotManager(LocalFileIO.create(), new Path(root));
this.commitUser = UUID.randomUUID().toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ protected void foreachIndexReader(Consumer<FileIndexReader> consumer)
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
CoreOptions.FILE_COMPRESSION.defaultValue());

Table table = fileSystemCatalog.getTable(Identifier.create("db", "T"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.junit.jupiter.api.io.TempDir;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
Expand Down Expand Up @@ -353,33 +352,38 @@ public void testDataFileSuffixName() {

spark.sql("INSERT INTO T VALUES (1, 1, 'aa')");
spark.sql("INSERT INTO T VALUES (2, 2, 'bb')");
spark.sql("INSERT INTO T VALUES (1, 3, 'cc')");

List<Row> data = spark.sql("SELECT * FROM T order by a").collectAsList();
assertThat(data.toString()).isEqualTo("[[1,3,cc], [2,2,bb]]");
// enable file suffix
spark.conf().set("spark.paimon.file.suffix.include.compression", true);
spark.sql("INSERT INTO T VALUES (3, 3, 'cc')");
spark.sql("INSERT INTO T VALUES (4, 4, 'dd')");

List<String> beforeCompactFiles =
spark.sql("select file_path from `T$files`").collectAsList().stream()
.map(x -> x.getString(0))
.collect(Collectors.toList());
Assertions.assertEquals(3, beforeCompactFiles.size());
List<Row> data2 = spark.sql("SELECT * FROM T order by a").collectAsList();
assertThat(data2.toString()).isEqualTo("[[1,1,aa], [2,2,bb], [3,3,cc], [4,4,dd]]");

// compact
assertThat(spark.sql("CALL sys.compact(table => 'T')").collectAsList().toString())
.isEqualTo("[[true]]");
List<String> afterCompactFiles =
// check files suffix name
List<String> files =
spark.sql("select file_path from `T$files`").collectAsList().stream()
.map(x -> x.getString(0))
.collect(Collectors.toList());
Assertions.assertEquals(1, afterCompactFiles.size());

// check files suffix name
List<String> files = new ArrayList<>(beforeCompactFiles);
String extension = "." + "zstd" + "." + "parquet";
files.addAll(afterCompactFiles);
for (String fileName : files) {
Assertions.assertTrue(fileName.endsWith(extension));
}
Assertions.assertEquals(4, files.size());

String defaultExtension = "." + "parquet";
String newExtension = "." + "zstd" + "." + "parquet";
// two data files end with ".parquet", two data file end with ".zstd.parquet"
Assertions.assertEquals(
2,
files.stream()
.filter(
name ->
name.endsWith(defaultExtension)
&& !name.endsWith(newExtension))
.count());
Assertions.assertEquals(
2, files.stream().filter(name -> name.endsWith(newExtension)).count());

// reset config
spark.conf().unset("spark.paimon.file.suffix.include.compression");
}

@Test
Expand All @@ -398,14 +402,32 @@ public void testChangelogFileSuffixName() throws Exception {
FileIO fileIO = table.fileIO();

spark.sql("INSERT INTO T VALUES (1, 1, 'aa')");
FileStatus[] files = fileIO.listStatus(new Path(tabLocation, "bucket-0"));
Assertions.assertEquals(1, dataFileCount(files, "changelog-"));

// check files suffix name
String extension = "." + "zstd" + "." + "parquet";
for (FileStatus file : files) {
Assertions.assertTrue(file.getPath().getName().endsWith(extension));
}
spark.conf().set("spark.paimon.file.suffix.include.compression", true);
spark.sql("INSERT INTO T VALUES (2, 2, 'bb')");

// collect changelog files
List<String> files =
Arrays.stream(fileIO.listStatus(new Path(tabLocation, "bucket-0")))
.map(name -> name.getPath().getName())
.filter(name -> name.startsWith("changelog-"))
.collect(Collectors.toList());
String defaultExtension = "." + "parquet";
String newExtension = "." + "zstd" + "." + "parquet";
// one changelog file end with ".parquet", one changelog file end with ".zstd.parquet"
Assertions.assertEquals(
1,
files.stream()
.filter(
name ->
name.endsWith(defaultExtension)
&& !name.endsWith(newExtension))
.count());
Assertions.assertEquals(
1, files.stream().filter(name -> name.endsWith(newExtension)).count());

// reset config
spark.conf().unset("spark.paimon.file.suffix.include.compression");
}

protected static FileStoreTable getTable(String tableName) {
Expand Down

0 comments on commit 080b3eb

Please sign in to comment.