Skip to content

Commit

Permalink
[flink] Fix mongodb CDC Ingestion QueryException (#2028)
Browse files Browse the repository at this point in the history
  • Loading branch information
MonsterChenzhuo authored Sep 19, 2023
1 parent 9587952 commit f8d224b
Show file tree
Hide file tree
Showing 4 changed files with 277 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,28 @@
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;

import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.bson.Document;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Objects;

import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.encodeValue;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.paimon.flink.action.cdc.mongodb.MongoDBActionUtils.FIELD_NAME;
import static org.apache.paimon.flink.action.cdc.mongodb.MongoDBActionUtils.START_MODE;

Expand Down Expand Up @@ -86,43 +93,111 @@ public List<String> primaryKeys() {
return primaryKeys;
}

/**
* Utility class for creating a MongoDB schema based on the provided configuration. The schema
* can be created in one of the two modes:
*
* <ul>
* <li><b>SPECIFIED</b>: In this mode, the schema is created based on the explicit column
* names provided in the configuration. The data types for all columns are assumed to be
* STRING.
* <li><b>DYNAMIC</b>: In this mode, the schema is inferred dynamically from the first
* document in the specified MongoDB collection.
* </ul>
*
* <p>The Configuration object passed to the createSchema method should have the necessary
* MongoDB configuration properties set, including the host address, database name, collection
* name, and optionally, the username and password for authentication. For the SPECIFIED mode,
* the field names should also be specified in the configuration.
*/
public static MongodbSchema getMongodbSchema(Configuration mongodbConfig) {
SchemaAcquisitionMode mode = getModeFromConfig(mongodbConfig);
String databaseName =
Objects.requireNonNull(
mongodbConfig.get(MongoDBSourceOptions.DATABASE),
"Database name cannot be null");
String collectionName =
Objects.requireNonNull(
mongodbConfig.get(MongoDBSourceOptions.COLLECTION),
"Collection name cannot be null");

switch (mode) {
case SPECIFIED:
return createSchemaFromSpecifiedConfig(mongodbConfig);
String[] columnNames =
Objects.requireNonNull(
mongodbConfig.get(FIELD_NAME), "Field names cannot be null")
.split(",");
LinkedHashMap<String, DataType> schemaFields =
generateSchemaFields(Arrays.asList(columnNames));
return new MongodbSchema(
databaseName,
collectionName,
schemaFields,
Collections.singletonList(ID_FIELD));
case DYNAMIC:
return createSchemaFromDynamicConfig(mongodbConfig);
String hosts =
Objects.requireNonNull(
mongodbConfig.get(MongoDBSourceOptions.HOSTS),
"Hosts cannot be null");

MongoClientSettings.Builder settingsBuilder = MongoClientSettings.builder();

settingsBuilder.applyConnectionString(
new ConnectionString(
buildConnectionString(
mongodbConfig.get(MongoDBSourceOptions.USERNAME),
mongodbConfig.get(MongoDBSourceOptions.PASSWORD),
mongodbConfig.get(MongoDBSourceOptions.SCHEME),
hosts,
mongodbConfig.get(
MongoDBSourceOptions.CONNECTION_OPTIONS))));

MongoClientSettings settings = settingsBuilder.build();

try (MongoClient mongoClient = MongoClients.create(settings)) {
MongoDatabase database = mongoClient.getDatabase(databaseName);
MongoCollection<Document> collection = database.getCollection(collectionName);
Document firstDocument = collection.find().first();

if (firstDocument == null) {
throw new IllegalStateException(
"No documents in collection to infer schema");
}

return createMongodbSchema(
databaseName, collectionName, getColumnNames(firstDocument));
} catch (Exception e) {
throw new RuntimeException(
"Failed to create schema from MongoDB collection", e);
}
default:
throw new IllegalArgumentException("Unsupported schema acquisition mode: " + mode);
}
}

private static SchemaAcquisitionMode getModeFromConfig(Configuration mongodbConfig) {
return SchemaAcquisitionMode.valueOf(mongodbConfig.get(START_MODE).toUpperCase());
}
public static String buildConnectionString(
@Nullable String username,
@Nullable String password,
String scheme,
String hosts,
@Nullable String connectionOptions) {
StringBuilder sb = new StringBuilder(scheme).append("://");

private static MongodbSchema createSchemaFromSpecifiedConfig(Configuration mongodbConfig) {
String[] columnNames = mongodbConfig.get(FIELD_NAME).split(",");
LinkedHashMap<String, DataType> schemaFields =
generateSchemaFields(Arrays.asList(columnNames));
String databaseName = mongodbConfig.get(MongoDBSourceOptions.DATABASE);
String collectionName = mongodbConfig.get(MongoDBSourceOptions.COLLECTION);
return new MongodbSchema(
databaseName, collectionName, schemaFields, Collections.singletonList(ID_FIELD));
}
if (StringUtils.isNotEmpty(username) && StringUtils.isNotEmpty(password)) {
sb.append(encodeValue(username)).append(":").append(encodeValue(password)).append("@");
}

sb.append(checkNotNull(hosts));

private static MongodbSchema createSchemaFromDynamicConfig(Configuration mongodbConfig) {
String hosts = mongodbConfig.get(MongoDBSourceOptions.HOSTS);
String databaseName = mongodbConfig.get(MongoDBSourceOptions.DATABASE);
String collectionName = mongodbConfig.get(MongoDBSourceOptions.COLLECTION);
String url = String.format("mongodb://%s/%s", hosts, databaseName);
try (MongoClient mongoClient = MongoClients.create(url)) {
MongoDatabase database = mongoClient.getDatabase(databaseName);
MongoCollection<Document> collection = database.getCollection(collectionName);
Document firstDocument = collection.find().first();
return createMongodbSchema(databaseName, collectionName, getColumnNames(firstDocument));
if (StringUtils.isNotEmpty(connectionOptions)) {
sb.append("/?").append(connectionOptions);
}

return sb.toString();
}

private static SchemaAcquisitionMode getModeFromConfig(Configuration mongodbConfig) {
return SchemaAcquisitionMode.valueOf(mongodbConfig.get(START_MODE).toUpperCase());
}

private static List<String> getColumnNames(Document document) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
import java.util.Map;
import java.util.stream.Stream;

import static org.apache.paimon.flink.action.cdc.mongodb.MongoDBContainer.PAIMON_USER;
import static org.apache.paimon.flink.action.cdc.mongodb.MongoDBContainer.PAIMON_USER_PASSWORD;

/** Base test class for {@link org.apache.paimon.flink.action.Action}s related to MongoDB. */
public abstract class MongoDBActionITCaseBase extends CdcActionITCaseBase {

Expand Down Expand Up @@ -65,6 +68,8 @@ public static void startContainers() {
protected Map<String, String> getBasicMongoDBConfig() {
Map<String, String> config = new HashMap<>();
config.put("hosts", MONGODB_CONTAINER.getHostAndPort());
config.put("username", PAIMON_USER);
config.put("password", PAIMON_USER_PASSWORD);
return config;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ public class MongoDBContainer extends org.testcontainers.containers.MongoDBConta

private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)//.*$");

public static final String PAIMON_USER = "flinkuser";

public static final String PAIMON_USER_PASSWORD = "a1?~!@#$%^&*(){}[]<>.,+_-=/|:;";

public static final int MONGODB_PORT = 27017;

public MongoDBContainer(String imageName) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action.cdc.mongodb;

import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;

import com.mongodb.MongoClientSettings;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
import org.apache.flink.configuration.Configuration;
import org.bson.Document;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.LinkedHashMap;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;

/** Tests for {@link MongodbSchema}. */
public class MongodbSchemaITCase extends MongoDBActionITCaseBase {

@BeforeAll
public static void initMongoDB() {
// Create a real MongoDB client and insert a document to infer the schema
MongoClientSettings.Builder settingsBuilder =
MongoClientSettings.builder()
.applyToClusterSettings(
builder ->
builder.hosts(
Collections.singletonList(
new ServerAddress(
MONGODB_CONTAINER
.getHostAndPort()))))
.credential(
MongoCredential.createCredential(
MongoDBContainer.PAIMON_USER,
"admin",
MongoDBContainer.PAIMON_USER_PASSWORD.toCharArray()));

MongoClientSettings settings = settingsBuilder.build();
try (MongoClient mongoClient = MongoClients.create(settings)) {
MongoDatabase database = mongoClient.getDatabase("testDatabase");
MongoCollection<Document> collection = database.getCollection("testCollection");
Document doc = new Document("name", "Alice").append("age", 30);
collection.insertOne(doc);
}
}

@Test
public void testCreateSchemaFromValidConfig() {
Configuration mongodbConfig = new Configuration();
mongodbConfig.setString(MongoDBSourceOptions.HOSTS, MONGODB_CONTAINER.getHostAndPort());
mongodbConfig.setString(MongoDBSourceOptions.USERNAME, MongoDBContainer.PAIMON_USER);
mongodbConfig.setString(
MongoDBSourceOptions.PASSWORD, MongoDBContainer.PAIMON_USER_PASSWORD);
mongodbConfig.setString(MongoDBSourceOptions.CONNECTION_OPTIONS, "authSource=admin");
mongodbConfig.setString(MongoDBSourceOptions.DATABASE, "testDatabase");
mongodbConfig.setString(MongoDBSourceOptions.COLLECTION, "testCollection");
MongodbSchema schema = MongodbSchema.getMongodbSchema(mongodbConfig);
assertNotNull(schema);
assertEquals("testDatabase", schema.databaseName());
assertEquals("testCollection", schema.tableName());
}

@Test
public void testCreateSchemaFromInvalidHost() {
Configuration mongodbConfig = new Configuration();
mongodbConfig.setString(MongoDBSourceOptions.HOSTS, "127.0.0.1:12345");
mongodbConfig.setString(MongoDBSourceOptions.USERNAME, MongoDBContainer.PAIMON_USER);
mongodbConfig.setString(
MongoDBSourceOptions.PASSWORD, MongoDBContainer.PAIMON_USER_PASSWORD);
mongodbConfig.setString(MongoDBSourceOptions.CONNECTION_OPTIONS, "authSource=admin");
mongodbConfig.setString(MongoDBSourceOptions.DATABASE, "testDatabase");
mongodbConfig.setString(MongoDBSourceOptions.COLLECTION, "testCollection");

assertThrows(RuntimeException.class, () -> MongodbSchema.getMongodbSchema(mongodbConfig));
}

@Test
public void testCreateSchemaFromIncompleteConfig() {
// Create a Configuration object with missing necessary settings
Configuration mongodbConfig = new Configuration();
mongodbConfig.setString(MongoDBSourceOptions.HOSTS, MONGODB_CONTAINER.getHostAndPort());
// Expect an exception to be thrown due to missing necessary settings
assertThrows(
NullPointerException.class, () -> MongodbSchema.getMongodbSchema(mongodbConfig));
}

@Test
public void testCreateSchemaFromDynamicConfig() {
// Create a Configuration object with the necessary settings
Configuration mongodbConfig = new Configuration();
mongodbConfig.setString(MongoDBSourceOptions.HOSTS, MONGODB_CONTAINER.getHostAndPort());
mongodbConfig.setString(MongoDBSourceOptions.USERNAME, MongoDBContainer.PAIMON_USER);
mongodbConfig.setString(
MongoDBSourceOptions.PASSWORD, MongoDBContainer.PAIMON_USER_PASSWORD);
mongodbConfig.setString(MongoDBSourceOptions.CONNECTION_OPTIONS, "authSource=admin");
mongodbConfig.setString(MongoDBSourceOptions.DATABASE, "testDatabase");
mongodbConfig.setString(MongoDBSourceOptions.COLLECTION, "testCollection");

// Call the method and check the results
MongodbSchema schema = MongodbSchema.getMongodbSchema(mongodbConfig);

// Verify the schema
assertNotNull(schema);
assertEquals("testDatabase", schema.databaseName());
assertEquals("testCollection", schema.tableName());

LinkedHashMap<String, DataType> expectedFields = new LinkedHashMap<>();
expectedFields.put("name", DataTypes.STRING());
expectedFields.put("age", DataTypes.STRING());
expectedFields.put("_id", DataTypes.STRING());

assertEquals(expectedFields, schema.fields());
}

@Test
public void testCreateSchemaFromInvalidDatabase() {
Configuration mongodbConfig = new Configuration();
mongodbConfig.setString(MongoDBSourceOptions.HOSTS, MONGODB_CONTAINER.getHostAndPort());
mongodbConfig.setString(MongoDBSourceOptions.USERNAME, MongoDBContainer.PAIMON_USER);
mongodbConfig.setString(
MongoDBSourceOptions.PASSWORD, MongoDBContainer.PAIMON_USER_PASSWORD);
mongodbConfig.setString(MongoDBSourceOptions.CONNECTION_OPTIONS, "authSource=admin");
mongodbConfig.setString(MongoDBSourceOptions.DATABASE, "invalidDatabase");
mongodbConfig.setString(MongoDBSourceOptions.COLLECTION, "testCollection");

assertThrows(RuntimeException.class, () -> MongodbSchema.getMongodbSchema(mongodbConfig));
}

@Test
public void testCreateSchemaFromInvalidCollection() {
Configuration mongodbConfig = new Configuration();
mongodbConfig.setString(MongoDBSourceOptions.HOSTS, MONGODB_CONTAINER.getHostAndPort());
mongodbConfig.setString(MongoDBSourceOptions.USERNAME, MongoDBContainer.PAIMON_USER);
mongodbConfig.setString(
MongoDBSourceOptions.PASSWORD, MongoDBContainer.PAIMON_USER_PASSWORD);
mongodbConfig.setString(MongoDBSourceOptions.CONNECTION_OPTIONS, "authSource=admin");
mongodbConfig.setString(MongoDBSourceOptions.DATABASE, "testDatabase");
mongodbConfig.setString(MongoDBSourceOptions.COLLECTION, "invalidCollection");

assertThrows(RuntimeException.class, () -> MongodbSchema.getMongodbSchema(mongodbConfig));
}
}

0 comments on commit f8d224b

Please sign in to comment.