Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eng 789 auto create schema feature #25

Merged
merged 3 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,8 @@ public Config validate(Map<String, String> connectorConfigs) {
return result;
}

try {
// Disabling config validation due to ENG-789/Auto create schema feature for Append SF connector
/*try {
testConnection.schemaExists(connectorConfigs.get(Utils.SF_SCHEMA));
} catch (SnowflakeKafkaConnectorException e) {
LOGGER.error("Validate Error msg:{}, errorCode:{}", e.getMessage(), e.getCode());
Expand All @@ -304,7 +305,7 @@ public Config validate(Map<String, String> connectorConfigs) {
throw e;
}
return result;
}
}*/

LOGGER.info("Validated config with no error");
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,8 @@ public void start(final Map<String, String> parsedConfig) {
.setErrorReporter(kafkaRecordErrorReporter)
.setSinkTaskContext(this.context)
.build();

createSchemaIfNotExists(getConnection(),
parsedConfig.get(SnowflakeSinkConnectorConfig.SNOWFLAKE_SCHEMA));
this.streamkapQueryTemplate = StreamkapQueryTemplate.buildStreamkapQueryTemplateFromConfig(parsedConfig);

DYNAMIC_LOGGER.info(
Expand All @@ -238,6 +239,12 @@ public void start(final Map<String, String> parsedConfig) {
getDurationFromStartMs(this.taskStartTime));
}

/**
 * Ensures the given schema exists in Snowflake, creating it when absent.
 *
 * @param con connection service used to probe for and create the schema
 * @param schemaName name of the schema that must exist before the task starts
 */
private void createSchemaIfNotExists(SnowflakeConnectionService con, String schemaName) {
  if (con.schemaExist(schemaName)) {
    return; // nothing to do — schema is already present
  }
  con.createSchema(schemaName);
}

/**
* stop method is invoked only once outstanding calls to other methods have completed. e.g. after
* current put, and a final preCommit has completed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ static int resultSize(ResultSet resultSet) throws SQLException {
static void assertNotEmpty(String name, Object value) {
if (value == null || (value instanceof String && value.toString().isEmpty())) {
switch (name.toLowerCase()) {
case "schemaname":
throw SnowflakeErrors.ERROR_0031.getException();
case "tablename":
throw SnowflakeErrors.ERROR_0005.getException();
case "stagename":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@
import java.util.Map;

public interface SnowflakeConnectionService {
/**
 * Creates the schema in Snowflake if it does not already exist.
 *
 * @param schemaName schema name
 */
void createSchema(String schemaName);

/**
* Create a table with two variant columns: RECORD_METADATA and RECORD_CONTENT
*
Expand Down Expand Up @@ -59,6 +66,14 @@ public interface SnowflakeConnectionService {
*/
void createStage(String stageName);

/**
 * Checks whether the given schema exists in Snowflake.
 *
 * @param schemaName schema name
 * @return true if schema exists, false otherwise
 */
boolean schemaExist(String schemaName);

/**
* check table existence
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,25 @@ private static Properties mergeProxyAndConnectionProperties(
return mergedProperties;
}

@Override
public void createSchema(final String schemaName) {
  checkConnection();
  InternalUtils.assertNotEmpty("schemaName", schemaName);
  // identifier(?) binds the schema name as a parameter, avoiding SQL injection
  // that string concatenation into the DDL would allow
  String query =
      "create schema if not exists identifier(?) comment = 'created by"
          + " automatic schema creation from Snowflake Kafka Connector'";
  // try-with-resources closes the statement even when execute() throws;
  // the original leaked the statement on any SQLException before close()
  try (PreparedStatement stmt = conn.prepareStatement(query)) {
    stmt.setString(1, schemaName);
    stmt.execute();
  } catch (SQLException e) {
    throw SnowflakeErrors.ERROR_2018.getException(e);
  }

  LOGGER.info("create schema {}", schemaName);
}

@Override
public void createTable(final String tableName, final boolean overwrite) {
checkConnection();
Expand Down Expand Up @@ -237,6 +256,33 @@ public void createStage(final String stageName) {
createStage(stageName, false);
}

@Override
public boolean schemaExist(final String schemaName) {
  checkConnection();
  InternalUtils.assertNotEmpty("schemaName", schemaName);
  // "desc schema" succeeds only when the schema exists and is accessible;
  // a failed describe is interpreted as "schema does not exist"
  String query = "desc schema identifier(?)";
  // try-with-resources replaces the manual finally/close and the swallowed
  // close-time printStackTrace() from the original
  try (PreparedStatement stmt = conn.prepareStatement(query)) {
    stmt.setString(1, schemaName);
    stmt.execute();
    return true;
  } catch (SQLException e) {
    // NOTE(review): any SQL error (including insufficient privileges) is reported
    // as "doesn't exist", matching the original's behavior — consider inspecting
    // the error code to distinguish the two cases
    LOGGER.debug("schema {} doesn't exist", schemaName);
    return false;
  }
}

@Override
public boolean tableExist(final String tableName) {
checkConnection();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public enum SnowflakeErrors {
String.format(
"Failed to parse %s map",
SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_CLIENT_PROVIDER_OVERRIDE_MAP)),

ERROR_0031("0031", "Empty Schema name", "Input Schema name is empty string or null"),
// Snowflake connection issues 1---
ERROR_1001(
"1001",
Expand Down Expand Up @@ -205,6 +205,10 @@ public enum SnowflakeErrors {
"2017",
"Failed to check schema evolution permission",
"Failed to check schema evolution permission"),
ERROR_2018(
"2018",
"Failed to create schema",
"Failed to create schema on Snowflake, please check that you have permission to do so."),
// Snowpipe related issues 3---
ERROR_3001("3001", "Failed to ingest file", "Exception reported by Ingest SDK"),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ public class DirectTopicPartitionChannel implements TopicPartitionChannel {

// Whether schematization has been enabled.
private final boolean enableSchematization;
private final boolean autoSchematization;

// Whether schema evolution could be done on this channel
private final boolean enableSchemaEvolution;
Expand Down Expand Up @@ -232,8 +233,13 @@ public DirectTopicPartitionChannel(
/* Schematization related properties */
this.enableSchematization =
this.recordService.setAndGetEnableSchematizationFromConfig(sfConnectorConfig);

this.enableSchemaEvolution = this.enableSchematization && hasSchemaEvolutionPermission;
this.autoSchematization =
this.recordService.setAndGetAutoSchematizationFromConfig(sfConnectorConfig);
//this.enableSchemaEvolution = this.enableSchematization && hasSchemaEvolutionPermission;
this.enableSchemaEvolution =
this.enableSchematization
&& this.conn != null
&& (!autoSchematization || hasSchemaEvolutionPermission);

if (isEnableChannelOffsetMigration(sfConnectorConfig)) {
/* Channel Name format V2 is computed from connector name, topic and partition */
Expand Down
Loading