Skip to content

Commit

Permalink
[core] Introduce SimpleFileEntry to reduce memory in FileStoreCommit (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Mar 5, 2024
1 parent 59451f7 commit 6eec038
Show file tree
Hide file tree
Showing 14 changed files with 630 additions and 252 deletions.
178 changes: 178 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.manifest;

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.FileUtils;
import org.apache.paimon.utils.Preconditions;

import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

/** Entry representing a file. */
public interface FileEntry {

FileKind kind();

BinaryRow partition();

int bucket();

int level();

String fileName();

Identifier identifier();

BinaryRow minKey();

BinaryRow maxKey();

/**
* The same {@link Identifier} indicates that the {@link ManifestEntry} refers to the same data
* file.
*/
class Identifier {
public final BinaryRow partition;
public final int bucket;
public final int level;
public final String fileName;

/* Cache the hash code for the string */
private Integer hash;

public Identifier(BinaryRow partition, int bucket, int level, String fileName) {
this.partition = partition;
this.bucket = bucket;
this.level = level;
this.fileName = fileName;
}

@Override
public boolean equals(Object o) {
if (!(o instanceof Identifier)) {
return false;
}
Identifier that = (Identifier) o;
return Objects.equals(partition, that.partition)
&& bucket == that.bucket
&& level == that.level
&& Objects.equals(fileName, that.fileName);
}

@Override
public int hashCode() {
if (hash == null) {
hash = Objects.hash(partition, bucket, level, fileName);
}
return hash;
}

@Override
public String toString() {
return String.format("{%s, %d, %d, %s}", partition, bucket, level, fileName);
}

public String toString(FileStorePathFactory pathFactory) {
return pathFactory.getPartitionString(partition)
+ ", bucket "
+ bucket
+ ", level "
+ level
+ ", file "
+ fileName;
}
}

static <T extends FileEntry> Collection<T> mergeEntries(Iterable<T> entries) {
LinkedHashMap<Identifier, T> map = new LinkedHashMap<>();
mergeEntries(entries, map);
return map.values();
}

static void mergeEntries(
ManifestFile manifestFile,
List<ManifestFileMeta> manifestFiles,
Map<Identifier, ManifestEntry> map) {
List<CompletableFuture<List<ManifestEntry>>> manifestReadFutures =
manifestFiles.stream()
.map(
manifestFileMeta ->
CompletableFuture.supplyAsync(
() ->
manifestFile.read(
manifestFileMeta.fileName()),
FileUtils.COMMON_IO_FORK_JOIN_POOL))
.collect(Collectors.toList());

try {
for (CompletableFuture<List<ManifestEntry>> taskResult : manifestReadFutures) {
mergeEntries(taskResult.get(), map);
}
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException("Failed to read manifest file.", e);
}
}

static <T extends FileEntry> void mergeEntries(Iterable<T> entries, Map<Identifier, T> map) {
for (T entry : entries) {
Identifier identifier = entry.identifier();
switch (entry.kind()) {
case ADD:
Preconditions.checkState(
!map.containsKey(identifier),
"Trying to add file %s which is already added.",
identifier);
map.put(identifier, entry);
break;
case DELETE:
// each dataFile will only be added once and deleted once,
// if we know that it is added before then both add and delete entry can be
// removed because there won't be further operations on this file,
// otherwise we have to keep the delete entry because the add entry must be
// in the previous manifest files
if (map.containsKey(identifier)) {
map.remove(identifier);
} else {
map.put(identifier, entry);
}
break;
default:
throw new UnsupportedOperationException(
"Unknown value kind " + entry.kind().name());
}
}
}

static <T extends FileEntry> void assertNoDelete(Collection<T> entries) {
for (T entry : entries) {
Preconditions.checkState(
entry.kind() != FileKind.DELETE,
"Trying to delete file %s which is not previously added.",
entry.fileName());
}
}
}
Loading

0 comments on commit 6eec038

Please sign in to comment.