Skip to content

Commit

Permalink
Merge pull request #14 from tongcheng-elong/balance-source
Browse files Browse the repository at this point in the history
Balance MonitorFunction data shuffle and format code
  • Loading branch information
wxplovecc authored Apr 9, 2024
2 parents 5fde536 + e4295a4 commit 6640d48
Show file tree
Hide file tree
Showing 8 changed files with 32 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1362,7 +1362,6 @@ public boolean supportDeleteByType() {
return options.get(SUPPORT_DELETE_BY_TYPE);
}


/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ public void expire() {
id++) {
count++;
if (snapshotManager.snapshotExists(id)
&& (currentMillis - snapshotManager.snapshot(id).timeMillis() <= millisRetained || count>=expireLimit)) {
&& (currentMillis - snapshotManager.snapshot(id).timeMillis() <= millisRetained
|| count >= expireLimit)) {
// within time threshold, can assume that all snapshots after it are also within
// the threshold
expireUntil(earliest, id);
Expand Down Expand Up @@ -166,8 +167,7 @@ public void expireUntil(long earliestId, long endExclusiveId) {
endExclusiveId = Math.min(beginInclusiveId + expireLimit, endExclusiveId);

if (LOG.isInfoEnabled()) {
LOG.info(
"Snapshot expire range is [" + beginInclusiveId + ", " + endExclusiveId + ")");
LOG.info("Snapshot expire range is [" + beginInclusiveId + ", " + endExclusiveId + ")");
}

List<Snapshot> taggedSnapshots = tagManager.taggedSnapshots();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.apache.paimon.table.source.MergeTreeSplitGenerator;
import org.apache.paimon.table.source.SplitGenerator;
import org.apache.paimon.table.source.ValueContentRowDataRecordIterator;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;

Expand Down Expand Up @@ -197,7 +196,7 @@ public TableWriteImpl<KeyValue> newWrite(
store().newWrite(commitUser, manifestFilter),
createRowKeyExtractor(),
record -> {
if(store().options().supportDeleteByType()){
if (store().options().supportDeleteByType()) {
setRowKindByBinlogType(record.row());
}
long sequenceNumber =
Expand All @@ -212,14 +211,14 @@ record -> {
});
}

/**
 * Marks {@code row} as {@link RowKind#DELETE} when its {@code binlog_eventtype} field equals
 * "delete", ignoring case. The row is left untouched when the logical row type has no field
 * with that name, or when the field holds any other value.
 *
 * <p>This span comes from a diff view: each reformatted statement appears twice, first with the
 * old spacing and then with the new spacing. The logic of both versions is the same.
 *
 * <p>NOTE(review): no null check on the field value is visible here; confirm that
 * {@code binlog_eventtype} can never be null for rows reaching this method.
 *
 * @param row the row whose kind may be switched to DELETE
 */
public void setRowKindByBinlogType(InternalRow row){
public void setRowKindByBinlogType(InternalRow row) {
RowType rowType = schema().logicalRowType();
// Look the marker column up by name; indexOf returns a negative value when it is absent.
int index = rowType.getFieldNames().indexOf("binlog_eventtype");
if(index <0){
if (index < 0) {
return;
}
String binlog_eventtype = row.getString(index).toString();
if(binlog_eventtype.equalsIgnoreCase("delete")){
String binlog_eventtype = row.getString(index).toString();
if (binlog_eventtype.equalsIgnoreCase("delete")) {
// Only "delete" events change the row kind; every other event type keeps the existing kind.
row.setRowKind(RowKind.DELETE);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ public Result scan(SnapshotReader snapshotReader) {
startupMillis);
return new NoSnapshot();
}
LOG.info(
"get startingSnapshot:{}, from timestamp:{}", startingSnapshotId, startupMillis);
LOG.info("get startingSnapshot:{}, from timestamp:{}", startingSnapshotId, startupMillis);
return StartingScanner.fromPlan(
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId).read());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@

package org.apache.paimon.flink.sink;

import org.apache.flink.table.data.RowData;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.data.RowData;

import javax.annotation.Nullable;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,7 @@ private void registerMetrics() {
.gauge(
"paimonSnapshotCleanDelayMinuteGauge",
(Gauge<Long>) () -> snapshotCleanDelayMinuteGauge);
getMetricGroup()
.gauge(
"paimonSnapshotGapGauge",
(Gauge<Long>) () -> snapshotGapGauge);
getMetricGroup().gauge("paimonSnapshotGapGauge", (Gauge<Long>) () -> snapshotGapGauge);
getMetricGroup()
.gauge(
"paimonLatestSnapshotIdentify",
Expand All @@ -159,8 +156,7 @@ private void updateMetrics() {
TimeUnit.MILLISECONDS);
snapshotCleanDelayMinuteGauge =
TimeUnit.MINUTES.convert(
latestSnapShot.timeMillis()
- earliestSnapshot.timeMillis(),
latestSnapShot.timeMillis() - earliestSnapshot.timeMillis(),
TimeUnit.MILLISECONDS);
snapshotGapGauge = latestSnapShot.id() - earliestSnapshot.id();
// dedicate recovery use state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.runtime.state.FunctionInitializationContext;
Expand Down Expand Up @@ -232,14 +233,21 @@ public static DataStream<RowData> buildSource(
ReadBuilder readBuilder,
long monitorInterval,
boolean emitSnapshotWatermark) {
// Routing key handed to partitionCustom below: the absolute partition hash plus the bucket
// when the split carries a partition, otherwise the bucket alone.
KeySelector<Split, Integer> keySelector =
        split -> {
            DataSplit dataSplit = (DataSplit) split;
            int key = dataSplit.bucket();
            if (dataSplit.partition() != null) {
                key += Math.abs(dataSplit.partition().hashCode());
            }
            // Math.abs(Integer.MIN_VALUE) is still negative and the addition can overflow.
            // Clear the sign bit so that key % numPartitions never yields a negative
            // channel index; keys that are already non-negative pass through unchanged.
            return key & Integer.MAX_VALUE;
        };
return env.addSource(
new MonitorFunction(readBuilder, monitorInterval, emitSnapshotWatermark),
name + "-Monitor",
new JavaTypeInfo<>(Split.class))
.forceNonParallel()
.partitionCustom(
(key, numPartitions) -> key % numPartitions,
split -> ((DataSplit) split).bucket())
.partitionCustom((key, numPartitions) -> key % numPartitions, keySelector)
.transform(name + "-Reader", typeInfo, new ReadOperator(readBuilder));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@

package org.apache.paimon.flink.sink.index;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.MergeEngine;
import org.apache.paimon.catalog.Identifier;
Expand All @@ -30,6 +26,11 @@
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.types.DataTypes;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import org.junit.jupiter.api.Test;

import java.io.File;
Expand All @@ -43,13 +44,12 @@
/** Test for {@link GlobalIndexAssigner}. */
public class GlobalIndexAssignerTest extends TableTestBase {

/**
 * Creates an assigner for the given merge engine by delegating to the two-argument overload
 * with its second argument ({@code enableTtl}) set to {@code false}.
 *
 * <p>This span comes from a diff view: the signature appears twice, first wrapped over two
 * lines (old formatting) and then on a single line (new formatting).
 */
private GlobalIndexAssigner<RowData> createAssigner(MergeEngine mergeEngine)
throws Exception {
private GlobalIndexAssigner<RowData> createAssigner(MergeEngine mergeEngine) throws Exception {
return createAssigner(mergeEngine, false);
}

private GlobalIndexAssigner<RowData> createAssigner(
MergeEngine mergeEngine, boolean enableTtl) throws Exception {
private GlobalIndexAssigner<RowData> createAssigner(MergeEngine mergeEngine, boolean enableTtl)
throws Exception {
Identifier identifier = identifier("T");
Options options = new Options();
options.set(CoreOptions.MERGE_ENGINE, mergeEngine);
Expand Down Expand Up @@ -89,8 +89,7 @@ private IOManager ioManager() {
}

private void innerTestBucketAssign(boolean enableTtl) throws Exception {
GlobalIndexAssigner<RowData> assigner =
createAssigner(MergeEngine.DEDUPLICATE, enableTtl);
GlobalIndexAssigner<RowData> assigner = createAssigner(MergeEngine.DEDUPLICATE, enableTtl);
List<Integer> output = new ArrayList<>();
assigner.open(
ioManager(),
Expand Down

0 comments on commit 6640d48

Please sign in to comment.