diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSink.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSink.java
index 574ff685f3fa..6d9e3a4a7c82 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSink.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSink.java
@@ -25,7 +25,7 @@
 import org.apache.paimon.table.sink.KeyAndBucketExtractor;
 
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
 
 /** {@link CdcDynamicBucketSinkBase} for {@link CdcRecord}. */
 public class CdcDynamicBucketSink extends CdcDynamicBucketSinkBase<CdcRecord> {
@@ -42,8 +42,8 @@ protected KeyAndBucketExtractor<CdcRecord> createExtractor(TableSchema schema)
     }
 
     @Override
-    protected OneInputStreamOperator<Tuple2<CdcRecord, Integer>, Committable> createWriteOperator(
-            StoreSinkWrite.Provider writeProvider, String commitUser) {
-        return new CdcDynamicBucketWriteOperator(table, writeProvider, commitUser);
+    protected OneInputStreamOperatorFactory<Tuple2<CdcRecord, Integer>, Committable>
+            createWriteOperatorFactory(StoreSinkWrite.Provider writeProvider, String commitUser) {
+        return new CdcDynamicBucketWriteOperator.Factory(table, writeProvider, commitUser);
     }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperator.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperator.java
index b2fbdc3e93ee..b0b135b3610b 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperator.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperator.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.sink.Committable;
 import org.apache.paimon.flink.sink.PrepareCommitOperator;
 import org.apache.paimon.flink.sink.StoreSinkWrite;
 import org.apache.paimon.flink.sink.TableWriteOperator;
@@ -26,6 +27,9 @@
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import java.io.IOException;
@@ -43,11 +47,12 @@ public class CdcDynamicBucketWriteOperator extends TableWriteOperator<Tuple2<Cd
 
     private final long retrySleepMillis;
 
-    public CdcDynamicBucketWriteOperator(
+    private CdcDynamicBucketWriteOperator(
+            StreamOperatorParameters<Committable> parameters,
             FileStoreTable table,
             StoreSinkWrite.Provider storeSinkWriteProvider,
             String initialCommitUser) {
-        super(table, storeSinkWriteProvider, initialCommitUser);
+        super(parameters, table, storeSinkWriteProvider, initialCommitUser);
         this.retrySleepMillis =
                 table.coreOptions().toConfiguration().get(RETRY_SLEEP_TIME).toMillis();
     }
@@ -85,4 +90,30 @@ public void processElement(StreamRecord<Tuple2<CdcRecord, Integer>> element) thr
             throw new IOException(e);
         }
     }
+
+    /** {@link StreamOperatorFactory} of {@link CdcDynamicBucketWriteOperator}. */
+    public static class Factory extends TableWriteOperator.Factory<Tuple2<CdcRecord, Integer>> {
+
+        public Factory(
+                FileStoreTable table,
+                StoreSinkWrite.Provider storeSinkWriteProvider,
+                String initialCommitUser) {
+            super(table, storeSinkWriteProvider, initialCommitUser);
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public <T extends StreamOperator<Committable>> T createStreamOperator(
+                StreamOperatorParameters<Committable> parameters) {
+            return (T)
+                    new CdcDynamicBucketWriteOperator(
+                            parameters, table, storeSinkWriteProvider, initialCommitUser);
+        }
+
+        @Override
+        @SuppressWarnings("rawtypes")
+        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+            return CdcDynamicBucketWriteOperator.class;
+        }
+    }
 }
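
Note: every sink in this patch now hands Flink a OneInputStreamOperatorFactory instead of a pre-built operator, so the runtime constructs the operator per subtask and supplies StreamOperatorParameters at that moment. A minimal standalone sketch of the pattern follows; it uses only stock Flink 1.x operator APIs, and UpperCaseOperator / UpperCaseOperatorFactory are invented names, not part of this PR.

    import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
    import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
    import org.apache.flink.streaming.api.operators.StreamOperator;
    import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

    /** Toy operator: upper-cases strings; the constructor mirrors this PR's style. */
    class UpperCaseOperator extends AbstractStreamOperator<String>
            implements OneInputStreamOperator<String, String> {

        UpperCaseOperator(StreamOperatorParameters<String> parameters) {
            // Wire task, config and output right away, as CommitterOperator and
            // LocalMergeOperator do in their parameterized constructors in this patch.
            setup(parameters.getContainingTask(),
                    parameters.getStreamConfig(),
                    parameters.getOutput());
        }

        @Override
        public void processElement(StreamRecord<String> element) {
            output.collect(element.replace(element.getValue().toUpperCase()));
        }
    }

    /** The factory is what travels in the job graph; Flink calls createStreamOperator. */
    class UpperCaseOperatorFactory extends AbstractStreamOperatorFactory<String>
            implements OneInputStreamOperatorFactory<String, String> {

        @Override
        @SuppressWarnings("unchecked")
        public <T extends StreamOperator<String>> T createStreamOperator(
                StreamOperatorParameters<String> parameters) {
            return (T) new UpperCaseOperator(parameters);
        }

        @Override
        @SuppressWarnings("rawtypes")
        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return UpperCaseOperator.class;
        }
    }

A pipeline would wire this with stream.transform("upper", stream.getType(), new UpperCaseOperatorFactory()), which is exactly the shape of the .transform(...) call sites changed throughout this diff.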
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcFixedBucketSink.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcFixedBucketSink.java
index 59bdb192beea..bec9508888b4 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcFixedBucketSink.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcFixedBucketSink.java
@@ -24,7 +24,7 @@
 import org.apache.paimon.flink.sink.StoreSinkWrite;
 import org.apache.paimon.table.FileStoreTable;
 
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
 
 /**
  * A {@link FlinkSink} for fixed-bucket table which accepts {@link CdcRecord} and waits for a schema
@@ -39,8 +39,8 @@ public CdcFixedBucketSink(FileStoreTable table) {
     }
 
     @Override
-    protected OneInputStreamOperator<CdcRecord, Committable> createWriteOperator(
+    protected OneInputStreamOperatorFactory<CdcRecord, Committable> createWriteOperatorFactory(
             StoreSinkWrite.Provider writeProvider, String commitUser) {
-        return new CdcRecordStoreWriteOperator(table, writeProvider, commitUser);
+        return new CdcRecordStoreWriteOperator.Factory(table, writeProvider, commitUser);
     }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
index 7d72fe3e801f..5db111a30047 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
@@ -38,6 +38,9 @@
 
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import java.io.IOException;
@@ -74,12 +77,13 @@ public class CdcRecordStoreMultiWriteOperator
     private String commitUser;
     private ExecutorService compactExecutor;
 
-    public CdcRecordStoreMultiWriteOperator(
+    private CdcRecordStoreMultiWriteOperator(
+            StreamOperatorParameters<MultiTableCommittable> parameters,
             Catalog.Loader catalogLoader,
             StoreSinkWrite.WithWriteBufferProvider storeSinkWriteProvider,
             String initialCommitUser,
             Options options) {
-        super(options);
+        super(parameters, options);
         this.catalogLoader = catalogLoader;
         this.storeSinkWriteProvider = storeSinkWriteProvider;
         this.initialCommitUser = initialCommitUser;
@@ -254,4 +258,42 @@ public Map<Identifier, StoreSinkWrite> writes() {
 
     public String commitUser() {
         return commitUser;
     }
+
+    /** {@link StreamOperatorFactory} of {@link CdcRecordStoreMultiWriteOperator}. */
+    public static class Factory
+            extends PrepareCommitOperator.Factory<CdcMultiplexRecord, MultiTableCommittable> {
+        private final StoreSinkWrite.WithWriteBufferProvider storeSinkWriteProvider;
+        private final String initialCommitUser;
+        private final Catalog.Loader catalogLoader;
+
+        public Factory(
+                Catalog.Loader catalogLoader,
+                StoreSinkWrite.WithWriteBufferProvider storeSinkWriteProvider,
+                String initialCommitUser,
+                Options options) {
+            super(options);
+            this.catalogLoader = catalogLoader;
+            this.storeSinkWriteProvider = storeSinkWriteProvider;
+            this.initialCommitUser = initialCommitUser;
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public <T extends StreamOperator<MultiTableCommittable>> T createStreamOperator(
+                StreamOperatorParameters<MultiTableCommittable> parameters) {
+            return (T)
+                    new CdcRecordStoreMultiWriteOperator(
+                            parameters,
+                            catalogLoader,
+                            storeSinkWriteProvider,
+                            initialCommitUser,
+                            options);
+        }
+
+        @Override
+        @SuppressWarnings("rawtypes")
+        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+            return CdcRecordStoreMultiWriteOperator.class;
+        }
+    }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
index dd0aa2e5622c..195e683daaf6 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.sink.Committable;
 import org.apache.paimon.flink.sink.PrepareCommitOperator;
 import org.apache.paimon.flink.sink.StoreSinkWrite;
 import org.apache.paimon.flink.sink.TableWriteOperator;
@@ -27,6 +28,9 @@
 import org.apache.paimon.table.FileStoreTable;
 
 import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import java.io.IOException;
@@ -50,11 +54,12 @@ public class CdcRecordStoreWriteOperator extends TableWriteOperator<CdcRecord>
 
     private final long retrySleepMillis;
 
-    public CdcRecordStoreWriteOperator(
+    protected CdcRecordStoreWriteOperator(
+            StreamOperatorParameters<Committable> parameters,
             FileStoreTable table,
             StoreSinkWrite.Provider storeSinkWriteProvider,
             String initialCommitUser) {
-        super(table, storeSinkWriteProvider, initialCommitUser);
+        super(parameters, table, storeSinkWriteProvider, initialCommitUser);
         this.retrySleepMillis =
                 table.coreOptions().toConfiguration().get(RETRY_SLEEP_TIME).toMillis();
     }
@@ -92,4 +97,30 @@ public void processElement(StreamRecord<CdcRecord> element) throws Exception {
             throw new IOException(e);
         }
     }
+
+    /** {@link StreamOperatorFactory} of {@link CdcRecordStoreWriteOperator}. */
+    public static class Factory extends TableWriteOperator.Factory<CdcRecord> {
+
+        public Factory(
+                FileStoreTable table,
+                StoreSinkWrite.Provider storeSinkWriteProvider,
+                String initialCommitUser) {
+            super(table, storeSinkWriteProvider, initialCommitUser);
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public <T extends StreamOperator<Committable>> T createStreamOperator(
+                StreamOperatorParameters<Committable> parameters) {
+            return (T)
+                    new CdcRecordStoreWriteOperator(
+                            parameters, table, storeSinkWriteProvider, initialCommitUser);
+        }
+
+        @Override
+        @SuppressWarnings("rawtypes")
+        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+            return CdcRecordStoreWriteOperator.class;
+        }
+    }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcUnawareBucketSink.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcUnawareBucketSink.java
index 313f4d013ef8..820ef7728f8c 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcUnawareBucketSink.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcUnawareBucketSink.java
@@ -24,7 +24,7 @@
 import org.apache.paimon.table.FileStoreTable;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
 
 import javax.annotation.Nullable;
 
@@ -42,9 +42,9 @@ public CdcUnawareBucketSink(FileStoreTable table, Integer parallelism) {
     }
 
     @Override
-    protected OneInputStreamOperator<CdcRecord, Committable> createWriteOperator(
+    protected OneInputStreamOperatorFactory<CdcRecord, Committable> createWriteOperatorFactory(
             StoreSinkWrite.Provider writeProvider, String commitUser) {
-        return new CdcUnawareBucketWriteOperator(table, writeProvider, commitUser);
+        return new CdcUnawareBucketWriteOperator.Factory(table, writeProvider, commitUser);
     }
 
     @Override
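
Note: a second effect of the factory split, worth keeping in mind when reading the now-private constructors above: the factory is what gets serialized into the job graph, while the operator itself is constructed on the task manager. Illustrative sketch (invented names, not from this PR) of an operator that keeps non-serializable runtime state transient and builds it in open():

    import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    class AsyncWriteOperator extends AbstractStreamOperator<String>
            implements OneInputStreamOperator<String, String> {

        private final String targetPath;            // serializable configuration
        private transient ExecutorService executor; // runtime-only state

        AsyncWriteOperator(StreamOperatorParameters<String> parameters, String targetPath) {
            this.targetPath = targetPath;
            setup(parameters.getContainingTask(),
                    parameters.getStreamConfig(),
                    parameters.getOutput());
        }

        @Override
        public void open() throws Exception {
            super.open();
            executor = Executors.newSingleThreadExecutor();
        }

        @Override
        public void processElement(StreamRecord<String> element) {
            executor.submit(() -> {
                // pretend to persist element.getValue() under targetPath
            });
            output.collect(element);
        }

        @Override
        public void close() throws Exception {
            if (executor != null) {
                executor.shutdownNow();
            }
            super.close();
        }
    }

Its factory would be identical in shape to the UpperCaseOperatorFactory sketch earlier, carrying only the serializable targetPath.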
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcUnawareBucketWriteOperator.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcUnawareBucketWriteOperator.java
index c57a40f3f71d..26f65fdd09ce 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcUnawareBucketWriteOperator.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcUnawareBucketWriteOperator.java
@@ -18,21 +18,26 @@
 
 package org.apache.paimon.flink.sink.cdc;
 
+import org.apache.paimon.flink.sink.Committable;
 import org.apache.paimon.flink.sink.PrepareCommitOperator;
 import org.apache.paimon.flink.sink.StoreSinkWrite;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.RowKind;
 
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 /** A {@link PrepareCommitOperator} to write {@link CdcRecord} to unaware-bucket mode table. */
 public class CdcUnawareBucketWriteOperator extends CdcRecordStoreWriteOperator {
 
-    public CdcUnawareBucketWriteOperator(
+    private CdcUnawareBucketWriteOperator(
+            StreamOperatorParameters<Committable> parameters,
             FileStoreTable table,
             StoreSinkWrite.Provider storeSinkWriteProvider,
             String initialCommitUser) {
-        super(table, storeSinkWriteProvider, initialCommitUser);
+        super(parameters, table, storeSinkWriteProvider, initialCommitUser);
     }
 
     @Override
@@ -42,4 +47,30 @@ public void processElement(StreamRecord<CdcRecord> element) throws Exception {
             super.processElement(element);
         }
     }
+
+    /** {@link StreamOperatorFactory} of {@link CdcUnawareBucketWriteOperator}. */
+    public static class Factory extends CdcRecordStoreWriteOperator.Factory {
+
+        public Factory(
+                FileStoreTable table,
+                StoreSinkWrite.Provider storeSinkWriteProvider,
+                String initialCommitUser) {
+            super(table, storeSinkWriteProvider, initialCommitUser);
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public <T extends StreamOperator<Committable>> T createStreamOperator(
+                StreamOperatorParameters<Committable> parameters) {
+            return (T)
+                    new CdcUnawareBucketWriteOperator(
+                            parameters, table, storeSinkWriteProvider, initialCommitUser);
+        }
+
+        @Override
+        @SuppressWarnings("rawtypes")
+        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+            return CdcUnawareBucketWriteOperator.class;
+        }
+    }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
index 55e987c6055f..f9b7bbc6b910 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
@@ -21,7 +21,7 @@
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.flink.sink.CommittableStateManager;
 import org.apache.paimon.flink.sink.Committer;
-import org.apache.paimon.flink.sink.CommitterOperator;
+import org.apache.paimon.flink.sink.CommitterOperatorFactory;
 import org.apache.paimon.flink.sink.FlinkSink;
 import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
 import org.apache.paimon.flink.sink.MultiTableCommittable;
@@ -41,7 +41,7 @@
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
 
 import javax.annotation.Nullable;
 
@@ -63,19 +63,16 @@ public class FlinkCdcMultiTableSink implements Serializable {
 
     private final Catalog.Loader catalogLoader;
     private final double commitCpuCores;
     @Nullable private final MemorySize commitHeapMemory;
-    private final boolean commitChaining;
     private final String commitUser;
 
     public FlinkCdcMultiTableSink(
             Catalog.Loader catalogLoader,
             double commitCpuCores,
             @Nullable MemorySize commitHeapMemory,
-            boolean commitChaining,
             String commitUser) {
         this.catalogLoader = catalogLoader;
         this.commitCpuCores = commitCpuCores;
         this.commitHeapMemory = commitHeapMemory;
-        this.commitChaining = commitChaining;
         this.commitUser = commitUser;
     }
 
@@ -129,10 +126,9 @@ public DataStreamSink<?> sinkFrom(
                 .transform(
                         GLOBAL_COMMITTER_NAME,
                         typeInfo,
-                        new CommitterOperator<>(
+                        new CommitterOperatorFactory<>(
                                 true,
                                 false,
-                                commitChaining,
                                 commitUser,
                                 createCommitterFactory(),
                                 createCommittableStateManager()))
@@ -141,9 +137,10 @@ public DataStreamSink<?> sinkFrom(
         return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
     }
 
-    protected OneInputStreamOperator<CdcMultiplexRecord, MultiTableCommittable> createWriteOperator(
-            StoreSinkWrite.WithWriteBufferProvider writeProvider, String commitUser) {
-        return new CdcRecordStoreMultiWriteOperator(
+    protected OneInputStreamOperatorFactory<CdcMultiplexRecord, MultiTableCommittable>
+            createWriteOperator(
+                    StoreSinkWrite.WithWriteBufferProvider writeProvider, String commitUser) {
+        return new CdcRecordStoreMultiWriteOperator.Factory(
                 catalogLoader, writeProvider, commitUser, new Options());
     }
 
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
index ed8fdd113389..a9ad66847b4b 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
@@ -66,7 +66,6 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
     @Nullable private Integer parallelism;
     private double committerCpu;
     @Nullable private MemorySize committerMemory;
-    private boolean commitChaining;
 
     // Paimon catalog used to check and create tables. There will be two
     // places where this catalog is used. 1) in processing function,
@@ -103,7 +102,6 @@ public FlinkCdcSyncDatabaseSinkBuilder<T> withTableOptions(Options options) {
         this.parallelism = options.get(FlinkConnectorOptions.SINK_PARALLELISM);
         this.committerCpu = options.get(FlinkConnectorOptions.SINK_COMMITTER_CPU);
         this.committerMemory = options.get(FlinkConnectorOptions.SINK_COMMITTER_MEMORY);
-        this.commitChaining = options.get(FlinkConnectorOptions.SINK_COMMITTER_OPERATOR_CHAINING);
         this.commitUser = createCommitUser(options);
         return this;
     }
@@ -169,7 +167,7 @@ private void buildCombinedCdcSink() {
 
         FlinkCdcMultiTableSink sink =
                 new FlinkCdcMultiTableSink(
-                        catalogLoader, committerCpu, committerMemory, commitChaining, commitUser);
+                        catalogLoader, committerCpu, committerMemory, commitUser);
         sink.sinkFrom(partitioned);
     }
 
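
Note: the commitChaining flag removed above used to reach into the operator (via setChainingStrategy) to control whether the committer chained to its input. Chaining is really a graph-construction decision, so its replacement, visible later in this patch in FlinkSink and CombinedTableCompactorSink, calls startNewChain() on the transformed stream instead. A runnable toy showing the graph-level switch, assuming nothing beyond the DataStream API:

    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ChainingDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            // startNewChain() severs the chain to the upstream operator, which is
            // what `if (!chaining) { committed = committed.startNewChain(); }`
            // accomplishes in this patch's sinks.
            SingleOutputStreamOperator<String> mapped =
                    env.fromElements("a", "b").map(String::toUpperCase).startNewChain();
            mapped.print();
            env.execute("chaining demo");
        }
    }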
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
index 8c78ab853a60..9f35b25026bb 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
@@ -689,8 +689,8 @@ public void testUsingTheSameCompactExecutor() throws Exception {
 
     private OneInputStreamOperatorTestHarness<CdcMultiplexRecord, MultiTableCommittable>
             createTestHarness(Catalog.Loader catalogLoader) throws Exception {
-        CdcRecordStoreMultiWriteOperator operator =
-                new CdcRecordStoreMultiWriteOperator(
+        CdcRecordStoreMultiWriteOperator.Factory operatorFactory =
+                new CdcRecordStoreMultiWriteOperator.Factory(
                         catalogLoader,
                         (t, commitUser, state, ioManager, memoryPoolFactory, metricGroup) ->
                                 new StoreSinkWriteImpl(
@@ -709,7 +709,7 @@ public void testUsingTheSameCompactExecutor() throws Exception {
         TypeSerializer<MultiTableCommittable> outputSerializer =
                 new MultiTableCommittableTypeInfo().createSerializer(new ExecutionConfig());
         OneInputStreamOperatorTestHarness<CdcMultiplexRecord, MultiTableCommittable> harness =
-                new OneInputStreamOperatorTestHarness<>(operator, inputSerializer);
+                new OneInputStreamOperatorTestHarness<>(operatorFactory, inputSerializer);
         harness.setup(outputSerializer);
         return harness;
     }
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
index f3693fe405de..f00229d99890 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
@@ -253,8 +253,8 @@ public void testUpdateColumnType() throws Exception {
 
     private OneInputStreamOperatorTestHarness<CdcRecord, Committable> createTestHarness(
             FileStoreTable table) throws Exception {
-        CdcRecordStoreWriteOperator operator =
-                new CdcRecordStoreWriteOperator(
+        CdcRecordStoreWriteOperator.Factory operatorFactory =
+                new CdcRecordStoreWriteOperator.Factory(
                         table,
                         (t, commitUser, state, ioManager, memoryPool, metricGroup) ->
                                 new StoreSinkWriteImpl(
@@ -272,7 +272,7 @@ private OneInputStreamOperatorTestHarness<CdcRecord, Committable> createTestHarn
         TypeSerializer<Committable> outputSerializer =
                 new CommittableTypeInfo().createSerializer(new ExecutionConfig());
         OneInputStreamOperatorTestHarness<CdcRecord, Committable> harness =
-                new OneInputStreamOperatorTestHarness<>(operator, inputSerializer);
+                new OneInputStreamOperatorTestHarness<>(operatorFactory, inputSerializer);
         harness.setup(outputSerializer);
         return harness;
     }
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSinkTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSinkTest.java
index fd23e500d5e5..e1bd112ca751 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSinkTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSinkTest.java
@@ -60,7 +60,6 @@ public void cancel() {}
                         () -> FlinkCatalogFactory.createPaimonCatalog(new Options()),
                         FlinkConnectorOptions.SINK_COMMITTER_CPU.defaultValue(),
                         null,
-                        true,
                         UUID.randomUUID().toString());
         DataStreamSink<?> dataStreamSink = sink.sinkFrom(input);
 
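
Note: the three test diffs above lean on the harness overload that accepts a factory, so tests now exercise the same createStreamOperator(...) path the runtime uses. A trimmed sketch under the assumption that the flink-streaming-java test utilities are on the classpath; UpperCaseOperatorFactory is the invented factory from the first note:

    import org.apache.flink.api.common.typeutils.base.StringSerializer;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;

    class FactoryHarnessExample {
        static void run() throws Exception {
            OneInputStreamOperatorTestHarness<String, String> harness =
                    new OneInputStreamOperatorTestHarness<>(
                            new UpperCaseOperatorFactory(), StringSerializer.INSTANCE);
            harness.open();
            harness.processElement(new StreamRecord<>("payload"));
            harness.close();
        }
    }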
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendBypassCompactWorkerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendBypassCompactWorkerOperator.java
index 92cd31ea8aa2..977511920a06 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendBypassCompactWorkerOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendBypassCompactWorkerOperator.java
@@ -21,7 +21,9 @@
 import org.apache.paimon.append.UnawareAppendCompactionTask;
 import org.apache.paimon.table.FileStoreTable;
 
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.types.Either;
 
@@ -29,9 +31,11 @@
 public class AppendBypassCompactWorkerOperator
         extends AppendCompactWorkerOperator<Either<Committable, UnawareAppendCompactionTask>> {
 
-    public AppendBypassCompactWorkerOperator(FileStoreTable table, String commitUser) {
-        super(table, commitUser);
-        this.chainingStrategy = ChainingStrategy.HEAD;
+    private AppendBypassCompactWorkerOperator(
+            StreamOperatorParameters<Committable> parameters,
+            FileStoreTable table,
+            String commitUser) {
+        super(parameters, table, commitUser);
     }
 
     @Override
@@ -49,4 +53,27 @@ public void processElement(
             unawareBucketCompactor.processElement(element.getValue().right());
         }
     }
+
+    /** {@link StreamOperatorFactory} of {@link AppendBypassCompactWorkerOperator}. */
+    public static class Factory
+            extends AppendCompactWorkerOperator.Factory<
+                    Either<Committable, UnawareAppendCompactionTask>> {
+
+        public Factory(FileStoreTable table, String initialCommitUser) {
+            super(table, initialCommitUser);
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public <T extends StreamOperator<Committable>> T createStreamOperator(
+                StreamOperatorParameters<Committable> parameters) {
+            return (T) new AppendBypassCompactWorkerOperator(parameters, table, commitUser);
+        }
+
+        @Override
+        @SuppressWarnings("rawtypes")
+        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+            return AppendBypassCompactWorkerOperator.class;
+        }
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendCompactWorkerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendCompactWorkerOperator.java
index 52ab75de6b2c..7a3c0231eb65 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendCompactWorkerOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendCompactWorkerOperator.java
@@ -27,6 +27,8 @@
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.utils.ExecutorThreadFactory;
 
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,8 +55,11 @@ public abstract class AppendCompactWorkerOperator<IN>
 
     private transient ExecutorService lazyCompactExecutor;
 
-    public AppendCompactWorkerOperator(FileStoreTable table, String commitUser) {
-        super(Options.fromMap(table.options()));
+    public AppendCompactWorkerOperator(
+            StreamOperatorParameters<Committable> parameters,
+            FileStoreTable table,
+            String commitUser) {
+        super(parameters, Options.fromMap(table.options()));
         this.table = table;
         this.commitUser = commitUser;
     }
@@ -101,4 +106,17 @@ public void close() throws Exception {
             this.unawareBucketCompactor.close();
         }
     }
+
+    /** {@link StreamOperatorFactory} of {@link AppendCompactWorkerOperator}. */
+    protected abstract static class Factory<IN>
+            extends PrepareCommitOperator.Factory<IN, Committable> {
+        protected final FileStoreTable table;
+        protected final String commitUser;
+
+        protected Factory(FileStoreTable table, String commitUser) {
+            super(Options.fromMap(table.options()));
+            this.table = table;
+            this.commitUser = commitUser;
+        }
+    }
 }
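
Note: AppendCompactWorkerOperator.Factory above is deliberately abstract: it only carries the fields every compaction-worker factory needs, and each subclass (the bypass and single-table variants adjacent in this patch) supplies createStreamOperator for its own operator class. The shape, reduced to plain Flink types with invented names as a compilable sketch:

    import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
    import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;

    abstract class WorkerFactoryBase<IN> extends AbstractStreamOperatorFactory<String>
            implements OneInputStreamOperatorFactory<IN, String> {

        // Factories travel with the JobGraph, so shared fields must be serializable.
        protected final String tablePath;  // stand-in for FileStoreTable
        protected final String commitUser;

        protected WorkerFactoryBase(String tablePath, String commitUser) {
            this.tablePath = tablePath;
            this.commitUser = commitUser;
        }
    }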
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java
index 15e7b9746fe6..83d51f302e51 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java
@@ -28,6 +28,9 @@
 import org.apache.paimon.utils.ExceptionUtils;
 import org.apache.paimon.utils.ExecutorThreadFactory;
 
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,9 +65,12 @@ public class AppendOnlyMultiTableCompactionWorkerOperator
 
     private transient Catalog catalog;
 
-    public AppendOnlyMultiTableCompactionWorkerOperator(
-            Catalog.Loader catalogLoader, String commitUser, Options options) {
-        super(options);
+    private AppendOnlyMultiTableCompactionWorkerOperator(
+            StreamOperatorParameters<MultiTableCommittable> parameters,
+            Catalog.Loader catalogLoader,
+            String commitUser,
+            Options options) {
+        super(parameters, options);
         this.commitUser = commitUser;
         this.catalogLoader = catalogLoader;
     }
@@ -175,4 +181,34 @@ public void close() throws Exception {
 
         ExceptionUtils.throwMultiException(exceptions);
     }
+
+    /** {@link StreamOperatorFactory} of {@link AppendOnlyMultiTableCompactionWorkerOperator}. */
+    public static class Factory
+            extends PrepareCommitOperator.Factory<
+                    MultiTableUnawareAppendCompactionTask, MultiTableCommittable> {
+
+        private final String commitUser;
+        private final Catalog.Loader catalogLoader;
+
+        public Factory(Catalog.Loader catalogLoader, String commitUser, Options options) {
+            super(options);
+            this.commitUser = commitUser;
+            this.catalogLoader = catalogLoader;
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public <T extends StreamOperator<MultiTableCommittable>> T createStreamOperator(
+                StreamOperatorParameters<MultiTableCommittable> parameters) {
+            return (T)
+                    new AppendOnlyMultiTableCompactionWorkerOperator(
+                            parameters, catalogLoader, commitUser, options);
+        }
+
+        @Override
+        @SuppressWarnings("rawtypes")
+        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+            return AppendOnlyMultiTableCompactionWorkerOperator.class;
+        }
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperator.java
index 4d0201d32461..917a7f64f1a0 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperator.java
@@ -22,6 +22,9 @@
 import org.apache.paimon.flink.source.BucketUnawareCompactSource;
 import org.apache.paimon.table.FileStoreTable;
 
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 /**
@@ -31,12 +34,39 @@
 public class AppendOnlySingleTableCompactionWorkerOperator
         extends AppendCompactWorkerOperator<UnawareAppendCompactionTask> {
 
-    public AppendOnlySingleTableCompactionWorkerOperator(FileStoreTable table, String commitUser) {
-        super(table, commitUser);
+    private AppendOnlySingleTableCompactionWorkerOperator(
+            StreamOperatorParameters<Committable> parameters,
+            FileStoreTable table,
+            String commitUser) {
+        super(parameters, table, commitUser);
     }
 
     @Override
     public void processElement(StreamRecord<UnawareAppendCompactionTask> element) throws Exception {
         this.unawareBucketCompactor.processElement(element.getValue());
     }
+
+    /** {@link StreamOperatorFactory} of {@link AppendOnlySingleTableCompactionWorkerOperator}. */
+    public static class Factory
+            extends AppendCompactWorkerOperator.Factory<UnawareAppendCompactionTask> {
+
+        public Factory(FileStoreTable table, String initialCommitUser) {
+            super(table, initialCommitUser);
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public <T extends StreamOperator<Committable>> T createStreamOperator(
+                StreamOperatorParameters<Committable> parameters) {
+            return (T)
+                    new AppendOnlySingleTableCompactionWorkerOperator(
+                            parameters, table, commitUser);
+        }
+
+        @Override
+        @SuppressWarnings("rawtypes")
+        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+            return AppendOnlySingleTableCompactionWorkerOperator.class;
+        }
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java
index 6d27c6019483..0822f0461241 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java
@@ -32,18 +32,13 @@
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.operators.SetupableStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 
 import java.time.Duration;
@@ -58,9 +53,7 @@
  * time, tags are automatically created for each flink savepoint.
  */
 public class AutoTagForSavepointCommitterOperator<CommitT, GlobalCommitT>
-        implements OneInputStreamOperator<CommitT, CommitT>,
-                SetupableStreamOperator<CommitT>,
-                BoundedOneInput {
+        implements OneInputStreamOperator<CommitT, CommitT>, BoundedOneInput {
     public static final String SAVEPOINT_TAG_PREFIX = "savepoint-";
 
     private static final long serialVersionUID = 1L;
@@ -256,19 +249,4 @@ public void setKeyContextElement(StreamRecord<CommitT> record) throws Exception
     public void endInput() throws Exception {
         commitOperator.endInput();
     }
-
-    @Override
-    public void setup(StreamTask containingTask, StreamConfig config, Output output) {
-        commitOperator.setup(containingTask, config, output);
-    }
-
-    @Override
-    public ChainingStrategy getChainingStrategy() {
-        return commitOperator.getChainingStrategy();
-    }
-
-    @Override
-    public void setChainingStrategy(ChainingStrategy strategy) {
-        commitOperator.setChainingStrategy(strategy);
-    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorFactory.java
new file mode 100644
index 000000000000..1787f8e7adce
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorFactory.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.operation.TagDeletion;
+import org.apache.paimon.table.sink.TagCallback;
+import org.apache.paimon.utils.SerializableSupplier;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
+/**
+ * {@link org.apache.flink.streaming.api.operators.StreamOperatorFactory} for {@link
+ * AutoTagForSavepointCommitterOperator}.
+ */
+public class AutoTagForSavepointCommitterOperatorFactory<CommitT, GlobalCommitT>
+        extends AbstractStreamOperatorFactory<CommitT>
+        implements OneInputStreamOperatorFactory<CommitT, CommitT> {
+
+    private final CommitterOperatorFactory<CommitT, GlobalCommitT> commitOperatorFactory;
+
+    private final SerializableSupplier<SnapshotManager> snapshotManagerFactory;
+
+    private final SerializableSupplier<TagManager> tagManagerFactory;
+
+    private final SerializableSupplier<TagDeletion> tagDeletionFactory;
+
+    private final SerializableSupplier<List<TagCallback>> callbacksSupplier;
+
+    private final NavigableSet<Long> identifiersForTags;
+
+    private final Duration tagTimeRetained;
+
+    public AutoTagForSavepointCommitterOperatorFactory(
+            CommitterOperatorFactory<CommitT, GlobalCommitT> commitOperatorFactory,
+            SerializableSupplier<SnapshotManager> snapshotManagerFactory,
+            SerializableSupplier<TagManager> tagManagerFactory,
+            SerializableSupplier<TagDeletion> tagDeletionFactory,
+            SerializableSupplier<List<TagCallback>> callbacksSupplier,
+            Duration tagTimeRetained) {
+        this.commitOperatorFactory = commitOperatorFactory;
+        this.tagManagerFactory = tagManagerFactory;
+        this.snapshotManagerFactory = snapshotManagerFactory;
+        this.tagDeletionFactory = tagDeletionFactory;
+        this.callbacksSupplier = callbacksSupplier;
+        this.identifiersForTags = new TreeSet<>();
+        this.tagTimeRetained = tagTimeRetained;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T extends StreamOperator<CommitT>> T createStreamOperator(
+            StreamOperatorParameters<CommitT> parameters) {
+        return (T)
+                new AutoTagForSavepointCommitterOperator<>(
+                        commitOperatorFactory.createStreamOperator(parameters),
+                        snapshotManagerFactory,
+                        tagManagerFactory,
+                        tagDeletionFactory,
+                        callbacksSupplier,
+                        tagTimeRetained);
+    }
+
+    @Override
+    @SuppressWarnings("rawtypes")
+    public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+        return AutoTagForSavepointCommitterOperator.class;
+    }
+}
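
Note: AutoTagForSavepointCommitterOperatorFactory above is a decorator: it does not build its operator from scratch but asks the nested CommitterOperatorFactory to create one with the same StreamOperatorParameters, then wraps it. A compact, compilable sketch of that wiring with invented names (it skips the state and lifecycle delegation the real wrapper performs):

    import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
    import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
    import org.apache.flink.streaming.api.operators.StreamOperator;
    import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

    class LoggingWrapperOperator<T> extends AbstractStreamOperator<T>
            implements OneInputStreamOperator<T, T> {
        private final OneInputStreamOperator<T, T> inner;

        LoggingWrapperOperator(OneInputStreamOperator<T, T> inner) {
            this.inner = inner;
        }

        @Override
        public void processElement(StreamRecord<T> element) throws Exception {
            LOG.info("element: {}", element.getValue());
            inner.processElement(element);
        }
    }

    class LoggingWrapperOperatorFactory<T> extends AbstractStreamOperatorFactory<T>
            implements OneInputStreamOperatorFactory<T, T> {
        private final OneInputStreamOperatorFactory<T, T> innerFactory;

        LoggingWrapperOperatorFactory(OneInputStreamOperatorFactory<T, T> innerFactory) {
            this.innerFactory = innerFactory;
        }

        @Override
        @SuppressWarnings("unchecked")
        public <O extends StreamOperator<T>> O createStreamOperator(
                StreamOperatorParameters<T> parameters) {
            // The nested factory receives the same runtime parameters, so the wrapped
            // operator is bound to the same task and output as the wrapper.
            OneInputStreamOperator<T, T> inner = innerFactory.createStreamOperator(parameters);
            return (O) new LoggingWrapperOperator<>(inner);
        }

        @Override
        @SuppressWarnings("rawtypes")
        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return LoggingWrapperOperator.class;
        }
    }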
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
index 23202b45077f..1cbcc4b2262f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
@@ -28,18 +28,13 @@
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.operators.SetupableStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 
 import java.time.Instant;
@@ -53,9 +48,7 @@
  * completed, the corresponding tag is generated.
  */
 public class BatchWriteGeneratorTagOperator<CommitT, GlobalCommitT>
-        implements OneInputStreamOperator<CommitT, CommitT>,
-                SetupableStreamOperator<CommitT>,
-                BoundedOneInput {
+        implements OneInputStreamOperator<CommitT, CommitT>, BoundedOneInput {
 
     private static final String BATCH_WRITE_TAG_PREFIX = "batch-write-";
 
@@ -250,19 +243,4 @@ public void setKeyContextElement(StreamRecord<CommitT> record) throws Exception
     public void endInput() throws Exception {
         commitOperator.endInput();
     }
-
-    @Override
-    public void setup(StreamTask containingTask, StreamConfig config, Output output) {
-        commitOperator.setup(containingTask, config, output);
-    }
-
-    @Override
-    public ChainingStrategy getChainingStrategy() {
-        return commitOperator.getChainingStrategy();
-    }
-
-    @Override
-    public void setChainingStrategy(ChainingStrategy strategy) {
-        commitOperator.setChainingStrategy(strategy);
-    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorFactory.java
new file mode 100644
index 000000000000..e3c0e5c49168
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+
+/**
+ * {@link org.apache.flink.streaming.api.operators.StreamOperatorFactory} for {@link
+ * BatchWriteGeneratorTagOperator}.
+ */
+public class BatchWriteGeneratorTagOperatorFactory<CommitT, GlobalCommitT>
+        extends AbstractStreamOperatorFactory<CommitT>
+        implements OneInputStreamOperatorFactory<CommitT, CommitT> {
+    private final CommitterOperatorFactory<CommitT, GlobalCommitT> commitOperatorFactory;
+
+    protected final FileStoreTable table;
+
+    public BatchWriteGeneratorTagOperatorFactory(
+            CommitterOperatorFactory<CommitT, GlobalCommitT> commitOperatorFactory,
+            FileStoreTable table) {
+        this.table = table;
+        this.commitOperatorFactory = commitOperatorFactory;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T extends StreamOperator<CommitT>> T createStreamOperator(
+            StreamOperatorParameters<CommitT> parameters) {
+        return (T)
+                new BatchWriteGeneratorTagOperator<>(
+                        commitOperatorFactory.createStreamOperator(parameters), table);
+    }
+
+    @Override
+    @SuppressWarnings("rawtypes")
+    public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+        return BatchWriteGeneratorTagOperator.class;
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
index ce4e37305909..c2b4cc0f87e6 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
@@ -33,7 +33,7 @@
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
 import org.apache.flink.table.data.RowData;
 
 import java.io.Serializable;
@@ -119,7 +119,7 @@ public DataStream<MultiTableCommittable> doWrite(
                         .transform(
                                 String.format("%s-%s", "Unaware-Bucket-Table", WRITER_NAME),
                                 new MultiTableCommittableTypeInfo(),
-                                new AppendOnlyMultiTableCompactionWorkerOperator(
+                                new AppendOnlyMultiTableCompactionWorkerOperator.Factory(
                                         catalogLoader, commitUser, options))
                         .setParallelism(unawareBucketTableSource.getParallelism());
 
@@ -160,26 +160,28 @@ protected DataStreamSink<?> doCommit(
                         .transform(
                                 GLOBAL_COMMITTER_NAME,
                                 new MultiTableCommittableTypeInfo(),
-                                new CommitterOperator<>(
+                                new CommitterOperatorFactory<>(
                                         streamingCheckpointEnabled,
                                         false,
-                                        options.get(SINK_COMMITTER_OPERATOR_CHAINING),
                                         commitUser,
                                         createCommitterFactory(isStreaming),
                                         createCommittableStateManager(),
                                         options.get(END_INPUT_WATERMARK)))
                         .setParallelism(written.getParallelism());
+        if (!options.get(SINK_COMMITTER_OPERATOR_CHAINING)) {
+            committed = committed.startNewChain();
+        }
         return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
     }
 
     // TODO:refactor FlinkSink to adopt this sink
-    protected OneInputStreamOperator<RowData, MultiTableCommittable>
+    protected OneInputStreamOperatorFactory<RowData, MultiTableCommittable>
             combinedMultiComacptionWriteOperator(
                     CheckpointConfig checkpointConfig,
                     boolean isStreaming,
                     boolean fullCompaction,
                    String commitUser) {
-        return new MultiTablesStoreCompactOperator(
+        return new MultiTablesStoreCompactOperator.Factory(
                 catalogLoader,
                 commitUser,
                 checkpointConfig,
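
Note: with no operator instance available at graph-building time, per-operator chaining defaults also move to the factory: AbstractStreamOperatorFactory exposes a protected chainingStrategy field, which LocalMergeOperator.Factory near the end of this patch sets to ALWAYS, replacing the setChainingStrategy(...) calls deleted from constructors such as CommitterOperator's just below. A minimal abstract sketch, names invented:

    import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
    import org.apache.flink.streaming.api.operators.ChainingStrategy;

    abstract class HeadChainedFactory<OUT> extends AbstractStreamOperatorFactory<OUT> {
        protected HeadChainedFactory() {
            // HEAD: the created operator starts a chain, mirroring the removed
            // `this.chainingStrategy = ChainingStrategy.HEAD` in
            // AppendBypassCompactWorkerOperator's old constructor.
            this.chainingStrategy = ChainingStrategy.HEAD;
        }
    }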
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
index 021a5db413d5..383cbcd6ebf7 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
@@ -25,8 +25,8 @@
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
@@ -91,26 +91,9 @@ public class CommitterOperator<CommitT, GlobalCommitT> extends AbstractStreamOpe
     private final Long endInputWatermark;
 
     public CommitterOperator(
+            StreamOperatorParameters<CommitT> parameters,
             boolean streamingCheckpointEnabled,
             boolean forceSingleParallelism,
-            boolean chaining,
-            String initialCommitUser,
-            Committer.Factory<CommitT, GlobalCommitT> committerFactory,
-            CommittableStateManager<GlobalCommitT> committableStateManager) {
-        this(
-                streamingCheckpointEnabled,
-                forceSingleParallelism,
-                chaining,
-                initialCommitUser,
-                committerFactory,
-                committableStateManager,
-                null);
-    }
-
-    public CommitterOperator(
-            boolean streamingCheckpointEnabled,
-            boolean forceSingleParallelism,
-            boolean chaining,
             String initialCommitUser,
             Committer.Factory<CommitT, GlobalCommitT> committerFactory,
             CommittableStateManager<GlobalCommitT> committableStateManager,
@@ -122,7 +105,10 @@ public CommitterOperator(
         this.committerFactory = checkNotNull(committerFactory);
         this.committableStateManager = committableStateManager;
         this.endInputWatermark = endInputWatermark;
-        setChainingStrategy(chaining ? ChainingStrategy.ALWAYS : ChainingStrategy.HEAD);
+        this.setup(
+                parameters.getContainingTask(),
+                parameters.getStreamConfig(),
+                parameters.getOutput());
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperatorFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperatorFactory.java
new file mode 100644
index 000000000000..cce3d4e176bf
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperatorFactory.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/**
+ * {@link org.apache.flink.streaming.api.operators.StreamOperatorFactory} for {@link
+ * CommitterOperator}.
+ */
+public class CommitterOperatorFactory<CommitT, GlobalCommitT>
+        extends AbstractStreamOperatorFactory<CommitT>
+        implements OneInputStreamOperatorFactory<CommitT, CommitT> {
+    protected final boolean streamingCheckpointEnabled;
+
+    /** Whether to check the parallelism while runtime. */
+    protected final boolean forceSingleParallelism;
+    /**
+     * This commitUser is valid only for new jobs. After the job starts, this commitUser will be
+     * recorded into the states of write and commit operators. When the job restarts, commitUser
+     * will be recovered from states and this value is ignored.
+     */
+    protected final String initialCommitUser;
+
+    /** Group the committable by the checkpoint id. */
+    protected final NavigableMap<Long, GlobalCommitT> committablesPerCheckpoint;
+
+    protected final Committer.Factory<CommitT, GlobalCommitT> committerFactory;
+
+    protected final CommittableStateManager<GlobalCommitT> committableStateManager;
+
+    /**
+     * Aggregate committables to global committables and commit the global committables to the
+     * external system.
+     */
+    protected Committer<CommitT, GlobalCommitT> committer;
+
+    protected final Long endInputWatermark;
+
+    public CommitterOperatorFactory(
+            boolean streamingCheckpointEnabled,
+            boolean forceSingleParallelism,
+            String initialCommitUser,
+            Committer.Factory<CommitT, GlobalCommitT> committerFactory,
+            CommittableStateManager<GlobalCommitT> committableStateManager) {
+        this(
+                streamingCheckpointEnabled,
+                forceSingleParallelism,
+                initialCommitUser,
+                committerFactory,
+                committableStateManager,
+                null);
+    }
+
+    public CommitterOperatorFactory(
+            boolean streamingCheckpointEnabled,
+            boolean forceSingleParallelism,
+            String initialCommitUser,
+            Committer.Factory<CommitT, GlobalCommitT> committerFactory,
+            CommittableStateManager<GlobalCommitT> committableStateManager,
+            Long endInputWatermark) {
+        this.streamingCheckpointEnabled = streamingCheckpointEnabled;
+        this.forceSingleParallelism = forceSingleParallelism;
+        this.initialCommitUser = initialCommitUser;
+        this.committablesPerCheckpoint = new TreeMap<>();
+        this.committerFactory = checkNotNull(committerFactory);
+        this.committableStateManager = committableStateManager;
+        this.endInputWatermark = endInputWatermark;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T extends StreamOperator<CommitT>> T createStreamOperator(
+            StreamOperatorParameters<CommitT> parameters) {
+        return (T)
+                new CommitterOperator<>(
+                        parameters,
+                        streamingCheckpointEnabled,
+                        forceSingleParallelism,
+                        initialCommitUser,
+                        committerFactory,
+                        committableStateManager,
+                        endInputWatermark);
+    }
+
+    @Override
+    @SuppressWarnings("rawtypes")
+    public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+        return CommitterOperator.class;
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
index a0c830d73f58..a9c6031dfa34 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
@@ -21,7 +21,7 @@
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.table.FileStoreTable;
 
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
 import org.apache.flink.table.data.RowData;
 
 /** {@link FlinkSink} for dedicated compact jobs. */
@@ -37,9 +37,9 @@ public CompactorSink(FileStoreTable table, boolean fullCompaction) {
     }
 
     @Override
-    protected OneInputStreamOperator<RowData, Committable> createWriteOperator(
+    protected OneInputStreamOperatorFactory<RowData, Committable> createWriteOperatorFactory(
             StoreSinkWrite.Provider writeProvider, String commitUser) {
-        return new StoreCompactOperator(table, writeProvider, commitUser, fullCompaction);
+        return new StoreCompactOperator.Factory(table, writeProvider, commitUser, fullCompaction);
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketRowWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketRowWriteOperator.java
index 53b9be457c3d..b31a1af05224 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketRowWriteOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketRowWriteOperator.java
@@ -22,6 +22,9 @@
 import org.apache.paimon.table.FileStoreTable;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 /**
@@ -32,11 +35,12 @@ public class DynamicBucketRowWriteOperator
 
     private static final long serialVersionUID = 1L;
 
-    public DynamicBucketRowWriteOperator(
+    private DynamicBucketRowWriteOperator(
+            StreamOperatorParameters<Committable> parameters,
             FileStoreTable table,
             StoreSinkWrite.Provider storeSinkWriteProvider,
             String initialCommitUser) {
-        super(table, storeSinkWriteProvider, initialCommitUser);
+        super(parameters, table, storeSinkWriteProvider, initialCommitUser);
     }
 
     @Override
@@ -49,4 +53,30 @@ public void processElement(StreamRecord<Tuple2<InternalRow, Integer>> element)
             throws Exception {
         write.write(element.getValue().f0, element.getValue().f1);
     }
+
+    /** {@link StreamOperatorFactory} of {@link DynamicBucketRowWriteOperator}. */
+    public static class Factory
+            extends TableWriteOperator.Factory<Tuple2<InternalRow, Integer>> {
+
+        public Factory(
+                FileStoreTable table,
+                StoreSinkWrite.Provider storeSinkWriteProvider,
+                String initialCommitUser) {
+            super(table, storeSinkWriteProvider, initialCommitUser);
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public <T extends StreamOperator<Committable>> T createStreamOperator(
+                StreamOperatorParameters<Committable> parameters) {
+            return (T)
+                    new DynamicBucketRowWriteOperator(
+                            parameters, table, storeSinkWriteProvider, initialCommitUser);
+        }
+
+        @Override
+        @SuppressWarnings("rawtypes")
+        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+            return DynamicBucketRowWriteOperator.class;
+        }
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FixedBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FixedBucketSink.java
index 613bf369b052..402abb4d5aac 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FixedBucketSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FixedBucketSink.java
@@ -21,7 +21,7 @@
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.table.FileStoreTable;
 
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
 
 import javax.annotation.Nullable;
 
@@ -43,8 +43,9 @@ public FixedBucketSink(
     }
 
     @Override
-    protected OneInputStreamOperator<InternalRow, Committable> createWriteOperator(
+    protected OneInputStreamOperatorFactory<InternalRow, Committable> createWriteOperatorFactory(
             StoreSinkWrite.Provider writeProvider, String commitUser) {
-        return new RowDataStoreWriteOperator(table, logSinkFunction, writeProvider, commitUser);
+        return new RowDataStoreWriteOperator.Factory(
+                table, logSinkFunction, writeProvider, commitUser);
     }
 }
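
Note: FlinkSink.doCommit below puts the pieces together: choose or wrap the committer factory first, transform once, then optionally cut the chain. The decision sequence, reduced to generic Flink types as a sketch (wire(...) is invented; LoggingWrapperOperatorFactory is the decorator sketch from an earlier note):

    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;

    class CommitStageWiring {
        static SingleOutputStreamOperator<String> wire(
                SingleOutputStreamOperator<String> written,
                OneInputStreamOperatorFactory<String, String> base,
                boolean decorate,
                boolean chainCommitter) {
            OneInputStreamOperatorFactory<String, String> factory = base;
            if (decorate) {
                // wrapping happens before the graph is built, exactly once
                factory = new LoggingWrapperOperatorFactory<>(factory);
            }
            SingleOutputStreamOperator<String> committed =
                    written.transform("committer", written.getType(), factory);
            if (!chainCommitter) {
                committed = committed.startNewChain();
            }
            return committed;
        }
    }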
(options.get(SINK_AUTO_TAG_FOR_SAVEPOINT)) { committerOperator = - new AutoTagForSavepointCommitterOperator<>( - (CommitterOperator) committerOperator, + new AutoTagForSavepointCommitterOperatorFactory<>( + (CommitterOperatorFactory) + committerOperator, table::snapshotManager, table::tagManager, () -> table.store().newTagDeletion(), @@ -291,8 +291,9 @@ protected DataStreamSink doCommit(DataStream written, String com if (conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.BATCH && table.coreOptions().tagCreationMode() == TagCreationMode.BATCH) { committerOperator = - new BatchWriteGeneratorTagOperator<>( - (CommitterOperator) committerOperator, + new BatchWriteGeneratorTagOperatorFactory<>( + (CommitterOperatorFactory) + committerOperator, table); } SingleOutputStreamOperator committed = @@ -310,6 +311,9 @@ protected DataStreamSink doCommit(DataStream written, String com table.name(), options.get(SINK_OPERATOR_UID_SUFFIX))); } + if (!options.get(SINK_COMMITTER_OPERATOR_CHAINING)) { + committed = committed.startNewChain(); + } configureGlobalCommitter( committed, options.get(SINK_COMMITTER_CPU), options.get(SINK_COMMITTER_MEMORY)); return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1); @@ -362,7 +366,7 @@ public static void assertBatchAdaptiveParallelism( } } - protected abstract OneInputStreamOperator createWriteOperator( + protected abstract OneInputStreamOperatorFactory createWriteOperatorFactory( StoreSinkWrite.Provider writeProvider, String commitUser); protected abstract Committer.Factory createCommitterFactory(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java index dcccd0a1a988..5703c408243b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java @@ -222,7 +222,7 @@ public DataStreamSink build() { .transform( "local merge", input.getType(), - new LocalMergeOperator(table.schema())) + new LocalMergeOperator.Factory(table.schema())) .setParallelism(input.getParallelism()); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java index 6931fe907218..070262147643 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java @@ -44,10 +44,15 @@ import org.apache.paimon.utils.UserDefinedSeqComparator; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; import org.apache.flink.streaming.api.operators.BoundedOneInput; import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; import 
org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -76,13 +81,14 @@ public class LocalMergeOperator extends AbstractStreamOperator private transient boolean endOfInput; - public LocalMergeOperator(TableSchema schema) { + private LocalMergeOperator( + StreamOperatorParameters parameters, TableSchema schema) { Preconditions.checkArgument( schema.primaryKeys().size() > 0, "LocalMergeOperator currently only support tables with primary keys"); this.schema = schema; this.ignoreDelete = CoreOptions.fromMap(schema.options()).ignoreDelete(); - setChainingStrategy(ChainingStrategy.ALWAYS); + setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput()); } @Override @@ -235,4 +241,28 @@ LocalMerger merger() { void setOutput(Output> output) { this.output = output; } + + /** {@link StreamOperatorFactory} of {@link LocalMergeOperator}. */ + public static class Factory extends AbstractStreamOperatorFactory + implements OneInputStreamOperatorFactory { + private final TableSchema schema; + + public Factory(TableSchema schema) { + this.chainingStrategy = ChainingStrategy.ALWAYS; + this.schema = schema; + } + + @Override + @SuppressWarnings("unchecked") + public > T createStreamOperator( + StreamOperatorParameters parameters) { + return (T) new LocalMergeOperator(parameters, schema); + } + + @Override + @SuppressWarnings("rawtypes") + public Class getStreamOperatorClass(ClassLoader classLoader) { + return LocalMergeOperator.class; + } + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java index 57d2e8413cb5..58f6a3834096 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java @@ -33,6 +33,9 @@ import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.environment.CheckpointConfig; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.table.data.RowData; @@ -76,7 +79,8 @@ public class MultiTablesStoreCompactOperator protected Map writes; protected String commitUser; - public MultiTablesStoreCompactOperator( + private MultiTablesStoreCompactOperator( + StreamOperatorParameters parameters, Catalog.Loader catalogLoader, String initialCommitUser, CheckpointConfig checkpointConfig, @@ -84,7 +88,7 @@ public MultiTablesStoreCompactOperator( boolean ignorePreviousFiles, boolean fullCompaction, Options options) { - super(options); + super(parameters, options); this.catalogLoader = catalogLoader; this.initialCommitUser = initialCommitUser; this.checkpointConfig = checkpointConfig; @@ -316,4 +320,54 @@ private StoreSinkWrite.Provider createWriteProvider( memoryPool, metricGroup); } + + /** {@link StreamOperatorFactory} of {@link MultiTablesStoreCompactOperator}. 
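One behavioral change in the `FlinkSink.doCommit` hunk above deserves a note: `SINK_COMMITTER_OPERATOR_CHAINING` is no longer a constructor flag on `CommitterOperator` but is applied to the transformation itself via `startNewChain()`. A sketch of the equivalent DataStream-level logic, with invented names (`chainCommitter` mirrors the option):

```java
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

final class CommitterChainingSketch {
    // When chaining is disabled, the committer starts a new operator chain instead of
    // being fused with the upstream writer, matching the old disable-chaining flag.
    static <T> SingleOutputStreamOperator<T> maybeBreakChain(
            SingleOutputStreamOperator<T> committed, boolean chainCommitter) {
        return chainCommitter ? committed : committed.startNewChain();
    }
}
```

The same technique appears below in `UnawareBucketSink`, where `startNewChain()` now separates the compaction coordinator and worker operators from their neighbors.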
*/ + public static class Factory + extends PrepareCommitOperator.Factory { + private final Catalog.Loader catalogLoader; + private final CheckpointConfig checkpointConfig; + private final boolean isStreaming; + private final boolean ignorePreviousFiles; + private final boolean fullCompaction; + private final String initialCommitUser; + + public Factory( + Catalog.Loader catalogLoader, + String initialCommitUser, + CheckpointConfig checkpointConfig, + boolean isStreaming, + boolean ignorePreviousFiles, + boolean fullCompaction, + Options options) { + super(options); + this.catalogLoader = catalogLoader; + this.initialCommitUser = initialCommitUser; + this.checkpointConfig = checkpointConfig; + this.isStreaming = isStreaming; + this.ignorePreviousFiles = ignorePreviousFiles; + this.fullCompaction = fullCompaction; + } + + @Override + @SuppressWarnings("unchecked") + public > T createStreamOperator( + StreamOperatorParameters parameters) { + return (T) + new MultiTablesStoreCompactOperator( + parameters, + catalogLoader, + initialCommitUser, + checkpointConfig, + isStreaming, + ignorePreviousFiles, + fullCompaction, + options); + } + + @Override + @SuppressWarnings("rawtypes") + public Class getStreamOperatorClass(ClassLoader classLoader) { + return MultiTablesStoreCompactOperator.class; + } + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java index 3668386ddc2d..8b114d3e492f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java @@ -26,10 +26,14 @@ import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; import org.apache.flink.streaming.api.operators.BoundedOneInput; import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StreamOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; @@ -52,9 +56,9 @@ public abstract class PrepareCommitOperator extends AbstractStreamOpera private final Options options; private boolean endOfInput = false; - public PrepareCommitOperator(Options options) { + public PrepareCommitOperator(StreamOperatorParameters parameters, Options options) { this.options = options; - setChainingStrategy(ChainingStrategy.ALWAYS); + setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput()); } @Override @@ -103,4 +107,15 @@ private void emitCommittables(boolean waitCompaction, long checkpointId) throws protected abstract List prepareCommit(boolean waitCompaction, long checkpointId) throws IOException; + + /** {@link StreamOperatorFactory} of {@link PrepareCommitOperator}. 
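The abstract `PrepareCommitOperator.Factory` declared just below is the base that the write and compact factories in this patch extend: it owns the shared `Options` and declares the chaining strategy that the operator constructors used to set on themselves through `setChainingStrategy(ChainingStrategy.ALWAYS)`. The shape of such a base, condensed with invented generic names (a plain `Map` stands in for Paimon's `Options`):

```java
import java.util.Map;

import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;

/** Sketch of an abstract factory base in the style of PrepareCommitOperator.Factory. */
public abstract class ConfiguredFactory<IN, OUT> extends AbstractStreamOperatorFactory<OUT>
        implements OneInputStreamOperatorFactory<IN, OUT> {

    protected final Map<String, String> options; // shared by all concrete subclasses

    protected ConfiguredFactory(Map<String, String> options) {
        this.options = options;
        // The chaining strategy moves from the operator constructor onto the factory,
        // which is where the stream graph builder actually reads it from.
        this.chainingStrategy = ChainingStrategy.ALWAYS;
    }
}
```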
*/ + protected abstract static class Factory extends AbstractStreamOperatorFactory + implements OneInputStreamOperatorFactory { + protected final Options options; + + protected Factory(Options options) { + this.options = options; + this.chainingStrategy = ChainingStrategy.ALWAYS; + } + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java index 39dcca03c6aa..d9f863c6b919 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java @@ -45,11 +45,10 @@ import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.Pair; -import org.apache.flink.streaming.api.graph.StreamConfig; -import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.StreamTask; import javax.annotation.Nullable; @@ -76,34 +75,49 @@ public RewriteFileIndexSink(FileStoreTable table) { } @Override - protected OneInputStreamOperator createWriteOperator( + protected OneInputStreamOperatorFactory createWriteOperatorFactory( StoreSinkWrite.Provider writeProvider, String commitUser) { - return new FileIndexModificationOperator(table.coreOptions().toConfiguration(), table); + return new FileIndexModificationOperatorFactory( + table.coreOptions().toConfiguration(), table); } - /** File index modification operator to rewrite file index. */ - private static class FileIndexModificationOperator - extends PrepareCommitOperator { - - private static final long serialVersionUID = 1L; - + private static class FileIndexModificationOperatorFactory + extends PrepareCommitOperator.Factory { private final FileStoreTable table; - private transient FileIndexProcessor fileIndexProcessor; - private transient List messages; - - public FileIndexModificationOperator(Options options, FileStoreTable table) { + public FileIndexModificationOperatorFactory(Options options, FileStoreTable table) { super(options); this.table = table; } @Override - public void setup( - StreamTask containingTask, - StreamConfig config, - Output> output) { - super.setup(containingTask, config, output); + @SuppressWarnings("unchecked") + public > T createStreamOperator( + StreamOperatorParameters parameters) { + return (T) new FileIndexModificationOperator(parameters, options, table); + } + + @Override + @SuppressWarnings("rawtypes") + public Class getStreamOperatorClass(ClassLoader classLoader) { + return FileIndexModificationOperator.class; + } + } + + /** File index modification operator to rewrite file index. 
*/ + private static class FileIndexModificationOperator + extends PrepareCommitOperator { + + private static final long serialVersionUID = 1L; + + private final transient FileIndexProcessor fileIndexProcessor; + private final transient List messages; + private FileIndexModificationOperator( + StreamOperatorParameters parameters, + Options options, + FileStoreTable table) { + super(parameters, options); this.fileIndexProcessor = new FileIndexProcessor(table); this.messages = new ArrayList<>(); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java index 2b25f074667c..8009bec9677f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java @@ -32,13 +32,13 @@ import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.InternalTimerService; -import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; -import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.util.functions.StreamingFunctionUtils; import javax.annotation.Nullable; @@ -61,21 +61,14 @@ public class RowDataStoreWriteOperator extends TableWriteOperator { /** We listen to this ourselves because we don't have an {@link InternalTimerService}. */ private long currentWatermark = Long.MIN_VALUE; - public RowDataStoreWriteOperator( + protected RowDataStoreWriteOperator( + StreamOperatorParameters parameters, FileStoreTable table, @Nullable LogSinkFunction logSinkFunction, StoreSinkWrite.Provider storeSinkWriteProvider, String initialCommitUser) { - super(table, storeSinkWriteProvider, initialCommitUser); + super(parameters, table, storeSinkWriteProvider, initialCommitUser); this.logSinkFunction = logSinkFunction; - } - - @Override - public void setup( - StreamTask containingTask, - StreamConfig config, - Output> output) { - super.setup(containingTask, config, output); if (logSinkFunction != null) { FunctionUtils.setFunctionRuntimeContext(logSinkFunction, getRuntimeContext()); } @@ -249,4 +242,38 @@ public Long timestamp() { return timestamp; } } + + /** {@link StreamOperatorFactory} of {@link RowDataStoreWriteOperator}. 
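Worth noticing in the `RowDataStoreWriteOperator` hunk above: the `setup(...)` override disappears entirely. Because the factory passes `StreamOperatorParameters` into the constructor, and the superclass constructor calls `setup(...)` immediately, facilities such as `getRuntimeContext()` are already usable in the constructor body. Restated with the type parameter that this rendering of the diff drops (`<Committable>` is inferred from the surrounding Paimon types):

```java
// Constructor of RowDataStoreWriteOperator as changed by this patch; generics inferred.
protected RowDataStoreWriteOperator(
        StreamOperatorParameters<Committable> parameters,
        FileStoreTable table,
        @Nullable LogSinkFunction logSinkFunction,
        StoreSinkWrite.Provider storeSinkWriteProvider,
        String initialCommitUser) {
    super(parameters, table, storeSinkWriteProvider, initialCommitUser);
    this.logSinkFunction = logSinkFunction;
    // Previously done in a setup() override; safe here because super(...) has
    // already wired the containing task, stream config and output.
    if (logSinkFunction != null) {
        FunctionUtils.setFunctionRuntimeContext(logSinkFunction, getRuntimeContext());
    }
}
```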
*/ + public static class Factory extends TableWriteOperator.Factory { + + @Nullable private final LogSinkFunction logSinkFunction; + + public Factory( + FileStoreTable table, + @Nullable LogSinkFunction logSinkFunction, + StoreSinkWrite.Provider storeSinkWriteProvider, + String initialCommitUser) { + super(table, storeSinkWriteProvider, initialCommitUser); + this.logSinkFunction = logSinkFunction; + } + + @Override + @SuppressWarnings("unchecked") + public > T createStreamOperator( + StreamOperatorParameters parameters) { + return (T) + new RowDataStoreWriteOperator( + parameters, + table, + logSinkFunction, + storeSinkWriteProvider, + initialCommitUser); + } + + @Override + @SuppressWarnings("rawtypes") + public Class getStreamOperatorClass(ClassLoader classLoader) { + return RowDataStoreWriteOperator.class; + } + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDynamicBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDynamicBucketSink.java index bf6c70f0aa29..1f7e62d74916 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDynamicBucketSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDynamicBucketSink.java @@ -27,7 +27,7 @@ import org.apache.paimon.utils.SerializableFunction; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; import javax.annotation.Nullable; @@ -60,8 +60,8 @@ protected ChannelComputer> channelComputer2() { } @Override - protected OneInputStreamOperator, Committable> createWriteOperator( - StoreSinkWrite.Provider writeProvider, String commitUser) { - return new DynamicBucketRowWriteOperator(table, writeProvider, commitUser); + protected OneInputStreamOperatorFactory, Committable> + createWriteOperatorFactory(StoreSinkWrite.Provider writeProvider, String commitUser) { + return new DynamicBucketRowWriteOperator.Factory(table, writeProvider, commitUser); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowUnawareBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowUnawareBucketSink.java index 1cd10390c1a0..fea8a382a954 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowUnawareBucketSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowUnawareBucketSink.java @@ -22,7 +22,9 @@ import org.apache.paimon.table.FileStoreTable; import org.apache.flink.runtime.state.StateInitializationContext; -import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; import java.util.Map; @@ -38,25 +40,35 @@ public RowUnawareBucketSink( } @Override - protected OneInputStreamOperator createWriteOperator( + protected OneInputStreamOperatorFactory createWriteOperatorFactory( StoreSinkWrite.Provider writeProvider, String commitUser) { - return new RowDataStoreWriteOperator(table, logSinkFunction, writeProvider, commitUser) { - + return new RowDataStoreWriteOperator.Factory( + table, logSinkFunction, writeProvider, commitUser) { @Override - protected StoreSinkWriteState createState( 
- StateInitializationContext context, - StoreSinkWriteState.StateValueFilter stateFilter) - throws Exception { - // No conflicts will occur in append only unaware bucket writer, so no state is - // needed. - return new NoopStoreSinkWriteState(stateFilter); - } + public StreamOperator createStreamOperator(StreamOperatorParameters parameters) { + return new RowDataStoreWriteOperator( + parameters, table, logSinkFunction, writeProvider, commitUser) { - @Override - protected String getCommitUser(StateInitializationContext context) throws Exception { - // No conflicts will occur in append only unaware bucket writer, so commitUser does - // not matter. - return commitUser; + @Override + protected StoreSinkWriteState createState( + StateInitializationContext context, + StoreSinkWriteState.StateValueFilter stateFilter) + throws Exception { + // No conflicts will occur in append only unaware bucket writer, so no state + // is needed. + return new NoopStoreSinkWriteState(stateFilter); + } + + @Override + protected String getCommitUser(StateInitializationContext context) + throws Exception { + // No conflicts will occur in append only unaware bucket writer, so + // commitUser does not matter. + return commitUser; + } + }; } }; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java index ac10345bc425..1870a0493c2f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java @@ -31,6 +31,9 @@ import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.table.data.RowData; @@ -60,12 +63,13 @@ public class StoreCompactOperator extends PrepareCommitOperator> waitToCompact; - public StoreCompactOperator( + private StoreCompactOperator( + StreamOperatorParameters parameters, FileStoreTable table, StoreSinkWrite.Provider storeSinkWriteProvider, String initialCommitUser, boolean fullCompaction) { - super(Options.fromMap(table.options())); + super(parameters, Options.fromMap(table.options())); Preconditions.checkArgument( !table.coreOptions().writeOnly(), CoreOptions.WRITE_ONLY.key() + " should not be true for StoreCompactOperator."); @@ -163,4 +167,46 @@ public void close() throws Exception { super.close(); write.close(); } + + /** {@link StreamOperatorFactory} of {@link StoreCompactOperator}.
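`StoreCompactOperator.Factory` repeats the `writeOnly` precondition that already guards the operator constructor. The likely reason for the duplication: the operator constructor now runs on the task manager at deployment, so keeping a copy of the check in the factory, which is still instantiated on the client while the sink topology is assembled, preserves the old behavior of rejecting a misconfigured table before job submission. In isolation, with a plain map standing in for the table options:

```java
import java.util.Map;

import org.apache.flink.util.Preconditions;

/** Sketch: client-side validation in the factory, mirroring StoreCompactOperator.Factory. */
final class CompactValidationSketch {

    static void validate(Map<String, String> tableOptions) {
        // Runs while the job graph is built, long before any task deploys.
        Preconditions.checkArgument(
                !Boolean.parseBoolean(tableOptions.getOrDefault("write-only", "false")),
                "write-only should not be true for StoreCompactOperator.");
    }
}
```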
*/ + public static class Factory extends PrepareCommitOperator.Factory { + private final FileStoreTable table; + private final StoreSinkWrite.Provider storeSinkWriteProvider; + private final String initialCommitUser; + private final boolean fullCompaction; + + public Factory( + FileStoreTable table, + StoreSinkWrite.Provider storeSinkWriteProvider, + String initialCommitUser, + boolean fullCompaction) { + super(Options.fromMap(table.options())); + Preconditions.checkArgument( + !table.coreOptions().writeOnly(), + CoreOptions.WRITE_ONLY.key() + " should not be true for StoreCompactOperator."); + this.table = table; + this.storeSinkWriteProvider = storeSinkWriteProvider; + this.initialCommitUser = initialCommitUser; + this.fullCompaction = fullCompaction; + } + + @Override + @SuppressWarnings("unchecked") + public > T createStreamOperator( + StreamOperatorParameters parameters) { + return (T) + new StoreCompactOperator( + parameters, + table, + storeSinkWriteProvider, + initialCommitUser, + fullCompaction); + } + + @Override + @SuppressWarnings("rawtypes") + public Class getStreamOperatorClass(ClassLoader classLoader) { + return StoreCompactOperator.class; + } + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java index 32fcdd03bdfd..fd876698c094 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java @@ -28,6 +28,8 @@ import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.operators.StreamOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes; import java.io.IOException; @@ -45,10 +47,11 @@ public abstract class TableWriteOperator extends PrepareCommitOperator parameters, FileStoreTable table, StoreSinkWrite.Provider storeSinkWriteProvider, String initialCommitUser) { - super(Options.fromMap(table.options())); + super(parameters, Options.fromMap(table.options())); this.table = table; this.storeSinkWriteProvider = storeSinkWriteProvider; this.initialCommitUser = initialCommitUser; @@ -128,4 +131,22 @@ protected List prepareCommit(boolean waitCompaction, long checkpoin public StoreSinkWrite getWrite() { return write; } + + /** {@link StreamOperatorFactory} of {@link TableWriteOperator}. 
*/ + protected abstract static class Factory + extends PrepareCommitOperator.Factory { + protected final FileStoreTable table; + protected final StoreSinkWrite.Provider storeSinkWriteProvider; + protected final String initialCommitUser; + + protected Factory( + FileStoreTable table, + StoreSinkWrite.Provider storeSinkWriteProvider, + String initialCommitUser) { + super(Options.fromMap(table.options())); + this.table = table; + this.storeSinkWriteProvider = storeSinkWriteProvider; + this.initialCommitUser = initialCommitUser; + } + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java index da966d5e5156..7a4095f896cc 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java @@ -24,7 +24,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; -import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; /** Compaction Sink for unaware-bucket table. */ public class UnawareBucketCompactionSink extends FlinkSink { @@ -42,9 +42,9 @@ public static DataStreamSink sink( } @Override - protected OneInputStreamOperator createWriteOperator( - StoreSinkWrite.Provider writeProvider, String commitUser) { - return new AppendOnlySingleTableCompactionWorkerOperator(table, commitUser); + protected OneInputStreamOperatorFactory + createWriteOperatorFactory(StoreSinkWrite.Provider writeProvider, String commitUser) { + return new AppendOnlySingleTableCompactionWorkerOperator.Factory(table, commitUser); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java index 98b58aa8e96d..7bc40d4c2080 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java @@ -74,11 +74,14 @@ public DataStream doWrite( new CommittableTypeInfo(), new CompactionTaskTypeInfo()), new AppendBypassCoordinateOperatorFactory<>(table)) + .startNewChain() .forceNonParallel() .transform( "Compact Worker: " + table.name(), new CommittableTypeInfo(), - new AppendBypassCompactWorkerOperator(table, initialCommitUser)) + new AppendBypassCompactWorkerOperator.Factory( + table, initialCommitUser)) + .startNewChain() .setParallelism(written.getParallelism()); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java index 26e080c32e83..7022002a43ba 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java @@ -39,7 +39,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; import 
org.apache.flink.streaming.api.datastream.DataStreamSink; -import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; import javax.annotation.Nullable; @@ -63,9 +63,9 @@ public GlobalDynamicBucketSink( } @Override - protected OneInputStreamOperator, Committable> createWriteOperator( - StoreSinkWrite.Provider writeProvider, String commitUser) { - return new DynamicBucketRowWriteOperator(table, writeProvider, commitUser); + protected OneInputStreamOperatorFactory, Committable> + createWriteOperatorFactory(StoreSinkWrite.Provider writeProvider, String commitUser) { + return new DynamicBucketRowWriteOperator.Factory(table, writeProvider, commitUser); } public DataStreamSink build(DataStream input, @Nullable Integer parallelism) { @@ -89,7 +89,8 @@ public DataStreamSink build(DataStream input, @Nullable Integer new InternalTypeInfo<>( new KeyWithRowSerializer<>( bootstrapSerializer, rowSerializer)), - new IndexBootstrapOperator<>(new IndexBootstrap(table), r -> r)) + new IndexBootstrapOperator.Factory<>( + new IndexBootstrap(table), r -> r)) .setParallelism(input.getParallelism()); // 1. shuffle by key hash diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrapOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrapOperator.java index 5c8ba8f9441f..8136565f98cf 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrapOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrapOperator.java @@ -27,8 +27,13 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; /** Operator for {@link IndexBootstrap}. */ @@ -40,11 +45,13 @@ public class IndexBootstrapOperator extends AbstractStreamOperator converter; - public IndexBootstrapOperator( - IndexBootstrap bootstrap, SerializableFunction converter) { + private IndexBootstrapOperator( + StreamOperatorParameters> parameters, + IndexBootstrap bootstrap, + SerializableFunction converter) { this.bootstrap = bootstrap; this.converter = converter; - setChainingStrategy(ChainingStrategy.ALWAYS); + setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput()); } @Override @@ -65,4 +72,30 @@ private void collect(InternalRow row) { output.collect( new StreamRecord<>(new Tuple2<>(KeyPartOrRow.KEY_PART, converter.apply(row)))); } + + /** {@link StreamOperatorFactory} of {@link IndexBootstrapOperator}. 
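The `IndexBootstrapOperator.Factory` being introduced here is a generic factory, and it surfaces a constraint that is easy to miss: with this design the factory, not the operator, is what Flink serializes into the job graph, so every field it captures must be serializable, including the row converter. That is why the field is Paimon's `SerializableFunction` rather than a plain `java.util.function.Function`; its shape is essentially:

```java
import java.io.Serializable;
import java.util.function.Function;

// A Function that is also Serializable, so a factory holding one can itself be
// serialized into the job graph. Sketch of org.apache.paimon.utils.SerializableFunction.
public interface SerializableFunction<I, O> extends Function<I, O>, Serializable {}
```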
*/ + public static class Factory extends AbstractStreamOperatorFactory> + implements OneInputStreamOperatorFactory> { + private final IndexBootstrap bootstrap; + private final SerializableFunction converter; + + public Factory(IndexBootstrap bootstrap, SerializableFunction converter) { + this.chainingStrategy = ChainingStrategy.ALWAYS; + this.bootstrap = bootstrap; + this.converter = converter; + } + + @Override + @SuppressWarnings("unchecked") + public >> OP createStreamOperator( + StreamOperatorParameters> parameters) { + return (OP) new IndexBootstrapOperator<>(parameters, bootstrap, converter); + } + + @Override + @SuppressWarnings("rawtypes") + public Class getStreamOperatorClass(ClassLoader classLoader) { + return IndexBootstrapOperator.class; + } + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperator.java index 45090f7b68b4..b8b0d61e10a9 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperator.java @@ -26,8 +26,8 @@ import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; -import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.types.Either; @@ -58,10 +58,12 @@ public class AppendBypassCoordinateOperator private transient LinkedBlockingQueue compactTasks; public AppendBypassCoordinateOperator( - FileStoreTable table, ProcessingTimeService processingTimeService) { + StreamOperatorParameters> parameters, + FileStoreTable table, + ProcessingTimeService processingTimeService) { this.table = table; this.processingTimeService = processingTimeService; - this.chainingStrategy = ChainingStrategy.HEAD; + setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput()); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperatorFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperatorFactory.java index 7c53e01b47e6..a4c51e5b5a9b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperatorFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperatorFactory.java @@ -45,11 +45,7 @@ T createStreamOperator( StreamOperatorParameters> parameters) { AppendBypassCoordinateOperator operator = - new AppendBypassCoordinateOperator<>(table, processingTimeService); - operator.setup( - parameters.getContainingTask(), - parameters.getStreamConfig(), - parameters.getOutput()); + new AppendBypassCoordinateOperator<>(parameters, table, processingTimeService); return (T) operator; } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperatorTest.java 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperatorTest.java index d589459d9b96..949c2c7a66a3 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperatorTest.java @@ -25,7 +25,13 @@ import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.CommitMessageImpl; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask; +import org.apache.flink.streaming.util.MockOutput; +import org.apache.flink.streaming.util.MockStreamConfig; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; @@ -45,8 +51,17 @@ public class AppendOnlyMultiTableCompactionWorkerOperatorTest extends TableTestB public void testAsyncCompactionWorks() throws Exception { AppendOnlyMultiTableCompactionWorkerOperator workerOperator = - new AppendOnlyMultiTableCompactionWorkerOperator( - () -> catalog, "user", new Options()); + new AppendOnlyMultiTableCompactionWorkerOperator.Factory( + () -> catalog, "user", new Options()) + .createStreamOperator( + new StreamOperatorParameters<>( + new SourceOperatorStreamTask( + new DummyEnvironment()), + new MockStreamConfig(new Configuration(), 1), + new MockOutput<>(new ArrayList<>()), + null, + null, + null)); List> records = new ArrayList<>(); // create table and write diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperatorTest.java index d04032817cf0..6238a9cbf3ea 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperatorTest.java @@ -32,7 +32,13 @@ import org.apache.paimon.table.sink.CommitMessageImpl; import org.apache.paimon.types.DataTypes; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask; +import org.apache.flink.streaming.util.MockOutput; +import org.apache.flink.streaming.util.MockStreamConfig; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; @@ -49,7 +55,16 @@ public class AppendOnlySingleTableCompactionWorkerOperatorTest extends TableTest public void testAsyncCompactionWorks() throws Exception { createTableDefault(); AppendOnlySingleTableCompactionWorkerOperator workerOperator = - new AppendOnlySingleTableCompactionWorkerOperator(getTableDefault(), "user"); + new AppendOnlySingleTableCompactionWorkerOperator.Factory(getTableDefault(), "user") + .createStreamOperator( + new StreamOperatorParameters<>( + new SourceOperatorStreamTask( + new 
DummyEnvironment()), + new MockStreamConfig(new Configuration(), 1), + new MockOutput<>(new ArrayList<>()), + null, + null, + null)); // write 200 files List commitMessages = writeDataDefault(200, 20); @@ -102,7 +117,16 @@ public void testAsyncCompactionWorks() throws Exception { public void testAsyncCompactionFileDeletedWhenShutdown() throws Exception { createTableDefault(); AppendOnlySingleTableCompactionWorkerOperator workerOperator = - new AppendOnlySingleTableCompactionWorkerOperator(getTableDefault(), "user"); + new AppendOnlySingleTableCompactionWorkerOperator.Factory(getTableDefault(), "user") + .createStreamOperator( + new StreamOperatorParameters<>( + new SourceOperatorStreamTask( + new DummyEnvironment()), + new MockStreamConfig(new Configuration(), 1), + new MockOutput<>(new ArrayList<>()), + null, + null, + null)); // write 200 files List commitMessages = writeDataDefault(200, 40); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java index 3b58c24d16b1..ee930a06fc3d 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java @@ -32,7 +32,7 @@ import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.checkpoint.SavepointType; import org.apache.flink.runtime.state.StateInitializationContext; -import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.junit.jupiter.api.Test; @@ -198,13 +198,15 @@ private void processCommittable( } @Override - protected OneInputStreamOperator createCommitterOperator( - FileStoreTable table, - String commitUser, - CommittableStateManager committableStateManager) { - return new AutoTagForSavepointCommitterOperator<>( - (CommitterOperator) - super.createCommitterOperator(table, commitUser, committableStateManager), + protected OneInputStreamOperatorFactory + createCommitterOperatorFactory( + FileStoreTable table, + String commitUser, + CommittableStateManager committableStateManager) { + return new AutoTagForSavepointCommitterOperatorFactory<>( + (CommitterOperatorFactory) + super.createCommitterOperatorFactory( + table, commitUser, committableStateManager), table::snapshotManager, table::tagManager, () -> table.store().newTagDeletion(), @@ -213,14 +215,15 @@ protected OneInputStreamOperator createCommitterOperat } @Override - protected OneInputStreamOperator createCommitterOperator( - FileStoreTable table, - String commitUser, - CommittableStateManager committableStateManager, - ThrowingConsumer initializeFunction) { - return new AutoTagForSavepointCommitterOperator<>( - (CommitterOperator) - super.createCommitterOperator( + protected OneInputStreamOperatorFactory + createCommitterOperatorFactory( + FileStoreTable table, + String commitUser, + CommittableStateManager committableStateManager, + ThrowingConsumer initializeFunction) { + return new AutoTagForSavepointCommitterOperatorFactory<>( + (CommitterOperatorFactory) + super.createCommitterOperatorFactory( table, commitUser, committableStateManager, initializeFunction), 
table::snapshotManager, table::tagManager, diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java index 147110637aef..68162832eac9 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java @@ -27,13 +27,21 @@ import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask; +import org.apache.flink.streaming.util.MockOutput; +import org.apache.flink.streaming.util.MockStreamConfig; import org.junit.jupiter.api.Test; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; +import java.util.ArrayList; import java.util.HashMap; import java.util.Objects; @@ -54,12 +62,23 @@ public void testBatchWriteGeneratorTag() throws Exception { StreamTableWrite write = table.newStreamWriteBuilder().withCommitUser(initialCommitUser).newWrite(); - OneInputStreamOperator committerOperator = - createCommitterOperator( + OneInputStreamOperatorFactory committerOperatorFactory = + createCommitterOperatorFactory( table, initialCommitUser, new RestoreAndFailCommittableStateManager<>( ManifestCommittableSerializer::new)); + + OneInputStreamOperator committerOperator = + committerOperatorFactory.createStreamOperator( + new StreamOperatorParameters<>( + new SourceOperatorStreamTask(new DummyEnvironment()), + new MockStreamConfig(new Configuration(), 1), + new MockOutput<>(new ArrayList<>()), + null, + null, + null)); + committerOperator.open(); TableCommitImpl tableCommit = table.newCommit(initialCommitUser); @@ -106,13 +125,15 @@ public void testBatchWriteGeneratorTag() throws Exception { } @Override - protected OneInputStreamOperator createCommitterOperator( - FileStoreTable table, - String commitUser, - CommittableStateManager committableStateManager) { - return new BatchWriteGeneratorTagOperator<>( - (CommitterOperator) - super.createCommitterOperator(table, commitUser, committableStateManager), + protected OneInputStreamOperatorFactory + createCommitterOperatorFactory( + FileStoreTable table, + String commitUser, + CommittableStateManager committableStateManager) { + return new BatchWriteGeneratorTagOperatorFactory<>( + (CommitterOperatorFactory) + super.createCommitterOperatorFactory( + table, commitUser, committableStateManager), table); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java index 668d651236fd..28c93ca79be0 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java @@ -51,10 
+51,13 @@ import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.StateInitializationContext; -import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.util.Preconditions; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -259,8 +262,8 @@ public void testRestoreCommitUser() throws Exception { // 3. Check whether success List actual = new ArrayList<>(); - OneInputStreamOperator operator = - createCommitterOperator( + OneInputStreamOperatorFactory operatorFactory = + createCommitterOperatorFactory( table, initialCommitUser, new NoopCommittableStateManager(), @@ -274,7 +277,7 @@ public void testRestoreCommitUser() throws Exception { }); OneInputStreamOperatorTestHarness testHarness1 = - createTestHarness(operator); + createTestHarness(operatorFactory); testHarness1.initializeState(snapshot); testHarness1.close(); @@ -315,10 +318,11 @@ public void testRestoreEmptyMarkDoneState() throws Exception { public void testCommitInputEnd() throws Exception { FileStoreTable table = createFileStoreTable(); String commitUser = UUID.randomUUID().toString(); - OneInputStreamOperator operator = - createCommitterOperator(table, commitUser, new NoopCommittableStateManager()); + OneInputStreamOperatorFactory operatorFactory = + createCommitterOperatorFactory( + table, commitUser, new NoopCommittableStateManager()); OneInputStreamOperatorTestHarness testHarness = - createTestHarness(operator); + createTestHarness(operatorFactory); testHarness.open(); Assertions.assertThatCode( () -> { @@ -378,10 +382,10 @@ public void testCommitInputEnd() throws Exception { }) .doesNotThrowAnyException(); - if (operator instanceof CommitterOperator) { + if (operatorFactory instanceof CommitterOperator) { Assertions.assertThat( ((ManifestCommittable) - ((CommitterOperator) operator) + ((CommitterOperator) operatorFactory) .committablesPerCheckpoint.get(Long.MAX_VALUE)) .fileCommittables() .size()) @@ -604,14 +608,14 @@ public void testCalcDataBytesSend() throws Exception { public void testCommitMetrics() throws Exception { FileStoreTable table = createFileStoreTable(); - OneInputStreamOperator operator = - createCommitterOperator( + OneInputStreamOperatorFactory operatorFactory = + createCommitterOperatorFactory( table, null, new RestoreAndFailCommittableStateManager<>( ManifestCommittableSerializer::new)); OneInputStreamOperatorTestHarness testHarness = - createTestHarness(operator); + createTestHarness(operatorFactory); testHarness.open(); long timestamp = 0; StreamTableWrite write = @@ -627,7 +631,9 @@ public void testCommitMetrics() throws Exception { testHarness.notifyOfCompletedCheckpoint(cpId); MetricGroup commitMetricGroup = - operator.getMetricGroup() + testHarness + .getOneInputOperator() + .getMetricGroup() .addGroup("paimon") .addGroup("table", table.name()) .addGroup("commit"); @@ -685,10 +691,11 @@ public void testCommitMetrics() throws Exception { public void testParallelism() throws 
Exception { FileStoreTable table = createFileStoreTable(); String commitUser = UUID.randomUUID().toString(); - OneInputStreamOperator operator = - createCommitterOperator(table, commitUser, new NoopCommittableStateManager()); + OneInputStreamOperatorFactory operatorFactory = + createCommitterOperatorFactory( + table, commitUser, new NoopCommittableStateManager()); try (OneInputStreamOperatorTestHarness testHarness = - createTestHarness(operator, 10, 10, 3)) { + createTestHarness(operatorFactory, 10, 10, 3)) { Assertions.assertThatCode(testHarness::open) .hasMessage("Committer Operator parallelism in paimon MUST be one."); } @@ -700,13 +707,13 @@ public void testParallelism() throws Exception { protected OneInputStreamOperatorTestHarness createRecoverableTestHarness(FileStoreTable table) throws Exception { - OneInputStreamOperator operator = - createCommitterOperator( + OneInputStreamOperatorFactory operatorFactory = + createCommitterOperatorFactory( table, null, new RestoreAndFailCommittableStateManager<>( ManifestCommittableSerializer::new)); - return createTestHarness(operator); + return createTestHarness(operatorFactory); } private OneInputStreamOperatorTestHarness createLossyTestHarness( @@ -716,18 +723,20 @@ private OneInputStreamOperatorTestHarness createLossyT private OneInputStreamOperatorTestHarness createLossyTestHarness( FileStoreTable table, String commitUser) throws Exception { - OneInputStreamOperator operator = - createCommitterOperator(table, commitUser, new NoopCommittableStateManager()); - return createTestHarness(operator); + OneInputStreamOperatorFactory operatorFactory = + createCommitterOperatorFactory( + table, commitUser, new NoopCommittableStateManager()); + return createTestHarness(operatorFactory); } private OneInputStreamOperatorTestHarness createTestHarness( - OneInputStreamOperator operator) throws Exception { - return createTestHarness(operator, 1, 1, 0); + OneInputStreamOperatorFactory operatorFactory) + throws Exception { + return createTestHarness(operatorFactory, 1, 1, 0); } private OneInputStreamOperatorTestHarness createTestHarness( - OneInputStreamOperator operator, + OneInputStreamOperatorFactory operatorFactory, int maxParallelism, int parallelism, int subTaskIndex) @@ -736,22 +745,23 @@ private OneInputStreamOperatorTestHarness createTestHa new CommittableTypeInfo().createSerializer(new ExecutionConfig()); OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness<>( - operator, + operatorFactory, maxParallelism, parallelism, subTaskIndex, - serializer, new OperatorID()); + harness.getStreamConfig().setupNetworkInputs(Preconditions.checkNotNull(serializer)); + harness.getStreamConfig().serializeAllConfigs(); harness.setup(serializer); return harness; } - protected OneInputStreamOperator createCommitterOperator( - FileStoreTable table, - String commitUser, - CommittableStateManager committableStateManager) { - return new CommitterOperator<>( - true, + protected OneInputStreamOperatorFactory + createCommitterOperatorFactory( + FileStoreTable table, + String commitUser, + CommittableStateManager committableStateManager) { + return new CommitterOperatorFactory<>( true, true, commitUser == null ? 
initialCommitUser : commitUser, @@ -765,13 +775,13 @@ protected OneInputStreamOperator createCommitterOperat committableStateManager); } - protected OneInputStreamOperator createCommitterOperator( - FileStoreTable table, - String commitUser, - CommittableStateManager committableStateManager, - ThrowingConsumer initializeFunction) { - return new CommitterOperator( - true, + protected OneInputStreamOperatorFactory + createCommitterOperatorFactory( + FileStoreTable table, + String commitUser, + CommittableStateManager committableStateManager, + ThrowingConsumer initializeFunction) { + return new CommitterOperatorFactory( true, true, commitUser == null ? initialCommitUser : commitUser, @@ -784,8 +794,24 @@ protected OneInputStreamOperator createCommitterOperat context), committableStateManager) { @Override - public void initializeState(StateInitializationContext context) throws Exception { - initializeFunction.accept(context); + @SuppressWarnings("unchecked") + public > T createStreamOperator( + StreamOperatorParameters parameters) { + return (T) + new CommitterOperator( + parameters, + streamingCheckpointEnabled, + forceSingleParallelism, + initialCommitUser, + committerFactory, + committableStateManager, + endInputWatermark) { + @Override + public void initializeState(StateInitializationContext context) + throws Exception { + initializeFunction.accept(context); + } + }; } }; } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java index 42293ca2842e..d487d75925eb 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java @@ -254,8 +254,8 @@ private OneInputStreamOperatorTestHarness createTestHarnes return harness; } - protected StoreCompactOperator createCompactOperator(FileStoreTable table) { - return new StoreCompactOperator( + protected StoreCompactOperator.Factory createCompactOperator(FileStoreTable table) { + return new StoreCompactOperator.Factory( table, (t, commitUser, state, ioManager, memoryPool, metricGroup) -> new StoreSinkWriteImpl( @@ -272,9 +272,9 @@ protected StoreCompactOperator createCompactOperator(FileStoreTable table) { true); } - protected MultiTablesStoreCompactOperator createMultiTablesCompactOperator( + protected MultiTablesStoreCompactOperator.Factory createMultiTablesCompactOperator( Catalog.Loader catalogLoader) throws Exception { - return new MultiTablesStoreCompactOperator( + return new MultiTablesStoreCompactOperator.Factory( catalogLoader, commitUser, new CheckpointConfig(), diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java index c335568344b3..5f21858e61a5 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java @@ -42,7 +42,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; +import 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
index c335568344b3..5f21858e61a5 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
@@ -42,7 +42,7 @@
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.junit.jupiter.api.Test;
@@ -82,20 +82,22 @@ private boolean testSpillable(
                         Collections.singletonList(GenericRow.of(1, 1)));
         FlinkSink<InternalRow> flinkSink = new FixedBucketSink(fileStoreTable, null, null);
         DataStream<Committable> written = flinkSink.doWrite(source, "123", 1);
-        RowDataStoreWriteOperator operator =
-                ((RowDataStoreWriteOperator)
-                        ((SimpleOperatorFactory<Committable>)
-                                ((OneInputTransformation<InternalRow, Committable>)
-                                                written.getTransformation())
-                                        .getOperatorFactory())
-                        .getOperator());
+        OneInputStreamOperatorFactory<InternalRow, Committable> operatorFactory =
+                (OneInputStreamOperatorFactory<InternalRow, Committable>)
+                        ((OneInputTransformation<InternalRow, Committable>)
+                                        written.getTransformation())
+                                .getOperatorFactory();
         TypeSerializer<Committable> serializer =
                 new CommittableTypeInfo().createSerializer(new ExecutionConfig());
         OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
-                new OneInputStreamOperatorTestHarness<>(operator);
+                new OneInputStreamOperatorTestHarness<>(operatorFactory);
         harness.setup(serializer);
         harness.initializeEmptyState();
+
+        RowDataStoreWriteOperator operator =
+                (RowDataStoreWriteOperator) harness.getOneInputOperator();
+
         return ((KeyValueFileStoreWrite) ((StoreSinkWriteImpl) operator.write).write.getWrite())
                 .bufferSpillable();
     }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/LocalMergeOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/LocalMergeOperatorTest.java
index 1162e20b155b..fc45eceb3fd5 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/LocalMergeOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/LocalMergeOperatorTest.java
@@ -26,12 +26,18 @@
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
+import org.apache.flink.streaming.util.MockOutput;
+import org.apache.flink.streaming.util.MockStreamConfig;
 import org.apache.flink.util.OutputTag;
 import org.junit.jupiter.api.Test;
 
@@ -151,7 +157,17 @@ private void prepareHashOperator(Map<String, String> options) throws Exception {
                 Collections.singletonList("f0"),
                 options,
                 null);
-        operator = new LocalMergeOperator(schema);
+        operator =
+                new LocalMergeOperator.Factory(schema)
+                        .createStreamOperator(
+                                new StreamOperatorParameters<>(
+                                        new SourceOperatorStreamTask<Integer>(
+                                                new DummyEnvironment()),
+                                        new MockStreamConfig(new Configuration(), 1),
+                                        new MockOutput<>(new ArrayList<>()),
+                                        null,
+                                        null,
+                                        null));
         operator.open();
         assertThat(operator.merger()).isInstanceOf(HashMapLocalMerger.class);
     }
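The test above instantiates the operator by pushing a hand-built `StreamOperatorParameters` through the factory. For reference, a hedged helper that names the six constructor arguments (argument order assumed from the Flink 1.19+ constructor; the trailing three may only be null when the operator under test never touches those services):

```java
// Hedged helper: mock StreamOperatorParameters for factory-created operators
// in unit tests. Argument roles are annotated inline.
import java.util.ArrayList;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
import org.apache.flink.streaming.util.MockOutput;
import org.apache.flink.streaming.util.MockStreamConfig;

public class MockParameters {

    public static <OUT> StreamOperatorParameters<OUT> create() throws Exception {
        return new StreamOperatorParameters<>(
                new SourceOperatorStreamTask<Integer>(new DummyEnvironment()), // containing task
                new MockStreamConfig(new Configuration(), 1), // operator config, parallelism 1
                new MockOutput<>(new ArrayList<>()), // collects whatever the operator emits
                null, // ProcessingTimeService factory (unused by the operator under test)
                null, // OperatorEventDispatcher (unused)
                null); // MailboxExecutor (unused)
    }
}
```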
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
index f8387e1fc41a..3740033e025e 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
@@ -48,8 +48,8 @@ public void testCompactExactlyOnce(boolean streamingMode) throws Exception {
         CompactRememberStoreWrite compactRememberStoreWrite =
                 new CompactRememberStoreWrite(streamingMode);
 
-        StoreCompactOperator operator =
-                new StoreCompactOperator(
+        StoreCompactOperator.Factory operatorFactory =
+                new StoreCompactOperator.Factory(
                         getTableDefault(),
                         (table, commitUser, state, ioManager, memoryPool, metricGroup) ->
                                 compactRememberStoreWrite,
@@ -59,7 +59,7 @@ public void testCompactExactlyOnce(boolean streamingMode) throws Exception {
         TypeSerializer<Committable> serializer =
                 new CommittableTypeInfo().createSerializer(new ExecutionConfig());
         OneInputStreamOperatorTestHarness<RowData, Committable> harness =
-                new OneInputStreamOperatorTestHarness<>(operator);
+                new OneInputStreamOperatorTestHarness<>(operatorFactory);
         harness.setup(serializer);
         harness.initializeEmptyState();
         harness.open();
@@ -70,7 +70,7 @@ public void testCompactExactlyOnce(boolean streamingMode) throws Exception {
         harness.processElement(new StreamRecord<>(data(1)));
         harness.processElement(new StreamRecord<>(data(2)));
 
-        operator.prepareCommit(true, 1);
+        ((StoreCompactOperator) harness.getOneInputOperator()).prepareCommit(true, 1);
         Assertions.assertThat(compactRememberStoreWrite.compactTime).isEqualTo(3);
     }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
index 10e432f3c8c2..752679fb5903 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
@@ -645,11 +645,10 @@ public void testCommitMetrics() throws Exception {
 
     private OneInputStreamOperatorTestHarness<MultiTableCommittable, MultiTableCommittable>
             createRecoverableTestHarness() throws Exception {
-        CommitterOperator<MultiTableCommittable, WrappedManifestCommittable> operator =
-                new CommitterOperator<>(
+        CommitterOperatorFactory<MultiTableCommittable, WrappedManifestCommittable> operator =
+                new CommitterOperatorFactory<>(
                         true,
                         false,
-                        true,
                         initialCommitUser,
                         context -> new StoreMultiCommitter(catalogLoader, context),
                         new RestoreAndFailCommittableStateManager<>(
@@ -659,11 +658,10 @@
 
     private OneInputStreamOperatorTestHarness<MultiTableCommittable, MultiTableCommittable>
             createLossyTestHarness() throws Exception {
-        CommitterOperator<MultiTableCommittable, WrappedManifestCommittable> operator =
-                new CommitterOperator<>(
+        CommitterOperatorFactory<MultiTableCommittable, WrappedManifestCommittable> operator =
+                new CommitterOperatorFactory<>(
                         true,
                         false,
-                        true,
                         initialCommitUser,
                         context -> new StoreMultiCommitter(catalogLoader, context),
                         new CommittableStateManager<WrappedManifestCommittable>() {
@@ -682,12 +680,13 @@ public void snapshotState(
 
     private OneInputStreamOperatorTestHarness<MultiTableCommittable, MultiTableCommittable>
             createTestHarness(
-                    CommitterOperator<MultiTableCommittable, WrappedManifestCommittable> operator)
+                    CommitterOperatorFactory<MultiTableCommittable, WrappedManifestCommittable>
+                            operatorFactory)
                     throws Exception {
         TypeSerializer<MultiTableCommittable> serializer =
                 new MultiTableCommittableTypeInfo().createSerializer(new ExecutionConfig());
         OneInputStreamOperatorTestHarness<MultiTableCommittable, MultiTableCommittable> harness =
-                new OneInputStreamOperatorTestHarness<>(operator, serializer);
+                new OneInputStreamOperatorTestHarness<>(operatorFactory, serializer);
         harness.setup(serializer);
         return harness;
     }
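A recurring pattern in these test hunks: the factory owns operator creation, so tests that previously kept a reference to the operator now ask the harness for it once it exists. A hedged generic helper illustrating the pattern (the helper itself is not part of this PR; `getOneInputOperator()` is the accessor the tests above rely on):

```java
// Hedged sketch: recover the operator instance from a factory-driven harness
// for white-box assertions.
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;

public class HarnessIntrospection {

    // Once open() has run, the factory has created the operator and the
    // harness can hand it back, cast to the concrete type under test.
    public static <IN, OUT, OP> OP operatorOf(
            OneInputStreamOperatorTestHarness<IN, OUT> harness, Class<OP> type) throws Exception {
        harness.open();
        return type.cast(harness.getOneInputOperator());
    }
}
```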
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterChainingStrategyTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterChainingStrategyTest.java
new file mode 100644
index 000000000000..a4605b830918
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterChainingStrategyTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.CompiledPlan;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.CompiledPlanUtils;
+import org.apache.flink.util.TimeUtils;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for {@link org.apache.flink.streaming.api.operators.ChainingStrategy} of writer operators.
+ */
+public class WriterChainingStrategyTest {
+
+    private static final String TABLE_NAME = "paimon_table";
+
+    @TempDir java.nio.file.Path tempDir;
+
+    private StreamTableEnvironment tEnv;
+
+    @BeforeEach
+    public void beforeEach() {
+        Configuration config = new Configuration();
+        config.setString(
+                "execution.checkpointing.interval",
+                TimeUtils.formatWithHighestUnit(Duration.ofMillis(500)));
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        tEnv = StreamTableEnvironment.create(env);
+
+        String catalog = "PAIMON";
+        Map<String, String> options = new HashMap<>();
+        options.put("type", "paimon");
+        options.put("warehouse", tempDir.toString());
+        tEnv.executeSql(
+                String.format(
+                        "CREATE CATALOG %s WITH ( %s )",
+                        catalog,
+                        options.entrySet().stream()
+                                .map(e -> String.format("'%s'='%s'", e.getKey(), e.getValue()))
+                                .collect(Collectors.joining(","))));
+        tEnv.useCatalog(catalog);
+    }
+
+    @Test
+    public void testAppendTable() throws Exception {
+        tEnv.executeSql(
+                        String.format(
+                                "CREATE TABLE %s (id INT, data STRING, dt STRING) "
+                                        + "WITH ('bucket' = '1', 'bucket-key'='id', 'write-only' = 'true')",
+                                TABLE_NAME))
+                .await();
+
+        verifyChaining(false, true);
+    }
+
+    @Test
+    public void testAppendTableWithUnawareBucket() throws Exception {
+        tEnv.executeSql(
+                        String.format(
+                                "CREATE TABLE %s (id INT, data STRING, dt STRING) "
+                                        + "WITH ('bucket' = '-1', 'write-only' = 'true')",
+                                TABLE_NAME))
+                .await();
+
+        verifyChaining(true, true);
+    }
+
+    @Test
+    public void testPrimaryKeyTable() throws Exception {
+        tEnv.executeSql(
+                        String.format(
+                                "CREATE TABLE %s (id INT, data STRING, dt STRING, PRIMARY KEY (id) NOT ENFORCED) "
+                                        + "WITH ('bucket' = '1', 'bucket-key'='id', 'write-only' = 'true')",
+                                TABLE_NAME))
+                .await();
+
+        verifyChaining(false, true);
+    }
+
+    @Test
+    public void testPrimaryKeyTableWithDynamicBucket() throws Exception {
+        tEnv.executeSql(
+                        String.format(
+                                "CREATE TABLE %s (id INT, data STRING, dt STRING, PRIMARY KEY (id) NOT ENFORCED) "
+                                        + "WITH ('bucket' = '-1', 'write-only' = 'true')",
+                                TABLE_NAME))
+                .await();
+
+        verifyChaining(false, true);
+    }
+
+    @Test
+    public void testPrimaryKeyTableWithMultipleWriter() throws Exception {
+        tEnv.executeSql(
+                        String.format(
+                                "CREATE TABLE %s (id INT, data STRING, dt STRING, PRIMARY KEY (id) NOT ENFORCED) "
+                                        + "WITH ('bucket' = '1', 'bucket-key'='id', 'write-only' = 'true', 'sink.parallelism' = '2')",
+                                TABLE_NAME))
+                .await();
+
+        verifyChaining(false, false);
+    }
+
+    @Test
+    public void testPrimaryKeyTableWithCrossPartitionUpdate() throws Exception {
+        tEnv.executeSql(
+                        String.format(
+                                "CREATE TABLE %s (id INT, data STRING, dt STRING, PRIMARY KEY (id) NOT ENFORCED) "
+                                        + "PARTITIONED BY ( dt ) WITH ('bucket' = '-1', 'write-only' = 'true')",
+                                TABLE_NAME))
+                .await();
+
+        List<JobVertex> vertices = verifyChaining(false, true);
+        JobVertex vertex = findVertex(vertices, "INDEX_BOOTSTRAP");
+        assertThat(vertex.toString()).contains("Source");
+    }
+
+    @Test
+    public void testPrimaryKeyTableWithLocalMerge() throws Exception {
+        tEnv.executeSql(
+                        String.format(
+                                "CREATE TABLE %s (id INT, data STRING, dt STRING, PRIMARY KEY (id) NOT ENFORCED) "
+                                        + "WITH ('bucket' = '-1', 'write-only' = 'true', 'local-merge-buffer-size' = '1MB')",
+                                TABLE_NAME))
+                .await();
+
+        List<JobVertex> vertices = verifyChaining(false, true);
+        JobVertex vertex = findVertex(vertices, "local merge");
+        assertThat(vertex.toString()).contains("Source");
+    }
+
+    private List<JobVertex> verifyChaining(
+            boolean isWriterChainedWithUpstream, boolean isWriterChainedWithDownStream) {
+        CompiledPlan plan =
+                tEnv.compilePlanSql(
+                        String.format(
+                                "INSERT INTO %s VALUES (1, 'AAA', ''), (2, 'BBB', '')",
+                                TABLE_NAME));
+        List<Transformation<?>> transformations = CompiledPlanUtils.toTransformations(tEnv, plan);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        transformations.forEach(env::addOperator);
+
+        List<JobVertex> vertices = new ArrayList<>();
+        env.getStreamGraph().getJobGraph().getVertices().forEach(vertices::add);
+        JobVertex vertex = findVertex(vertices, "Writer");
+
+        if (isWriterChainedWithUpstream) {
+            assertThat(vertex.toString()).contains("Source");
+        } else {
+            assertThat(vertex.toString()).doesNotContain("Source");
+        }
+
+        if (isWriterChainedWithDownStream) {
+            assertThat(vertex.toString()).contains("Committer");
+        } else {
+            assertThat(vertex.toString()).doesNotContain("Committer");
+        }
+
+        return vertices;
+    }
+
+    private JobVertex findVertex(List<JobVertex> vertices, String key) {
+        for (JobVertex vertex : vertices) {
+            if (vertex.toString().contains(key)) {
+                return vertex;
+            }
+        }
+        throw new IllegalStateException(
+                String.format(
+                        "Cannot find vertex with keyword %s among job vertices %s", key, vertices));
+    }
+}
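The new test asserts chaining only indirectly, through `JobVertex` names. The mechanism behind it, presumably: a sink that returns a pre-built operator gets wrapped in `SimpleOperatorFactory`, whose chaining strategy is read from that eager instance, while a hand-written factory can pin the strategy itself. A hedged sketch using the real `AbstractStreamOperatorFactory` hook (the class name is hypothetical):

```java
// Hedged sketch: a factory that declares its chaining strategy explicitly
// instead of inheriting whatever a pre-built operator instance happens to carry.
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.ChainingStrategy;

public abstract class PinnedChainingFactory<OUT> extends AbstractStreamOperatorFactory<OUT> {

    protected PinnedChainingFactory() {
        // HEAD: may start a chain toward downstream operators (e.g. a committer)
        // but will not be chained to its upstream input.
        setChainingStrategy(ChainingStrategy.HEAD);
    }
}
```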
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
index 3a8c1557122f..83af15745078 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
@@ -115,9 +115,10 @@ public void testAppendOnlyTableMetrics() throws Exception {
 
     private void testMetricsImpl(FileStoreTable fileStoreTable) throws Exception {
         String tableName = tablePath.getName();
-        RowDataStoreWriteOperator operator = getStoreSinkWriteOperator(fileStoreTable);
+        RowDataStoreWriteOperator.Factory operatorFactory =
+                getStoreSinkWriteOperatorFactory(fileStoreTable);
         OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
-                createHarness(operator);
+                createHarness(operatorFactory);
 
         TypeSerializer<Committable> serializer =
                 new CommittableTypeInfo().createSerializer(new ExecutionConfig());
@@ -133,7 +134,7 @@ private void testMetricsImpl(FileStoreTable fileStoreTable) throws Exception {
         harness.snapshot(1, 2);
         harness.notifyOfCompletedCheckpoint(1);
 
-        OperatorMetricGroup metricGroup = operator.getMetricGroup();
+        OperatorMetricGroup metricGroup = harness.getOneInputOperator().getMetricGroup();
         MetricGroup writerBufferMetricGroup =
                 metricGroup
                         .addGroup("paimon")
@@ -173,9 +174,10 @@ public void testAsyncLookupWithFailure() throws Exception {
                         rowType, Arrays.asList("pt", "k"), Collections.singletonList("k"), options);
 
         // we don't wait for compaction because this is async lookup test
-        RowDataStoreWriteOperator operator = getAsyncLookupWriteOperator(fileStoreTable, false);
+        RowDataStoreWriteOperator.Factory operatorFactory =
+                getAsyncLookupWriteOperatorFactory(fileStoreTable, false);
         OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
-                createHarness(operator);
+                createHarness(operatorFactory);
 
         TableCommitImpl commit = fileStoreTable.newCommit(commitUser);
@@ -205,8 +207,8 @@ public void testAsyncLookupWithFailure() throws Exception {
         harness.close();
 
         // re-create operator from state, this time wait for compaction to check result
-        operator = getAsyncLookupWriteOperator(fileStoreTable, true);
-        harness = createHarness(operator);
+        operatorFactory = getAsyncLookupWriteOperatorFactory(fileStoreTable, true);
+        harness = createHarness(operatorFactory);
         harness.setup(serializer);
         harness.initializeState(state);
         harness.open();
@@ -263,9 +265,10 @@ private void testChangelog(boolean insertOnly) throws Exception {
         FileStoreTable fileStoreTable =
                 createFileStoreTable(
                         rowType, Arrays.asList("pt", "k"), Collections.singletonList("k"), options);
-        RowDataStoreWriteOperator operator = getStoreSinkWriteOperator(fileStoreTable);
+        RowDataStoreWriteOperator.Factory operatorFactory =
+                getStoreSinkWriteOperatorFactory(fileStoreTable);
         OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
-                createHarness(operator);
+                createHarness(operatorFactory);
 
         TableCommitImpl commit = fileStoreTable.newCommit(commitUser);
@@ -277,7 +280,7 @@ private void testChangelog(boolean insertOnly) throws Exception {
         if (insertOnly) {
             Field field = TableWriteOperator.class.getDeclaredField("write");
             field.setAccessible(true);
-            StoreSinkWrite write = (StoreSinkWrite) field.get(operator);
+            StoreSinkWrite write = (StoreSinkWrite) field.get(harness.getOneInputOperator());
             write.withInsertOnly(true);
         }
@@ -339,17 +342,17 @@ public void testNumWritersMetric() throws Exception {
                         options);
         TableCommitImpl commit = fileStoreTable.newCommit(commitUser);
 
-        RowDataStoreWriteOperator rowDataStoreWriteOperator =
-                getStoreSinkWriteOperator(fileStoreTable);
+        RowDataStoreWriteOperator.Factory operatorFactory =
+                getStoreSinkWriteOperatorFactory(fileStoreTable);
         OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
-                createHarness(rowDataStoreWriteOperator);
+                createHarness(operatorFactory);
 
         TypeSerializer<Committable> serializer =
                 new CommittableTypeInfo().createSerializer(new ExecutionConfig());
         harness.setup(serializer);
         harness.open();
 
-        OperatorMetricGroup metricGroup = rowDataStoreWriteOperator.getMetricGroup();
+        OperatorMetricGroup metricGroup = harness.getOneInputOperator().getMetricGroup();
         MetricGroup writerBufferMetricGroup =
                 metricGroup
                         .addGroup("paimon")
@@ -408,8 +411,9 @@ public void testNumWritersMetric() throws Exception {
     // Test utils
     // ------------------------------------------------------------------------
 
-    private RowDataStoreWriteOperator getStoreSinkWriteOperator(FileStoreTable fileStoreTable) {
-        return new RowDataStoreWriteOperator(
+    private RowDataStoreWriteOperator.Factory getStoreSinkWriteOperatorFactory(
+            FileStoreTable fileStoreTable) {
+        return new RowDataStoreWriteOperator.Factory(
                 fileStoreTable,
                 null,
                 (table, commitUser, state, ioManager, memoryPool, metricGroup) ->
@@ -426,9 +430,9 @@ private RowDataStoreWriteOperator getStoreSinkWriteOperator(FileStoreTable fileS
                 commitUser);
     }
 
-    private RowDataStoreWriteOperator getAsyncLookupWriteOperator(
+    private RowDataStoreWriteOperator.Factory getAsyncLookupWriteOperatorFactory(
            FileStoreTable fileStoreTable, boolean waitCompaction) {
-        return new RowDataStoreWriteOperator(
+        return new RowDataStoreWriteOperator.Factory(
                 fileStoreTable,
                 null,
                 (table, commitUser, state, ioManager, memoryPool, metricGroup) ->
@@ -471,10 +475,11 @@ private FileStoreTable createFileStoreTable(
     }
 
     private OneInputStreamOperatorTestHarness<InternalRow, Committable> createHarness(
-            RowDataStoreWriteOperator operator) throws Exception {
+            RowDataStoreWriteOperator.Factory operatorFactory) throws Exception {
         InternalTypeInfo<InternalRow> internalRowInternalTypeInfo =
                 new InternalTypeInfo<>(new InternalRowTypeSerializer(RowType.builder().build()));
         return new OneInputStreamOperatorTestHarness<>(
-                operator, internalRowInternalTypeInfo.createSerializer(new ExecutionConfig()));
+                operatorFactory,
+                internalRowInternalTypeInfo.createSerializer(new ExecutionConfig()));
     }
 }