Commit 997fdc7
[core] Move codes of cross partition to paimon-core (#2186)
JingsongLi authored Oct 27, 2023
1 parent f9cd634 commit 997fdc7
Showing 28 changed files with 149 additions and 110 deletions.
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.utils;

import java.io.IOException;

/** An iterator for Key and Value. */
public interface KeyValueIterator<K, V> {

boolean advanceNext() throws IOException;

K getKey();

V getValue();
}
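The new interface uses a cursor-style protocol: advanceNext() moves to the next entry and reports whether one exists, then getKey()/getValue() read the current entry. A minimal sketch of an adapter and a consumer, assuming only the interface above (the map-backed source is illustrative, not part of the commit):

import org.apache.paimon.utils.KeyValueIterator;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;

public class KeyValueIterators {

    /** Adapt a java.util.Map to the cursor-style KeyValueIterator. */
    public static <K, V> KeyValueIterator<K, V> fromMap(Map<K, V> map) {
        Iterator<Map.Entry<K, V>> entries = map.entrySet().iterator();
        return new KeyValueIterator<K, V>() {
            private Map.Entry<K, V> current;

            @Override
            public boolean advanceNext() {
                if (!entries.hasNext()) {
                    return false;
                }
                current = entries.next();
                return true;
            }

            @Override
            public K getKey() {
                return current.getKey();
            }

            @Override
            public V getValue() {
                return current.getValue();
            }
        };
    }

    /** Typical consumption loop: advance first, then read key and value. */
    public static <K, V> void drain(KeyValueIterator<K, V> iter) throws IOException {
        while (iter.advanceNext()) {
            K key = iter.getKey();
            V value = iter.getValue();
            // hand (key, value) to a consumer, e.g. a bulk loader
        }
    }
}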
@@ -16,7 +16,7 @@
* limitations under the License.
*/

-package org.apache.paimon.flink.lookup;
+package org.apache.paimon.utils;

import org.apache.paimon.data.serializer.Serializer;
import org.apache.paimon.io.DataInputDeserializer;
@@ -16,14 +16,13 @@
* limitations under the License.
*/

-package org.apache.paimon.flink.utils;
+package org.apache.paimon.utils;

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.SerBiFunction;

import java.util.List;
import java.util.Map;
@@ -33,15 +32,14 @@
import static org.apache.paimon.data.InternalRow.createFieldGetter;

/** Project {@link BinaryRow} fields into {@link InternalRow}. */
-public class ProjectToRowDataFunction
-        implements SerBiFunction<InternalRow, BinaryRow, InternalRow> {
+public class ProjectToRowFunction implements SerBiFunction<InternalRow, BinaryRow, InternalRow> {

private final InternalRow.FieldGetter[] fieldGetters;

private final Map<Integer, Integer> projectMapping;
private final InternalRow.FieldGetter[] projectGetters;

-public ProjectToRowDataFunction(RowType rowType, List<String> projectFields) {
+public ProjectToRowFunction(RowType rowType, List<String> projectFields) {
DataType[] types = rowType.getFieldTypes().toArray(new DataType[0]);
this.fieldGetters =
IntStream.range(0, types.length)
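A minimal usage sketch of the renamed class, mirroring how GlobalIndexAssigner uses it later in this diff; it assumes SerBiFunction extends java.util.function.BiFunction (so apply() is available), and the helper and its arguments are illustrative:

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ProjectToRowFunction;

import java.util.List;

public class ProjectToRowFunctionExample {

    /** Overwrite the partition fields of a table row from a partition BinaryRow. */
    static InternalRow fillPartition(
            RowType tableRowType, // full table schema
            List<String> partitionKeys, // e.g. ["dt"]
            InternalRow row, // row to enrich
            BinaryRow partition) { // partition values, in partitionKeys order
        ProjectToRowFunction setPartition =
                new ProjectToRowFunction(tableRowType, partitionKeys);
        // SerBiFunction is assumed to extend BiFunction, hence apply().
        return setPartition.apply(row, partition);
    }
}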
11 changes: 11 additions & 0 deletions paimon-core/pom.xml
@@ -31,6 +31,10 @@ under the License.
<artifactId>paimon-core</artifactId>
<name>Paimon : Core</name>

+<properties>
+    <frocksdbjni.version>6.20.3-ververica-2.0</frocksdbjni.version>
+</properties>

<dependencies>
<dependency>
<groupId>org.apache.paimon</groupId>
@@ -58,6 +62,13 @@ under the License.
<version>${lz4.version}</version>
</dependency>

+<dependency>
+    <groupId>com.ververica</groupId>
+    <artifactId>frocksdbjni</artifactId>
+    <version>${frocksdbjni.version}</version>
+    <scope>provided</scope>
+</dependency>

<!-- test dependencies -->

<dependency>
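Note the provided scope: paimon-core compiles against frocksdbjni but does not ship it, so a runtime module (for example a connector) is expected to supply the dependency itself, roughly like this hypothetical consumer pom snippet:

<!-- Hypothetical consumer pom: bundle frocksdbjni at runtime,
     since paimon-core declares it only as provided. -->
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>frocksdbjni</artifactId>
    <version>6.20.3-ververica-2.0</version>
</dependency>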
@@ -16,7 +16,7 @@
* limitations under the License.
*/

-package org.apache.paimon.flink.sink.index;
+package org.apache.paimon.crosspartition;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.MergeEngine;
@@ -28,10 +28,9 @@
import org.apache.paimon.data.serializer.RowCompactedSerializer;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.disk.RowBuffer;
-import org.apache.paimon.flink.RocksDBOptions;
-import org.apache.paimon.flink.lookup.RocksDBStateFactory;
-import org.apache.paimon.flink.lookup.RocksDBValueState;
-import org.apache.paimon.flink.utils.ProjectToRowDataFunction;
+import org.apache.paimon.lookup.RocksDBOptions;
+import org.apache.paimon.lookup.RocksDBStateFactory;
+import org.apache.paimon.lookup.RocksDBValueState;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.options.Options;
import org.apache.paimon.sort.BinaryExternalSortBuffer;
@@ -46,14 +45,14 @@
import org.apache.paimon.utils.FileIOUtils;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.IDMapping;
+import org.apache.paimon.utils.KeyValueIterator;
import org.apache.paimon.utils.MutableObjectIterator;
import org.apache.paimon.utils.OffsetRow;
import org.apache.paimon.utils.PositiveIntInt;
import org.apache.paimon.utils.PositiveIntIntSerializer;
+import org.apache.paimon.utils.ProjectToRowFunction;
import org.apache.paimon.utils.TypeUtils;

-import org.apache.flink.table.runtime.util.KeyValueIterator;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -82,7 +81,7 @@ public class GlobalIndexAssigner implements Serializable, Closeable {
private transient IOManager ioManager;

private transient int bucketIndex;
-private transient ProjectToRowDataFunction setPartition;
+private transient ProjectToRowFunction setPartition;
private transient boolean bootstrap;
private transient BinaryExternalSortBuffer bootstrapKeys;
private transient RowBuffer bootstrapRecords;
@@ -120,7 +119,7 @@ public void open(

RowType bootstrapType = IndexBootstrap.bootstrapType(table.schema());
this.bucketIndex = bootstrapType.getFieldCount() - 1;
-this.setPartition = new ProjectToRowDataFunction(table.rowType(), table.partitionKeys());
+this.setPartition = new ProjectToRowFunction(table.rowType(), table.partitionKeys());

CoreOptions coreOptions = table.coreOptions();
this.targetBucketRowNumber = (int) coreOptions.dynamicBucketTargetRowNum();
@@ -219,7 +218,7 @@ public byte[] getValue() {
}
};

-stateFactory.bulkLoad(keyIndex.columnFamily(), kvIter);
+stateFactory.bulkLoad(keyIndex, kvIter);
isEmpty = false;
}

@@ -16,7 +16,7 @@
* limitations under the License.
*/

-package org.apache.paimon.flink.sink.index;
+package org.apache.paimon.crosspartition;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
@@ -16,7 +16,7 @@
* limitations under the License.
*/

-package org.apache.paimon.flink.sink.index;
+package org.apache.paimon.crosspartition;

import org.apache.paimon.codegen.CodeGenUtils;
import org.apache.paimon.codegen.Projection;
Expand All @@ -26,13 +26,11 @@
import org.apache.paimon.table.sink.PartitionKeyExtractor;
import org.apache.paimon.types.RowType;

-import org.apache.flink.table.data.RowData;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

-/** A {@link PartitionKeyExtractor} to {@link RowData} with only key and partition fields. */
+/** A {@link PartitionKeyExtractor} to {@link InternalRow} with only key and partition fields. */
public class KeyPartPartitionKeyExtractor implements PartitionKeyExtractor<InternalRow> {

private final Projection partitionProjection;
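A sketch of how a PartitionKeyExtractor is consumed; the method names partition() and trimmedPrimaryKey() follow the interface as used elsewhere in Paimon and are assumed here, since this diff does not show them:

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.table.sink.PartitionKeyExtractor;

public class ExtractorExample {

    /** Split a key+partition row into its partition and trimmed primary key. */
    static void extract(PartitionKeyExtractor<InternalRow> extractor, InternalRow row) {
        // Assumed API: partition() drives bucket assignment,
        // trimmedPrimaryKey() identifies the record across partitions.
        BinaryRow partition = extractor.partition(row);
        BinaryRow trimmedKey = extractor.trimmedPrimaryKey(row);
    }
}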
@@ -16,9 +16,10 @@
* limitations under the License.
*/

-package org.apache.paimon.flink.lookup;
+package org.apache.paimon.lookup;

import org.apache.paimon.data.serializer.Serializer;
+import org.apache.paimon.utils.ListDelimitedSerializer;

import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
@@ -16,7 +16,7 @@
* limitations under the License.
*/

-package org.apache.paimon.flink;
+package org.apache.paimon.lookup;

import org.apache.paimon.annotation.Documentation;
import org.apache.paimon.options.ConfigOption;
@@ -16,7 +16,7 @@
* limitations under the License.
*/

-package org.apache.paimon.flink.lookup;
+package org.apache.paimon.lookup;

import org.apache.paimon.data.serializer.Serializer;

@@ -16,7 +16,7 @@
* limitations under the License.
*/

-package org.apache.paimon.flink.lookup;
+package org.apache.paimon.lookup;

import org.apache.paimon.data.serializer.Serializer;
import org.apache.paimon.io.DataInputDeserializer;
Expand Down Expand Up @@ -77,10 +77,6 @@ public RocksDBState(
.build();
}

-public ColumnFamilyHandle columnFamily() {
-    return columnFamily;
-}

public byte[] serializeKey(K key) throws IOException {
keyOutView.clear();
keySerializer.serialize(key, keyOutView);
@@ -16,12 +16,11 @@
* limitations under the License.
*/

-package org.apache.paimon.flink.lookup;
+package org.apache.paimon.lookup;

import org.apache.paimon.data.serializer.Serializer;
-import org.apache.paimon.flink.RocksDBOptions;
+import org.apache.paimon.utils.KeyValueIterator;

-import org.apache.flink.table.runtime.util.KeyValueIterator;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
Expand Down Expand Up @@ -82,39 +81,43 @@ public RocksDBStateFactory(
}
}

-    public void bulkLoad(ColumnFamilyHandle columnFamily, KeyValueIterator<byte[], byte[]> iterator)
-            throws RocksDBException, IOException {
-        long targetFileSize = options.targetFileSizeBase();
-
-        List<String> files = new ArrayList<>();
-        SstFileWriter writer = null;
-        long recordNum = 0;
-        while (iterator.advanceNext()) {
-            byte[] key = iterator.getKey();
-            byte[] value = iterator.getValue();
-
-            if (writer == null) {
-                writer = new SstFileWriter(new EnvOptions(), options);
-                String path = new File(this.path, "sst-" + (sstIndex++)).getPath();
-                writer.open(path);
-                files.add(path);
-            }
-
-            writer.put(key, value);
-            recordNum++;
-            if (recordNum % 1000 == 0 && writer.fileSize() >= targetFileSize) {
-                writer.finish();
-                writer = null;
-                recordNum = 0;
-            }
-        }
-
-        if (writer != null) {
-            writer.finish();
-        }
-
-        if (files.size() > 0) {
-            db.ingestExternalFile(columnFamily, files, new IngestExternalFileOptions());
+    public void bulkLoad(RocksDBState<?, ?, ?> state, KeyValueIterator<byte[], byte[]> iterator)
+            throws IOException {
+        try {
+            long targetFileSize = options.targetFileSizeBase();
+
+            List<String> files = new ArrayList<>();
+            SstFileWriter writer = null;
+            long recordNum = 0;
+            while (iterator.advanceNext()) {
+                byte[] key = iterator.getKey();
+                byte[] value = iterator.getValue();
+
+                if (writer == null) {
+                    writer = new SstFileWriter(new EnvOptions(), options);
+                    String path = new File(this.path, "sst-" + (sstIndex++)).getPath();
+                    writer.open(path);
+                    files.add(path);
+                }
+
+                writer.put(key, value);
+                recordNum++;
+                if (recordNum % 1000 == 0 && writer.fileSize() >= targetFileSize) {
+                    writer.finish();
+                    writer = null;
+                    recordNum = 0;
+                }
+            }
+
+            if (writer != null) {
+                writer.finish();
+            }
+
+            if (files.size() > 0) {
+                db.ingestExternalFile(state.columnFamily, files, new IngestExternalFileOptions());
+            }
+        } catch (Exception e) {
+            throw new IOException(e);
         }
     }
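The signature change pairs with the removal of RocksDBState.columnFamily() earlier in this diff: factory and state now share the org.apache.paimon.lookup package, so bulkLoad reads state.columnFamily directly. A minimal usage sketch, assuming the iterator yields keys in ascending order (SstFileWriter.put requires sorted input, which is why GlobalIndexAssigner sorts with BinaryExternalSortBuffer before bulk loading):

import org.apache.paimon.lookup.RocksDBState;
import org.apache.paimon.lookup.RocksDBStateFactory;
import org.apache.paimon.utils.KeyValueIterator;

import java.io.IOException;

public class BulkLoadExample {

    /** Bulk-load sorted key/value bytes via SST ingestion instead of puts. */
    static void loadSorted(
            RocksDBStateFactory factory,
            RocksDBState<?, ?, ?> state,
            KeyValueIterator<byte[], byte[]> sortedKvs) throws IOException {
        // Writes rolling sst-N files under the factory's path and ingests
        // them into the state's column family in one call.
        factory.bulkLoad(state, sortedKvs);
    }
}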
@@ -16,13 +16,12 @@
* limitations under the License.
*/

-package org.apache.paimon.flink.lookup;
+package org.apache.paimon.lookup;

import org.apache.paimon.data.serializer.Serializer;

import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
-import org.rocksdb.RocksDBException;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -52,7 +51,7 @@ public V get(K key) throws IOException {
}
}

-private Reference get(ByteArray keyBytes) throws RocksDBException {
+private Reference get(ByteArray keyBytes) throws Exception {
Reference valueRef = cache.getIfPresent(keyBytes);
if (valueRef == null) {
valueRef = ref(db.get(columnFamily, keyBytes.bytes));
@@ -70,7 +69,7 @@ public void put(K key, V value) throws IOException {
byte[] valueBytes = serializeValue(value);
db.put(columnFamily, writeOptions, keyBytes, valueBytes);
cache.put(wrap(keyBytes), ref(valueBytes));
-} catch (RocksDBException e) {
+} catch (Exception e) {
throw new IOException(e);
}
}
@@ -83,7 +82,7 @@ public void delete(K key) throws IOException {
db.delete(columnFamily, writeOptions, keyBytes);
cache.put(keyByteArray, ref(null));
}
-} catch (RocksDBException e) {
+} catch (Exception e) {
throw new IOException(e);
}
}
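A minimal sketch of the read/write path shown above; how the state instance is obtained from RocksDBStateFactory is not part of this diff, so it is taken as a parameter here:

import org.apache.paimon.lookup.RocksDBValueState;

import java.io.IOException;

public class ValueStateExample {

    static void roundTrip(RocksDBValueState<String, Long> state) throws IOException {
        state.put("k", 1L); // write-through: RocksDB put, then cache update
        Long v = state.get("k"); // repeat reads are served from the cache
        state.delete("k"); // deletes in RocksDB and caches a null reference
    }
}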