Skip to content

Commit

Permalink
[core] Expose IndexFile in Split. (#3226)
Browse files Browse the repository at this point in the history
  • Loading branch information
leaves12138 authored Apr 26, 2024
1 parent a74f49d commit 01f0764
Show file tree
Hide file tree
Showing 24 changed files with 363 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,11 @@ public BucketFileRead bucketReader(BinaryRow partition, int bucket) {
.withPartition(partition)
.withBucket(bucket)
.withDataFiles(files)
.rawConvertible(true)
.withBucketPath(
pathFactory
.bucketPath(partition, bucket)
.toString())
.build()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.stream.Collectors;

import static org.apache.paimon.io.DataFilePathFactory.INDEX_PATH_SUFFIX;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Input splits. Needed by most batch computation engines. */
Expand All @@ -56,7 +57,8 @@ public class DataSplit implements Split {
private List<DataFileMeta> dataFiles;
@Nullable private List<DeletionFile> dataDeletionFiles;

private List<RawFile> rawFiles = Collections.emptyList();
private boolean rawConvertible;
private String bucketPath;

public DataSplit() {}

Expand Down Expand Up @@ -93,6 +95,14 @@ public boolean isStreaming() {
return isStreaming;
}

public boolean rawConvertible() {
return rawConvertible;
}

public String getBucketPath() {
return bucketPath;
}

public OptionalLong getLatestFileCreationEpochMillis() {
return this.dataFiles.stream().mapToLong(DataFileMeta::creationTimeEpochMillis).max();
}
Expand All @@ -108,13 +118,61 @@ public long rowCount() {

@Override
public Optional<List<RawFile>> convertToRawFiles() {
if (rawFiles.isEmpty()) {
return Optional.empty();
if (rawConvertible) {
return Optional.of(
dataFiles.stream()
.map(f -> makeRawTableFile(bucketPath, f))
.collect(Collectors.toList()));
} else {
return Optional.of(rawFiles);
return Optional.empty();
}
}

private RawFile makeRawTableFile(String bucketPath, DataFileMeta meta) {
return new RawFile(
bucketPath + "/" + meta.fileName(),
0,
meta.fileSize(),
meta.fileFormat()
.map(t -> t.toString().toLowerCase())
.orElseThrow(
() ->
new RuntimeException(
"Can't find format from file: "
+ bucketPath
+ "/"
+ meta.fileName())),
meta.schemaId(),
meta.rowCount());
}

@Override
@Nullable
public Optional<List<IndexFile>> indexFiles() {
List<IndexFile> indexFiles = new ArrayList<>();
boolean hasIndexFile = false;
for (DataFileMeta file : dataFiles) {
List<String> exFiles =
file.extraFiles().stream()
.filter(s -> s.endsWith(INDEX_PATH_SUFFIX))
.collect(Collectors.toList());
if (exFiles.isEmpty()) {
indexFiles.add(null);
} else if (exFiles.size() == 1) {
hasIndexFile = true;
indexFiles.add(new IndexFile(bucketPath + "/" + exFiles.get(0)));
} else {
throw new RuntimeException(
"Wrong number of file index for file "
+ file.fileName()
+ " index files: "
+ String.join(",", exFiles));
}
}

return hasIndexFile ? Optional.of(indexFiles) : Optional.empty();
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -131,7 +189,8 @@ public boolean equals(Object o) {
&& Objects.equals(dataFiles, split.dataFiles)
&& Objects.equals(dataDeletionFiles, split.dataDeletionFiles)
&& isStreaming == split.isStreaming
&& Objects.equals(rawFiles, split.rawFiles);
&& rawConvertible == split.rawConvertible
&& Objects.equals(bucketPath, split.bucketPath);
}

@Override
Expand All @@ -144,7 +203,8 @@ public int hashCode() {
dataFiles,
dataDeletionFiles,
isStreaming,
rawFiles);
rawConvertible,
bucketPath);
}

private void writeObject(ObjectOutputStream out) throws IOException {
Expand All @@ -164,7 +224,8 @@ private void assign(DataSplit other) {
this.dataFiles = other.dataFiles;
this.dataDeletionFiles = other.dataDeletionFiles;
this.isStreaming = other.isStreaming;
this.rawFiles = other.rawFiles;
this.rawConvertible = other.rawConvertible;
this.bucketPath = other.bucketPath;
}

public void serialize(DataOutputView out) throws IOException {
Expand All @@ -189,10 +250,8 @@ public void serialize(DataOutputView out) throws IOException {

out.writeBoolean(isStreaming);

out.writeInt(rawFiles.size());
for (RawFile rawFile : rawFiles) {
rawFile.serialize(out);
}
out.writeBoolean(rawConvertible);
out.writeUTF(bucketPath);
}

public static DataSplit deserialize(DataInputView in) throws IOException {
Expand All @@ -218,12 +277,8 @@ public static DataSplit deserialize(DataInputView in) throws IOException {
List<DeletionFile> dataDeletionFiles = DeletionFile.deserializeList(in);

boolean isStreaming = in.readBoolean();

int rawFileNum = in.readInt();
List<RawFile> rawFiles = new ArrayList<>();
for (int i = 0; i < rawFileNum; i++) {
rawFiles.add(RawFile.deserialize(in));
}
boolean rawConvertible = in.readBoolean();
String bucketPath = in.readUTF();

DataSplit.Builder builder =
builder()
Expand All @@ -233,7 +288,9 @@ public static DataSplit deserialize(DataInputView in) throws IOException {
.withBeforeFiles(beforeFiles)
.withDataFiles(dataFiles)
.isStreaming(isStreaming)
.rawFiles(rawFiles);
.rawConvertible(rawConvertible)
.withBucketPath(bucketPath);

if (beforeDeletionFiles != null) {
builder.withBeforeDeletionFiles(beforeDeletionFiles);
}
Expand Down Expand Up @@ -292,15 +349,21 @@ public Builder isStreaming(boolean isStreaming) {
return this;
}

public Builder rawFiles(List<RawFile> rawFiles) {
this.split.rawFiles = rawFiles;
public Builder rawConvertible(boolean rawConvertible) {
this.split.rawConvertible = rawConvertible;
return this;
}

public Builder withBucketPath(String bucketPath) {
this.split.bucketPath = bucketPath;
return this;
}

public DataSplit build() {
checkArgument(split.partition != null);
checkArgument(split.bucket != -1);
checkArgument(split.dataFiles != null);
checkArgument(split.bucketPath != null);

DataSplit split = new DataSplit();
split.assign(this.split);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.table.source;

import org.apache.paimon.io.DataInputView;
import org.apache.paimon.io.DataOutputView;

import java.io.IOException;
import java.util.Objects;

/** Index file for data file. */
public class IndexFile {

private final String path;

public IndexFile(String path) {
this.path = path;
}

@Override
public boolean equals(Object o) {
if (!(o instanceof IndexFile)) {
return false;
}

IndexFile other = (IndexFile) o;
return Objects.equals(path, other.path);
}

public void serialize(DataOutputView out) throws IOException {
out.writeUTF(path);
}

public static IndexFile deserialize(DataInputView in) throws IOException {
String path = in.readUTF();
return new IndexFile(path);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,14 @@ default Optional<List<RawFile>> convertToRawFiles() {
default Optional<List<DeletionFile>> deletionFiles() {
return Optional.empty();
}

/**
* * Return the index file of the data file, for example, bloom-filter index. All the type of
* indexes and columns will be stored in one single index file.
*
* <p>If there is no corresponding index file, the element will be null.
*/
default Optional<List<IndexFile>> indexFiles() {
return Optional.empty();
}
}
Loading

0 comments on commit 01f0764

Please sign in to comment.