diff --git a/aurora-postgresql-plugin/src/main/java/io/cdap/plugin/auroradb/postgres/AuroraPostgresSink.java b/aurora-postgresql-plugin/src/main/java/io/cdap/plugin/auroradb/postgres/AuroraPostgresSink.java index 90681f715..12507cbdb 100644 --- a/aurora-postgresql-plugin/src/main/java/io/cdap/plugin/auroradb/postgres/AuroraPostgresSink.java +++ b/aurora-postgresql-plugin/src/main/java/io/cdap/plugin/auroradb/postgres/AuroraPostgresSink.java @@ -101,6 +101,11 @@ public String getEscapedTableName() { return ESCAPE_CHAR + tableName + ESCAPE_CHAR; } + @Override + public String getEscapedDbSchemaName() { + return ESCAPE_CHAR + dbSchemaName + ESCAPE_CHAR; + } + @Override public Map getDBSpecificArguments() { if (connectionTimeout != null) { diff --git a/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSink.java b/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSink.java index d74358f54..06d054533 100644 --- a/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSink.java +++ b/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSink.java @@ -182,6 +182,11 @@ public String getTransactionIsolationLevel() { public String getEscapedTableName() { return ESCAPE_CHAR + getTableName() + ESCAPE_CHAR; } + + @Override + public String getEscapedDbSchemaName() { + return ESCAPE_CHAR + getDBSchemaName() + ESCAPE_CHAR; + } @Override public Map getDBSpecificArguments() { diff --git a/database-commons/src/main/java/io/cdap/plugin/db/config/AbstractDBSpecificSinkConfig.java b/database-commons/src/main/java/io/cdap/plugin/db/config/AbstractDBSpecificSinkConfig.java index 9766a20d7..5b92a85f7 100644 --- a/database-commons/src/main/java/io/cdap/plugin/db/config/AbstractDBSpecificSinkConfig.java +++ b/database-commons/src/main/java/io/cdap/plugin/db/config/AbstractDBSpecificSinkConfig.java @@ -56,11 +56,13 @@ public abstract class AbstractDBSpecificSinkConfig extends PluginConfig implemen @Macro @Nullable private String dbSchemaName; + @Name(OPERATION_NAME) @Description("Operation for the query to perform. By default, the query performs INSERT operation") @Macro @Nullable protected String operationName; + @Name(RELATION_TABLE_KEY) @Macro @Nullable @@ -89,6 +91,10 @@ public String getEscapedTableName() { return tableName; } + public String getEscapedDbSchemaName() { + return dbSchemaName; + } + @Override public boolean canConnect() { return !containsMacro(TABLE_NAME) && getConnection().canConnect(); diff --git a/database-commons/src/main/java/io/cdap/plugin/db/config/DatabaseSinkConfig.java b/database-commons/src/main/java/io/cdap/plugin/db/config/DatabaseSinkConfig.java index 81aa68992..01432fcef 100644 --- a/database-commons/src/main/java/io/cdap/plugin/db/config/DatabaseSinkConfig.java +++ b/database-commons/src/main/java/io/cdap/plugin/db/config/DatabaseSinkConfig.java @@ -65,6 +65,15 @@ public interface DatabaseSinkConfig extends DatabaseConnectionConfig { */ String getEscapedTableName(); + /** + * Adds escape characters (back quotes, double quotes, etc.) to the database schema name for + * databases with case-sensitive identifiers. + * + * @return dBSchemaName with leading and trailing escape characters appended. + * Default implementation returns unchanged table name string. + */ + String getEscapedDbSchemaName(); + /** * Validate the sink config * diff --git a/database-commons/src/main/java/io/cdap/plugin/db/sink/AbstractDBSink.java b/database-commons/src/main/java/io/cdap/plugin/db/sink/AbstractDBSink.java index cf112d9c0..2deac8ce4 100644 --- a/database-commons/src/main/java/io/cdap/plugin/db/sink/AbstractDBSink.java +++ b/database-commons/src/main/java/io/cdap/plugin/db/sink/AbstractDBSink.java @@ -204,7 +204,7 @@ public void prepareRun(BatchSinkContext context) { configAccessor.getConfiguration().set(DBConfiguration.DRIVER_CLASS_PROPERTY, driverClass.getName()); configAccessor.getConfiguration().set(DBConfiguration.URL_PROPERTY, connectionString); String fullyQualifiedTableName = dbSchemaName == null ? dbSinkConfig.getEscapedTableName() - : dbSchemaName + "." + dbSinkConfig.getEscapedTableName(); + : dbSinkConfig.getEscapedDbSchemaName() + "." + dbSinkConfig.getEscapedTableName(); configAccessor.getConfiguration().set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY, fullyQualifiedTableName); configAccessor.getConfiguration().set(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, dbColumns); configAccessor.setOperationName(dbSinkConfig.getOperationName()); @@ -267,7 +267,7 @@ private Schema inferSchema(Class driverClass) { List inferredFields = new ArrayList<>(); String dbSchemaName = dbSinkConfig.getDBSchemaName(); String fullyQualifiedTableName = dbSchemaName == null ? dbSinkConfig.getEscapedTableName() - : dbSchemaName + "." + dbSinkConfig.getEscapedTableName(); + : dbSinkConfig.getEscapedDbSchemaName() + "." + dbSinkConfig.getEscapedTableName(); try { DBUtils.ensureJDBCDriverIsAvailable(driverClass, dbSinkConfig.getConnectionString(), dbSinkConfig.getJdbcPluginName()); @@ -318,7 +318,7 @@ private void setResultSetMetadata() throws Exception { String connectionString = dbSinkConfig.getConnectionString(); String dbSchemaName = dbSinkConfig.getDBSchemaName(); String fullyQualifiedTableName = dbSchemaName == null ? dbSinkConfig.getEscapedTableName() - : dbSchemaName + "." + dbSinkConfig.getEscapedTableName(); + : dbSinkConfig.getEscapedDbSchemaName() + "." + dbSinkConfig.getEscapedTableName(); driverCleanup = DBUtils .ensureJDBCDriverIsAvailable(driverClass, connectionString, dbSinkConfig.getJdbcPluginName()); @@ -381,7 +381,7 @@ private void validateSchema(FailureCollector collector, Class Schema inputSchema, String dbSchemaName) { String connectionString = dbSinkConfig.getConnectionString(); String fullyQualifiedTableName = dbSchemaName == null ? dbSinkConfig.getEscapedTableName() - : dbSchemaName + "." + dbSinkConfig.getEscapedTableName(); + : dbSinkConfig.getEscapedDbSchemaName() + "." + dbSinkConfig.getEscapedTableName(); try { DBUtils.ensureJDBCDriverIsAvailable(jdbcDriverClass, connectionString, dbSinkConfig.getJdbcPluginName()); } catch (IllegalAccessException | InstantiationException | SQLException e) { @@ -467,12 +467,14 @@ public abstract static class DBSinkConfig extends DBConfig implements DatabaseSi @Description("Name of the database schema of table.") @Macro @Nullable - private String dbSchemaName; + public String dbSchemaName; + @Name(OPERATION_NAME) @Description("Operation for the query to perform. By default, the query performs INSERT operation") @Macro @Nullable protected String operationName; + @Name(RELATION_TABLE_KEY) @Macro @Nullable @@ -486,6 +488,7 @@ public String getTableName() { public String getDBSchemaName() { return dbSchemaName; } + @Override public Operation getOperationName() { return Strings.isNullOrEmpty(operationName) ? Operation.INSERT : Operation.valueOf(operationName.toUpperCase()); @@ -506,6 +509,17 @@ public String getEscapedTableName() { return tableName; } + /** + * Adds escape characters (back quotes, double quotes, etc.) to the database schema name for + * databases with case-sensitive identifiers. + * + * @return dbschemaName with leading and trailing escape characters appended. + * Default implementation returns unchanged table name string. + */ + public String getEscapedDbSchemaName() { + return dbSchemaName; + } + public boolean canConnect() { return (!containsMacro(ConnectionConfig.HOST) && !containsMacro(ConnectionConfig.PORT) && !containsMacro(ConnectionConfig.DATABASE) && !containsMacro(TABLE_NAME) && !containsMacro(USER) && diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSink.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSink.java index 4fdd2a54c..45afbcb51 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSink.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSink.java @@ -127,6 +127,11 @@ public String getEscapedTableName() { return ESCAPE_CHAR + getTableName() + ESCAPE_CHAR; } + @Override + public String getEscapedDbSchemaName() { + return ESCAPE_CHAR + getDBSchemaName() + ESCAPE_CHAR; + } + @Override protected OracleConnectorConfig getConnection() { return connection; diff --git a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSink.java b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSink.java index 9a791db39..8fd91cc63 100644 --- a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSink.java +++ b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSink.java @@ -155,6 +155,11 @@ public String getEscapedTableName() { return ESCAPE_CHAR + getTableName() + ESCAPE_CHAR; } + @Override + public String getEscapedDbSchemaName() { + return ESCAPE_CHAR + getDBSchemaName() + ESCAPE_CHAR; + } + @Override public Map getDBSpecificArguments() { return ImmutableMap.of(PostgresConstants.CONNECTION_TIMEOUT, String.valueOf(connectionTimeout)); diff --git a/saphana-plugin/src/main/java/io/cdap/plugin/saphana/SapHanaSink.java b/saphana-plugin/src/main/java/io/cdap/plugin/saphana/SapHanaSink.java index badd97bf4..dde266031 100644 --- a/saphana-plugin/src/main/java/io/cdap/plugin/saphana/SapHanaSink.java +++ b/saphana-plugin/src/main/java/io/cdap/plugin/saphana/SapHanaSink.java @@ -92,5 +92,10 @@ public String getEscapedTableName() { return ESCAPE_CHAR + tableName + ESCAPE_CHAR; } + @Override + public String getEscapedDbSchemaName() { + return ESCAPE_CHAR + dbSchemaName + ESCAPE_CHAR; + } + } }