From 7d30100ba993cd7c5cd0eada5017d651b6460cec Mon Sep 17 00:00:00 2001 From: Julius Vitkauskas Date: Thu, 21 Mar 2019 11:28:10 +0200 Subject: [PATCH] Backport upsert mode for Vertica to 5.0.0 --- .../connect/jdbc/dialect/DatabaseDialect.java | 9 +- .../jdbc/dialect/Db2DatabaseDialect.java | 9 +- .../jdbc/dialect/DerbyDatabaseDialect.java | 9 +- .../jdbc/dialect/GenericDatabaseDialect.java | 8 +- .../jdbc/dialect/MySqlDatabaseDialect.java | 9 +- .../jdbc/dialect/OracleDatabaseDialect.java | 9 +- .../dialect/PostgreSqlDatabaseDialect.java | 9 +- .../jdbc/dialect/SapHanaDatabaseDialect.java | 9 +- .../dialect/SqlServerDatabaseDialect.java | 9 +- .../jdbc/dialect/SqliteDatabaseDialect.java | 9 +- .../jdbc/dialect/SybaseDatabaseDialect.java | 9 +- .../jdbc/dialect/VerticaDatabaseDialect.java | 82 ++++++++++++++++++- .../connect/jdbc/sink/BufferedRecords.java | 3 +- .../jdbc/dialect/Db2DatabaseDialectTest.java | 9 +- .../dialect/DerbyDatabaseDialectTest.java | 9 +- .../dialect/GenericDatabaseDialectTest.java | 2 +- .../dialect/MySqlDatabaseDialectTest.java | 7 +- .../dialect/OracleDatabaseDialectTest.java | 5 +- .../PostgreSqlDatabaseDialectTest.java | 5 +- .../dialect/SapHanaDatabaseDialectTest.java | 5 +- .../dialect/SqlServerDatabaseDialectTest.java | 7 +- .../dialect/SqliteDatabaseDialectTest.java | 5 +- .../dialect/SybaseDatabaseDialectTest.java | 7 +- .../dialect/VerticaDatabaseDialectTest.java | 28 ++++++- 24 files changed, 196 insertions(+), 76 deletions(-) diff --git a/src/main/java/io/confluent/connect/jdbc/dialect/DatabaseDialect.java b/src/main/java/io/confluent/connect/jdbc/dialect/DatabaseDialect.java index 2df33acee6..4a0f0823e7 100644 --- a/src/main/java/io/confluent/connect/jdbc/dialect/DatabaseDialect.java +++ b/src/main/java/io/confluent/connect/jdbc/dialect/DatabaseDialect.java @@ -366,14 +366,15 @@ String buildDeleteStatement( * but may be empty * @param nonKeyColumns the identifiers of the other columns in the table; may not be null but may * be empty + * @param allFields all table fields; used to get type information for upserts * @return the upsert/merge statement; may not be null * @throws UnsupportedOperationException if the dialect does not support upserts */ String buildUpsertQueryStatement( - TableId table, - Collection keyColumns, - Collection nonKeyColumns - ); + TableId table, + Collection keyColumns, + Collection nonKeyColumns, + Map allFields); /** * Build the DROP TABLE statement expression for the given table. diff --git a/src/main/java/io/confluent/connect/jdbc/dialect/Db2DatabaseDialect.java b/src/main/java/io/confluent/connect/jdbc/dialect/Db2DatabaseDialect.java index a81ef7b16c..5fc526eccd 100644 --- a/src/main/java/io/confluent/connect/jdbc/dialect/Db2DatabaseDialect.java +++ b/src/main/java/io/confluent/connect/jdbc/dialect/Db2DatabaseDialect.java @@ -23,6 +23,7 @@ import org.apache.kafka.connect.data.Timestamp; import java.util.Collection; +import java.util.Map; import io.confluent.connect.jdbc.dialect.DatabaseDialectProvider.SubprotocolBasedProvider; import io.confluent.connect.jdbc.sink.metadata.SinkRecordField; @@ -112,10 +113,10 @@ protected String getSqlType(SinkRecordField field) { @Override public String buildUpsertQueryStatement( - final TableId table, - Collection keyColumns, - Collection nonKeyColumns - ) { + final TableId table, + Collection keyColumns, + Collection nonKeyColumns, + Map allFields) { // http://lpar.ath0.com/2013/08/12/upsert-in-db2/ final Transform transform = (builder, col) -> { builder.append(table) diff --git a/src/main/java/io/confluent/connect/jdbc/dialect/DerbyDatabaseDialect.java b/src/main/java/io/confluent/connect/jdbc/dialect/DerbyDatabaseDialect.java index 6f31f31e8f..35d61462cd 100644 --- a/src/main/java/io/confluent/connect/jdbc/dialect/DerbyDatabaseDialect.java +++ b/src/main/java/io/confluent/connect/jdbc/dialect/DerbyDatabaseDialect.java @@ -23,6 +23,7 @@ import org.apache.kafka.connect.data.Timestamp; import java.util.Collection; +import java.util.Map; import io.confluent.connect.jdbc.dialect.DatabaseDialectProvider.SubprotocolBasedProvider; import io.confluent.connect.jdbc.sink.metadata.SinkRecordField; @@ -112,10 +113,10 @@ protected String getSqlType(SinkRecordField field) { @Override public String buildUpsertQueryStatement( - final TableId table, - Collection keyColumns, - Collection nonKeyColumns - ) { + final TableId table, + Collection keyColumns, + Collection nonKeyColumns, + Map allFields) { // http://lpar.ath0.com/2013/08/12/upsert-in-db2/ final Transform transform = (builder, col) -> { builder.append(table) diff --git a/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java b/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java index 7a72b4c5b1..d98436a03e 100644 --- a/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java +++ b/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java @@ -1391,10 +1391,10 @@ public String buildUpdateStatement( @Override public String buildUpsertQueryStatement( - TableId table, - Collection keyColumns, - Collection nonKeyColumns - ) { + TableId table, + Collection keyColumns, + Collection nonKeyColumns, + Map allFields) { throw new UnsupportedOperationException(); } diff --git a/src/main/java/io/confluent/connect/jdbc/dialect/MySqlDatabaseDialect.java b/src/main/java/io/confluent/connect/jdbc/dialect/MySqlDatabaseDialect.java index 4c0a0212a7..e0765f7d61 100644 --- a/src/main/java/io/confluent/connect/jdbc/dialect/MySqlDatabaseDialect.java +++ b/src/main/java/io/confluent/connect/jdbc/dialect/MySqlDatabaseDialect.java @@ -27,6 +27,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.Collection; +import java.util.Map; import io.confluent.connect.jdbc.dialect.DatabaseDialectProvider.SubprotocolBasedProvider; import io.confluent.connect.jdbc.sink.metadata.SinkRecordField; @@ -125,10 +126,10 @@ protected String getSqlType(SinkRecordField field) { @Override public String buildUpsertQueryStatement( - TableId table, - Collection keyColumns, - Collection nonKeyColumns - ) { + TableId table, + Collection keyColumns, + Collection nonKeyColumns, + Map allFields) { //MySql doesn't support SQL 2003:merge so here how the upsert is handled final Transform transform = (builder, col) -> { builder.appendIdentifierQuoted(col.name()); diff --git a/src/main/java/io/confluent/connect/jdbc/dialect/OracleDatabaseDialect.java b/src/main/java/io/confluent/connect/jdbc/dialect/OracleDatabaseDialect.java index cc6067aea4..006345d251 100644 --- a/src/main/java/io/confluent/connect/jdbc/dialect/OracleDatabaseDialect.java +++ b/src/main/java/io/confluent/connect/jdbc/dialect/OracleDatabaseDialect.java @@ -25,6 +25,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import io.confluent.connect.jdbc.dialect.DatabaseDialectProvider.SubprotocolBasedProvider; import io.confluent.connect.jdbc.sink.metadata.SinkRecordField; @@ -161,10 +162,10 @@ public List buildAlterTable( @Override public String buildUpsertQueryStatement( - final TableId table, - Collection keyColumns, - Collection nonKeyColumns - ) { + final TableId table, + Collection keyColumns, + Collection nonKeyColumns, + Map allFields) { // https://blogs.oracle.com/cmar/entry/using_merge_to_do_an final Transform transform = (builder, col) -> { builder.append(table) diff --git a/src/main/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialect.java b/src/main/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialect.java index 7d9c4a0169..ac0ab89997 100644 --- a/src/main/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialect.java +++ b/src/main/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialect.java @@ -30,6 +30,7 @@ import java.sql.SQLException; import java.sql.Types; import java.util.Collection; +import java.util.Map; import io.confluent.connect.jdbc.dialect.DatabaseDialectProvider.SubprotocolBasedProvider; import io.confluent.connect.jdbc.sink.metadata.SinkRecordField; @@ -219,10 +220,10 @@ protected String getSqlType(SinkRecordField field) { @Override public String buildUpsertQueryStatement( - TableId table, - Collection keyColumns, - Collection nonKeyColumns - ) { + TableId table, + Collection keyColumns, + Collection nonKeyColumns, + Map allFields) { final Transform transform = (builder, col) -> { builder.appendIdentifierQuoted(col.name()) .append("=EXCLUDED.") diff --git a/src/main/java/io/confluent/connect/jdbc/dialect/SapHanaDatabaseDialect.java b/src/main/java/io/confluent/connect/jdbc/dialect/SapHanaDatabaseDialect.java index a95b2d07e2..a75bd91029 100644 --- a/src/main/java/io/confluent/connect/jdbc/dialect/SapHanaDatabaseDialect.java +++ b/src/main/java/io/confluent/connect/jdbc/dialect/SapHanaDatabaseDialect.java @@ -25,6 +25,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import io.confluent.connect.jdbc.dialect.DatabaseDialectProvider.SubprotocolBasedProvider; import io.confluent.connect.jdbc.sink.metadata.SinkRecordField; @@ -134,10 +135,10 @@ public List buildAlterTable( @Override public String buildUpsertQueryStatement( - TableId table, - Collection keyColumns, - Collection nonKeyColumns - ) { + TableId table, + Collection keyColumns, + Collection nonKeyColumns, + Map allFields) { // https://help.sap.com/hana_one/html/sql_replace_upsert.html ExpressionBuilder builder = expressionBuilder(); builder.append("UPSERT "); diff --git a/src/main/java/io/confluent/connect/jdbc/dialect/SqlServerDatabaseDialect.java b/src/main/java/io/confluent/connect/jdbc/dialect/SqlServerDatabaseDialect.java index 5af92a5b29..b1879e2e6c 100644 --- a/src/main/java/io/confluent/connect/jdbc/dialect/SqlServerDatabaseDialect.java +++ b/src/main/java/io/confluent/connect/jdbc/dialect/SqlServerDatabaseDialect.java @@ -25,6 +25,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import io.confluent.connect.jdbc.dialect.DatabaseDialectProvider.SubprotocolBasedProvider; import io.confluent.connect.jdbc.sink.metadata.SinkRecordField; @@ -144,10 +145,10 @@ public List buildAlterTable( @Override public String buildUpsertQueryStatement( - TableId table, - Collection keyColumns, - Collection nonKeyColumns - ) { + TableId table, + Collection keyColumns, + Collection nonKeyColumns, + Map allFields) { ExpressionBuilder builder = expressionBuilder(); builder.append("merge into "); builder.append(table); diff --git a/src/main/java/io/confluent/connect/jdbc/dialect/SqliteDatabaseDialect.java b/src/main/java/io/confluent/connect/jdbc/dialect/SqliteDatabaseDialect.java index 7134568cf9..304552e8e7 100644 --- a/src/main/java/io/confluent/connect/jdbc/dialect/SqliteDatabaseDialect.java +++ b/src/main/java/io/confluent/connect/jdbc/dialect/SqliteDatabaseDialect.java @@ -26,6 +26,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import io.confluent.connect.jdbc.dialect.DatabaseDialectProvider.SubprotocolBasedProvider; import io.confluent.connect.jdbc.sink.metadata.SinkRecordField; @@ -114,10 +115,10 @@ public List buildAlterTable( @Override public String buildUpsertQueryStatement( - TableId table, - Collection keyColumns, - Collection nonKeyColumns - ) { + TableId table, + Collection keyColumns, + Collection nonKeyColumns, + Map allFields) { ExpressionBuilder builder = expressionBuilder(); builder.append("INSERT OR REPLACE INTO "); builder.append(table); diff --git a/src/main/java/io/confluent/connect/jdbc/dialect/SybaseDatabaseDialect.java b/src/main/java/io/confluent/connect/jdbc/dialect/SybaseDatabaseDialect.java index 1b4cb4f94f..2fd65e20de 100644 --- a/src/main/java/io/confluent/connect/jdbc/dialect/SybaseDatabaseDialect.java +++ b/src/main/java/io/confluent/connect/jdbc/dialect/SybaseDatabaseDialect.java @@ -30,6 +30,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Set; import io.confluent.connect.jdbc.dialect.DatabaseDialectProvider.SubprotocolBasedProvider; @@ -242,10 +243,10 @@ public List buildAlterTable( @Override public String buildUpsertQueryStatement( - TableId table, - Collection keyColumns, - Collection nonKeyColumns - ) { + TableId table, + Collection keyColumns, + Collection nonKeyColumns, + Map allFields) { ExpressionBuilder builder = expressionBuilder(); builder.append("merge into "); builder.append(table); diff --git a/src/main/java/io/confluent/connect/jdbc/dialect/VerticaDatabaseDialect.java b/src/main/java/io/confluent/connect/jdbc/dialect/VerticaDatabaseDialect.java index f2faf6515b..2dd846a62e 100644 --- a/src/main/java/io/confluent/connect/jdbc/dialect/VerticaDatabaseDialect.java +++ b/src/main/java/io/confluent/connect/jdbc/dialect/VerticaDatabaseDialect.java @@ -16,6 +16,8 @@ package io.confluent.connect.jdbc.dialect; +import io.confluent.connect.jdbc.util.ColumnId; +import io.confluent.connect.jdbc.util.ExpressionBuilder; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.connect.data.Date; import org.apache.kafka.connect.data.Decimal; @@ -26,6 +28,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import io.confluent.connect.jdbc.dialect.DatabaseDialectProvider.SubprotocolBasedProvider; import io.confluent.connect.jdbc.sink.metadata.SinkRecordField; @@ -61,10 +64,17 @@ public VerticaDatabaseDialect(AbstractConfig config) { @Override protected String getSqlType(SinkRecordField field) { + return getSqlType(field, true); + } + + private String getSqlType(SinkRecordField field, boolean sized) { if (field.schemaName() != null) { switch (field.schemaName()) { case Decimal.LOGICAL_NAME: - return "DECIMAL(18," + field.schemaParameters().get(Decimal.SCALE_FIELD) + ")"; + return "DECIMAL" + ( + sized ? "(18," + field.schemaParameters().get(Decimal.SCALE_FIELD) + ")" + : "" + ); case Date.LOGICAL_NAME: return "DATE"; case Time.LOGICAL_NAME: @@ -92,9 +102,9 @@ protected String getSqlType(SinkRecordField field) { case BOOLEAN: return "BOOLEAN"; case STRING: - return "VARCHAR(1024)"; + return "VARCHAR" + (sized ? "(1024)" : ""); case BYTES: - return "VARBINARY(1024)"; + return "VARBINARY" + (sized ? "(1024)" : ""); default: return super.getSqlType(field); } @@ -111,4 +121,70 @@ public List buildAlterTable( } return queries; } + + @Override + public String buildUpsertQueryStatement( + TableId table, + Collection keyColumns, + Collection nonKeyColumns, + Map allFields + ) { + ExpressionBuilder builder = expressionBuilder(); + builder.append("MERGE INTO "); + builder.append(table); + builder.append(" AS target USING (SELECT "); + builder.appendList() + .delimitedBy(", ") + .transformedBy((b, input) -> transformTypedParam(b, (ColumnId) input, allFields)) + .of(keyColumns, nonKeyColumns); + builder.append(") AS incoming ON ("); + builder.appendList() + .delimitedBy(" AND ") + .transformedBy(this::transformAs) + .of(keyColumns); + builder.append(")"); + builder.append(" WHEN MATCHED THEN UPDATE SET "); + builder.appendList() + .delimitedBy(",") + .transformedBy(this::transformUpdate) + .of(nonKeyColumns, keyColumns); + builder.append(" WHEN NOT MATCHED THEN INSERT ("); + builder.appendList() + .delimitedBy(", ") + .transformedBy(ExpressionBuilder.columnNamesWithPrefix("")) + .of(nonKeyColumns, keyColumns); + builder.append(") VALUES ("); + builder.appendList() + .delimitedBy(",") + .transformedBy(ExpressionBuilder.columnNamesWithPrefix("incoming.")) + .of(nonKeyColumns, keyColumns); + builder.append(");"); + return builder.toString(); + } + + private void transformAs(ExpressionBuilder builder, ColumnId col) { + builder.append("target.") + .appendIdentifierQuoted(col.name()) + .append("=incoming.") + .appendIdentifierQuoted(col.name()); + } + + private void transformUpdate(ExpressionBuilder builder, ColumnId col) { + builder.appendIdentifierQuoted(col.name()) + .append("=incoming.") + .appendIdentifierQuoted(col.name()); + } + + private void transformTypedParam( + ExpressionBuilder builder, + ColumnId col, + Map allFields + ) { + SinkRecordField field = allFields.get(col.name()); + + builder.append("?::") + .append(getSqlType(field, false)) + .append(" AS ") + .appendIdentifierQuoted(col.name()); + } } diff --git a/src/main/java/io/confluent/connect/jdbc/sink/BufferedRecords.java b/src/main/java/io/confluent/connect/jdbc/sink/BufferedRecords.java index 14e5472074..30447c83e1 100644 --- a/src/main/java/io/confluent/connect/jdbc/sink/BufferedRecords.java +++ b/src/main/java/io/confluent/connect/jdbc/sink/BufferedRecords.java @@ -257,7 +257,8 @@ private String getInsertSql() { return dbDialect.buildUpsertQueryStatement( tableId, asColumns(fieldsMetadata.keyFieldNames), - asColumns(fieldsMetadata.nonKeyFieldNames) + asColumns(fieldsMetadata.nonKeyFieldNames), + fieldsMetadata.allFields ); } catch (UnsupportedOperationException e) { throw new ConnectException(String.format( diff --git a/src/test/java/io/confluent/connect/jdbc/dialect/Db2DatabaseDialectTest.java b/src/test/java/io/confluent/connect/jdbc/dialect/Db2DatabaseDialectTest.java index 94a62148de..9484083b08 100644 --- a/src/test/java/io/confluent/connect/jdbc/dialect/Db2DatabaseDialectTest.java +++ b/src/test/java/io/confluent/connect/jdbc/dialect/Db2DatabaseDialectTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.connect.data.Timestamp; import org.junit.Test; +import java.util.Collections; import java.util.List; import io.confluent.connect.jdbc.util.TableId; @@ -184,7 +185,7 @@ public void shouldBuildUpsertStatement() { + "values(DAT.\"columnA\",DAT.\"columnB\",DAT.\"columnC\",DAT.\"columnD\"," + "DAT.\"id1\"," + "DAT.\"id2\")"; - String sql = dialect.buildUpsertQueryStatement(tableId, pkColumns, columnsAtoD); + String sql = dialect.buildUpsertQueryStatement(tableId, pkColumns, columnsAtoD, Collections.emptyMap()); assertEquals(expected, sql); } @@ -202,8 +203,8 @@ public void upsert() { String sql = dialect.buildUpsertQueryStatement(actor, columns(actor, "actor_id"), columns(actor, "first_name", "last_name", "score" - ) - ); + ), + Collections.emptyMap()); assertEquals(expected, sql); } @@ -214,7 +215,7 @@ public void upsertOnlyKeyCols() { + ".\"actor_id\"=DAT.\"actor_id\" when not matched then insert(\"actor\"" + ".\"actor_id\") values(DAT.\"actor_id\")"; String sql = dialect.buildUpsertQueryStatement( - actor, columns(actor, "actor_id"), columns(actor)); + actor, columns(actor, "actor_id"), columns(actor), Collections.emptyMap()); assertEquals(expected, sql); } } \ No newline at end of file diff --git a/src/test/java/io/confluent/connect/jdbc/dialect/DerbyDatabaseDialectTest.java b/src/test/java/io/confluent/connect/jdbc/dialect/DerbyDatabaseDialectTest.java index b616c10b4c..cc70f69776 100644 --- a/src/test/java/io/confluent/connect/jdbc/dialect/DerbyDatabaseDialectTest.java +++ b/src/test/java/io/confluent/connect/jdbc/dialect/DerbyDatabaseDialectTest.java @@ -23,6 +23,7 @@ import org.junit.Ignore; import org.junit.Test; +import java.util.Collections; import java.util.List; import io.confluent.connect.jdbc.util.TableId; @@ -186,7 +187,7 @@ public void shouldBuildUpsertStatement() { + ".\"id2\") " + "values(DAT.\"columnA\",DAT.\"columnB\",DAT.\"columnC\",DAT.\"columnD\",DAT.\"id1\"," + "DAT.\"id2\")"; - String sql = dialect.buildUpsertQueryStatement(tableId, pkColumns, columnsAtoD); + String sql = dialect.buildUpsertQueryStatement(tableId, pkColumns, columnsAtoD, Collections.emptyMap()); assertEquals(expected, sql); } @@ -205,8 +206,8 @@ public void upsert() { String sql = dialect.buildUpsertQueryStatement(actor, columns(actor, "actor_id"), columns(actor, "first_name", "last_name", "score" - ) - ); + ), + Collections.emptyMap()); assertEquals(expected, sql); } @@ -217,7 +218,7 @@ public void upsertOnlyKeyCols() { + ".\"actor_id\"=DAT.\"actor_id\" when not matched then insert(\"actor\"" + ".\"actor_id\") values(DAT.\"actor_id\")"; String sql = dialect.buildUpsertQueryStatement( - actor, columns(actor, "actor_id"), columns(actor)); + actor, columns(actor, "actor_id"), columns(actor), Collections.emptyMap()); assertEquals(expected, sql); } } \ No newline at end of file diff --git a/src/test/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialectTest.java b/src/test/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialectTest.java index fc4a8d48ef..bd8ae2ac43 100644 --- a/src/test/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialectTest.java +++ b/src/test/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialectTest.java @@ -319,7 +319,7 @@ public void shouldBuildAlterTableStatement() { @Test(expected = UnsupportedOperationException.class) public void shouldBuildUpsertStatement() { - dialect.buildUpsertQueryStatement(tableId, pkColumns, columnsAtoD); + dialect.buildUpsertQueryStatement(tableId, pkColumns, columnsAtoD, Collections.emptyMap()); } diff --git a/src/test/java/io/confluent/connect/jdbc/dialect/MySqlDatabaseDialectTest.java b/src/test/java/io/confluent/connect/jdbc/dialect/MySqlDatabaseDialectTest.java index 1817d58375..788356f310 100644 --- a/src/test/java/io/confluent/connect/jdbc/dialect/MySqlDatabaseDialectTest.java +++ b/src/test/java/io/confluent/connect/jdbc/dialect/MySqlDatabaseDialectTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.connect.data.Timestamp; import org.junit.Test; +import java.util.Collections; import java.util.List; import io.confluent.connect.jdbc.util.TableId; @@ -119,7 +120,7 @@ public void shouldBuildUpsertStatement() { " values(?,?,?,?,?,?) on duplicate key update `columnA`=values(`columnA`)," + "`columnB`=values(`columnB`),`columnC`=values(`columnC`),`columnD`=values" + "(`columnD`)"; - String sql = dialect.buildUpsertQueryStatement(tableId, pkColumns, columnsAtoD); + String sql = dialect.buildUpsertQueryStatement(tableId, pkColumns, columnsAtoD, Collections.emptyMap()); assertEquals(expected, sql); } @@ -164,7 +165,7 @@ public void upsert() { "`last_name`=values(`last_name`),`score`=values(`score`)"; String sql = dialect.buildUpsertQueryStatement(actor, columns(actor, "actor_id"), columns(actor, "first_name", "last_name", - "score")); + "score"), Collections.emptyMap()); assertEquals(expected, sql); } @@ -174,7 +175,7 @@ public void upsertOnlyKeyCols() { String expected = "insert into `actor`(`actor_id`) " + "values(?) on duplicate key update `actor_id`=values(`actor_id`)"; String sql = dialect - .buildUpsertQueryStatement(actor, columns(actor, "actor_id"), columns(actor)); + .buildUpsertQueryStatement(actor, columns(actor, "actor_id"), columns(actor), Collections.emptyMap()); assertEquals(expected, sql); } diff --git a/src/test/java/io/confluent/connect/jdbc/dialect/OracleDatabaseDialectTest.java b/src/test/java/io/confluent/connect/jdbc/dialect/OracleDatabaseDialectTest.java index 14c8d9e4e1..8ae6b62184 100644 --- a/src/test/java/io/confluent/connect/jdbc/dialect/OracleDatabaseDialectTest.java +++ b/src/test/java/io/confluent/connect/jdbc/dialect/OracleDatabaseDialectTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.connect.data.Timestamp; import org.junit.Test; +import java.util.Collections; import java.util.List; import io.confluent.connect.jdbc.util.TableId; @@ -126,7 +127,7 @@ public void shouldBuildUpsertStatement() { "\"myTable\".\"id1\",\"myTable\".\"id2\") values(incoming.\"columnA\"," + "incoming.\"columnB\",incoming.\"columnC\",incoming.\"columnD\",incoming" + ".\"id1\",incoming.\"id2\")"; - String sql = dialect.buildUpsertQueryStatement(tableId, pkColumns, columnsAtoD); + String sql = dialect.buildUpsertQueryStatement(tableId, pkColumns, columnsAtoD, Collections.emptyMap()); assertEquals(expected, sql); } @@ -178,7 +179,7 @@ public void upsert() { "\"ARTICLE\".\"author\") " + "values(incoming.\"body\",incoming.\"title\",incoming.\"author\")"; String actual = dialect.buildUpsertQueryStatement(article, columns(article, "title", "author"), - columns(article, "body")); + columns(article, "body"), Collections.emptyMap()); assertEquals(expected, actual); } diff --git a/src/test/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialectTest.java b/src/test/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialectTest.java index 19af3e1cf9..c008e590b4 100644 --- a/src/test/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialectTest.java +++ b/src/test/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialectTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.connect.data.Timestamp; import org.junit.Test; +import java.util.Collections; import java.util.List; import io.confluent.connect.jdbc.util.TableId; @@ -119,7 +120,7 @@ public void shouldBuildUpsertStatement() { "\"id2\") DO UPDATE SET \"columnA\"=EXCLUDED" + ".\"columnA\",\"columnB\"=EXCLUDED.\"columnB\",\"columnC\"=EXCLUDED" + ".\"columnC\",\"columnD\"=EXCLUDED.\"columnD\""; - String sql = dialect.buildUpsertQueryStatement(tableId, pkColumns, columnsAtoD); + String sql = dialect.buildUpsertQueryStatement(tableId, pkColumns, columnsAtoD, Collections.emptyMap()); assertEquals(expected, sql); } @@ -163,7 +164,7 @@ public void upsert() { "VALUES (?,?,?,?) ON CONFLICT (\"id\") DO UPDATE SET \"name\"=EXCLUDED.\"name\"," + "\"salary\"=EXCLUDED.\"salary\",\"address\"=EXCLUDED.\"address\"", dialect .buildUpsertQueryStatement(customer, columns(customer, "id"), - columns(customer, "name", "salary", "address"))); + columns(customer, "name", "salary", "address"), Collections.emptyMap())); } } \ No newline at end of file diff --git a/src/test/java/io/confluent/connect/jdbc/dialect/SapHanaDatabaseDialectTest.java b/src/test/java/io/confluent/connect/jdbc/dialect/SapHanaDatabaseDialectTest.java index c9213b6caf..2ff3874c81 100644 --- a/src/test/java/io/confluent/connect/jdbc/dialect/SapHanaDatabaseDialectTest.java +++ b/src/test/java/io/confluent/connect/jdbc/dialect/SapHanaDatabaseDialectTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.connect.data.Timestamp; import org.junit.Test; +import java.util.Collections; import java.util.List; import io.confluent.connect.jdbc.util.TableId; @@ -120,7 +121,7 @@ public void shouldBuildAlterTableStatement() { public void shouldBuildUpsertStatement() { String expected = "UPSERT \"myTable\"(\"id1\",\"id2\",\"columnA\",\"columnB\",\"columnC\"," + "\"columnD\") VALUES(?,?,?,?,?,?) WITH PRIMARY KEY"; - String sql = dialect.buildUpsertQueryStatement(tableId, pkColumns, columnsAtoD); + String sql = dialect.buildUpsertQueryStatement(tableId, pkColumns, columnsAtoD, Collections.emptyMap()); assertEquals(expected, sql); } @@ -165,7 +166,7 @@ public void upsert() { assertEquals( "UPSERT \"tableA\"(\"col1\",\"col2\",\"col3\",\"col4\") VALUES(?,?,?,?) WITH PRIMARY KEY", dialect.buildUpsertQueryStatement(tableA, columns(tableA, "col1"), - columns(tableA, "col2", "col3", "col4"))); + columns(tableA, "col2", "col3", "col4"), Collections.emptyMap())); } } \ No newline at end of file diff --git a/src/test/java/io/confluent/connect/jdbc/dialect/SqlServerDatabaseDialectTest.java b/src/test/java/io/confluent/connect/jdbc/dialect/SqlServerDatabaseDialectTest.java index 62d94df8ec..8a54a78bf0 100644 --- a/src/test/java/io/confluent/connect/jdbc/dialect/SqlServerDatabaseDialectTest.java +++ b/src/test/java/io/confluent/connect/jdbc/dialect/SqlServerDatabaseDialectTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.connect.data.Timestamp; import org.junit.Test; +import java.util.Collections; import java.util.List; import io.confluent.connect.jdbc.util.TableId; @@ -124,7 +125,7 @@ public void shouldBuildUpsertStatement() { "[columnB], [columnC], [columnD], [id1], [id2]) values (incoming.[columnA]," + "incoming.[columnB],incoming.[columnC],incoming.[columnD],incoming.[id1]," + "incoming.[id2]);"; - String sql = dialect.buildUpsertQueryStatement(tableId, pkColumns, columnsAtoD); + String sql = dialect.buildUpsertQueryStatement(tableId, pkColumns, columnsAtoD, Collections.emptyMap()); assertEquals(expected, sql); } @@ -173,7 +174,7 @@ public void upsert1() { "([name], [salary], [address], [id]) values (incoming.[name],incoming" + ".[salary],incoming.[address],incoming.[id]);", dialect.buildUpsertQueryStatement(customer, columns(customer, "id"), - columns(customer, "name", "salary", "address"))); + columns(customer, "name", "salary", "address"), Collections.emptyMap())); } @Test @@ -188,6 +189,6 @@ public void upsert2() { "matched then insert ([ISBN], [year], [pages], [author], [title]) values (incoming" + ".[ISBN],incoming.[year]," + "incoming.[pages],incoming.[author],incoming.[title]);", dialect.buildUpsertQueryStatement(book, columns(book, "author", "title"), - columns(book, "ISBN", "year", "pages"))); + columns(book, "ISBN", "year", "pages"), Collections.emptyMap())); } } \ No newline at end of file diff --git a/src/test/java/io/confluent/connect/jdbc/dialect/SqliteDatabaseDialectTest.java b/src/test/java/io/confluent/connect/jdbc/dialect/SqliteDatabaseDialectTest.java index 610becbd18..15b2cf5ab9 100644 --- a/src/test/java/io/confluent/connect/jdbc/dialect/SqliteDatabaseDialectTest.java +++ b/src/test/java/io/confluent/connect/jdbc/dialect/SqliteDatabaseDialectTest.java @@ -27,6 +27,7 @@ import java.sql.SQLException; import java.sql.Types; +import java.util.Collections; import java.util.List; import io.confluent.connect.jdbc.sink.SqliteHelper; @@ -140,7 +141,7 @@ public void shouldBuildAlterTableStatement() { public void shouldBuildUpsertStatement() { String expected = "INSERT OR REPLACE INTO `myTable`(`id1`,`id2`,`columnA`,`columnB`," + "`columnC`,`columnD`) VALUES(?,?,?,?,?,?)"; - String sql = dialect.buildUpsertQueryStatement(tableId, pkColumns, columnsAtoD); + String sql = dialect.buildUpsertQueryStatement(tableId, pkColumns, columnsAtoD, Collections.emptyMap()); assertEquals(expected, sql); } @@ -182,7 +183,7 @@ public void upsert() { assertEquals( "INSERT OR REPLACE INTO `Book`(`author`,`title`,`ISBN`,`year`,`pages`) VALUES(?,?,?,?,?)", dialect.buildUpsertQueryStatement(book, columns(book, "author", "title"), - columns(book, "ISBN", "year", "pages"))); + columns(book, "ISBN", "year", "pages"), Collections.emptyMap())); } @Test(expected = SQLException.class) diff --git a/src/test/java/io/confluent/connect/jdbc/dialect/SybaseDatabaseDialectTest.java b/src/test/java/io/confluent/connect/jdbc/dialect/SybaseDatabaseDialectTest.java index d11376d115..f458b6b91b 100644 --- a/src/test/java/io/confluent/connect/jdbc/dialect/SybaseDatabaseDialectTest.java +++ b/src/test/java/io/confluent/connect/jdbc/dialect/SybaseDatabaseDialectTest.java @@ -25,6 +25,7 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; import java.sql.SQLException; +import java.util.Collections; import java.util.List; import java.util.concurrent.ThreadLocalRandom; @@ -154,7 +155,7 @@ public void shouldBuildUpsertStatement() { "\"columnB\", \"columnC\", \"columnD\", \"id1\", \"id2\") values (incoming.\"columnA\"," + "incoming.\"columnB\",incoming.\"columnC\",incoming.\"columnD\",incoming.\"id1\"," + "incoming.\"id2\");"; - String sql = dialect.buildUpsertQueryStatement(tableId, pkColumns, columnsAtoD); + String sql = dialect.buildUpsertQueryStatement(tableId, pkColumns, columnsAtoD, Collections.emptyMap()); assertEquals(expected, sql); } @@ -203,7 +204,7 @@ public void upsert1() { "(\"name\", \"salary\", \"address\", \"id\") values (incoming.\"name\",incoming" + ".\"salary\",incoming.\"address\",incoming.\"id\");", dialect.buildUpsertQueryStatement(customer, columns(customer, "id"), - columns(customer, "name", "salary", "address"))); + columns(customer, "name", "salary", "address"), Collections.emptyMap())); } @Test @@ -218,7 +219,7 @@ public void upsert2() { "matched then insert (\"ISBN\", \"year\", \"pages\", \"author\", \"title\") values (incoming" + ".\"ISBN\",incoming.\"year\"," + "incoming.\"pages\",incoming.\"author\",incoming.\"title\");", dialect.buildUpsertQueryStatement(book, columns(book, "author", "title"), - columns(book, "ISBN", "year", "pages"))); + columns(book, "ISBN", "year", "pages"), Collections.emptyMap())); } @Test diff --git a/src/test/java/io/confluent/connect/jdbc/dialect/VerticaDatabaseDialectTest.java b/src/test/java/io/confluent/connect/jdbc/dialect/VerticaDatabaseDialectTest.java index 00df56ee19..2842b3875c 100644 --- a/src/test/java/io/confluent/connect/jdbc/dialect/VerticaDatabaseDialectTest.java +++ b/src/test/java/io/confluent/connect/jdbc/dialect/VerticaDatabaseDialectTest.java @@ -14,6 +14,8 @@ package io.confluent.connect.jdbc.dialect; +import io.confluent.connect.jdbc.sink.metadata.SinkRecordField; +import io.confluent.connect.jdbc.util.TableId; import org.apache.kafka.connect.data.Date; import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; @@ -22,6 +24,8 @@ import org.apache.kafka.connect.data.Timestamp; import org.junit.Test; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import static org.junit.Assert.assertEquals; @@ -114,9 +118,29 @@ public void shouldBuildAlterTableStatement() { assertStatements(sql, statements); } - @Test(expected = UnsupportedOperationException.class) + @Test public void shouldBuildUpsertStatement() { - dialect.buildUpsertQueryStatement(tableId, pkColumns, columnsAtoD); + TableId book = tableId("Book"); + HashMap fields = new HashMap<>(); + fields.put("author", new SinkRecordField(Schema.STRING_SCHEMA, "author", true)); + fields.put("title", new SinkRecordField(Schema.STRING_SCHEMA, "title", true)); + fields.put("ISBN", new SinkRecordField(Schema.STRING_SCHEMA, "ISBN", false)); + fields.put("year", new SinkRecordField(Schema.INT32_SCHEMA, "year", false)); + fields.put("pages", new SinkRecordField(Schema.INT32_SCHEMA, "pages", false)); + assertEquals( + "MERGE INTO \"Book\" AS target USING (" + + "SELECT ?::VARCHAR AS \"author\", ?::VARCHAR AS \"title\", ?::VARCHAR AS \"ISBN\", ?::INT AS \"year\", ?::INT AS \"pages\"" + + ") AS incoming ON (target.\"author\"=incoming.\"author\" AND target.\"title\"=incoming.\"title\")" + + " WHEN MATCHED THEN UPDATE SET \"ISBN\"=incoming.\"ISBN\",\"year\"=incoming.\"year\",\"pages\"=incoming.\"pages\"," + + "\"author\"=incoming.\"author\",\"title\"=incoming.\"title\"" + + " WHEN NOT MATCHED THEN INSERT (\"ISBN\", \"year\", \"pages\", \"author\", \"title\") VALUES (" + + "incoming.\"ISBN\",incoming.\"year\",incoming.\"pages\",incoming.\"author\",incoming.\"title\");", + dialect.buildUpsertQueryStatement( + book, + columns(book, "author", "title"), + columns(book, "ISBN", "year", "pages"), + fields) + ); }