From c20e1a6d3d74f2e9c29d966bf74f831617df5e5a Mon Sep 17 00:00:00 2001 From: yunfengzhou-hub Date: Mon, 18 Nov 2024 11:37:42 +0800 Subject: [PATCH] [flink] Replace legacy SourceFunction with v2 Source --- .../compact/MultiAwareBucketTableScan.java | 12 +- .../flink/compact/MultiTableScanBase.java | 14 +- .../compact/MultiUnawareBucketTableScan.java | 12 +- .../UnawareBucketCompactionTopoBuilder.java | 2 +- .../flink/service/QueryFileMonitor.java | 89 ++++--- .../source/AbstractNonCoordinatedSource.java | 50 ++++ .../AbstractNonCoordinatedSourceReader.java | 51 ++++ .../source/BucketUnawareCompactSource.java | 113 +++++---- .../CombinedTableCompactorSourceBuilder.java | 16 +- .../flink/source/FlinkSourceBuilder.java | 4 +- .../paimon/flink/source/NoOpEnumState.java | 22 ++ .../flink/source/NoOpEnumStateSerializer.java | 41 ++++ .../paimon/flink/source/NoOpEnumerator.java | 54 +++++ .../flink/source/SimpleSourceSplit.java | 39 ++++ .../source/SimpleSourceSplitSerializer.java | 45 ++++ ...ion.java => CombinedAwareBatchSource.java} | 86 +++---- ...java => CombinedAwareStreamingSource.java} | 88 +++---- ...tion.java => CombinedCompactorSource.java} | 37 +-- ...n.java => CombinedUnawareBatchSource.java} | 93 ++++---- ...va => CombinedUnawareStreamingSource.java} | 92 ++++---- ...onitorFunction.java => MonitorSource.java} | 221 ++++++++---------- .../operator/MultiTablesReadOperator.java | 5 +- .../MultiUnawareTablesReadOperator.java | 2 +- .../flink/source/operator/ReadOperator.java | 4 +- .../source/operator/OperatorSourceTest.java | 89 +++++-- .../operator/TestingSourceOperator.java | 164 +++++++++++++ 26 files changed, 948 insertions(+), 497 deletions(-) create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AbstractNonCoordinatedSource.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AbstractNonCoordinatedSourceReader.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/NoOpEnumState.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/NoOpEnumStateSerializer.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/NoOpEnumerator.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SimpleSourceSplit.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SimpleSourceSplitSerializer.java rename paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/{CombinedAwareBatchSourceFunction.java => CombinedAwareBatchSource.java} (66%) rename paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/{CombinedAwareStreamingSourceFunction.java => CombinedAwareStreamingSource.java} (65%) rename paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/{CombinedCompactorSourceFunction.java => CombinedCompactorSource.java} (70%) rename paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/{CombinedUnawareBatchSourceFunction.java => CombinedUnawareBatchSource.java} (71%) rename paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/{CombinedUnawareStreamingSourceFunction.java => CombinedUnawareStreamingSource.java} (59%) rename 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/{MonitorFunction.java => MonitorSource.java} (52%) create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/TestingSourceOperator.java diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiAwareBucketTableScan.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiAwareBucketTableScan.java index 747995d20d675..88730132ef68c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiAwareBucketTableScan.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiAwareBucketTableScan.java @@ -32,7 +32,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -52,15 +51,8 @@ public MultiAwareBucketTableScan( Pattern includingPattern, Pattern excludingPattern, Pattern databasePattern, - boolean isStreaming, - AtomicBoolean isRunning) { - super( - catalogLoader, - includingPattern, - excludingPattern, - databasePattern, - isStreaming, - isRunning); + boolean isStreaming) { + super(catalogLoader, includingPattern, excludingPattern, databasePattern, isStreaming); tablesMap = new HashMap<>(); scansMap = new HashMap<>(); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiTableScanBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiTableScanBase.java index bd4ffe83a4ca0..f5940740b691c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiTableScanBase.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiTableScanBase.java @@ -26,12 +26,11 @@ import org.apache.paimon.table.source.EndOfScanException; import org.apache.paimon.table.source.Split; -import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.api.connector.source.ReaderOutput; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; import static org.apache.paimon.flink.utils.MultiTablesCompactorUtil.shouldCompactTable; @@ -57,7 +56,6 @@ public abstract class MultiTableScanBase implements AutoCloseable { protected transient Catalog catalog; - protected AtomicBoolean isRunning; protected boolean isStreaming; public MultiTableScanBase( @@ -65,14 +63,12 @@ public MultiTableScanBase( Pattern includingPattern, Pattern excludingPattern, Pattern databasePattern, - boolean isStreaming, - AtomicBoolean isRunning) { + boolean isStreaming) { catalog = catalogLoader.load(); this.includingPattern = includingPattern; this.excludingPattern = excludingPattern; this.databasePattern = databasePattern; - this.isRunning = isRunning; this.isStreaming = isStreaming; } @@ -104,13 +100,9 @@ protected void updateTableMap() } } - public ScanResult scanTable(SourceFunction.SourceContext ctx) + public ScanResult scanTable(ReaderOutput ctx) throws Catalog.TableNotExistException, Catalog.DatabaseNotExistException { try { - if (!isRunning.get()) { - return ScanResult.FINISHED; - } - updateTableMap(); List tasks = doScan(); diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiUnawareBucketTableScan.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiUnawareBucketTableScan.java index 56bf971240e73..da86b93af5120 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiUnawareBucketTableScan.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiUnawareBucketTableScan.java @@ -29,7 +29,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; /** @@ -46,15 +45,8 @@ public MultiUnawareBucketTableScan( Pattern includingPattern, Pattern excludingPattern, Pattern databasePattern, - boolean isStreaming, - AtomicBoolean isRunning) { - super( - catalogLoader, - includingPattern, - excludingPattern, - databasePattern, - isStreaming, - isRunning); + boolean isStreaming) { + super(catalogLoader, includingPattern, excludingPattern, databasePattern, isStreaming); tablesMap = new HashMap<>(); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java index 8c6ed4c9f59e1..a572354e89845 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java @@ -126,7 +126,7 @@ private DataStreamSource buildSource() { new BucketUnawareCompactSource( table, isContinuous, scanInterval, partitionPredicate); - return BucketUnawareCompactSource.buildSource(env, source, isContinuous, tableIdentifier); + return BucketUnawareCompactSource.buildSource(env, source, tableIdentifier); } private void sinkFromSource(DataStreamSource input) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java index 724a9201710b4..f0c17f85eceb2 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java @@ -21,6 +21,9 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.flink.source.AbstractNonCoordinatedSource; +import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader; +import org.apache.paimon.flink.source.SimpleSourceSplit; import org.apache.paimon.flink.utils.InternalTypeInfo; import org.apache.paimon.options.Options; import org.apache.paimon.table.FileStoreTable; @@ -31,10 +34,14 @@ import org.apache.paimon.table.source.TableRead; import org.apache.paimon.table.system.FileMonitorTable; -import org.apache.flink.api.common.functions.OpenContext; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.core.io.InputStatus; import 
org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import java.util.ArrayList; import java.util.List; @@ -49,19 +56,13 @@ *
  • Assigning them to downstream tasks for further processing. * */ -public class QueryFileMonitor extends RichSourceFunction { +public class QueryFileMonitor extends AbstractNonCoordinatedSource { private static final long serialVersionUID = 1L; private final Table table; private final long monitorInterval; - private transient SourceContext ctx; - private transient StreamTableScan scan; - private transient TableRead read; - - private volatile boolean isRunning = true; - public QueryFileMonitor(Table table) { this.table = table; this.monitorInterval = @@ -71,55 +72,53 @@ public QueryFileMonitor(Table table) { } @Override - public void open(OpenContext openContext) throws Exception { - FileMonitorTable monitorTable = new FileMonitorTable((FileStoreTable) table); - ReadBuilder readBuilder = monitorTable.newReadBuilder(); - this.scan = readBuilder.newStreamScan(); - this.read = readBuilder.newRead(); + public Boundedness getBoundedness() { + return Boundedness.CONTINUOUS_UNBOUNDED; } @Override - public void run(SourceContext ctx) throws Exception { - this.ctx = ctx; - while (isRunning) { - boolean isEmpty; - synchronized (ctx.getCheckpointLock()) { - if (!isRunning) { - return; - } - isEmpty = doScan(); - } + public SourceReader createReader( + SourceReaderContext sourceReaderContext) throws Exception { + return new Reader(); + } + + private class Reader extends AbstractNonCoordinatedSourceReader { + private transient StreamTableScan scan; + private transient TableRead read; + + @Override + public void start() { + FileMonitorTable monitorTable = new FileMonitorTable((FileStoreTable) table); + ReadBuilder readBuilder = monitorTable.newReadBuilder(); + this.scan = readBuilder.newStreamScan(); + this.read = readBuilder.newRead(); + } + + @Override + public InputStatus pollNext(ReaderOutput readerOutput) throws Exception { + boolean isEmpty = doScan(readerOutput); if (isEmpty) { Thread.sleep(monitorInterval); } + return InputStatus.MORE_AVAILABLE; } - } - private boolean doScan() throws Exception { - List records = new ArrayList<>(); - read.createReader(scan.plan()).forEachRemaining(records::add); - records.forEach(ctx::collect); - return records.isEmpty(); - } - - @Override - public void cancel() { - // this is to cover the case where cancel() is called before the run() - if (ctx != null) { - synchronized (ctx.getCheckpointLock()) { - isRunning = false; - } - } else { - isRunning = false; + private boolean doScan(ReaderOutput readerOutput) throws Exception { + List records = new ArrayList<>(); + read.createReader(scan.plan()).forEachRemaining(records::add); + records.forEach(readerOutput::collect); + return records.isEmpty(); } } public static DataStream build(StreamExecutionEnvironment env, Table table) { - return env.addSource( - new QueryFileMonitor(table), - "FileMonitor-" + table.name(), - InternalTypeInfo.fromRowType(FileMonitorTable.getRowType())); + return env.fromSource( + new QueryFileMonitor(table), + WatermarkStrategy.noWatermarks(), + "FileMonitor-" + table.name(), + InternalTypeInfo.fromRowType(FileMonitorTable.getRowType())) + .setParallelism(1); } public static ChannelComputer createChannelComputer() { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AbstractNonCoordinatedSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AbstractNonCoordinatedSource.java new file mode 100644 index 0000000000000..a9a389e837a21 --- /dev/null +++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AbstractNonCoordinatedSource.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.source; + +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +/** {@link Source} that does not require coordination between JobManager and TaskManagers. */ +public abstract class AbstractNonCoordinatedSource + implements Source { + @Override + public SplitEnumerator createEnumerator( + SplitEnumeratorContext enumContext) { + return new NoOpEnumerator<>(); + } + + @Override + public SplitEnumerator restoreEnumerator( + SplitEnumeratorContext enumContext, NoOpEnumState checkpoint) { + return new NoOpEnumerator<>(); + } + + @Override + public SimpleVersionedSerializer getSplitSerializer() { + return new SimpleSourceSplitSerializer(); + } + + @Override + public SimpleVersionedSerializer getEnumeratorCheckpointSerializer() { + return new NoOpEnumStateSerializer(); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AbstractNonCoordinatedSourceReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AbstractNonCoordinatedSourceReader.java new file mode 100644 index 0000000000000..18c278868ffae --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AbstractNonCoordinatedSourceReader.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.source; + +import org.apache.flink.api.connector.source.SourceReader; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** Abstract {@link SourceReader} for {@link AbstractNonCoordinatedSource}. 
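+ *
+ * <p>Subclasses typically override only {@link SourceReader#pollNext}; the split-related
+ * callbacks below are no-ops because these sources do not distribute work through splits.
+ * A minimal illustrative sketch (the class name and emitted value are hypothetical, not part
+ * of this patch):
+ *
+ * <pre>{@code
+ * class OneShotReader extends AbstractNonCoordinatedSourceReader<String> {
+ *     @Override
+ *     public InputStatus pollNext(ReaderOutput<String> output) {
+ *         output.collect("hello");
+ *         return InputStatus.END_OF_INPUT;
+ *     }
+ * }
+ * }</pre>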
*/ +public abstract class AbstractNonCoordinatedSourceReader + implements SourceReader { + @Override + public void start() {} + + @Override + public List snapshotState(long l) { + return Collections.emptyList(); + } + + @Override + public CompletableFuture isAvailable() { + return CompletableFuture.completedFuture(null); + } + + @Override + public void addSplits(List list) {} + + @Override + public void notifyNoMoreSplits() {} + + @Override + public void close() throws Exception {} +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java index 787e608dbaa7b..e588fef0fb79f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java @@ -24,14 +24,16 @@ import org.apache.paimon.predicate.Predicate; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.source.EndOfScanException; -import org.apache.paimon.utils.Preconditions; -import org.apache.flink.api.common.functions.OpenContext; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.core.io.InputStatus; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.RichSourceFunction; -import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,15 +42,16 @@ import java.util.List; /** - * Source Function for unaware-bucket Compaction. + * Source for unaware-bucket Compaction. * - *

    Note: The function is the source function of unaware-bucket compactor coordinator. It will - * read the latest snapshot continuously by compactionCoordinator, and generate new compaction - * tasks. The source function is used in unaware-bucket compaction job (both stand-alone and - * write-combined). Besides, we don't need to save state in this function, it will invoke a full - * scan when starting up, and scan continuously for the following snapshot. + *

    Note: This source is the coordinator of the unaware-bucket compactor. It continuously reads + the latest snapshot via the compactionCoordinator and generates new compaction tasks. The + source is used in unaware-bucket compaction jobs (both stand-alone and write-combined). Besides, + no state needs to be kept in this source: it performs a full scan at startup and then scans + continuously for the following snapshots. */ -public class BucketUnawareCompactSource extends RichSourceFunction { +public class BucketUnawareCompactSource + extends AbstractNonCoordinatedSource { private static final Logger LOG = LoggerFactory.getLogger(BucketUnawareCompactSource.class); private static final String COMPACTION_COORDINATOR_NAME = "Compaction Coordinator"; @@ -57,9 +60,6 @@ public class BucketUnawareCompactSource extends RichSourceFunction { - private transient SourceContext ctx; - private volatile boolean isRunning = true; public BucketUnawareCompactSource( FileStoreTable table, @@ -73,66 +73,65 @@ public BucketUnawareCompactSource( } @Override - public void open(OpenContext openContext) throws Exception { - compactionCoordinator = - new UnawareAppendTableCompactionCoordinator(table, streaming, filter); + public Boundedness getBoundedness() { + return streaming ? Boundedness.CONTINUOUS_UNBOUNDED : Boundedness.BOUNDED; + } + + @Override + public SourceReader createReader( + SourceReaderContext readerContext) throws Exception { Preconditions.checkArgument( - this.getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks() == 1, + readerContext.currentParallelism() == 1, "Compaction Operator parallelism in paimon MUST be one."); + return new BucketUnawareCompactSourceReader(table, streaming, filter, scanInterval); } - @Override - public void run(SourceContext sourceContext) throws Exception { - this.ctx = sourceContext; - while (isRunning) { - boolean isEmpty; - synchronized (ctx.getCheckpointLock()) { - if (!isRunning) { - return; - } - try { - // do scan and plan action, emit append-only compaction tasks. - List tasks = compactionCoordinator.run(); - isEmpty = tasks.isEmpty(); - tasks.forEach(ctx::collect); - } catch (EndOfScanException esf) { - LOG.info("Catching EndOfStreamException, the stream is finished."); - return; - } - } + /** BucketUnawareCompactSourceReader. */ + public static class BucketUnawareCompactSourceReader + extends AbstractNonCoordinatedSourceReader { + private final UnawareAppendTableCompactionCoordinator compactionCoordinator; + private final long scanInterval; + private long lastFetchTimeMillis = 0L; - if (isEmpty) { - Thread.sleep(scanInterval); - } + public BucketUnawareCompactSourceReader( + FileStoreTable table, boolean streaming, Predicate filter, long scanInterval) { + this.scanInterval = scanInterval; + compactionCoordinator = + new UnawareAppendTableCompactionCoordinator(table, streaming, filter); } - @Override - public void cancel() { - if (ctx != null) { - synchronized (ctx.getCheckpointLock()) { - isRunning = false; + @Override + public InputStatus pollNext(ReaderOutput readerOutput) + throws Exception { + long sleepTimeMillis = scanInterval - System.currentTimeMillis() + lastFetchTimeMillis; + if (sleepTimeMillis > 0) { + Thread.sleep(sleepTimeMillis); + } + + try { + // do scan and plan action, emit append-only compaction tasks. 
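+                // Editor's note: pollNext() runs on the task's mailbox thread, so the sleep at
+                // the top of this method throttles scanning but also briefly delays mailbox
+                // actions such as checkpoints; this is tolerable for a parallelism-1 coordinator.
+                // Returning MORE_AVAILABLE below asks the runtime to invoke pollNext() again.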
+ List tasks = compactionCoordinator.run(); + tasks.forEach(readerOutput::collect); + return InputStatus.MORE_AVAILABLE; + } catch (EndOfScanException esf) { + LOG.info("Catching EndOfStreamException, the stream is finished."); + return InputStatus.END_OF_INPUT; + } finally { + lastFetchTimeMillis = System.currentTimeMillis(); } - } else { - isRunning = false; } } public static DataStreamSource buildSource( StreamExecutionEnvironment env, BucketUnawareCompactSource source, - boolean streaming, String tableIdentifier) { - final StreamSource sourceOperator = - new StreamSource<>(source); return (DataStreamSource) - new DataStreamSource<>( - env, - new CompactionTaskTypeInfo(), - sourceOperator, - false, + env.fromSource( + source, + WatermarkStrategy.noWatermarks(), COMPACTION_COORDINATOR_NAME + " : " + tableIdentifier, - streaming ? Boundedness.CONTINUOUS_UNBOUNDED : Boundedness.BOUNDED) + new CompactionTaskTypeInfo()) .setParallelism(1) .setMaxParallelism(1); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CombinedTableCompactorSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CombinedTableCompactorSourceBuilder.java index e5cbbe845ceb0..415eddb037dfe 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CombinedTableCompactorSourceBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CombinedTableCompactorSourceBuilder.java @@ -21,10 +21,10 @@ import org.apache.paimon.append.MultiTableUnawareAppendCompactionTask; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.flink.LogicalTypeConversion; -import org.apache.paimon.flink.source.operator.CombinedAwareBatchSourceFunction; -import org.apache.paimon.flink.source.operator.CombinedAwareStreamingSourceFunction; -import org.apache.paimon.flink.source.operator.CombinedUnawareBatchSourceFunction; -import org.apache.paimon.flink.source.operator.CombinedUnawareStreamingSourceFunction; +import org.apache.paimon.flink.source.operator.CombinedAwareBatchSource; +import org.apache.paimon.flink.source.operator.CombinedAwareStreamingSource; +import org.apache.paimon.flink.source.operator.CombinedUnawareBatchSource; +import org.apache.paimon.flink.source.operator.CombinedUnawareStreamingSource; import org.apache.paimon.table.system.CompactBucketsTable; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.Preconditions; @@ -87,7 +87,7 @@ public DataStream buildAwareBucketTableSource() { Preconditions.checkArgument(env != null, "StreamExecutionEnvironment should not be null."); RowType produceType = CompactBucketsTable.getRowType(); if (isContinuous) { - return CombinedAwareStreamingSourceFunction.buildSource( + return CombinedAwareStreamingSource.buildSource( env, "Combine-MultiBucketTables--StreamingCompactorSource", InternalTypeInfo.of(LogicalTypeConversion.toLogicalType(produceType)), @@ -97,7 +97,7 @@ public DataStream buildAwareBucketTableSource() { databasePattern, monitorInterval); } else { - return CombinedAwareBatchSourceFunction.buildSource( + return CombinedAwareBatchSource.buildSource( env, "Combine-MultiBucketTables-BatchCompactorSource", InternalTypeInfo.of(LogicalTypeConversion.toLogicalType(produceType)), @@ -112,7 +112,7 @@ public DataStream buildAwareBucketTableSource() { public DataStream buildForUnawareBucketsTableSource() { Preconditions.checkArgument(env != null, "StreamExecutionEnvironment should not be null."); if (isContinuous) 
{ - return CombinedUnawareStreamingSourceFunction.buildSource( + return CombinedUnawareStreamingSource.buildSource( env, "Combined-UnawareBucketTables-StreamingCompactorSource", catalogLoader, @@ -121,7 +121,7 @@ public DataStream buildForUnawareBucketsT databasePattern, monitorInterval); } else { - return CombinedUnawareBatchSourceFunction.buildSource( + return CombinedUnawareBatchSource.buildSource( env, "Combined-UnawareBucketTables-BatchCompactorSource", catalogLoader, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java index 3035f8da0d4ab..625c80b85b2a6 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java @@ -26,7 +26,7 @@ import org.apache.paimon.flink.log.LogSourceProvider; import org.apache.paimon.flink.sink.FlinkSink; import org.apache.paimon.flink.source.align.AlignedContinuousFileStoreSource; -import org.apache.paimon.flink.source.operator.MonitorFunction; +import org.apache.paimon.flink.source.operator.MonitorSource; import org.apache.paimon.flink.utils.TableScanUtils; import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; @@ -293,7 +293,7 @@ private DataStream buildContinuousStreamOperator() { "Cannot limit streaming source, please use batch execution mode."); } dataStream = - MonitorFunction.buildSource( + MonitorSource.buildSource( env, sourceName, produceTypeInfo(), diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/NoOpEnumState.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/NoOpEnumState.java new file mode 100644 index 0000000000000..f07317c155aa5 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/NoOpEnumState.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.source; + +/** The enumerator state class for {@link NoOpEnumerator}. 
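+ * The class is intentionally empty: a non-coordinated source has no enumerator state to
+ * checkpoint, so its serialized form is zero bytes (see {@link NoOpEnumStateSerializer}).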
*/ +public class NoOpEnumState {} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/NoOpEnumStateSerializer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/NoOpEnumStateSerializer.java new file mode 100644 index 0000000000000..89c0ad6ac1f10 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/NoOpEnumStateSerializer.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.source; + +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import java.io.IOException; + +/** {@link SimpleVersionedSerializer} for {@link NoOpEnumState}. */ +public class NoOpEnumStateSerializer implements SimpleVersionedSerializer { + @Override + public int getVersion() { + return 0; + } + + @Override + public byte[] serialize(NoOpEnumState obj) throws IOException { + return new byte[0]; + } + + @Override + public NoOpEnumState deserialize(int version, byte[] serialized) throws IOException { + return new NoOpEnumState(); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/NoOpEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/NoOpEnumerator.java new file mode 100644 index 0000000000000..f29c6d6db76dd --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/NoOpEnumerator.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.source; + +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.List; + +/** + * A {@link SplitEnumerator} that provides no functionality. It is basically used for sources that + do not require a coordinator. 
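+ * Readers of such sources discover their work on their own (for example by scanning table
+ * snapshots), so this enumerator never assigns splits and checkpoints only the empty
+ * {@link NoOpEnumState}.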
+ */ +public class NoOpEnumerator + implements SplitEnumerator { + @Override + public void start() {} + + @Override + public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {} + + @Override + public void addSplitsBack(List splits, int subtaskId) {} + + @Override + public void addReader(int subtaskId) {} + + @Override + public NoOpEnumState snapshotState(long checkpointId) throws Exception { + return new NoOpEnumState(); + } + + @Override + public void close() throws IOException {} +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SimpleSourceSplit.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SimpleSourceSplit.java new file mode 100644 index 0000000000000..92d5e4a436d5a --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SimpleSourceSplit.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.source; + +import org.apache.flink.api.connector.source.SourceSplit; + +/** A {@link SourceSplit} that provides basic information through splitId. */ +public class SimpleSourceSplit implements SourceSplit { + private final String splitId; + + public SimpleSourceSplit() { + this(""); + } + + public SimpleSourceSplit(String splitId) { + this.splitId = splitId; + } + + @Override + public String splitId() { + return splitId; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SimpleSourceSplitSerializer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SimpleSourceSplitSerializer.java new file mode 100644 index 0000000000000..234c8fb3f82e6 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SimpleSourceSplitSerializer.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.flink.source; + +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import java.io.IOException; + +/** {@link SimpleVersionedSerializer} for {@link SimpleSourceSplit}. */ +public class SimpleSourceSplitSerializer implements SimpleVersionedSerializer { + + @Override + public int getVersion() { + return 0; + } + + @Override + public byte[] serialize(SimpleSourceSplit obj) throws IOException { + String splitId = obj.splitId(); + return splitId == null ? new byte[0] : splitId.getBytes(); + } + + @Override + public SimpleSourceSplit deserialize(int version, byte[] serialized) throws IOException { + return serialized.length == 0 + ? new SimpleSourceSplit() + : new SimpleSourceSplit(new String(serialized)); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSource.java similarity index 66% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSource.java index d7b8da1ec8d8e..7df2b37fcb00b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSource.java @@ -21,20 +21,23 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.flink.compact.MultiAwareBucketTableScan; import org.apache.paimon.flink.compact.MultiTableScanBase; +import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader; +import org.apache.paimon.flink.source.SimpleSourceSplit; import org.apache.paimon.flink.utils.JavaTypeInfo; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.Split; -import org.apache.flink.api.common.functions.OpenContext; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.core.io.InputStatus; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.table.data.RowData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,15 +49,11 @@ import static org.apache.paimon.flink.compact.MultiTableScanBase.ScanResult.IS_EMPTY; /** It is responsible for monitoring compactor source of aware bucket table in batch mode. 
*/ -public class CombinedAwareBatchSourceFunction - extends CombinedCompactorSourceFunction> { +public class CombinedAwareBatchSource extends CombinedCompactorSource> { - private static final Logger LOGGER = - LoggerFactory.getLogger(CombinedAwareBatchSourceFunction.class); + private static final Logger LOGGER = LoggerFactory.getLogger(CombinedAwareBatchSource.class); - private MultiTableScanBase> tableScan; - - public CombinedAwareBatchSourceFunction( + public CombinedAwareBatchSource( Catalog.Loader catalogLoader, Pattern includingPattern, Pattern excludingPattern, @@ -63,24 +62,33 @@ public CombinedAwareBatchSourceFunction( } @Override - public void open(OpenContext openContext) throws Exception { - super.open(openContext); - tableScan = - new MultiAwareBucketTableScan( - catalogLoader, - includingPattern, - excludingPattern, - databasePattern, - isStreaming, - isRunning); + public SourceReader, SimpleSourceSplit> createReader( + SourceReaderContext sourceReaderContext) throws Exception { + return new Reader(); } - @Override - void scanTable() throws Exception { - if (isRunning.get()) { - MultiTableScanBase.ScanResult scanResult = tableScan.scanTable(ctx); + private class Reader extends AbstractNonCoordinatedSourceReader> { + + private MultiTableScanBase> tableScan; + + @Override + public void start() { + super.start(); + tableScan = + new MultiAwareBucketTableScan( + catalogLoader, + includingPattern, + excludingPattern, + databasePattern, + isStreaming); + } + + @Override + public InputStatus pollNext(ReaderOutput> readerOutput) + throws Exception { + MultiTableScanBase.ScanResult scanResult = tableScan.scanTable(readerOutput); if (scanResult == FINISHED) { - return; + return InputStatus.END_OF_INPUT; } if (scanResult == IS_EMPTY) { // Currently, in the combined mode, there are two scan tasks for the table of two @@ -89,6 +97,15 @@ void scanTable() throws Exception { // should not be thrown exception here. 
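+                // Editor's note: this batch (bounded) source finishes with END_OF_INPUT after a
+                // single scan pass, even when the pass collected nothing; an empty result is
+                // expected here rather than an error.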
LOGGER.info("No file were collected for the table of aware-bucket"); } + return InputStatus.END_OF_INPUT; + } + + @Override + public void close() throws Exception { + super.close(); + if (tableScan != null) { + tableScan.close(); + } } } @@ -101,15 +118,14 @@ public static DataStream buildSource( Pattern excludingPattern, Pattern databasePattern, Duration partitionIdleTime) { - CombinedAwareBatchSourceFunction function = - new CombinedAwareBatchSourceFunction( + CombinedAwareBatchSource source = + new CombinedAwareBatchSource( catalogLoader, includingPattern, excludingPattern, databasePattern); - StreamSource, ?> sourceOperator = new StreamSource<>(function); TupleTypeInfo> tupleTypeInfo = new TupleTypeInfo<>( new JavaTypeInfo<>(Split.class), BasicTypeInfo.STRING_TYPE_INFO); - return new DataStreamSource<>( - env, tupleTypeInfo, sourceOperator, false, name, Boundedness.BOUNDED) + + return env.fromSource(source, WatermarkStrategy.noWatermarks(), name, tupleTypeInfo) .forceNonParallel() .partitionCustom( (key, numPartitions) -> key % numPartitions, @@ -119,12 +135,4 @@ public static DataStream buildSource( typeInfo, new MultiTablesReadOperator(catalogLoader, false, partitionIdleTime)); } - - @Override - public void close() throws Exception { - super.close(); - if (tableScan != null) { - tableScan.close(); - } - } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSource.java similarity index 65% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSourceFunction.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSource.java index b550ea4e25db1..9bd4a84f571c3 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSourceFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSource.java @@ -21,20 +21,23 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.flink.compact.MultiAwareBucketTableScan; import org.apache.paimon.flink.compact.MultiTableScanBase; +import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader; +import org.apache.paimon.flink.source.SimpleSourceSplit; import org.apache.paimon.flink.utils.JavaTypeInfo; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.Split; -import org.apache.flink.api.common.functions.OpenContext; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.core.io.InputStatus; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import 
org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.table.data.RowData; import java.util.regex.Pattern; @@ -43,13 +46,11 @@ import static org.apache.paimon.flink.compact.MultiTableScanBase.ScanResult.IS_EMPTY; /** It is responsible for monitoring compactor source of multi bucket table in stream mode. */ -public class CombinedAwareStreamingSourceFunction - extends CombinedCompactorSourceFunction> { +public class CombinedAwareStreamingSource extends CombinedCompactorSource> { private final long monitorInterval; - private transient MultiTableScanBase> tableScan; - public CombinedAwareStreamingSourceFunction( + public CombinedAwareStreamingSource( Catalog.Loader catalogLoader, Pattern includingPattern, Pattern excludingPattern, @@ -60,29 +61,45 @@ public CombinedAwareStreamingSourceFunction( } @Override - public void open(OpenContext openContext) throws Exception { - super.open(openContext); - tableScan = - new MultiAwareBucketTableScan( - catalogLoader, - includingPattern, - excludingPattern, - databasePattern, - isStreaming, - isRunning); + public SourceReader, SimpleSourceSplit> createReader( + SourceReaderContext sourceReaderContext) throws Exception { + return new Reader(); } - @SuppressWarnings("BusyWait") - @Override - void scanTable() throws Exception { - while (isRunning.get()) { - MultiTableScanBase.ScanResult scanResult = tableScan.scanTable(ctx); + private class Reader extends AbstractNonCoordinatedSourceReader> { + private transient MultiTableScanBase> tableScan; + + @Override + public void start() { + super.start(); + tableScan = + new MultiAwareBucketTableScan( + catalogLoader, + includingPattern, + excludingPattern, + databasePattern, + isStreaming); + } + + @Override + public InputStatus pollNext(ReaderOutput> readerOutput) + throws Exception { + MultiTableScanBase.ScanResult scanResult = tableScan.scanTable(readerOutput); if (scanResult == FINISHED) { - return; + return InputStatus.END_OF_INPUT; } if (scanResult == IS_EMPTY) { Thread.sleep(monitorInterval); } + return InputStatus.MORE_AVAILABLE; + } + + @Override + public void close() throws Exception { + super.close(); + if (tableScan != null) { + tableScan.close(); + } } } @@ -96,37 +113,22 @@ public static DataStream buildSource( Pattern databasePattern, long monitorInterval) { - CombinedAwareStreamingSourceFunction function = - new CombinedAwareStreamingSourceFunction( + CombinedAwareStreamingSource source = + new CombinedAwareStreamingSource( catalogLoader, includingPattern, excludingPattern, databasePattern, monitorInterval); - StreamSource, ?> sourceOperator = new StreamSource<>(function); - boolean isParallel = false; TupleTypeInfo> tupleTypeInfo = new TupleTypeInfo<>( new JavaTypeInfo<>(Split.class), BasicTypeInfo.STRING_TYPE_INFO); - return new DataStreamSource<>( - env, - tupleTypeInfo, - sourceOperator, - isParallel, - name, - Boundedness.CONTINUOUS_UNBOUNDED) + + return env.fromSource(source, WatermarkStrategy.noWatermarks(), name, tupleTypeInfo) .forceNonParallel() .partitionCustom( (key, numPartitions) -> key % numPartitions, split -> ((DataSplit) split.f0).bucket()) .transform(name, typeInfo, new MultiTablesReadOperator(catalogLoader, true)); } - - @Override - public void close() throws Exception { - super.close(); - if (tableScan != null) { - tableScan.close(); - } - } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedCompactorSourceFunction.java 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedCompactorSource.java similarity index 70% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedCompactorSourceFunction.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedCompactorSource.java index 6d4d3e2a2257b..f58d86cdd65eb 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedCompactorSourceFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedCompactorSource.java @@ -20,12 +20,11 @@ import org.apache.paimon.append.UnawareAppendCompactionTask; import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.flink.source.AbstractNonCoordinatedSource; import org.apache.paimon.table.source.Split; -import org.apache.flink.api.common.functions.OpenContext; -import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.api.connector.source.Boundedness; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; /** @@ -44,8 +43,7 @@ *

    Currently, only dedicated compaction job for multi-tables rely on this monitor. This is the * single (non-parallel) monitoring task, it is responsible for the new Paimon table. */ -public abstract class CombinedCompactorSourceFunction extends RichSourceFunction { - +public abstract class CombinedCompactorSource extends AbstractNonCoordinatedSource { private static final long serialVersionUID = 2L; protected final Catalog.Loader catalogLoader; @@ -54,10 +52,7 @@ public abstract class CombinedCompactorSourceFunction extends RichSourceFunct protected final Pattern databasePattern; protected final boolean isStreaming; - protected transient AtomicBoolean isRunning; - protected transient SourceContext ctx; - - public CombinedCompactorSourceFunction( + public CombinedCompactorSource( Catalog.Loader catalogLoader, Pattern includingPattern, Pattern excludingPattern, @@ -71,27 +66,7 @@ public CombinedCompactorSourceFunction( } @Override - public void open(OpenContext openContext) throws Exception { - isRunning = new AtomicBoolean(true); - } - - @Override - public void run(SourceContext sourceContext) throws Exception { - this.ctx = sourceContext; - scanTable(); + public Boundedness getBoundedness() { + return isStreaming ? Boundedness.CONTINUOUS_UNBOUNDED : Boundedness.BOUNDED; } - - @Override - public void cancel() { - // this is to cover the case where cancel() is called before the run() - if (ctx != null) { - synchronized (ctx.getCheckpointLock()) { - isRunning.set(false); - } - } else { - isRunning.set(false); - } - } - - abstract void scanTable() throws Exception; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSource.java similarity index 71% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSourceFunction.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSource.java index e606d9b5b4f2c..64f0c38f5a11d 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSourceFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSource.java @@ -25,17 +25,20 @@ import org.apache.paimon.flink.compact.MultiTableScanBase; import org.apache.paimon.flink.compact.MultiUnawareBucketTableScan; import org.apache.paimon.flink.sink.MultiTableCompactionTaskTypeInfo; +import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader; +import org.apache.paimon.flink.source.SimpleSourceSplit; import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; -import org.apache.flink.api.common.functions.OpenContext; -import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.core.io.InputStatus; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; 
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.operators.StreamSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,14 +57,12 @@ * It is responsible for the batch compactor source of the table with unaware bucket in combined * mode. */ -public class CombinedUnawareBatchSourceFunction - extends CombinedCompactorSourceFunction { +public class CombinedUnawareBatchSource + extends CombinedCompactorSource { - private static final Logger LOGGER = - LoggerFactory.getLogger(CombinedUnawareBatchSourceFunction.class); - private transient MultiTableScanBase tableScan; + private static final Logger LOGGER = LoggerFactory.getLogger(CombinedUnawareBatchSource.class); - public CombinedUnawareBatchSourceFunction( + public CombinedUnawareBatchSource( Catalog.Loader catalogLoader, Pattern includingPattern, Pattern excludingPattern, @@ -70,24 +71,33 @@ public CombinedUnawareBatchSourceFunction( } @Override - public void open(OpenContext openContext) throws Exception { - super.open(openContext); - tableScan = - new MultiUnawareBucketTableScan( - catalogLoader, - includingPattern, - excludingPattern, - databasePattern, - isStreaming, - isRunning); + public SourceReader createReader( + SourceReaderContext sourceReaderContext) throws Exception { + return new Reader(); } - @Override - void scanTable() throws Exception { - if (isRunning.get()) { - MultiTableScanBase.ScanResult scanResult = tableScan.scanTable(ctx); + private class Reader + extends AbstractNonCoordinatedSourceReader { + private transient MultiTableScanBase tableScan; + + @Override + public void start() { + super.start(); + tableScan = + new MultiUnawareBucketTableScan( + catalogLoader, + includingPattern, + excludingPattern, + databasePattern, + isStreaming); + } + + @Override + public InputStatus pollNext( + ReaderOutput readerOutput) throws Exception { + MultiTableScanBase.ScanResult scanResult = tableScan.scanTable(readerOutput); if (scanResult == FINISHED) { - return; + return InputStatus.END_OF_INPUT; } if (scanResult == IS_EMPTY) { // Currently, in the combined mode, there are two scan tasks for the table of two @@ -96,6 +106,15 @@ void scanTable() throws Exception { // should not be thrown exception here. 
LOGGER.info("No file were collected for the table of unaware-bucket"); } + return InputStatus.END_OF_INPUT; + } + + @Override + public void close() throws Exception { + super.close(); + if (tableScan != null) { + tableScan.close(); + } } } @@ -107,22 +126,18 @@ public static DataStream buildSource( Pattern excludingPattern, Pattern databasePattern, @Nullable Duration partitionIdleTime) { - CombinedUnawareBatchSourceFunction function = - new CombinedUnawareBatchSourceFunction( + CombinedUnawareBatchSource combinedUnawareBatchSource = + new CombinedUnawareBatchSource( catalogLoader, includingPattern, excludingPattern, databasePattern); - StreamSource - sourceOperator = new StreamSource<>(function); MultiTableCompactionTaskTypeInfo compactionTaskTypeInfo = new MultiTableCompactionTaskTypeInfo(); SingleOutputStreamOperator source = - new DataStreamSource<>( - env, - compactionTaskTypeInfo, - sourceOperator, - false, + env.fromSource( + combinedUnawareBatchSource, + WatermarkStrategy.noWatermarks(), name, - Boundedness.BOUNDED) + compactionTaskTypeInfo) .forceNonParallel(); if (partitionIdleTime != null) { @@ -167,12 +182,4 @@ private static Long getPartitionInfo( } return partitionInfo.get(partition); } - - @Override - public void close() throws Exception { - super.close(); - if (tableScan != null) { - tableScan.close(); - } - } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSource.java similarity index 59% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSourceFunction.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSource.java index ceffe788a020d..6ea1ead4db301 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSourceFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSource.java @@ -23,13 +23,16 @@ import org.apache.paimon.flink.compact.MultiTableScanBase; import org.apache.paimon.flink.compact.MultiUnawareBucketTableScan; import org.apache.paimon.flink.sink.MultiTableCompactionTaskTypeInfo; +import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader; +import org.apache.paimon.flink.source.SimpleSourceSplit; -import org.apache.flink.api.common.functions.OpenContext; -import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.core.io.InputStatus; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.operators.StreamSource; import java.util.regex.Pattern; @@ -39,13 +42,12 @@ /** * It is responsible for monitoring compactor source in stream mode for the table of unaware bucket. 
*/ -public class CombinedUnawareStreamingSourceFunction - extends CombinedCompactorSourceFunction { +public class CombinedUnawareStreamingSource + extends CombinedCompactorSource { private final long monitorInterval; - private MultiTableScanBase tableScan; - public CombinedUnawareStreamingSourceFunction( + public CombinedUnawareStreamingSource( Catalog.Loader catalogLoader, Pattern includingPattern, Pattern excludingPattern, @@ -56,29 +58,46 @@ public CombinedUnawareStreamingSourceFunction( } @Override - public void open(OpenContext openContext) throws Exception { - super.open(openContext); - tableScan = - new MultiUnawareBucketTableScan( - catalogLoader, - includingPattern, - excludingPattern, - databasePattern, - isStreaming, - isRunning); + public SourceReader createReader( + SourceReaderContext sourceReaderContext) throws Exception { + return new Reader(); } - @SuppressWarnings("BusyWait") - @Override - void scanTable() throws Exception { - while (isRunning.get()) { - MultiTableScanBase.ScanResult scanResult = tableScan.scanTable(ctx); + private class Reader + extends AbstractNonCoordinatedSourceReader { + private MultiTableScanBase tableScan; + + @Override + public void start() { + super.start(); + tableScan = + new MultiUnawareBucketTableScan( + catalogLoader, + includingPattern, + excludingPattern, + databasePattern, + isStreaming); + } + + @Override + public InputStatus pollNext( + ReaderOutput readerOutput) throws Exception { + MultiTableScanBase.ScanResult scanResult = tableScan.scanTable(readerOutput); if (scanResult == FINISHED) { - return; + return InputStatus.END_OF_INPUT; } if (scanResult == IS_EMPTY) { Thread.sleep(monitorInterval); } + return InputStatus.MORE_AVAILABLE; + } + + @Override + public void close() throws Exception { + super.close(); + if (tableScan != null) { + tableScan.close(); + } } } @@ -91,33 +110,18 @@ public static DataStream buildSource( Pattern databasePattern, long monitorInterval) { - CombinedUnawareStreamingSourceFunction function = - new CombinedUnawareStreamingSourceFunction( + CombinedUnawareStreamingSource source = + new CombinedUnawareStreamingSource( catalogLoader, includingPattern, excludingPattern, databasePattern, monitorInterval); - StreamSource - sourceOperator = new StreamSource<>(function); - boolean isParallel = false; MultiTableCompactionTaskTypeInfo compactionTaskTypeInfo = new MultiTableCompactionTaskTypeInfo(); - return new DataStreamSource<>( - env, - compactionTaskTypeInfo, - sourceOperator, - isParallel, - name, - Boundedness.CONTINUOUS_UNBOUNDED) - .forceNonParallel(); - } - @Override - public void close() throws Exception { - super.close(); - if (tableScan != null) { - tableScan.close(); - } + return env.fromSource( + source, WatermarkStrategy.noWatermarks(), name, compactionTaskTypeInfo) + .forceNonParallel(); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java similarity index 52% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java index 3805f6f8c536e..4e33376930f8a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java +++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java @@ -18,6 +18,9 @@ package org.apache.paimon.flink.source.operator; +import org.apache.paimon.flink.source.AbstractNonCoordinatedSource; +import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader; +import org.apache.paimon.flink.source.SimpleSourceSplit; import org.apache.paimon.flink.utils.JavaTypeInfo; import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.sink.ChannelComputer; @@ -27,22 +30,18 @@ import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.StreamTableScan; -import org.apache.flink.api.common.state.CheckpointListener; -import org.apache.flink.api.common.state.ListState; -import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; -import org.apache.flink.runtime.state.FunctionInitializationContext; -import org.apache.flink.runtime.state.FunctionSnapshotContext; -import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.core.io.InputStatus; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.RichSourceFunction; -import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.table.data.RowData; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; @@ -53,6 +52,7 @@ import java.util.NavigableMap; import java.util.OptionalLong; import java.util.TreeMap; +import java.util.stream.Collectors; import static org.apache.paimon.table.BucketMode.BUCKET_UNAWARE; @@ -71,33 +71,23 @@ *
  * <p>Currently, there are two features that rely on this monitor:
  *
  * <ol>
- *   <li>Consumer-id: rely on this function to do aligned snapshot consumption, and ensure that all
+ *   <li>Consumer-id: rely on this source to do aligned snapshot consumption, and ensure that all
  *       data in a snapshot is consumed within each checkpoint.
  *   <li>Snapshot-watermark: when there is no watermark definition, the default Paimon table will
  *       pass the watermark recorded in the snapshot.
  * </ol>
    */ -public class MonitorFunction extends RichSourceFunction - implements CheckpointedFunction, CheckpointListener { +public class MonitorSource extends AbstractNonCoordinatedSource { private static final long serialVersionUID = 1L; - private static final Logger LOG = LoggerFactory.getLogger(MonitorFunction.class); + private static final Logger LOG = LoggerFactory.getLogger(MonitorSource.class); private final ReadBuilder readBuilder; private final long monitorInterval; private final boolean emitSnapshotWatermark; - private volatile boolean isRunning = true; - - private transient StreamTableScan scan; - private transient SourceContext ctx; - - private transient ListState checkpointState; - private transient ListState> nextSnapshotState; - private transient TreeMap nextSnapshotPerCheckpoint; - - public MonitorFunction( + public MonitorSource( ReadBuilder readBuilder, long monitorInterval, boolean emitSnapshotWatermark) { this.readBuilder = readBuilder; this.monitorInterval = monitorInterval; @@ -105,40 +95,64 @@ public MonitorFunction( } @Override - public void initializeState(FunctionInitializationContext context) throws Exception { - this.scan = readBuilder.newStreamScan(); - - this.checkpointState = - context.getOperatorStateStore() - .getListState( - new ListStateDescriptor<>( - "next-snapshot", LongSerializer.INSTANCE)); - - @SuppressWarnings("unchecked") - final Class> typedTuple = - (Class>) (Class) Tuple2.class; - this.nextSnapshotState = - context.getOperatorStateStore() - .getListState( - new ListStateDescriptor<>( - "next-snapshot-per-checkpoint", - new TupleSerializer<>( - typedTuple, - new TypeSerializer[] { - LongSerializer.INSTANCE, LongSerializer.INSTANCE - }))); - - this.nextSnapshotPerCheckpoint = new TreeMap<>(); - - if (context.isRestored()) { - LOG.info("Restoring state for the {}.", getClass().getSimpleName()); + public Boundedness getBoundedness() { + return Boundedness.CONTINUOUS_UNBOUNDED; + } + + @Override + public SourceReader createReader( + SourceReaderContext sourceReaderContext) throws Exception { + return new Reader(); + } + + private class Reader extends AbstractNonCoordinatedSourceReader { + private static final String CHECKPOINT_STATE = "CS"; + private static final String NEXT_SNAPSHOT_STATE = "NSS"; + + private final StreamTableScan scan = readBuilder.newStreamScan(); + private final TreeMap nextSnapshotPerCheckpoint = new TreeMap<>(); - List retrievedStates = new ArrayList<>(); - for (Long entry : this.checkpointState.get()) { - retrievedStates.add(entry); + @Override + public void notifyCheckpointComplete(long checkpointId) { + NavigableMap nextSnapshots = + nextSnapshotPerCheckpoint.headMap(checkpointId, true); + OptionalLong max = nextSnapshots.values().stream().mapToLong(Long::longValue).max(); + max.ifPresent(scan::notifyCheckpointComplete); + nextSnapshots.clear(); + } + + @Override + public List snapshotState(long checkpointId) { + List results = new ArrayList<>(); + + Long nextSnapshot = this.scan.checkpoint(); + if (nextSnapshot != null) { + results.add(new SimpleSourceSplit(CHECKPOINT_STATE + nextSnapshot)); + this.nextSnapshotPerCheckpoint.put(checkpointId, nextSnapshot); + } + + this.nextSnapshotPerCheckpoint.forEach( + (k, v) -> + results.add(new SimpleSourceSplit(NEXT_SNAPSHOT_STATE + k + ":" + v))); + + if (LOG.isDebugEnabled()) { + LOG.debug("{} checkpoint {}.", getClass().getSimpleName(), nextSnapshot); } + return results; + } + + @Override + public void addSplits(List list) { + LOG.info("Restoring state for the {}.", 
getClass().getSimpleName());
+
+            List<Long> retrievedStates =
+                    list.stream()
+                            .map(SimpleSourceSplit::splitId)
+                            .filter(x -> x.startsWith(CHECKPOINT_STATE))
+                            .map(x -> Long.parseLong(x.substring(CHECKPOINT_STATE.length())))
+                            .collect(Collectors.toList());
 
-            // given that the parallelism of the function is 1, we can only have 1 retrieved items.
+            // given that the parallelism of the source is 1, we can have at most one retrieved item.
             Preconditions.checkArgument(
                     retrievedStates.size() <= 1,
                     getClass().getSimpleName() + " retrieved invalid state.");
@@ -147,83 +161,39 @@ public void initializeState(FunctionInitializationContext context) throws Except
                 this.scan.restore(retrievedStates.get(0));
             }
 
-            for (Tuple2<Long, Long> tuple2 : nextSnapshotState.get()) {
-                nextSnapshotPerCheckpoint.put(tuple2.f0, tuple2.f1);
-            }
-        } else {
-            LOG.info("No state to restore for the {}.", getClass().getSimpleName());
-        }
-    }
-
-    @Override
-    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
-        this.checkpointState.clear();
-        Long nextSnapshot = this.scan.checkpoint();
-        if (nextSnapshot != null) {
-            this.checkpointState.add(nextSnapshot);
-            this.nextSnapshotPerCheckpoint.put(ctx.getCheckpointId(), nextSnapshot);
+            list.stream()
+                    .map(SimpleSourceSplit::splitId)
+                    .filter(x -> x.startsWith(NEXT_SNAPSHOT_STATE))
+                    .map(x -> x.substring(NEXT_SNAPSHOT_STATE.length()).split(":"))
+                    .forEach(
+                            x ->
+                                    nextSnapshotPerCheckpoint.put(
+                                            Long.parseLong(x[0]), Long.parseLong(x[1])));
         }
 
-        List<Tuple2<Long, Long>> nextSnapshots = new ArrayList<>();
-        this.nextSnapshotPerCheckpoint.forEach((k, v) -> nextSnapshots.add(new Tuple2<>(k, v)));
-        this.nextSnapshotState.update(nextSnapshots);
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("{} checkpoint {}.", getClass().getSimpleName(), nextSnapshot);
-        }
-    }
-
-    @SuppressWarnings("BusyWait")
-    @Override
-    public void run(SourceContext<Split> ctx) throws Exception {
-        this.ctx = ctx;
-        while (isRunning) {
+        @Override
+        public InputStatus pollNext(ReaderOutput<Split> readerOutput) throws Exception {
             boolean isEmpty;
-            synchronized (ctx.getCheckpointLock()) {
-                if (!isRunning) {
-                    return;
-                }
-                try {
-                    List<Split> splits = scan.plan().splits();
-                    isEmpty = splits.isEmpty();
-                    splits.forEach(ctx::collect);
-
-                    if (emitSnapshotWatermark) {
-                        Long watermark = scan.watermark();
-                        if (watermark != null) {
-                            ctx.emitWatermark(new Watermark(watermark));
-                        }
+            try {
+                List<Split> splits = scan.plan().splits();
+                isEmpty = splits.isEmpty();
+                splits.forEach(readerOutput::collect);
+
+                if (emitSnapshotWatermark) {
+                    Long watermark = scan.watermark();
+                    if (watermark != null) {
+                        readerOutput.emitWatermark(new Watermark(watermark));
                     }
-                } catch (EndOfScanException esf) {
-                    LOG.info("Catching EndOfStreamException, the stream is finished.");
-                    return;
                 }
+            } catch (EndOfScanException esf) {
+                LOG.info("Caught EndOfScanException. The stream is finished.");
+                return InputStatus.END_OF_INPUT;
            }
 
            if (isEmpty) {
                Thread.sleep(monitorInterval);
            }
-        }
-    }
-
-    @Override
-    public void notifyCheckpointComplete(long checkpointId) {
-        NavigableMap<Long, Long> nextSnapshots =
-                nextSnapshotPerCheckpoint.headMap(checkpointId, true);
-        OptionalLong max = nextSnapshots.values().stream().mapToLong(Long::longValue).max();
-        max.ifPresent(scan::notifyCheckpointComplete);
-        nextSnapshots.clear();
-    }
-
-    @Override
-    public void cancel() {
-        // this is to cover the case where cancel() is called before the run()
-        if (ctx != null) {
-            synchronized (ctx.getCheckpointLock()) {
-                isRunning = false;
-            }
-        } else {
-            isRunning = false;
+            return InputStatus.MORE_AVAILABLE;
        }
    }
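The two hunks above are the heart of the state migration: instead of Flink's operator `ListState`, the reader now carries its scan position in split IDs. `snapshotState()` encodes the next snapshot pointer and the per-checkpoint map into `SimpleSourceSplit` IDs using the `CS`/`NSS` prefixes, and `addSplits()` parses them back after a restore. A minimal, self-contained sketch of that round-trip (the class name and sample values are illustrative, not part of this patch):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class SplitIdStateRoundTrip {

    private static final String CHECKPOINT_STATE = "CS";
    private static final String NEXT_SNAPSHOT_STATE = "NSS";

    public static void main(String[] args) {
        // Encode, as in snapshotState(): the "next snapshot" pointer and each
        // entry of the per-checkpoint map become one split-ID string apiece.
        long nextSnapshot = 42L;
        TreeMap<Long, Long> nextSnapshotPerCheckpoint = new TreeMap<>();
        nextSnapshotPerCheckpoint.put(7L, 41L);

        List<String> splitIds = new ArrayList<>();
        splitIds.add(CHECKPOINT_STATE + nextSnapshot);
        nextSnapshotPerCheckpoint.forEach(
                (checkpointId, snapshot) ->
                        splitIds.add(NEXT_SNAPSHOT_STATE + checkpointId + ":" + snapshot));

        // Decode, as in addSplits(): the prefixes keep the two kinds of state apart.
        List<Long> restoredStates = new ArrayList<>();
        TreeMap<Long, Long> restoredMap = new TreeMap<>();
        for (String id : splitIds) {
            if (id.startsWith(NEXT_SNAPSHOT_STATE)) {
                String[] kv = id.substring(NEXT_SNAPSHOT_STATE.length()).split(":");
                restoredMap.put(Long.parseLong(kv[0]), Long.parseLong(kv[1]));
            } else if (id.startsWith(CHECKPOINT_STATE)) {
                restoredStates.add(Long.parseLong(id.substring(CHECKPOINT_STATE.length())));
            }
        }

        System.out.println(restoredStates); // [42]
        System.out.println(restoredMap); // {7=41}
    }
}
```

Because the source is forced non-parallel, every split comes back to the single reader on restore, which is what makes this string encoding a workable substitute for operator state.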
@@ -237,9 +207,10 @@ public static DataStream buildSource( boolean shuffleBucketWithPartition, BucketMode bucketMode) { SingleOutputStreamOperator singleOutputStreamOperator = - env.addSource( - new MonitorFunction( + env.fromSource( + new MonitorSource( readBuilder, monitorInterval, emitSnapshotWatermark), + WatermarkStrategy.noWatermarks(), name + "-Monitor", new JavaTypeInfo<>(Split.class)) .forceNonParallel(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java index 73d46ae1e3f19..fbc8bb9d756a3 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java @@ -52,9 +52,8 @@ /** * The operator that reads the Tuple2<{@link Split}, String> received from the preceding {@link - * CombinedAwareBatchSourceFunction} or {@link CombinedAwareStreamingSourceFunction}. Contrary to - * the {@link CombinedCompactorSourceFunction} which has a parallelism of 1, this operator can have - * DOP > 1. + * CombinedAwareBatchSource} or {@link CombinedAwareStreamingSource}. Contrary to the {@link + * CombinedCompactorSource} which has a parallelism of 1, this operator can have DOP > 1. */ public class MultiTablesReadOperator extends AbstractStreamOperator implements OneInputStreamOperator, RowData> { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiUnawareTablesReadOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiUnawareTablesReadOperator.java index c501c2519b412..0864741a178f1 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiUnawareTablesReadOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiUnawareTablesReadOperator.java @@ -44,7 +44,7 @@ /** * The operator is used for historical partition compaction. It reads {@link * MultiTableUnawareAppendCompactionTask} received from the preceding {@link - * CombinedUnawareBatchSourceFunction} and filter partitions which is not historical. + * CombinedUnawareBatchSource} and filter partitions which is not historical. */ public class MultiUnawareTablesReadOperator extends AbstractStreamOperator diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java index 80c85f7cdb35a..6caf4544e514b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java @@ -38,8 +38,8 @@ /** * The operator that reads the {@link Split splits} received from the preceding {@link - * MonitorFunction}. Contrary to the {@link MonitorFunction} which has a parallelism of 1, this - * operator can have DOP > 1. + * MonitorSource}. Contrary to the {@link MonitorSource} which has a parallelism of 1, this operator + * can have DOP > 1. 
*/ public class ReadOperator extends AbstractStreamOperator implements OneInputStreamOperator { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java index 61a03a29a21bd..9d23d1186f1a8 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java @@ -33,12 +33,18 @@ import org.apache.paimon.table.source.TableRead; import org.apache.paimon.types.DataTypes; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.streaming.api.operators.SourceOperator; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.table.data.GenericRowData; @@ -46,6 +52,7 @@ import org.apache.flink.table.runtime.typeutils.InternalSerializers; import org.apache.flink.table.types.logical.IntType; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.CloseableIterator; import org.apache.flink.util.function.SupplierWithException; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -58,11 +65,13 @@ import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import static org.apache.paimon.CoreOptions.CONSUMER_ID; import static org.assertj.core.api.Assertions.assertThat; -/** Test for {@link MonitorFunction} and {@link ReadOperator}. */ +/** Test for {@link MonitorSource} and {@link ReadOperator}. */ public class OperatorSourceTest { @TempDir Path tempDir; @@ -114,28 +123,39 @@ private List> readSplit(Split split) throws IOException { } @Test - public void testMonitorFunction() throws Exception { + public void testMonitorSource() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 1. 
run first OperatorSubtaskState snapshot; { - MonitorFunction function = new MonitorFunction(table.newReadBuilder(), 10, false); - StreamSource src = new StreamSource<>(function); + MonitorSource source = new MonitorSource(table.newReadBuilder(), 10, false); + TestingSourceOperator operator = + (TestingSourceOperator) + TestingSourceOperator.createTestOperator( + source.createReader(null), + WatermarkStrategy.noWatermarks(), + false); AbstractStreamOperatorTestHarness testHarness = - new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0); + new AbstractStreamOperatorTestHarness<>(operator, 1, 1, 0); testHarness.open(); - snapshot = testReadSplit(function, () -> testHarness.snapshot(0, 0), 1, 1, 1); + snapshot = testReadSplit(operator, () -> testHarness.snapshot(0, 0), 1, 1, 1); } // 2. restore from state { - MonitorFunction functionCopy1 = new MonitorFunction(table.newReadBuilder(), 10, false); - StreamSource srcCopy1 = new StreamSource<>(functionCopy1); + MonitorSource sourceCopy1 = new MonitorSource(table.newReadBuilder(), 10, false); + TestingSourceOperator operatorCopy1 = + (TestingSourceOperator) + TestingSourceOperator.createTestOperator( + sourceCopy1.createReader(null), + WatermarkStrategy.noWatermarks(), + false); AbstractStreamOperatorTestHarness testHarnessCopy1 = - new AbstractStreamOperatorTestHarness<>(srcCopy1, 1, 1, 0); + new AbstractStreamOperatorTestHarness<>(operatorCopy1, 1, 1, 0); testHarnessCopy1.initializeState(snapshot); testHarnessCopy1.open(); testReadSplit( - functionCopy1, + operatorCopy1, () -> { testHarnessCopy1.snapshot(1, 1); testHarnessCopy1.notifyOfCompletedCheckpoint(1); @@ -148,12 +168,17 @@ public void testMonitorFunction() throws Exception { // 3. restore from consumer id { - MonitorFunction functionCopy2 = new MonitorFunction(table.newReadBuilder(), 10, false); - StreamSource srcCopy2 = new StreamSource<>(functionCopy2); + MonitorSource sourceCopy2 = new MonitorSource(table.newReadBuilder(), 10, false); + TestingSourceOperator operatorCopy2 = + (TestingSourceOperator) + TestingSourceOperator.createTestOperator( + sourceCopy2.createReader(null), + WatermarkStrategy.noWatermarks(), + false); AbstractStreamOperatorTestHarness testHarnessCopy2 = - new AbstractStreamOperatorTestHarness<>(srcCopy2, 1, 1, 0); + new AbstractStreamOperatorTestHarness<>(operatorCopy2, 1, 1, 0); testHarnessCopy2.open(); - testReadSplit(functionCopy2, () -> null, 3, 3, 3); + testReadSplit(operatorCopy2, () -> null, 3, 3, 3); } } @@ -231,7 +256,7 @@ public void testReadOperatorMetricsRegisterAndUpdate() throws Exception { } private T testReadSplit( - MonitorFunction function, + SourceOperator operator, SupplierWithException beforeClose, int a, int b, @@ -239,20 +264,36 @@ private T testReadSplit( throws Exception { Throwable[] error = new Throwable[1]; ArrayBlockingQueue queue = new ArrayBlockingQueue<>(10); + AtomicReference> iteratorRef = new AtomicReference<>(); - DummySourceContext sourceContext = - new DummySourceContext() { + PushingAsyncDataInput.DataOutput output = + new PushingAsyncDataInput.DataOutput() { @Override - public void collect(Split element) { - queue.add(element); + public void emitRecord(StreamRecord streamRecord) { + queue.add(streamRecord.getValue()); } + + @Override + public void emitWatermark(Watermark watermark) {} + + @Override + public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {} + + @Override + public void emitLatencyMarker(LatencyMarker latencyMarker) {} + + @Override + public void emitRecordAttributes(RecordAttributes recordAttributes) 
{} }; + AtomicBoolean isRunning = new AtomicBoolean(true); Thread runner = new Thread( () -> { try { - function.run(sourceContext); + while (isRunning.get()) { + operator.emitNext(output); + } } catch (Throwable t) { t.printStackTrace(); error[0] = t; @@ -266,7 +307,11 @@ public void collect(Split element) { assertThat(readSplit(split)).containsExactlyInAnyOrder(Arrays.asList(a, b, c)); T t = beforeClose.get(); - function.cancel(); + CloseableIterator iterator = iteratorRef.get(); + if (iterator != null) { + iterator.close(); + } + isRunning.set(false); runner.join(); assertThat(error[0]).isNull(); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/TestingSourceOperator.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/TestingSourceOperator.java new file mode 100644 index 0000000000000..547569cf11195 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/TestingSourceOperator.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.flink.source.operator; + +import org.apache.paimon.flink.source.SimpleSourceSplit; +import org.apache.paimon.flink.source.SimpleSourceSplitSerializer; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.state.OperatorStateStore; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.OperatorStateBackendParametersImpl; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateInitializationContextImpl; +import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; +import org.apache.flink.streaming.api.operators.SourceOperator; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; +import org.apache.flink.streaming.util.MockOutput; +import org.apache.flink.streaming.util.MockStreamConfig; +import org.apache.flink.streaming.util.MockStreamingRuntimeContext; + +import java.util.ArrayList; +import java.util.Collections; + +/** A SourceOperator extension to simplify test setup. 
*/ +public class TestingSourceOperator extends SourceOperator { + + private static final long serialVersionUID = 1L; + + private final int subtaskIndex; + private final int parallelism; + + public TestingSourceOperator( + SourceReader reader, + WatermarkStrategy watermarkStrategy, + ProcessingTimeService timeService, + boolean emitProgressiveWatermarks) { + + this( + reader, + watermarkStrategy, + timeService, + new TestingOperatorEventGateway(), + 1, + 5, + emitProgressiveWatermarks); + } + + public TestingSourceOperator( + SourceReader reader, + WatermarkStrategy watermarkStrategy, + ProcessingTimeService timeService, + OperatorEventGateway eventGateway, + int subtaskIndex, + int parallelism, + boolean emitProgressiveWatermarks) { + + super( + (context) -> reader, + eventGateway, + new SimpleSourceSplitSerializer(), + watermarkStrategy, + timeService, + new Configuration(), + "localhost", + emitProgressiveWatermarks, + () -> false); + + this.subtaskIndex = subtaskIndex; + this.parallelism = parallelism; + this.metrics = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup(); + initSourceMetricGroup(); + + // unchecked wrapping is okay to keep tests simpler + try { + initReader(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public StreamingRuntimeContext getRuntimeContext() { + return new MockStreamingRuntimeContext(false, parallelism, subtaskIndex); + } + + // this is overridden to avoid complex mock injection through the "containingTask" + @Override + public ExecutionConfig getExecutionConfig() { + ExecutionConfig cfg = new ExecutionConfig(); + cfg.setAutoWatermarkInterval(100); + return cfg; + } + + public static SourceOperator createTestOperator( + SourceReader reader, + WatermarkStrategy watermarkStrategy, + boolean emitProgressiveWatermarks) + throws Exception { + + AbstractStateBackend abstractStateBackend = new HashMapStateBackend(); + Environment env = new MockEnvironmentBuilder().build(); + CloseableRegistry cancelStreamRegistry = new CloseableRegistry(); + final OperatorStateStore operatorStateStore = + abstractStateBackend.createOperatorStateBackend( + new OperatorStateBackendParametersImpl( + env, + "test-operator", + Collections.emptyList(), + cancelStreamRegistry)); + + final StateInitializationContext stateContext = + new StateInitializationContextImpl(null, operatorStateStore, null, null, null); + + TestProcessingTimeService timeService = new TestProcessingTimeService(); + timeService.setCurrentTime(Integer.MAX_VALUE); // start somewhere that is not zero + + final SourceOperator sourceOperator = + new TestingSourceOperator<>( + reader, watermarkStrategy, timeService, emitProgressiveWatermarks); + + sourceOperator.setup( + new SourceOperatorStreamTask(new DummyEnvironment()), + new MockStreamConfig(new Configuration(), 1), + new MockOutput<>(new ArrayList<>())); + sourceOperator.initializeState(stateContext); + sourceOperator.open(); + + return sourceOperator; + } + + private static class TestingOperatorEventGateway implements OperatorEventGateway { + @Override + public void sendEventToCoordinator(OperatorEvent event) {} + } +}