Skip to content

Commit

Permalink
[core] Refactor FileStorePathFactory to clean test methods (apache#3093)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored and zhuangchong committed Mar 26, 2024
1 parent 860b021 commit abda5a7
Show file tree
Hide file tree
Showing 20 changed files with 97 additions and 132 deletions.
70 changes: 33 additions & 37 deletions paimon-common/src/main/java/org/apache/paimon/data/BinaryRow.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,9 @@
import org.apache.paimon.annotation.Public;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.memory.MemorySegmentUtils;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.TimestampType;

import javax.annotation.Nullable;

import java.nio.ByteOrder;

Expand Down Expand Up @@ -80,38 +77,6 @@ public static int calculateFixPartSizeInBytes(int arity) {
return calculateBitSetWidthInBytes(arity) + 8 * arity;
}

/**
* If it is a fixed-length field, we can call this BinaryRow's setXX method for in-place
* updates. If it is variable-length field, can't use this method, because the underlying data
* is stored continuously.
*/
public static boolean isInFixedLengthPart(DataType type) {
switch (type.getTypeRoot()) {
case BOOLEAN:
case TINYINT:
case SMALLINT:
case INTEGER:
case DATE:
case TIME_WITHOUT_TIME_ZONE:
case BIGINT:
case FLOAT:
case DOUBLE:
return true;
case DECIMAL:
return Decimal.isCompact(((DecimalType) type).getPrecision());
case TIMESTAMP_WITHOUT_TIME_ZONE:
return Timestamp.isCompact(((TimestampType) type).getPrecision());
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return Timestamp.isCompact(((LocalZonedTimestampType) type).getPrecision());
default:
return false;
}
}

public static boolean isMutable(DataType type) {
return isInFixedLengthPart(type) || type.getTypeRoot() == DataTypeRoot.DECIMAL;
}

private final int arity;
private final int nullBitsSizeInBytes;

Expand Down Expand Up @@ -446,4 +411,35 @@ public boolean equals(Object o) {
public int hashCode() {
return MemorySegmentUtils.hashByWords(segments, offset, sizeInBytes);
}

public static BinaryRow singleColumn(@Nullable Integer i) {
BinaryRow row = new BinaryRow(1);
BinaryRowWriter writer = new BinaryRowWriter(row);
writer.reset();
if (i == null) {
writer.setNullAt(0);
} else {
writer.writeInt(0, i);
}
writer.complete();
return row;
}

public static BinaryRow singleColumn(@Nullable String string) {
BinaryString binaryString = string == null ? null : BinaryString.fromString(string);
return singleColumn(binaryString);
}

public static BinaryRow singleColumn(@Nullable BinaryString string) {
BinaryRow row = new BinaryRow(1);
BinaryRowWriter writer = new BinaryRowWriter(row);
writer.reset();
if (string == null) {
writer.setNullAt(0);
} else {
writer.writeString(0, string);
}
writer.complete();
return row;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,14 @@ public class DataFilePathFactory {

public static final String CHANGELOG_FILE_PREFIX = "changelog-";

public static final String BUCKET_PATH_PREFIX = "bucket-";

private final Path bucketDir;
private final Path parent;
private final String uuid;

private final AtomicInteger pathCount;
private final String formatIdentifier;

public DataFilePathFactory(Path root, String partition, int bucket, String formatIdentifier) {
this.bucketDir = bucketPath(root, partition, bucket);
public DataFilePathFactory(Path parent, String formatIdentifier) {
this.parent = parent;
this.uuid = UUID.randomUUID().toString();

this.pathCount = new AtomicInteger(0);
Expand All @@ -60,22 +58,18 @@ public Path newChangelogPath() {

private Path newPath(String prefix) {
String name = prefix + uuid + "-" + pathCount.getAndIncrement() + "." + formatIdentifier;
return new Path(bucketDir, name);
return new Path(parent, name);
}

public Path toPath(String fileName) {
return new Path(bucketDir + "/" + fileName);
return new Path(parent + "/" + fileName);
}

@VisibleForTesting
public String uuid() {
return uuid;
}

public static Path bucketPath(Path tablePath, String partition, int bucket) {
return new Path(tablePath + "/" + partition + "/" + BUCKET_PATH_PREFIX + bucket);
}

public static String formatIdentifier(String fileName) {
int index = fileName.lastIndexOf('.');
if (index == -1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.operation;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.manifest.FileEntry;
Expand Down Expand Up @@ -277,7 +278,7 @@ private Pair<Snapshot, List<ManifestEntry>> doPlan() {
? "partition "
+ FileStorePathFactory.getPartitionComputer(
partitionType,
FileStorePathFactory.PARTITION_DEFAULT_NAME
CoreOptions.PARTITION_DEFAULT_NAME
.defaultValue())
.generatePartValues(file.partition())
: "table";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static org.apache.paimon.io.DataFilePathFactory.BUCKET_PATH_PREFIX;
import static org.apache.paimon.utils.FileStorePathFactory.BUCKET_PATH_PREFIX;

/**
* To remove the data files and metadata files that are not used by table (so-called "orphan
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,11 @@
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.ReadonlyTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.InnerTableRead;
Expand Down Expand Up @@ -74,13 +73,9 @@ public class ManifestsTable implements ReadonlyTable {
new DataField(3, "num_deleted_files", new BigIntType(false)),
new DataField(4, "schema_id", new BigIntType(false))));

private final FileIO fileIO;
private final Path location;
private final Table dataTable;
private final FileStoreTable dataTable;

public ManifestsTable(FileIO fileIO, Path location, Table dataTable) {
this.fileIO = fileIO;
this.location = location;
public ManifestsTable(FileStoreTable dataTable) {
this.dataTable = dataTable;
}

Expand All @@ -91,12 +86,12 @@ public InnerTableScan newScan() {

@Override
public InnerTableRead newRead() {
return new ManifestsRead(fileIO, dataTable);
return new ManifestsRead(dataTable);
}

@Override
public String name() {
return location.getName() + SYSTEM_TABLE_SPLITTER + MANIFESTS;
return dataTable.name() + SYSTEM_TABLE_SPLITTER + MANIFESTS;
}

@Override
Expand All @@ -111,7 +106,7 @@ public List<String> primaryKeys() {

@Override
public Table copy(Map<String, String> dynamicOptions) {
return new ManifestsTable(fileIO, location, dataTable.copy(dynamicOptions));
return new ManifestsTable(dataTable.copy(dynamicOptions));
}

private class ManifestsScan extends ReadOnceTableScan {
Expand All @@ -125,9 +120,7 @@ public InnerTableScan withFilter(Predicate predicate) {
@Override
protected Plan innerPlan() {
return () ->
Collections.singletonList(
new ManifestsSplit(
allManifests(fileIO, location, dataTable).size(), location));
Collections.singletonList(new ManifestsSplit(allManifests(dataTable).size()));
}
}

Expand All @@ -136,11 +129,9 @@ private static class ManifestsSplit implements Split {
private static final long serialVersionUID = 1L;

private final long rowCount;
private final Path location;

private ManifestsSplit(long rowCount, Path location) {
private ManifestsSplit(long rowCount) {
this.rowCount = rowCount;
this.location = location;
}

@Override
Expand All @@ -157,25 +148,22 @@ public boolean equals(Object o) {
return false;
}
ManifestsSplit that = (ManifestsSplit) o;
return Objects.equals(location, that.location);
return Objects.equals(rowCount, that.rowCount);
}

@Override
public int hashCode() {
return Objects.hash(location);
return Objects.hash(rowCount);
}
}

private static class ManifestsRead implements InnerTableRead {

private int[][] projection;

private final FileIO fileIO;
private final FileStoreTable dataTable;

private final Table dataTable;

public ManifestsRead(FileIO fileIO, Table dataTable) {
this.fileIO = fileIO;
public ManifestsRead(FileStoreTable dataTable) {
this.dataTable = dataTable;
}

Expand All @@ -201,8 +189,7 @@ public RecordReader<InternalRow> createReader(Split split) throws IOException {
if (!(split instanceof ManifestsSplit)) {
throw new IllegalArgumentException("Unsupported split: " + split.getClass());
}
Path location = ((ManifestsSplit) split).location;
List<ManifestFileMeta> manifestFileMetas = allManifests(fileIO, location, dataTable);
List<ManifestFileMeta> manifestFileMetas = allManifests(dataTable);

Iterator<InternalRow> rows =
Iterators.transform(manifestFileMetas.iterator(), this::toRow);
Expand All @@ -224,10 +211,9 @@ private InternalRow toRow(ManifestFileMeta manifestFileMeta) {
}
}

private static List<ManifestFileMeta> allManifests(
FileIO fileIO, Path location, Table dataTable) {
private static List<ManifestFileMeta> allManifests(FileStoreTable dataTable) {
CoreOptions coreOptions = CoreOptions.fromMap(dataTable.options());
SnapshotManager snapshotManager = new SnapshotManager(fileIO, location);
SnapshotManager snapshotManager = dataTable.snapshotManager();
Long snapshotId = coreOptions.scanSnapshotId();
Snapshot snapshot = null;
if (snapshotId != null && snapshotManager.snapshotExists(snapshotId)) {
Expand All @@ -239,10 +225,11 @@ private static List<ManifestFileMeta> allManifests(
if (snapshot == null) {
return Collections.emptyList();
}
FileStorePathFactory fileStorePathFactory = new FileStorePathFactory(location);
FileStorePathFactory fileStorePathFactory = dataTable.store().pathFactory();
FileFormat fileFormat = coreOptions.manifestFormat();
ManifestList manifestList =
new ManifestList.Factory(fileIO, fileFormat, fileStorePathFactory, null).create();
new ManifestList.Factory(dataTable.fileIO(), fileFormat, fileStorePathFactory, null)
.create();
return snapshot.allManifests(manifestList);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public static Table load(String type, FileIO fileIO, FileStoreTable dataTable) {
Path location = dataTable.location();
switch (type.toLowerCase()) {
case MANIFESTS:
return new ManifestsTable(fileIO, location, dataTable);
return new ManifestsTable(dataTable);
case SNAPSHOTS:
return new SnapshotsTable(fileIO, location, dataTable);
case OPTIONS:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@

package org.apache.paimon.utils;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.types.RowType;

import javax.annotation.concurrent.ThreadSafe;
Expand All @@ -33,19 +31,11 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static org.apache.paimon.options.ConfigOptions.key;

/** Factory which produces {@link Path}s for manifest files. */
@ThreadSafe
public class FileStorePathFactory {

public static final ConfigOption<String> PARTITION_DEFAULT_NAME =
key("partition.default-name")
.stringType()
.defaultValue("__DEFAULT_PARTITION__")
.withDescription(
"The default partition name in case the dynamic partition"
+ " column value is null/empty string.");
public static final String BUCKET_PATH_PREFIX = "bucket-";

private final Path root;
private final String uuid;
Expand All @@ -58,15 +48,6 @@ public class FileStorePathFactory {
private final AtomicInteger indexFileCount;
private final AtomicInteger statsFileCount;

public FileStorePathFactory(Path root) {
this(
root,
RowType.builder().build(),
PARTITION_DEFAULT_NAME.defaultValue(),
CoreOptions.FILE_FORMAT.defaultValue().toString());
}

// for tables without partition, partitionType should be a row type with 0 columns (not null)
public FileStorePathFactory(
Path root, RowType partitionType, String defaultPartValue, String formatIdentifier) {
this.root = root;
Expand Down Expand Up @@ -116,12 +97,12 @@ public Path toManifestListPath(String manifestListName) {
}

public DataFilePathFactory createDataFilePathFactory(BinaryRow partition, int bucket) {
return new DataFilePathFactory(
root, getPartitionString(partition), bucket, formatIdentifier);
return new DataFilePathFactory(bucketPath(partition, bucket), formatIdentifier);
}

public Path bucketPath(BinaryRow partition, int bucket) {
return DataFilePathFactory.bucketPath(root, getPartitionString(partition), bucket);
return new Path(
root + "/" + getPartitionString(partition) + "/" + BUCKET_PATH_PREFIX + bucket);
}

/** IMPORTANT: This method is NOT THREAD SAFE. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,9 +515,7 @@ private InternalRow row(int id, String name, String dt) {

private DataFilePathFactory createPathFactory() {
return new DataFilePathFactory(
new Path(tempDir.toString()),
"dt=" + PART,
0,
new Path(tempDir + "/dt=" + PART + "/bucket-0"),
CoreOptions.FILE_FORMAT.defaultValue().toString());
}

Expand Down
Loading

0 comments on commit abda5a7

Please sign in to comment.