Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy committed Dec 14, 2023
1 parent 57e8dc3 commit 9dfcd3a
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1338,6 +1338,10 @@ public List<String> sequenceAutoPadding() {
return Arrays.asList(padding.split(","));
}

public boolean incSeqPadding() {
return options.get(SEQUENCE_AUTO_PADDING).equals(SequenceAutoPadding.INC_SEQ.value);
}

public boolean writeOnly() {
return options.get(WRITE_ONLY);
}
Expand Down Expand Up @@ -1933,7 +1937,8 @@ public enum SequenceAutoPadding implements DescribedEnum {
"Pads the sequence field that indicates time with precision of seconds to micro-second."),
MILLIS_TO_MICRO(
"millis-to-micro",
"Pads the sequence field that indicates time with precision of milli-second to micro-second.");
"Pads the sequence field that indicates time with precision of milli-second to micro-second."),
INC_SEQ("inc-seq", "Pads the sequence field with auto increase sequence.");

private final String value;
private final String description;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.paimon.table.sink.SequenceGenerator.INC;
import static org.apache.paimon.table.sink.SequenceGenerator.INC_SEQ_MASK;

/** A {@link RecordWriter} to write records and generate {@link CompactIncrement}. */
public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {

Expand All @@ -72,10 +75,11 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
private final LinkedHashSet<DataFileMeta> compactAfter;
private final LinkedHashSet<DataFileMeta> compactChangelog;

private long newSequenceNumber;
private long nextIncSequenceNumber;
private WriteBuffer writeBuffer;

private WriterMetrics writerMetrics;
private final WriterMetrics writerMetrics;
private final boolean autoIncSeqPadding;

public MergeTreeWriter(
boolean writeBufferSpillable,
Expand All @@ -89,14 +93,22 @@ public MergeTreeWriter(
boolean commitForceCompact,
ChangelogProducer changelogProducer,
@Nullable CommitIncrement increment,
WriterMetrics writerMetrics) {
WriterMetrics writerMetrics,
boolean autoIncSeqPadding) {
this.writeBufferSpillable = writeBufferSpillable;
this.sortMaxFan = sortMaxFan;
this.ioManager = ioManager;
this.keyType = writerFactory.keyType();
this.valueType = writerFactory.valueType();
this.compactManager = compactManager;
this.newSequenceNumber = maxSequenceNumber + 1;
this.autoIncSeqPadding = autoIncSeqPadding;
if (maxSequenceNumber == -1) {
this.nextIncSequenceNumber = 0;
} else if (autoIncSeqPadding) {
this.nextIncSequenceNumber = (maxSequenceNumber & INC_SEQ_MASK) + INC;
} else {
this.nextIncSequenceNumber = maxSequenceNumber + 1;
}
this.keyComparator = keyComparator;
this.mergeFunction = mergeFunction;
this.writerFactory = writerFactory;
Expand All @@ -121,10 +133,6 @@ public MergeTreeWriter(
this.writerMetrics = writerMetrics;
}

private long newSequenceNumber() {
return newSequenceNumber++;
}

@VisibleForTesting
CompactManager compactManager() {
return compactManager;
Expand All @@ -144,10 +152,7 @@ public void setMemoryPool(MemorySegmentPool memoryPool) {

@Override
public void write(KeyValue kv) throws Exception {
long sequenceNumber =
kv.sequenceNumber() == KeyValue.UNKNOWN_SEQUENCE
? newSequenceNumber()
: kv.sequenceNumber();
long sequenceNumber = generateSequenceNumber(kv.sequenceNumber());
boolean success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
if (!success) {
flushWriteBuffer(false, false);
Expand All @@ -162,6 +167,22 @@ public void write(KeyValue kv) throws Exception {
}
}

private long generateSequenceNumber(long userSeq) {
long newSequenceNumber;
if (userSeq == KeyValue.UNKNOWN_SEQUENCE) {
newSequenceNumber = nextIncSequenceNumber;
nextIncSequenceNumber += 1;
} else {
if (autoIncSeqPadding) {
newSequenceNumber = nextIncSequenceNumber | userSeq;
nextIncSequenceNumber += INC;
} else {
newSequenceNumber = userSeq;
}
}
return newSequenceNumber;
}

@Override
public void compact(boolean fullCompaction) throws Exception {
flushWriteBuffer(true, fullCompaction);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ protected MergeTreeWriter createWriter(
options.commitForceCompact(),
options.changelogProducer(),
restoreIncrement,
getWriterMetrics(partition, bucket));
getWriterMetrics(partition, bucket),
options.incSeqPadding());
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@
/** Generate sequence number. */
public class SequenceGenerator {

public static final long INC_SEQ_MASK = 0xFFFFFFFF00000000L;
public static final long USER_SEQ_MASK = 0xFFFFFFFFL;
public static final long INC = 1L << 32;

private final int index;
private final List<SequenceAutoPadding> paddings;

Expand Down Expand Up @@ -125,6 +129,9 @@ public long generate(InternalRow row) {
case MILLIS_TO_MICRO:
sequence = millisToMicro(sequence);
break;
case INC_SEQ:
sequence = extractUserSeq(sequence);
break;
default:
throw new UnsupportedOperationException(
"Unknown sequence padding mode " + padding);
Expand Down Expand Up @@ -160,6 +167,12 @@ private static long getCurrentMicroOfSeconds() {
return (currentNanoTime - seconds * 1_000_000_000) / 1000;
}

private long extractUserSeq(long sequence) {
// For timestamp, only support to second accuracy.
return (fieldType.is(DataTypeFamily.TIMESTAMP) ? sequence / 1000 : sequence)
& USER_SEQ_MASK;
}

private interface Generator {
long generate(InternalRow row, int i);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,8 @@ private MergeTreeWriter createMergeTreeWriter(
options.commitForceCompact(),
ChangelogProducer.NONE,
null,
null);
null,
false);
writer.setMemoryPool(
new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize()));
return writer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,4 +180,24 @@ class DeleteFromTableTest extends PaimonSparkTestBase {
val rows4 = spark.sql("SELECT * FROM T ORDER BY id").collectAsList()
assertThat(rows4.toString).isEqualTo("[]")
}

test(s"test delete with sequence field") {
spark.sql(
s"""
|CREATE TABLE T (id INT, name STRING, ts TIMESTAMP)
|TBLPROPERTIES ('primary-key' = 'id', 'sequence.field' = 'ts', 'sequence.auto-padding' = 'inc-seq')
|""".stripMargin)

spark.sql("INSERT INTO T VALUES (1, 'a', CAST('2023-12-15 10:00:00' AS TIMESTAMP))")

spark.sql("INSERT INTO T VALUES (2, 'b', CAST('2023-12-15 10:00:01' AS TIMESTAMP))")

spark.sql("INSERT INTO T VALUES (3, 'c', CAST('2023-12-15 10:00:02' AS TIMESTAMP))")

spark.sql("DELETE FROM T WHERE id = 1")

val rows1 = spark.sql("SELECT * FROM T").collectAsList()
assertThat(rows1.toString).isEqualTo(
"[[2,b,2023-12-15 10:00:01.0], [3,c,2023-12-15 10:00:02.0]]")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -227,4 +227,24 @@ class UpdateTableTest extends PaimonSparkTestBase {
() => spark.sql("UPDATE T SET s.c2 = 'a_new', s = struct(11, 'a_new') WHERE s.c1 = 1"))
.hasMessageContaining("Conflicting update/insert on attrs: s.c2, s")
}

test(s"test update with sequence field") {
spark.sql(
s"""
|CREATE TABLE T (id INT, name STRING, ts TIMESTAMP)
|TBLPROPERTIES ('primary-key' = 'id', 'sequence.field' = 'ts', 'sequence.auto-padding' = 'inc-seq')
|""".stripMargin)

spark.sql("INSERT INTO T VALUES (1, 'a', CAST('2023-12-15 10:00:00' AS TIMESTAMP))")

spark.sql("INSERT INTO T VALUES (2, 'b', CAST('2023-12-15 10:00:01' AS TIMESTAMP))")

spark.sql("INSERT INTO T VALUES (3, 'c', CAST('2023-12-15 10:00:02' AS TIMESTAMP))")

spark.sql("UPDATE T SET name = 'a_new' WHERE id = 1")

val rows1 = spark.sql("SELECT * FROM T ORDER BY id").collectAsList()
assertThat(rows1.toString).isEqualTo(
"[[1,a_new,2023-12-15 10:00:00.0], [2,b,2023-12-15 10:00:01.0], [3,c,2023-12-15 10:00:02.0]]")
}
}

0 comments on commit 9dfcd3a

Please sign in to comment.