Skip to content

Commit

Permalink
[flink] Avoid deprecated usages about Configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
yunfengzhou-hub committed Nov 26, 2024
1 parent be24886 commit f94a1d8
Show file tree
Hide file tree
Showing 34 changed files with 396 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public Result run() {

String sinkPathConfig =
BenchmarkGlobalConfiguration.loadConfiguration()
.getString(BenchmarkOptions.SINK_PATH);
.get(BenchmarkOptions.SINK_PATH);
if (sinkPathConfig == null) {
throw new IllegalArgumentException(
BenchmarkOptions.SINK_PATH.key() + " must be set");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.serialization;

/** Placeholder class to resolve compatibility issues. */
public interface SerializerConfig {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.serialization;

/** Placeholder class to resolve compatibility issues. */
public class SerializerConfigImpl implements SerializerConfig {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.serialization;

/** Placeholder class to resolve compatibility issues. */
public interface SerializerConfig {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.serialization;

/** Placeholder class to resolve compatibility issues. */
public class SerializerConfigImpl implements SerializerConfig {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.serialization;

/** Placeholder class to resolve compatibility issues. */
public interface SerializerConfig {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.serialization;

/** Placeholder class to resolve compatibility issues. */
public class SerializerConfigImpl implements SerializerConfig {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.serialization;

/** Placeholder class to resolve compatibility issues. */
public interface SerializerConfig {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.serialization;

/** Placeholder class to resolve compatibility issues. */
public class SerializerConfigImpl implements SerializerConfig {}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class KafkaDebeziumAvroDeserializationSchema

public KafkaDebeziumAvroDeserializationSchema(Configuration cdcSourceConfig) {
this.topic = KafkaActionUtils.findOneTopic(cdcSourceConfig);
this.schemaRegistryUrl = cdcSourceConfig.getString(SCHEMA_REGISTRY_URL);
this.schemaRegistryUrl = cdcSourceConfig.get(SCHEMA_REGISTRY_URL);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ default Map<String, String> getExtractRow(
Configuration mongodbConfig)
throws JsonProcessingException {
SchemaAcquisitionMode mode =
SchemaAcquisitionMode.valueOf(mongodbConfig.getString(START_MODE).toUpperCase());
SchemaAcquisitionMode.valueOf(mongodbConfig.get(START_MODE).toUpperCase());
ObjectNode objectNode =
JsonSerdeUtil.asSpecificNodeType(jsonNode.asText(), ObjectNode.class);
JsonNode idNode = objectNode.get(ID_FIELD);
Expand All @@ -92,7 +92,7 @@ default Map<String, String> getExtractRow(
"The provided MongoDB JSON document does not contain an _id field.");
}
JsonNode document =
mongodbConfig.getBoolean(DEFAULT_ID_GENERATION)
mongodbConfig.get(DEFAULT_ID_GENERATION)
? objectNode.set(
ID_FIELD,
idNode.get(OID_FIELD) == null ? idNode : idNode.get(OID_FIELD))
Expand All @@ -101,8 +101,8 @@ default Map<String, String> getExtractRow(
case SPECIFIED:
return parseFieldsFromJsonRecord(
document.toString(),
mongodbConfig.getString(PARSER_PATH),
mongodbConfig.getString(FIELD_NAME),
mongodbConfig.get(PARSER_PATH),
mongodbConfig.get(FIELD_NAME),
computedColumns,
rowTypeBuilder);
case DYNAMIC:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import org.apache.flink.cdc.debezium.table.DebeziumOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
Expand Down Expand Up @@ -99,11 +101,14 @@ public MySqlRecordParser(
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
String stringifyServerTimeZone = mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE);

this.isDebeziumSchemaCommentsEnabled =
mySqlConfig.getBoolean(
DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX
+ RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_COMMENTS.name(),
false);
ConfigOption<Boolean> includeSchemaCommentsConfig =
ConfigOptions.key(
DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX
+ RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_COMMENTS
.name())
.booleanType()
.defaultValue(false);
this.isDebeziumSchemaCommentsEnabled = mySqlConfig.get(includeSchemaCommentsConfig);
this.serverTimeZone =
stringifyServerTimeZone == null
? ZoneId.systemDefault()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class PulsarDebeziumAvroDeserializationSchema

public PulsarDebeziumAvroDeserializationSchema(Configuration cdcSourceConfig) {
this.topic = PulsarActionUtils.findOneTopic(cdcSourceConfig);
this.schemaRegistryUrl = cdcSourceConfig.getString(SCHEMA_REGISTRY_URL);
this.schemaRegistryUrl = cdcSourceConfig.get(SCHEMA_REGISTRY_URL);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,27 +76,25 @@ public static void initMongoDB() {
@Test
public void testCreateSchemaFromValidConfig() {
Configuration mongodbConfig = new Configuration();
mongodbConfig.setString(MongoDBSourceOptions.HOSTS, MONGODB_CONTAINER.getHostAndPort());
mongodbConfig.setString(MongoDBSourceOptions.USERNAME, MongoDBContainer.PAIMON_USER);
mongodbConfig.setString(
MongoDBSourceOptions.PASSWORD, MongoDBContainer.PAIMON_USER_PASSWORD);
mongodbConfig.setString(MongoDBSourceOptions.CONNECTION_OPTIONS, "authSource=admin");
mongodbConfig.setString(MongoDBSourceOptions.DATABASE, "testDatabase");
mongodbConfig.setString(MongoDBSourceOptions.COLLECTION, "testCollection");
mongodbConfig.set(MongoDBSourceOptions.HOSTS, MONGODB_CONTAINER.getHostAndPort());
mongodbConfig.set(MongoDBSourceOptions.USERNAME, MongoDBContainer.PAIMON_USER);
mongodbConfig.set(MongoDBSourceOptions.PASSWORD, MongoDBContainer.PAIMON_USER_PASSWORD);
mongodbConfig.set(MongoDBSourceOptions.CONNECTION_OPTIONS, "authSource=admin");
mongodbConfig.set(MongoDBSourceOptions.DATABASE, "testDatabase");
mongodbConfig.set(MongoDBSourceOptions.COLLECTION, "testCollection");
Schema schema = MongodbSchemaUtils.getMongodbSchema(mongodbConfig);
assertNotNull(schema);
}

@Test
public void testCreateSchemaFromInvalidHost() {
Configuration mongodbConfig = new Configuration();
mongodbConfig.setString(MongoDBSourceOptions.HOSTS, "127.0.0.1:12345");
mongodbConfig.setString(MongoDBSourceOptions.USERNAME, MongoDBContainer.PAIMON_USER);
mongodbConfig.setString(
MongoDBSourceOptions.PASSWORD, MongoDBContainer.PAIMON_USER_PASSWORD);
mongodbConfig.setString(MongoDBSourceOptions.CONNECTION_OPTIONS, "authSource=admin");
mongodbConfig.setString(MongoDBSourceOptions.DATABASE, "testDatabase");
mongodbConfig.setString(MongoDBSourceOptions.COLLECTION, "testCollection");
mongodbConfig.set(MongoDBSourceOptions.HOSTS, "127.0.0.1:12345");
mongodbConfig.set(MongoDBSourceOptions.USERNAME, MongoDBContainer.PAIMON_USER);
mongodbConfig.set(MongoDBSourceOptions.PASSWORD, MongoDBContainer.PAIMON_USER_PASSWORD);
mongodbConfig.set(MongoDBSourceOptions.CONNECTION_OPTIONS, "authSource=admin");
mongodbConfig.set(MongoDBSourceOptions.DATABASE, "testDatabase");
mongodbConfig.set(MongoDBSourceOptions.COLLECTION, "testCollection");

assertThrows(
RuntimeException.class, () -> MongodbSchemaUtils.getMongodbSchema(mongodbConfig));
Expand All @@ -106,7 +104,7 @@ public void testCreateSchemaFromInvalidHost() {
public void testCreateSchemaFromIncompleteConfig() {
// Create a Configuration object with missing necessary settings
Configuration mongodbConfig = new Configuration();
mongodbConfig.setString(MongoDBSourceOptions.HOSTS, MONGODB_CONTAINER.getHostAndPort());
mongodbConfig.set(MongoDBSourceOptions.HOSTS, MONGODB_CONTAINER.getHostAndPort());
// Expect an exception to be thrown due to missing necessary settings
assertThrows(
NullPointerException.class,
Expand All @@ -117,13 +115,12 @@ public void testCreateSchemaFromIncompleteConfig() {
public void testCreateSchemaFromDynamicConfig() {
// Create a Configuration object with the necessary settings
Configuration mongodbConfig = new Configuration();
mongodbConfig.setString(MongoDBSourceOptions.HOSTS, MONGODB_CONTAINER.getHostAndPort());
mongodbConfig.setString(MongoDBSourceOptions.USERNAME, MongoDBContainer.PAIMON_USER);
mongodbConfig.setString(
MongoDBSourceOptions.PASSWORD, MongoDBContainer.PAIMON_USER_PASSWORD);
mongodbConfig.setString(MongoDBSourceOptions.CONNECTION_OPTIONS, "authSource=admin");
mongodbConfig.setString(MongoDBSourceOptions.DATABASE, "testDatabase");
mongodbConfig.setString(MongoDBSourceOptions.COLLECTION, "testCollection");
mongodbConfig.set(MongoDBSourceOptions.HOSTS, MONGODB_CONTAINER.getHostAndPort());
mongodbConfig.set(MongoDBSourceOptions.USERNAME, MongoDBContainer.PAIMON_USER);
mongodbConfig.set(MongoDBSourceOptions.PASSWORD, MongoDBContainer.PAIMON_USER_PASSWORD);
mongodbConfig.set(MongoDBSourceOptions.CONNECTION_OPTIONS, "authSource=admin");
mongodbConfig.set(MongoDBSourceOptions.DATABASE, "testDatabase");
mongodbConfig.set(MongoDBSourceOptions.COLLECTION, "testCollection");

// Call the method and check the results
Schema schema = MongodbSchemaUtils.getMongodbSchema(mongodbConfig);
Expand All @@ -142,13 +139,12 @@ public void testCreateSchemaFromDynamicConfig() {
@Test
public void testCreateSchemaFromInvalidDatabase() {
Configuration mongodbConfig = new Configuration();
mongodbConfig.setString(MongoDBSourceOptions.HOSTS, MONGODB_CONTAINER.getHostAndPort());
mongodbConfig.setString(MongoDBSourceOptions.USERNAME, MongoDBContainer.PAIMON_USER);
mongodbConfig.setString(
MongoDBSourceOptions.PASSWORD, MongoDBContainer.PAIMON_USER_PASSWORD);
mongodbConfig.setString(MongoDBSourceOptions.CONNECTION_OPTIONS, "authSource=admin");
mongodbConfig.setString(MongoDBSourceOptions.DATABASE, "invalidDatabase");
mongodbConfig.setString(MongoDBSourceOptions.COLLECTION, "testCollection");
mongodbConfig.set(MongoDBSourceOptions.HOSTS, MONGODB_CONTAINER.getHostAndPort());
mongodbConfig.set(MongoDBSourceOptions.USERNAME, MongoDBContainer.PAIMON_USER);
mongodbConfig.set(MongoDBSourceOptions.PASSWORD, MongoDBContainer.PAIMON_USER_PASSWORD);
mongodbConfig.set(MongoDBSourceOptions.CONNECTION_OPTIONS, "authSource=admin");
mongodbConfig.set(MongoDBSourceOptions.DATABASE, "invalidDatabase");
mongodbConfig.set(MongoDBSourceOptions.COLLECTION, "testCollection");

assertThrows(
RuntimeException.class, () -> MongodbSchemaUtils.getMongodbSchema(mongodbConfig));
Expand All @@ -157,13 +153,12 @@ public void testCreateSchemaFromInvalidDatabase() {
@Test
public void testCreateSchemaFromInvalidCollection() {
Configuration mongodbConfig = new Configuration();
mongodbConfig.setString(MongoDBSourceOptions.HOSTS, MONGODB_CONTAINER.getHostAndPort());
mongodbConfig.setString(MongoDBSourceOptions.USERNAME, MongoDBContainer.PAIMON_USER);
mongodbConfig.setString(
MongoDBSourceOptions.PASSWORD, MongoDBContainer.PAIMON_USER_PASSWORD);
mongodbConfig.setString(MongoDBSourceOptions.CONNECTION_OPTIONS, "authSource=admin");
mongodbConfig.setString(MongoDBSourceOptions.DATABASE, "testDatabase");
mongodbConfig.setString(MongoDBSourceOptions.COLLECTION, "invalidCollection");
mongodbConfig.set(MongoDBSourceOptions.HOSTS, MONGODB_CONTAINER.getHostAndPort());
mongodbConfig.set(MongoDBSourceOptions.USERNAME, MongoDBContainer.PAIMON_USER);
mongodbConfig.set(MongoDBSourceOptions.PASSWORD, MongoDBContainer.PAIMON_USER_PASSWORD);
mongodbConfig.set(MongoDBSourceOptions.CONNECTION_OPTIONS, "authSource=admin");
mongodbConfig.set(MongoDBSourceOptions.DATABASE, "testDatabase");
mongodbConfig.set(MongoDBSourceOptions.COLLECTION, "invalidCollection");

assertThrows(
RuntimeException.class, () -> MongodbSchemaUtils.getMongodbSchema(mongodbConfig));
Expand Down
Loading

0 comments on commit f94a1d8

Please sign in to comment.