Skip to content

Commit

Permalink
Merge pull request #411 from cloudsufi/bugfix/CSM-1032
Browse files Browse the repository at this point in the history
PLUGIN-1636 : PostgreSQL Sink - Case Sensitivity in Schema name
  • Loading branch information
sgarg-CS authored Sep 15, 2023
2 parents 82a8feb + 06b871f commit 67bf43b
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ public String getEscapedTableName() {
return ESCAPE_CHAR + tableName + ESCAPE_CHAR;
}

@Override
public String getEscapedDbSchemaName() {
return ESCAPE_CHAR + dbSchemaName + ESCAPE_CHAR;
}

@Override
public Map<String, String> getDBSpecificArguments() {
if (connectionTimeout != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,11 @@ public String getTransactionIsolationLevel() {
public String getEscapedTableName() {
return ESCAPE_CHAR + getTableName() + ESCAPE_CHAR;
}

@Override
public String getEscapedDbSchemaName() {
return ESCAPE_CHAR + getDBSchemaName() + ESCAPE_CHAR;
}

@Override
public Map<String, String> getDBSpecificArguments() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,13 @@ public abstract class AbstractDBSpecificSinkConfig extends PluginConfig implemen
@Macro
@Nullable
private String dbSchemaName;

@Name(OPERATION_NAME)
@Description("Operation for the query to perform. By default, the query performs INSERT operation")
@Macro
@Nullable
protected String operationName;

@Name(RELATION_TABLE_KEY)
@Macro
@Nullable
Expand Down Expand Up @@ -89,6 +91,10 @@ public String getEscapedTableName() {
return tableName;
}

public String getEscapedDbSchemaName() {
return dbSchemaName;
}

@Override
public boolean canConnect() {
return !containsMacro(TABLE_NAME) && getConnection().canConnect();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,15 @@ public interface DatabaseSinkConfig extends DatabaseConnectionConfig {
*/
String getEscapedTableName();

/**
* Adds escape characters (back quotes, double quotes, etc.) to the database schema name for
* databases with case-sensitive identifiers.
*
* @return dBSchemaName with leading and trailing escape characters appended.
* Default implementation returns unchanged table name string.
*/
String getEscapedDbSchemaName();

/**
* Validate the sink config
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ public void prepareRun(BatchSinkContext context) {
configAccessor.getConfiguration().set(DBConfiguration.DRIVER_CLASS_PROPERTY, driverClass.getName());
configAccessor.getConfiguration().set(DBConfiguration.URL_PROPERTY, connectionString);
String fullyQualifiedTableName = dbSchemaName == null ? dbSinkConfig.getEscapedTableName()
: dbSchemaName + "." + dbSinkConfig.getEscapedTableName();
: dbSinkConfig.getEscapedDbSchemaName() + "." + dbSinkConfig.getEscapedTableName();
configAccessor.getConfiguration().set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY, fullyQualifiedTableName);
configAccessor.getConfiguration().set(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, dbColumns);
configAccessor.setOperationName(dbSinkConfig.getOperationName());
Expand Down Expand Up @@ -267,7 +267,7 @@ private Schema inferSchema(Class<? extends Driver> driverClass) {
List<Schema.Field> inferredFields = new ArrayList<>();
String dbSchemaName = dbSinkConfig.getDBSchemaName();
String fullyQualifiedTableName = dbSchemaName == null ? dbSinkConfig.getEscapedTableName()
: dbSchemaName + "." + dbSinkConfig.getEscapedTableName();
: dbSinkConfig.getEscapedDbSchemaName() + "." + dbSinkConfig.getEscapedTableName();
try {
DBUtils.ensureJDBCDriverIsAvailable(driverClass, dbSinkConfig.getConnectionString(),
dbSinkConfig.getJdbcPluginName());
Expand Down Expand Up @@ -318,7 +318,7 @@ private void setResultSetMetadata() throws Exception {
String connectionString = dbSinkConfig.getConnectionString();
String dbSchemaName = dbSinkConfig.getDBSchemaName();
String fullyQualifiedTableName = dbSchemaName == null ? dbSinkConfig.getEscapedTableName()
: dbSchemaName + "." + dbSinkConfig.getEscapedTableName();
: dbSinkConfig.getEscapedDbSchemaName() + "." + dbSinkConfig.getEscapedTableName();

driverCleanup = DBUtils
.ensureJDBCDriverIsAvailable(driverClass, connectionString, dbSinkConfig.getJdbcPluginName());
Expand Down Expand Up @@ -381,7 +381,7 @@ private void validateSchema(FailureCollector collector, Class<? extends Driver>
Schema inputSchema, String dbSchemaName) {
String connectionString = dbSinkConfig.getConnectionString();
String fullyQualifiedTableName = dbSchemaName == null ? dbSinkConfig.getEscapedTableName()
: dbSchemaName + "." + dbSinkConfig.getEscapedTableName();
: dbSinkConfig.getEscapedDbSchemaName() + "." + dbSinkConfig.getEscapedTableName();
try {
DBUtils.ensureJDBCDriverIsAvailable(jdbcDriverClass, connectionString, dbSinkConfig.getJdbcPluginName());
} catch (IllegalAccessException | InstantiationException | SQLException e) {
Expand Down Expand Up @@ -467,12 +467,14 @@ public abstract static class DBSinkConfig extends DBConfig implements DatabaseSi
@Description("Name of the database schema of table.")
@Macro
@Nullable
private String dbSchemaName;
public String dbSchemaName;

@Name(OPERATION_NAME)
@Description("Operation for the query to perform. By default, the query performs INSERT operation")
@Macro
@Nullable
protected String operationName;

@Name(RELATION_TABLE_KEY)
@Macro
@Nullable
Expand All @@ -486,6 +488,7 @@ public String getTableName() {
public String getDBSchemaName() {
return dbSchemaName;
}

@Override
public Operation getOperationName() {
return Strings.isNullOrEmpty(operationName) ? Operation.INSERT : Operation.valueOf(operationName.toUpperCase());
Expand All @@ -506,6 +509,17 @@ public String getEscapedTableName() {
return tableName;
}

/**
* Adds escape characters (back quotes, double quotes, etc.) to the database schema name for
* databases with case-sensitive identifiers.
*
* @return dbschemaName with leading and trailing escape characters appended.
* Default implementation returns unchanged table name string.
*/
public String getEscapedDbSchemaName() {
return dbSchemaName;
}

public boolean canConnect() {
return (!containsMacro(ConnectionConfig.HOST) && !containsMacro(ConnectionConfig.PORT) &&
!containsMacro(ConnectionConfig.DATABASE) && !containsMacro(TABLE_NAME) && !containsMacro(USER) &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ public String getEscapedTableName() {
return ESCAPE_CHAR + getTableName() + ESCAPE_CHAR;
}

@Override
public String getEscapedDbSchemaName() {
return ESCAPE_CHAR + getDBSchemaName() + ESCAPE_CHAR;
}

@Override
protected OracleConnectorConfig getConnection() {
return connection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ public String getEscapedTableName() {
return ESCAPE_CHAR + getTableName() + ESCAPE_CHAR;
}

@Override
public String getEscapedDbSchemaName() {
return ESCAPE_CHAR + getDBSchemaName() + ESCAPE_CHAR;
}

@Override
public Map<String, String> getDBSpecificArguments() {
return ImmutableMap.of(PostgresConstants.CONNECTION_TIMEOUT, String.valueOf(connectionTimeout));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,5 +92,10 @@ public String getEscapedTableName() {
return ESCAPE_CHAR + tableName + ESCAPE_CHAR;
}

@Override
public String getEscapedDbSchemaName() {
return ESCAPE_CHAR + dbSchemaName + ESCAPE_CHAR;
}

}
}

0 comments on commit 67bf43b

Please sign in to comment.