Skip to content

Commit

Permalink
[core] Optimize MergeSorter to avoid spill all data (#3798)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Jul 24, 2024
1 parent 4184489 commit 10a982e
Show file tree
Hide file tree
Showing 24 changed files with 306 additions and 231 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.reader;

import java.io.IOException;

/** Supplier to get {@link RecordReader}. */
@FunctionalInterface
public interface ReaderSupplier<T> {
RecordReader<T> get() throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.reader;

/** Supplier to get {@link RecordReader} with size. */
public interface SizedReaderSupplier<T> extends ReaderSupplier<T> {

long estimateSize();
}
11 changes: 11 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/KeyValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TinyIntType;
Expand All @@ -33,6 +34,7 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.paimon.schema.SystemColumns.LEVEL;
import static org.apache.paimon.schema.SystemColumns.SEQUENCE_NUMBER;
import static org.apache.paimon.schema.SystemColumns.VALUE_KIND;
import static org.apache.paimon.utils.Preconditions.checkState;
Expand Down Expand Up @@ -120,6 +122,15 @@ public static RowType schema(RowType keyType, RowType valueType) {
return new RowType(fields);
}

public static RowType schemaWithLevel(RowType keyType, RowType valueType) {
RowType.Builder builder = RowType.builder();
schema(keyType, valueType)
.getFields()
.forEach(f -> builder.field(f.name(), f.type(), f.description()));
builder.field(LEVEL, DataTypes.INT().notNull());
return builder.build();
}

/**
* Create key-value fields, we need to add a const value to the id of value field to ensure that
* they are consistent when compared by field id. For example, there are two table with key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,11 @@ public class ChannelWithMeta {

private final FileIOChannel.ID channel;
private final int blockCount;
private final int numBytesInLastBlock;
private final long numBytes;

public ChannelWithMeta(
FileIOChannel.ID channel,
int blockCount,
int numBytesInLastBlock,
long numEstimatedBytes) {
public ChannelWithMeta(FileIOChannel.ID channel, int blockCount, long numEstimatedBytes) {
this.channel = channel;
this.blockCount = blockCount;
this.numBytesInLastBlock = numBytesInLastBlock;
this.numBytes = numEstimatedBytes;
}

Expand All @@ -45,10 +39,6 @@ public int getBlockCount() {
return blockCount;
}

public int getNumBytesInLastBlock() {
return numBytesInLastBlock;
}

public long getNumBytes() {
return numBytes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@
import org.apache.paimon.memory.Buffer;
import org.apache.paimon.memory.MemorySegment;

import java.io.Closeable;
import java.io.IOException;

/**
* A {@link DataOutputView} that is backed by a {@link FileIOChannel}, making it effectively a data
* output stream. The view will compress its data before writing it in blocks to the underlying
* channel.
*/
public final class ChannelWriterOutputView extends AbstractPagedOutputView {
public final class ChannelWriterOutputView extends AbstractPagedOutputView implements Closeable {

private final MemorySegment compressedBuffer;
private final BlockCompressor compressor;
Expand Down Expand Up @@ -60,15 +61,15 @@ public FileIOChannel getChannel() {
return writer;
}

public int close() throws IOException {
@Override
public void close() throws IOException {
if (!writer.isClosed()) {
int currentPositionInSegment = getCurrentPositionInSegment();
writeCompressed(currentSegment, currentPositionInSegment);
clear();
this.writeBytes = writer.getSize();
this.writer.close();
}
return -1;
}

public void closeAndDelete() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@ private void spill() throws IOException {
new ChannelWithMeta(
channel,
inMemoryBuffer.getNumRecordBuffers(),
inMemoryBuffer.getNumBytesInLastBuffer(),
channelWriterOutputView.getNumBytes()));

inMemoryBuffer.reset();
Expand Down
Loading

0 comments on commit 10a982e

Please sign in to comment.