Skip to content

Commit

Permalink
[core] Redefine user defined sequence fields (#2945)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Mar 6, 2024
1 parent c1abbac commit 1502e83
Show file tree
Hide file tree
Showing 33 changed files with 483 additions and 803 deletions.
24 changes: 5 additions & 19 deletions docs/content/concepts/primary-key-table/sequence-rowkind.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,32 +41,18 @@ CREATE TABLE my_table (
pk BIGINT PRIMARY KEY NOT ENFORCED,
v1 DOUBLE,
v2 BIGINT,
dt TIMESTAMP
update_time TIMESTAMP
) WITH (
'sequence.field' = 'dt'
'sequence.field' = 'update_time'
);
```
{{< /tab >}}
{{< /tabs >}}

The record with the largest `sequence.field` value will be the last to merge, regardless of the input order.
The record with the largest `sequence.field` value will be the last to merge, if the values are the same, the input
order will be used to determine which one is the last one.

**Sequence Auto Padding**:

When the record is updated or deleted, the `sequence.field` must become larger and cannot remain unchanged.
For -U and +U, their sequence-fields must be different. If you cannot meet this requirement, Paimon provides
option to automatically pad the sequence field for you.

1. `'sequence.auto-padding' = 'row-kind-flag'`: If you are using same value for -U and +U, just like "`op_ts`"
(the time that the change was made in the database) in Mysql Binlog. It is recommended to use the automatic
padding for row kind flag, which will automatically distinguish between -U (-D) and +U (+I).

2. Insufficient precision: If the provided `sequence.field` doesn't meet the precision, like a rough second or
millisecond, you can set `sequence.auto-padding` to `second-to-micro` or `millis-to-micro` so that the precision
of sequence number will be made up to microsecond by incremental id (Calculate within a single bucket).

3. Composite pattern: for example, "second-to-micro,row-kind-flag", first, add the micro to the second, and then
pad the row kind flag.
You can define multiple fields for `sequence.field`, for example `'update_time,flag'`, multiple fields will be compared in order.

## Row Kind Field

Expand Down
6 changes: 0 additions & 6 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -527,12 +527,6 @@
<td>Long</td>
<td>Optional timestamp used in case of "from-timestamp" scan mode. If there is no snapshot earlier than this time, the earliest snapshot will be chosen.</td>
</tr>
<tr>
<td><h5>sequence.auto-padding</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Specify the way of padding precision, if the provided sequence field is used to indicate "time" but doesn't meet the precise.<ul><li>You can specific:</li><li>1. "row-kind-flag": Pads a bit flag to indicate whether it is retract (0) or add (1) message.</li><li>2. "second-to-micro": Pads the sequence field that indicates time with precision of seconds to micro-second.</li><li>3. "millis-to-micro": Pads the sequence field that indicates time with precision of milli-second to micro-second.</li><li>4. Composite pattern: for example, "second-to-micro,row-kind-flag".</li></ul></td>
</tr>
<tr>
<td><h5>sequence.field</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
32 changes: 2 additions & 30 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -486,26 +486,6 @@ public class CoreOptions implements Serializable {
"The field that generates the row kind for primary key table,"
+ " the row kind determines which data is '+I', '-U', '+U' or '-D'.");

public static final ConfigOption<String> SEQUENCE_AUTO_PADDING =
key("sequence.auto-padding")
.stringType()
.noDefaultValue()
.withDescription(
Description.builder()
.text(
"Specify the way of padding precision, if the provided sequence field is used to indicate \"time\" but doesn't meet the precise.")
.list(
text("You can specific:"),
text(
"1. \"row-kind-flag\": Pads a bit flag to indicate whether it is retract (0) or add (1) message."),
text(
"2. \"second-to-micro\": Pads the sequence field that indicates time with precision of seconds to micro-second."),
text(
"3. \"millis-to-micro\": Pads the sequence field that indicates time with precision of milli-second to micro-second."),
text(
"4. Composite pattern: for example, \"second-to-micro,row-kind-flag\"."))
.build());

public static final ConfigOption<StartupMode> SCAN_MODE =
key("scan.mode")
.enumType(StartupMode.class)
Expand Down Expand Up @@ -1486,22 +1466,14 @@ public Integer dynamicBucketAssignerParallelism() {
return options.get(DYNAMIC_BUCKET_ASSIGNER_PARALLELISM);
}

public Optional<String> sequenceField() {
return options.getOptional(SEQUENCE_FIELD);
public Optional<List<String>> sequenceField() {
return options.getOptional(SEQUENCE_FIELD).map(s -> Arrays.asList(s.split(",")));
}

public Optional<String> rowkindField() {
return options.getOptional(ROWKIND_FIELD);
}

public List<String> sequenceAutoPadding() {
String padding = options.get(SEQUENCE_AUTO_PADDING);
if (padding == null) {
return Collections.emptyList();
}
return Arrays.asList(padding.split(","));
}

public boolean writeOnly() {
return options.get(WRITE_ONLY);
}
Expand Down
23 changes: 23 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/types/RowType.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,29 @@ public int getFieldIndex(String fieldName) {
return -1;
}

public boolean containsField(String fieldName) {
for (DataField field : fields) {
if (field.name().equals(fieldName)) {
return true;
}
}
return false;
}

public boolean notContainsField(String fieldName) {
return !containsField(fieldName);
}

public DataField getField(String fieldName) {
for (DataField field : fields) {
if (field.name().equals(fieldName)) {
return field;
}
}

throw new RuntimeException("Cannot find field: " + fieldName);
}

@Override
public DataType copy(boolean isNullable) {
return new RowType(
Expand Down
11 changes: 11 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,14 @@
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FieldsComparator;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.KeyComparatorSupplier;
import org.apache.paimon.utils.UserDefinedSeqComparator;
import org.apache.paimon.utils.ValueEqualiserSupplier;

import javax.annotation.Nullable;

import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -123,6 +127,7 @@ public KeyValueFileStoreRead newRead() {
keyType,
valueType,
newKeyComparator(),
userDefinedSeqComparator(),
mfFactory,
newReaderFactoryBuilder());
}
Expand All @@ -140,6 +145,11 @@ public KeyValueFileReaderFactory.Builder newReaderFactoryBuilder() {
options);
}

@Nullable
private FieldsComparator userDefinedSeqComparator() {
return UserDefinedSeqComparator.create(valueType, options);
}

@Override
public KeyValueFileStoreWrite newWrite(String commitUser) {
return newWrite(commitUser, null);
Expand All @@ -159,6 +169,7 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma
keyType,
valueType,
keyComparatorSupplier,
this::userDefinedSeqComparator,
valueEqualiserSupplier,
mfFactory,
pathFactory(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public static RecordReader<KeyValue> readerForMergeTree(
boolean dropDelete,
KeyValueFileReaderFactory readerFactory,
Comparator<InternalRow> userKeyComparator,
@Nullable FieldsComparator userDefinedSeqComparator,
MergeFunction<KeyValue> mergeFunction,
MergeSorter mergeSorter)
throws IOException {
Expand All @@ -58,7 +59,7 @@ public static RecordReader<KeyValue> readerForMergeTree(
section,
readerFactory,
userKeyComparator,
null,
userDefinedSeqComparator,
new ReducerMergeFunctionWrapper(mergeFunction),
mergeSorter));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@
import org.apache.paimon.memory.MemoryOwner;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.mergetree.compact.MergeFunction;
import org.apache.paimon.table.sink.SequenceGenerator;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.FieldsComparator;
import org.apache.paimon.utils.RecordWriter;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -66,7 +66,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
private final KeyValueFileWriterFactory writerFactory;
private final boolean commitForceCompact;
private final ChangelogProducer changelogProducer;
@Nullable private final SequenceGenerator sequenceGenerator;
@Nullable private final FieldsComparator userDefinedSeqComparator;

private final LinkedHashSet<DataFileMeta> newFiles;
private final LinkedHashSet<DataFileMeta> newFilesChangelog;
Expand All @@ -90,7 +90,7 @@ public MergeTreeWriter(
boolean commitForceCompact,
ChangelogProducer changelogProducer,
@Nullable CommitIncrement increment,
@Nullable SequenceGenerator sequenceGenerator) {
@Nullable FieldsComparator userDefinedSeqComparator) {
this.writeBufferSpillable = writeBufferSpillable;
this.sortMaxFan = sortMaxFan;
this.sortCompression = sortCompression;
Expand All @@ -104,7 +104,7 @@ public MergeTreeWriter(
this.writerFactory = writerFactory;
this.commitForceCompact = commitForceCompact;
this.changelogProducer = changelogProducer;
this.sequenceGenerator = sequenceGenerator;
this.userDefinedSeqComparator = userDefinedSeqComparator;

this.newFiles = new LinkedHashSet<>();
this.newFilesChangelog = new LinkedHashSet<>();
Expand Down Expand Up @@ -138,6 +138,7 @@ public void setMemoryPool(MemorySegmentPool memoryPool) {
new SortBufferWriteBuffer(
keyType,
valueType,
userDefinedSeqComparator,
memoryPool,
writeBufferSpillable,
sortMaxFan,
Expand All @@ -148,9 +149,6 @@ public void setMemoryPool(MemorySegmentPool memoryPool) {
@Override
public void write(KeyValue kv) throws Exception {
long sequenceNumber = newSequenceNumber();
if (sequenceGenerator != null) {
sequenceNumber = sequenceGenerator.generateWithPadding(kv.value(), sequenceNumber);
}
boolean success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
if (!success) {
flushWriteBuffer(false, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TinyIntType;
import org.apache.paimon.utils.FieldsComparator;
import org.apache.paimon.utils.MutableObjectIterator;

import javax.annotation.Nullable;
Expand All @@ -61,6 +63,7 @@ public class SortBufferWriteBuffer implements WriteBuffer {
public SortBufferWriteBuffer(
RowType keyType,
RowType valueType,
@Nullable FieldsComparator userDefinedSeqComparator,
MemorySegmentPool memoryPool,
boolean spillable,
int sortMaxFan,
Expand All @@ -70,17 +73,33 @@ public SortBufferWriteBuffer(
this.valueType = valueType;
this.serializer = new KeyValueSerializer(keyType, valueType);

// user key + sequenceNumber
List<DataType> sortKeyTypes = new ArrayList<>(keyType.getFieldTypes());
sortKeyTypes.add(new BigIntType(false));
// key fields
IntStream sortFields = IntStream.range(0, keyType.getFieldCount());

// user define sequence fields
if (userDefinedSeqComparator != null) {
IntStream udsFields =
IntStream.of(userDefinedSeqComparator.compareFields())
.map(operand -> operand + keyType.getFieldCount() + 2);
sortFields = IntStream.concat(sortFields, udsFields);
}

// sequence field
sortFields = IntStream.concat(sortFields, IntStream.of(keyType.getFieldCount()));

int[] sortFieldArray = sortFields.toArray();

// row type
List<DataType> fieldTypes = new ArrayList<>(keyType.getFieldTypes());
fieldTypes.add(new BigIntType(false));
fieldTypes.add(new TinyIntType(false));
fieldTypes.addAll(valueType.getFieldTypes());

// for sort binary buffer
int[] sortFields = IntStream.range(0, sortKeyTypes.size()).toArray();
NormalizedKeyComputer normalizedKeyComputer =
CodeGenUtils.newNormalizedKeyComputer(
sortKeyTypes, sortFields, "MemTableKeyComputer");
fieldTypes, sortFieldArray, "MemTableKeyComputer");
RecordComparator keyComparator =
CodeGenUtils.newRecordComparator(sortKeyTypes, sortFields, "MemTableComparator");
CodeGenUtils.newRecordComparator(fieldTypes, sortFieldArray, "MemTableComparator");

if (memoryPool.freePages() < 3) {
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
import org.apache.paimon.mergetree.MergeTreeReaders;
import org.apache.paimon.mergetree.SortedRun;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.utils.FieldsComparator;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -51,9 +54,16 @@ public ChangelogMergeTreeRewriter(
KeyValueFileReaderFactory readerFactory,
KeyValueFileWriterFactory writerFactory,
Comparator<InternalRow> keyComparator,
@Nullable FieldsComparator userDefinedSeqComparator,
MergeFunctionFactory<KeyValue> mfFactory,
MergeSorter mergeSorter) {
super(readerFactory, writerFactory, keyComparator, mfFactory, mergeSorter);
super(
readerFactory,
writerFactory,
keyComparator,
userDefinedSeqComparator,
mfFactory,
mergeSorter);
this.maxLevel = maxLevel;
this.mergeEngine = mergeEngine;
}
Expand Down Expand Up @@ -112,7 +122,7 @@ private CompactResult rewriteChangelogCompaction(
section,
readerFactory,
keyComparator,
null,
userDefinedSeqComparator,
createMergeWrapper(outputLevel),
mergeSorter));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@
import org.apache.paimon.io.KeyValueFileWriterFactory;
import org.apache.paimon.mergetree.MergeSorter;
import org.apache.paimon.mergetree.SortedRun;
import org.apache.paimon.utils.FieldsComparator;
import org.apache.paimon.utils.Preconditions;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Comparator;
import java.util.List;
Expand All @@ -48,6 +51,7 @@ public FullChangelogMergeTreeCompactRewriter(
KeyValueFileReaderFactory readerFactory,
KeyValueFileWriterFactory writerFactory,
Comparator<InternalRow> keyComparator,
@Nullable FieldsComparator userDefinedSeqComparator,
MergeFunctionFactory<KeyValue> mfFactory,
MergeSorter mergeSorter,
RecordEqualiser valueEqualiser,
Expand All @@ -58,6 +62,7 @@ public FullChangelogMergeTreeCompactRewriter(
readerFactory,
writerFactory,
keyComparator,
userDefinedSeqComparator,
mfFactory,
mergeSorter);
this.valueEqualiser = valueEqualiser;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
import org.apache.paimon.mergetree.LookupLevels;
import org.apache.paimon.mergetree.MergeSorter;
import org.apache.paimon.mergetree.SortedRun;
import org.apache.paimon.utils.FieldsComparator;

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.UncheckedIOException;
Expand All @@ -54,6 +57,7 @@ public LookupMergeTreeCompactRewriter(
KeyValueFileReaderFactory readerFactory,
KeyValueFileWriterFactory writerFactory,
Comparator<InternalRow> keyComparator,
@Nullable FieldsComparator userDefinedSeqComparator,
MergeFunctionFactory<KeyValue> mfFactory,
MergeSorter mergeSorter,
MergeFunctionWrapperFactory<T> wrapperFactory) {
Expand All @@ -63,6 +67,7 @@ public LookupMergeTreeCompactRewriter(
readerFactory,
writerFactory,
keyComparator,
userDefinedSeqComparator,
mfFactory,
mergeSorter);
this.lookupLevels = lookupLevels;
Expand Down
Loading

0 comments on commit 1502e83

Please sign in to comment.