Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[flink] Avoid deprecated SetupableStreamOperator #4591

Merged
merged 5 commits into from
Dec 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.paimon.table.sink.KeyAndBucketExtractor;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;

/** {@link CdcDynamicBucketSinkBase} for {@link CdcRecord}. */
public class CdcDynamicBucketSink extends CdcDynamicBucketSinkBase<CdcRecord> {
Expand All @@ -42,8 +42,8 @@ protected KeyAndBucketExtractor<CdcRecord> createExtractor(TableSchema schema) {
}

@Override
protected OneInputStreamOperator<Tuple2<CdcRecord, Integer>, Committable> createWriteOperator(
StoreSinkWrite.Provider writeProvider, String commitUser) {
return new CdcDynamicBucketWriteOperator(table, writeProvider, commitUser);
protected OneInputStreamOperatorFactory<Tuple2<CdcRecord, Integer>, Committable>
createWriteOperatorFactory(StoreSinkWrite.Provider writeProvider, String commitUser) {
return new CdcDynamicBucketWriteOperator.Factory(table, writeProvider, commitUser);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@
package org.apache.paimon.flink.sink.cdc;

import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.PrepareCommitOperator;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.TableWriteOperator;
import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.io.IOException;
Expand All @@ -43,11 +47,12 @@ public class CdcDynamicBucketWriteOperator extends TableWriteOperator<Tuple2<Cdc

private final long retrySleepMillis;

public CdcDynamicBucketWriteOperator(
private CdcDynamicBucketWriteOperator(
StreamOperatorParameters<Committable> parameters,
FileStoreTable table,
StoreSinkWrite.Provider storeSinkWriteProvider,
String initialCommitUser) {
super(table, storeSinkWriteProvider, initialCommitUser);
super(parameters, table, storeSinkWriteProvider, initialCommitUser);
this.retrySleepMillis =
table.coreOptions().toConfiguration().get(RETRY_SLEEP_TIME).toMillis();
}
Expand Down Expand Up @@ -85,4 +90,30 @@ public void processElement(StreamRecord<Tuple2<CdcRecord, Integer>> element) thr
throw new IOException(e);
}
}

/** {@link StreamOperatorFactory} of {@link CdcDynamicBucketWriteOperator}. */
public static class Factory extends TableWriteOperator.Factory<Tuple2<CdcRecord, Integer>> {

public Factory(
FileStoreTable table,
StoreSinkWrite.Provider storeSinkWriteProvider,
String initialCommitUser) {
super(table, storeSinkWriteProvider, initialCommitUser);
}

@Override
@SuppressWarnings("unchecked")
public <T extends StreamOperator<Committable>> T createStreamOperator(
StreamOperatorParameters<Committable> parameters) {
return (T)
new CdcDynamicBucketWriteOperator(
parameters, table, storeSinkWriteProvider, initialCommitUser);
}

@Override
@SuppressWarnings("rawtypes")
public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
return CdcDynamicBucketWriteOperator.class;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;

/**
* A {@link FlinkSink} for fixed-bucket table which accepts {@link CdcRecord} and waits for a schema
Expand All @@ -39,8 +39,8 @@ public CdcFixedBucketSink(FileStoreTable table) {
}

@Override
protected OneInputStreamOperator<CdcRecord, Committable> createWriteOperator(
protected OneInputStreamOperatorFactory<CdcRecord, Committable> createWriteOperatorFactory(
StoreSinkWrite.Provider writeProvider, String commitUser) {
return new CdcRecordStoreWriteOperator(table, writeProvider, commitUser);
return new CdcRecordStoreWriteOperator.Factory(table, writeProvider, commitUser);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@

import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.io.IOException;
Expand Down Expand Up @@ -74,12 +77,13 @@ public class CdcRecordStoreMultiWriteOperator
private String commitUser;
private ExecutorService compactExecutor;

public CdcRecordStoreMultiWriteOperator(
private CdcRecordStoreMultiWriteOperator(
StreamOperatorParameters<MultiTableCommittable> parameters,
Catalog.Loader catalogLoader,
StoreSinkWrite.WithWriteBufferProvider storeSinkWriteProvider,
String initialCommitUser,
Options options) {
super(options);
super(parameters, options);
this.catalogLoader = catalogLoader;
this.storeSinkWriteProvider = storeSinkWriteProvider;
this.initialCommitUser = initialCommitUser;
Expand Down Expand Up @@ -254,4 +258,42 @@ public Map<Identifier, StoreSinkWrite> writes() {
public String commitUser() {
return commitUser;
}

/** {@link StreamOperatorFactory} of {@link CdcRecordStoreMultiWriteOperator}. */
public static class Factory
extends PrepareCommitOperator.Factory<CdcMultiplexRecord, MultiTableCommittable> {
private final StoreSinkWrite.WithWriteBufferProvider storeSinkWriteProvider;
private final String initialCommitUser;
private final Catalog.Loader catalogLoader;

public Factory(
Catalog.Loader catalogLoader,
StoreSinkWrite.WithWriteBufferProvider storeSinkWriteProvider,
String initialCommitUser,
Options options) {
super(options);
this.catalogLoader = catalogLoader;
this.storeSinkWriteProvider = storeSinkWriteProvider;
this.initialCommitUser = initialCommitUser;
}

@Override
@SuppressWarnings("unchecked")
public <T extends StreamOperator<MultiTableCommittable>> T createStreamOperator(
StreamOperatorParameters<MultiTableCommittable> parameters) {
return (T)
new CdcRecordStoreMultiWriteOperator(
parameters,
catalogLoader,
storeSinkWriteProvider,
initialCommitUser,
options);
}

@Override
@SuppressWarnings("rawtypes")
public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
return CdcRecordStoreMultiWriteOperator.class;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.flink.sink.cdc;

import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.PrepareCommitOperator;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.TableWriteOperator;
Expand All @@ -27,6 +28,9 @@
import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.io.IOException;
Expand All @@ -50,11 +54,12 @@ public class CdcRecordStoreWriteOperator extends TableWriteOperator<CdcRecord> {

private final long retrySleepMillis;

public CdcRecordStoreWriteOperator(
protected CdcRecordStoreWriteOperator(
StreamOperatorParameters<Committable> parameters,
FileStoreTable table,
StoreSinkWrite.Provider storeSinkWriteProvider,
String initialCommitUser) {
super(table, storeSinkWriteProvider, initialCommitUser);
super(parameters, table, storeSinkWriteProvider, initialCommitUser);
this.retrySleepMillis =
table.coreOptions().toConfiguration().get(RETRY_SLEEP_TIME).toMillis();
}
Expand Down Expand Up @@ -92,4 +97,30 @@ public void processElement(StreamRecord<CdcRecord> element) throws Exception {
throw new IOException(e);
}
}

/** {@link StreamOperatorFactory} of {@link CdcRecordStoreWriteOperator}. */
public static class Factory extends TableWriteOperator.Factory<CdcRecord> {

public Factory(
FileStoreTable table,
StoreSinkWrite.Provider storeSinkWriteProvider,
String initialCommitUser) {
super(table, storeSinkWriteProvider, initialCommitUser);
}

@Override
@SuppressWarnings("unchecked")
public <T extends StreamOperator<Committable>> T createStreamOperator(
StreamOperatorParameters<Committable> parameters) {
return (T)
new CdcRecordStoreWriteOperator(
parameters, table, storeSinkWriteProvider, initialCommitUser);
}

@Override
@SuppressWarnings("rawtypes")
public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
return CdcRecordStoreWriteOperator.class;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;

import javax.annotation.Nullable;

Expand All @@ -42,9 +42,9 @@ public CdcUnawareBucketSink(FileStoreTable table, Integer parallelism) {
}

@Override
protected OneInputStreamOperator<CdcRecord, Committable> createWriteOperator(
protected OneInputStreamOperatorFactory<CdcRecord, Committable> createWriteOperatorFactory(
StoreSinkWrite.Provider writeProvider, String commitUser) {
return new CdcUnawareBucketWriteOperator(table, writeProvider, commitUser);
return new CdcUnawareBucketWriteOperator.Factory(table, writeProvider, commitUser);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,26 @@

package org.apache.paimon.flink.sink.cdc;

import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.PrepareCommitOperator;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.RowKind;

import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/** A {@link PrepareCommitOperator} to write {@link CdcRecord} to unaware-bucket mode table. */
public class CdcUnawareBucketWriteOperator extends CdcRecordStoreWriteOperator {

public CdcUnawareBucketWriteOperator(
private CdcUnawareBucketWriteOperator(
StreamOperatorParameters<Committable> parameters,
FileStoreTable table,
StoreSinkWrite.Provider storeSinkWriteProvider,
String initialCommitUser) {
super(table, storeSinkWriteProvider, initialCommitUser);
super(parameters, table, storeSinkWriteProvider, initialCommitUser);
}

@Override
Expand All @@ -42,4 +47,30 @@ public void processElement(StreamRecord<CdcRecord> element) throws Exception {
super.processElement(element);
}
}

/** {@link StreamOperatorFactory} of {@link CdcUnawareBucketWriteOperator}. */
public static class Factory extends CdcRecordStoreWriteOperator.Factory {

public Factory(
FileStoreTable table,
StoreSinkWrite.Provider storeSinkWriteProvider,
String initialCommitUser) {
super(table, storeSinkWriteProvider, initialCommitUser);
}

@Override
@SuppressWarnings("unchecked")
public <T extends StreamOperator<Committable>> T createStreamOperator(
StreamOperatorParameters<Committable> parameters) {
return (T)
new CdcUnawareBucketWriteOperator(
parameters, table, storeSinkWriteProvider, initialCommitUser);
}

@Override
@SuppressWarnings("rawtypes")
public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
return CdcUnawareBucketWriteOperator.class;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.sink.CommittableStateManager;
import org.apache.paimon.flink.sink.Committer;
import org.apache.paimon.flink.sink.CommitterOperator;
import org.apache.paimon.flink.sink.CommitterOperatorFactory;
import org.apache.paimon.flink.sink.FlinkSink;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
import org.apache.paimon.flink.sink.MultiTableCommittable;
Expand All @@ -41,7 +41,7 @@
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;

import javax.annotation.Nullable;

Expand All @@ -63,19 +63,16 @@ public class FlinkCdcMultiTableSink implements Serializable {
private final Catalog.Loader catalogLoader;
private final double commitCpuCores;
@Nullable private final MemorySize commitHeapMemory;
private final boolean commitChaining;
private final String commitUser;

public FlinkCdcMultiTableSink(
Catalog.Loader catalogLoader,
double commitCpuCores,
@Nullable MemorySize commitHeapMemory,
boolean commitChaining,
String commitUser) {
this.catalogLoader = catalogLoader;
this.commitCpuCores = commitCpuCores;
this.commitHeapMemory = commitHeapMemory;
this.commitChaining = commitChaining;
this.commitUser = commitUser;
}

Expand Down Expand Up @@ -129,10 +126,9 @@ public DataStreamSink<?> sinkFrom(
.transform(
GLOBAL_COMMITTER_NAME,
typeInfo,
new CommitterOperator<>(
new CommitterOperatorFactory<>(
true,
false,
commitChaining,
yunfengzhou-hub marked this conversation as resolved.
Show resolved Hide resolved
commitUser,
createCommitterFactory(),
createCommittableStateManager()))
Expand All @@ -141,9 +137,10 @@ public DataStreamSink<?> sinkFrom(
return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
}

protected OneInputStreamOperator<CdcMultiplexRecord, MultiTableCommittable> createWriteOperator(
StoreSinkWrite.WithWriteBufferProvider writeProvider, String commitUser) {
return new CdcRecordStoreMultiWriteOperator(
protected OneInputStreamOperatorFactory<CdcMultiplexRecord, MultiTableCommittable>
createWriteOperator(
StoreSinkWrite.WithWriteBufferProvider writeProvider, String commitUser) {
return new CdcRecordStoreMultiWriteOperator.Factory(
catalogLoader, writeProvider, commitUser, new Options());
}

Expand Down
Loading
Loading