Skip to content

Commit

Permalink
add file.prefix conf
Browse files Browse the repository at this point in the history
  • Loading branch information
askwang committed Aug 22, 2024
1 parent 65f400e commit e6c856a
Show file tree
Hide file tree
Showing 17 changed files with 122 additions and 19 deletions.
10 changes: 10 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,12 @@ public class CoreOptions implements Serializable {
.withDescription(
"Default file compression zstd level. For higher compression rates, it can be configured to 9, but the read and write speed will significantly decrease.");

/**
 * Option {@code file.prefix}: a string used as the file name prefix of data files.
 *
 * <p>No default value is declared. NOTE(review): when the option is unset the data file path
 * factory appears to fall back to its own built-in prefix constant - confirm against
 * DataFilePathFactory#newPath().
 */
public static final ConfigOption<String> FILE_PREFIX =
key("file.prefix")
.stringType()
.noDefaultValue()
.withDescription("Specify the file name prefix of data files.");

public static final ConfigOption<MemorySize> FILE_BLOCK_SIZE =
key("file.block-size")
.memoryType()
Expand Down Expand Up @@ -1433,6 +1439,10 @@ private static String normalizeFileFormat(String fileFormat) {
return fileFormat.toLowerCase();
}

/**
 * Returns the value configured for {@link #FILE_PREFIX}.
 *
 * <p>NOTE(review): the option declares no default value, so this presumably returns {@code null}
 * when {@code file.prefix} is not set - callers should be prepared for that; confirm.
 */
public String filePrefix() {
return options.get(FILE_PREFIX);
}

/** Returns the value configured for the {@code FIELDS_DEFAULT_AGG_FUNC} option. */
public String fieldsDefaultFunc() {
return options.get(FIELDS_DEFAULT_AGG_FUNC);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ public FileStorePathFactory pathFactory() {
options.path(),
partitionType,
options.partitionDefaultName(),
options.fileFormat().getFormatIdentifier());
options.fileFormat().getFormatIdentifier(),
options.filePrefix());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ private Map<String, FileStorePathFactory> format2PathFactory() {
options.path(),
partitionType,
options.partitionDefaultName(),
format)));
format,
options.filePrefix())));
return pathFactoryMap;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,21 @@ public class DataFilePathFactory {

private final AtomicInteger pathCount;
private final String formatIdentifier;
private final String filePrefix;

public DataFilePathFactory(Path parent, String formatIdentifier) {
public DataFilePathFactory(Path parent, String formatIdentifier, String filePrefix) {
this.parent = parent;
this.uuid = UUID.randomUUID().toString();

this.pathCount = new AtomicInteger(0);
this.formatIdentifier = formatIdentifier;
this.filePrefix = filePrefix;
}

/**
 * Creates a new data file path.
 *
 * <p>Uses the file prefix passed to the constructor when it is non-null; otherwise uses
 * {@code DATA_FILE_PREFIX}.
 *
 * @return the path produced by {@link #newPath(String)} for the selected prefix
 */
public Path newPath() {
    String prefix = filePrefix == null ? DATA_FILE_PREFIX : filePrefix;
    return newPath(prefix);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class FileStorePathFactory {
private final String uuid;
private final InternalRowPartitionComputer partitionComputer;
private final String formatIdentifier;
private final String filePrefix;

private final AtomicInteger manifestFileCount;
private final AtomicInteger manifestListCount;
Expand All @@ -49,12 +50,17 @@ public class FileStorePathFactory {
private final AtomicInteger statsFileCount;

public FileStorePathFactory(
Path root, RowType partitionType, String defaultPartValue, String formatIdentifier) {
Path root,
RowType partitionType,
String defaultPartValue,
String formatIdentifier,
String filePrefix) {
this.root = root;
this.uuid = UUID.randomUUID().toString();

this.partitionComputer = getPartitionComputer(partitionType, defaultPartValue);
this.formatIdentifier = formatIdentifier;
this.filePrefix = filePrefix;

this.manifestFileCount = new AtomicInteger(0);
this.manifestListCount = new AtomicInteger(0);
Expand Down Expand Up @@ -97,7 +103,7 @@ public Path toManifestListPath(String manifestListName) {
}

public DataFilePathFactory createDataFilePathFactory(BinaryRow partition, int bucket) {
return new DataFilePathFactory(bucketPath(partition, bucket), formatIdentifier);
return new DataFilePathFactory(bucketPath(partition, bucket), formatIdentifier, filePrefix);
}

public Path bucketPath(BinaryRow partition, int bucket) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,8 @@ private InternalRow row(int id, String name, String dt) {
private DataFilePathFactory createPathFactory() {
return new DataFilePathFactory(
new Path(tempDir + "/dt=" + PART + "/bucket-0"),
CoreOptions.FILE_FORMAT.defaultValue().toString());
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.FILE_PREFIX.defaultValue());
}

private AppendOnlyWriter createEmptyWriter(long targetFileSize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ public void testFileSuffix(@TempDir java.nio.file.Path tempDir) throws Exception
assertThat(path.toString().endsWith(format)).isTrue();

DataFilePathFactory dataFilePathFactory =
new DataFilePathFactory(new Path(tempDir + "/dt=1/bucket-1"), format);
new DataFilePathFactory(
new Path(tempDir + "/dt=1/bucket-1"),
format,
CoreOptions.FILE_PREFIX.defaultValue());
FileFormat fileFormat = FileFormat.fromIdentifier(format, new Options());
LinkedList<DataFileMeta> toCompact = new LinkedList<>();
CoreOptions options = new CoreOptions(new HashMap<>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ public void testNoPartition() {
DataFilePathFactory pathFactory =
new DataFilePathFactory(
new Path(tempDir + "/bucket-123"),
CoreOptions.FILE_FORMAT.defaultValue().toString());
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.FILE_PREFIX.defaultValue());
String uuid = pathFactory.uuid();

for (int i = 0; i < 20; i++) {
Expand All @@ -60,7 +61,8 @@ public void testWithPartition() {
DataFilePathFactory pathFactory =
new DataFilePathFactory(
new Path(tempDir + "/dt=20211224/bucket-123"),
CoreOptions.FILE_FORMAT.defaultValue().toString());
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.FILE_PREFIX.defaultValue());
String uuid = pathFactory.uuid();

for (int i = 0; i < 20; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,8 @@ protected KeyValueFileWriterFactory createWriterFactory(String pathStr, String f
path,
RowType.of(),
CoreOptions.PARTITION_DEFAULT_NAME.defaultValue(),
format);
format,
CoreOptions.FILE_PREFIX.defaultValue());
int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024;
FileIO fileIO = FileIOFinder.find(path);
Options options = new Options();
Expand All @@ -262,7 +263,8 @@ protected KeyValueFileWriterFactory createWriterFactory(String pathStr, String f
path,
RowType.of(),
CoreOptions.PARTITION_DEFAULT_NAME.defaultValue(),
CoreOptions.FILE_FORMAT.defaultValue().toString()));
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.FILE_PREFIX.defaultValue()));

return KeyValueFileWriterFactory.builder(
fileIO,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ public void initialize(String identifier) {
new Path(tempDir + "/bucket-0"),
CoreOptions.FILE_FORMAT
.defaultValue()
.toString())
.toString(),
CoreOptions.FILE_PREFIX.defaultValue())
.newPath(),
SCHEMA,
fileFormat
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ protected ManifestFile createManifestFile(String pathStr) {
path,
getPartitionType(),
"default",
CoreOptions.FILE_FORMAT.defaultValue()),
CoreOptions.FILE_FORMAT.defaultValue(),
CoreOptions.FILE_PREFIX.defaultValue()),
Long.MAX_VALUE,
null)
.create();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ private ManifestFile createManifestFile(String pathStr) {
path,
DEFAULT_PART_TYPE,
"default",
CoreOptions.FILE_FORMAT.defaultValue().toString());
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.FILE_PREFIX.defaultValue());
int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024;
FileIO fileIO = FileIOFinder.find(path);
return new ManifestFile.Factory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ private ManifestList createManifestList(String pathStr) {
path,
TestKeyValueGenerator.DEFAULT_PART_TYPE,
"default",
CoreOptions.FILE_FORMAT.defaultValue().toString());
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.FILE_PREFIX.defaultValue());
return new ManifestList.Factory(FileIOFinder.find(path), avro, "zstd", pathFactory, null)
.create();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ public void testCreateDataFilePathFactoryWithPartition() {
new DataType[] {new VarCharType(10), new IntType()},
new String[] {"dt", "hr"}),
"default",
CoreOptions.FILE_FORMAT.defaultValue().toString());
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.FILE_PREFIX.defaultValue());

