diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index 8aef65d6a..fcd0f4c0e 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -50,26 +50,36 @@ public abstract class DatabaseSync { private static final Logger LOG = LoggerFactory.getLogger(DatabaseSync.class); private static final String LIGHT_SCHEMA_CHANGE = "light_schema_change"; private static final String TABLE_NAME_OPTIONS = "table-name"; + protected Configuration config; + protected String database; + protected TableNameConverter converter; protected Pattern includingPattern; protected Pattern excludingPattern; protected Map tableConfig; protected Configuration sinkConfig; protected boolean ignoreDefaultValue; + public StreamExecutionEnvironment env; private boolean createTableOnly = false; private boolean newSchemaChange; protected String includingTables; protected String excludingTables; + public abstract void registerDriver() throws SQLException; + public abstract Connection getConnection() throws SQLException; public abstract List getSchemaList() throws Exception; public abstract DataStreamSource buildCdcSource(StreamExecutionEnvironment env); + public DatabaseSync() throws SQLException { + registerDriver(); + } + public void create(StreamExecutionEnvironment env, String database, Configuration config, String tablePrefix, String tableSuffix, String includingTables, String excludingTables, boolean ignoreDefaultValue, Configuration sinkConfig, diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java index 2235e0b99..22e49aaa7 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java @@ -54,10 +54,25 @@ public class MysqlDatabaseSync extends DatabaseSync { private static final Logger LOG = LoggerFactory.getLogger(MysqlDatabaseSync.class); - private static String JDBC_URL = "jdbc:mysql://%s:%d?useInformationSchema=true"; - private static String PROPERTIES_PREFIX = "jdbc.properties."; + private static final String JDBC_URL = "jdbc:mysql://%s:%d?useInformationSchema=true"; + private static final String PROPERTIES_PREFIX = "jdbc.properties."; - public MysqlDatabaseSync() { + public MysqlDatabaseSync() throws SQLException { + super(); + } + + @Override + public void registerDriver() throws SQLException { + try { + Class.forName("com.mysql.cj.jdbc.Driver"); + } catch (ClassNotFoundException ex) { + LOG.warn("can not found class com.mysql.cj.jdbc.Driver, use class com.mysql.jdbc.Driver"); + try { + Class.forName("com.mysql.jdbc.Driver"); + } catch (Exception e) { + throw new SQLException("No suitable driver found, can not found class com.mysql.cj.jdbc.Driver and com.mysql.jdbc.Driver"); + } + } } @Override @@ -86,7 +101,7 @@ public List getSchemaList() throws Exception { } SourceSchema sourceSchema = new MysqlSchema(metaData, databaseName, tableName, tableComment); - sourceSchema.setModel(sourceSchema.primaryKeys.size() > 0 ? DataModel.UNIQUE : DataModel.DUPLICATE); + sourceSchema.setModel(!sourceSchema.primaryKeys.isEmpty() ? DataModel.UNIQUE : DataModel.DUPLICATE); schemaList.add(sourceSchema); } } @@ -196,9 +211,8 @@ public DataStreamSource buildCdcSource(StreamExecutionEnvironment env) { } MySqlSource mySqlSource = sourceBuilder.deserializer(schema).includeSchemaChanges(true).build(); - DataStreamSource streamSource = env.fromSource( + return env.fromSource( mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source"); - return streamSource; } /** diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java index 004957943..6e27eb6c8 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java @@ -59,9 +59,24 @@ public class OracleDatabaseSync extends DatabaseSync { private static final Logger LOG = LoggerFactory.getLogger(OracleDatabaseSync.class); - private static String JDBC_URL = "jdbc:oracle:thin:@%s:%d:%s"; + private static final String JDBC_URL = "jdbc:oracle:thin:@%s:%d:%s"; - public OracleDatabaseSync() { + public OracleDatabaseSync() throws SQLException { + super(); + } + + @Override + public void registerDriver() throws SQLException { + try { + Class.forName("oracle.jdbc.driver.OracleDriver"); + } catch (ClassNotFoundException ex) { + LOG.warn("can not found class oracle.jdbc.driver.OracleDriver, use class oracle.jdbc.OracleDriver"); + try { + Class.forName("oracle.jdbc.OracleDriver"); + } catch (Exception e) { + throw new SQLException("No suitable driver found, can not found class oracle.jdbc.driver.OracleDriver and oracle.jdbc.OracleDriver"); + } + } } @Override @@ -97,7 +112,7 @@ public List getSchemaList() throws Exception { } SourceSchema sourceSchema = new OracleSchema(metaData, databaseName, schemaName, tableName, tableComment); - sourceSchema.setModel(sourceSchema.primaryKeys.size() > 0 ? DataModel.UNIQUE : DataModel.DUPLICATE); + sourceSchema.setModel(!sourceSchema.primaryKeys.isEmpty() ? DataModel.UNIQUE : DataModel.DUPLICATE); schemaList.add(sourceSchema); } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java index 31878a5ff..b8c9ad1d6 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java @@ -62,9 +62,19 @@ public class PostgresDatabaseSync extends DatabaseSync { private static final Logger LOG = LoggerFactory.getLogger(PostgresDatabaseSync.class); - private static String JDBC_URL = "jdbc:postgresql://%s:%d/%s"; + private static final String JDBC_URL = "jdbc:postgresql://%s:%d/%s"; - public PostgresDatabaseSync() { + public PostgresDatabaseSync() throws SQLException { + super(); + } + + @Override + public void registerDriver() throws SQLException { + try { + Class.forName("org.postgresql.Driver"); + } catch (ClassNotFoundException ex) { + throw new SQLException("No suitable driver found, can not found class org.postgresql.Driver"); + } } @Override diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java index 6cf9c9d96..fb25212ec 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java @@ -58,10 +58,20 @@ public class SqlServerDatabaseSync extends DatabaseSync { private static final Logger LOG = LoggerFactory.getLogger(SqlServerDatabaseSync.class); - private static String JDBC_URL = "jdbc:sqlserver://%s:%d;database=%s"; - private static String PORT = "port"; + private static final String JDBC_URL = "jdbc:sqlserver://%s:%d;database=%s"; + private static final String PORT = "port"; - public SqlServerDatabaseSync() { + public SqlServerDatabaseSync() throws SQLException { + super(); + } + + @Override + public void registerDriver() throws SQLException { + try { + Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver"); + } catch (ClassNotFoundException ex) { + throw new SQLException("No suitable driver found, can not found class com.microsoft.sqlserver.jdbc.SQLServerDriver"); + } } @Override @@ -91,7 +101,7 @@ public List getSchemaList() throws Exception { } SourceSchema sourceSchema = new SqlServerSchema(metaData, databaseName, null, tableName, tableComment); - sourceSchema.setModel(sourceSchema.primaryKeys.size() > 0 ? DataModel.UNIQUE : DataModel.DUPLICATE); + sourceSchema.setModel(!sourceSchema.primaryKeys.isEmpty() ? DataModel.UNIQUE : DataModel.DUPLICATE); schemaList.add(sourceSchema); } }