Skip to content

Commit

Permalink
[flink] Replace legacy SinkFunction with v2 Sink
Browse files Browse the repository at this point in the history
  • Loading branch information
yunfengzhou-hub committed Dec 1, 2024
1 parent 475e487 commit be4357f
Show file tree
Hide file tree
Showing 15 changed files with 383 additions and 49 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.connector.sink2;

/** Placeholder class to resolve compatibility issues. */
public interface WriterInitContext extends org.apache.flink.api.connector.sink2.Sink.InitContext {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.functions.sink.v2;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;

import java.io.IOException;

/**
* A special sink that ignores all elements.
*
* @param <IN> The type of elements received by the sink.
*/
@PublicEvolving
public class DiscardingSink<IN> implements Sink<IN> {
private static final long serialVersionUID = 1L;

@Override
public SinkWriter<IN> createWriter(InitContext context) throws IOException {
return new DiscardingElementWriter();
}

private class DiscardingElementWriter implements SinkWriter<IN> {

@Override
public void write(IN element, Context context) throws IOException, InterruptedException {
// discard it.
}

@Override
public void flush(boolean endOfInput) throws IOException, InterruptedException {
// this writer has no pending data.
}

@Override
public void close() throws Exception {
// do nothing.
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.connector.sink2;

/** Placeholder class to resolve compatibility issues. */
public interface WriterInitContext extends org.apache.flink.api.connector.sink2.Sink.InitContext {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.functions.sink.v2;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;

import java.io.IOException;

/**
* A special sink that ignores all elements.
*
* @param <IN> The type of elements received by the sink.
*/
@PublicEvolving
public class DiscardingSink<IN> implements Sink<IN> {
private static final long serialVersionUID = 1L;

@Override
public SinkWriter<IN> createWriter(InitContext context) throws IOException {
return new DiscardingElementWriter();
}

private class DiscardingElementWriter implements SinkWriter<IN> {

@Override
public void write(IN element, Context context) throws IOException, InterruptedException {
// discard it.
}

@Override
public void flush(boolean endOfInput) throws IOException, InterruptedException {
// this writer has no pending data.
}

@Override
public void close() throws Exception {
// do nothing.
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.connector.sink2;

import org.apache.flink.annotation.Public;

/** Placeholder class to resolve compatibility issues. */
@Public
public interface WriterInitContext extends org.apache.flink.api.connector.sink2.Sink.InitContext {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.functions.sink.v2;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;

import java.io.IOException;

/**
* A special sink that ignores all elements.
*
* @param <IN> The type of elements received by the sink.
*/
@PublicEvolving
public class DiscardingSink<IN> implements Sink<IN>, SupportsConcurrentExecutionAttempts {
private static final long serialVersionUID = 1L;

@Override
public SinkWriter<IN> createWriter(InitContext context) throws IOException {
return new DiscardingElementWriter();
}

private class DiscardingElementWriter implements SinkWriter<IN> {

@Override
public void write(IN element, Context context) throws IOException, InterruptedException {
// discard it.
}

@Override
public void flush(boolean endOfInput) throws IOException, InterruptedException {
// this writer has no pending data.
}

@Override
public void close() throws Exception {
// do nothing.
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.connector.sink2;

/** Placeholder class to resolve compatibility issues. */
public interface WriterInitContext extends org.apache.flink.api.connector.sink2.Sink.InitContext {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.functions.sink.v2;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;

import java.io.IOException;

/**
* A special sink that ignores all elements.
*
* @param <IN> The type of elements received by the sink.
*/
@PublicEvolving
public class DiscardingSink<IN> implements Sink<IN>, SupportsConcurrentExecutionAttempts {
private static final long serialVersionUID = 1L;

@Override
public SinkWriter<IN> createWriter(InitContext context) throws IOException {
return new DiscardingElementWriter();
}

private class DiscardingElementWriter implements SinkWriter<IN> {

@Override
public void write(IN element, Context context) throws IOException, InterruptedException {
// discard it.
}

@Override
public void flush(boolean endOfInput) throws IOException, InterruptedException {
// this writer has no pending data.
}

@Override
public void close() throws Exception {
// do nothing.
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -138,7 +138,7 @@ public DataStreamSink<?> sinkFrom(
createCommittableStateManager()))
.setParallelism(input.getParallelism());
configureGlobalCommitter(committed, commitCpuCores, commitHeapMemory);
return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
return committed.sinkTo(new DiscardingSink<>()).name("end").setParallelism(1);
}

protected OneInputStreamOperator<CdcMultiplexRecord, MultiTableCommittable> createWriteOperator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.options.Options;

import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -65,8 +65,7 @@ public void cancel() {}
DataStreamSink<?> dataStreamSink = sink.sinkFrom(input);

// check the transformation graph
LegacySinkTransformation<?> end =
(LegacySinkTransformation<?>) dataStreamSink.getTransformation();
Transformation<?> end = dataStreamSink.getTransformation();
assertThat(end.getName()).isEqualTo("end");

OneInputTransformation<?, ?> committer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;

import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -141,7 +141,7 @@ copyFiles, new SnapshotHintChannelComputer(), parallelism)
new SnapshotHintOperator(targetCatalogConfig))
.setParallelism(parallelism);

snapshotHintOperator.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
snapshotHintOperator.sinkTo(new DiscardingSink<>()).name("end").setParallelism(1);
}

@Override
Expand Down
Loading

0 comments on commit be4357f

Please sign in to comment.