Skip to content

Commit

Permalink
added dbz_date_col to testNominal
Browse files Browse the repository at this point in the history
  • Loading branch information
acristu committed Jan 30, 2024
1 parent 4e7d928 commit 217c75a
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 8 deletions.
16 changes: 14 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -501,15 +501,27 @@
<dependency>
<groupId>com.streamkap</groupId>
<artifactId>streamkap-kafka-connect-common</artifactId>
<version>0.0.4</version>
<version>0.0.9</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.streamkap.kafka.connect.transforms</groupId>
<artifactId>multi-converter</artifactId>
<version>0.0.5</version>
<version>0.0.7</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.streamkap.kafka.connect.transforms</groupId>
<artifactId>auto-time-converters</artifactId>
<version>0.0.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-json</artifactId>
<version>3.3.1</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@
import com.snowflake.client.jdbc.SnowflakeDriver;
import com.snowflake.kafka.connector.internal.InternalUtils;
import com.snowflake.kafka.connector.internal.SnowflakeURL;
import com.streamkap.common.test.TestUtils;
import com.streamkap.common.test.sink.StreamkapSinkITBase;
import com.streamkap.kafka.connect.transforms.AutoTimeConverters;

public class SnowflakeStreamkapSinkIT extends StreamkapSinkITBase<SnowflakeSinkTask> {
private static final String SCHEMA_NAME = "junit";

ReplaceField<SinkRecord> renameAmbigiousFields = new ReplaceField.Value<>();
AutoTimeConverters<SinkRecord> dateConv = new AutoTimeConverters<>();

public SnowflakeStreamkapSinkIT() throws Exception {
Map<String, String> config = new HashMap<>();
Expand All @@ -28,7 +31,26 @@ public SnowflakeStreamkapSinkIT() throws Exception {
"account:_account,all:_all,alter:_alter,and:_and,any:_any,as:_as,between:_between,by:_by,case:_case,cast:_cast,check:_check,column:_column,connect:_connect,connection:_connection,constraint:_constraint,create:_create,cross:_cross,current:_current,current_date:_current_date,current_time:_current_time,current_timestamp:_current_timestamp,current_user:_current_user,database:_database,delete:_delete,distinct:_distinct,drop:_drop,else:_else,exists:_exists,false:_false,following:_following,for:_for,from:_from,full:_full,grant:_grant,group:_group,gscluster:_gscluster,having:_having,ilike:_ilike,in:_in,increment:_increment,inner:_inner,insert:_insert,intersect:_intersect,into:_into,is:_is,issue:_issue,join:_join,lateral:_lateral,left:_left,like:_like,localtime:_localtime,localtimestamp:_localtimestamp,minus:_minus,natural:_natural,not:_not,null:_null,of:_of,on:_on,or:_or,order:_order,organization:_organization,qualify:_qualify,regexp:_regexp,revoke:_revoke,right:_right,rlike:_rlike,row:_row,rows:_rows,sample:_sample,schema:_schema,select:_select,set:_set,some:_some,start:_start,table:_table,tablesample:_tablesample,then:_then,to:_to,trigger:_trigger,true:_true,try_cast:_try_cast,union:_union,unique:_unique,update:_update,using:_using,values:_values,view:_view,when:_when,whenever:_whenever,where:_where,with:_with");
renameAmbigiousFields.configure(config);

super.init(generateCon());
config = new HashMap<>();
config.put("dateconv.target.type", "string");
config.put("dateconv.format", "yyyy-MM-dd");
config.put("dateconv.unix.precision", "days");
config.put("schema.name", "io.debezium.time.Date");
config.put("prefix", "dateconv");
dateConv.configure(config);

super.init(new TestUtils() {

@Override
protected Connection createCon() throws SQLException {
try {
return generateCon();
} catch (Exception e) {
throw new SQLException(e);
}
}

});
}

public static Connection generateCon() throws Exception {
Expand Down Expand Up @@ -96,7 +118,7 @@ protected boolean checkTableMetadata() {

@Test
public void testNominal() throws SQLException, InterruptedException {
super.testNominal();
super.testNominalInsert(false);
}

// @Test
Expand Down Expand Up @@ -133,12 +155,12 @@ protected String getSchemaName() {
}

@Override
protected void configureIngestionMode(Map<String, String> config, boolean isUpsert) {
// config.put("databricks.ingestion.mode", isUpsert ? "upsert" : "append");
protected SinkRecord applyTransforms(SinkRecord record) {
return renameAmbigiousFields.apply(record);
}

@Override
protected SinkRecord applyTransforms(SinkRecord record) {
return renameAmbigiousFields.apply(record);
protected SinkRecord applyTransformsNominal(SinkRecord record) {
return dateConv.apply(record);
}
}

0 comments on commit 217c75a

Please sign in to comment.