Commit 48db90f
tmp code to verify 2.0-preview
yunfengzhou-hub committed Nov 22, 2024
1 parent fa33b76 commit 48db90f
Showing 16 changed files with 72 additions and 41 deletions.
2 changes: 1 addition & 1 deletion paimon-flink/paimon-flink-cdc/pom.xml
@@ -34,7 +34,7 @@ under the License.
<name>Paimon : Flink : CDC</name>

<properties>
- <flink.version>1.20.0</flink.version>
+ <flink.version>2.0-preview1</flink.version>
<flink.cdc.version>3.1.1</flink.cdc.version>
<flink.mongodb.cdc.version>3.1.1</flink.mongodb.cdc.version>
<avro.version>1.11.4</avro.version>
8 changes: 7 additions & 1 deletion paimon-flink/paimon-flink-common/pom.xml
@@ -34,7 +34,7 @@ under the License.
<name>Paimon : Flink : Common</name>

<properties>
- <flink.version>1.20.0</flink.version>
+ <flink.version>2.0-preview1</flink.version>
</properties>

<dependencies>
@@ -169,6 +169,12 @@ under the License.
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+     <groupId>org.apache.flink</groupId>
+     <artifactId>flink-test-utils</artifactId>
+     <version>2.0-preview1</version>
+     <scope>test</scope>
+ </dependency>
</dependencies>

<build>
@@ -25,7 +25,7 @@
import org.apache.paimon.utils.Preconditions;

import org.apache.flink.api.java.tuple.Tuple3;
- import org.apache.flink.api.java.utils.MultipleParameterTool;
+ import org.apache.flink.util.MultipleParameterTool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -18,7 +18,7 @@

package org.apache.paimon.flink.action;

- import org.apache.flink.api.java.utils.MultipleParameterTool;
+ import org.apache.flink.util.MultipleParameterTool;

import java.util.Collection;

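Both `MultipleParameterTool` hunks above are the same mechanical migration: in the 2.0 preview the class moved from `org.apache.flink.api.java.utils` (the DataSet-era `flink-java` package) to `org.apache.flink.util`. A minimal sketch of its behavior after the move, which the one-line import changes suggest is unchanged; the `--path` key and values are illustrative only:

```java
import org.apache.flink.util.MultipleParameterTool;

public class ParamsDemo {
    public static void main(String[] args) {
        // Parsing behavior is the same as before the package move.
        MultipleParameterTool params =
                MultipleParameterTool.fromArgs(
                        new String[] {"--path", "/tmp/a", "--path", "/tmp/b"});
        // A repeated key yields all of its values as a Collection<String>.
        System.out.println(params.getMultiParameter("path"));
    }
}
```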
@@ -23,6 +23,7 @@
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;

+ import org.apache.flink.api.connector.sink2.InitContext;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
@@ -40,7 +41,9 @@ public QueryAddressRegister(Table table) {
this.serviceManager = ((FileStoreTable) table).store().newServiceManager();
}

- @Override
+ /**
+  * Do not annotate with <code>@Override</code> here to maintain compatibility with Flink 1.20 and earlier.
+  */
public SinkWriter<InternalRow> createWriter(InitContext context) {
return new QueryAddressRegisterSinkWriter(serviceManager);
}
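The comment above carries the real reasoning, so it is worth spelling out: `@Override` is a compile-time assertion, and `createWriter(InitContext)` only matches a declared interface method on some Flink versions. Omitting the annotation lets one source file compile against both lines. A self-contained sketch of the pattern with hypothetical stand-in interfaces (not the real Flink `Sink` API):

```java
// Hypothetical v1 of an SPI; imagine a later v2 retypes the parameter and
// keeps a default bridge for its own variant.
interface ContextV1 {}

interface VersionedSink {
    Object createWriter(ContextV1 context);
}

class CompatSink implements VersionedSink {
    // Deliberately no @Override: against v1 this implements the declared
    // method; against a hypothetical v2 with a different parameter type the
    // same source stays a plain overload (assuming v2 defaults its own
    // variant), whereas @Override would be a hard compile error there.
    public Object createWriter(ContextV1 context) {
        return new Object();
    }
}
```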
@@ -20,7 +20,7 @@

import org.apache.paimon.table.sink.SinkRecord;

- import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+ import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;

/** Log {@link SinkFunction} with {@link WriteCallback}. */
public interface LogSinkFunction extends SinkFunction<SinkRecord> {
@@ -29,7 +29,7 @@
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
- import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+ import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.Output;
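Both `SinkFunction` hunks are one relocation: the deprecated interface now lives under a `.legacy` package in the 2.0 preview. Assuming the relocated interface keeps its old shape, as these one-line import changes suggest, an implementation looks the same as before; the printing sink is illustrative only:

```java
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;

/** Illustrative sink that prints every record it receives. */
public class PrintingSink implements SinkFunction<String> {
    @Override
    public void invoke(String value, Context context) {
        System.out.println(value);
    }
}
```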
@@ -24,9 +24,9 @@
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.utils.SnapshotManager;

+ import org.apache.flink.configuration.StateRecoveryOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.SavepointFormatType;
- import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.types.Row;
import org.junit.jupiter.api.Test;

@@ -106,9 +106,7 @@ public void testSuspendAndRecoverAfterRescaleOverwrite() throws Exception {
assertThat(batchSql("SELECT * FROM T3")).containsExactlyInAnyOrderElementsOf(committedData);

// step5: resume streaming job
- sEnv.getConfig()
-         .getConfiguration()
-         .set(SavepointConfigOptions.SAVEPOINT_PATH, savepointPath);
+ sEnv.getConfig().getConfiguration().set(StateRecoveryOptions.SAVEPOINT_PATH, savepointPath);
JobClient resumedJobClient =
startJobAndCommitSnapshot(streamSql, snapshotAfterRescale.id());
// stop job
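The hunk above tracks a configuration rename: the savepoint restore path is now set through `StateRecoveryOptions.SAVEPOINT_PATH` (in `flink-core`) rather than `SavepointConfigOptions.SAVEPOINT_PATH` (in `flink-runtime`). A hedged sketch of resuming a fresh environment from a savepoint; the path is a placeholder, whereas the test above gets it from a stop-with-savepoint call:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateRecoveryOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ResumeFromSavepoint {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Placeholder path; any job built on this environment restores from it.
        conf.set(StateRecoveryOptions.SAVEPOINT_PATH, "file:///tmp/savepoints/savepoint-0");
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... define and execute the streaming job here ...
    }
}
```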
@@ -1008,7 +1008,7 @@ public void testAlterTableNonPhysicalColumn() {
+ " `ts` TIMESTAMP(3),\n"
+ " `ee` VARCHAR(2147483647) METADATA,\n"
+ " WATERMARK FOR `ts` AS `ts`\n"
+ ") ")
+ ")")
.doesNotContain("schema");
}

@@ -18,10 +18,10 @@

package org.apache.paimon.flink.source;

+ import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.test.util.MiniClusterWithClientResource;
@@ -67,7 +67,7 @@ public void testParallelSourceExecution() throws Exception {
"iterator source");

final List<RowData> result =
- DataStreamUtils.collectBoundedStream(stream, "Iterator Source Test");
+ IteratorUtils.toList(stream.executeAndCollect("Iterator Source Test"));

verifySequence(result, 1L, 1_000L);
}
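`DataStreamUtils.collectBoundedStream` no longer exists in the 2.0 preview, so the test drains the public `executeAndCollect` iterator through commons-collections. The same result is possible without that extra dependency; a sketch, where `stream` is the bounded `DataStream<RowData>` built earlier in this test:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.table.data.RowData;
import org.apache.flink.util.CloseableIterator;

// Inside the test method; executeAndCollect runs the job and returns a
// CloseableIterator over its results.
List<RowData> result = new ArrayList<>();
try (CloseableIterator<RowData> it = stream.executeAndCollect("Iterator Source Test")) {
    it.forEachRemaining(result::add);
}
```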
@@ -39,6 +39,7 @@
import org.apache.flink.runtime.state.StateInitializationContextImpl;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.operators.SourceOperator;
+ import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
@@ -59,12 +60,14 @@ public class TestingSourceOperator<T> extends SourceOperator<T, SimpleSourceSplit>
private final int parallelism;

public TestingSourceOperator(
+ StreamOperatorParameters<T> parameters,
SourceReader<T, SimpleSourceSplit> reader,
WatermarkStrategy<T> watermarkStrategy,
ProcessingTimeService timeService,
boolean emitProgressiveWatermarks) {

this(
+ parameters,
reader,
watermarkStrategy,
timeService,
@@ -75,6 +78,7 @@ public TestingSourceOperator(
}

public TestingSourceOperator(
+ StreamOperatorParameters<T> parameters,
SourceReader<T, SimpleSourceSplit> reader,
WatermarkStrategy<T> watermarkStrategy,
ProcessingTimeService timeService,
@@ -84,6 +88,7 @@ public TestingSourceOperator(
boolean emitProgressiveWatermarks) {

super(
+ parameters,
(context) -> reader,
eventGateway,
new SimpleSourceSplitSerializer(),
@@ -145,12 +150,18 @@ public static <T> SourceOperator<T, SimpleSourceSplit> createTestOperator(

final SourceOperator<T, SimpleSourceSplit> sourceOperator =
new TestingSourceOperator<>(
- reader, watermarkStrategy, timeService, emitProgressiveWatermarks);
+ new StreamOperatorParameters(
+         new SourceOperatorStreamTask<Integer>(new DummyEnvironment()),
+         new MockStreamConfig(new Configuration(), 1),
+         new MockOutput<>(new ArrayList<>()),
+         null,
+         null,
+         null),
+ reader,
+ watermarkStrategy,
+ timeService,
+ emitProgressiveWatermarks);

- sourceOperator.setup(
-         new SourceOperatorStreamTask<Integer>(new DummyEnvironment()),
-         new MockStreamConfig(new Configuration(), 1),
-         new MockOutput<>(new ArrayList<>()));
sourceOperator.initializeState(stateContext);
sourceOperator.open();

@@ -29,7 +29,6 @@
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.junit5.InjectClusterClient;
- import org.apache.flink.test.util.TestEnvironment;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
@@ -167,17 +166,12 @@ private void registerEnv(InternalMiniClusterExtension internalMiniClusterExtension) {
.getOptional(CoreOptions.DEFAULT_PARALLELISM)
.orElse(internalMiniClusterExtension.getNumberSlots());

- TestEnvironment executionEnvironment =
-         new TestEnvironment(
-                 internalMiniClusterExtension.getMiniCluster(), defaultParallelism, false);
- executionEnvironment.setAsContext();
+ TestStreamEnvironment.setAsContext(
+         internalMiniClusterExtension.getMiniCluster(), defaultParallelism);
}

private void unregisterEnv(InternalMiniClusterExtension internalMiniClusterExtension) {
TestStreamEnvironment.unsetAsContext();
- TestEnvironment.unsetAsContext();
}

private MiniClusterClient createMiniClusterClient(
@@ -23,8 +23,8 @@
import org.apache.paimon.utils.BlockingIterator;

import org.apache.flink.api.common.RuntimeExecutionMode;
- import org.apache.flink.api.common.restartstrategy.RestartStrategies;
- import org.apache.flink.api.common.time.Time;
+ import org.apache.flink.configuration.Configuration;
+ import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
@@ -36,6 +36,7 @@
import javax.annotation.Nullable;

import java.nio.file.Paths;
+ import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -52,7 +53,7 @@
/** Test util for {@link ReadWriteTableITCase}. */
public class ReadWriteTableTestUtil {

- private static final Time TIME_OUT = Time.seconds(10);
+ private static final Duration TIME_OUT = Duration.ofSeconds(10);

public static final int DEFAULT_PARALLELISM = 2;

@@ -75,12 +76,16 @@ public static void init(String warehouse) {
}

public static void init(String warehouse, int parallelism) {
- StreamExecutionEnvironment sExeEnv = buildStreamEnv(parallelism);
- sExeEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+ StreamExecutionEnvironment sExeEnv =
+         buildStreamEnv(
+                 parallelism,
+                 RestartStrategyOptions.RestartStrategyType.NO_RESTART_STRATEGY);
sEnv = StreamTableEnvironment.create(sExeEnv);

- bExeEnv = buildBatchEnv(parallelism);
- bExeEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+ bExeEnv =
+         buildBatchEnv(
+                 parallelism,
+                 RestartStrategyOptions.RestartStrategyType.NO_RESTART_STRATEGY);
bEnv = StreamTableEnvironment.create(bExeEnv, EnvironmentSettings.inBatchMode());

ReadWriteTableTestUtil.warehouse = warehouse;
@@ -95,16 +100,26 @@ public static void init(String warehouse, int parallelism) {
bEnv.useCatalog(catalog);
}

- public static StreamExecutionEnvironment buildStreamEnv(int parallelism) {
-     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ public static StreamExecutionEnvironment buildStreamEnv(
+         int parallelism, RestartStrategyOptions.RestartStrategyType restartStrategyType) {
+     Configuration configuration = new Configuration();
+     configuration.set(
+             RestartStrategyOptions.RESTART_STRATEGY, restartStrategyType.getMainValue());
+     final StreamExecutionEnvironment env =
+             StreamExecutionEnvironment.getExecutionEnvironment(configuration);
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
env.enableCheckpointing(100);
env.setParallelism(parallelism);
return env;
}
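The helpers now fix the restart strategy in the environment's `Configuration` at construction time, since the mutable `ExecutionConfig.setRestartStrategy`/`RestartStrategies` API was removed. A hypothetical call site; note that enum values other than `NO_RESTART_STRATEGY`, such as `FIXED_DELAY`, would also need their companion options (attempts, delay) set in the same configuration:

```java
// Disable restarts for a deterministic test run (parallelism of 2 is arbitrary).
StreamExecutionEnvironment env =
        buildStreamEnv(2, RestartStrategyOptions.RestartStrategyType.NO_RESTART_STRATEGY);
```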

- public static StreamExecutionEnvironment buildBatchEnv(int parallelism) {
-     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ public static StreamExecutionEnvironment buildBatchEnv(
+         int parallelism, RestartStrategyOptions.RestartStrategyType restartStrategyType) {
+     Configuration configuration = new Configuration();
+     configuration.set(
+             RestartStrategyOptions.RESTART_STRATEGY, restartStrategyType.getMainValue());
+     final StreamExecutionEnvironment env =
+             StreamExecutionEnvironment.getExecutionEnvironment(configuration);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
env.setParallelism(parallelism);
return env;
@@ -270,7 +285,7 @@ public static void testBatchRead(String query, List<Row> expected) throws Exception {
try (BlockingIterator<Row, Row> iterator = BlockingIterator.of(resultItr)) {
if (!expected.isEmpty()) {
List<Row> result =
- iterator.collect(expected.size(), TIME_OUT.getSize(), TIME_OUT.getUnit());
+ iterator.collect(expected.size(), TIME_OUT.getSeconds(), TimeUnit.SECONDS);
assertThat(toInsertOnlyRows(result))
.containsExactlyInAnyOrderElementsOf(toInsertOnlyRows(expected));
}
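This last hunk pairs with the `TIME_OUT` field change above: `org.apache.flink.api.common.time.Time` is gone in the 2.0 preview, so the timeout becomes a `java.time.Duration`, and a call site expecting a `(long, TimeUnit)` pair receives `getSeconds()` plus an explicit unit. A minimal illustration:

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

public class TimeoutDemo {
    public static void main(String[] args) {
        Duration timeout = Duration.ofSeconds(10);
        // Replaces Time.seconds(10) and its getSize()/getUnit() accessors.
        System.out.println(timeout.getSeconds() + " " + TimeUnit.SECONDS); // prints "10 SECONDS"
    }
}
```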
@@ -18,7 +18,7 @@

# Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes
- rootLogger.level = OFF
+ rootLogger.level = INFO
rootLogger.appenderRef.test.ref = TestLogger

appender.testlogger.name = TestLogger
6 changes: 5 additions & 1 deletion pom.xml
@@ -95,7 +95,7 @@ under the License.
<junit5.version>5.8.1</junit5.version>
<spotless.version>2.13.0</spotless.version>
<spotless.delimiter>package</spotless.delimiter>
- <target.java.version>1.8</target.java.version>
+ <target.java.version>11</target.java.version>
<assertj.version>3.23.1</assertj.version>
<javax.annotation-api.version>1.3.2</javax.annotation-api.version>
<flink.forkCount>1C</flink.forkCount>
@@ -836,6 +836,10 @@ under the License.
<!-- Prevents recompilation due to missing package-info.class, see MCOMPILER-205 -->
<arg>-Xpkginfo:always</arg>
<arg>-Xlint:deprecation</arg>
+ <arg>--add-exports=java.base/sun.net.util=ALL-UNNAMED</arg>
+ <arg>--add-exports=java.management/sun.management=ALL-UNNAMED</arg>
+ <arg>--add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED</arg>
+ <arg>--add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED</arg>
</compilerArgs>
</configuration>
</plugin>
4 changes: 2 additions & 2 deletions tools/ci/paimon-ci-tools/pom.xml
@@ -82,8 +82,8 @@ under the License.
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
- <source>8</source>
- <target>8</target>
+ <source>11</source>
+ <target>11</target>
</configuration>
</plugin>
</plugins>
