
Commit

tmp code to verify 2.0-preview
yunfengzhou-hub committed Nov 22, 2024
1 parent 0bb8152 commit 64e4381
Showing 16 changed files with 40 additions and 16 deletions.
2 changes: 1 addition & 1 deletion paimon-flink/paimon-flink-cdc/pom.xml
@@ -34,7 +34,7 @@ under the License.
<name>Paimon : Flink : CDC</name>

<properties>
- <flink.version>1.20.0</flink.version>
+ <flink.version>2.0-preview1</flink.version>
<flink.cdc.version>3.1.1</flink.cdc.version>
<flink.mongodb.cdc.version>3.1.1</flink.mongodb.cdc.version>
<avro.version>1.11.4</avro.version>
@@ -37,7 +37,7 @@
* A {@link FlinkKafkaProducer} which implements {@link LogSinkFunction} to register {@link
* WriteCallback}.
*/
- public class KafkaSinkFunction extends FlinkKafkaProducer<SinkRecord> implements LogSinkFunction {
+ public class KafkaSinkFunction extends FlinkKafkaProducer<SinkRecord> implements LogSinkFunction{

private WriteCallback writeCallback;

@@ -66,7 +66,6 @@ public void setWriteCallback(WriteCallback writeCallback) {
this.writeCallback = writeCallback;
}

- @Override
public void open(OpenContext openContext) throws Exception {
open(new Configuration());
}
2 changes: 1 addition & 1 deletion paimon-flink/paimon-flink-common/pom.xml
@@ -34,7 +34,7 @@ under the License.
<name>Paimon : Flink : Common</name>

<properties>
- <flink.version>1.20.0</flink.version>
+ <flink.version>2.0-preview1</flink.version>
</properties>

<dependencies>
@@ -25,7 +25,7 @@
import org.apache.paimon.utils.Preconditions;

import org.apache.flink.api.java.tuple.Tuple3;
- import org.apache.flink.api.java.utils.MultipleParameterTool;
+ import org.apache.flink.util.MultipleParameterTool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -18,7 +18,7 @@

package org.apache.paimon.flink.action;

- import org.apache.flink.api.java.utils.MultipleParameterTool;
+ import org.apache.flink.util.MultipleParameterTool;

import java.util.Collection;

@@ -21,6 +21,7 @@
import org.apache.paimon.flink.sink.NoneCopyVersionedSerializerTypeSerializerProxy;

import org.apache.flink.api.common.ExecutionConfig;
+ import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;

@@ -57,6 +58,16 @@ public boolean isKeyType() {
}

@Override
+ public TypeSerializer<ChangelogCompactTask> createSerializer(
+ SerializerConfig serializerConfig) {
+ // we don't need copy for task
+ return new NoneCopyVersionedSerializerTypeSerializerProxy<ChangelogCompactTask>(
+ ChangelogCompactTaskSerializer::new) {};
+ }

+ /**
+ * Do not annotate with <code>@override</code> here to maintain compatibility with Flink 1.20-.
+ */
public TypeSerializer<ChangelogCompactTask> createSerializer(ExecutionConfig config) {
// we don't need copy for task
return new NoneCopyVersionedSerializerTypeSerializerProxy<ChangelogCompactTask>(
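For orientation, here is a minimal, hypothetical sketch of the pattern this hunk (and several of the hunks below) applies. It assumes that Flink 2.0-preview1 replaces TypeInformation#createSerializer(ExecutionConfig) with createSerializer(SerializerConfig), so only the new overload carries @Override while the legacy overload remains a plain method. The class name and serializer factory below are placeholders, not the committed code.

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;

/** Hypothetical TypeInformation kept source-compatible across the Flink versions this commit targets. */
public abstract class DualVersionTypeInfo<T> extends TypeInformation<T> {

    // Flink 2.0-preview1 signature: SerializerConfig replaces ExecutionConfig.
    @Override
    public TypeSerializer<T> createSerializer(SerializerConfig serializerConfig) {
        return newSerializer();
    }

    // Legacy signature. No @Override, so the source still compiles against
    // 2.0-preview1, where this method no longer exists on TypeInformation.
    public TypeSerializer<T> createSerializer(ExecutionConfig config) {
        return newSerializer();
    }

    // Placeholder for whatever serializer the concrete TypeInformation produces.
    protected abstract TypeSerializer<T> newSerializer();
}

Restoring @Override on the legacy overload would break the build once the method is gone from the base class, which appears to be what the added javadoc warns against.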
@@ -23,6 +23,7 @@
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;

import org.apache.flink.api.connector.sink2.InitContext;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
@@ -40,7 +41,9 @@ public QueryAddressRegister(Table table) {
this.serviceManager = ((FileStoreTable) table).store().newServiceManager();
}

- @Override
+ /**
+ * Do not annotate with <code>@override</code> here to maintain compatibility with Flink 1.20-.
+ */
public SinkWriter<InternalRow> createWriter(InitContext context) {
return new QueryAddressRegisterSinkWriter(serviceManager);
}
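The sink change above follows the same dual-signature idea for Sink#createWriter. Below is a rough sketch, assuming WriterInitContext is the newer parameter type and InitContext the older one, as the imports in this file suggest; the element and writer types are simplified placeholders rather than the committed QueryAddressRegister.

import java.io.IOException;

import org.apache.flink.api.connector.sink2.InitContext;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;

/** Hypothetical sink kept compatible across the Flink versions this commit targets. */
public abstract class DualVersionSink<T> implements Sink<T> {

    // Newer signature, annotated with @Override.
    @Override
    public SinkWriter<T> createWriter(WriterInitContext context) throws IOException {
        return newWriter();
    }

    // Older signature, kept as a plain method (no @Override) so the source keeps
    // building once this variant disappears from the Sink interface.
    public SinkWriter<T> createWriter(InitContext context) throws IOException {
        return newWriter();
    }

    // Placeholder for the concrete writer, e.g. an address-registering writer.
    protected abstract SinkWriter<T> newWriter();
}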
@@ -67,7 +67,9 @@ public TypeSerializer<Committable> createSerializer(SerializerConfig config) {
() -> new CommittableSerializer(new CommitMessageSerializer())) {};
}

- @Override
+ /**
+ * Do not annotate with <code>@override</code> here to maintain compatibility with Flink 1.20-.
+ */
public TypeSerializer<Committable> createSerializer(ExecutionConfig config) {
// no copy, so that data from writer is directly going into committer while chaining
return new NoneCopyVersionedSerializerTypeSerializerProxy<Committable>(
@@ -68,7 +68,9 @@ public TypeSerializer<UnawareAppendCompactionTask> createSerializer(SerializerCo
() -> new CompactionTaskSimpleSerializer(new CompactionTaskSerializer())) {};
}

- @Override
+ /**
+ * Do not annotate with <code>@override</code> here to maintain compatibility with Flink 1.20-.
+ */
public TypeSerializer<UnawareAppendCompactionTask> createSerializer(ExecutionConfig config) {
// we don't need copy for task
return new NoneCopyVersionedSerializerTypeSerializerProxy<UnawareAppendCompactionTask>(
@@ -20,7 +20,7 @@

import org.apache.paimon.table.sink.SinkRecord;

- import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+ import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;

/** Log {@link SinkFunction} with {@link WriteCallback}. */
public interface LogSinkFunction extends SinkFunction<SinkRecord> {
@@ -67,7 +67,9 @@ public TypeSerializer<MultiTableCommittable> createSerializer(SerializerConfig c
() -> new MultiTableCommittableSerializer(new CommitMessageSerializer())) {};
}

- @Override
+ /**
+ * Do not annotate with <code>@override</code> here to maintain compatibility with Flink 1.20-.
+ */
public TypeSerializer<MultiTableCommittable> createSerializer(ExecutionConfig config) {
// no copy, so that data from writer is directly going into committer while chaining
return new NoneCopyVersionedSerializerTypeSerializerProxy<MultiTableCommittable>(
@@ -82,7 +82,9 @@ public MultiTableUnawareAppendCompactionTask copy(
};
}

- @Override
+ /**
+ * Do not annotate with <code>@override</code> here to maintain compatibility with Flink 1.20-.
+ */
public TypeSerializer<MultiTableUnawareAppendCompactionTask> createSerializer(
ExecutionConfig executionConfig) {
return new SimpleVersionedSerializerTypeSerializerProxy<
@@ -29,7 +29,7 @@
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
- import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+ import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.Output;
@@ -130,7 +130,6 @@ public void open(OpenContext openContext) throws Exception {
* compatibility with Flink 1.18-.
*/
public void open(Configuration parameters) throws Exception {
- super.open(parameters);
shuffleKeyAbstract.open();
}
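As a hedged illustration of the dual open() bridge described by the comment above (the committed class's superclass and fields are not shown in this hunk): assuming Flink 1.19+ adds RichFunction#open(OpenContext) and 2.0-preview1 drops open(Configuration), the new variant forwards to the legacy one, and the legacy one keeps no @Override and no super.open(parameters) call. Names below are placeholders.

import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

/** Hypothetical rich function kept compatible with older Flink runtimes and 2.0-preview1. */
public class DualVersionOpenFunction extends RichMapFunction<String, String> {

    private transient String prefix;

    // Newer runtimes call this variant; it forwards to the legacy one.
    @Override
    public void open(OpenContext openContext) throws Exception {
        open(new Configuration());
    }

    /**
     * Older runtimes call this variant directly. It carries no @Override, and it
     * no longer calls super.open(parameters), since that signature is assumed to
     * be removed in Flink 2.0-preview1.
     */
    public void open(Configuration parameters) throws Exception {
        prefix = "opened: ";
    }

    @Override
    public String map(String value) {
        return prefix + value;
    }
}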

@@ -81,7 +81,9 @@ public TypeSerializer<T> createSerializer(SerializerConfig config) {
return serializer.duplicate();
}

- @Override
+ /**
+ * Do not annotate with <code>@override</code> here to maintain compatibility with Flink 1.20-.
+ */
public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
return serializer.duplicate();
}
@@ -86,7 +86,9 @@ public TypeSerializer<T> createSerializer(SerializerConfig config) {
return new JavaSerializer<>(this.typeClass);
}

- @Override
+ /**
+ * Do not annotate with <code>@override</code> here to maintain compatibility with Flink 1.20-.
+ */
public TypeSerializer<T> createSerializer(ExecutionConfig config) {
return new JavaSerializer<>(this.typeClass);
}