assertPartition("20211224", 16, pathFactory, "/dt=20211224/hr=16");
assertPartition("20211224", null, pathFactory, "/dt=20211224/hr=default");
Expand Down Expand Up @@ -120,6 +121,7 @@ public static FileStorePathFactory createNonPartFactory(Path root) {
root,
RowType.builder().build(),
PARTITION_DEFAULT_NAME.defaultValue(),
CoreOptions.FILE_FORMAT.defaultValue().toString());
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.FILE_PREFIX.defaultValue());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ public TestChangelogDataReadWrite(String root) {
tablePath,
RowType.of(new IntType()),
"default",
CoreOptions.FILE_FORMAT.defaultValue().toString());
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.FILE_PREFIX.defaultValue());
this.snapshotManager = new SnapshotManager(LocalFileIO.create(), new Path(root));
this.commitUser = UUID.randomUUID().toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ protected void foreachIndexReader(Consumer<FileIndexReader> consumer)
tableRoot,
RowType.of(),
new CoreOptions(new Options()).partitionDefaultName(),
CoreOptions.FILE_FORMAT.defaultValue().toString());
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.FILE_PREFIX.defaultValue());

Table table = fileSystemCatalog.getTable(Identifier.create("db", "T"));
ReadBuilder readBuilder = table.newReadBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.io.TempDir;

