
Commit

Merge remote-tracking branch 'origin/ENG-970-Schema_creation_issue' into snowflake-v2.4.1-upgrade
wrehman-skap committed Nov 19, 2024
2 parents fc3154d + 493c23e commit a29ccc3
Showing 4 changed files with 20 additions and 14 deletions.
@@ -294,18 +294,20 @@ public Config validate(Map<String, String> connectorConfigs) {
       return result;
     }
 
-    // Disabling config validation due to ENG-789/Auto create schema feature for Append SF connector
-    /*try {
-      testConnection.schemaExists(connectorConfigs.get(Utils.SF_SCHEMA));
-    } catch (SnowflakeKafkaConnectorException e) {
-      LOGGER.error("Validate Error msg:{}, errorCode:{}", e.getMessage(), e.getCode());
-      if (e.getCode().equals("2001")) {
-        Utils.updateConfigErrorMessage(result, Utils.SF_SCHEMA, " schema does not exist");
-      } else {
-        throw e;
-      }
-      return result;
-    }*/
+    boolean createSchemaAuto = Boolean.parseBoolean(connectorConfigs.getOrDefault(Utils.CREATE_SCHEMA_AUTO,"false"));
+    if(!createSchemaAuto) {
+      try {
+        testConnection.schemaExists(connectorConfigs.get(Utils.SF_SCHEMA));
+      } catch (SnowflakeKafkaConnectorException e) {
+        LOGGER.error("Validate Error msg:{}, errorCode:{}", e.getMessage(), e.getCode());
+        if (e.getCode().equals("2001")) {
+          Utils.updateConfigErrorMessage(result, Utils.SF_SCHEMA, " schema does not exist");
+        } else {
+          throw e;
+        }
+        return result;
+      }
+    }
 
     LOGGER.info("Validated config with no error");
     return result;
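
For reference, a minimal self-contained sketch (illustrative only; the class and method names are invented, not from this commit) of how the create.schema.auto gate added above behaves: the schema-existence check still runs when the flag is absent or set to anything other than a case-insensitive "true", and is skipped when it is true.

import java.util.Map;

// Hypothetical helper, for illustration only: mirrors the gating logic added to validate().
public class CreateSchemaAutoGateSketch {
  // Mirrors Utils.CREATE_SCHEMA_AUTO ("create.schema.auto").
  static final String CREATE_SCHEMA_AUTO = "create.schema.auto";

  // Returns true when the schema-existence check should still run.
  static boolean shouldValidateSchemaExists(Map<String, String> connectorConfigs) {
    boolean createSchemaAuto =
        Boolean.parseBoolean(connectorConfigs.getOrDefault(CREATE_SCHEMA_AUTO, "false"));
    return !createSchemaAuto;
  }

  public static void main(String[] args) {
    System.out.println(shouldValidateSchemaExists(Map.of()));                           // true  (default "false")
    System.out.println(shouldValidateSchemaExists(Map.of(CREATE_SCHEMA_AUTO, "true"))); // false (check skipped)
    System.out.println(shouldValidateSchemaExists(Map.of(CREATE_SCHEMA_AUTO, "TRUE"))); // false (parseBoolean ignores case)
  }
}

Because Boolean.parseBoolean returns false for any unrecognised value, leaving the property unset keeps the existence check enabled.
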
@@ -234,8 +234,11 @@ public void start(final Map<String, String> parsedConfig) {
             .setErrorReporter(kafkaRecordErrorReporter)
             .setSinkTaskContext(this.context)
             .build();
-    createSchemaIfNotExists(getConnection(),
-        parsedConfig.get(SnowflakeSinkConnectorConfig.SNOWFLAKE_SCHEMA));
+
+    if(Boolean.parseBoolean(parsedConfig.getOrDefault(Utils.CREATE_SCHEMA_AUTO,"false"))) {
+      createSchemaIfNotExists(getConnection(),
+          parsedConfig.get(SnowflakeSinkConnectorConfig.SNOWFLAKE_SCHEMA));
+    }
     this.streamkapQueryTemplate = StreamkapQueryTemplate.buildStreamkapQueryTemplateFromConfig(parsedConfig);
 
     DYNAMIC_LOGGER.info(
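
createSchemaIfNotExists itself is not shown in this diff. As a rough sketch only, and not the connector's implementation, a helper like it could issue idempotent DDL over JDBC, reusing the identifier(?) bind style that appears elsewhere in this commit (the exact SQL here is an assumption):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Illustrative sketch, assuming a plain JDBC connection; not the code behind
// createSchemaIfNotExists in this repository.
public class CreateSchemaSketch {
  static void createSchemaIfNotExists(Connection conn, String schemaName) throws SQLException {
    // "create schema if not exists" is idempotent, so running it on every task start is safe.
    try (PreparedStatement stmt =
        conn.prepareStatement("create schema if not exists identifier(?)")) {
      stmt.setString(1, schemaName);
      stmt.execute();
    }
  }
}
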
1 change: 1 addition & 0 deletions src/main/java/com/snowflake/kafka/connector/Utils.java
@@ -136,6 +136,7 @@ public class Utils {
   public static final String TOPICS_MAP_CONF = "topics.config.map";
   public static final String SCHEMA_CHANGE_CHECK_MS = "schema.changes.check.interval.ms";
   public static final String APPLY_DYNAMIC_TABLE_SCRIPT_CONF = "apply.dynamic.table.script";
+  public static final String CREATE_SCHEMA_AUTO = "create.schema.auto";
 
   private static final KCLogger LOGGER = new KCLogger(Utils.class.getName());
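
A usage illustration for the new key (not taken from this repository; the neighbouring keys and values are placeholders for typical connector settings): setting create.schema.auto to "true" both skips the schema-existence validation above and lets start() create the schema.

import java.util.Map;

// Illustration only: enabling the new flag in a connector configuration map. The other
// entries are placeholders for the usual Snowflake connector settings, not taken from this repo.
public class EnableCreateSchemaAuto {
  public static void main(String[] args) {
    Map<String, String> connectorConfigs = Map.of(
        "snowflake.database.name", "MY_DB",   // placeholder
        "snowflake.schema.name", "MY_SCHEMA", // placeholder
        "create.schema.auto", "true");        // Utils.CREATE_SCHEMA_AUTO: skip validation, create schema in start()
    connectorConfigs.forEach((k, v) -> System.out.println(k + "=" + v));
  }
}
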

@@ -244,7 +244,7 @@ public void createStage(final String stageName) {
   public boolean schemaExist(final String schemaName) {
     checkConnection();
     InternalUtils.assertNotEmpty("schemaName", schemaName);
-    String query = "desc schema identifier(?)";
+    String query = "use schema identifier(?)";
     PreparedStatement stmt = null;
     boolean exist;
     try {
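
The rest of schemaExist is not shown here. A hedged sketch of how the changed query can back an existence check over plain JDBC (not this file's actual implementation): use schema identifier(?) succeeds only when the schema exists and is accessible to the current role, so a failed execution can be read as "schema does not exist". Unlike desc schema, use schema also switches the session's current schema.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Sketch only, assuming a plain JDBC connection; not the implementation in this file.
public class SchemaExistSketch {
  static boolean schemaExist(Connection conn, String schemaName) {
    try (PreparedStatement stmt = conn.prepareStatement("use schema identifier(?)")) {
      stmt.setString(1, schemaName);
      stmt.execute(); // throws if the schema is missing or not accessible
      return true;
    } catch (SQLException e) {
      return false;
    }
  }
}
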
