Commit
add diff sequence generate use hashcode
wxplovecc committed Dec 19, 2023
1 parent f7e81aa commit db137c8
Showing 9 changed files with 62 additions and 28 deletions.
17 changes: 16 additions & 1 deletion paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -416,6 +416,14 @@ public class CoreOptions implements Serializable {
"The field that generates the sequence number for primary key table,"
+ " the sequence number determines which data is the most recent.");

public static final ConfigOption<Boolean> SEQUENCE_USE_HASH =
key("sequence.use-hash")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to use the hash code of the row as the sequence number for the primary key table,"
+ " so that rows with identical content produce identical sequence numbers.");

public static final ConfigOption<String> SEQUENCE_AUTO_PADDING =
key("sequence.auto-padding")
.stringType()
@@ -1226,6 +1234,14 @@ public Optional<String> sequenceField() {
return options.getOptional(SEQUENCE_FIELD);
}

public Boolean sequenceUseHash() {
Boolean useHash = options.get(SEQUENCE_USE_HASH);
if (useHash == null) {
return false;
}
return useHash;
}

public List<String> sequenceAutoPadding() {
String padding = options.get(SEQUENCE_AUTO_PADDING);
if (padding == null) {
@@ -1362,7 +1378,6 @@ public boolean supportDeleteByType() {
return options.get(SUPPORT_DELETE_BY_TYPE);
}


/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
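Note: the new "sequence.use-hash" option defaults to false and switches the sequence number source from a declared field to the row's hash code. A minimal sketch of setting it programmatically, assuming the org.apache.paimon.options.Options API used elsewhere in this commit; the surrounding table setup is omitted:

import org.apache.paimon.CoreOptions;
import org.apache.paimon.options.Options;

public class EnableSequenceUseHash {
    public static void main(String[] args) {
        // Sketch only: flip the option added above; no table wiring here.
        Options options = new Options();
        options.set(CoreOptions.SEQUENCE_USE_HASH, true); // key "sequence.use-hash"
        System.out.println(options.get(CoreOptions.SEQUENCE_USE_HASH)); // true
    }
}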
@@ -44,14 +44,15 @@ public static RecordReader<KeyValue> readDiff(
RecordReader<KeyValue> afterReader,
Comparator<InternalRow> keyComparator,
MergeSorter sorter,
boolean keepDelete)
boolean keepDelete,
boolean sequenceUseHash)
throws IOException {
return sorter.mergeSort(
Arrays.asList(
() -> wrapLevelToReader(beforeReader, BEFORE_LEVEL),
() -> wrapLevelToReader(afterReader, AFTER_LEVEL)),
keyComparator,
new DiffMerger(keepDelete));
new DiffMerger(keepDelete, sequenceUseHash));
}

private static RecordReader<KeyValue> wrapLevelToReader(
@@ -94,10 +94,13 @@ private static class DiffMerger implements MergeFunctionWrapper<KeyValue> {

private final boolean keepDelete;

private final boolean sequenceUseHash;

private final List<KeyValue> kvs = new ArrayList<>();

public DiffMerger(boolean keepDelete) {
public DiffMerger(boolean keepDelete, boolean sequenceUseHash) {
this.keepDelete = keepDelete;
this.sequenceUseHash = sequenceUseHash;
}

@Override
@@ -123,6 +127,12 @@ public KeyValue getResult() {
return kv;
}
} else if (kvs.size() == 2) {
if (sequenceUseHash) {
if (kvs.get(0).sequenceNumber() == kvs.get(1).sequenceNumber()) {
return null;
}
kvs.sort(Comparator.comparingLong(KeyValue::level));
}
KeyValue latest = kvs.get(1);
if (latest.level() == AFTER_LEVEL) {
return latest;
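In plain terms, the DiffMerger change above treats equal sequence numbers on the BEFORE and AFTER copies of a key as "content unchanged" and drops the pair from the diff; otherwise it sorts by level so the AFTER record sits last and wins. A standalone sketch of the idea, not the Paimon API:

import java.util.Objects;

public class HashDiffSketch {
    // Content-derived "sequence number", mirroring the hash-based scheme above.
    static long hashSequence(Object... rowFields) {
        return Objects.hash(rowFields);
    }

    public static void main(String[] args) {
        long before = hashSequence("k1", "v1");
        long afterUnchanged = hashSequence("k1", "v1");
        long afterUpdated = hashSequence("k1", "v2");
        System.out.println(before == afterUnchanged); // true  -> drop from diff
        System.out.println(before == afterUpdated);   // false -> keep in diff
    }
}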
@@ -165,8 +165,7 @@ public void expireUntil(long earliestId, long endExclusiveId) {
endExclusiveId = Math.min(beginInclusiveId + expireLimit, endExclusiveId);

if (LOG.isInfoEnabled()) {
LOG.info(
"Snapshot expire range is [" + beginInclusiveId + ", " + endExclusiveId + ")");
LOG.info("Snapshot expire range is [" + beginInclusiveId + ", " + endExclusiveId + ")");
}

List<Snapshot> taggedSnapshots = tagManager.taggedSnapshots();
@@ -85,6 +85,8 @@ public class KeyValueFileStoreRead implements FileStoreRead<KeyValue> {

private boolean forceKeepDelete = false;

private boolean sequenceUseHash = false;

public KeyValueFileStoreRead(
FileIO fileIO,
SchemaManager schemaManager,
@@ -115,6 +117,7 @@ public KeyValueFileStoreRead(
this.mergeSorter =
new MergeSorter(
CoreOptions.fromMap(tableSchema.options()), keyType, valueType, null);
this.sequenceUseHash = options.sequenceUseHash();
}

public KeyValueFileStoreRead withKeyProjection(int[][] projectedFields) {
@@ -213,7 +216,8 @@ private RecordReader<KeyValue> createReaderWithoutOuterProjection(DataSplit spli
split.partition(), split.bucket(), split.dataFiles(), false),
keyComparator,
mergeSorter,
forceKeepDelete);
forceKeepDelete,
sequenceUseHash);
}
}

@@ -44,7 +44,6 @@
import org.apache.paimon.table.source.MergeTreeSplitGenerator;
import org.apache.paimon.table.source.SplitGenerator;
import org.apache.paimon.table.source.ValueContentRowDataRecordIterator;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;

@@ -193,16 +192,19 @@ public TableWriteImpl<KeyValue> newWrite(
final SequenceGenerator sequenceGenerator =
SequenceGenerator.create(schema(), store().options());
final KeyValue kv = new KeyValue();
final boolean sequenceUseHash = store().options().sequenceUseHash();
return new TableWriteImpl<>(
store().newWrite(commitUser, manifestFilter),
createRowKeyExtractor(),
record -> {
if(store().options().supportDeleteByType()){
if (store().options().supportDeleteByType()) {
setRowKindByBinlogType(record.row());
}
long sequenceNumber =
sequenceGenerator == null
? KeyValue.UNKNOWN_SEQUENCE
? sequenceUseHash
? record.row().hashCode()
: KeyValue.UNKNOWN_SEQUENCE
: sequenceGenerator.generate(record.row());
return kv.replace(
record.primaryKey(),
@@ -212,14 +214,14 @@ record -> {
});
}

public void setRowKindByBinlogType(InternalRow row){
public void setRowKindByBinlogType(InternalRow row) {
RowType rowType = schema().logicalRowType();
int index = rowType.getFieldNames().indexOf("binlog_eventtype");
if(index <0){
if (index < 0) {
return;
}
String binlog_eventtype = row.getString(index).toString();
if(binlog_eventtype.equalsIgnoreCase("delete")){
String binlog_eventtype = row.getString(index).toString();
if (binlog_eventtype.equalsIgnoreCase("delete")) {
row.setRowKind(RowKind.DELETE);
}
}
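The nested ternary added to newWrite(...) is dense; here is an equivalent, runnable paraphrase. The -1 sentinel stands in for KeyValue.UNKNOWN_SEQUENCE, and the Long parameter stands in for sequenceGenerator.generate(row):

public class SequenceChoiceSketch {
    static final long UNKNOWN_SEQUENCE = -1; // stand-in for KeyValue.UNKNOWN_SEQUENCE

    // null explicitSequence means no "sequence.field" is configured.
    static long chooseSequence(Long explicitSequence, boolean useHash, int rowHash) {
        if (explicitSequence != null) {
            return explicitSequence; // an explicit sequence field always wins
        }
        return useHash ? rowHash : UNKNOWN_SEQUENCE; // hash fallback, then sentinel
    }

    public static void main(String[] args) {
        System.out.println(chooseSequence(42L, true, 7));   // 42: sequence field wins
        System.out.println(chooseSequence(null, true, 7));  // 7: hash code fallback
        System.out.println(chooseSequence(null, false, 7)); // -1: unknown sequence
    }
}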
@@ -56,8 +56,7 @@ public Result scan(SnapshotReader snapshotReader) {
startupMillis);
return new NoSnapshot();
}
LOG.info(
"get startingSnapshot:{}, from timestamp:{}", startingSnapshotId, startupMillis);
LOG.info("get startingSnapshot:{}, from timestamp:{}", startingSnapshotId, startupMillis);
return StartingScanner.fromPlan(
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId).read());
}
@@ -31,6 +31,8 @@
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.LogicalType;

import java.util.Objects;

import static org.apache.paimon.flink.FlinkRowData.toFlinkRowKind;
import static org.apache.paimon.flink.LogicalTypeConversion.toDataType;

@@ -133,6 +135,11 @@ public InternalRow getRow(int pos, int numFields) {
return new FlinkRowWrapper(row.getRow(pos, numFields));
}

@Override
public int hashCode() {
return Objects.hash(row);
}

private static class FlinkArrayWrapper implements InternalArray {

private final org.apache.flink.table.data.ArrayData array;
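The hashCode override added above delegates to the wrapped row's own hashCode (Objects.hash with a single non-null argument is exactly 31 + row.hashCode()), so sequence numbers derived this way are content-based only insofar as the concrete Flink RowData implementation hashes by content. A tiny runnable check of that equivalence, using a plain String as a stand-in for the wrapped row:

import java.util.Objects;

public class SingleArgObjectsHash {
    public static void main(String[] args) {
        String row = "some row payload"; // stand-in for the wrapped RowData
        // Objects.hash(x) == Arrays.hashCode(new Object[]{x}) == 31 + x.hashCode()
        System.out.println(Objects.hash(row) == 31 + row.hashCode()); // true
    }
}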
@@ -18,15 +18,14 @@

package org.apache.paimon.flink.sink;

import org.apache.flink.table.data.RowData;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.data.RowData;

import javax.annotation.Nullable;

@@ -18,10 +18,6 @@

package org.apache.paimon.flink.sink.index;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.MergeEngine;
import org.apache.paimon.catalog.Identifier;
@@ -30,6 +26,11 @@
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.types.DataTypes;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import org.junit.jupiter.api.Test;

import java.io.File;
@@ -43,13 +44,12 @@
/** Test for {@link GlobalIndexAssigner}. */
public class GlobalIndexAssignerTest extends TableTestBase {

private GlobalIndexAssigner<RowData> createAssigner(MergeEngine mergeEngine)
throws Exception {
private GlobalIndexAssigner<RowData> createAssigner(MergeEngine mergeEngine) throws Exception {
return createAssigner(mergeEngine, false);
}

private GlobalIndexAssigner<RowData> createAssigner(
MergeEngine mergeEngine, boolean enableTtl) throws Exception {
private GlobalIndexAssigner<RowData> createAssigner(MergeEngine mergeEngine, boolean enableTtl)
throws Exception {
Identifier identifier = identifier("T");
Options options = new Options();
options.set(CoreOptions.MERGE_ENGINE, mergeEngine);
@@ -89,8 +89,7 @@ private IOManager ioManager() {
}

private void innerTestBucketAssign(boolean enableTtl) throws Exception {
GlobalIndexAssigner<RowData> assigner =
createAssigner(MergeEngine.DEDUPLICATE, enableTtl);
GlobalIndexAssigner<RowData> assigner = createAssigner(MergeEngine.DEDUPLICATE, enableTtl);
List<Integer> output = new ArrayList<>();
assigner.open(
ioManager(),