import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;

Expand Down Expand Up @@ -224,4 +226,66 @@ public void testReadWriteUnawareBucketTable() {
rows = spark.sql("SELECT max(bucket) FROM `T$FILES`").collectAsList();
assertThat(rows.toString()).isEqualTo("[[0]]");
}

/** Checks that data files are named with the "data-" prefix when no prefix is configured. */
@Test
public void testDefaultDataFilePrefix() {
    spark.sql("CREATE TABLE T (a INT, b INT, c STRING)");

    // One INSERT per row; the test expects one data file per INSERT below.
    spark.sql("INSERT INTO T VALUES (1, 1, 'aa')");
    spark.sql("INSERT INTO T VALUES (2, 2, 'bb')");
    spark.sql("INSERT INTO T VALUES (3, 3, 'cc')");

    List<Row> tableRows = spark.sql("SELECT * FROM T").collectAsList();
    assertThat(tableRows.toString()).isEqualTo("[[1,1,aa], [2,2,bb], [3,3,cc]]");

    List<String> filePaths =
            spark.sql("select file_path from `T$files`").collectAsList().stream()
                    .map(row -> row.getString(0))
                    .collect(Collectors.toList());
    Assertions.assertEquals(3, filePaths.size());
    filePaths.forEach(filePath -> Assertions.assertTrue(filePath.startsWith("data-")));
}

/**
 * Checks that data files of an append-only table are named with the prefix configured through
 * {@code spark.paimon.file.prefix}.
 *
 * <p>The prefix is set on the session-level Spark configuration, so it is removed again in a
 * {@code finally} block. Without that cleanup the setting would remain on the session and change
 * the file names seen by tests that run afterwards (for example the default-prefix test).
 */
@Test
public void testDataFilePrefixForAppendOnlyTable() {
    spark.sql("CREATE TABLE T (a INT, b INT, c STRING)");

    spark.conf().set("spark.paimon.file.prefix", "test-");
    try {
        spark.sql("INSERT INTO T VALUES (1, 1, 'aa')");
        spark.sql("INSERT INTO T VALUES (2, 2, 'bb')");
        spark.sql("INSERT INTO T VALUES (3, 3, 'cc')");

        List<Row> data = spark.sql("SELECT * FROM T").collectAsList();
        assertThat(data.toString()).isEqualTo("[[1,1,aa], [2,2,bb], [3,3,cc]]");

        List<Row> rows = spark.sql("select file_path from `T$files`").collectAsList();
        List<String> fileNames =
                rows.stream().map(x -> x.getString(0)).collect(Collectors.toList());
        Assertions.assertEquals(3, fileNames.size());
        for (String fileName : fileNames) {
            // Pass the offending name as the failure message to ease diagnosis.
            Assertions.assertTrue(fileName.startsWith("test-"), fileName);
        }
    } finally {
        // Restore the session config even when an assertion above fails.
        spark.conf().unset("spark.paimon.file.prefix");
    }
}

/**
 * Checks that data files of a primary-key table are named with the prefix configured through
 * {@code spark.paimon.file.prefix}.
 *
 * <p>The prefix is set on the session-level Spark configuration, so it is removed again in a
 * {@code finally} block. Without that cleanup the setting would remain on the session and change
 * the file names seen by tests that run afterwards.
 */
@Test
public void testDataFilePrefixForPKTable() {
    spark.sql("CREATE TABLE T (a INT, b INT, c STRING)" + " TBLPROPERTIES ('primary-key'='a')");

    spark.conf().set("spark.paimon.file.prefix", "test-");
    try {
        spark.sql("INSERT INTO T VALUES (1, 1, 'aa')");
        spark.sql("INSERT INTO T VALUES (2, 2, 'bb')");
        // Same key as the first row: the query below expects this row to replace it.
        spark.sql("INSERT INTO T VALUES (1, 3, 'cc')");

        List<Row> data = spark.sql("SELECT * FROM T order by a").collectAsList();
        assertThat(data.toString()).isEqualTo("[[1,3,cc], [2,2,bb]]");

        List<Row> rows = spark.sql("select file_path from `T$files`").collectAsList();
        List<String> fileNames =
                rows.stream().map(x -> x.getString(0)).collect(Collectors.toList());
        Assertions.assertEquals(3, fileNames.size());
        for (String fileName : fileNames) {
            // Pass the offending name as the failure message to ease diagnosis.
            Assertions.assertTrue(fileName.startsWith("test-"), fileName);
        }
    } finally {
        // Restore the session config even when an assertion above fails.
        spark.conf().unset("spark.paimon.file.prefix");
    }
}
}

0 comments on commit e6c856a

Please sign in to comment.