
Commit

update
Zouxxyy committed Dec 15, 2023
1 parent 57e8dc3 commit dc47462
Showing 9 changed files with 118 additions and 17 deletions.
docs/layouts/shortcodes/generated/core_configuration.html (2 changes: 1 addition & 1 deletion)
@@ -489,7 +489,7 @@
<td><h5>sequence.auto-padding</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Specify the way of padding precision, if the provided sequence field is used to indicate "time" but doesn't meet the required precision.<ul><li>You can specify:</li><li>1. "row-kind-flag": Pads a bit flag to indicate whether it is a retract (0) or add (1) message.</li><li>2. "second-to-micro": Pads the sequence field that indicates time with precision of seconds to micro-second.</li><li>3. "millis-to-micro": Pads the sequence field that indicates time with precision of milli-second to micro-second.</li><li>4. Composite pattern: for example, "second-to-micro,row-kind-flag".</li></ul></td>
<td>Specify the way of padding precision, if the provided sequence field is used to indicate "time" but doesn't meet the required precision.<ul><li>You can specify:</li><li>1. "row-kind-flag": Pads a bit flag to indicate whether it is a retract (0) or add (1) message.</li><li>2. "second-to-micro": Pads the sequence field that indicates time with precision of seconds to micro-second.</li><li>3. "millis-to-micro": Pads the sequence field that indicates time with precision of milli-second to micro-second.</li><li>4. "inc-seq": Pads the sequence field with an auto-increment sequence.</li><li>5. Composite pattern: for example, "second-to-micro,row-kind-flag".</li></ul></td>
</tr>
<tr>
<td><h5>sequence.field</h5></td>
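The composite pattern in item 5 is parsed by splitting the option value on commas (see `CoreOptions#sequenceAutoPadding()` below); each resulting mode is then applied to the sequence value in `SequenceGenerator`. A minimal, standalone sketch of that parsing — class and method names here are illustrative, not Paimon API:

```java
import java.util.Arrays;
import java.util.List;

// Illustrative only: mirrors how CoreOptions#sequenceAutoPadding() turns the
// "sequence.auto-padding" value into a list of padding modes by splitting on commas.
public class SequencePaddingParseSketch {

    static List<String> parsePaddings(String optionValue) {
        return Arrays.asList(optionValue.split(","));
    }

    public static void main(String[] args) {
        // Composite pattern from item 5 above: both modes are applied to the sequence field.
        System.out.println(parsePaddings("second-to-micro,row-kind-flag"));
        // -> [second-to-micro, row-kind-flag]

        // The mode added in this commit, used on its own.
        System.out.println(parsePaddings("inc-seq"));
        // -> [inc-seq]
    }
}
```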
paimon-common/src/main/java/org/apache/paimon/CoreOptions.java (11 changes: 9 additions & 2 deletions)
@@ -459,7 +459,9 @@ public class CoreOptions implements Serializable {
text(
"3. \"millis-to-micro\": Pads the sequence field that indicates time with precision of milli-second to micro-second."),
text(
"4. Composite pattern: for example, \"second-to-micro,row-kind-flag\"."))
"4. \"inc-seq\": Pads the sequence field with auto increase sequence."),
text(
"5. Composite pattern: for example, \"second-to-micro,row-kind-flag\"."))
.build());

public static final ConfigOption<StartupMode> SCAN_MODE =
@@ -1338,6 +1340,10 @@ public List<String> sequenceAutoPadding() {
return Arrays.asList(padding.split(","));
}

public boolean incSeqPadding() {
return SequenceAutoPadding.INC_SEQ.value.equals(options.get(SEQUENCE_AUTO_PADDING));
}

public boolean writeOnly() {
return options.get(WRITE_ONLY);
}
@@ -1933,7 +1939,8 @@ public enum SequenceAutoPadding implements DescribedEnum {
"Pads the sequence field that indicates time with precision of seconds to micro-second."),
MILLIS_TO_MICRO(
"millis-to-micro",
"Pads the sequence field that indicates time with precision of milli-second to micro-second.");
"Pads the sequence field that indicates time with precision of milli-second to micro-second."),
INC_SEQ("inc-seq", "Pads the sequence field with auto increase sequence.");

private final String value;
private final String description;
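As written, `incSeqPadding()` compares the whole `sequence.auto-padding` value against the single enum string, so the writer-side flag is enabled only when the option is exactly `inc-seq`; a composite value such as `second-to-micro,inc-seq` would not match. A plain-Java illustration of that equality check (a standalone stand-in, not the Paimon class itself):

```java
// Standalone stand-in for CoreOptions#incSeqPadding():
//   SequenceAutoPadding.INC_SEQ.value.equals(options.get(SEQUENCE_AUTO_PADDING))
public class IncSeqOptionCheckSketch {

    static boolean incSeqPadding(String sequenceAutoPadding) {
        return "inc-seq".equals(sequenceAutoPadding);
    }

    public static void main(String[] args) {
        System.out.println(incSeqPadding("inc-seq"));                  // true
        System.out.println(incSeqPadding("second-to-micro,inc-seq"));  // false: a composite value is not matched
        System.out.println(incSeqPadding(null));                       // false: option not set
    }
}
```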
@@ -50,6 +50,9 @@
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.paimon.table.sink.SequenceGenerator.INC;
import static org.apache.paimon.table.sink.SequenceGenerator.INC_SEQ_MASK;

/** A {@link RecordWriter} to write records and generate {@link CompactIncrement}. */
public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {

@@ -72,10 +75,11 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
private final LinkedHashSet<DataFileMeta> compactAfter;
private final LinkedHashSet<DataFileMeta> compactChangelog;

private long newSequenceNumber;
private long nextIncSequenceNumber;
private WriteBuffer writeBuffer;

private WriterMetrics writerMetrics;
private final WriterMetrics writerMetrics;
private final boolean incSeqPadding;

public MergeTreeWriter(
boolean writeBufferSpillable,
@@ -89,14 +93,22 @@ public MergeTreeWriter(
boolean commitForceCompact,
ChangelogProducer changelogProducer,
@Nullable CommitIncrement increment,
WriterMetrics writerMetrics) {
WriterMetrics writerMetrics,
boolean incSeqPadding) {
this.writeBufferSpillable = writeBufferSpillable;
this.sortMaxFan = sortMaxFan;
this.ioManager = ioManager;
this.keyType = writerFactory.keyType();
this.valueType = writerFactory.valueType();
this.compactManager = compactManager;
this.newSequenceNumber = maxSequenceNumber + 1;
this.incSeqPadding = incSeqPadding;
if (maxSequenceNumber == -1) {
this.nextIncSequenceNumber = 0;
} else if (incSeqPadding) {
this.nextIncSequenceNumber = (maxSequenceNumber & INC_SEQ_MASK) + INC;
} else {
this.nextIncSequenceNumber = maxSequenceNumber + 1;
}
this.keyComparator = keyComparator;
this.mergeFunction = mergeFunction;
this.writerFactory = writerFactory;
@@ -121,10 +133,6 @@ public MergeTreeWriter(
this.writerMetrics = writerMetrics;
}

private long newSequenceNumber() {
return newSequenceNumber++;
}

@VisibleForTesting
CompactManager compactManager() {
return compactManager;
@@ -144,10 +152,7 @@ public void setMemoryPool(MemorySegmentPool memoryPool) {

@Override
public void write(KeyValue kv) throws Exception {
long sequenceNumber =
kv.sequenceNumber() == KeyValue.UNKNOWN_SEQUENCE
? newSequenceNumber()
: kv.sequenceNumber();
long sequenceNumber = generateSequenceNumber(kv.sequenceNumber());
boolean success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
if (!success) {
flushWriteBuffer(false, false);
@@ -162,6 +167,22 @@ public void write(KeyValue kv) throws Exception {
}
}

private long generateSequenceNumber(long userSeq) {
long res;
if (userSeq == KeyValue.UNKNOWN_SEQUENCE) {
res = nextIncSequenceNumber;
nextIncSequenceNumber += 1;
} else {
if (incSeqPadding) {
res = nextIncSequenceNumber | userSeq;
nextIncSequenceNumber += INC;
} else {
res = userSeq;
}
}
return res;
}

@Override
public void compact(boolean fullCompaction) throws Exception {
flushWriteBuffer(true, fullCompaction);
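With `inc-seq` enabled, `MergeTreeWriter` keeps a per-writer counter in the upper 32 bits of each sequence number and ORs in the user sequence (expected to already fit in the lower 32 bits, see `truncateUserSeq` in `SequenceGenerator` below) as the lower half; on restart the counter is rebuilt from `maxSequenceNumber & INC_SEQ_MASK` plus one step of `INC`. Below is a standalone sketch of that arithmetic, assuming the 32/32 split implied by the masks; class and variable names are illustrative, not Paimon code.

```java
public class IncSeqCompositionSketch {
    // Constants copied from SequenceGenerator in this commit.
    static final long INC_SEQ_MASK = 0xFFFFFFFF00000000L;
    static final long INC = 1L << 32;

    public static void main(String[] args) {
        // Restoring a writer: keep only the auto-increment half of the last
        // sequence number and step it once, as in the MergeTreeWriter constructor.
        long maxSequenceNumber = (5L << 32) | 1702634400L;
        long nextIncSequenceNumber = (maxSequenceNumber & INC_SEQ_MASK) + INC;
        System.out.println(nextIncSequenceNumber >>> 32); // 6 -> counter resumes after the previous value

        // Writing a record whose user sequence already fits in the lower 32 bits:
        long userSeq = 1702634401L;
        long composed = nextIncSequenceNumber | userSeq;
        System.out.println(composed >>> 32);          // 6           (auto-increment half)
        System.out.println(composed & 0xFFFFFFFFL);   // 1702634401  (user half)

        nextIncSequenceNumber += INC;                 // advance the counter for the next record
    }
}
```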
@@ -176,7 +176,8 @@ protected MergeTreeWriter createWriter(
options.commitForceCompact(),
options.changelogProducer(),
restoreIncrement,
getWriterMetrics(partition, bucket));
getWriterMetrics(partition, bucket),
options.incSeqPadding());
}

@VisibleForTesting
@@ -51,6 +51,10 @@
/** Generate sequence number. */
public class SequenceGenerator {

public static final long INC_SEQ_MASK = 0xFFFFFFFF00000000L;
public static final long USER_SEQ_MASK = 0xFFFFFFFFL;
public static final long INC = 1L << 32;

private final int index;
private final List<SequenceAutoPadding> paddings;

@@ -125,6 +129,9 @@ public long generate(InternalRow row) {
case MILLIS_TO_MICRO:
sequence = millisToMicro(sequence);
break;
case INC_SEQ:
sequence = truncateUserSeq(sequence);
break;
default:
throw new UnsupportedOperationException(
"Unknown sequence padding mode " + padding);
@@ -160,6 +167,12 @@ private static long getCurrentMicroOfSeconds() {
return (currentNanoTime - seconds * 1_000_000_000) / 1000;
}

private long truncateUserSeq(long sequence) {
// For timestamps, only second-level accuracy is supported.
return (fieldType.is(DataTypeFamily.TIMESTAMP) ? sequence / 1000 : sequence)
& USER_SEQ_MASK;
}

private interface Generator {
long generate(InternalRow row, int i);

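In `SequenceGenerator`, the `INC_SEQ` case reserves the lower 32 bits for the user value: `truncateUserSeq` divides a timestamp field down to seconds and masks it with `USER_SEQ_MASK`, which is why the test below uses 2106-02-07T06:28:15 (epoch second 4294967295, the largest value that fits in 32 bits). A standalone sketch of that truncation for a timestamp sequence field; the helper name is illustrative:

```java
public class UserSeqTruncationSketch {
    // Constant copied from SequenceGenerator in this commit.
    static final long USER_SEQ_MASK = 0xFFFFFFFFL;

    // Sketch of truncateUserSeq for a TIMESTAMP sequence field:
    // epoch millis are reduced to seconds, then kept to the lower 32 bits.
    static long truncateTimestampSeq(long epochMillis) {
        return (epochMillis / 1000) & USER_SEQ_MASK;
    }

    public static void main(String[] args) {
        // Largest representable second: 2106-02-07T06:28:15, as used in the test below.
        System.out.println(truncateTimestampSeq(4294967295000L)); // 4294967295

        // Anything later wraps around once masked to 32 bits.
        System.out.println(truncateTimestampSeq(4294967296000L)); // 0
    }
}
```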
@@ -425,7 +425,8 @@ private MergeTreeWriter createMergeTreeWriter(
options.commitForceCompact(),
ChangelogProducer.NONE,
null,
null);
null,
false);
writer.setMemoryPool(
new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize()));
return writer;
@@ -39,6 +39,7 @@
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.apache.paimon.CoreOptions.SequenceAutoPadding.INC_SEQ;
import static org.apache.paimon.CoreOptions.SequenceAutoPadding.MILLIS_TO_MICRO;
import static org.apache.paimon.CoreOptions.SequenceAutoPadding.ROW_KIND_FLAG;
import static org.apache.paimon.CoreOptions.SequenceAutoPadding.SECOND_TO_MICRO;
@@ -238,6 +239,23 @@ public void testGenerateWithPaddingRowKind() {
.isBetween(2000L, 3998L);
}

@Test
public void testGenerateWithIncSeq() {
GenericRow genericRow =
GenericRow.of(
1,
Timestamp.fromEpochMillis(4294967295000L) /* max ts 2106-02-07T06:28:15 */);
RowType rowType =
RowType.of(
new DataType[] {DataTypes.INT(), DataTypes.TIMESTAMP(0)},
new String[] {"id", "ts"});

assertThat(
new SequenceGenerator("ts", rowType, Collections.singletonList(INC_SEQ))
.generate(genericRow))
.isEqualTo(4294967295L);
}

private SequenceGenerator getGenerator(String field) {
return getGenerator(field, Collections.emptyList());
}
@@ -180,4 +180,24 @@ class DeleteFromTableTest extends PaimonSparkTestBase {
val rows4 = spark.sql("SELECT * FROM T ORDER BY id").collectAsList()
assertThat(rows4.toString).isEqualTo("[]")
}

test(s"test delete with sequence field") {
spark.sql(
s"""
|CREATE TABLE T (id INT, name STRING, ts TIMESTAMP)
|TBLPROPERTIES ('primary-key' = 'id', 'sequence.field' = 'ts', 'sequence.auto-padding' = 'inc-seq')
|""".stripMargin)

spark.sql("INSERT INTO T VALUES (1, 'a', CAST('2023-12-15 10:00:00' AS TIMESTAMP))")

spark.sql("INSERT INTO T VALUES (2, 'b', CAST('2023-12-15 10:00:01' AS TIMESTAMP))")

spark.sql("INSERT INTO T VALUES (3, 'c', CAST('2023-12-15 10:00:02' AS TIMESTAMP))")

spark.sql("DELETE FROM T WHERE id = 1")

val rows1 = spark.sql("SELECT * FROM T").collectAsList()
assertThat(rows1.toString).isEqualTo(
"[[2,b,2023-12-15 10:00:01.0], [3,c,2023-12-15 10:00:02.0]]")
}
}
@@ -227,4 +227,24 @@ class UpdateTableTest extends PaimonSparkTestBase {
() => spark.sql("UPDATE T SET s.c2 = 'a_new', s = struct(11, 'a_new') WHERE s.c1 = 1"))
.hasMessageContaining("Conflicting update/insert on attrs: s.c2, s")
}

test(s"test update with sequence field") {
spark.sql(
s"""
|CREATE TABLE T (id INT, name STRING, ts TIMESTAMP)
|TBLPROPERTIES ('primary-key' = 'id', 'sequence.field' = 'ts', 'sequence.auto-padding' = 'inc-seq')
|""".stripMargin)

spark.sql("INSERT INTO T VALUES (1, 'a', CAST('2023-12-15 10:00:00' AS TIMESTAMP))")

spark.sql("INSERT INTO T VALUES (2, 'b', CAST('2023-12-15 10:00:01' AS TIMESTAMP))")

spark.sql("INSERT INTO T VALUES (3, 'c', CAST('2023-12-15 10:00:02' AS TIMESTAMP))")

spark.sql("UPDATE T SET name = 'a_new' WHERE id = 1")

val rows1 = spark.sql("SELECT * FROM T ORDER BY id").collectAsList()
assertThat(rows1.toString).isEqualTo(
"[[1,a_new,2023-12-15 10:00:00.0], [2,b,2023-12-15 10:00:01.0], [3,c,2023-12-15 10:00:02.0]]")
}
}
