Skip to content

Commit

Permalink
ref: ENG-789/Auto create schema feature for Append SF connector
Browse files Browse the repository at this point in the history
  • Loading branch information
wrehman-skap committed Sep 19, 2024
1 parent acfdc1e commit 7539d5e
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,8 @@ public void start(final Map<String, String> parsedConfig) {
.setErrorReporter(kafkaRecordErrorReporter)
.setSinkTaskContext(this.context)
.build();

createSchemaIfNotExists(getConnection(),
parsedConfig.get(SnowflakeSinkConnectorConfig.SNOWFLAKE_SCHEMA));
this.streamkapQueryTemplate = StreamkapQueryTemplate.buildStreamkapQueryTemplateFromConfig(parsedConfig);

DYNAMIC_LOGGER.info(
Expand All @@ -238,6 +239,12 @@ public void start(final Map<String, String> parsedConfig) {
getDurationFromStartMs(this.taskStartTime));
}

private void createSchemaIfNotExists(SnowflakeConnectionService con, String schemaName){
if(!con.schemaExist(schemaName)) {
con.createSchema(schemaName);
}
}

/**
* stop method is invoked only once outstanding calls to other methods have completed. e.g. after
* current put, and a final preCommit has completed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ static int resultSize(ResultSet resultSet) throws SQLException {
static void assertNotEmpty(String name, Object value) {
if (value == null || (value instanceof String && value.toString().isEmpty())) {
switch (name.toLowerCase()) {
case "schemaname":
throw SnowflakeErrors.ERROR_0031.getException();
case "tablename":
throw SnowflakeErrors.ERROR_0005.getException();
case "stagename":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@
import java.util.Map;

public interface SnowflakeConnectionService {
/**
* create schema is not exists
*
* @param schemaName schema name
*/
void createSchema(String schemaName);

/**
* Create a table with two variant columns: RECORD_METADATA and RECORD_CONTENT
*
Expand Down Expand Up @@ -59,6 +66,14 @@ public interface SnowflakeConnectionService {
*/
void createStage(String stageName);

/**
* check schema existence
*
* @param schemaName table name
* @return true if schema exists, false otherwise
*/
boolean schemaExist(String schemaName);

/**
* check table existence
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,25 @@ private static Properties mergeProxyAndConnectionProperties(
return mergedProperties;
}

@Override
public void createSchema(final String schemaName) {
checkConnection();
InternalUtils.assertNotEmpty("schemaName", schemaName);
String query;
query = "create schema if not exists identifier(?) comment = 'created by"
+ " automatic schema creation from Snowflake Kafka Connector'";
try {
PreparedStatement stmt = conn.prepareStatement(query);
stmt.setString(1, schemaName);
stmt.execute();
stmt.close();
} catch (SQLException e) {
throw SnowflakeErrors.ERROR_2018.getException(e);
}

LOGGER.info("create schema {}", schemaName);
}

@Override
public void createTable(final String tableName, final boolean overwrite) {
checkConnection();
Expand Down Expand Up @@ -237,6 +256,33 @@ public void createStage(final String stageName) {
createStage(stageName, false);
}

@Override
public boolean schemaExist(final String schemaName) {
checkConnection();
InternalUtils.assertNotEmpty("schemaName", schemaName);
String query = "desc schema identifier(?)";
PreparedStatement stmt = null;
boolean exist;
try {
stmt = conn.prepareStatement(query);
stmt.setString(1, schemaName);
stmt.execute();
exist = true;
} catch (Exception e) {
LOGGER.debug("schema {} doesn't exist", schemaName);
exist = false;
} finally {
if (stmt != null) {
try {
stmt.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
return exist;
}

@Override
public boolean tableExist(final String tableName) {
checkConnection();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public enum SnowflakeErrors {
String.format(
"Failed to parse %s map",
SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_CLIENT_PROVIDER_OVERRIDE_MAP)),

ERROR_0031("0031", "Empty Schema name", "Input Schema name is empty string or null"),
// Snowflake connection issues 1---
ERROR_1001(
"1001",
Expand Down Expand Up @@ -205,6 +205,10 @@ public enum SnowflakeErrors {
"2017",
"Failed to check schema evolution permission",
"Failed to check schema evolution permission"),
ERROR_2018(
"2018",
"Failed to create schema",
"Failed to create schema on Snowflake, please check that you have permission to do so."),
// Snowpipe related issues 3---
ERROR_3001("3001", "Failed to ingest file", "Exception reported by Ingest SDK"),

Expand Down

0 comments on commit 7539d5e

Please sign in to comment.