Skip to content

Commit

Permalink
[core] Add hint in local lookup file tracking with remote file (#3814)
Browse files Browse the repository at this point in the history
  • Loading branch information
Aitozi authored Jul 31, 2024
1 parent e917af0 commit 7c4b68e
Show file tree
Hide file tree
Showing 16 changed files with 200 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.utils;

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.types.RowType;

Expand Down Expand Up @@ -77,4 +78,22 @@ public static Map<String, Object> convertSpecToInternal(
}
return partValues;
}

/**
 * Renders the fields of a partition row as one string, placing {@code delimiter} between
 * consecutive field values. A field whose value is null is rendered as the text "null".
 *
 * <p>The method name keeps its existing spelling because callers reference it by that name.
 *
 * @param partitionType row type that supplies one field getter per partition field
 * @param partition row the field values are read from
 * @param delimiter text inserted between two consecutive field values
 * @return the joined field values; an empty string when the type has no fields
 */
public static String paritionToString(
        RowType partitionType, BinaryRow partition, String delimiter) {
    InternalRow.FieldGetter[] fieldGetters = partitionType.fieldGetters();
    StringBuilder joined = new StringBuilder();
    int index = 0;
    for (InternalRow.FieldGetter fieldGetter : fieldGetters) {
        // Separator goes in front of every field except the first one.
        if (index++ > 0) {
            joined.append(delimiter);
        }
        Object value = fieldGetter.getFieldOrNull(partition);
        joined.append(value == null ? "null" : value);
    }
    return joined.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.utils;

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests the partition-to-string rendering of {@link InternalRowPartitionComputer}. */
public class InternalRowPartitionComputerTest {

    private static final String DELIMITER = "-";

    @Test
    public void testPartitionToString() {
        // A row type without fields renders as the empty string.
        RowType emptyType = RowType.of();
        BinaryRow emptyRow = new BinaryRow(0);
        BinaryRowWriter emptyWriter = new BinaryRowWriter(emptyRow);
        String renderedEmpty =
                InternalRowPartitionComputer.paritionToString(emptyType, emptyRow, DELIMITER);
        assertThat(renderedEmpty).isEqualTo("");

        // Two non-null fields are joined with the delimiter.
        RowType stringAndInt = RowType.of(DataTypes.STRING(), DataTypes.INT());
        BinaryRow filledRow = new BinaryRow(2);
        BinaryRowWriter filledWriter = new BinaryRowWriter(filledRow);
        filledWriter.writeString(0, BinaryString.fromString("20240731"));
        filledWriter.writeInt(1, 10);
        String renderedFilled =
                InternalRowPartitionComputer.paritionToString(stringAndInt, filledRow, DELIMITER);
        assertThat(renderedFilled).isEqualTo("20240731-10");

        // A null field is rendered as the text "null".
        BinaryRow rowWithNull = new BinaryRow(2);
        BinaryRowWriter nullWriter = new BinaryRowWriter(rowWithNull);
        nullWriter.setNullAt(0);
        nullWriter.writeInt(1, 10);
        String renderedNull =
                InternalRowPartitionComputer.paritionToString(
                        stringAndInt, rowWithNull, DELIMITER);
        assertThat(renderedNull).isEqualTo("null-10");
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma
schemaManager,
schema,
commitUser,
partitionType,
keyType,
valueType,
keyComparatorSupplier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ public interface FileChannelManager extends AutoCloseable {
/** Creates an ID identifying an underlying file channel and returns it. */
FileIOChannel.ID createChannel();

/**
 * Creates an ID identifying an underlying file channel and returns it.
 *
 * @param prefix caller-supplied text for the new channel; NOTE(review): presumably becomes the
 *     leading part of the channel's file name - confirm against the implementing class
 */
FileIOChannel.ID createChannel(String prefix);

/** Creates an enumerator for channels that logically belong together and returns it. */
FileIOChannel.Enumerator createChannelEnumerator();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ public ID createChannel() {
return new ID(paths[num], num, random);
}

/**
 * Creates a channel ID that carries {@code prefix}. The base path is chosen from {@code paths}
 * by taking a running counter modulo the number of paths.
 */
@Override
public ID createChannel(String prefix) {
    long sequence = nextPath.getAndIncrement();
    int pathIndex = (int) (sequence % paths.length);
    return new ID(paths[pathIndex], pathIndex, prefix, random);
}

@Override
public Enumerator createChannelEnumerator() {
return new Enumerator(paths, random);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ public ID(File basePath, int bucketNum, Random random) {
this.bucketNum = bucketNum;
}

/**
 * Creates an ID whose file is resolved against {@code basePath} and named
 * {@code <prefix>-<random string>.channel}.
 */
public ID(File basePath, int bucketNum, String prefix, Random random) {
    String channelFileName = prefix + "-" + randomString(random) + ".channel";
    this.path = new File(basePath, channelFileName);
    this.bucketNum = bucketNum;
}

/** Returns the path to the underlying temporary file. */
public String getPath() {
return path.getAbsolutePath();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public interface IOManager extends AutoCloseable {

ID createChannel();

/**
 * Creates a channel ID for the given prefix.
 *
 * @param prefix caller-supplied text for the new channel; NOTE(review): presumably ends up in
 *     the name of the channel's temporary file - confirm against the implementing class
 */
ID createChannel(String prefix);

String[] tempDirs();

Enumerator createChannelEnumerator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ public ID createChannel() {
return fileChannelManager.createChannel();
}

/** Asks the file channel manager for a channel ID created with the given prefix. */
@Override
public ID createChannel(String prefix) {
    ID prefixedChannel = fileChannelManager.createChannel(prefix);
    return prefixedChannel;
}

@Override
public String[] tempDirs() {
return tempDirs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,16 @@ private static void removalCallback(String file, LookupFile lookupFile, RemovalC
}
}
}

/**
 * Builds the name prefix for a local lookup file from the partition, bucket and remote file name
 * it corresponds to.
 *
 * <p>The identifier is {@code <partition>-<bucket>-<remoteFileName>}, or
 * {@code <bucket>-<remoteFileName>} when {@code partition} is empty. Path separators
 * ({@code '/'} and {@code '\'}) are replaced with {@code '_'}, and the result is cut to at most
 * {@code length} characters.
 *
 * @param partition string form of the partition; empty for an unpartitioned table
 * @param bucket bucket number
 * @param remoteFileName name of the remote file the local file is derived from
 * @param length maximum number of characters in the returned prefix
 * @return the sanitized identifier, truncated to {@code length} characters
 */
public static String localFilePrefix(
        String partition, int bucket, String remoteFileName, int length) {
    String identifier =
            partition.isEmpty()
                    ? String.format("%s-%s", bucket, remoteFileName)
                    : String.format("%s-%s-%s", partition, bucket, remoteFileName);

    // The prefix is meant to be embedded in a single file name. A separator coming from a
    // partition value would turn it into a nested path whose parent directories do not
    // exist, so creating the file would fail.
    String safeIdentifier = identifier.replace('/', '_').replace('\\', '_');

    return safeIdentifier.substring(0, Math.min(safeIdentifier.length(), length));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.function.Supplier;

import static org.apache.paimon.utils.VarLengthIntUtils.MAX_VAR_LONG_SIZE;
import static org.apache.paimon.utils.VarLengthIntUtils.decodeLong;
Expand All @@ -61,7 +60,7 @@ public class LookupLevels<T> implements Levels.DropFileCallback, Closeable {
private final RowCompactedSerializer keySerializer;
private final ValueProcessor<T> valueProcessor;
private final IOFunction<DataFileMeta, RecordReader<KeyValue>> fileReaderFactory;
private final Supplier<File> localFileFactory;
private final Function<String, File> localFileFactory;
private final LookupStoreFactory lookupStoreFactory;
private final Function<Long, BloomFilter.Builder> bfGenerator;

Expand All @@ -74,7 +73,7 @@ public LookupLevels(
RowType keyType,
ValueProcessor<T> valueProcessor,
IOFunction<DataFileMeta, RecordReader<KeyValue>> fileReaderFactory,
Supplier<File> localFileFactory,
Function<String, File> localFileFactory,
LookupStoreFactory lookupStoreFactory,
Function<Long, BloomFilter.Builder> bfGenerator,
Cache<String, LookupFile> lookupFileCache) {
Expand Down Expand Up @@ -145,7 +144,7 @@ private T lookup(InternalRow key, DataFileMeta file) throws IOException {
}

private LookupFile createLookupFile(DataFileMeta file) throws IOException {
File localFile = localFileFactory.get();
File localFile = localFileFactory.apply(file.fileName());
if (!localFile.createNewFile()) {
throw new IOException("Can not create new file: " + localFile);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.FieldsComparator;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.UserDefinedSeqComparator;

Expand Down Expand Up @@ -104,6 +105,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
private final FileIO fileIO;
private final RowType keyType;
private final RowType valueType;
private final RowType partitionType;
@Nullable private final RecordLevelExpire recordLevelExpire;
@Nullable private Cache<String, LookupFile> lookupFileCache;

Expand All @@ -112,6 +114,7 @@ public KeyValueFileStoreWrite(
SchemaManager schemaManager,
TableSchema schema,
String commitUser,
RowType partitionType,
RowType keyType,
RowType valueType,
Supplier<Comparator<InternalRow>> keyComparatorSupplier,
Expand All @@ -136,6 +139,7 @@ public KeyValueFileStoreWrite(
deletionVectorsMaintainerFactory,
tableName);
this.fileIO = fileIO;
this.partitionType = partitionType;
this.keyType = keyType;
this.valueType = valueType;
this.udsComparatorSupplier = udsComparatorSupplier;
Expand Down Expand Up @@ -325,7 +329,7 @@ private MergeTreeCompactRewriter createRewriter(
return new LookupMergeTreeCompactRewriter(
maxLevel,
mergeEngine,
createLookupLevels(levels, processor, lookupReaderFactory),
createLookupLevels(partition, bucket, levels, processor, lookupReaderFactory),
readerFactory,
writerFactory,
keyComparator,
Expand All @@ -347,6 +351,8 @@ private MergeTreeCompactRewriter createRewriter(
}

private <T> LookupLevels<T> createLookupLevels(
BinaryRow partition,
int bucket,
Levels levels,
LookupLevels.ValueProcessor<T> valueProcessor,
FileReaderFactory<KeyValue> readerFactory) {
Expand All @@ -366,14 +372,22 @@ private <T> LookupLevels<T> createLookupLevels(
options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION),
options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE));
}

return new LookupLevels<>(
levels,
keyComparatorSupplier.get(),
keyType,
valueProcessor,
readerFactory::createRecordReader,
() -> ioManager.createChannel().getPathFile(),
file ->
ioManager
.createChannel(
LookupFile.localFilePrefix(
InternalRowPartitionComputer.paritionToString(
partitionType, partition, "-"),
bucket,
file,
100))
.getPathFile(),
lookupStoreFactory,
bfGenerator(options),
lookupFileCache);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.KeyComparatorSupplier;
import org.apache.paimon.utils.Preconditions;

Expand Down Expand Up @@ -75,6 +76,8 @@ public class LocalTableQuery implements TableQuery {

@Nullable private Cache<String, LookupFile> lookupFileCache;

private final RowType partitionType;

public LocalTableQuery(FileStoreTable table) {
this.options = table.coreOptions();
this.tableView = new HashMap<>();
Expand All @@ -86,6 +89,7 @@ public LocalTableQuery(FileStoreTable table) {
KeyValueFileStore store = (KeyValueFileStore) tableStore;

this.readerFactoryBuilder = store.newReaderFactoryBuilder();
this.partitionType = table.schema().logicalPartitionType();
RowType keyType = readerFactoryBuilder.keyType();
this.keyComparatorSupplier = new KeyComparatorSupplier(readerFactoryBuilder.keyType());
this.lookupStoreFactory =
Expand Down Expand Up @@ -155,9 +159,18 @@ private void newLookupLevels(BinaryRow partition, int bucket, List<DataFileMeta>
file.fileName(),
file.fileSize(),
file.level()),
() ->
file ->
Preconditions.checkNotNull(ioManager, "IOManager is required.")
.createChannel()
.createChannel(
LookupFile.localFilePrefix(
InternalRowPartitionComputer
.paritionToString(
partitionType,
partition,
"-"),
bucket,
file,
100))
.getPathFile(),
lookupStoreFactory,
bfGenerator(options),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ private LookupLevels<Boolean> createContainsLevels(Levels levels, MemorySize max
createReaderFactory()
.createRecordReader(
0, file.fileName(), file.fileSize(), file.level()),
() -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + UUID.randomUUID()),
file -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + UUID.randomUUID()),
new HashLookupStoreFactory(
new CacheManager(MemorySize.ofMebiBytes(1)), 2048, 0.75, "none"),
rowCount -> BloomFilter.builder(rowCount, 0.01),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.mergetree;

import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests the local file prefix produced by {@link LookupFile}. */
public class LookupFileTest {

    private static final String REMOTE_FILE = "data-ccbb95e7-8b8c-4549-8ca9-f553843d67ad-3.orc";

    @Test
    public void testLocalFilePrefix() {
        // With a partition: partition, bucket and remote file name all appear.
        String partitioned = LookupFile.localFilePrefix("2024073105", 10, REMOTE_FILE, 100);
        assertThat(partitioned)
                .isEqualTo("2024073105-10-data-ccbb95e7-8b8c-4549-8ca9-f553843d67ad-3.orc");

        // With an empty partition: only bucket and remote file name appear.
        String unpartitioned = LookupFile.localFilePrefix("", 10, REMOTE_FILE, 100);
        assertThat(unpartitioned).isEqualTo("10-data-ccbb95e7-8b8c-4549-8ca9-f553843d67ad-3.orc");

        // A limit shorter than the identifier cuts it to that many characters.
        String truncated = LookupFile.localFilePrefix("2024073105", 10, REMOTE_FILE, 20);
        assertThat(truncated).isEqualTo("2024073105-10-data-c");
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ private LookupLevels<KeyValue> createLookupLevels(Levels levels, MemorySize maxD
createReaderFactory()
.createRecordReader(
0, file.fileName(), file.fileSize(), file.level()),
() -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + UUID.randomUUID()),
file -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + UUID.randomUUID()),
new HashLookupStoreFactory(
new CacheManager(MemorySize.ofMebiBytes(1)), 2048, 0.75, "none"),
rowCount -> BloomFilter.builder(rowCount, 0.05),
Expand Down
Loading

0 comments on commit 7c4b68e

Please sign in to comment.