From f94a1d80abfde350e655935a7c456ab17125e8c7 Mon Sep 17 00:00:00 2001 From: yunfengzhou-hub Date: Thu, 21 Nov 2024 16:30:12 +0800 Subject: [PATCH] [flink] Avoid deprecated usages of Configuration --- .../apache/paimon/benchmark/QueryRunner.java | 2 +- .../serialization/SerializerConfig.java | 22 ++++++ .../serialization/SerializerConfigImpl.java | 22 ++++++ .../serialization/SerializerConfig.java | 22 ++++++ .../serialization/SerializerConfigImpl.java | 22 ++++++ .../serialization/SerializerConfig.java | 22 ++++++ .../serialization/SerializerConfigImpl.java | 22 ++++++ .../serialization/SerializerConfig.java | 22 ++++++ .../serialization/SerializerConfigImpl.java | 22 ++++++ ...afkaDebeziumAvroDeserializationSchema.java | 2 +- .../strategy/MongoVersionStrategy.java | 8 +-- .../action/cdc/mysql/MySqlRecordParser.java | 15 +++-- ...lsarDebeziumAvroDeserializationSchema.java | 2 +- .../cdc/mongodb/MongodbSchemaITCase.java | 67 +++++++++---------- .../cdc/mysql/MySqlSyncTableActionITCase.java | 10 ++- .../sink/cdc/CdcRecordSerializeITCase.java | 28 ++++++-- .../changelog/ChangelogTaskTypeInfo.java | 13 +++- .../flink/sink/CommittableTypeInfo.java | 12 +++- .../flink/sink/CompactionTaskTypeInfo.java | 12 +++- .../apache/paimon/flink/sink/FlinkSink.java | 7 +- .../sink/MultiTableCommittableTypeInfo.java | 12 +++- .../MultiTableCompactionTaskTypeInfo.java | 13 +++- .../flink/source/FlinkSourceBuilder.java | 16 ++--- .../AlignedContinuousFileStoreSource.java | 2 +- .../paimon/flink/utils/InternalTypeInfo.java | 14 +++- .../paimon/flink/utils/JavaTypeInfo.java | 16 ++++- .../paimon/flink/FileSystemCatalogITCase.java | 3 +- .../paimon/flink/FlinkJobRecoveryITCase.java | 9 ++- .../paimon/flink/RescaleBucketITCase.java | 4 +- .../UnawareBucketAppendOnlyTableITCase.java | 14 +++- .../flink/sink/SinkSavepointITCase.java | 2 +- .../paimon/flink/util/AbstractTestBase.java | 14 ++-- .../flink/util/ReadWriteTableTestUtil.java | 26 ++++--- .../paimon/hive/HiveCatalogITCaseBase.java | 6 +- 34 files changed, 396 insertions(+), 109 deletions(-) create mode 100644 paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java create mode 100644 paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java create mode 100644 paimon-flink/paimon-flink-1.16/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java create mode 100644 paimon-flink/paimon-flink-1.16/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java create mode 100644 paimon-flink/paimon-flink-1.17/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java create mode 100644 paimon-flink/paimon-flink-1.17/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java create mode 100644 paimon-flink/paimon-flink-1.18/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java create mode 100644 paimon-flink/paimon-flink-1.18/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java diff --git a/paimon-benchmark/paimon-cluster-benchmark/src/main/java/org/apache/paimon/benchmark/QueryRunner.java b/paimon-benchmark/paimon-cluster-benchmark/src/main/java/org/apache/paimon/benchmark/QueryRunner.java index b07cdef8465e..8bfe4b6c9c03 100644 --- a/paimon-benchmark/paimon-cluster-benchmark/src/main/java/org/apache/paimon/benchmark/QueryRunner.java +++ 
b/paimon-benchmark/paimon-cluster-benchmark/src/main/java/org/apache/paimon/benchmark/QueryRunner.java @@ -77,7 +77,7 @@ public Result run() { String sinkPathConfig = BenchmarkGlobalConfiguration.loadConfiguration() - .getString(BenchmarkOptions.SINK_PATH); + .get(BenchmarkOptions.SINK_PATH); if (sinkPathConfig == null) { throw new IllegalArgumentException( BenchmarkOptions.SINK_PATH.key() + " must be set"); diff --git a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java new file mode 100644 index 000000000000..16987469a948 --- /dev/null +++ b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.serialization; + +/** Placeholder class to resolve compatibility issues. */ +public interface SerializerConfig {} diff --git a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java new file mode 100644 index 000000000000..374d33f6500d --- /dev/null +++ b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.serialization; + +/** Placeholder class to resolve compatibility issues. 
*/ +public class SerializerConfigImpl implements SerializerConfig {} diff --git a/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java new file mode 100644 index 000000000000..16987469a948 --- /dev/null +++ b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.serialization; + +/** Placeholder class to resolve compatibility issues. */ +public interface SerializerConfig {} diff --git a/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java new file mode 100644 index 000000000000..374d33f6500d --- /dev/null +++ b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.serialization; + +/** Placeholder class to resolve compatibility issues. */ +public class SerializerConfigImpl implements SerializerConfig {} diff --git a/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java b/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java new file mode 100644 index 000000000000..16987469a948 --- /dev/null +++ b/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.serialization; + +/** Placeholder class to resolve compatibility issues. */ +public interface SerializerConfig {} diff --git a/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java b/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java new file mode 100644 index 000000000000..374d33f6500d --- /dev/null +++ b/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.serialization; + +/** Placeholder class to resolve compatibility issues. */ +public class SerializerConfigImpl implements SerializerConfig {} diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java new file mode 100644 index 000000000000..16987469a948 --- /dev/null +++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.common.serialization; + +/** Placeholder class to resolve compatibility issues. */ +public interface SerializerConfig {} diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java new file mode 100644 index 000000000000..374d33f6500d --- /dev/null +++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.serialization; + +/** Placeholder class to resolve compatibility issues. */ +public class SerializerConfigImpl implements SerializerConfig {} diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumAvroDeserializationSchema.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumAvroDeserializationSchema.java index fc672b9dc0ab..eea364d460de 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumAvroDeserializationSchema.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumAvroDeserializationSchema.java @@ -48,7 +48,7 @@ public class KafkaDebeziumAvroDeserializationSchema public KafkaDebeziumAvroDeserializationSchema(Configuration cdcSourceConfig) { this.topic = KafkaActionUtils.findOneTopic(cdcSourceConfig); - this.schemaRegistryUrl = cdcSourceConfig.getString(SCHEMA_REGISTRY_URL); + this.schemaRegistryUrl = cdcSourceConfig.get(SCHEMA_REGISTRY_URL); } @Override diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java index 64f127571134..df288a4150e6 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java @@ -83,7 +83,7 @@ default Map getExtractRow( Configuration mongodbConfig) throws JsonProcessingException { SchemaAcquisitionMode mode = - SchemaAcquisitionMode.valueOf(mongodbConfig.getString(START_MODE).toUpperCase()); + SchemaAcquisitionMode.valueOf(mongodbConfig.get(START_MODE).toUpperCase()); ObjectNode objectNode = JsonSerdeUtil.asSpecificNodeType(jsonNode.asText(), ObjectNode.class); JsonNode idNode = objectNode.get(ID_FIELD); @@ -92,7 
+92,7 @@ default Map getExtractRow( "The provided MongoDB JSON document does not contain an _id field."); } JsonNode document = - mongodbConfig.getBoolean(DEFAULT_ID_GENERATION) + mongodbConfig.get(DEFAULT_ID_GENERATION) ? objectNode.set( ID_FIELD, idNode.get(OID_FIELD) == null ? idNode : idNode.get(OID_FIELD)) @@ -101,8 +101,8 @@ default Map getExtractRow( case SPECIFIED: return parseFieldsFromJsonRecord( document.toString(), - mongodbConfig.getString(PARSER_PATH), - mongodbConfig.getString(FIELD_NAME), + mongodbConfig.get(PARSER_PATH), + mongodbConfig.get(FIELD_NAME), computedColumns, rowTypeBuilder); case DYNAMIC: diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java index 502e6237a477..26579e718f56 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java @@ -45,6 +45,8 @@ import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions; import org.apache.flink.cdc.debezium.table.DebeziumOptions; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.util.Collector; import org.slf4j.Logger; @@ -99,11 +101,14 @@ public MySqlRecordParser( .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); String stringifyServerTimeZone = mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE); - this.isDebeziumSchemaCommentsEnabled = - mySqlConfig.getBoolean( - DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX - + RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_COMMENTS.name(), - false); + ConfigOption includeSchemaCommentsConfig = + ConfigOptions.key( + DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX + + RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_COMMENTS + .name()) + .booleanType() + .defaultValue(false); + this.isDebeziumSchemaCommentsEnabled = mySqlConfig.get(includeSchemaCommentsConfig); this.serverTimeZone = stringifyServerTimeZone == null ? 
ZoneId.systemDefault() diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarDebeziumAvroDeserializationSchema.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarDebeziumAvroDeserializationSchema.java index b0d1d1bf620f..f45ee034bec8 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarDebeziumAvroDeserializationSchema.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarDebeziumAvroDeserializationSchema.java @@ -46,7 +46,7 @@ public class PulsarDebeziumAvroDeserializationSchema public PulsarDebeziumAvroDeserializationSchema(Configuration cdcSourceConfig) { this.topic = PulsarActionUtils.findOneTopic(cdcSourceConfig); - this.schemaRegistryUrl = cdcSourceConfig.getString(SCHEMA_REGISTRY_URL); + this.schemaRegistryUrl = cdcSourceConfig.get(SCHEMA_REGISTRY_URL); } @Override diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaITCase.java index 394cdd1f149b..f0328b566324 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaITCase.java @@ -76,13 +76,12 @@ public static void initMongoDB() { @Test public void testCreateSchemaFromValidConfig() { Configuration mongodbConfig = new Configuration(); - mongodbConfig.setString(MongoDBSourceOptions.HOSTS, MONGODB_CONTAINER.getHostAndPort()); - mongodbConfig.setString(MongoDBSourceOptions.USERNAME, MongoDBContainer.PAIMON_USER); - mongodbConfig.setString( - MongoDBSourceOptions.PASSWORD, MongoDBContainer.PAIMON_USER_PASSWORD); - mongodbConfig.setString(MongoDBSourceOptions.CONNECTION_OPTIONS, "authSource=admin"); - mongodbConfig.setString(MongoDBSourceOptions.DATABASE, "testDatabase"); - mongodbConfig.setString(MongoDBSourceOptions.COLLECTION, "testCollection"); + mongodbConfig.set(MongoDBSourceOptions.HOSTS, MONGODB_CONTAINER.getHostAndPort()); + mongodbConfig.set(MongoDBSourceOptions.USERNAME, MongoDBContainer.PAIMON_USER); + mongodbConfig.set(MongoDBSourceOptions.PASSWORD, MongoDBContainer.PAIMON_USER_PASSWORD); + mongodbConfig.set(MongoDBSourceOptions.CONNECTION_OPTIONS, "authSource=admin"); + mongodbConfig.set(MongoDBSourceOptions.DATABASE, "testDatabase"); + mongodbConfig.set(MongoDBSourceOptions.COLLECTION, "testCollection"); Schema schema = MongodbSchemaUtils.getMongodbSchema(mongodbConfig); assertNotNull(schema); } @@ -90,13 +89,12 @@ public void testCreateSchemaFromValidConfig() { @Test public void testCreateSchemaFromInvalidHost() { Configuration mongodbConfig = new Configuration(); - mongodbConfig.setString(MongoDBSourceOptions.HOSTS, "127.0.0.1:12345"); - mongodbConfig.setString(MongoDBSourceOptions.USERNAME, MongoDBContainer.PAIMON_USER); - mongodbConfig.setString( - MongoDBSourceOptions.PASSWORD, MongoDBContainer.PAIMON_USER_PASSWORD); - mongodbConfig.setString(MongoDBSourceOptions.CONNECTION_OPTIONS, "authSource=admin"); - mongodbConfig.setString(MongoDBSourceOptions.DATABASE, "testDatabase"); - mongodbConfig.setString(MongoDBSourceOptions.COLLECTION, "testCollection"); + mongodbConfig.set(MongoDBSourceOptions.HOSTS, "127.0.0.1:12345"); + mongodbConfig.set(MongoDBSourceOptions.USERNAME, 
MongoDBContainer.PAIMON_USER); + mongodbConfig.set(MongoDBSourceOptions.PASSWORD, MongoDBContainer.PAIMON_USER_PASSWORD); + mongodbConfig.set(MongoDBSourceOptions.CONNECTION_OPTIONS, "authSource=admin"); + mongodbConfig.set(MongoDBSourceOptions.DATABASE, "testDatabase"); + mongodbConfig.set(MongoDBSourceOptions.COLLECTION, "testCollection"); assertThrows( RuntimeException.class, () -> MongodbSchemaUtils.getMongodbSchema(mongodbConfig)); @@ -106,7 +104,7 @@ public void testCreateSchemaFromInvalidHost() { public void testCreateSchemaFromIncompleteConfig() { // Create a Configuration object with missing necessary settings Configuration mongodbConfig = new Configuration(); - mongodbConfig.setString(MongoDBSourceOptions.HOSTS, MONGODB_CONTAINER.getHostAndPort()); + mongodbConfig.set(MongoDBSourceOptions.HOSTS, MONGODB_CONTAINER.getHostAndPort()); // Expect an exception to be thrown due to missing necessary settings assertThrows( NullPointerException.class, @@ -117,13 +115,12 @@ public void testCreateSchemaFromIncompleteConfig() { public void testCreateSchemaFromDynamicConfig() { // Create a Configuration object with the necessary settings Configuration mongodbConfig = new Configuration(); - mongodbConfig.setString(MongoDBSourceOptions.HOSTS, MONGODB_CONTAINER.getHostAndPort()); - mongodbConfig.setString(MongoDBSourceOptions.USERNAME, MongoDBContainer.PAIMON_USER); - mongodbConfig.setString( - MongoDBSourceOptions.PASSWORD, MongoDBContainer.PAIMON_USER_PASSWORD); - mongodbConfig.setString(MongoDBSourceOptions.CONNECTION_OPTIONS, "authSource=admin"); - mongodbConfig.setString(MongoDBSourceOptions.DATABASE, "testDatabase"); - mongodbConfig.setString(MongoDBSourceOptions.COLLECTION, "testCollection"); + mongodbConfig.set(MongoDBSourceOptions.HOSTS, MONGODB_CONTAINER.getHostAndPort()); + mongodbConfig.set(MongoDBSourceOptions.USERNAME, MongoDBContainer.PAIMON_USER); + mongodbConfig.set(MongoDBSourceOptions.PASSWORD, MongoDBContainer.PAIMON_USER_PASSWORD); + mongodbConfig.set(MongoDBSourceOptions.CONNECTION_OPTIONS, "authSource=admin"); + mongodbConfig.set(MongoDBSourceOptions.DATABASE, "testDatabase"); + mongodbConfig.set(MongoDBSourceOptions.COLLECTION, "testCollection"); // Call the method and check the results Schema schema = MongodbSchemaUtils.getMongodbSchema(mongodbConfig); @@ -142,13 +139,12 @@ public void testCreateSchemaFromDynamicConfig() { @Test public void testCreateSchemaFromInvalidDatabase() { Configuration mongodbConfig = new Configuration(); - mongodbConfig.setString(MongoDBSourceOptions.HOSTS, MONGODB_CONTAINER.getHostAndPort()); - mongodbConfig.setString(MongoDBSourceOptions.USERNAME, MongoDBContainer.PAIMON_USER); - mongodbConfig.setString( - MongoDBSourceOptions.PASSWORD, MongoDBContainer.PAIMON_USER_PASSWORD); - mongodbConfig.setString(MongoDBSourceOptions.CONNECTION_OPTIONS, "authSource=admin"); - mongodbConfig.setString(MongoDBSourceOptions.DATABASE, "invalidDatabase"); - mongodbConfig.setString(MongoDBSourceOptions.COLLECTION, "testCollection"); + mongodbConfig.set(MongoDBSourceOptions.HOSTS, MONGODB_CONTAINER.getHostAndPort()); + mongodbConfig.set(MongoDBSourceOptions.USERNAME, MongoDBContainer.PAIMON_USER); + mongodbConfig.set(MongoDBSourceOptions.PASSWORD, MongoDBContainer.PAIMON_USER_PASSWORD); + mongodbConfig.set(MongoDBSourceOptions.CONNECTION_OPTIONS, "authSource=admin"); + mongodbConfig.set(MongoDBSourceOptions.DATABASE, "invalidDatabase"); + mongodbConfig.set(MongoDBSourceOptions.COLLECTION, "testCollection"); assertThrows( RuntimeException.class, () -> 
MongodbSchemaUtils.getMongodbSchema(mongodbConfig)); @@ -157,13 +153,12 @@ public void testCreateSchemaFromInvalidDatabase() { @Test public void testCreateSchemaFromInvalidCollection() { Configuration mongodbConfig = new Configuration(); - mongodbConfig.setString(MongoDBSourceOptions.HOSTS, MONGODB_CONTAINER.getHostAndPort()); - mongodbConfig.setString(MongoDBSourceOptions.USERNAME, MongoDBContainer.PAIMON_USER); - mongodbConfig.setString( - MongoDBSourceOptions.PASSWORD, MongoDBContainer.PAIMON_USER_PASSWORD); - mongodbConfig.setString(MongoDBSourceOptions.CONNECTION_OPTIONS, "authSource=admin"); - mongodbConfig.setString(MongoDBSourceOptions.DATABASE, "testDatabase"); - mongodbConfig.setString(MongoDBSourceOptions.COLLECTION, "invalidCollection"); + mongodbConfig.set(MongoDBSourceOptions.HOSTS, MONGODB_CONTAINER.getHostAndPort()); + mongodbConfig.set(MongoDBSourceOptions.USERNAME, MongoDBContainer.PAIMON_USER); + mongodbConfig.set(MongoDBSourceOptions.PASSWORD, MongoDBContainer.PAIMON_USER_PASSWORD); + mongodbConfig.set(MongoDBSourceOptions.CONNECTION_OPTIONS, "authSource=admin"); + mongodbConfig.set(MongoDBSourceOptions.DATABASE, "testDatabase"); + mongodbConfig.set(MongoDBSourceOptions.COLLECTION, "invalidCollection"); assertThrows( RuntimeException.class, () -> MongodbSchemaUtils.getMongodbSchema(mongodbConfig)); diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java index bdeab07a746c..febbe4e1deaa 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java @@ -31,7 +31,8 @@ import org.apache.paimon.utils.CommonTestUtils; import org.apache.paimon.utils.JsonSerdeUtil; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestartStrategyOptions; import org.apache.flink.core.execution.JobClient; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.junit.jupiter.api.BeforeAll; @@ -1285,8 +1286,11 @@ public void testDefaultCheckpointInterval() throws Exception { mySqlConfig.put("database-name", "default_checkpoint"); mySqlConfig.put("table-name", "t"); - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setRestartStrategy(RestartStrategies.noRestart()); + // Using `none` to avoid compatibility issues with Flink 1.18-. 
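The deprecated call removed here, env.setRestartStrategy(RestartStrategies.noRestart()), has no direct equivalent in Flink 2.0, where the RestartStrategies class is gone; the portable route is the string-valued RestartStrategyOptions.RESTART_STRATEGY, passed in when the environment is created, as the hunk below does. A minimal standalone sketch of the pattern (the wrapper class is illustrative, not part of the patch):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.RestartStrategyOptions;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public final class NoRestartEnv {
        public static StreamExecutionEnvironment create() {
            Configuration conf = new Configuration();
            // "none" disables restarts, the equivalent of the old RestartStrategies.noRestart().
            conf.set(RestartStrategyOptions.RESTART_STRATEGY, "none");
            return StreamExecutionEnvironment.getExecutionEnvironment(conf);
        }
    }
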
+ Configuration configuration = new Configuration(); + configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "none"); + StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(configuration); MySqlSyncTableAction action = syncTableActionBuilder(mySqlConfig).build(); action.withStreamExecutionEnvironment(env); diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordSerializeITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordSerializeITCase.java index 698900436e8d..b202ca53c9cc 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordSerializeITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordSerializeITCase.java @@ -25,6 +25,8 @@ import org.apache.paimon.types.VarCharType; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.serialization.SerializerConfig; +import org.apache.flink.api.common.serialization.SerializerConfigImpl; import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; @@ -35,6 +37,8 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -49,7 +53,7 @@ public class CdcRecordSerializeITCase { @Test - public void testCdcRecordKryoSerialize() throws IOException { + public void testCdcRecordKryoSerialize() throws Exception { KryoSerializer kr = createFlinkKryoSerializer(RichCdcMultiplexRecord.class); RowType.Builder rowType = RowType.builder(); @@ -78,7 +82,7 @@ public void testCdcRecordKryoSerialize() throws IOException { } @Test - public void testUnmodifiableListKryoSerialize() throws IOException { + public void testUnmodifiableListKryoSerialize() throws Exception { KryoSerializer kryoSerializer = createFlinkKryoSerializer(List.class); RowType.Builder rowType = RowType.builder(); rowType.field("id", new BigIntType()); @@ -101,8 +105,24 @@ public void testUnmodifiableListKryoSerialize() throws IOException { assertThat(deserializeRecord).isEqualTo(fields); } - public static KryoSerializer createFlinkKryoSerializer(Class type) { - return new KryoSerializer<>(type, new ExecutionConfig()); + @SuppressWarnings({"unchecked", "rawtypes"}) + public static KryoSerializer createFlinkKryoSerializer(Class type) + throws NoSuchMethodException, InvocationTargetException, InstantiationException, + IllegalAccessException { + try { + Constructor constructor = + KryoSerializer.class.getConstructor(Class.class, SerializerConfig.class); + return (KryoSerializer) constructor.newInstance(type, new SerializerConfigImpl()); + } catch (NoSuchMethodException + | InvocationTargetException + | IllegalAccessException + | InstantiationException e) { + // to stay compatible with Flink 1.18- + } + + Constructor constructor = + KryoSerializer.class.getConstructor(Class.class, ExecutionConfig.class); + return (KryoSerializer) constructor.newInstance(type, new ExecutionConfig()); } private static final class TestOutputView extends DataOutputStream implements DataOutputView { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogTaskTypeInfo.java 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogTaskTypeInfo.java index 5cae899a0704..a529e6764fae 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogTaskTypeInfo.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogTaskTypeInfo.java @@ -21,6 +21,7 @@ import org.apache.paimon.flink.sink.NoneCopyVersionedSerializerTypeSerializerProxy; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.serialization.SerializerConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -56,7 +57,17 @@ public boolean isKeyType() { return false; } - @Override + /** + * Do not annotate with @Override here to maintain compatibility with Flink 1.18-. + */ + public TypeSerializer createSerializer( + SerializerConfig serializerConfig) { + return this.createSerializer((ExecutionConfig) null); + } + + /** + * Do not annotate with @Override here to maintain compatibility with Flink 2.0+. + */ public TypeSerializer createSerializer(ExecutionConfig config) { // we don't need copy for task return new NoneCopyVersionedSerializerTypeSerializerProxy( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommittableTypeInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommittableTypeInfo.java index dcb87238b833..92e826a91379 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommittableTypeInfo.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommittableTypeInfo.java @@ -21,6 +21,7 @@ import org.apache.paimon.table.sink.CommitMessageSerializer; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.serialization.SerializerConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -57,7 +58,16 @@ public boolean isKeyType() { return false; } - @Override + /** + * Do not annotate with @Override here to maintain compatibility with Flink 1.18-. + */ + public TypeSerializer createSerializer(SerializerConfig config) { + return this.createSerializer((ExecutionConfig) null); + } + + /** + * Do not annotate with @Override here to maintain compatibility with Flink 2.0+. + */ public TypeSerializer createSerializer(ExecutionConfig config) { // no copy, so that data from writer is directly going into committer while chaining return new NoneCopyVersionedSerializerTypeSerializerProxy(
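The two-method shim above recurs in every TypeInformation this patch touches: the SerializerConfig overload becomes the effective override on Flink 1.19+/2.0 (it compiles on 1.15-1.18 thanks to the placeholder SerializerConfig added earlier in this patch), while the ExecutionConfig overload remains the effective override on Flink 1.18 and earlier, where the newer hook does not exist. A compilable sketch of the shim on a hypothetical TypeInformation; MyStringTypeInfo and its use of StringSerializer are illustrative, not part of the patch:

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.serialization.SerializerConfig;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.api.common.typeutils.base.StringSerializer;

    public class MyStringTypeInfo extends TypeInformation<String> {

        // Effective override on Flink 1.19+/2.0, where TypeInformation declares
        // createSerializer(SerializerConfig); a plain unused method on 1.18-.
        public TypeSerializer<String> createSerializer(SerializerConfig config) {
            return this.createSerializer((ExecutionConfig) null);
        }

        // Effective override on Flink 1.18-, where only this variant exists;
        // the variant was removed from TypeInformation in Flink 2.0.
        public TypeSerializer<String> createSerializer(ExecutionConfig config) {
            return StringSerializer.INSTANCE;
        }

        @Override public boolean isBasicType() { return true; }
        @Override public boolean isTupleType() { return false; }
        @Override public int getArity() { return 1; }
        @Override public int getTotalFields() { return 1; }
        @Override public Class<String> getTypeClass() { return String.class; }
        @Override public boolean isKeyType() { return true; }
        @Override public boolean canEqual(Object obj) { return obj instanceof MyStringTypeInfo; }
        @Override public boolean equals(Object obj) { return obj instanceof MyStringTypeInfo; }
        @Override public int hashCode() { return MyStringTypeInfo.class.hashCode(); }
        @Override public String toString() { return "MyStringTypeInfo"; }
    }
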
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactionTaskTypeInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactionTaskTypeInfo.java index 47defa61a971..6510a85b800a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactionTaskTypeInfo.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactionTaskTypeInfo.java @@ -22,6 +22,7 @@ import org.apache.paimon.table.sink.CompactionTaskSerializer; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.serialization.SerializerConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -58,7 +59,16 @@ public boolean isKeyType() { return false; } - @Override + /** + * Do not annotate with @Override here to maintain compatibility with Flink 1.18-. + */ + public TypeSerializer createSerializer(SerializerConfig config) { + return this.createSerializer((ExecutionConfig) null); + } + + /** + * Do not annotate with @Override here to maintain compatibility with Flink 2.0+. + */ public TypeSerializer createSerializer(ExecutionConfig config) { // we don't need copy for task return new NoneCopyVersionedSerializerTypeSerializerProxy( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java index 59f2f4b1035f..dd364c196d8b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java @@ -42,7 +42,6 @@ import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.CheckpointConfig; -import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.DiscardingSink; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; @@ -338,13 +337,11 @@ public static void assertStreamingConfiguration(StreamExecutionEnvironment env) checkArgument( !env.getCheckpointConfig().isUnalignedCheckpointsEnabled(), "Paimon sink currently does not support unaligned checkpoints. Please set " - + ExecutionCheckpointingOptions.ENABLE_UNALIGNED.key() - + " to false."); + + "execution.checkpointing.unaligned.enabled to false."); checkArgument( env.getCheckpointConfig().getCheckpointingMode() == CheckpointingMode.EXACTLY_ONCE, "Paimon sink currently only supports EXACTLY_ONCE checkpoint mode. Please set " - + ExecutionCheckpointingOptions.CHECKPOINTING_MODE.key() - + " to exactly-once"); + + "execution.checkpointing.mode to exactly-once"); } public static void assertBatchAdaptiveParallelism(
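FlinkSink above now spells the option keys out as literals because ExecutionCheckpointingOptions moved between Flink releases and is gone in 2.0. Where a typed view of such a key is still wanted, it can be rebuilt locally with ConfigOptions, the same on-the-fly pattern MySqlRecordParser uses earlier in this patch. A sketch; the option object is a local stand-in, not a Flink constant:

    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ConfigOptions;
    import org.apache.flink.configuration.Configuration;

    public final class UnalignedCheckpointsCheck {
        // Local stand-in for the removed ExecutionCheckpointingOptions.ENABLE_UNALIGNED;
        // the key matches the literal used in the error message above.
        private static final ConfigOption<Boolean> UNALIGNED =
                ConfigOptions.key("execution.checkpointing.unaligned.enabled")
                        .booleanType()
                        .defaultValue(false);

        public static boolean isUnaligned(Configuration conf) {
            // Configuration#get converts the raw string value to the option's type
            // and falls back to the declared default when the key is absent.
            return conf.get(UNALIGNED);
        }

        public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.setString("execution.checkpointing.unaligned.enabled", "true");
            System.out.println(isUnaligned(conf)); // true
        }
    }
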
Please set " - + ExecutionCheckpointingOptions.CHECKPOINTING_MODE.key() - + " to exactly-once"); + + "execution.checkpointing.mode to exactly-once"); } public static void assertBatchAdaptiveParallelism( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableTypeInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableTypeInfo.java index f82f08209867..7da0ae0e2078 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableTypeInfo.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableTypeInfo.java @@ -21,6 +21,7 @@ import org.apache.paimon.table.sink.CommitMessageSerializer; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.serialization.SerializerConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -57,7 +58,16 @@ public boolean isKeyType() { return false; } - @Override + /** + * Do not annotate with @override here to maintain compatibility with Flink 1.18-. + */ + public TypeSerializer createSerializer(SerializerConfig config) { + return this.createSerializer((ExecutionConfig) null); + } + + /** + * Do not annotate with @override here to maintain compatibility with Flink 2.0+. + */ public TypeSerializer createSerializer(ExecutionConfig config) { // no copy, so that data from writer is directly going into committer while chaining return new NoneCopyVersionedSerializerTypeSerializerProxy( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCompactionTaskTypeInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCompactionTaskTypeInfo.java index f27f29f87fe7..0116ff198811 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCompactionTaskTypeInfo.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCompactionTaskTypeInfo.java @@ -23,6 +23,7 @@ import org.apache.paimon.table.sink.MultiTableCompactionTaskSerializer; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.serialization.SerializerConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy; @@ -60,7 +61,17 @@ public boolean isKeyType() { return false; } - @Override + /** + * Do not annotate with @override here to maintain compatibility with Flink 1.18-. + */ + public TypeSerializer createSerializer( + SerializerConfig serializerConfig) { + return this.createSerializer((ExecutionConfig) null); + } + + /** + * Do not annotate with @override here to maintain compatibility with Flink 2.0+. 
+ */ public TypeSerializer createSerializer( ExecutionConfig executionConfig) { return new SimpleVersionedSerializerTypeSerializerProxy< diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java index b3dcd4840cc1..e864ec050045 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java @@ -46,7 +46,6 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.CheckpointConfig; -import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.util.DataFormatConverters; @@ -331,30 +330,25 @@ private void assertStreamingConfigurationForAlignMode(StreamExecutionEnvironment checkArgument( checkpointConfig.isCheckpointingEnabled(), "The align mode of paimon source is only supported when checkpoint enabled. Please set " - + ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL.key() - + "larger than 0"); + + "execution.checkpointing.interval larger than 0"); checkArgument( checkpointConfig.getMaxConcurrentCheckpoints() == 1, "The align mode of paimon source supports at most one ongoing checkpoint at the same time. Please set " - + ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS.key() - + " to 1"); + + "execution.checkpointing.max-concurrent-checkpoints to 1"); checkArgument( checkpointConfig.getCheckpointTimeout() > conf.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_TIMEOUT) .toMillis(), "The align mode of paimon source requires that the timeout of checkpoint is greater than the timeout of the source's snapshot alignment. Please increase " - + ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT.key() - + " or decrease " + + "execution.checkpointing.timeout or decrease " + FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_TIMEOUT.key()); checkArgument( !env.getCheckpointConfig().isUnalignedCheckpointsEnabled(), "The align mode of paimon source currently does not support unaligned checkpoints. Please set " - + ExecutionCheckpointingOptions.ENABLE_UNALIGNED.key() - + " to false."); + + "execution.checkpointing.unaligned.enabled to false."); checkArgument( env.getCheckpointConfig().getCheckpointingMode() == CheckpointingMode.EXACTLY_ONCE, "The align mode of paimon source currently only supports EXACTLY_ONCE checkpoint mode. 
Please set " - + ExecutionCheckpointingOptions.CHECKPOINTING_MODE.key() - + " to exactly-once"); + + "execution.checkpointing.mode to exactly-once"); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java index d6b7060763ac..705e1d9a7a4c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java @@ -73,7 +73,7 @@ public SourceReader createReader(SourceReaderCont limit, new FutureCompletingBlockingQueue<>( context.getConfiguration() - .getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY))); + .get(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY))); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalTypeInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalTypeInfo.java index 4ea5db9f34d4..60898421ddea 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalTypeInfo.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalTypeInfo.java @@ -22,6 +22,7 @@ import org.apache.paimon.types.RowType; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.serialization.SerializerConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -73,8 +74,17 @@ public boolean isKeyType() { return false; } - @Override - public TypeSerializer createSerializer(ExecutionConfig config) { + /** + * Do not annotate with @override here to maintain compatibility with Flink 1.18-. + */ + public TypeSerializer createSerializer(SerializerConfig config) { + return this.createSerializer((ExecutionConfig) null); + } + + /** + * Do not annotate with @override here to maintain compatibility with Flink 2.0+. + */ + public TypeSerializer createSerializer(ExecutionConfig executionConfig) { return serializer.duplicate(); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/JavaTypeInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/JavaTypeInfo.java index a36243c5bdac..4aea809b51bc 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/JavaTypeInfo.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/JavaTypeInfo.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.serialization.SerializerConfig; import org.apache.flink.api.common.typeinfo.AtomicType; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeComparator; @@ -78,7 +79,16 @@ public boolean isKeyType() { return Comparable.class.isAssignableFrom(typeClass); } - @Override + /** + * Do not annotate with @override here to maintain compatibility with Flink 1.18-. + */ + public TypeSerializer createSerializer(SerializerConfig config) { + return this.createSerializer((ExecutionConfig) null); + } + + /** + * Do not annotate with @override here to maintain compatibility with Flink 2.0+. 
+ */ public TypeSerializer createSerializer(ExecutionConfig config) { return new JavaSerializer<>(this.typeClass); } @@ -91,7 +101,9 @@ public TypeComparator createComparator( @SuppressWarnings("rawtypes") GenericTypeComparator comparator = new GenericTypeComparator( - sortOrderAscending, createSerializer(executionConfig), this.typeClass); + sortOrderAscending, + new JavaSerializer<>(this.typeClass), + this.typeClass); return (TypeComparator) comparator; } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java index 239043ff79e1..915c93680a0d 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java @@ -27,7 +27,6 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.utils.BlockingIterator; -import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.types.Row; import org.apache.flink.util.CloseableIterator; @@ -60,7 +59,7 @@ public void setup() { tableEnvironmentBuilder() .streamingMode() .parallelism(1) - .setConf(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false) + .setString("execution.checkpointing.unaligned.enabled", "false") .build(); path = getTempDirPath(); tEnv.executeSql( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkJobRecoveryITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkJobRecoveryITCase.java index c46c4c358922..8df379a71b78 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkJobRecoveryITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkJobRecoveryITCase.java @@ -65,7 +65,7 @@ public void before() throws IOException { .set( CheckpointingOptions.EXTERNALIZED_CHECKPOINT_RETENTION, ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION) - .removeConfig(CheckpointingOptions.CHECKPOINTING_INTERVAL); + .removeKey("execution.checkpointing.interval"); // insert source data batchSql("INSERT INTO source_table1 VALUES (1, 'test-1', '20241030')"); @@ -219,10 +219,9 @@ private void testRecoverFromSavepoint( batchSql(sql); } - Configuration config = - sEnv.getConfig() - .getConfiguration() - .set(StateRecoveryOptions.SAVEPOINT_PATH, checkpointPath); + Configuration config = sEnv.getConfig().getConfiguration(); + // use config string to stay compatible with flink 1.19- + config.setString("execution.state-recovery.path", checkpointPath); for (Map.Entry entry : recoverOptions.entrySet()) { config.setString(entry.getKey(), entry.getValue()); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java index 08969bddfdb3..d5747d2e28d4 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java @@ -26,7 +26,6 @@ import org.apache.flink.core.execution.JobClient; import org.apache.flink.core.execution.SavepointFormatType; -import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; import 
org.apache.flink.types.Row; import org.junit.jupiter.api.Test; @@ -106,9 +105,10 @@ public void testSuspendAndRecoverAfterRescaleOverwrite() throws Exception { assertThat(batchSql("SELECT * FROM T3")).containsExactlyInAnyOrderElementsOf(committedData); // step5: resume streaming job + // use config string to stay compatible with flink 1.19- sEnv.getConfig() .getConfiguration() - .set(SavepointConfigOptions.SAVEPOINT_PATH, savepointPath); + .setString("execution.state-recovery.path", savepointPath); JobClient resumedJobClient = startJobAndCommitSnapshot(streamSql, snapshotAfterRescale.id()); // stop job diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java index cb323542d4c1..efe948d4ecae 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java @@ -27,6 +27,7 @@ import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.FileStoreTableFactory; import org.apache.paimon.utils.FailingFileIO; +import org.apache.paimon.utils.TimeUtils; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -49,7 +50,6 @@ import java.util.List; import java.util.Random; -import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -202,7 +202,11 @@ public void testCompactionInStreamingMode() throws Exception { batchSql("ALTER TABLE append_table SET ('compaction.early-max.file-num' = '4')"); batchSql("ALTER TABLE append_table SET ('continuous.discovery-interval' = '1 s')"); - sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, Duration.ofMillis(500)); + sEnv.getConfig() + .getConfiguration() + .setString( + "execution.checkpointing.interval", + TimeUtils.formatWithHighestUnit(Duration.ofMillis(500))); sEnv.executeSql( "CREATE TEMPORARY TABLE Orders_in (\n" + " f0 INT,\n" @@ -223,7 +227,11 @@ public void testCompactionInStreamingModeWithMaxWatermark() throws Exception { batchSql("ALTER TABLE append_table SET ('compaction.early-max.file-num' = '4')"); batchSql("ALTER TABLE append_table SET ('continuous.discovery-interval' = '1 s')"); - sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, Duration.ofMillis(500)); + sEnv.getConfig() + .getConfiguration() + .setString( + "execution.checkpointing.interval", + TimeUtils.formatWithHighestUnit(Duration.ofMillis(500))); sEnv.executeSql( "CREATE TEMPORARY TABLE Orders_in (\n" + " f0 INT,\n" diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/SinkSavepointITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/SinkSavepointITCase.java index 6b912d2e57fe..b1486deacb0c 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/SinkSavepointITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/SinkSavepointITCase.java @@ -137,7 +137,7 @@ private JobClient runRecoverFromSavepointJob(String failingPath, String savepoin .parallelism(1) .allowRestart() .setConf(conf) - 
.setConf(StateBackendOptions.STATE_BACKEND, "filesystem") + .setConf(StateBackendOptions.STATE_BACKEND, "hashmap") .setConf( CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file://" + path + "/checkpoint") diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java index ce0017eb1874..ee838ed68255 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.util; import org.apache.paimon.utils.FileIOUtils; +import org.apache.paimon.utils.TimeUtils; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.dag.Transformation; @@ -29,7 +30,6 @@ import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.CheckpointingMode; -import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; @@ -164,6 +164,11 @@ public TableEnvironmentBuilder setConf(ConfigOption option, T value) { return this; } + public TableEnvironmentBuilder setString(String key, String value) { + conf.setString(key, value); + return this; + } + public TableEnvironmentBuilder setConf(Configuration conf) { this.conf.addAll(conf); return this; @@ -182,9 +187,10 @@ public TableEnvironment build() { if (checkpointIntervalMs != null) { tEnv.getConfig() .getConfiguration() - .set( - ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, - Duration.ofMillis(checkpointIntervalMs)); + .setString( + "execution.checkpointing.interval", + TimeUtils.formatWithHighestUnit( + Duration.ofMillis(checkpointIntervalMs))); } } else { tEnv = diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java index 86b0014eb39c..9c3170f9a96b 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java @@ -23,8 +23,9 @@ import org.apache.paimon.utils.BlockingIterator; import org.apache.flink.api.common.RuntimeExecutionMode; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestartStrategyOptions; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; @@ -75,12 +76,11 @@ public static void init(String warehouse) { } public static void init(String warehouse, int parallelism) { - StreamExecutionEnvironment sExeEnv = buildStreamEnv(parallelism); - sExeEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart()); + // Using `none` to avoid compatibility issues with Flink 1.18-. 
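As in MySqlSyncTableActionITCase earlier, `none` replaces the removed RestartStrategies.noRestart(). Since the reworked builders below take the strategy name explicitly, a test could equally opt into retries; a usage sketch with illustrative values:

    // Hypothetical callers of the reworked test utilities:
    StreamExecutionEnvironment streaming = buildStreamEnv(2, "none");
    StreamExecutionEnvironment batch = buildBatchEnv(2, "fixed-delay");
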
+ StreamExecutionEnvironment sExeEnv = buildStreamEnv(parallelism, "none"); sEnv = StreamTableEnvironment.create(sExeEnv); - bExeEnv = buildBatchEnv(parallelism); - bExeEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart()); + bExeEnv = buildBatchEnv(parallelism, "none"); bEnv = StreamTableEnvironment.create(bExeEnv, EnvironmentSettings.inBatchMode()); ReadWriteTableTestUtil.warehouse = warehouse; @@ -95,16 +95,24 @@ public static void init(String warehouse, int parallelism) { bEnv.useCatalog(catalog); } - public static StreamExecutionEnvironment buildStreamEnv(int parallelism) { - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + public static StreamExecutionEnvironment buildStreamEnv( + int parallelism, String restartStrategy) { + Configuration configuration = new Configuration(); + configuration.set(RestartStrategyOptions.RESTART_STRATEGY, restartStrategy); + final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(configuration); env.setRuntimeMode(RuntimeExecutionMode.STREAMING); env.enableCheckpointing(100); env.setParallelism(parallelism); return env; } - public static StreamExecutionEnvironment buildBatchEnv(int parallelism) { - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + public static StreamExecutionEnvironment buildBatchEnv( + int parallelism, String restartStrategy) { + Configuration configuration = new Configuration(); + configuration.set(RestartStrategyOptions.RESTART_STRATEGY, restartStrategy); + final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(configuration); env.setRuntimeMode(RuntimeExecutionMode.BATCH); env.setParallelism(parallelism); return env; diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java index 74d2d7e1c343..2266a8484d9d 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java @@ -31,12 +31,12 @@ import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; import org.apache.paimon.utils.IOUtils; +import org.apache.paimon.utils.TimeUtils; import com.klarna.hiverunner.HiveShell; import com.klarna.hiverunner.annotations.HiveSQL; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.Path; -import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableResult; @@ -139,7 +139,9 @@ private void registerHiveCatalog(String catalogName, Map catalog EnvironmentSettings.newInstance().inStreamingMode().build()); sEnv.getConfig() .getConfiguration() - .set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(1)); + .setString( + "execution.checkpointing.interval", + TimeUtils.formatWithHighestUnit(Duration.ofSeconds(1))); sEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); tEnv.executeSql(
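A closing note on the duration options rewritten as strings above: TimeUtils.formatWithHighestUnit (from paimon-common, imported in AbstractTestBase, UnawareBucketAppendOnlyTableITCase and HiveCatalogITCaseBase) renders a java.time.Duration in the "<n> <unit>" form that Flink's string-typed configuration parses on every supported version. A minimal sketch:

    import java.time.Duration;

    import org.apache.flink.configuration.Configuration;
    import org.apache.paimon.utils.TimeUtils;

    public final class CheckpointIntervalExample {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // TimeUtils.formatWithHighestUnit(Duration.ofSeconds(1)) renders "1 s",
            // which Flink parses back when reading the checkpoint interval.
            conf.setString(
                    "execution.checkpointing.interval",
                    TimeUtils.formatWithHighestUnit(Duration.ofSeconds(1)));
            System.out.println(conf);
        }
    }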