From 1502e83bfcbb529557dc9fa4adc336ec30aa961d Mon Sep 17 00:00:00 2001 From: Jingsong Lee Date: Wed, 6 Mar 2024 16:40:07 +0800 Subject: [PATCH] [core] Redefine user defined sequence fields (#2945) --- .../primary-key-table/sequence-rowkind.md | 24 +- .../generated/core_configuration.html | 6 - .../java/org/apache/paimon/CoreOptions.java | 32 +-- .../java/org/apache/paimon/types/RowType.java | 23 ++ .../org/apache/paimon/KeyValueFileStore.java | 11 + .../paimon/mergetree/MergeTreeReaders.java | 3 +- .../paimon/mergetree/MergeTreeWriter.java | 12 +- .../mergetree/SortBufferWriteBuffer.java | 33 ++- .../compact/ChangelogMergeTreeRewriter.java | 14 +- ...FullChangelogMergeTreeCompactRewriter.java | 5 + .../LookupMergeTreeCompactRewriter.java | 5 + .../compact/MergeTreeCompactRewriter.java | 7 + .../compact/PartialUpdateMergeFunction.java | 156 +++++++++- .../operation/KeyValueFileStoreRead.java | 43 +-- .../operation/KeyValueFileStoreWrite.java | 32 ++- .../paimon/schema/SchemaValidation.java | 22 +- .../paimon/table/query/LocalTableQuery.java | 2 +- .../paimon/table/sink/SequenceGenerator.java | 236 --------------- .../utils/UserDefinedSeqComparator.java | 81 ++++++ .../paimon/mergetree/MergeTreeTestBase.java | 2 + .../SortBufferWriteBufferTestBase.java | 1 + .../paimon/schema/SchemaManagerTest.java | 2 +- .../table/PrimaryKeyFileStoreTableTest.java | 54 ---- .../table/sink/SequenceGeneratorTest.java | 272 ------------------ .../paimon/table/system/FilesTableTest.java | 10 +- .../flink/lookup/FullCacheLookupTable.java | 53 ++-- .../flink/lookup/LookupStreamingReader.java | 12 +- .../flink/lookup/NoPrimaryKeyLookupTable.java | 7 +- .../flink/lookup/PrimaryKeyLookupTable.java | 8 +- .../lookup/SecondaryIndexLookupTable.java | 8 +- .../paimon/flink/sink/LocalMergeOperator.java | 16 +- .../paimon/flink/lookup/LookupTableTest.java | 70 ++--- .../source/TestChangelogDataReadWrite.java | 24 +- 33 files changed, 483 insertions(+), 803 deletions(-) delete mode 100644 
paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/utils/UserDefinedSeqComparator.java delete mode 100644 paimon-core/src/test/java/org/apache/paimon/table/sink/SequenceGeneratorTest.java diff --git a/docs/content/concepts/primary-key-table/sequence-rowkind.md b/docs/content/concepts/primary-key-table/sequence-rowkind.md index 80f20dce970f..65df770bb506 100644 --- a/docs/content/concepts/primary-key-table/sequence-rowkind.md +++ b/docs/content/concepts/primary-key-table/sequence-rowkind.md @@ -41,32 +41,18 @@ CREATE TABLE my_table ( pk BIGINT PRIMARY KEY NOT ENFORCED, v1 DOUBLE, v2 BIGINT, - dt TIMESTAMP + update_time TIMESTAMP ) WITH ( - 'sequence.field' = 'dt' + 'sequence.field' = 'update_time' ); ``` {{< /tab >}} {{< /tabs >}} -The record with the largest `sequence.field` value will be the last to merge, regardless of the input order. +The record with the largest `sequence.field` value will be the last to merge. If the values are the same, the input +order will be used to determine which one is the last one. -**Sequence Auto Padding**: - -When the record is updated or deleted, the `sequence.field` must become larger and cannot remain unchanged. -For -U and +U, their sequence-fields must be different. If you cannot meet this requirement, Paimon provides -option to automatically pad the sequence field for you. - -1. `'sequence.auto-padding' = 'row-kind-flag'`: If you are using same value for -U and +U, just like "`op_ts`" - (the time that the change was made in the database) in Mysql Binlog. It is recommended to use the automatic - padding for row kind flag, which will automatically distinguish between -U (-D) and +U (+I). - -2. 
Insufficient precision: If the provided `sequence.field` doesn't meet the precision, like a rough second or - millisecond, you can set `sequence.auto-padding` to `second-to-micro` or `millis-to-micro` so that the precision - of sequence number will be made up to microsecond by incremental id (Calculate within a single bucket). - -3. Composite pattern: for example, "second-to-micro,row-kind-flag", first, add the micro to the second, and then - pad the row kind flag. +You can define multiple fields for `sequence.field`, for example `'update_time,flag'`; multiple fields will be compared in order. ## Row Kind Field diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 2e25fe96b112..a123d3a77a3e 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -527,12 +527,6 @@ Long Optional timestamp used in case of "from-timestamp" scan mode. If there is no snapshot earlier than this time, the earliest snapshot will be chosen. - -
sequence.auto-padding
- (none) - String - Specify the way of padding precision, if the provided sequence field is used to indicate "time" but doesn't meet the precise. -
sequence.field
(none) diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 6430c919ea83..201525ab04ed 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -486,26 +486,6 @@ public class CoreOptions implements Serializable { "The field that generates the row kind for primary key table," + " the row kind determines which data is '+I', '-U', '+U' or '-D'."); - public static final ConfigOption SEQUENCE_AUTO_PADDING = - key("sequence.auto-padding") - .stringType() - .noDefaultValue() - .withDescription( - Description.builder() - .text( - "Specify the way of padding precision, if the provided sequence field is used to indicate \"time\" but doesn't meet the precise.") - .list( - text("You can specific:"), - text( - "1. \"row-kind-flag\": Pads a bit flag to indicate whether it is retract (0) or add (1) message."), - text( - "2. \"second-to-micro\": Pads the sequence field that indicates time with precision of seconds to micro-second."), - text( - "3. \"millis-to-micro\": Pads the sequence field that indicates time with precision of milli-second to micro-second."), - text( - "4. 
Composite pattern: for example, \"second-to-micro,row-kind-flag\".")) - .build()); - public static final ConfigOption SCAN_MODE = key("scan.mode") .enumType(StartupMode.class) @@ -1486,22 +1466,14 @@ public Integer dynamicBucketAssignerParallelism() { return options.get(DYNAMIC_BUCKET_ASSIGNER_PARALLELISM); } - public Optional sequenceField() { - return options.getOptional(SEQUENCE_FIELD); + public Optional> sequenceField() { + return options.getOptional(SEQUENCE_FIELD).map(s -> Arrays.asList(s.split(","))); } public Optional rowkindField() { return options.getOptional(ROWKIND_FIELD); } - public List sequenceAutoPadding() { - String padding = options.get(SEQUENCE_AUTO_PADDING); - if (padding == null) { - return Collections.emptyList(); - } - return Arrays.asList(padding.split(",")); - } - public boolean writeOnly() { return options.get(WRITE_ONLY); } diff --git a/paimon-common/src/main/java/org/apache/paimon/types/RowType.java b/paimon-common/src/main/java/org/apache/paimon/types/RowType.java index 1462d330e3c8..fe11976de698 100644 --- a/paimon-common/src/main/java/org/apache/paimon/types/RowType.java +++ b/paimon-common/src/main/java/org/apache/paimon/types/RowType.java @@ -99,6 +99,29 @@ public int getFieldIndex(String fieldName) { return -1; } + public boolean containsField(String fieldName) { + for (DataField field : fields) { + if (field.name().equals(fieldName)) { + return true; + } + } + return false; + } + + public boolean notContainsField(String fieldName) { + return !containsField(fieldName); + } + + public DataField getField(String fieldName) { + for (DataField field : fields) { + if (field.name().equals(fieldName)) { + return field; + } + } + + throw new RuntimeException("Cannot find field: " + fieldName); + } + @Override public DataType copy(boolean isNullable) { return new RowType( diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java index 
373bce35c6a4..727bf8f82b7a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java @@ -37,10 +37,14 @@ import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.CatalogEnvironment; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.FieldsComparator; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.KeyComparatorSupplier; +import org.apache.paimon.utils.UserDefinedSeqComparator; import org.apache.paimon.utils.ValueEqualiserSupplier; +import javax.annotation.Nullable; + import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; @@ -123,6 +127,7 @@ public KeyValueFileStoreRead newRead() { keyType, valueType, newKeyComparator(), + userDefinedSeqComparator(), mfFactory, newReaderFactoryBuilder()); } @@ -140,6 +145,11 @@ public KeyValueFileReaderFactory.Builder newReaderFactoryBuilder() { options); } + @Nullable + private FieldsComparator userDefinedSeqComparator() { + return UserDefinedSeqComparator.create(valueType, options); + } + @Override public KeyValueFileStoreWrite newWrite(String commitUser) { return newWrite(commitUser, null); @@ -159,6 +169,7 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma keyType, valueType, keyComparatorSupplier, + this::userDefinedSeqComparator, valueEqualiserSupplier, mfFactory, pathFactory(), diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java index 186371b81742..1485b72b2c16 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java @@ -47,6 +47,7 @@ public static RecordReader readerForMergeTree( boolean dropDelete, KeyValueFileReaderFactory readerFactory, Comparator userKeyComparator, + 
@Nullable FieldsComparator userDefinedSeqComparator, MergeFunction mergeFunction, MergeSorter mergeSorter) throws IOException { @@ -58,7 +59,7 @@ public static RecordReader readerForMergeTree( section, readerFactory, userKeyComparator, - null, + userDefinedSeqComparator, new ReducerMergeFunctionWrapper(mergeFunction), mergeSorter)); } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java index 046fad893f2e..537d838d0cbb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java @@ -33,9 +33,9 @@ import org.apache.paimon.memory.MemoryOwner; import org.apache.paimon.memory.MemorySegmentPool; import org.apache.paimon.mergetree.compact.MergeFunction; -import org.apache.paimon.table.sink.SequenceGenerator; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.CommitIncrement; +import org.apache.paimon.utils.FieldsComparator; import org.apache.paimon.utils.RecordWriter; import javax.annotation.Nullable; @@ -66,7 +66,7 @@ public class MergeTreeWriter implements RecordWriter, MemoryOwner { private final KeyValueFileWriterFactory writerFactory; private final boolean commitForceCompact; private final ChangelogProducer changelogProducer; - @Nullable private final SequenceGenerator sequenceGenerator; + @Nullable private final FieldsComparator userDefinedSeqComparator; private final LinkedHashSet newFiles; private final LinkedHashSet newFilesChangelog; @@ -90,7 +90,7 @@ public MergeTreeWriter( boolean commitForceCompact, ChangelogProducer changelogProducer, @Nullable CommitIncrement increment, - @Nullable SequenceGenerator sequenceGenerator) { + @Nullable FieldsComparator userDefinedSeqComparator) { this.writeBufferSpillable = writeBufferSpillable; this.sortMaxFan = sortMaxFan; this.sortCompression = sortCompression; @@ -104,7 +104,7 @@ public 
MergeTreeWriter( this.writerFactory = writerFactory; this.commitForceCompact = commitForceCompact; this.changelogProducer = changelogProducer; - this.sequenceGenerator = sequenceGenerator; + this.userDefinedSeqComparator = userDefinedSeqComparator; this.newFiles = new LinkedHashSet<>(); this.newFilesChangelog = new LinkedHashSet<>(); @@ -138,6 +138,7 @@ public void setMemoryPool(MemorySegmentPool memoryPool) { new SortBufferWriteBuffer( keyType, valueType, + userDefinedSeqComparator, memoryPool, writeBufferSpillable, sortMaxFan, @@ -148,9 +149,6 @@ public void setMemoryPool(MemorySegmentPool memoryPool) { @Override public void write(KeyValue kv) throws Exception { long sequenceNumber = newSequenceNumber(); - if (sequenceGenerator != null) { - sequenceNumber = sequenceGenerator.generateWithPadding(kv.value(), sequenceNumber); - } boolean success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value()); if (!success) { flushWriteBuffer(false, false); diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/SortBufferWriteBuffer.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/SortBufferWriteBuffer.java index ae877f5cb826..f0e9fcf07cf8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/SortBufferWriteBuffer.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/SortBufferWriteBuffer.java @@ -40,6 +40,8 @@ import org.apache.paimon.types.DataType; import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; +import org.apache.paimon.types.TinyIntType; +import org.apache.paimon.utils.FieldsComparator; import org.apache.paimon.utils.MutableObjectIterator; import javax.annotation.Nullable; @@ -61,6 +63,7 @@ public class SortBufferWriteBuffer implements WriteBuffer { public SortBufferWriteBuffer( RowType keyType, RowType valueType, + @Nullable FieldsComparator userDefinedSeqComparator, MemorySegmentPool memoryPool, boolean spillable, int sortMaxFan, @@ -70,17 +73,33 @@ public 
SortBufferWriteBuffer( this.valueType = valueType; this.serializer = new KeyValueSerializer(keyType, valueType); - // user key + sequenceNumber - List sortKeyTypes = new ArrayList<>(keyType.getFieldTypes()); - sortKeyTypes.add(new BigIntType(false)); + // key fields + IntStream sortFields = IntStream.range(0, keyType.getFieldCount()); + + // user define sequence fields + if (userDefinedSeqComparator != null) { + IntStream udsFields = + IntStream.of(userDefinedSeqComparator.compareFields()) + .map(operand -> operand + keyType.getFieldCount() + 2); + sortFields = IntStream.concat(sortFields, udsFields); + } + + // sequence field + sortFields = IntStream.concat(sortFields, IntStream.of(keyType.getFieldCount())); + + int[] sortFieldArray = sortFields.toArray(); + + // row type + List fieldTypes = new ArrayList<>(keyType.getFieldTypes()); + fieldTypes.add(new BigIntType(false)); + fieldTypes.add(new TinyIntType(false)); + fieldTypes.addAll(valueType.getFieldTypes()); - // for sort binary buffer - int[] sortFields = IntStream.range(0, sortKeyTypes.size()).toArray(); NormalizedKeyComputer normalizedKeyComputer = CodeGenUtils.newNormalizedKeyComputer( - sortKeyTypes, sortFields, "MemTableKeyComputer"); + fieldTypes, sortFieldArray, "MemTableKeyComputer"); RecordComparator keyComparator = - CodeGenUtils.newRecordComparator(sortKeyTypes, sortFields, "MemTableComparator"); + CodeGenUtils.newRecordComparator(fieldTypes, sortFieldArray, "MemTableComparator"); if (memoryPool.freePages() < 3) { throw new IllegalArgumentException( diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java index 99f5f052d13b..f338ee056268 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java @@ -30,6 +30,9 
@@ import org.apache.paimon.mergetree.MergeTreeReaders; import org.apache.paimon.mergetree.SortedRun; import org.apache.paimon.reader.RecordReaderIterator; +import org.apache.paimon.utils.FieldsComparator; + +import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Collections; @@ -51,9 +54,16 @@ public ChangelogMergeTreeRewriter( KeyValueFileReaderFactory readerFactory, KeyValueFileWriterFactory writerFactory, Comparator keyComparator, + @Nullable FieldsComparator userDefinedSeqComparator, MergeFunctionFactory mfFactory, MergeSorter mergeSorter) { - super(readerFactory, writerFactory, keyComparator, mfFactory, mergeSorter); + super( + readerFactory, + writerFactory, + keyComparator, + userDefinedSeqComparator, + mfFactory, + mergeSorter); this.maxLevel = maxLevel; this.mergeEngine = mergeEngine; } @@ -112,7 +122,7 @@ private CompactResult rewriteChangelogCompaction( section, readerFactory, keyComparator, - null, + userDefinedSeqComparator, createMergeWrapper(outputLevel), mergeSorter)); } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java index 553b0ff5ddb2..3ed7a5d69822 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java @@ -27,8 +27,11 @@ import org.apache.paimon.io.KeyValueFileWriterFactory; import org.apache.paimon.mergetree.MergeSorter; import org.apache.paimon.mergetree.SortedRun; +import org.apache.paimon.utils.FieldsComparator; import org.apache.paimon.utils.Preconditions; +import javax.annotation.Nullable; + import java.io.IOException; import java.util.Comparator; import java.util.List; @@ -48,6 +51,7 @@ public FullChangelogMergeTreeCompactRewriter( KeyValueFileReaderFactory readerFactory, 
KeyValueFileWriterFactory writerFactory, Comparator keyComparator, + @Nullable FieldsComparator userDefinedSeqComparator, MergeFunctionFactory mfFactory, MergeSorter mergeSorter, RecordEqualiser valueEqualiser, @@ -58,6 +62,7 @@ public FullChangelogMergeTreeCompactRewriter( readerFactory, writerFactory, keyComparator, + userDefinedSeqComparator, mfFactory, mergeSorter); this.valueEqualiser = valueEqualiser; diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java index b88c26b99eb7..5335ba82ab66 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java @@ -28,6 +28,9 @@ import org.apache.paimon.mergetree.LookupLevels; import org.apache.paimon.mergetree.MergeSorter; import org.apache.paimon.mergetree.SortedRun; +import org.apache.paimon.utils.FieldsComparator; + +import javax.annotation.Nullable; import java.io.IOException; import java.io.UncheckedIOException; @@ -54,6 +57,7 @@ public LookupMergeTreeCompactRewriter( KeyValueFileReaderFactory readerFactory, KeyValueFileWriterFactory writerFactory, Comparator keyComparator, + @Nullable FieldsComparator userDefinedSeqComparator, MergeFunctionFactory mfFactory, MergeSorter mergeSorter, MergeFunctionWrapperFactory wrapperFactory) { @@ -63,6 +67,7 @@ public LookupMergeTreeCompactRewriter( readerFactory, writerFactory, keyComparator, + userDefinedSeqComparator, mfFactory, mergeSorter); this.lookupLevels = lookupLevels; diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java index 228a3aa117b3..bd3338bf74b6 100644 --- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java @@ -30,6 +30,9 @@ import org.apache.paimon.mergetree.SortedRun; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.reader.RecordReaderIterator; +import org.apache.paimon.utils.FieldsComparator; + +import javax.annotation.Nullable; import java.util.Comparator; import java.util.List; @@ -40,6 +43,7 @@ public class MergeTreeCompactRewriter extends AbstractCompactRewriter { protected final KeyValueFileReaderFactory readerFactory; protected final KeyValueFileWriterFactory writerFactory; protected final Comparator keyComparator; + @Nullable protected final FieldsComparator userDefinedSeqComparator; protected final MergeFunctionFactory mfFactory; protected final MergeSorter mergeSorter; @@ -47,11 +51,13 @@ public MergeTreeCompactRewriter( KeyValueFileReaderFactory readerFactory, KeyValueFileWriterFactory writerFactory, Comparator keyComparator, + @Nullable FieldsComparator userDefinedSeqComparator, MergeFunctionFactory mfFactory, MergeSorter mergeSorter) { this.readerFactory = readerFactory; this.writerFactory = writerFactory; this.keyComparator = keyComparator; + this.userDefinedSeqComparator = userDefinedSeqComparator; this.mfFactory = mfFactory; this.mergeSorter = mergeSorter; } @@ -72,6 +78,7 @@ protected CompactResult rewriteCompaction( dropDelete, readerFactory, keyComparator, + userDefinedSeqComparator, mfFactory.create(), mergeSorter); writer.write(new RecordReaderIterator<>(sectionsReader)); diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java index 6c7fa056735f..dbf6dfd7531f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java +++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java @@ -24,10 +24,23 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator; import org.apache.paimon.options.Options; -import org.apache.paimon.table.sink.SequenceGenerator; +import org.apache.paimon.types.BigIntType; +import org.apache.paimon.types.CharType; import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypeDefaultVisitor; +import org.apache.paimon.types.DateType; +import org.apache.paimon.types.DecimalType; +import org.apache.paimon.types.DoubleType; +import org.apache.paimon.types.FloatType; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.LocalZonedTimestampType; import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; +import org.apache.paimon.types.SmallIntType; +import org.apache.paimon.types.TimestampType; +import org.apache.paimon.types.TinyIntType; +import org.apache.paimon.types.VarCharType; +import org.apache.paimon.utils.InternalRowUtils; import org.apache.paimon.utils.Projection; import javax.annotation.Nullable; @@ -144,9 +157,9 @@ private void updateWithSequenceGroup(KeyValue kv) { row.setField(i, field); } } else { - Long currentSeq = sequenceGen.generateWithoutPadding(kv.value()); + Long currentSeq = sequenceGen.generate(kv.value()); if (currentSeq != null) { - Long previousSeq = sequenceGen.generateWithoutPadding(row); + Long previousSeq = sequenceGen.generate(row); if (previousSeq == null || currentSeq >= previousSeq) { row.setField( i, aggregator == null ? 
field : aggregator.agg(accumulator, field)); @@ -162,9 +175,9 @@ private void retractWithSequenceGroup(KeyValue kv) { for (int i = 0; i < getters.length; i++) { SequenceGenerator sequenceGen = fieldSequences.get(i); if (sequenceGen != null) { - Long currentSeq = sequenceGen.generateWithoutPadding(kv.value()); + Long currentSeq = sequenceGen.generate(kv.value()); if (currentSeq != null) { - Long previousSeq = sequenceGen.generateWithoutPadding(row); + Long previousSeq = sequenceGen.generate(row); FieldAggregator aggregator = fieldAggregators.get(i); if (previousSeq == null || currentSeq >= previousSeq) { if (sequenceGen.index() == i) { @@ -391,4 +404,137 @@ private Map createFieldAggregators( return fieldAggregators; } } + + private static class SequenceGenerator { + + private final int index; + + private final Generator generator; + private final DataType fieldType; + + private SequenceGenerator(String field, RowType rowType) { + index = rowType.getFieldNames().indexOf(field); + if (index == -1) { + throw new RuntimeException( + String.format( + "Can not find sequence field %s in table schema: %s", + field, rowType)); + } + fieldType = rowType.getTypeAt(index); + generator = fieldType.accept(new SequenceGeneratorVisitor()); + } + + public SequenceGenerator(int index, DataType dataType) { + this.index = index; + + this.fieldType = dataType; + if (index == -1) { + throw new RuntimeException(String.format("Index : %s is invalid", index)); + } + generator = fieldType.accept(new SequenceGeneratorVisitor()); + } + + public int index() { + return index; + } + + public DataType fieldType() { + return fieldType; + } + + @Nullable + public Long generate(InternalRow row) { + return generator.generateNullable(row, index); + } + + private interface Generator { + long generate(InternalRow row, int i); + + @Nullable + default Long generateNullable(InternalRow row, int i) { + if (row.isNullAt(i)) { + return null; + } + return generate(row, i); + } + } + + private static class 
SequenceGeneratorVisitor extends DataTypeDefaultVisitor { + + @Override + public Generator visit(CharType charType) { + return stringGenerator(); + } + + @Override + public Generator visit(VarCharType varCharType) { + return stringGenerator(); + } + + private Generator stringGenerator() { + return (row, i) -> Long.parseLong(row.getString(i).toString()); + } + + @Override + public Generator visit(DecimalType decimalType) { + return (row, i) -> + InternalRowUtils.castToIntegral( + row.getDecimal( + i, decimalType.getPrecision(), decimalType.getScale())); + } + + @Override + public Generator visit(TinyIntType tinyIntType) { + return InternalRow::getByte; + } + + @Override + public Generator visit(SmallIntType smallIntType) { + return InternalRow::getShort; + } + + @Override + public Generator visit(IntType intType) { + return InternalRow::getInt; + } + + @Override + public Generator visit(BigIntType bigIntType) { + return InternalRow::getLong; + } + + @Override + public Generator visit(FloatType floatType) { + return (row, i) -> (long) row.getFloat(i); + } + + @Override + public Generator visit(DoubleType doubleType) { + return (row, i) -> (long) row.getDouble(i); + } + + @Override + public Generator visit(DateType dateType) { + return InternalRow::getInt; + } + + @Override + public Generator visit(TimestampType timestampType) { + return (row, i) -> + row.getTimestamp(i, timestampType.getPrecision()).getMillisecond(); + } + + @Override + public Generator visit(LocalZonedTimestampType localZonedTimestampType) { + return (row, i) -> + row.getTimestamp(i, localZonedTimestampType.getPrecision()) + .getMillisecond(); + } + + @Override + protected Generator defaultMethod(DataType dataType) { + throw new UnsupportedOperationException("Unsupported type: " + dataType); + } + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java index 
e8accda5b62f..8852e38a3d1e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java @@ -24,8 +24,6 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManager; -import org.apache.paimon.format.FileFormatDiscover; -import org.apache.paimon.fs.FileIO; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.KeyValueFileReaderFactory; import org.apache.paimon.mergetree.DropDeleteReader; @@ -41,12 +39,11 @@ import org.apache.paimon.mergetree.compact.ReducerMergeFunctionWrapper; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; -import org.apache.paimon.schema.KeyValueFieldsExtractor; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.types.RowType; -import org.apache.paimon.utils.FileStorePathFactory; +import org.apache.paimon.utils.FieldsComparator; import org.apache.paimon.utils.ProjectedRow; import javax.annotation.Nullable; @@ -72,6 +69,7 @@ public class KeyValueFileStoreRead implements FileStoreRead { private final Comparator keyComparator; private final MergeFunctionFactory mfFactory; private final MergeSorter mergeSorter; + @Nullable private final FieldsComparator userDefinedSeqComparator; @Nullable private int[][] keyProjectedFields; @@ -84,49 +82,20 @@ public class KeyValueFileStoreRead implements FileStoreRead { private boolean forceKeepDelete = false; - public KeyValueFileStoreRead( - FileIO fileIO, - SchemaManager schemaManager, - long schemaId, - RowType keyType, - RowType valueType, - Comparator keyComparator, - MergeFunctionFactory mfFactory, - FileFormatDiscover formatDiscover, - FileStorePathFactory pathFactory, - KeyValueFieldsExtractor extractor, - CoreOptions options) { - this( - schemaManager, 
- schemaId, - keyType, - valueType, - keyComparator, - mfFactory, - KeyValueFileReaderFactory.builder( - fileIO, - schemaManager, - schemaId, - keyType, - valueType, - formatDiscover, - pathFactory, - extractor, - options)); - } - public KeyValueFileStoreRead( SchemaManager schemaManager, long schemaId, RowType keyType, RowType valueType, Comparator keyComparator, + @Nullable FieldsComparator userDefinedSeqComparator, MergeFunctionFactory mfFactory, KeyValueFileReaderFactory.Builder readerFactoryBuilder) { this.tableSchema = schemaManager.schema(schemaId); this.readerFactoryBuilder = readerFactoryBuilder; this.keyComparator = keyComparator; this.mfFactory = mfFactory; + this.userDefinedSeqComparator = userDefinedSeqComparator; this.mergeSorter = new MergeSorter( CoreOptions.fromMap(tableSchema.options()), keyType, valueType, null); @@ -227,7 +196,7 @@ private RecordReader createReaderWithoutOuterProjection(DataSplit spli batchMergeRead( split.partition(), split.bucket(), split.dataFiles(), false), keyComparator, - null, + userDefinedSeqComparator, mergeSorter, forceKeepDelete); } @@ -256,7 +225,7 @@ private RecordReader batchMergeRead( ? 
overlappedSectionFactory : nonOverlappedSectionFactory, keyComparator, - null, + userDefinedSeqComparator, mergeFuncWrapper, mergeSorter)); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java index 269ff2dc5dc2..8fb99645bdc8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java @@ -55,12 +55,12 @@ import org.apache.paimon.options.Options; import org.apache.paimon.schema.KeyValueFieldsExtractor; import org.apache.paimon.schema.SchemaManager; -import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.table.sink.SequenceGenerator; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.CommitIncrement; +import org.apache.paimon.utils.FieldsComparator; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.utils.UserDefinedSeqComparator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,9 +84,9 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite { private final KeyValueFileReaderFactory.Builder readerFactoryBuilder; private final KeyValueFileWriterFactory.Builder writerFactoryBuilder; private final Supplier> keyComparatorSupplier; + private final Supplier udsComparatorSupplier; private final Supplier valueEqualiserSupplier; private final MergeFunctionFactory mfFactory; - private final TableSchema schema; private final CoreOptions options; private final FileIO fileIO; private final RowType keyType; @@ -100,6 +100,7 @@ public KeyValueFileStoreWrite( RowType keyType, RowType valueType, Supplier> keyComparatorSupplier, + Supplier udsComparatorSupplier, Supplier valueEqualiserSupplier, MergeFunctionFactory mfFactory, FileStorePathFactory pathFactory, @@ -114,6 +115,7 @@ public 
KeyValueFileStoreWrite( this.fileIO = fileIO; this.keyType = keyType; this.valueType = valueType; + this.udsComparatorSupplier = udsComparatorSupplier; this.readerFactoryBuilder = KeyValueFileReaderFactory.builder( fileIO, @@ -137,7 +139,6 @@ public KeyValueFileStoreWrite( this.keyComparatorSupplier = keyComparatorSupplier; this.valueEqualiserSupplier = valueEqualiserSupplier; this.mfFactory = mfFactory; - this.schema = schemaManager.schema(schemaId); this.options = options; } @@ -186,7 +187,7 @@ protected MergeTreeWriter createWriter( options.commitForceCompact(), options.changelogProducer(), restoreIncrement, - SequenceGenerator.create(schema, options)); + UserDefinedSeqComparator.create(valueType, options)); } @VisibleForTesting @@ -204,7 +205,10 @@ private CompactManager createCompactManager( return new NoopCompactManager(); } else { Comparator keyComparator = keyComparatorSupplier.get(); - CompactRewriter rewriter = createRewriter(partition, bucket, keyComparator, levels); + @Nullable FieldsComparator userDefinedSeqComparator = udsComparatorSupplier.get(); + CompactRewriter rewriter = + createRewriter( + partition, bucket, keyComparator, userDefinedSeqComparator, levels); return new MergeTreeCompactManager( compactExecutor, levels, @@ -220,7 +224,11 @@ private CompactManager createCompactManager( } private MergeTreeCompactRewriter createRewriter( - BinaryRow partition, int bucket, Comparator keyComparator, Levels levels) { + BinaryRow partition, + int bucket, + Comparator keyComparator, + @Nullable FieldsComparator userDefinedSeqComparator, + Levels levels) { KeyValueFileReaderFactory readerFactory = readerFactoryBuilder.build(partition, bucket); KeyValueFileWriterFactory writerFactory = writerFactoryBuilder.build(partition, bucket, options); @@ -235,6 +243,7 @@ private MergeTreeCompactRewriter createRewriter( readerFactory, writerFactory, keyComparator, + userDefinedSeqComparator, mfFactory, mergeSorter, valueEqualiserSupplier.get(), @@ -253,6 +262,7 @@ 
private MergeTreeCompactRewriter createRewriter( readerFactory, writerFactory, keyComparator, + userDefinedSeqComparator, mfFactory, mergeSorter, new FirstRowMergeFunctionWrapperFactory()); @@ -265,6 +275,7 @@ levels, new KeyValueProcessor(valueType), readerFactory), readerFactory, writerFactory, keyComparator, + userDefinedSeqComparator, mfFactory, mergeSorter, new LookupMergeFunctionWrapperFactory( @@ -273,7 +284,12 @@ levels, new KeyValueProcessor(valueType), readerFactory), } default: return new MergeTreeCompactRewriter( - readerFactory, writerFactory, keyComparator, mfFactory, mergeSorter); + readerFactory, + writerFactory, + keyComparator, + userDefinedSeqComparator, + mfFactory, + mergeSorter); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index 9fac381147e1..91ccf9c11b1c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -164,13 +164,13 @@ public static void validateTableSchema(TableSchema schema) { } } - Optional sequenceField = options.sequenceField(); + Optional> sequenceField = options.sequenceField(); sequenceField.ifPresent( - field -> + fields -> checkArgument( - schema.fieldNames().contains(field), - "Nonexistent sequence field: '%s'", - field)); + schema.fieldNames().containsAll(fields), + "Nonexistent sequence fields: '%s'", + fields)); Optional rowkindField = options.rowkindField(); rowkindField.ifPresent( @@ -181,11 +181,13 @@ public static void validateTableSchema(TableSchema schema) { field)); sequenceField.ifPresent( - field -> - checkArgument( - options.fieldAggFunc(field) == null, - "Should not define aggregation on sequence field: '%s'", - field)); + fields -> + fields.forEach( + field -> + checkArgument( + options.fieldAggFunc(field) == null, + "Should not define aggregation on sequence field: '%s'", + 
field))); CoreOptions.MergeEngine mergeEngine = options.mergeEngine(); if (mergeEngine == CoreOptions.MergeEngine.FIRST_ROW) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java index 9f008eb39746..3ed9a3ac726a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java @@ -109,7 +109,7 @@ public void refreshFiles( int bucket, List beforeFiles, List dataFiles) { - LookupLevels lookupLevels = + LookupLevels lookupLevels = tableView.computeIfAbsent(partition, k -> new HashMap<>()).get(bucket); if (lookupLevels == null) { Preconditions.checkArgument( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java deleted file mode 100644 index 9c6b81159409..000000000000 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java +++ /dev/null @@ -1,236 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.paimon.table.sink; - -import org.apache.paimon.CoreOptions; -import org.apache.paimon.CoreOptions.SequenceAutoPadding; -import org.apache.paimon.data.InternalRow; -import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.types.BigIntType; -import org.apache.paimon.types.CharType; -import org.apache.paimon.types.DataType; -import org.apache.paimon.types.DataTypeDefaultVisitor; -import org.apache.paimon.types.DataTypeFamily; -import org.apache.paimon.types.DateType; -import org.apache.paimon.types.DecimalType; -import org.apache.paimon.types.DoubleType; -import org.apache.paimon.types.FloatType; -import org.apache.paimon.types.IntType; -import org.apache.paimon.types.LocalZonedTimestampType; -import org.apache.paimon.types.RowKind; -import org.apache.paimon.types.RowType; -import org.apache.paimon.types.SmallIntType; -import org.apache.paimon.types.TimestampType; -import org.apache.paimon.types.TinyIntType; -import org.apache.paimon.types.VarCharType; -import org.apache.paimon.utils.InternalRowUtils; - -import javax.annotation.Nullable; - -import java.util.Collections; -import java.util.List; -import java.util.stream.Collectors; - -/** Generate sequence number. 
*/ -public class SequenceGenerator { - - private final int index; - private final List paddings; - - private final Generator generator; - private final DataType fieldType; - - public SequenceGenerator(String field, RowType rowType) { - this(field, rowType, Collections.emptyList()); - } - - public SequenceGenerator(String field, RowType rowType, List paddings) { - index = rowType.getFieldNames().indexOf(field); - this.paddings = paddings; - - if (index == -1) { - throw new RuntimeException( - String.format( - "Can not find sequence field %s in table schema: %s", field, rowType)); - } - fieldType = rowType.getTypeAt(index); - generator = fieldType.accept(new SequenceGeneratorVisitor()); - } - - public SequenceGenerator(int index, DataType dataType) { - this.index = index; - this.paddings = Collections.emptyList(); - - this.fieldType = dataType; - if (index == -1) { - throw new RuntimeException(String.format("Index : %s is invalid", index)); - } - generator = fieldType.accept(new SequenceGeneratorVisitor()); - } - - @Nullable - public static SequenceGenerator create(TableSchema schema, CoreOptions options) { - List sequenceAutoPadding = - options.sequenceAutoPadding().stream() - .map(SequenceAutoPadding::fromString) - .collect(Collectors.toList()); - return options.sequenceField() - .map( - field -> - new SequenceGenerator( - field, schema.logicalRowType(), sequenceAutoPadding)) - .orElse(null); - } - - public int index() { - return index; - } - - public DataType fieldType() { - return fieldType; - } - - @Nullable - public Long generateWithoutPadding(InternalRow row) { - return generator.generateNullable(row, index); - } - - public long generateWithPadding(InternalRow row, long incrSeq) { - long sequence = generator.generate(row, index); - for (SequenceAutoPadding padding : paddings) { - switch (padding) { - case ROW_KIND_FLAG: - sequence = addRowKindFlag(sequence, row.getRowKind()); - break; - case SECOND_TO_MICRO: - sequence = secondToMicro(sequence, incrSeq); - 
break; - case MILLIS_TO_MICRO: - sequence = millisToMicro(sequence, incrSeq); - break; - default: - throw new UnsupportedOperationException( - "Unknown sequence padding mode " + padding); - } - } - return sequence; - } - - private long addRowKindFlag(long sequence, RowKind rowKind) { - return (sequence << 1) | (rowKind.isAdd() ? 1 : 0); - } - - private long millisToMicro(long sequence, long incrSeq) { - // Generated value is millis - return sequence * 1_000 + (incrSeq % 1_000); - } - - private long secondToMicro(long sequence, long incrSeq) { - // timestamp returns millis - long second = fieldType.is(DataTypeFamily.TIMESTAMP) ? sequence / 1000 : sequence; - return second * 1_000_000 + (incrSeq % 1_000_000); - } - - private interface Generator { - long generate(InternalRow row, int i); - - @Nullable - default Long generateNullable(InternalRow row, int i) { - if (row.isNullAt(i)) { - return null; - } - return generate(row, i); - } - } - - private static class SequenceGeneratorVisitor extends DataTypeDefaultVisitor { - - @Override - public Generator visit(CharType charType) { - return stringGenerator(); - } - - @Override - public Generator visit(VarCharType varCharType) { - return stringGenerator(); - } - - private Generator stringGenerator() { - return (row, i) -> Long.parseLong(row.getString(i).toString()); - } - - @Override - public Generator visit(DecimalType decimalType) { - return (row, i) -> - InternalRowUtils.castToIntegral( - row.getDecimal(i, decimalType.getPrecision(), decimalType.getScale())); - } - - @Override - public Generator visit(TinyIntType tinyIntType) { - return InternalRow::getByte; - } - - @Override - public Generator visit(SmallIntType smallIntType) { - return InternalRow::getShort; - } - - @Override - public Generator visit(IntType intType) { - return InternalRow::getInt; - } - - @Override - public Generator visit(BigIntType bigIntType) { - return InternalRow::getLong; - } - - @Override - public Generator visit(FloatType floatType) { - return 
(row, i) -> (long) row.getFloat(i); - } - - @Override - public Generator visit(DoubleType doubleType) { - return (row, i) -> (long) row.getDouble(i); - } - - @Override - public Generator visit(DateType dateType) { - return InternalRow::getInt; - } - - @Override - public Generator visit(TimestampType timestampType) { - return (row, i) -> row.getTimestamp(i, timestampType.getPrecision()).getMillisecond(); - } - - @Override - public Generator visit(LocalZonedTimestampType localZonedTimestampType) { - return (row, i) -> - row.getTimestamp(i, localZonedTimestampType.getPrecision()).getMillisecond(); - } - - @Override - protected Generator defaultMethod(DataType dataType) { - throw new UnsupportedOperationException("Unsupported type: " + dataType); - } - } -} diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/UserDefinedSeqComparator.java b/paimon-core/src/main/java/org/apache/paimon/utils/UserDefinedSeqComparator.java new file mode 100644 index 000000000000..b1ea75ac1d61 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/utils/UserDefinedSeqComparator.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.utils; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.codegen.CodeGenUtils; +import org.apache.paimon.codegen.RecordComparator; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.types.RowType; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Optional; + +/** A {@link FieldsComparator} for user defined sequence fields. */ +public class UserDefinedSeqComparator implements FieldsComparator { + + private final int[] fields; + private final RecordComparator comparator; + + public UserDefinedSeqComparator(int[] fields, RecordComparator comparator) { + this.fields = fields; + this.comparator = comparator; + } + + @Override + public int[] compareFields() { + return fields; + } + + @Override + public int compare(InternalRow o1, InternalRow o2) { + return comparator.compare(o1, o2); + } + + @Nullable + public static UserDefinedSeqComparator create(RowType rowType, CoreOptions options) { + Optional> sequenceField = options.sequenceField(); + if (!sequenceField.isPresent()) { + return null; + } + + List fieldNames = rowType.getFieldNames(); + int[] fields = sequenceField.get().stream().mapToInt(fieldNames::indexOf).toArray(); + RecordComparator comparator = + CodeGenUtils.newRecordComparator( + rowType.getFieldTypes(), fields, "UserDefinedSeqComparator"); + return new UserDefinedSeqComparator(fields, comparator); + } + + @Nullable + public static UserDefinedSeqComparator create(RowType rowType, List sequenceFields) { + if (sequenceFields.isEmpty()) { + return null; + } + + List fieldNames = rowType.getFieldNames(); + int[] fields = sequenceFields.stream().mapToInt(fieldNames::indexOf).toArray(); + RecordComparator comparator = + CodeGenUtils.newRecordComparator( + rowType.getFieldTypes(), fields, "UserDefinedSeqComparator"); + return new UserDefinedSeqComparator(fields, comparator); + } +} diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java index 1aa708306039..6e718a0855c6 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java @@ -554,6 +554,7 @@ private List readAll(List files, boolean dropDelete) dropDelete, readerFactory, comparator, + null, DeduplicateMergeFunction.factory().create(), new MergeSorter(options, null, null, null)); List records = new ArrayList<>(); @@ -600,6 +601,7 @@ public CompactResult rewrite( dropDelete, compactReaderFactory, comparator, + null, DeduplicateMergeFunction.factory().create(), new MergeSorter(options, null, null, null)); writer.write(new RecordReaderIterator<>(sectionsReader)); diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java index 170a66779476..0d873507f43a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java @@ -66,6 +66,7 @@ public abstract class SortBufferWriteBufferTestBase { new RowType(Collections.singletonList(new DataField(0, "key", new IntType()))), new RowType( Collections.singletonList(new DataField(1, "value", new BigIntType()))), + null, new HeapMemorySegmentPool(32 * 1024 * 3L, 32 * 1024), false, 128, diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java index 9c731ffea794..db37284be961 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java @@ -131,7 +131,7 @@ public void 
testCreateTableIllegal() { "f4"), "")))) .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("Nonexistent sequence field: 'f4'"); + .hasMessageContaining("Nonexistent sequence fields: '[f4]'"); } @Test diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java index d561ce5ca04e..ff280c27e078 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java @@ -209,60 +209,6 @@ public void testSequenceNumber() throws Exception { "1|11|101|binary|varbinary|mapKey:mapVal|multiset")); } - @Test - public void testPaddingSequenceNumber() throws Exception { - RowType rowType = - RowType.of( - new DataType[] { - DataTypes.INT(), - DataTypes.INT(), - DataTypes.INT(), - DataTypes.INT(), - DataTypes.STRING() - }, - new String[] {"pt", "a", "b", "sec", "non_time"}); - GenericRow row1 = GenericRow.of(1, 10, 100, 1685530987, BinaryString.fromString("a1")); - GenericRow row2 = GenericRow.of(1, 10, 101, 1685530987, BinaryString.fromString("a2")); - GenericRow row3 = GenericRow.of(1, 10, 101, 1685530987, BinaryString.fromString("a3")); - FileStoreTable table = - createFileStoreTable( - conf -> { - conf.set(CoreOptions.SEQUENCE_FIELD, "sec"); - conf.set( - CoreOptions.SEQUENCE_AUTO_PADDING, - CoreOptions.SequenceAutoPadding.SECOND_TO_MICRO.toString()); - }, - rowType); - StreamTableWrite write = table.newWrite(commitUser); - StreamTableCommit commit = table.newCommit(commitUser); - write.write(row1); - write.write(row2); - commit.commit(0, write.prepareCommit(false, 0)); - write.write(row3); - commit.commit(1, write.prepareCommit(false, 1)); - write.close(); - commit.close(); - - ReadBuilder readBuilder = table.newReadBuilder(); - Function toString = - row -> - row.getInt(0) - + "|" - + row.getInt(1) - + "|" - + 
row.getInt(2) - + "|" - + row.getInt(3) - + "|" - + row.getString(4); - assertThat( - getResult( - readBuilder.newRead(), - readBuilder.newScan().plan().splits(), - toString)) - .isEqualTo(Collections.singletonList("1|10|101|1685530987|a3")); - } - @Test public void testBatchReadWrite() throws Exception { writeData(); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/sink/SequenceGeneratorTest.java b/paimon-core/src/test/java/org/apache/paimon/table/sink/SequenceGeneratorTest.java deleted file mode 100644 index f15ed20c0dee..000000000000 --- a/paimon-core/src/test/java/org/apache/paimon/table/sink/SequenceGeneratorTest.java +++ /dev/null @@ -1,272 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.paimon.table.sink; - -import org.apache.paimon.CoreOptions; -import org.apache.paimon.data.BinaryString; -import org.apache.paimon.data.Decimal; -import org.apache.paimon.data.GenericArray; -import org.apache.paimon.data.GenericMap; -import org.apache.paimon.data.GenericRow; -import org.apache.paimon.data.InternalRow; -import org.apache.paimon.data.Timestamp; -import org.apache.paimon.types.DataType; -import org.apache.paimon.types.DataTypes; -import org.apache.paimon.types.RowKind; -import org.apache.paimon.types.RowType; - -import org.junit.jupiter.api.Test; - -import java.time.LocalDateTime; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - -import static org.apache.paimon.CoreOptions.SequenceAutoPadding.MILLIS_TO_MICRO; -import static org.apache.paimon.CoreOptions.SequenceAutoPadding.ROW_KIND_FLAG; -import static org.apache.paimon.CoreOptions.SequenceAutoPadding.SECOND_TO_MICRO; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; - -/** Test for {@link SequenceGenerator}. 
*/ -public class SequenceGeneratorTest { - - private static final RowType ALL_DATA_TYPE = - RowType.of( - new DataType[] { - DataTypes.INT(), // _id - DataTypes.DECIMAL(2, 1), // pt - DataTypes.INT(), // second - DataTypes.BOOLEAN(), - DataTypes.TINYINT(), - DataTypes.SMALLINT(), - DataTypes.BIGINT(), - DataTypes.BIGINT(), // millis - DataTypes.FLOAT(), - DataTypes.DOUBLE(), - DataTypes.STRING(), - DataTypes.DATE(), - DataTypes.TIMESTAMP(0), - DataTypes.TIMESTAMP(3), - DataTypes.TIMESTAMP(6), - DataTypes.CHAR(10), - DataTypes.VARCHAR(20), - DataTypes.BINARY(10), - DataTypes.VARBINARY(20), - DataTypes.BYTES(), - DataTypes.TIME(), - DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(), - DataTypes.MAP(DataTypes.INT(), DataTypes.INT()), - DataTypes.ARRAY(DataTypes.STRING()), - DataTypes.MULTISET(DataTypes.VARCHAR(8)) - }, - new String[] { - "_id", - "pt", - "_intsecond", - "_boolean", - "_tinyint", - "_smallint", - "_bigint", - "_bigintmillis", - "_float", - "_double", - "_string", - "_date", - "_timestamp0", - "_timestamp3", - "_timestamp6", - "_char", - "_varchar", - "_binary", - "_varbinary", - "_bytes", - "_time", - "_localtimestamp", - "_map", - "_array", - "_multiset", - }); - private static final InternalRow row = - GenericRow.of( - 1, - Decimal.fromUnscaledLong(10, 2, 1), - 1685548953, - true, - (byte) 2, - (short) 3, - 4000000000000L, - 1685548953000L, - 2.81f, - 3.678008, - BinaryString.fromString("1"), - 375, /* 1971-01-11 */ - Timestamp.fromEpochMillis(1685548953000L), - Timestamp.fromEpochMillis(1685548953123L), - Timestamp.fromMicros(1685548953123456L), - BinaryString.fromString("3"), - BinaryString.fromString("4"), - "5".getBytes(), - "6".getBytes(), - "7".getBytes(), - 123, - Timestamp.fromMicros(1685548953123456L), - new GenericMap( - Collections.singletonMap( - BinaryString.fromString("mapKey"), - BinaryString.fromString("mapVal"))), - new GenericArray( - new BinaryString[] { - BinaryString.fromString("a"), BinaryString.fromString("b") - }), - new GenericMap( 
- Collections.singletonMap(BinaryString.fromString("multiset"), 1))); - - @Test - public void testGenerate() { - assertThat(getGenerator("_id").generateWithPadding(row, 0)).isEqualTo(1); - assertThat(getGenerator("pt").generateWithPadding(row, 0)).isEqualTo(1); - assertThat(getGenerator("_intsecond").generateWithPadding(row, 0)).isEqualTo(1685548953); - assertThat(getGenerator("_tinyint").generateWithPadding(row, 0)).isEqualTo(2); - assertThat(getGenerator("_smallint").generateWithPadding(row, 0)).isEqualTo(3); - assertThat(getGenerator("_bigint").generateWithPadding(row, 0)).isEqualTo(4000000000000L); - assertThat(getGenerator("_bigintmillis").generateWithPadding(row, 0)) - .isEqualTo(1685548953000L); - assertThat(getGenerator("_float").generateWithPadding(row, 0)).isEqualTo(2); - assertThat(getGenerator("_double").generateWithPadding(row, 0)).isEqualTo(3); - assertThat(getGenerator("_string").generateWithPadding(row, 0)).isEqualTo(1); - assertThat(getGenerator("_date").generateWithPadding(row, 0)).isEqualTo(375); - assertThat(getGenerator("_timestamp0").generateWithPadding(row, 0)) - .isEqualTo(1685548953000L); - assertThat(getGenerator("_timestamp3").generateWithPadding(row, 0)) - .isEqualTo(1685548953123L); - assertThat(getGenerator("_timestamp6").generateWithPadding(row, 0)) - .isEqualTo(1685548953123L); - assertThat(getGenerator("_char").generateWithPadding(row, 0)).isEqualTo(3); - assertThat(getGenerator("_varchar").generateWithPadding(row, 0)).isEqualTo(4); - assertThat(getGenerator("_localtimestamp").generateWithPadding(row, 0)) - .isEqualTo(1685548953123L); - assertUnsupportedDatatype("_boolean"); - assertUnsupportedDatatype("_binary"); - assertUnsupportedDatatype("_varbinary"); - assertUnsupportedDatatype("_bytes"); - assertUnsupportedDatatype("_time"); - assertUnsupportedDatatype("_map"); - assertUnsupportedDatatype("_array"); - assertUnsupportedDatatype("_multiset"); - } - - @Test - public void testGenerateWithPadding() { - 
assertThat(generateWithPaddingOnSecond("_id", 5)).isEqualTo(1000005L); - assertThat(generateWithPaddingOnSecond("pt", 5)).isEqualTo(1000005L); - - assertThat(generateWithPaddingOnSecond("_intsecond", 5)).isEqualTo(1685548953000005L); - assertThat(generateWithPaddingOnSecond("_tinyint", 5)).isEqualTo(2000005L); - - assertThat(generateWithPaddingOnSecond("_smallint", 5)).isEqualTo(3000005L); - - assertThat(generateWithPaddingOnMillis("_bigint", 5)).isEqualTo(4000000000000005L); - - assertThat(generateWithPaddingOnMillis("_bigintmillis", 5)).isEqualTo(1685548953000005L); - - assertThat(generateWithPaddingOnMillis("_float", 5)).isEqualTo(2005); - - assertThat(generateWithPaddingOnMillis("_double", 5)).isEqualTo(3005); - - assertThat(generateWithPaddingOnMillis("_string", 5)).isEqualTo(1005); - - assertThat(generateWithPaddingOnMillis("_date", 5)).isEqualTo(375005); - - assertThat(generateWithPaddingOnSecond("_timestamp0", 5)).isEqualTo(1685548953000005L); - - assertThat(generateWithPaddingOnMillis("_timestamp3", 5)).isEqualTo(1685548953123005L); - - assertThat(generateWithPaddingOnMillis("_timestamp6", 5)).isEqualTo(1685548953123005L); - - assertThat(generateWithPaddingOnMillis("_char", 5)).isEqualTo(3005); - - assertThat(generateWithPaddingOnMillis("_varchar", 5)).isEqualTo(4005); - assertThat(generateWithPaddingOnSecond("_localtimestamp", 5)).isEqualTo(1685548953000005L); - assertThat(generateWithPaddingOnMillis("_localtimestamp", 5)).isEqualTo(1685548953123005L); - assertUnsupportedDatatype("_boolean"); - assertUnsupportedDatatype("_binary"); - assertUnsupportedDatatype("_varbinary"); - assertUnsupportedDatatype("_bytes"); - assertUnsupportedDatatype("_time"); - assertUnsupportedDatatype("_map"); - assertUnsupportedDatatype("_array"); - assertUnsupportedDatatype("_multiset"); - } - - @Test - public void testGenerateWithPaddingRowKind() { - assertThat(generateWithPaddingOnRowKind(1L, RowKind.INSERT)).isEqualTo(3); - assertThat(generateWithPaddingOnRowKind(1L, 
RowKind.UPDATE_AFTER)).isEqualTo(3); - assertThat(generateWithPaddingOnRowKind(1L, RowKind.UPDATE_BEFORE)).isEqualTo(2); - assertThat(generateWithPaddingOnRowKind(1L, RowKind.DELETE)).isEqualTo(2); - - long maxMicros = - Timestamp.fromLocalDateTime(LocalDateTime.parse("5000-01-01T00:00:00")).toMicros(); - assertThat(generateWithPaddingOnRowKind(maxMicros, RowKind.INSERT)) - .isEqualTo(191235168000000001L); - - long sequence = generateWithPaddingOnMicrosAndRowKind(1L, 20, RowKind.INSERT); - assertThat(sequence).isEqualTo(2041); - sequence = generateWithPaddingOnMicrosAndRowKind(1L, 30, RowKind.UPDATE_BEFORE); - System.out.println(sequence); - assertThat(sequence).isEqualTo(2060); - } - - private SequenceGenerator getGenerator(String field) { - return getGenerator(field, Collections.emptyList()); - } - - private SequenceGenerator getGenerator( - String field, List paddings) { - return new SequenceGenerator(field, ALL_DATA_TYPE, paddings); - } - - private void assertUnsupportedDatatype(String field) { - assertThatThrownBy(() -> getGenerator(field).generateWithPadding(row, 0)) - .isInstanceOf(UnsupportedOperationException.class); - } - - private long generateWithPaddingOnSecond(String field, long incrSeq) { - return getGenerator(field, Collections.singletonList(SECOND_TO_MICRO)) - .generateWithPadding(row, incrSeq); - } - - private long generateWithPaddingOnMillis(String field, long incrSeq) { - return getGenerator(field, Collections.singletonList(MILLIS_TO_MICRO)) - .generateWithPadding(row, incrSeq); - } - - private long generateWithPaddingOnRowKind(long sequence, RowKind rowKind) { - return getGenerator("_bigint", Collections.singletonList(ROW_KIND_FLAG)) - .generateWithPadding(GenericRow.ofKind(rowKind, 0, 0, 0, 0, 0, 0, sequence), 0); - } - - private long generateWithPaddingOnMicrosAndRowKind( - long sequence, long incrSeq, RowKind rowKind) { - return getGenerator("_bigint", Arrays.asList(MILLIS_TO_MICRO, ROW_KIND_FLAG)) - .generateWithPadding( - 
GenericRow.ofKind(rowKind, 0, 0, 0, 0, 0, 0, sequence), incrSeq); - } -} diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java index 068d5d33abd3..afd660fd3471 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java @@ -173,8 +173,8 @@ private List getExceptedResult(long snapshotId) { DataFileMeta file = fileEntry.file(); String minKey = String.valueOf(file.minKey().getInt(0)); String maxKey = String.valueOf(file.maxKey().getInt(0)); - String minSequenceNumber = String.valueOf(file.minSequenceNumber()); - String maxSequenceNumber = String.valueOf(file.maxSequenceNumber()); + String minCol1 = String.valueOf(file.valueStats().minValues().getInt(2)); + String maxCol1 = String.valueOf(file.valueStats().maxValues().getInt(2)); expectedRow.add( GenericRow.of( BinaryString.fromString(Arrays.toString(new String[] {partition})), @@ -191,12 +191,10 @@ private List getExceptedResult(long snapshotId) { String.format("{col1=%s, pk=%s, pt=%s}", 0, 0, 0)), BinaryString.fromString( String.format( - "{col1=%s, pk=%s, pt=%s}", - minSequenceNumber, minKey, partition)), + "{col1=%s, pk=%s, pt=%s}", minCol1, minKey, partition)), BinaryString.fromString( String.format( - "{col1=%s, pk=%s, pt=%s}", - maxSequenceNumber, maxKey, partition)), + "{col1=%s, pk=%s, pt=%s}", maxCol1, maxKey, partition)), file.minSequenceNumber(), file.maxSequenceNumber(), file.creationTime())); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java index 7f9b019bde75..185cf554b48a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java +++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java @@ -31,12 +31,13 @@ import org.apache.paimon.reader.RecordReaderIterator; import org.apache.paimon.sort.BinaryExternalSortBuffer; import org.apache.paimon.table.FileStoreTable; -import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.FieldsComparator; import org.apache.paimon.utils.FileIOUtils; import org.apache.paimon.utils.MutableObjectIterator; import org.apache.paimon.utils.PartialRow; import org.apache.paimon.utils.TypeUtils; +import org.apache.paimon.utils.UserDefinedSeqComparator; import javax.annotation.Nullable; @@ -46,8 +47,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; - -import static org.apache.paimon.schema.SystemColumns.SEQUENCE_NUMBER; +import java.util.concurrent.atomic.AtomicInteger; /** Lookup table of full cache. */ public abstract class FullCacheLookupTable implements LookupTable { @@ -55,7 +55,9 @@ public abstract class FullCacheLookupTable implements LookupTable { protected final Context context; protected final RocksDBStateFactory stateFactory; protected final RowType projectedType; - private final boolean sequenceFieldEnabled; + + @Nullable protected final FieldsComparator userDefinedSeqComparator; + protected final int appendUdsFieldNumber; private LookupStreamingReader reader; private Predicate specificPartition; @@ -68,12 +70,31 @@ public FullCacheLookupTable(Context context) throws IOException { context.table.coreOptions().toConfiguration(), null); FileStoreTable table = context.table; - this.sequenceFieldEnabled = - table.primaryKeys().size() > 0 - && new CoreOptions(table.options()).sequenceField().isPresent(); + List sequenceFields = new ArrayList<>(); + if (table.primaryKeys().size() > 0) { + new CoreOptions(table.options()).sequenceField().ifPresent(sequenceFields::addAll); + } RowType projectedType = TypeUtils.project(table.rowType(), 
context.projection); - if (sequenceFieldEnabled) { - projectedType = projectedType.appendDataField(SEQUENCE_NUMBER, DataTypes.BIGINT()); + if (sequenceFields.size() > 0) { + RowType.Builder builder = RowType.builder(); + projectedType.getFields().forEach(f -> builder.field(f.name(), f.type())); + RowType rowType = table.rowType(); + AtomicInteger appendUdsFieldNumber = new AtomicInteger(0); + sequenceFields.stream() + .filter(projectedType::notContainsField) + .map(rowType::getField) + .forEach( + f -> { + appendUdsFieldNumber.incrementAndGet(); + builder.field(f.name(), f.type()); + }); + projectedType = builder.build(); + this.userDefinedSeqComparator = + UserDefinedSeqComparator.create(projectedType, sequenceFields); + this.appendUdsFieldNumber = appendUdsFieldNumber.get(); + } else { + this.userDefinedSeqComparator = null; + this.appendUdsFieldNumber = 0; } this.projectedType = projectedType; } @@ -93,7 +114,7 @@ public void open() throws Exception { IOManager.create(context.tempPath.toString()), context.table.coreOptions()); Predicate predicate = projectedPredicate(); try (RecordReaderIterator batch = - new RecordReaderIterator<>(reader.nextBatch(true, sequenceFieldEnabled))) { + new RecordReaderIterator<>(reader.nextBatch(true))) { while (batch.hasNext()) { InternalRow row = batch.next(); if (predicate == null || predicate.test(row)) { @@ -124,11 +145,11 @@ public void open() throws Exception { public void refresh() throws Exception { while (true) { try (RecordReaderIterator batch = - new RecordReaderIterator<>(reader.nextBatch(false, sequenceFieldEnabled))) { + new RecordReaderIterator<>(reader.nextBatch(false))) { if (!batch.hasNext()) { return; } - refresh(batch, sequenceFieldEnabled); + refresh(batch); } } } @@ -136,21 +157,21 @@ public void refresh() throws Exception { @Override public final List get(InternalRow key) throws IOException { List values = innerGet(key); - if (!sequenceFieldEnabled) { + if (appendUdsFieldNumber == 0) { return values; } List 
dropSequence = new ArrayList<>(values.size()); for (InternalRow matchedRow : values) { - dropSequence.add(new PartialRow(matchedRow.getFieldCount() - 1, matchedRow)); + dropSequence.add( + new PartialRow(matchedRow.getFieldCount() - appendUdsFieldNumber, matchedRow)); } return dropSequence; } public abstract List innerGet(InternalRow key) throws IOException; - public abstract void refresh(Iterator input, boolean orderByLastField) - throws IOException; + public abstract void refresh(Iterator input) throws IOException; @Nullable public Predicate projectedPredicate() { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java index ad6f726701a6..f12d78e80a2a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java @@ -36,7 +36,6 @@ import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.StreamTableScan; import org.apache.paimon.table.source.TableRead; -import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FunctionWithIOException; import org.apache.paimon.utils.TypeUtils; @@ -55,7 +54,6 @@ import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_BOOTSTRAP_PARALLELISM; import static org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping; -import static org.apache.paimon.schema.SystemColumns.SEQUENCE_NUMBER; /** A streaming reader to load data into {@link LookupTable}. 
*/ public class LookupStreamingReader { @@ -120,19 +118,13 @@ private Table unsetTimeTravelOptions(Table origin) { return fileStoreTable.copy(newSchema); } - public RecordReader nextBatch(boolean useParallelism, boolean readSequenceNumber) - throws Exception { + public RecordReader nextBatch(boolean useParallelism) throws Exception { List splits = scan.plan().splits(); CoreOptions options = CoreOptions.fromMap(table.options()); FunctionWithIOException> readerSupplier = - readSequenceNumber - ? createReaderWithSequenceSupplier() - : split -> readBuilder.newRead().createReader(split); + split -> readBuilder.newRead().createReader(split); RowType readType = TypeUtils.project(table.rowType(), projection); - if (readSequenceNumber) { - readType = readType.appendDataField(SEQUENCE_NUMBER, DataTypes.BIGINT()); - } RecordReader reader; if (useParallelism) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java index b931d18b45de..7f5d036b183d 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java @@ -68,11 +68,10 @@ public List innerGet(InternalRow key) throws IOException { } @Override - public void refresh(Iterator incremental, boolean orderByLastField) - throws IOException { - if (orderByLastField) { + public void refresh(Iterator incremental) throws IOException { + if (userDefinedSeqComparator != null) { throw new IllegalArgumentException( - "Append table does not support order by last field."); + "Append table does not support user defined sequence fields."); } Predicate predicate = projectedPredicate(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java index 97034070c2e6..889e1e35ba8e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java @@ -91,16 +91,14 @@ public List innerGet(InternalRow key) throws IOException { } @Override - public void refresh(Iterator incremental, boolean orderByLastField) - throws IOException { + public void refresh(Iterator incremental) throws IOException { Predicate predicate = projectedPredicate(); while (incremental.hasNext()) { InternalRow row = incremental.next(); primaryKeyRow.replaceRow(row); - if (orderByLastField) { + if (userDefinedSeqComparator != null) { InternalRow previous = tableState.get(primaryKeyRow); - int orderIndex = projectedType.getFieldCount() - 1; - if (previous != null && previous.getLong(orderIndex) > row.getLong(orderIndex)) { + if (previous != null && userDefinedSeqComparator.compare(previous, row) > 0) { continue; } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java index d0731ebdf3f5..d4fb22c4b76a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java @@ -72,8 +72,7 @@ public List innerGet(InternalRow key) throws IOException { } @Override - public void refresh(Iterator incremental, boolean orderByLastField) - throws IOException { + public void refresh(Iterator incremental) throws IOException { Predicate predicate = projectedPredicate(); while (incremental.hasNext()) { InternalRow row = incremental.next(); @@ -81,11 +80,10 
@@ public void refresh(Iterator incremental, boolean orderByLastField) boolean previousFetched = false; InternalRow previous = null; - if (orderByLastField) { + if (userDefinedSeqComparator != null) { previous = tableState.get(primaryKeyRow); previousFetched = true; - int orderIndex = projectedType.getFieldCount() - 1; - if (previous != null && previous.getLong(orderIndex) > row.getLong(orderIndex)) { + if (previous != null && userDefinedSeqComparator.compare(previous, row) > 0) { continue; } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java index caba2daa19d4..a0529c575ed6 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java @@ -31,12 +31,12 @@ import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.PrimaryKeyTableUtils; import org.apache.paimon.table.sink.RowKindGenerator; -import org.apache.paimon.table.sink.SequenceGenerator; import org.apache.paimon.types.DataField; import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.KeyComparatorSupplier; import org.apache.paimon.utils.Preconditions; +import org.apache.paimon.utils.UserDefinedSeqComparator; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.BoundedOneInput; @@ -62,7 +62,6 @@ public class LocalMergeOperator extends AbstractStreamOperator private transient RecordComparator keyComparator; private transient long recordCount; - private transient SequenceGenerator sequenceGenerator; private transient RowKindGenerator rowKindGenerator; private transient MergeFunction mergeFunction; @@ -88,12 +87,10 @@ public void open() throws Exception { 
CoreOptions options = new CoreOptions(schema.options()); keyProjection = - CodeGenUtils.newProjection( - schema.logicalRowType(), schema.projection(schema.primaryKeys())); + CodeGenUtils.newProjection(valueType, schema.projection(schema.primaryKeys())); keyComparator = new KeyComparatorSupplier(keyType).get(); recordCount = 0; - sequenceGenerator = SequenceGenerator.create(schema, options); rowKindGenerator = RowKindGenerator.create(schema, options); mergeFunction = PrimaryKeyTableUtils.createMergeFunctionFactory( @@ -119,6 +116,7 @@ public List valueFields(TableSchema schema) { new SortBufferWriteBuffer( keyType, valueType, + UserDefinedSeqComparator.create(valueType, options), new HeapMemorySegmentPool( options.localMergeBufferSize(), options.pageSize()), false, @@ -141,13 +139,9 @@ public void processElement(StreamRecord record) throws Exception { row.setRowKind(RowKind.INSERT); InternalRow key = keyProjection.apply(row); - long sequenceNumber = - sequenceGenerator == null - ? recordCount - : sequenceGenerator.generateWithPadding(row, recordCount); - if (!buffer.put(sequenceNumber, rowKind, key, row)) { + if (!buffer.put(recordCount, rowKind, key, row)) { flushBuffer(); - if (!buffer.put(sequenceNumber, rowKind, key, row)) { + if (!buffer.put(recordCount, rowKind, key, row)) { // change row kind back row.setRowKind(rowKind); output.collect(record); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java index 3af6b722df6f..065ace85a6e9 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java @@ -130,7 +130,7 @@ public void testPkTable() throws Exception { List> records = new ArrayList<>(); for (int i = 1; i <= 100_000; i++) { InternalRow row = row(i, 
11 * i, 111 * i); - records.add(Pair.of(table.toKeyBytes(row), table.toValueBytes(sequence(row, -1L)))); + records.add(Pair.of(table.toKeyBytes(row), table.toValueBytes(row))); } records.sort((o1, o2) -> SortUtil.compareBinary(o1.getKey(), o2.getKey())); TableBulkLoader bulkLoader = table.createBulkLoader(); @@ -146,18 +146,16 @@ public void testPkTable() throws Exception { } // test refresh to update - table.refresh(singletonList(sequence(row(1, 22, 222), -1L)).iterator(), false); + table.refresh(singletonList(row(1, 22, 222)).iterator()); List result = table.get(row(1)); assertThat(result).hasSize(1); assertRow(result.get(0), 1, 22, 222); // test refresh to delete - table.refresh( - singletonList(sequence(row(RowKind.DELETE, 1, 11, 111), -1L)).iterator(), false); + table.refresh(singletonList(row(RowKind.DELETE, 1, 11, 111)).iterator()); assertThat(table.get(row(1))).hasSize(0); - table.refresh( - singletonList(sequence(row(RowKind.DELETE, 3, 33, 333), -1L)).iterator(), false); + table.refresh(singletonList(row(RowKind.DELETE, 3, 33, 333)).iterator()); assertThat(table.get(row(3))).hasSize(0); } @@ -179,7 +177,7 @@ public void testPkTableWithSequenceField() throws Exception { List> records = new ArrayList<>(); for (int i = 1; i <= 10; i++) { - InternalRow row = sequence(row(i, 11 * i, 111 * i), -1L); + InternalRow row = row(i, 11 * i, 111 * i); records.add(Pair.of(table.toKeyBytes(row), table.toValueBytes(row))); } records.sort((o1, o2) -> SortUtil.compareBinary(o1.getKey(), o2.getKey())); @@ -190,20 +188,19 @@ public void testPkTableWithSequenceField() throws Exception { bulkLoader.finish(); // test refresh to update - table.refresh(singletonList(sequence(row(1, 22, 222), 1L)).iterator(), true); + table.refresh(singletonList(row(1, 22, 222)).iterator()); List result = table.get(row(1)); assertThat(result).hasSize(1); assertRow(result.get(0), 1, 22, 222); // refresh with old sequence - table.refresh(singletonList((sequence(row(1, 33, 333), 0L))).iterator(), 
true); + table.refresh(singletonList((row(1, 11, 333))).iterator()); result = table.get(row(1)); assertThat(result).hasSize(1); assertRow(result.get(0), 1, 22, 222); // test refresh delete data with old sequence - table.refresh( - singletonList(sequence(row(RowKind.DELETE, 1, 11, 111), -1L)).iterator(), true); + table.refresh(singletonList(row(RowKind.DELETE, 1, 11, 111)).iterator()); assertThat(table.get(row(1))).hasSize(1); assertRow(result.get(0), 1, 22, 222); } @@ -222,21 +219,20 @@ public void testPkTablePkFilter() throws Exception { table = FullCacheLookupTable.create(context, ThreadLocalRandom.current().nextInt(2) * 10); table.open(); - table.refresh(singletonList(sequence(row(1, 11, 111), -1L)).iterator(), false); + table.refresh(singletonList(row(1, 11, 111)).iterator()); List result = table.get(row(1)); assertThat(result).hasSize(1); assertRow(result.get(0), 1, 11, 111); - table.refresh(singletonList(sequence(row(1, 22, 222), -1L)).iterator(), false); + table.refresh(singletonList(row(1, 22, 222)).iterator()); result = table.get(row(1)); assertThat(result).hasSize(1); assertRow(result.get(0), 1, 22, 222); - table.refresh( - singletonList(sequence(row(RowKind.DELETE, 1, 11, 111), -1L)).iterator(), false); + table.refresh(singletonList(row(RowKind.DELETE, 1, 11, 111)).iterator()); assertThat(table.get(row(1))).hasSize(0); - table.refresh(singletonList(sequence(row(3, 33, 333), -1L)).iterator(), false); + table.refresh(singletonList(row(3, 33, 333)).iterator()); assertThat(table.get(row(3))).hasSize(0); } @@ -254,12 +250,12 @@ public void testPkTableNonPkFilter() throws Exception { table = FullCacheLookupTable.create(context, ThreadLocalRandom.current().nextInt(2) * 10); table.open(); - table.refresh(singletonList(sequence(row(1, 11, 111), -1L)).iterator(), false); + table.refresh(singletonList(row(1, 11, 111)).iterator()); List result = table.get(row(1)); assertThat(result).hasSize(1); assertRow(result.get(0), 1, 11, 111); - 
table.refresh(singletonList(sequence(row(1, 22, 222), -1L)).iterator(), false); + table.refresh(singletonList(row(1, 22, 222)).iterator()); result = table.get(row(1)); assertThat(result).hasSize(0); } @@ -285,7 +281,7 @@ public void testSecKeyTable() throws Exception { for (int i = 1; i <= 100_000; i++) { int secKey = rnd.nextInt(i); InternalRow row = row(i, secKey, 111 * i); - records.add(Pair.of(table.toKeyBytes(row), table.toValueBytes(sequence(row, -1L)))); + records.add(Pair.of(table.toKeyBytes(row), table.toValueBytes(row))); secKeyToPk.computeIfAbsent(secKey, k -> new HashSet<>()).add(i); } records.sort((o1, o2) -> SortUtil.compareBinary(o1.getKey(), o2.getKey())); @@ -302,7 +298,7 @@ public void testSecKeyTable() throws Exception { } // add new sec key to pk - table.refresh(singletonList(sequence(row(1, 22, 222), -1L)).iterator(), false); + table.refresh(singletonList(row(1, 22, 222)).iterator()); List result = table.get(row(22)); assertThat(result.stream().map(row -> row.getInt(0))).contains(1); } @@ -345,21 +341,14 @@ public void testSecKeyTableWithSequenceField() throws Exception { .containsExactlyInAnyOrderElementsOf(entry.getValue()); } - JoinedRow joined = new JoinedRow(); // add new sec key to pk - table.refresh( - singletonList((InternalRow) joined.replace(row(1, 22, 222), GenericRow.of(1L))) - .iterator(), - true); + table.refresh(singletonList(row(1, 22, 222)).iterator()); List result = table.get(row(22)); assertThat(result.stream().map(row -> row.getInt(0))).contains(1); assertThat(result.stream().map(InternalRow::getFieldCount)).allMatch(n -> n == 3); // refresh with old value - table.refresh( - singletonList((InternalRow) joined.replace(row(1, 22, 333), GenericRow.of(0L))) - .iterator(), - true); + table.refresh(singletonList(row(1, 11, 333)).iterator()); result = table.get(row(22)); assertThat(result.stream().map(row -> row.getInt(2))).doesNotContain(333); } @@ -378,30 +367,29 @@ public void testSecKeyTablePkFilter() throws Exception { table = 
FullCacheLookupTable.create(context, ThreadLocalRandom.current().nextInt(2) * 10); table.open(); - table.refresh(singletonList(sequence(row(1, 11, 111), -1L)).iterator(), false); + table.refresh(singletonList(row(1, 11, 111)).iterator()); List result = table.get(row(11)); assertThat(result).hasSize(1); assertRow(result.get(0), 1, 11, 111); - table.refresh(singletonList(sequence(row(1, 22, 222), -1L)).iterator(), false); + table.refresh(singletonList(row(1, 22, 222)).iterator()); assertThat(table.get(row(11))).hasSize(0); result = table.get(row(22)); assertThat(result).hasSize(1); assertRow(result.get(0), 1, 22, 222); - table.refresh(singletonList(sequence(row(2, 22, 222), -1L)).iterator(), false); + table.refresh(singletonList(row(2, 22, 222)).iterator()); result = table.get(row(22)); assertThat(result).hasSize(2); assertRow(result.get(0), 1, 22, 222); assertRow(result.get(1), 2, 22, 222); - table.refresh( - singletonList(sequence(row(RowKind.DELETE, 2, 22, 222), -1L)).iterator(), false); + table.refresh(singletonList(row(RowKind.DELETE, 2, 22, 222)).iterator()); result = table.get(row(22)); assertThat(result).hasSize(1); assertRow(result.get(0), 1, 22, 222); - table.refresh(singletonList(sequence(row(3, 33, 333), -1L)).iterator(), false); + table.refresh(singletonList(row(3, 33, 333)).iterator()); assertThat(table.get(row(33))).hasSize(0); } @@ -443,7 +431,7 @@ public void testNoPrimaryKeyTable() throws Exception { } // add new join key value - table.refresh(singletonList(row(1, 22, 333)).iterator(), false); + table.refresh(singletonList(row(1, 22, 333)).iterator()); List result = table.get(row(22)); assertThat(result.stream().map(row -> row.getInt(0))).contains(1); } @@ -462,16 +450,16 @@ public void testNoPrimaryKeyTableFilter() throws Exception { table = FullCacheLookupTable.create(context, ThreadLocalRandom.current().nextInt(2) * 10); table.open(); - table.refresh(singletonList(row(1, 11, 333)).iterator(), false); + table.refresh(singletonList(row(1, 11, 
333)).iterator()); List result = table.get(row(11)); assertThat(result).hasSize(0); - table.refresh(singletonList(row(1, 11, 111)).iterator(), false); + table.refresh(singletonList(row(1, 11, 111)).iterator()); result = table.get(row(11)); assertThat(result).hasSize(1); assertRow(result.get(0), 1, 11, 111); - table.refresh(singletonList(row(1, 11, 111)).iterator(), false); + table.refresh(singletonList(row(1, 11, 111)).iterator()); result = table.get(row(11)); assertThat(result).hasSize(2); assertRow(result.get(0), 1, 11, 111); @@ -596,10 +584,6 @@ private static InternalRow row(RowKind kind, Object... values) { return row; } - private static InternalRow sequence(InternalRow row, long sequenceNumber) { - return new JoinedRow(row.getRowKind(), row, GenericRow.of(sequenceNumber)); - } - private static void assertRow(InternalRow resultRow, int... expected) { int[] results = new int[expected.length]; for (int i = 0; i < results.length; i++) { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java index 55a67187945b..745e92b680fd 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java @@ -28,6 +28,7 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.KeyValueFileReaderFactory; import org.apache.paimon.memory.HeapMemorySegmentPool; import org.apache.paimon.memory.MemoryOwner; import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction; @@ -117,19 +118,27 @@ private TableRead createRead( RecordReader.RecordIterator, RecordReader.RecordIterator> rowDataIteratorCreator) { + SchemaManager schemaManager = new 
SchemaManager(LocalFileIO.create(), tablePath); + long schemaId = 0; KeyValueFileStoreRead read = new KeyValueFileStoreRead( - LocalFileIO.create(), - new SchemaManager(LocalFileIO.create(), tablePath), - 0, + schemaManager, + schemaId, KEY_TYPE, VALUE_TYPE, COMPARATOR, + null, DeduplicateMergeFunction.factory(), - ignore -> avro, - pathFactory, - EXTRACTOR, - new CoreOptions(new HashMap<>())); + KeyValueFileReaderFactory.builder( + LocalFileIO.create(), + schemaManager, + schemaId, + KEY_TYPE, + VALUE_TYPE, + ignore -> avro, + pathFactory, + EXTRACTOR, + new CoreOptions(new HashMap<>()))); return new KeyValueTableRead(read, null) { @Override @@ -176,6 +185,7 @@ public RecordWriter createMergeTreeWriter(BinaryRow partition, int buc KEY_TYPE, VALUE_TYPE, () -> COMPARATOR, + () -> null, () -> EQUALISER, DeduplicateMergeFunction.factory(), pathFactory,