From e4295a440d6f76d6631934a37ced3890f121dced Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E7=A5=A5=E5=B9=B3?= Date: Tue, 9 Apr 2024 14:41:40 +0800 Subject: [PATCH] balance source --- .../java/org/apache/paimon/CoreOptions.java | 1 - .../paimon/operation/FileStoreExpireImpl.java | 6 +++--- .../table/ChangelogWithKeyFileStoreTable.java | 11 +++++------ .../StaticFromTimestampStartingScanner.java | 3 +-- .../flink/sink/DynamicBucketCompactSink.java | 3 +-- .../paimon/flink/sink/TableWriteOperator.java | 8 ++------ .../source/operator/MonitorFunction.java | 14 +++++++++++--- .../sink/index/GlobalIndexAssignerTest.java | 19 +++++++++---------- 8 files changed, 32 insertions(+), 33 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 2d5c848aac00..026e174c3d9b 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -1362,7 +1362,6 @@ public boolean supportDeleteByType() { return options.get(SUPPORT_DELETE_BY_TYPE); } - /** Specifies the merge engine for table with primary key.
*/ public enum MergeEngine implements DescribedEnum { DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."), diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java index 0ef1961c78c7..aade3eaa9dcd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java @@ -121,7 +121,8 @@ public void expire() { id++) { count++; if (snapshotManager.snapshotExists(id) - && (currentMillis - snapshotManager.snapshot(id).timeMillis() <= millisRetained || count>=expireLimit)) { + && (currentMillis - snapshotManager.snapshot(id).timeMillis() <= millisRetained + || count >= expireLimit)) { // within time threshold, can assume that all snapshots after it are also within // the threshold expireUntil(earliest, id); @@ -166,8 +167,7 @@ public void expireUntil(long earliestId, long endExclusiveId) { endExclusiveId = Math.min(beginInclusiveId + expireLimit, endExclusiveId); if (LOG.isInfoEnabled()) { - LOG.info( - "Snapshot expire range is [" + beginInclusiveId + ", " + endExclusiveId + ")"); + LOG.info("Snapshot expire range is [" + beginInclusiveId + ", " + endExclusiveId + ")"); } List taggedSnapshots = tagManager.taggedSnapshots(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java index 1b2862b46226..4f78b473d3a2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java @@ -44,7 +44,6 @@ import org.apache.paimon.table.source.MergeTreeSplitGenerator; import org.apache.paimon.table.source.SplitGenerator; import org.apache.paimon.table.source.ValueContentRowDataRecordIterator; 
-import org.apache.paimon.types.DataType; import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; @@ -197,7 +196,7 @@ public TableWriteImpl newWrite( store().newWrite(commitUser, manifestFilter), createRowKeyExtractor(), record -> { - if(store().options().supportDeleteByType()){ + if (store().options().supportDeleteByType()) { setRowKindByBinlogType(record.row()); } long sequenceNumber = @@ -212,14 +211,14 @@ record -> { }); } - public void setRowKindByBinlogType(InternalRow row){ + public void setRowKindByBinlogType(InternalRow row) { RowType rowType = schema().logicalRowType(); int index = rowType.getFieldNames().indexOf("binlog_eventtype"); - if(index <0){ + if (index < 0) { return; } - String binlog_eventtype = row.getString(index).toString(); - if(binlog_eventtype.equalsIgnoreCase("delete")){ + String binlog_eventtype = row.getString(index).toString(); + if (binlog_eventtype.equalsIgnoreCase("delete")) { row.setRowKind(RowKind.DELETE); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java index 60e39f0d1737..833627850a5b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java @@ -56,8 +56,7 @@ public Result scan(SnapshotReader snapshotReader) { startupMillis); return new NoSnapshot(); } - LOG.info( - "get startingSnapshot:{}, from timestamp:{}", startingSnapshotId, startupMillis); + LOG.info("get startingSnapshot:{}, from timestamp:{}", startingSnapshotId, startupMillis); return StartingScanner.fromPlan( snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId).read()); } diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java index adef4554e5ff..f92de4449994 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java @@ -18,8 +18,6 @@ package org.apache.paimon.flink.sink; -import org.apache.flink.table.data.RowData; -import org.apache.paimon.data.InternalRow; import org.apache.paimon.table.FileStoreTable; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; @@ -27,6 +25,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.table.data.RowData; import javax.annotation.Nullable; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java index 4d5ea9dbebb6..11da6574de5d 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java @@ -138,10 +138,7 @@ private void registerMetrics() { .gauge( "paimonSnapshotCleanDelayMinuteGauge", (Gauge) () -> snapshotCleanDelayMinuteGauge); - getMetricGroup() - .gauge( - "paimonSnapshotGapGauge", - (Gauge) () -> snapshotGapGauge); + getMetricGroup().gauge("paimonSnapshotGapGauge", (Gauge) () -> snapshotGapGauge); getMetricGroup() .gauge( "paimonLatestSnapshotIdentify", @@ -159,8 +156,7 @@ private void updateMetrics() { TimeUnit.MILLISECONDS); snapshotCleanDelayMinuteGauge = TimeUnit.MINUTES.convert( - 
latestSnapShot.timeMillis() - - earliestSnapshot.timeMillis(), + latestSnapShot.timeMillis() - earliestSnapshot.timeMillis(), TimeUnit.MILLISECONDS); snapshotGapGauge = latestSnapShot.id() - earliestSnapshot.id(); // dedicate recovery use state diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java index 2905d8a3041f..b5e2cef39c98 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java @@ -32,6 +32,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; import org.apache.flink.runtime.state.FunctionInitializationContext; @@ -232,14 +233,21 @@ public static DataStream buildSource( ReadBuilder readBuilder, long monitorInterval, boolean emitSnapshotWatermark) { + KeySelector keySelector = + split -> { + if (((DataSplit) split).partition() != null) { + return Math.abs(((DataSplit) split).partition().hashCode()) + + ((DataSplit) split).bucket(); + } else { + return ((DataSplit) split).bucket(); + } + }; return env.addSource( new MonitorFunction(readBuilder, monitorInterval, emitSnapshotWatermark), name + "-Monitor", new JavaTypeInfo<>(Split.class)) .forceNonParallel() - .partitionCustom( - (key, numPartitions) -> key % numPartitions, - split -> ((DataSplit) split).bucket()) + .partitionCustom((key, numPartitions) -> Math.floorMod(key, numPartitions), keySelector) .transform(name + "-Reader", typeInfo, new ReadOperator(readBuilder));
} } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java index f50214c1447d..9d6e68b7ba4d 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java @@ -18,10 +18,6 @@ package org.apache.paimon.flink.sink.index; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.table.data.GenericRowData; -import org.apache.flink.table.data.RowData; -import org.apache.flink.types.RowKind; import org.apache.paimon.CoreOptions; import org.apache.paimon.CoreOptions.MergeEngine; import org.apache.paimon.catalog.Identifier; @@ -30,6 +26,11 @@ import org.apache.paimon.schema.Schema; import org.apache.paimon.table.TableTestBase; import org.apache.paimon.types.DataTypes; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.types.RowKind; import org.junit.jupiter.api.Test; import java.io.File; @@ -43,13 +44,12 @@ /** Test for {@link GlobalIndexAssigner}. 
*/ public class GlobalIndexAssignerTest extends TableTestBase { - private GlobalIndexAssigner createAssigner(MergeEngine mergeEngine) - throws Exception { + private GlobalIndexAssigner createAssigner(MergeEngine mergeEngine) throws Exception { return createAssigner(mergeEngine, false); } - private GlobalIndexAssigner createAssigner( - MergeEngine mergeEngine, boolean enableTtl) throws Exception { + private GlobalIndexAssigner createAssigner(MergeEngine mergeEngine, boolean enableTtl) + throws Exception { Identifier identifier = identifier("T"); Options options = new Options(); options.set(CoreOptions.MERGE_ENGINE, mergeEngine); @@ -89,8 +89,7 @@ private IOManager ioManager() { } private void innerTestBucketAssign(boolean enableTtl) throws Exception { - GlobalIndexAssigner assigner = - createAssigner(MergeEngine.DEDUPLICATE, enableTtl); + GlobalIndexAssigner assigner = createAssigner(MergeEngine.DEDUPLICATE, enableTtl); List output = new ArrayList<>(); assigner.open( ioManager(),