From 217c75a99f772d93d28487c0293baacf6dfcd1bf Mon Sep 17 00:00:00 2001
From: Alexandru Cristu
Date: Tue, 30 Jan 2024 17:18:52 +0200
Subject: [PATCH] added dbz_date_col to testNominal

---
 pom.xml                                            | 16 +++++++--
 .../connector/SnowflakeStreamkapSinkIT.java        | 34 +++++++++++++++----
 2 files changed, 42 insertions(+), 8 deletions(-)

diff --git a/pom.xml b/pom.xml
index 4aa32223b..ce1463973 100644
--- a/pom.xml
+++ b/pom.xml
@@ -501,15 +501,27 @@
     <dependency>
      <groupId>com.streamkap</groupId>
      <artifactId>streamkap-kafka-connect-common</artifactId>
-      <version>0.0.4</version>
+      <version>0.0.9</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>com.streamkap.kafka.connect.transforms</groupId>
      <artifactId>multi-converter</artifactId>
-      <version>0.0.5</version>
+      <version>0.0.7</version>
      <scope>test</scope>
    </dependency>
+    <dependency>
+      <groupId>com.streamkap.kafka.connect.transforms</groupId>
+      <artifactId>auto-time-converters</artifactId>
+      <version>0.0.4</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>connect-json</artifactId>
+      <version>3.3.1</version>
+      <scope>test</scope>
+    </dependency>
diff --git a/src/test/java/com/snowflake/kafka/connector/SnowflakeStreamkapSinkIT.java b/src/test/java/com/snowflake/kafka/connector/SnowflakeStreamkapSinkIT.java
index cbdb67708..c7218eee5 100644
--- a/src/test/java/com/snowflake/kafka/connector/SnowflakeStreamkapSinkIT.java
+++ b/src/test/java/com/snowflake/kafka/connector/SnowflakeStreamkapSinkIT.java
@@ -14,12 +14,15 @@
 import com.snowflake.client.jdbc.SnowflakeDriver;
 import com.snowflake.kafka.connector.internal.InternalUtils;
 import com.snowflake.kafka.connector.internal.SnowflakeURL;
+import com.streamkap.common.test.TestUtils;
 import com.streamkap.common.test.sink.StreamkapSinkITBase;
+import com.streamkap.kafka.connect.transforms.AutoTimeConverters;
 
 public class SnowflakeStreamkapSinkIT extends StreamkapSinkITBase {
     private static final String SCHEMA_NAME = "junit";
 
     ReplaceField<SinkRecord> renameAmbigiousFields = new ReplaceField.Value<>();
+    AutoTimeConverters<SinkRecord> dateConv = new AutoTimeConverters<>();
 
     public SnowflakeStreamkapSinkIT() throws Exception {
         Map<String, String> config = new HashMap<>();
@@ -28,7 +31,26 @@ public SnowflakeStreamkapSinkIT() throws Exception {
                 "account:_account,all:_all,alter:_alter,and:_and,any:_any,as:_as,between:_between,by:_by,case:_case,cast:_cast,check:_check,column:_column,connect:_connect,connection:_connection,constraint:_constraint,create:_create,cross:_cross,current:_current,current_date:_current_date,current_time:_current_time,current_timestamp:_current_timestamp,current_user:_current_user,database:_database,delete:_delete,distinct:_distinct,drop:_drop,else:_else,exists:_exists,false:_false,following:_following,for:_for,from:_from,full:_full,grant:_grant,group:_group,gscluster:_gscluster,having:_having,ilike:_ilike,in:_in,increment:_increment,inner:_inner,insert:_insert,intersect:_intersect,into:_into,is:_is,issue:_issue,join:_join,lateral:_lateral,left:_left,like:_like,localtime:_localtime,localtimestamp:_localtimestamp,minus:_minus,natural:_natural,not:_not,null:_null,of:_of,on:_on,or:_or,order:_order,organization:_organization,qualify:_qualify,regexp:_regexp,revoke:_revoke,right:_right,rlike:_rlike,row:_row,rows:_rows,sample:_sample,schema:_schema,select:_select,set:_set,some:_some,start:_start,table:_table,tablesample:_tablesample,then:_then,to:_to,trigger:_trigger,true:_true,try_cast:_try_cast,union:_union,unique:_unique,update:_update,using:_using,values:_values,view:_view,when:_when,whenever:_whenever,where:_where,with:_with");
         renameAmbigiousFields.configure(config);
 
-        super.init(generateCon());
+        config = new HashMap<>();
+        config.put("dateconv.target.type", "string");
+        config.put("dateconv.format", "yyyy-MM-dd");
+        config.put("dateconv.unix.precision", "days");
+        config.put("schema.name", "io.debezium.time.Date");
+        config.put("prefix", "dateconv");
+        dateConv.configure(config);
+
+        super.init(new TestUtils() {
+
+            @Override
+            protected Connection createCon() throws SQLException {
+                try {
+                    return generateCon();
+                } catch (Exception e) {
+                    throw new SQLException(e);
+                }
+            }
+
+        });
     }
 
     public static Connection generateCon() throws Exception {
@@ -96,7 +118,7 @@ protected boolean checkTableMetadata() {
 
     @Test
     public void testNominal() throws SQLException, InterruptedException {
-        super.testNominal();
+        super.testNominalInsert(false);
     }
 
     // @Test
@@ -133,12 +155,12 @@ protected String getSchemaName() {
     }
 
     @Override
-    protected void configureIngestionMode(Map<String, String> config, boolean isUpsert) {
-        // config.put("databricks.ingestion.mode", isUpsert ? "upsert" : "append");
+    protected SinkRecord applyTransforms(SinkRecord record) {
+        return renameAmbigiousFields.apply(record);
     }
 
     @Override
-    protected SinkRecord applyTransforms(SinkRecord record) {
-        return renameAmbigiousFields.apply(record);
+    protected SinkRecord applyTransformsNominal(SinkRecord record) {
+        return dateConv.apply(record);
     }
 }
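
Note on the dateconv transform (reviewer sketch, not part of the patch): the configuration above targets Debezium's io.debezium.time.Date logical type, which encodes a date as an int32 count of days since the Unix epoch. With "unix.precision" set to "days", "format" to "yyyy-MM-dd", and "target.type" to "string", the transform is being asked to turn that day count into a formatted date string. A minimal standalone sketch of that conversion in plain Java follows; the class name, method name, and sample value are illustrative only and do not come from AutoTimeConverters:

    import java.time.LocalDate;
    import java.time.format.DateTimeFormatter;

    public class DebeziumDateSketch {
        // io.debezium.time.Date carries an int32 counting days since 1970-01-01.
        static String toDateString(int epochDays) {
            // Mirrors unix.precision=days + format=yyyy-MM-dd + target.type=string.
            return LocalDate.ofEpochDay(epochDays)
                    .format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
        }

        public static void main(String[] args) {
            // 19753 days after the epoch: prints 2024-01-31.
            System.out.println(toDateString(19753));
        }
    }

Separately, the anonymous TestUtils subclass in the constructor exists because createCon() declares only SQLException while generateCon() throws the broader Exception; wrapping the cause in a SQLException keeps the checked-exception contract of the base class intact.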