[core] fix bug of watermark override with append only table (#3872)
wwj6591812 authored Aug 11, 2024
1 parent 574f162 commit 2ec768f
Showing 3 changed files with 43 additions and 5 deletions.
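A note on the change, inferred from the commit title and the diffs below: the unaware-bucket compaction source for append-only tables emitted records but never a watermark. When its committables are merged with a data-writing stream into one committer (the fetchUncommitted path), Flink advances a multi-input operator's watermark to the minimum of its inputs' watermarks, so the silent compaction input holds the combined watermark at Long.MIN_VALUE and the commit can override the table's real watermark. The fix threads an emitMaxWatermark flag into BucketUnawareCompactSource, which then emits Watermark.MAX_WATERMARK so the compaction input is neutral under min-combining. A minimal runnable sketch of that arithmetic (illustrative values and class name, not Paimon code):

    import org.apache.flink.streaming.api.watermark.Watermark;

    public class WatermarkMinSketch {
        public static void main(String[] args) {
            long writeInput = 1_723_362_000_000L;  // watermark carried by the data-writing input
            long silentInput = Long.MIN_VALUE;     // input whose source never emitted a watermark
            long neutralInput = Watermark.MAX_WATERMARK.getTimestamp(); // Long.MAX_VALUE

            // A multi-input operator's watermark is the minimum of its inputs' watermarks.
            System.out.println(Math.min(writeInput, silentInput));  // Long.MIN_VALUE: write watermark overridden
            System.out.println(Math.min(writeInput, neutralInput)); // 1723362000000: write watermark preserved
        }
    }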
@@ -71,14 +71,14 @@ public void withPartitionPredicate(Predicate predicate) {

     public void build() {
         // build source from UnawareSourceFunction
-        DataStreamSource<AppendOnlyCompactionTask> source = buildSource();
+        DataStreamSource<AppendOnlyCompactionTask> source = buildSource(false);

         // from source, construct the full flink job
         sinkFromSource(source);
     }

     public DataStream<Committable> fetchUncommitted(String commitUser) {
-        DataStreamSource<AppendOnlyCompactionTask> source = buildSource();
+        DataStreamSource<AppendOnlyCompactionTask> source = buildSource(true);

         // rebalance input to default or assigned parallelism
         DataStream<AppendOnlyCompactionTask> rebalanced = rebalanceInput(source);
@@ -87,11 +87,11 @@ public DataStream<Committable> fetchUncommitted(String commitUser) {
                 .doWrite(rebalanced, commitUser, rebalanced.getParallelism());
     }

-    private DataStreamSource<AppendOnlyCompactionTask> buildSource() {
+    private DataStreamSource<AppendOnlyCompactionTask> buildSource(boolean emitMaxWatermark) {
         long scanInterval = table.coreOptions().continuousDiscoveryInterval().toMillis();
         BucketUnawareCompactSource source =
                 new BucketUnawareCompactSource(
-                        table, isContinuous, scanInterval, partitionPredicate);
+                        table, isContinuous, scanInterval, partitionPredicate, emitMaxWatermark);

         return BucketUnawareCompactSource.buildSource(env, source, isContinuous, tableIdentifier);
     }
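Design note on the two call sites above, as the diff suggests: build() assembles the full standalone compaction job, so it passes false and keeps the previous behavior; fetchUncommitted(...) returns committables that the caller merges with another stream's committables, so it passes true and marks the compaction input as neutral for watermark purposes.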
@@ -32,6 +32,7 @@
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -60,16 +61,19 @@ public class BucketUnawareCompactSource extends RichSourceFunction<AppendOnlyCompactionTask> {
     private transient AppendOnlyTableCompactionCoordinator compactionCoordinator;
     private transient SourceContext<AppendOnlyCompactionTask> ctx;
     private volatile boolean isRunning = true;
+    private final boolean emitMaxWatermark;

     public BucketUnawareCompactSource(
             FileStoreTable table,
             boolean isStreaming,
             long scanInterval,
-            @Nullable Predicate filter) {
+            @Nullable Predicate filter,
+            boolean emitMaxWatermark) {
         this.table = table;
         this.streaming = isStreaming;
         this.scanInterval = scanInterval;
         this.filter = filter;
+        this.emitMaxWatermark = emitMaxWatermark;
     }

     @Override
@@ -94,6 +98,10 @@ public void run(SourceContext<AppendOnlyCompactionTask> sourceContext) throws Exception {
                 List<AppendOnlyCompactionTask> tasks = compactionCoordinator.run();
                 isEmpty = tasks.isEmpty();
                 tasks.forEach(ctx::collect);
+
+                if (emitMaxWatermark) {
+                    ctx.emitWatermark(Watermark.MAX_WATERMARK);
+                }
             } catch (EndOfScanException esf) {
                 LOG.info("Catching EndOfStreamException, the stream is finished.");
                 return;
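One property worth noting (general Flink behavior, not specific to this patch): Watermark.MAX_WATERMARK is an ordinary watermark whose timestamp is Long.MAX_VALUE, so re-emitting it on every scan loop, as run(...) now does, is idempotent; downstream operators simply keep treating this input as complete for event-time purposes. A tiny self-contained check:

    import org.apache.flink.streaming.api.watermark.Watermark;

    public class MaxWatermarkCheck {
        public static void main(String[] args) {
            // MAX_WATERMARK carries Long.MAX_VALUE and compares equal to a
            // freshly constructed watermark with the same timestamp.
            System.out.println(Watermark.MAX_WATERMARK.getTimestamp() == Long.MAX_VALUE); // true
            System.out.println(Watermark.MAX_WATERMARK.equals(new Watermark(Long.MAX_VALUE))); // true
        }
    }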
@@ -25,6 +25,7 @@
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;

 import java.io.File;
@@ -208,6 +209,35 @@ public void testCompactionInStreamingMode() throws Exception {
         assertThat(rows.size()).isEqualTo(10);
     }

+    @Test
+    public void testCompactionInStreamingModeWithMaxWatermark() throws Exception {
+        batchSql("ALTER TABLE append_table SET ('compaction.min.file-num' = '2')");
+        batchSql("ALTER TABLE append_table SET ('compaction.early-max.file-num' = '4')");
+
+        sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, Duration.ofMillis(500));
+        sEnv.executeSql(
+                "CREATE TEMPORARY TABLE Orders_in (\n"
+                        + " f0 INT,\n"
+                        + " f1 STRING,\n"
+                        + " ts TIMESTAMP(3),\n"
+                        + "WATERMARK FOR ts AS ts - INTERVAL '0' SECOND"
+                        + ") WITH (\n"
+                        + " 'connector' = 'datagen',\n"
+                        + " 'rows-per-second' = '1',\n"
+                        + " 'number-of-rows' = '10'\n"
+                        + ")");
+
+        assertStreamingHasCompact("INSERT INTO append_table SELECT f0, f1 FROM Orders_in", 60000);
+        // ensure data gen finished
+        Thread.sleep(5000);
+
+        Snapshot snapshot = findLatestSnapshot("append_table");
+        Assertions.assertNotNull(snapshot);
+        Long watermark = snapshot.watermark();
+        Assertions.assertNotNull(watermark);
+        Assertions.assertTrue(watermark > Long.MIN_VALUE);
+    }
+
     @Test
     public void testRejectDelete() {
         testRejectChanges(RowKind.DELETE);
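The new test drives an append-only table from a finite datagen source with a declared watermark, waits for compaction to run in streaming mode, and then asserts that the latest snapshot still carries a non-null watermark greater than Long.MIN_VALUE; that is, the compaction commit no longer overrides the watermark established by the insert.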
