From c0336b5a954446695881ceb75771903ae96cccb4 Mon Sep 17 00:00:00 2001
From: monster <60029759+MonsterChenzhuo@users.noreply.github.com>
Date: Tue, 27 Aug 2024 13:48:12 +0800
Subject: [PATCH 1/3] [core] Introduce avgCompactionTime metrics (#4062)
---
docs/content/maintenance/metrics.md | 5 ++++
.../apache/paimon/compact/CompactTask.java | 10 ++++++++
.../operation/metrics/CompactionMetrics.java | 25 ++++++++++++++++++-
.../metrics/CompactionMetricsTest.java | 8 +++++-
4 files changed, 46 insertions(+), 2 deletions(-)
diff --git a/docs/content/maintenance/metrics.md b/docs/content/maintenance/metrics.md
index 203326c407b4..3a221f9037c6 100644
--- a/docs/content/maintenance/metrics.md
+++ b/docs/content/maintenance/metrics.md
@@ -249,6 +249,11 @@ Below is lists of Paimon built-in metrics. They are summarized into types of sca
Gauge |
The maximum business of compaction threads in this parallelism. Currently, there is only one compaction thread in each parallelism, so value of business ranges from 0 (idle) to 100 (compaction running all the time). |
+
+ avgCompactionTime |
+ Gauge |
+ The average duration of recent compactions in milliseconds, calculated over the most recently recorded compaction times (a sliding window of at most 100 records; the value is 0 while nothing has been recorded). Higher values indicate longer average compaction times, which may suggest the need for performance optimization. |
+
diff --git a/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java b/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java
index dac1005172c2..3e5079c03d73 100644
--- a/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java
+++ b/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java
@@ -47,6 +47,16 @@ public CompactResult call() throws Exception {
try {
long startMillis = System.currentTimeMillis();
CompactResult result = doCompact();
+
+ MetricUtils.safeCall(
+ () -> {
+ if (metricsReporter != null) {
+ metricsReporter.reportCompactionTime(
+ System.currentTimeMillis() - startMillis);
+ }
+ },
+ LOG);
+
if (LOG.isDebugEnabled()) {
logMetric(startMillis, result.before(), result.after());
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java
index 4ca54a6c32d0..dd899e9bf2c4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java
@@ -26,7 +26,9 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.DoubleStream;
import java.util.stream.LongStream;
@@ -38,16 +40,20 @@ public class CompactionMetrics {
public static final String MAX_LEVEL0_FILE_COUNT = "maxLevel0FileCount";
public static final String AVG_LEVEL0_FILE_COUNT = "avgLevel0FileCount";
public static final String COMPACTION_THREAD_BUSY = "compactionThreadBusy";
+ public static final String AVG_COMPACTION_TIME = "avgCompactionTime";
private static final long BUSY_MEASURE_MILLIS = 60_000;
+ private static final int COMPACTION_TIME_WINDOW = 100;
private final MetricGroup metricGroup;
private final Map reporters;
private final Map compactTimers;
+    /**
+     * Durations, in milliseconds, of the most recently reported compactions. Entries are appended
+     * by reportCompactionTime, which also trims the queue to COMPACTION_TIME_WINDOW elements.
+     */
+    private final Queue<Long> compactionTimes;
public CompactionMetrics(MetricRegistry registry, String tableName) {
this.metricGroup = registry.tableMetricGroup(GROUP_NAME, tableName);
this.reporters = new HashMap<>();
this.compactTimers = new ConcurrentHashMap<>();
+ this.compactionTimes = new ConcurrentLinkedQueue<>();
registerGenericCompactionMetrics();
}
@@ -61,7 +67,8 @@ private void registerGenericCompactionMetrics() {
metricGroup.gauge(MAX_LEVEL0_FILE_COUNT, () -> getLevel0FileCountStream().max().orElse(-1));
metricGroup.gauge(
AVG_LEVEL0_FILE_COUNT, () -> getLevel0FileCountStream().average().orElse(-1));
-
+ metricGroup.gauge(
+ AVG_COMPACTION_TIME, () -> getCompactionTimeStream().average().orElse(0.0));
metricGroup.gauge(COMPACTION_THREAD_BUSY, () -> getCompactBusyStream().sum());
}
@@ -74,6 +81,10 @@ private DoubleStream getCompactBusyStream() {
.mapToDouble(t -> 100.0 * t.calculateLength() / BUSY_MEASURE_MILLIS);
}
+    /** Exposes the recorded compaction durations as a stream of doubles, ready for averaging. */
+    private DoubleStream getCompactionTimeStream() {
+        return compactionTimes.stream().mapToDouble(duration -> duration.doubleValue());
+    }
+
public void close() {
metricGroup.close();
}
@@ -85,6 +96,8 @@ public interface Reporter {
void reportLevel0FileCount(long count);
+ void reportCompactionTime(long time);
+
void unregister();
}
@@ -105,6 +118,16 @@ public CompactTimer getCompactTimer() {
ignore -> new CompactTimer(BUSY_MEASURE_MILLIS));
}
+        /**
+         * Appends one compaction duration to the shared window and drops the oldest entry once
+         * the window holds more than COMPACTION_TIME_WINDOW values.
+         *
+         * @param time the duration to record
+         */
+        @Override
+        public void reportCompactionTime(long time) {
+            // insertion and eviction happen inside the same synchronized block
+            synchronized (compactionTimes) {
+                compactionTimes.offer(time);
+                boolean windowExceeded = compactionTimes.size() > COMPACTION_TIME_WINDOW;
+                if (windowExceeded) {
+                    compactionTimes.poll();
+                }
+            }
+        }
+
@Override
public void reportLevel0FileCount(long count) {
this.level0FileCount = count;
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CompactionMetricsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CompactionMetricsTest.java
index b18250c9e327..2ce1cc4eabd2 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CompactionMetricsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CompactionMetricsTest.java
@@ -35,7 +35,7 @@ public void testReportMetrics() {
assertThat(getMetric(metrics, CompactionMetrics.MAX_LEVEL0_FILE_COUNT)).isEqualTo(-1L);
assertThat(getMetric(metrics, CompactionMetrics.AVG_LEVEL0_FILE_COUNT)).isEqualTo(-1.0);
assertThat(getMetric(metrics, CompactionMetrics.COMPACTION_THREAD_BUSY)).isEqualTo(0.0);
-
+ assertThat(getMetric(metrics, CompactionMetrics.AVG_COMPACTION_TIME)).isEqualTo(0.0);
CompactionMetrics.Reporter[] reporters = new CompactionMetrics.Reporter[3];
for (int i = 0; i < reporters.length; i++) {
reporters[i] = metrics.createReporter(BinaryRow.EMPTY_ROW, i);
@@ -54,6 +54,12 @@ public void testReportMetrics() {
reporters[0].reportLevel0FileCount(8);
assertThat(getMetric(metrics, CompactionMetrics.MAX_LEVEL0_FILE_COUNT)).isEqualTo(8L);
assertThat(getMetric(metrics, CompactionMetrics.AVG_LEVEL0_FILE_COUNT)).isEqualTo(5.0);
+
+ reporters[0].reportCompactionTime(300000);
+ reporters[0].reportCompactionTime(250000);
+ reporters[0].reportCompactionTime(270000);
+ assertThat(getMetric(metrics, CompactionMetrics.AVG_COMPACTION_TIME))
+ .isEqualTo(273333.3333333333);
}
private Object getMetric(CompactionMetrics metrics, String metricName) {
From 98d2617bf0c270c706b61d4fd5c4d26363ece810 Mon Sep 17 00:00:00 2001
From: YeJunHao <41894543+leaves12138@users.noreply.github.com>
Date: Tue, 27 Aug 2024 14:19:26 +0800
Subject: [PATCH 2/3] [arrow] Add ArrowBundleRecords for write. (#4070)
---
.../paimon/arrow/ArrowBundleRecords.java | 55 +++++++++++++++++++
.../arrow/vector/ArrowFormatWriterTest.java | 33 +++++++++++
2 files changed, 88 insertions(+)
create mode 100644 paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowBundleRecords.java
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowBundleRecords.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowBundleRecords.java
new file mode 100644
index 000000000000..073672c75024
--- /dev/null
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowBundleRecords.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.arrow;
+
+import org.apache.paimon.arrow.reader.ArrowBatchReader;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.io.BundleRecords;
+import org.apache.paimon.types.RowType;
+
+import org.apache.arrow.vector.VectorSchemaRoot;
+
+import java.util.Iterator;
+
+/**
+ * Batch records for vector schema root.
+ *
+ * <p>The {@link VectorSchemaRoot} is held by reference and handed back unchanged by {@link
+ * #getVectorSchemaRoot()}. This class declares no close method, so releasing the root is left to
+ * whoever created it.
+ */
+public class ArrowBundleRecords implements BundleRecords {
+
+    private final VectorSchemaRoot vectorSchemaRoot;
+    private final RowType rowType;
+
+    /**
+     * @param vectorSchemaRoot Arrow batch that holds the records
+     * @param rowType row type passed to the {@link ArrowBatchReader} used by {@link #iterator()}
+     */
+    public ArrowBundleRecords(VectorSchemaRoot vectorSchemaRoot, RowType rowType) {
+        this.vectorSchemaRoot = vectorSchemaRoot;
+        this.rowType = rowType;
+    }
+
+    /** Returns the wrapped Arrow batch itself, not a copy. */
+    public VectorSchemaRoot getVectorSchemaRoot() {
+        return vectorSchemaRoot;
+    }
+
+    /** Returns the row count reported by the wrapped {@link VectorSchemaRoot}. */
+    @Override
+    public long rowCount() {
+        return vectorSchemaRoot.getRowCount();
+    }
+
+    /**
+     * Creates a new {@link ArrowBatchReader} for the row type on every call and returns an
+     * iterator over the rows it reads from the wrapped batch.
+     */
+    @Override
+    public Iterator<InternalRow> iterator() {
+        ArrowBatchReader arrowBatchReader = new ArrowBatchReader(rowType);
+        return arrowBatchReader.readBatch(vectorSchemaRoot).iterator();
+    }
+}
diff --git a/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java b/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java
index c27d167d3a7b..8642bebe511b 100644
--- a/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java
+++ b/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java
@@ -18,6 +18,7 @@
package org.apache.paimon.arrow.vector;
+import org.apache.paimon.arrow.ArrowBundleRecords;
import org.apache.paimon.arrow.reader.ArrowBatchReader;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
@@ -157,6 +158,38 @@ public void testReadWithSchemaMessUp() {
}
}
+    /**
+     * Writes 1000 random rows through {@link ArrowFormatWriter}, wraps the resulting root in
+     * {@link ArrowBundleRecords}, and checks that iteration returns the rows field by field in
+     * write order.
+     */
+    @Test
+    public void testArrowBundleRecords() {
+        try (ArrowFormatWriter writer = new ArrowFormatWriter(PRIMITIVE_TYPE, 4096)) {
+            List<InternalRow> list = new ArrayList<>();
+            List<InternalRow.FieldGetter> fieldGetters = new ArrayList<>();
+
+            for (int i = 0; i < PRIMITIVE_TYPE.getFieldCount(); i++) {
+                fieldGetters.add(InternalRow.createFieldGetter(PRIMITIVE_TYPE.getTypeAt(i), i));
+            }
+            for (int i = 0; i < 1000; i++) {
+                list.add(GenericRow.of(randomRowValues(null)));
+            }
+
+            list.forEach(writer::write);
+
+            writer.flush();
+            VectorSchemaRoot vectorSchemaRoot = writer.getVectorSchemaRoot();
+
+            Iterator<InternalRow> iterator =
+                    new ArrowBundleRecords(vectorSchemaRoot, PRIMITIVE_TYPE).iterator();
+            for (int i = 0; i < 1000; i++) {
+                InternalRow actual = iterator.next();
+                InternalRow expected = list.get(i);
+
+                // compare through field getters rather than relying on row equality
+                for (InternalRow.FieldGetter fieldGetter : fieldGetters) {
+                    Assertions.assertThat(fieldGetter.getFieldOrNull(actual))
+                            .isEqualTo(fieldGetter.getFieldOrNull(expected));
+                }
+            }
+        }
+    }
+
@Test
public void testCWriter() {
try (ArrowFormatCWriter writer = new ArrowFormatCWriter(PRIMITIVE_TYPE, 4096)) {
From 482bf3b81af95202951afc03376245d8ed2af8ae Mon Sep 17 00:00:00 2001
From: xuzifu666 <1206332514@qq.com>
Date: Tue, 27 Aug 2024 17:16:07 +0800
Subject: [PATCH 3/3] [core] Support stream read with delay duration (#4067)
---
.../generated/core_configuration.html | 6 +++
.../java/org/apache/paimon/CoreOptions.java | 11 +++++
.../table/source/DataTableStreamScan.java | 28 +++++++++++
.../paimon/table/source/StartupModeTest.java | 48 +++++++++++++++++++
4 files changed, 93 insertions(+)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index ee13b427bc57..66bc354c9fba 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -647,6 +647,12 @@
Long |
Optional snapshot id used in case of "from-snapshot" or "from-snapshot-full" scan mode |
+
+ streaming.read.snapshot.delay |
+ (none) |
+ Duration |
+ The delay duration of stream read when scan incremental snapshots. |
+
scan.tag-name |
(none) |
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index c8ee9e4aa881..4c69942b64c5 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -658,6 +658,13 @@ public class CoreOptions implements Serializable {
+ "Note: Scale-up this parameter will increase memory usage while scanning manifest files. "
+ "We can consider downsize it when we encounter an out of memory exception while scanning");
+    // Typed as Duration to match durationType(). The option has no default value, which is why
+    // DataTableStreamScan null-checks streamingReadDelay() before converting it to milliseconds.
+    public static final ConfigOption<Duration> STREAMING_READ_SNAPSHOT_DELAY =
+            key("streaming.read.snapshot.delay")
+                    .durationType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The delay duration of stream read when scan incremental snapshots.");
+
@ExcludeFromDocumentation("Confused without log system")
public static final ConfigOption LOG_CONSISTENCY =
key("log.consistency")
@@ -1846,6 +1853,10 @@ public Integer scanManifestParallelism() {
return options.get(SCAN_MANIFEST_PARALLELISM);
}
+    /**
+     * Reads the STREAMING_READ_SNAPSHOT_DELAY option. The option is declared without a default
+     * value, so callers are expected to null-check the result.
+     */
+    public Duration streamingReadDelay() {
+        final Duration configuredDelay = options.get(STREAMING_READ_SNAPSHOT_DELAY);
+        return configuredDelay;
+    }
+
public Integer dynamicBucketInitialBuckets() {
return options.get(DYNAMIC_BUCKET_INITIAL_BUCKETS);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
index 11e92f903f35..6c02f5746255 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
@@ -67,6 +67,8 @@ public class DataTableStreamScan extends AbstractDataTableScan implements Stream
@Nullable private Long currentWatermark;
@Nullable private Long nextSnapshotId;
+ @Nullable private Long scanDelayMillis;
+
public DataTableStreamScan(
CoreOptions options,
SnapshotReader snapshotReader,
@@ -119,6 +121,9 @@ private void initScanner() {
if (boundedChecker == null) {
boundedChecker = createBoundedChecker();
}
+ if (scanDelayMillis == null) {
+ scanDelayMillis = getScanDelayMillis();
+ }
initialized = true;
}
@@ -177,6 +182,10 @@ private Plan nextPlan() {
throw new EndOfScanException();
}
+ if (shouldDelaySnapshot(nextSnapshotId)) {
+ return SnapshotNotExistPlan.INSTANCE;
+ }
+
// first check changes of overwrite
if (snapshot.commitKind() == Snapshot.CommitKind.OVERWRITE
&& supportStreamingReadOverwrite) {
@@ -198,6 +207,19 @@ private Plan nextPlan() {
}
}
+    /**
+     * Tells whether the given snapshot is still too recent to be read under the configured scan
+     * delay.
+     *
+     * @param snapshotId id of the snapshot that is about to be scanned
+     * @return true only when a delay is configured, the snapshot exists, and its timeMillis() is
+     *     later than "current time minus delay"; false in every other case
+     */
+    private boolean shouldDelaySnapshot(long snapshotId) {
+        if (scanDelayMillis == null) {
+            // no delay configured, so no snapshot is ever held back
+            return false;
+        }
+
+        long latestReadableMillis = System.currentTimeMillis() - scanDelayMillis;
+        return snapshotManager.snapshotExists(snapshotId)
+                && snapshotManager.snapshot(snapshotId).timeMillis() > latestReadableMillis;
+    }
+
private FollowUpScanner createFollowUpScanner() {
CoreOptions.StreamScanMode type =
options.toConfiguration().get(CoreOptions.STREAM_SCAN_MODE);
@@ -237,6 +259,12 @@ private BoundedChecker createBoundedChecker() {
: BoundedChecker.neverEnd();
}
+    /**
+     * Converts the streaming read delay from the options into milliseconds.
+     *
+     * @return the delay in milliseconds, or null when options.streamingReadDelay() returns null
+     */
+    private Long getScanDelayMillis() {
+        if (options.streamingReadDelay() == null) {
+            return null;
+        }
+        return options.streamingReadDelay().toMillis();
+    }
+
@Nullable
@Override
public Long checkpoint() {
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/StartupModeTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/StartupModeTest.java
index 470e61292131..7197b9b0fb41 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/StartupModeTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/StartupModeTest.java
@@ -228,6 +228,54 @@ public void testStartFromSnapshot() throws Exception {
.isEqualTo(snapshotReader.withSnapshot(2).withMode(ScanMode.ALL).read().splits());
}
+    /**
+     * Streams from snapshot 2 with a 5 second read delay: plans requested shortly after the
+     * commits must stay empty, and the delta of snapshot 2 must show up once the delay is over.
+     */
+    @Test
+    public void testStartFromSnapshotWithDelayDuration() throws Exception {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), "2");
+        properties.put(CoreOptions.STREAMING_READ_SNAPSHOT_DELAY.key(), "5 s");
+        initializeTable(StartupMode.FROM_SNAPSHOT, properties);
+        initializeTestData(); // initialize 3 commits
+
+        // streaming Mode
+        StreamTableScan dataTableScan = table.newStreamScan();
+        TableScan.Plan firstPlan = dataTableScan.plan();
+        assertThat(firstPlan.splits()).isEmpty();
+
+        // roughly 3 s after the commits: still inside the 5 s delay, so nothing is planned
+        Thread.sleep(3000);
+        TableScan.Plan secondPlan = dataTableScan.plan();
+        assertThat(secondPlan.splits()).isEmpty();
+
+        // roughly 8 s after the commits: the delay has elapsed and snapshot 2 becomes readable
+        Thread.sleep(5000);
+        TableScan.Plan thirdPlan = dataTableScan.plan();
+
+        assertThat(thirdPlan.splits())
+                .isEqualTo(snapshotReader.withSnapshot(2).withMode(ScanMode.DELTA).read().splits());
+    }
+
+    /**
+     * Streams from snapshot 2 without a read delay: the second plan must return the delta of
+     * snapshot 2 right away.
+     */
+    @Test
+    public void testStartFromSnapshotWithoutDelayDuration() throws Exception {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), "2");
+        initializeTable(StartupMode.FROM_SNAPSHOT, properties);
+        initializeTestData(); // initialize 3 commits
+
+        // streaming Mode
+        StreamTableScan dataTableScan = table.newStreamScan();
+        TableScan.Plan firstPlan = dataTableScan.plan();
+
+        long startTime = System.currentTimeMillis();
+        TableScan.Plan secondPlan = dataTableScan.plan();
+        long endTime = System.currentTimeMillis();
+        long duration = endTime - startTime;
+
+        // without delay read test
+        // NOTE(review): a 100 ms wall-clock bound may be flaky on a slow or loaded machine
+        assertThat(duration).isLessThan(100);
+
+        assertThat(firstPlan.splits()).isEmpty();
+        assertThat(secondPlan.splits())
+                .isEqualTo(snapshotReader.withSnapshot(2).withMode(ScanMode.DELTA).read().splits());
+    }
+
@Test
public void testTimeTravelFromExpiredSnapshot() throws Exception {
Map properties = new HashMap<>();