From db137c8d35df6ccfde8d3f4543cab58bdf8a575a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E7=A5=A5=E5=B9=B3?= Date: Tue, 19 Dec 2023 09:12:37 +0800 Subject: [PATCH] add diff sequence generate use hashcode --- .../java/org/apache/paimon/CoreOptions.java | 17 ++++++++++++++++- .../apache/paimon/operation/DiffReader.java | 16 +++++++++++++--- .../paimon/operation/FileStoreExpireImpl.java | 3 +-- .../operation/KeyValueFileStoreRead.java | 6 +++++- .../table/ChangelogWithKeyFileStoreTable.java | 16 +++++++++------- .../StaticFromTimestampStartingScanner.java | 3 +-- .../apache/paimon/flink/FlinkRowWrapper.java | 7 +++++++ .../flink/sink/DynamicBucketCompactSink.java | 3 +-- .../sink/index/GlobalIndexAssignerTest.java | 19 +++++++++---------- 9 files changed, 62 insertions(+), 28 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 2d5c848aac00..9ba675ef0fa6 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -416,6 +416,14 @@ public class CoreOptions implements Serializable { "The field that generates the sequence number for primary key table," + " the sequence number determines which data is the most recent."); + public static final ConfigOption SEQUENCE_USE_HASH = + key("sequence.use-hash") + .booleanType() + .defaultValue(false) + .withDescription( + "The field that generates the sequence number for primary key table," + + " the sequence number determines which data is the most recent."); + public static final ConfigOption SEQUENCE_AUTO_PADDING = key("sequence.auto-padding") .stringType() @@ -1226,6 +1234,14 @@ public Optional sequenceField() { return options.getOptional(SEQUENCE_FIELD); } + public Boolean sequenceUseHash() { + Boolean useHash = options.get(SEQUENCE_USE_HASH); + if (useHash == null) { + return false; + } + return useHash; + } + public List sequenceAutoPadding() { String padding = options.get(SEQUENCE_AUTO_PADDING); if (padding == null) { @@ -1362,7 +1378,6 @@ public boolean supportDeleteByType() { return options.get(SUPPORT_DELETE_BY_TYPE); } - /** Specifies the merge engine for table with primary key. */ public enum MergeEngine implements DescribedEnum { DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."), diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DiffReader.java b/paimon-core/src/main/java/org/apache/paimon/operation/DiffReader.java index 324bff9351d1..b6fd561af214 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/DiffReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/DiffReader.java @@ -44,14 +44,15 @@ public static RecordReader readDiff( RecordReader afterReader, Comparator keyComparator, MergeSorter sorter, - boolean keepDelete) + boolean keepDelete, + boolean sequenceUseHash) throws IOException { return sorter.mergeSort( Arrays.asList( () -> wrapLevelToReader(beforeReader, BEFORE_LEVEL), () -> wrapLevelToReader(afterReader, AFTER_LEVEL)), keyComparator, - new DiffMerger(keepDelete)); + new DiffMerger(keepDelete, sequenceUseHash)); } private static RecordReader wrapLevelToReader( @@ -94,10 +95,13 @@ private static class DiffMerger implements MergeFunctionWrapper { private final boolean keepDelete; + private final boolean sequenceUseHash; + private final List kvs = new ArrayList<>(); - public DiffMerger(boolean keepDelete) { + public DiffMerger(boolean keepDelete, boolean sequenceUseHash) { this.keepDelete = keepDelete; + this.sequenceUseHash = sequenceUseHash; } @Override @@ -123,6 +127,12 @@ public KeyValue getResult() { return kv; } } else if (kvs.size() == 2) { + if (sequenceUseHash) { + if (kvs.get(0).sequenceNumber() == kvs.get(1).sequenceNumber()) { + return null; + } + kvs.sort(Comparator.comparingLong(KeyValue::level)); + } KeyValue latest = kvs.get(1); if (latest.level() == AFTER_LEVEL) { return latest; diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java index 3e0a81df23ef..b44a8817092e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java @@ -165,8 +165,7 @@ public void expireUntil(long earliestId, long endExclusiveId) { endExclusiveId = Math.min(beginInclusiveId + expireLimit, endExclusiveId); if (LOG.isInfoEnabled()) { - LOG.info( - "Snapshot expire range is [" + beginInclusiveId + ", " + endExclusiveId + ")"); + LOG.info("Snapshot expire range is [" + beginInclusiveId + ", " + endExclusiveId + ")"); } List taggedSnapshots = tagManager.taggedSnapshots(); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java index d3a3615bcd02..e61e59f2fc7c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java @@ -85,6 +85,8 @@ public class KeyValueFileStoreRead implements FileStoreRead { private boolean forceKeepDelete = false; + private boolean sequenceUseHash = false; + public KeyValueFileStoreRead( FileIO fileIO, SchemaManager schemaManager, @@ -115,6 +117,7 @@ public KeyValueFileStoreRead( this.mergeSorter = new MergeSorter( CoreOptions.fromMap(tableSchema.options()), keyType, valueType, null); + this.sequenceUseHash = options.sequenceUseHash(); } public KeyValueFileStoreRead withKeyProjection(int[][] projectedFields) { @@ -213,7 +216,8 @@ private RecordReader createReaderWithoutOuterProjection(DataSplit spli split.partition(), split.bucket(), split.dataFiles(), false), keyComparator, mergeSorter, - forceKeepDelete); + forceKeepDelete, + sequenceUseHash); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java index 1b2862b46226..1a56f0e628ae 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java @@ -44,7 +44,6 @@ import org.apache.paimon.table.source.MergeTreeSplitGenerator; import org.apache.paimon.table.source.SplitGenerator; import org.apache.paimon.table.source.ValueContentRowDataRecordIterator; -import org.apache.paimon.types.DataType; import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; @@ -193,16 +192,19 @@ public TableWriteImpl newWrite( final SequenceGenerator sequenceGenerator = SequenceGenerator.create(schema(), store().options()); final KeyValue kv = new KeyValue(); + final boolean sequenceUseHash = store().options().sequenceUseHash(); return new TableWriteImpl<>( store().newWrite(commitUser, manifestFilter), createRowKeyExtractor(), record -> { - if(store().options().supportDeleteByType()){ + if (store().options().supportDeleteByType()) { setRowKindByBinlogType(record.row()); } long sequenceNumber = sequenceGenerator == null - ? KeyValue.UNKNOWN_SEQUENCE + ? sequenceUseHash + ? record.row().hashCode() + : KeyValue.UNKNOWN_SEQUENCE : sequenceGenerator.generate(record.row()); return kv.replace( record.primaryKey(), @@ -212,14 +214,14 @@ record -> { }); } - public void setRowKindByBinlogType(InternalRow row){ + public void setRowKindByBinlogType(InternalRow row) { RowType rowType = schema().logicalRowType(); int index = rowType.getFieldNames().indexOf("binlog_eventtype"); - if(index <0){ + if (index < 0) { return; } - String binlog_eventtype = row.getString(index).toString(); - if(binlog_eventtype.equalsIgnoreCase("delete")){ + String binlog_eventtype = row.getString(index).toString(); + if (binlog_eventtype.equalsIgnoreCase("delete")) { row.setRowKind(RowKind.DELETE); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java index 60e39f0d1737..833627850a5b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java @@ -56,8 +56,7 @@ public Result scan(SnapshotReader snapshotReader) { startupMillis); return new NoSnapshot(); } - LOG.info( - "get startingSnapshot:{}, from timestamp:{}", startingSnapshotId, startupMillis); + LOG.info("get startingSnapshot:{}, from timestamp:{}", startingSnapshotId, startupMillis); return StartingScanner.fromPlan( snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId).read()); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java index 08359784db60..480f76862960 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java @@ -31,6 +31,8 @@ import org.apache.flink.table.data.StringData; import org.apache.flink.table.types.logical.LogicalType; +import java.util.Objects; + import static org.apache.paimon.flink.FlinkRowData.toFlinkRowKind; import static org.apache.paimon.flink.LogicalTypeConversion.toDataType; @@ -133,6 +135,11 @@ public InternalRow getRow(int pos, int numFields) { return new FlinkRowWrapper(row.getRow(pos, numFields)); } + @Override + public int hashCode() { + return Objects.hash(row); + } + private static class FlinkArrayWrapper implements InternalArray { private final org.apache.flink.table.data.ArrayData array; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java index adef4554e5ff..f92de4449994 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java @@ -18,8 +18,6 @@ package org.apache.paimon.flink.sink; -import org.apache.flink.table.data.RowData; -import org.apache.paimon.data.InternalRow; import org.apache.paimon.table.FileStoreTable; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; @@ -27,6 +25,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.table.data.RowData; import javax.annotation.Nullable; diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java index f50214c1447d..9d6e68b7ba4d 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java @@ -18,10 +18,6 @@ package org.apache.paimon.flink.sink.index; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.table.data.GenericRowData; -import org.apache.flink.table.data.RowData; -import org.apache.flink.types.RowKind; import org.apache.paimon.CoreOptions; import org.apache.paimon.CoreOptions.MergeEngine; import org.apache.paimon.catalog.Identifier; @@ -30,6 +26,11 @@ import org.apache.paimon.schema.Schema; import org.apache.paimon.table.TableTestBase; import org.apache.paimon.types.DataTypes; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.types.RowKind; import org.junit.jupiter.api.Test; import java.io.File; @@ -43,13 +44,12 @@ /** Test for {@link GlobalIndexAssigner}. */ public class GlobalIndexAssignerTest extends TableTestBase { - private GlobalIndexAssigner createAssigner(MergeEngine mergeEngine) - throws Exception { + private GlobalIndexAssigner createAssigner(MergeEngine mergeEngine) throws Exception { return createAssigner(mergeEngine, false); } - private GlobalIndexAssigner createAssigner( - MergeEngine mergeEngine, boolean enableTtl) throws Exception { + private GlobalIndexAssigner createAssigner(MergeEngine mergeEngine, boolean enableTtl) + throws Exception { Identifier identifier = identifier("T"); Options options = new Options(); options.set(CoreOptions.MERGE_ENGINE, mergeEngine); @@ -89,8 +89,7 @@ private IOManager ioManager() { } private void innerTestBucketAssign(boolean enableTtl) throws Exception { - GlobalIndexAssigner assigner = - createAssigner(MergeEngine.DEDUPLICATE, enableTtl); + GlobalIndexAssigner assigner = createAssigner(MergeEngine.DEDUPLICATE, enableTtl); List output = new ArrayList<>(); assigner.open( ioManager(),