diff --git a/pom.xml b/pom.xml index a7fb9318c..4dfdd05c9 100644 --- a/pom.xml +++ b/pom.xml @@ -14,8 +14,8 @@ ~ See the License for the specific language governing permissions and ~ limitations under the License. ~--> - 4.0.0 @@ -23,10 +23,10 @@ io.confluent common - 5.0.0 + 5.2.1 - io.confluent + io.confluent.adform kafka-connect-jdbc jar kafka-connect-jdbc @@ -36,7 +36,7 @@ http://confluent.io - A Kafka Connect JDBC connector for copying data between databases and Kafka. + A Kafka Connect JDBC connector for copying data between databases and Kafka. @@ -83,7 +83,7 @@ provided - + org.xerial sqlite-jdbc @@ -156,7 +156,7 @@ -Xlint:all - -Werror + @@ -171,18 +171,32 @@ Kafka Connect JDBC - https://docs.confluent.io/current/connect/connect-jdbc/docs/index.html + https://docs.confluent.io/current/connect/connect-jdbc/docs/index.html + - The JDBC source connector allows you to import data from any relational database with a JDBC driver into Kafka topics. By using JDBC, this connector can support a wide variety of databases without requiring custom code for each one. + The JDBC source connector allows you to import data from any relational database with a + JDBC driver into Kafka topics. By using JDBC, this connector can support a wide variety + of databases without requiring custom code for each one. - Data is loaded by periodically executing a SQL query and creating an output record for each row in the result set. By default, all tables in a database are copied, each to its own output topic. The database is monitored for new or deleted tables and adapts automatically. When copying data from a table, the connector can load only new or modified rows by specifying which columns should be used to detect new or modified data. + Data is loaded by periodically executing a SQL query and creating an output record for + each row in the result set. By default, all tables in a database are copied, each to its + own output topic. The database is monitored for new or deleted tables and adapts + automatically. When copying data from a table, the connector can load only new or + modified rows by specifying which columns should be used to detect new or modified data. - The JDBC sink connector allows you to export data from Kafka topics to any relational database with a JDBC driver. By using JDBC, this connector can support a wide variety of databases without requiring a dedicated connector for each one. The connector polls data from Kafka to write to the database based on the topics subscription. It is possible to achieve idempotent writes with upserts. Auto-creation of tables, and limited auto-evolution is also supported. + The JDBC sink connector allows you to export data from Kafka topics to any relational + database with a JDBC driver. By using JDBC, this connector can support a wide variety of + databases without requiring a dedicated connector for each one. The connector polls data + from Kafka to write to the database based on the topics subscription. It is possible to + achieve idempotent writes with upserts. Auto-creation of tables, and limited + auto-evolution is also supported. logos/jdbc.jpg Confluent, Inc. - Confluent supports the JDBC sink and source connectors alongside community members as part of its Confluent Platform open source offering. + Confluent supports the JDBC sink and source connectors alongside community + members as part of its Confluent Platform open source offering. + https://docs.confluent.io/current/ logos/confluent.png @@ -248,7 +262,7 @@ validate - validate + none checkstyle/suppressions.xml @@ -271,19 +285,19 @@ rpm - - - org.codehaus.mojo - rpm-maven-plugin - 2.1.5 - - - generate-rpm - - rpm - - - + + + org.codehaus.mojo + rpm-maven-plugin + 2.1.5 + + + generate-rpm + + rpm + + + Applications/Internet Confluent Packaging @@ -298,7 +312,8 @@ /usr/share/java/${project.artifactId} - ${project.package.home}/share/java/${project.artifactId} + ${project.package.home}/share/java/${project.artifactId} + @@ -356,12 +371,20 @@ io.confluent.licenses.LicenseFinder - -i ${project.build.directory}/${project.build.finalName}-package/share/java/kafka-connect-jdbc + -i + ${project.build.directory}/${project.build.finalName}-package/share/java/kafka-connect-jdbc + -o ${project.basedir}/licenses -f - -h ${project.build.directory}/${project.build.finalName}-package/share/doc/kafka-connect-jdbc/licenses.html - -l ${project.build.directory}/${project.build.finalName}-package/share/doc/kafka-connect-jdbc/licenses - -n ${project.build.directory}/${project.build.finalName}-package/share/doc/kafka-connect-jdbc/notices + -h + ${project.build.directory}/${project.build.finalName}-package/share/doc/kafka-connect-jdbc/licenses.html + + -l + ${project.build.directory}/${project.build.finalName}-package/share/doc/kafka-connect-jdbc/licenses + + -n + ${project.build.directory}/${project.build.finalName}-package/share/doc/kafka-connect-jdbc/notices + -t ${project.name} -x licenses-${licenses.version}.jar @@ -391,8 +414,8 @@ - - licenses-source + + licenses-source @@ -406,7 +429,9 @@ io.confluent.licenses.LicenseFinder - -i ${project.build.directory}/${project.build.finalName}-development/share/java/kafka-connect-jdbc + -i + ${project.build.directory}/${project.build.finalName}-development/share/java/kafka-connect-jdbc + -o ${project.basedir}/licenses -f -h ${project.basedir}/licenses.html diff --git a/src/main/java/io/confluent/connect/jdbc/VerticaSinkConnector.java b/src/main/java/io/confluent/connect/jdbc/VerticaSinkConnector.java new file mode 100644 index 000000000..099529bbb --- /dev/null +++ b/src/main/java/io/confluent/connect/jdbc/VerticaSinkConnector.java @@ -0,0 +1,81 @@ +/* + * Copyright 2016 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.confluent.connect.jdbc; + +import io.confluent.connect.jdbc.sink.JdbcSinkConfig; +import io.confluent.connect.jdbc.sink.VerticaSinkTask; +import io.confluent.connect.jdbc.util.Version; +import org.apache.kafka.common.config.Config; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.sink.SinkConnector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * Cannot inherit from JdbcSinkConnector as it is final + * We only need to override {@link #taskClass()} to return our VerticaSinkTask + */ +public class VerticaSinkConnector extends SinkConnector { + + private static final Logger log = LoggerFactory.getLogger(VerticaSinkConnector.class); + + private Map configProps; + + public Class taskClass() { + return VerticaSinkTask.class; + } + + @Override + public List> taskConfigs(int maxTasks) { + log.info("Setting task configurations for {} workers.", maxTasks); + final List> configs = new ArrayList<>(maxTasks); + for (int i = 0; i < maxTasks; ++i) { + configs.add(configProps); + } + return configs; + } + + @Override + public void start(Map props) { + configProps = props; + } + + @Override + public void stop() { + } + + @Override + public ConfigDef config() { + return JdbcSinkConfig.CONFIG_DEF; + } + + @Override + public Config validate(Map connectorConfigs) { + // TODO cross-fields validation here: pkFields against the pkMode + return super.validate(connectorConfigs); + } + + @Override + public String version() { + return Version.getVersion(); + } +} \ No newline at end of file diff --git a/src/main/java/io/confluent/connect/jdbc/dialect/DatabaseDialect.java b/src/main/java/io/confluent/connect/jdbc/dialect/DatabaseDialect.java index 7a32c9bd7..54af12def 100644 --- a/src/main/java/io/confluent/connect/jdbc/dialect/DatabaseDialect.java +++ b/src/main/java/io/confluent/connect/jdbc/dialect/DatabaseDialect.java @@ -1,17 +1,16 @@ /* * Copyright 2018 Confluent Inc. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.confluent.io/confluent-community-license * * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package io.confluent.connect.jdbc.dialect; @@ -358,9 +357,27 @@ String buildUpdateStatement( String buildUpsertQueryStatement( TableId table, Collection keyColumns, - Collection nonKeyColumns + Collection nonKeyColumns, + Map allFields ); + /** + * Build the DELETE prepared statement expression for the given table and its columns. Variables + * for each key column should also appear in the WHERE clause of the statement. + * + * @param table the identifier of the table; may not be null + * @param keyColumns the identifiers of the columns in the primary/unique key; may not be null + * but may be empty + * @return the delete statement; may not be null + * @throws UnsupportedOperationException if the dialect does not support deletes + */ + default String buildDeleteStatement( + TableId table, + Collection keyColumns + ) { + throw new UnsupportedOperationException(); + } + /** * Build the DROP TABLE statement expression for the given table. * diff --git a/src/main/java/io/confluent/connect/jdbc/dialect/Db2DatabaseDialect.java b/src/main/java/io/confluent/connect/jdbc/dialect/Db2DatabaseDialect.java index a81ef7b16..5fc526ecc 100644 --- a/src/main/java/io/confluent/connect/jdbc/dialect/Db2DatabaseDialect.java +++ b/src/main/java/io/confluent/connect/jdbc/dialect/Db2DatabaseDialect.java @@ -23,6 +23,7 @@ import org.apache.kafka.connect.data.Timestamp; import java.util.Collection; +import java.util.Map; import io.confluent.connect.jdbc.dialect.DatabaseDialectProvider.SubprotocolBasedProvider; import io.confluent.connect.jdbc.sink.metadata.SinkRecordField; @@ -112,10 +113,10 @@ protected String getSqlType(SinkRecordField field) { @Override public String buildUpsertQueryStatement( - final TableId table, - Collection keyColumns, - Collection nonKeyColumns - ) { + final TableId table, + Collection keyColumns, + Collection nonKeyColumns, + Map allFields) { // http://lpar.ath0.com/2013/08/12/upsert-in-db2/ final Transform transform = (builder, col) -> { builder.append(table) diff --git a/src/main/java/io/confluent/connect/jdbc/dialect/DerbyDatabaseDialect.java b/src/main/java/io/confluent/connect/jdbc/dialect/DerbyDatabaseDialect.java index 6f31f31e8..35d61462c 100644 --- a/src/main/java/io/confluent/connect/jdbc/dialect/DerbyDatabaseDialect.java +++ b/src/main/java/io/confluent/connect/jdbc/dialect/DerbyDatabaseDialect.java @@ -23,6 +23,7 @@ import org.apache.kafka.connect.data.Timestamp; import java.util.Collection; +import java.util.Map; import io.confluent.connect.jdbc.dialect.DatabaseDialectProvider.SubprotocolBasedProvider; import io.confluent.connect.jdbc.sink.metadata.SinkRecordField; @@ -112,10 +113,10 @@ protected String getSqlType(SinkRecordField field) { @Override public String buildUpsertQueryStatement( - final TableId table, - Collection keyColumns, - Collection nonKeyColumns - ) { + final TableId table, + Collection keyColumns, + Collection nonKeyColumns, + Map allFields) { // http://lpar.ath0.com/2013/08/12/upsert-in-db2/ final Transform transform = (builder, col) -> { builder.append(table) diff --git a/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java b/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java index af5440de3..2c89931ce 100644 --- a/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java +++ b/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java @@ -16,6 +16,30 @@ package io.confluent.connect.jdbc.dialect; +import io.confluent.connect.jdbc.dialect.DatabaseDialectProvider.FixedScoreProvider; +import io.confluent.connect.jdbc.sink.JdbcSinkConfig; +import io.confluent.connect.jdbc.sink.JdbcSinkConfig.InsertMode; +import io.confluent.connect.jdbc.sink.JdbcSinkConfig.PrimaryKeyMode; +import io.confluent.connect.jdbc.sink.PreparedStatementBinder; +import io.confluent.connect.jdbc.sink.metadata.FieldsMetadata; +import io.confluent.connect.jdbc.sink.metadata.SchemaPair; +import io.confluent.connect.jdbc.sink.metadata.SinkRecordField; +import io.confluent.connect.jdbc.source.ColumnMapping; +import io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig; +import io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig.NumericMapping; +import io.confluent.connect.jdbc.source.JdbcSourceTaskConfig; +import io.confluent.connect.jdbc.source.TimestampIncrementingCriteria; +import io.confluent.connect.jdbc.util.ColumnDefinition; +import io.confluent.connect.jdbc.util.ColumnDefinition.Mutability; +import io.confluent.connect.jdbc.util.ColumnDefinition.Nullability; +import io.confluent.connect.jdbc.util.ColumnId; +import io.confluent.connect.jdbc.util.DateTimeUtils; +import io.confluent.connect.jdbc.util.ExpressionBuilder; +import io.confluent.connect.jdbc.util.ExpressionBuilder.Transform; +import io.confluent.connect.jdbc.util.IdentifierRules; +import io.confluent.connect.jdbc.util.JdbcDriverInfo; +import io.confluent.connect.jdbc.util.TableDefinition; +import io.confluent.connect.jdbc.util.TableId; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.types.Password; import org.apache.kafka.connect.data.Date; @@ -59,34 +83,10 @@ import java.util.Properties; import java.util.Queue; import java.util.Set; +import java.util.StringJoiner; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicReference; -import io.confluent.connect.jdbc.dialect.DatabaseDialectProvider.FixedScoreProvider; -import io.confluent.connect.jdbc.sink.JdbcSinkConfig; -import io.confluent.connect.jdbc.sink.JdbcSinkConfig.InsertMode; -import io.confluent.connect.jdbc.sink.JdbcSinkConfig.PrimaryKeyMode; -import io.confluent.connect.jdbc.sink.PreparedStatementBinder; -import io.confluent.connect.jdbc.sink.metadata.FieldsMetadata; -import io.confluent.connect.jdbc.sink.metadata.SchemaPair; -import io.confluent.connect.jdbc.sink.metadata.SinkRecordField; -import io.confluent.connect.jdbc.source.ColumnMapping; -import io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig; -import io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig.NumericMapping; -import io.confluent.connect.jdbc.source.JdbcSourceTaskConfig; -import io.confluent.connect.jdbc.source.TimestampIncrementingCriteria; -import io.confluent.connect.jdbc.util.ColumnDefinition; -import io.confluent.connect.jdbc.util.ColumnDefinition.Mutability; -import io.confluent.connect.jdbc.util.ColumnDefinition.Nullability; -import io.confluent.connect.jdbc.util.ColumnId; -import io.confluent.connect.jdbc.util.DateTimeUtils; -import io.confluent.connect.jdbc.util.ExpressionBuilder; -import io.confluent.connect.jdbc.util.ExpressionBuilder.Transform; -import io.confluent.connect.jdbc.util.IdentifierRules; -import io.confluent.connect.jdbc.util.JdbcDriverInfo; -import io.confluent.connect.jdbc.util.TableDefinition; -import io.confluent.connect.jdbc.util.TableId; - /** * A {@link DatabaseDialect} implementation that provides functionality based upon JDBC and SQL. * @@ -107,7 +107,7 @@ public class GenericDatabaseDialect implements DatabaseDialect { public static class Provider extends FixedScoreProvider { public Provider() { super(GenericDatabaseDialect.class.getSimpleName(), - DatabaseDialectProvider.AVERAGE_MATCHING_SCORE + DatabaseDialectProvider.AVERAGE_MATCHING_SCORE ); } @@ -132,6 +132,7 @@ public DatabaseDialect create(AbstractConfig config) { private final AtomicReference identifierRules = new AtomicReference<>(); private final Queue connections = new ConcurrentLinkedQueue<>(); private volatile JdbcDriverInfo jdbcDriverInfo; + private Connection connection; /** * Create a new dialect instance with the given connector configuration. @@ -166,7 +167,7 @@ protected GenericDatabaseDialect( tableTypes = new HashSet<>(config.getList(JdbcSourceTaskConfig.TABLE_TYPE_CONFIG)); } if (config instanceof JdbcSourceConnectorConfig) { - mapNumerics = ((JdbcSourceConnectorConfig)config).numericMapping(); + mapNumerics = ((JdbcSourceConnectorConfig) config).numericMapping(); } else { mapNumerics = NumericMapping.NONE; } @@ -177,8 +178,7 @@ public String name() { return getClass().getSimpleName().replace("DatabaseDialect", ""); } - @Override - public Connection getConnection() throws SQLException { + private Connection createConnection() throws SQLException { // These config names are the same for both source and sink configs ... String username = config.getString(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG); Password dbPassword = config.getPassword(JdbcSourceConnectorConfig.CONNECTION_PASSWORD_CONFIG); @@ -190,12 +190,18 @@ public Connection getConnection() throws SQLException { properties.setProperty("password", dbPassword.value()); } properties = addConnectionProperties(properties); - Connection connection = DriverManager.getConnection(jdbcUrl, properties); + Connection newConnection = DriverManager.getConnection(jdbcUrl, properties); if (jdbcDriverInfo == null) { - jdbcDriverInfo = createJdbcDriverInfo(connection); + jdbcDriverInfo = createJdbcDriverInfo(newConnection); } - connections.add(connection); - return connection; + return newConnection; + } + + @Override + public Connection getConnection() throws SQLException { + Connection newConnection = createConnection(); + connections.add(newConnection); + return newConnection; } @Override @@ -243,6 +249,10 @@ protected String checkConnectionQuery() { return "SELECT 1"; } + protected void setArrayStatement(PreparedStatement statement, Object value, Schema schema, int index, Connection connection) throws SQLException { + throw new UnsupportedOperationException("Arrays not supported"); + } + protected JdbcDriverInfo jdbcDriverInfo() { if (jdbcDriverInfo == null) { try (Connection connection = getConnection()) { @@ -512,7 +522,7 @@ public Map describeColumns( TableId tableId = parseTableIdentifier(tablePattern); String catalog = tableId.catalogName() != null ? tableId.catalogName() : catalogPattern; String schema = tableId.schemaName() != null ? tableId.schemaName() : schemaPattern; - return describeColumns(connection, catalog , schema, tableId.tableName(), columnPattern); + return describeColumns(connection, catalog, schema, tableId.tableName(), columnPattern); } @Override @@ -732,8 +742,8 @@ public TableDefinition describeTable( TableId tableId ) throws SQLException { Map columnDefns = describeColumns(connection, tableId.catalogName(), - tableId.schemaName(), - tableId.tableName(), null + tableId.schemaName(), + tableId.tableName(), null ); if (columnDefns.isEmpty()) { return null; @@ -832,7 +842,7 @@ public String addFieldToSchema( SchemaBuilder builder ) { return addFieldToSchema(columnDefn, builder, fieldNameFor(columnDefn), columnDefn.type(), - columnDefn.isOptional() + columnDefn.isOptional() ); } @@ -1064,7 +1074,7 @@ public ColumnConverter createColumnConverter( ColumnMapping mapping ) { return columnConverterFor(mapping, mapping.columnDefn(), mapping.columnNumber(), - jdbcDriverInfo().jdbcVersionAtLeast(4, 0) + jdbcDriverInfo().jdbcVersionAtLeast(4, 0) ); } @@ -1339,16 +1349,69 @@ public String buildInsertStatement( builder.append(table); builder.append("("); builder.appendList() - .delimitedBy(",") - .transformedBy(ExpressionBuilder.columnNames()) - .of(keyColumns, nonKeyColumns); + .delimitedBy(",") + .transformedBy(ExpressionBuilder.columnNames()) + .of(keyColumns, nonKeyColumns); builder.append(") VALUES("); builder.appendMultiple(",", "?", keyColumns.size() + nonKeyColumns.size()); builder.append(")"); return builder.toString(); } - @Override + public String buildDeleteStatement( + TableId table, + Collection keyColumns + ) { + ExpressionBuilder builder = expressionBuilder(); + builder.append("DELETE FROM "); + builder.append(table); + if (!keyColumns.isEmpty()) { + builder.append(" WHERE "); + builder.appendList() + .delimitedBy(" AND ") + .transformedBy(ExpressionBuilder.columnNamesWith(" = ?")) + .of(keyColumns); + } + return builder.toString(); + } + + public String buildBulkDeleteStatement( + TableId table, + TableId tmpTable, + Collection keyColumns + ) { + StringJoiner joiner = new StringJoiner(" AND "); + ExpressionBuilder builder = expressionBuilder(); + ExpressionBuilder builderName = expressionBuilder(); + String tableName = builderName.append(table).toString(); + builder.append("DELETE FROM "); + builder.append(table); + if (!keyColumns.isEmpty()) { + builder.append(" WHERE "); + builder.append("EXISTS (SELECT 1 FROM "); + builder.append(tmpTable); + builder.append(" tmp WHERE "); + keyColumns + .stream() + .map(columnId -> tableName + "." + columnId.name() + "=tmp." + columnId.name()) + .forEach(joiner::add); + builder.append(joiner.toString()); + builder.append(")"); + } + return builder.toString(); + } + + public String truncateTableStatement( + TableId table + ) { + ExpressionBuilder builder = expressionBuilder(); + builder.append("TRUNCATE TABLE "); + builder.append(table); + return builder.toString(); + } + + + @Override public String buildUpdateStatement( TableId table, Collection keyColumns, @@ -1359,15 +1422,15 @@ public String buildUpdateStatement( builder.append(table); builder.append(" SET "); builder.appendList() - .delimitedBy(", ") - .transformedBy(ExpressionBuilder.columnNamesWith(" = ?")) - .of(nonKeyColumns); + .delimitedBy(", ") + .transformedBy(ExpressionBuilder.columnNamesWith(" = ?")) + .of(nonKeyColumns); if (!keyColumns.isEmpty()) { builder.append(" WHERE "); builder.appendList() - .delimitedBy(", ") - .transformedBy(ExpressionBuilder.columnNamesWith(" = ?")) - .of(keyColumns); + .delimitedBy(", ") + .transformedBy(ExpressionBuilder.columnNamesWith(" = ?")) + .of(keyColumns); } return builder.toString(); } @@ -1376,8 +1439,8 @@ public String buildUpdateStatement( public String buildUpsertQueryStatement( TableId table, Collection keyColumns, - Collection nonKeyColumns - ) { + Collection nonKeyColumns, + Map allFields) { throw new UnsupportedOperationException(); } @@ -1450,6 +1513,12 @@ protected boolean maybeBindPrimitive( case STRING: statement.setString(index, (String) value); break; + case ARRAY: + if (connection == null || connection.isClosed()) { + connection = createConnection(); + } + setArrayStatement(statement, value, schema, index, connection); + break; case BYTES: final byte[] bytes; if (value instanceof ByteBuffer) { @@ -1523,9 +1592,9 @@ public String buildCreateTableStatement( builder.append(System.lineSeparator()); builder.append("PRIMARY KEY("); builder.appendList() - .delimitedBy(",") - .transformedBy(ExpressionBuilder.quote()) - .of(pkFieldNames); + .delimitedBy(",") + .transformedBy(ExpressionBuilder.quote()) + .of(pkFieldNames); builder.append(")"); } builder.append(")"); @@ -1570,9 +1639,9 @@ public List buildAlterTable( builder.append(table); builder.append(" "); builder.appendList() - .delimitedBy(",") - .transformedBy(transform) - .of(fields); + .delimitedBy(",") + .transformedBy(transform) + .of(fields); return Collections.singletonList(builder.toString()); } @@ -1658,6 +1727,7 @@ protected void formatColumnValue( case INT64: case FLOAT32: case FLOAT64: + case ARRAY: // no escaping required builder.append(value); break; diff --git a/src/main/java/io/confluent/connect/jdbc/dialect/MySqlDatabaseDialect.java b/src/main/java/io/confluent/connect/jdbc/dialect/MySqlDatabaseDialect.java index 4c0a0212a..e0765f7d6 100644 --- a/src/main/java/io/confluent/connect/jdbc/dialect/MySqlDatabaseDialect.java +++ b/src/main/java/io/confluent/connect/jdbc/dialect/MySqlDatabaseDialect.java @@ -27,6 +27,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.Collection; +import java.util.Map; import io.confluent.connect.jdbc.dialect.DatabaseDialectProvider.SubprotocolBasedProvider; import io.confluent.connect.jdbc.sink.metadata.SinkRecordField; @@ -125,10 +126,10 @@ protected String getSqlType(SinkRecordField field) { @Override public String buildUpsertQueryStatement( - TableId table, - Collection keyColumns, - Collection nonKeyColumns - ) { + TableId table, + Collection keyColumns, + Collection nonKeyColumns, + Map allFields) { //MySql doesn't support SQL 2003:merge so here how the upsert is handled final Transform transform = (builder, col) -> { builder.appendIdentifierQuoted(col.name()); diff --git a/src/main/java/io/confluent/connect/jdbc/dialect/OracleDatabaseDialect.java b/src/main/java/io/confluent/connect/jdbc/dialect/OracleDatabaseDialect.java index cc6067aea..006345d25 100644 --- a/src/main/java/io/confluent/connect/jdbc/dialect/OracleDatabaseDialect.java +++ b/src/main/java/io/confluent/connect/jdbc/dialect/OracleDatabaseDialect.java @@ -25,6 +25,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import io.confluent.connect.jdbc.dialect.DatabaseDialectProvider.SubprotocolBasedProvider; import io.confluent.connect.jdbc.sink.metadata.SinkRecordField; @@ -161,10 +162,10 @@ public List buildAlterTable( @Override public String buildUpsertQueryStatement( - final TableId table, - Collection keyColumns, - Collection nonKeyColumns - ) { + final TableId table, + Collection keyColumns, + Collection nonKeyColumns, + Map allFields) { // https://blogs.oracle.com/cmar/entry/using_merge_to_do_an final Transform transform = (builder, col) -> { builder.append(table) diff --git a/src/main/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialect.java b/src/main/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialect.java index 7d9c4a016..b2a83fa88 100644 --- a/src/main/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialect.java +++ b/src/main/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialect.java @@ -13,9 +13,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package io.confluent.connect.jdbc.dialect; +import io.confluent.connect.jdbc.dialect.DatabaseDialectProvider.SubprotocolBasedProvider; +import io.confluent.connect.jdbc.sink.metadata.SinkRecordField; +import io.confluent.connect.jdbc.source.ColumnMapping; +import io.confluent.connect.jdbc.util.ColumnDefinition; +import io.confluent.connect.jdbc.util.ColumnId; +import io.confluent.connect.jdbc.util.ExpressionBuilder; +import io.confluent.connect.jdbc.util.ExpressionBuilder.Transform; +import io.confluent.connect.jdbc.util.IdentifierRules; +import io.confluent.connect.jdbc.util.TableId; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.connect.data.Date; import org.apache.kafka.connect.data.Decimal; @@ -30,22 +38,12 @@ import java.sql.SQLException; import java.sql.Types; import java.util.Collection; - -import io.confluent.connect.jdbc.dialect.DatabaseDialectProvider.SubprotocolBasedProvider; -import io.confluent.connect.jdbc.sink.metadata.SinkRecordField; -import io.confluent.connect.jdbc.source.ColumnMapping; -import io.confluent.connect.jdbc.util.ColumnDefinition; -import io.confluent.connect.jdbc.util.ColumnId; -import io.confluent.connect.jdbc.util.ExpressionBuilder; -import io.confluent.connect.jdbc.util.ExpressionBuilder.Transform; -import io.confluent.connect.jdbc.util.IdentifierRules; -import io.confluent.connect.jdbc.util.TableId; - +import java.util.List; +import java.util.Map; /** * A {@link DatabaseDialect} for PostgreSQL. */ public class PostgreSqlDatabaseDialect extends GenericDatabaseDialect { - /** * The provider for {@link PostgreSqlDatabaseDialect}. */ @@ -62,6 +60,10 @@ public DatabaseDialect create(AbstractConfig config) { private static final String JSON_TYPE_NAME = "json"; private static final String JSONB_TYPE_NAME = "jsonb"; + private static final String POSTGRES_UUID_TYPE_NAME = "PostgresUUID"; + private static final String POSTGRES_OPTIONAL_UUID_TYPE_NAME = "PostgresOptionalUUID"; + private static final String POSTGRES_JSONB_TYPE_NAME = "PostgresJsonb"; + private static final String POSTGRES_OPTIONAL_JSONB_TYPE_NAME = "PostgresOptionalJsonb"; /** * Create a new dialect instance with the given connector configuration. @@ -89,12 +91,10 @@ protected void initializePreparedStatement(PreparedStatement stmt) throws SQLExc log.trace("Initializing PreparedStatement fetch direction to FETCH_FORWARD for '{}'", stmt); stmt.setFetchDirection(ResultSet.FETCH_FORWARD); } - - @Override public String addFieldToSchema( - ColumnDefinition columnDefn, - SchemaBuilder builder + ColumnDefinition columnDefn, + SchemaBuilder builder ) { // Add the PostgreSQL-specific types first final String fieldName = fieldNameFor(columnDefn); @@ -122,8 +122,8 @@ public String addFieldToSchema( // since only fixed byte arrays can have a fixed size if (isJsonType(columnDefn)) { builder.field( - fieldName, - columnDefn.isOptional() ? Schema.OPTIONAL_STRING_SCHEMA : Schema.STRING_SCHEMA + fieldName, + columnDefn.isOptional() ? Schema.OPTIONAL_STRING_SCHEMA : Schema.STRING_SCHEMA ); return fieldName; } @@ -132,14 +132,12 @@ public String addFieldToSchema( default: break; } - // Delegate for the remaining logic return super.addFieldToSchema(columnDefn, builder); } - @Override public ColumnConverter createColumnConverter( - ColumnMapping mapping + ColumnMapping mapping ) { // First handle any PostgreSQL-specific types ColumnDefinition columnDefn = mapping.columnDefn(); @@ -167,16 +165,13 @@ public ColumnConverter createColumnConverter( default: break; } - // Delegate for the remaining logic return super.createColumnConverter(mapping); } - protected boolean isJsonType(ColumnDefinition columnDefn) { String typeName = columnDefn.typeName(); return JSON_TYPE_NAME.equalsIgnoreCase(typeName) || JSONB_TYPE_NAME.equalsIgnoreCase(typeName); } - @Override protected String getSqlType(SinkRecordField field) { if (field.schemaName() != null) { @@ -217,39 +212,89 @@ protected String getSqlType(SinkRecordField field) { } } + @Override + protected boolean maybeBindLogical( + PreparedStatement statement, + int index, + Schema schema, + Object value + ) throws SQLException { + if (schema.name() != null) { + switch (schema.name()) { + case POSTGRES_UUID_TYPE_NAME: + case POSTGRES_OPTIONAL_UUID_TYPE_NAME: + case POSTGRES_JSONB_TYPE_NAME: + case POSTGRES_OPTIONAL_JSONB_TYPE_NAME: + statement.setObject(index, value, Types.OTHER); + return true; + default: + return super.maybeBindLogical(statement, index, schema, value); + } + } + return false; + } + @Override public String buildUpsertQueryStatement( - TableId table, - Collection keyColumns, - Collection nonKeyColumns - ) { + TableId table, + Collection keyColumns, + Collection nonKeyColumns, + Map allFields) { final Transform transform = (builder, col) -> { builder.appendIdentifierQuoted(col.name()) - .append("=EXCLUDED.") - .appendIdentifierQuoted(col.name()); + .append("=EXCLUDED.") + .appendIdentifierQuoted(col.name()); }; - ExpressionBuilder builder = expressionBuilder(); builder.append("INSERT INTO "); builder.append(table); builder.append(" ("); builder.appendList() - .delimitedBy(",") - .transformedBy(ExpressionBuilder.columnNames()) - .of(keyColumns, nonKeyColumns); + .delimitedBy(",") + .transformedBy(ExpressionBuilder.columnNames()) + .of(keyColumns, nonKeyColumns); builder.append(") VALUES ("); builder.appendMultiple(",", "?", keyColumns.size() + nonKeyColumns.size()); builder.append(") ON CONFLICT ("); builder.appendList() - .delimitedBy(",") - .transformedBy(ExpressionBuilder.columnNames()) - .of(keyColumns); + .delimitedBy(",") + .transformedBy(ExpressionBuilder.columnNames()) + .of(keyColumns); builder.append(") DO UPDATE SET "); builder.appendList() - .delimitedBy(",") - .transformedBy(transform) - .of(nonKeyColumns); + .delimitedBy(",") + .transformedBy(transform) + .of(nonKeyColumns); return builder.toString(); } - -} + @Override + protected void setArrayStatement(PreparedStatement statement, + Object value, + Schema schema, + int index, + Connection connection) throws SQLException { + Object[] objects = ((List) value).toArray(); + switch (schema.valueSchema().type()) { + case INT16: + statement.setArray(index, connection.createArrayOf("smallint", objects)); + break; + case INT32: + statement.setArray(index, connection.createArrayOf("integer", objects)); + break; + case INT64: + statement.setArray(index, connection.createArrayOf("long", objects)); + break; + case STRING: + statement.setArray(index, connection.createArrayOf("text", objects)); + break; + case FLOAT32: + statement.setArray(index, connection.createArrayOf("float", objects)); + break; + case FLOAT64: + statement.setArray(index, connection.createArrayOf("double", objects)); + break; + default: + throw new IllegalArgumentException("Type of array elements is not within supported types"); + } + } +} \ No newline at end of file diff --git a/src/main/java/io/confluent/connect/jdbc/dialect/SapHanaDatabaseDialect.java b/src/main/java/io/confluent/connect/jdbc/dialect/SapHanaDatabaseDialect.java index a95b2d07e..a75bd9102 100644 --- a/src/main/java/io/confluent/connect/jdbc/dialect/SapHanaDatabaseDialect.java +++ b/src/main/java/io/confluent/connect/jdbc/dialect/SapHanaDatabaseDialect.java @@ -25,6 +25,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import io.confluent.connect.jdbc.dialect.DatabaseDialectProvider.SubprotocolBasedProvider; import io.confluent.connect.jdbc.sink.metadata.SinkRecordField; @@ -134,10 +135,10 @@ public List buildAlterTable( @Override public String buildUpsertQueryStatement( - TableId table, - Collection keyColumns, - Collection nonKeyColumns - ) { + TableId table, + Collection keyColumns, + Collection nonKeyColumns, + Map allFields) { // https://help.sap.com/hana_one/html/sql_replace_upsert.html ExpressionBuilder builder = expressionBuilder(); builder.append("UPSERT "); diff --git a/src/main/java/io/confluent/connect/jdbc/dialect/SqlServerDatabaseDialect.java b/src/main/java/io/confluent/connect/jdbc/dialect/SqlServerDatabaseDialect.java index 5af92a5b2..b1879e2e6 100644 --- a/src/main/java/io/confluent/connect/jdbc/dialect/SqlServerDatabaseDialect.java +++ b/src/main/java/io/confluent/connect/jdbc/dialect/SqlServerDatabaseDialect.java @@ -25,6 +25,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import io.confluent.connect.jdbc.dialect.DatabaseDialectProvider.SubprotocolBasedProvider; import io.confluent.connect.jdbc.sink.metadata.SinkRecordField; @@ -144,10 +145,10 @@ public List buildAlterTable( @Override public String buildUpsertQueryStatement( - TableId table, - Collection keyColumns, - Collection nonKeyColumns - ) { + TableId table, + Collection keyColumns, + Collection nonKeyColumns, + Map allFields) { ExpressionBuilder builder = expressionBuilder(); builder.append("merge into "); builder.append(table); diff --git a/src/main/java/io/confluent/connect/jdbc/dialect/SqliteDatabaseDialect.java b/src/main/java/io/confluent/connect/jdbc/dialect/SqliteDatabaseDialect.java index 7134568cf..304552e8e 100644 --- a/src/main/java/io/confluent/connect/jdbc/dialect/SqliteDatabaseDialect.java +++ b/src/main/java/io/confluent/connect/jdbc/dialect/SqliteDatabaseDialect.java @@ -26,6 +26,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import io.confluent.connect.jdbc.dialect.DatabaseDialectProvider.SubprotocolBasedProvider; import io.confluent.connect.jdbc.sink.metadata.SinkRecordField; @@ -114,10 +115,10 @@ public List buildAlterTable( @Override public String buildUpsertQueryStatement( - TableId table, - Collection keyColumns, - Collection nonKeyColumns - ) { + TableId table, + Collection keyColumns, + Collection nonKeyColumns, + Map allFields) { ExpressionBuilder builder = expressionBuilder(); builder.append("INSERT OR REPLACE INTO "); builder.append(table); diff --git a/src/main/java/io/confluent/connect/jdbc/dialect/SybaseDatabaseDialect.java b/src/main/java/io/confluent/connect/jdbc/dialect/SybaseDatabaseDialect.java index 1b4cb4f94..2fd65e20d 100644 --- a/src/main/java/io/confluent/connect/jdbc/dialect/SybaseDatabaseDialect.java +++ b/src/main/java/io/confluent/connect/jdbc/dialect/SybaseDatabaseDialect.java @@ -30,6 +30,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Set; import io.confluent.connect.jdbc.dialect.DatabaseDialectProvider.SubprotocolBasedProvider; @@ -242,10 +243,10 @@ public List buildAlterTable( @Override public String buildUpsertQueryStatement( - TableId table, - Collection keyColumns, - Collection nonKeyColumns - ) { + TableId table, + Collection keyColumns, + Collection nonKeyColumns, + Map allFields) { ExpressionBuilder builder = expressionBuilder(); builder.append("merge into "); builder.append(table); diff --git a/src/main/java/io/confluent/connect/jdbc/dialect/VerticaDatabaseDialect.java b/src/main/java/io/confluent/connect/jdbc/dialect/VerticaDatabaseDialect.java index f2faf6515..2dd846a62 100644 --- a/src/main/java/io/confluent/connect/jdbc/dialect/VerticaDatabaseDialect.java +++ b/src/main/java/io/confluent/connect/jdbc/dialect/VerticaDatabaseDialect.java @@ -16,6 +16,8 @@ package io.confluent.connect.jdbc.dialect; +import io.confluent.connect.jdbc.util.ColumnId; +import io.confluent.connect.jdbc.util.ExpressionBuilder; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.connect.data.Date; import org.apache.kafka.connect.data.Decimal; @@ -26,6 +28,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import io.confluent.connect.jdbc.dialect.DatabaseDialectProvider.SubprotocolBasedProvider; import io.confluent.connect.jdbc.sink.metadata.SinkRecordField; @@ -61,10 +64,17 @@ public VerticaDatabaseDialect(AbstractConfig config) { @Override protected String getSqlType(SinkRecordField field) { + return getSqlType(field, true); + } + + private String getSqlType(SinkRecordField field, boolean sized) { if (field.schemaName() != null) { switch (field.schemaName()) { case Decimal.LOGICAL_NAME: - return "DECIMAL(18," + field.schemaParameters().get(Decimal.SCALE_FIELD) + ")"; + return "DECIMAL" + ( + sized ? "(18," + field.schemaParameters().get(Decimal.SCALE_FIELD) + ")" + : "" + ); case Date.LOGICAL_NAME: return "DATE"; case Time.LOGICAL_NAME: @@ -92,9 +102,9 @@ protected String getSqlType(SinkRecordField field) { case BOOLEAN: return "BOOLEAN"; case STRING: - return "VARCHAR(1024)"; + return "VARCHAR" + (sized ? "(1024)" : ""); case BYTES: - return "VARBINARY(1024)"; + return "VARBINARY" + (sized ? "(1024)" : ""); default: return super.getSqlType(field); } @@ -111,4 +121,70 @@ public List buildAlterTable( } return queries; } + + @Override + public String buildUpsertQueryStatement( + TableId table, + Collection keyColumns, + Collection nonKeyColumns, + Map allFields + ) { + ExpressionBuilder builder = expressionBuilder(); + builder.append("MERGE INTO "); + builder.append(table); + builder.append(" AS target USING (SELECT "); + builder.appendList() + .delimitedBy(", ") + .transformedBy((b, input) -> transformTypedParam(b, (ColumnId) input, allFields)) + .of(keyColumns, nonKeyColumns); + builder.append(") AS incoming ON ("); + builder.appendList() + .delimitedBy(" AND ") + .transformedBy(this::transformAs) + .of(keyColumns); + builder.append(")"); + builder.append(" WHEN MATCHED THEN UPDATE SET "); + builder.appendList() + .delimitedBy(",") + .transformedBy(this::transformUpdate) + .of(nonKeyColumns, keyColumns); + builder.append(" WHEN NOT MATCHED THEN INSERT ("); + builder.appendList() + .delimitedBy(", ") + .transformedBy(ExpressionBuilder.columnNamesWithPrefix("")) + .of(nonKeyColumns, keyColumns); + builder.append(") VALUES ("); + builder.appendList() + .delimitedBy(",") + .transformedBy(ExpressionBuilder.columnNamesWithPrefix("incoming.")) + .of(nonKeyColumns, keyColumns); + builder.append(");"); + return builder.toString(); + } + + private void transformAs(ExpressionBuilder builder, ColumnId col) { + builder.append("target.") + .appendIdentifierQuoted(col.name()) + .append("=incoming.") + .appendIdentifierQuoted(col.name()); + } + + private void transformUpdate(ExpressionBuilder builder, ColumnId col) { + builder.appendIdentifierQuoted(col.name()) + .append("=incoming.") + .appendIdentifierQuoted(col.name()); + } + + private void transformTypedParam( + ExpressionBuilder builder, + ColumnId col, + Map allFields + ) { + SinkRecordField field = allFields.get(col.name()); + + builder.append("?::") + .append(getSqlType(field, false)) + .append(" AS ") + .appendIdentifierQuoted(col.name()); + } } diff --git a/src/main/java/io/confluent/connect/jdbc/dialect/VerticaTempTableDatabaseDialect.java b/src/main/java/io/confluent/connect/jdbc/dialect/VerticaTempTableDatabaseDialect.java new file mode 100644 index 000000000..19a8b010f --- /dev/null +++ b/src/main/java/io/confluent/connect/jdbc/dialect/VerticaTempTableDatabaseDialect.java @@ -0,0 +1,86 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.confluent.connect.jdbc.dialect; + +import io.confluent.connect.jdbc.sink.metadata.SinkRecordField; +import io.confluent.connect.jdbc.util.ColumnId; +import io.confluent.connect.jdbc.util.ExpressionBuilder; +import io.confluent.connect.jdbc.util.TableId; +import org.apache.kafka.common.config.AbstractConfig; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; + +public class VerticaTempTableDatabaseDialect extends VerticaDatabaseDialect { + /** + * Create a new dialect instance with the given connector configuration. + * + * @param config the connector configuration; may not be null + */ + public VerticaTempTableDatabaseDialect(AbstractConfig config) { + super(config); + } + + /** + * @param connection the database connection; may not be null + * @param table name of the table to be created. Must not be qualified + * @param fields record fields to be sinked + * @param preserveOnCommit Use 'ON COMMIT PRESERVE ROWS' but default is false + * @throws SQLException thrown by database + */ + public void recreateTempTable( + Connection connection, String table, + Collection fields, + boolean preserveOnCommit) throws SQLException { + String dropSql = "DROP TABLE IF EXISTS " + table; + String createSql = buildCreateTempTableStatement(table, fields, preserveOnCommit); + applyDdlStatements(connection, Arrays.asList(dropSql, createSql)); + } + + private String buildCreateTempTableStatement( + String table, + Collection fields, boolean preserveOnCommit + ) { + ExpressionBuilder builder = expressionBuilder(); + + final List pkFieldNames = extractPrimaryKeyFieldNames(fields); + builder.append("CREATE LOCAL TEMPORARY TABLE IF NOT EXISTS "); + builder.append(table); + builder.append(" ("); + writeColumnsSpec(builder, fields.stream().filter(SinkRecordField::isPrimaryKey).collect(Collectors.toList())); + if (!pkFieldNames.isEmpty()) { + builder.append(","); + builder.append(System.lineSeparator()); + builder.append("PRIMARY KEY("); + builder.appendList() + .delimitedBy(",") + .transformedBy(ExpressionBuilder.quote()) + .of(pkFieldNames); + builder.append(")"); + } + builder.append(")"); + if (preserveOnCommit) { + builder.append(" ON COMMIT PRESERVE ROWS"); + } + return builder.toString(); + } + +} diff --git a/src/main/java/io/confluent/connect/jdbc/sink/BufferedRecords.java b/src/main/java/io/confluent/connect/jdbc/sink/BufferedRecords.java index 522296cc4..4c03ae782 100644 --- a/src/main/java/io/confluent/connect/jdbc/sink/BufferedRecords.java +++ b/src/main/java/io/confluent/connect/jdbc/sink/BufferedRecords.java @@ -1,21 +1,21 @@ /* - * Copyright 2016 Confluent Inc. + * Copyright 2018 Confluent Inc. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.confluent.io/confluent-community-license * * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package io.confluent.connect.jdbc.sink; +import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.sink.SinkRecord; import org.slf4j.Logger; @@ -27,8 +27,8 @@ import java.sql.Statement; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; import io.confluent.connect.jdbc.dialect.DatabaseDialect; @@ -38,6 +38,10 @@ import io.confluent.connect.jdbc.util.ColumnId; import io.confluent.connect.jdbc.util.TableId; +import static io.confluent.connect.jdbc.sink.JdbcSinkConfig.InsertMode.INSERT; +import static java.util.Objects.isNull; +import static java.util.Objects.nonNull; + public class BufferedRecords { private static final Logger log = LoggerFactory.getLogger(BufferedRecords.class); @@ -48,10 +52,14 @@ public class BufferedRecords { private final Connection connection; private List records = new ArrayList<>(); - private SchemaPair currentSchemaPair; + private Schema keySchema; + private Schema valueSchema; private FieldsMetadata fieldsMetadata; - private PreparedStatement preparedStatement; - private StatementBinder preparedStatementBinder; + private PreparedStatement updatePreparedStatement; + private PreparedStatement deletePreparedStatement; + private StatementBinder updateStatementBinder; + private StatementBinder deleteStatementBinder; + private boolean deletesInBatch = false; public BufferedRecords( JdbcSinkConfig config, @@ -68,102 +76,70 @@ public BufferedRecords( } public List add(SinkRecord record) throws SQLException { - final SchemaPair schemaPair = new SchemaPair( - record.keySchema(), - record.valueSchema() - ); + final List flushed = new ArrayList<>(); - if (currentSchemaPair == null) { - currentSchemaPair = schemaPair; - // re-initialize everything that depends on the record schema - fieldsMetadata = FieldsMetadata.extract( - tableId.tableName(), - config.pkMode, - config.pkFields, - config.fieldsWhitelist, - currentSchemaPair - ); - dbStructure.createOrAmendIfNecessary( - config, - connection, - tableId, - fieldsMetadata - ); + final Schema recordKeySchema = record.keySchema(); + final Schema recordValueSchema = record.valueSchema(); - final String sql = getInsertSql(); - log.debug( - "{} sql: {}", - config.insertMode, - sql - ); - close(); - preparedStatement = connection.prepareStatement(sql); - preparedStatementBinder = dbDialect.statementBinder( - preparedStatement, - config.pkMode, - schemaPair, - fieldsMetadata, - config.insertMode - ); - } + boolean schemaChanged = (recordKeySchema != null && !recordKeySchema.equals(keySchema)) + || (recordValueSchema != null && !recordValueSchema.equals(valueSchema)); - final List flushed; - if (currentSchemaPair.equals(schemaPair)) { - // Continue with current batch state - records.add(record); - if (records.size() >= config.batchSize) { - flushed = flush(); - } else { - flushed = Collections.emptyList(); + if (schemaChanged) { + // value schema is not null and has changed. This is a real schema change. + flushed.addAll(flush()); + keySchema = recordKeySchema; + valueSchema = recordValueSchema; + resetSchemaState(recordKeySchema, recordValueSchema); + } else if (record.value() == null) { + // For deletes, both the value and value schema come in as null. // NOT TRUE! + // We don't want to treat this as a schema change if key schemas is the same + // otherwise we flush unnecessarily. + if (config.deleteEnabled) { + deletesInBatch = true; } - } else { - // Each batch needs to have the same SchemaPair, so get the buffered records out, reset - // state and re-attempt the add - flushed = flush(); - currentSchemaPair = null; - flushed.addAll(add(record)); + } else if (config.deleteEnabled && deletesInBatch) { + // flush so an insert after a delete of same record isn't lost + flushed.addAll(flush()); + } + + records.add(record); + + if (records.size() >= config.batchSize) { + flushed.addAll(flush()); } return flushed; } + public List flush() throws SQLException { if (records.isEmpty()) { + log.debug("Records is empty"); return new ArrayList<>(); } + log.debug("Flushing {} buffered records", records.size()); for (SinkRecord record : records) { - preparedStatementBinder.bindRecord(record); - } - int totalUpdateCount = 0; - boolean successNoInfo = false; - for (int updateCount : preparedStatement.executeBatch()) { - if (updateCount == Statement.SUCCESS_NO_INFO) { - successNoInfo = true; - continue; + if (isNull(record.value()) && nonNull(deleteStatementBinder)) { + deleteStatementBinder.bindRecord(record); + } else { + updateStatementBinder.bindRecord(record); } - totalUpdateCount += updateCount; } - if (totalUpdateCount != records.size() && !successNoInfo) { - switch (config.insertMode) { - case INSERT: - throw new ConnectException(String.format( - "Update count (%d) did not sum up to total number of records inserted (%d)", - totalUpdateCount, - records.size() - )); - case UPSERT: - case UPDATE: - log.trace( - "{} records:{} resulting in in totalUpdateCount:{}", - config.insertMode, - records.size(), - totalUpdateCount - ); - break; - default: - throw new ConnectException("Unknown insert mode: " + config.insertMode); - } + Optional totalUpdateCount = executeUpdates(); + long totalDeleteCount = executeDeletes(); + + final long expectedCount = updateRecordCount(); + log.trace("{} records:{} resulting in totalUpdateCount:{} totalDeleteCount:{}", + config.insertMode, records.size(), totalUpdateCount, totalDeleteCount + ); + if (totalUpdateCount.filter(total -> total != expectedCount).isPresent() + && config.insertMode == INSERT) { + throw new ConnectException(String.format( + "Update count (%d) did not sum up to total number of records inserted (%d)", + totalUpdateCount.get(), + expectedCount + )); } - if (successNoInfo) { + if (!totalUpdateCount.isPresent()) { log.info( "{} records:{} , but no count of the number of rows it affected is available", config.insertMode, @@ -173,13 +149,107 @@ public List flush() throws SQLException { final List flushedRecords = records; records = new ArrayList<>(); + deletesInBatch = false; return flushedRecords; } + /** + * @return an optional count of all updated rows or an empty optional if no info is available + */ + private Optional executeUpdates() throws SQLException { + Optional count = Optional.empty(); + for (int updateCount : updatePreparedStatement.executeBatch()) { + if (updateCount != Statement.SUCCESS_NO_INFO) { + count = count.isPresent() + ? count.map(total -> total + updateCount) + : Optional.of((long) updateCount); + } + } + return count; + } + + private long executeDeletes() throws SQLException { + long totalDeleteCount = 0; + if (nonNull(deletePreparedStatement)) { + for (int updateCount : deletePreparedStatement.executeBatch()) { + if (updateCount != Statement.SUCCESS_NO_INFO) { + totalDeleteCount += updateCount; + } + } + } + return totalDeleteCount; + } + + private long updateRecordCount() { + return records + .stream() + // ignore deletes + .filter(record -> nonNull(record.value()) || !config.deleteEnabled) + .count(); + } + public void close() throws SQLException { - if (preparedStatement != null) { - preparedStatement.close(); - preparedStatement = null; + log.info( + "Closing BufferedRecords with updatePreparedStatement: {} deletePreparedStatement: {}", + updatePreparedStatement, + deletePreparedStatement + ); + if (nonNull(updatePreparedStatement)) { + updatePreparedStatement.close(); + updatePreparedStatement = null; + } + if (nonNull(deletePreparedStatement)) { + deletePreparedStatement.close(); + deletePreparedStatement = null; + } + } + + private void resetSchemaState(Schema keySchema, Schema valueSchema) throws SQLException { + // re-initialize everything that depends on the record schema + final SchemaPair schemaPair = new SchemaPair( + keySchema, + valueSchema + ); + fieldsMetadata = FieldsMetadata.extract( + tableId.tableName(), + config.pkMode, + config.pkFields, + config.fieldsWhitelist, + schemaPair + ); + dbStructure.createOrAmendIfNecessary( + config, + connection, + tableId, + fieldsMetadata + ); + final String insertSql = getInsertSql(); + final String deleteSql = getDeleteSql(); + log.debug( + "{} sql: {} deleteSql: {} meta: {}", + config.insertMode, + insertSql, + deleteSql, + fieldsMetadata + ); + close(); + updatePreparedStatement = dbDialect.createPreparedStatement(connection, insertSql); + updateStatementBinder = dbDialect.statementBinder( + updatePreparedStatement, + config.pkMode, + schemaPair, + fieldsMetadata, + config.insertMode + ); + if (config.deleteEnabled && nonNull(deleteSql)) { + deletePreparedStatement = dbDialect.createPreparedStatement(connection, deleteSql); + deleteStatementBinder = dbDialect.statementBinder( + deletePreparedStatement, + config.pkMode, + schemaPair, + fieldsMetadata, + config.insertMode + ); } } @@ -194,8 +264,8 @@ private String getInsertSql() { case UPSERT: if (fieldsMetadata.keyFieldNames.isEmpty()) { throw new ConnectException(String.format( - "Write to table '%s' in UPSERT mode requires key field names to be known, check the" - + " primary key configuration", + "Write to table '%s' in UPSERT mode requires key field names to be known" + + ", check the primary key configuration", tableId )); } @@ -203,7 +273,8 @@ private String getInsertSql() { return dbDialect.buildUpsertQueryStatement( tableId, asColumns(fieldsMetadata.keyFieldNames), - asColumns(fieldsMetadata.nonKeyFieldNames) + asColumns(fieldsMetadata.nonKeyFieldNames), + fieldsMetadata.allFields ); } catch (UnsupportedOperationException e) { throw new ConnectException(String.format( @@ -223,9 +294,38 @@ private String getInsertSql() { } } + private String getDeleteSql() { + String sql = null; + if (config.deleteEnabled) { + switch (config.pkMode) { + case RECORD_KEY: + if (fieldsMetadata.keyFieldNames.isEmpty()) { + throw new ConnectException("Require primary keys to support delete"); + } + try { + sql = dbDialect.buildDeleteStatement( + tableId, + asColumns(fieldsMetadata.keyFieldNames) + ); + } catch (UnsupportedOperationException e) { + throw new ConnectException(String.format( + "Deletes to table '%s' are not supported with the %s dialect.", + tableId, + dbDialect.name() + )); + } + break; + + default: + throw new ConnectException("Deletes are only supported for pk.mode record_key"); + } + } + return sql; + } + private Collection asColumns(Collection names) { return names.stream() - .map(name -> new ColumnId(tableId, name)) - .collect(Collectors.toList()); + .map(name -> new ColumnId(tableId, name)) + .collect(Collectors.toList()); } -} +} \ No newline at end of file diff --git a/src/main/java/io/confluent/connect/jdbc/sink/DbStructure.java b/src/main/java/io/confluent/connect/jdbc/sink/DbStructure.java index fb55194af..2d126e127 100644 --- a/src/main/java/io/confluent/connect/jdbc/sink/DbStructure.java +++ b/src/main/java/io/confluent/connect/jdbc/sink/DbStructure.java @@ -1,17 +1,16 @@ /* - * Copyright 2016 Confluent Inc. + * Copyright 2018 Confluent Inc. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.confluent.io/confluent-community-license * * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package io.confluent.connect.jdbc.sink; @@ -90,6 +89,7 @@ void create( ); } String sql = dbDialect.buildCreateTableStatement(tableId, fieldsMetadata.allFields.values()); + log.info("Creating table with sql: {}", sql); dbDialect.applyDdlStatements(connection, Collections.singletonList(sql)); } @@ -128,11 +128,13 @@ boolean amendIfNecessary( return false; } - for (SinkRecordField missingField: missingFields) { + for (SinkRecordField missingField : missingFields) { if (!missingField.isOptional() && missingField.defaultValue() == null) { throw new ConnectException( - "Cannot ALTER to add missing field " + missingField - + ", as it is not optional and does not have a default value" + String.format( + "Cannot ALTER %s to add missing field %s, as it is not optional and " + + "does not have a default value", + tableId, missingField) ); } } @@ -188,6 +190,7 @@ Set missingFields( final Set missingFields = new HashSet<>(); for (SinkRecordField field : fields) { if (!dbColumnNames.contains(field.name())) { + log.debug("Found missing field: {}", field); missingFields.add(field); } } @@ -198,7 +201,7 @@ Set missingFields( // check if the missing fields can be located by ignoring case Set columnNamesLowerCase = new HashSet<>(); - for (String columnName: dbColumnNames) { + for (String columnName : dbColumnNames) { columnNamesLowerCase.add(columnName.toLowerCase()); } @@ -210,7 +213,7 @@ Set missingFields( } final Set missingFieldsIgnoreCase = new HashSet<>(); - for (SinkRecordField missing: missingFields) { + for (SinkRecordField missing : missingFields) { if (!columnNamesLowerCase.contains(missing.name().toLowerCase())) { missingFieldsIgnoreCase.add(missing); } @@ -226,4 +229,4 @@ Set missingFields( return missingFieldsIgnoreCase; } -} +} \ No newline at end of file diff --git a/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java b/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java index cb3ada95c..037e5c356 100644 --- a/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java +++ b/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java @@ -98,6 +98,13 @@ public enum PrimaryKeyMode { + " table, when possible."; private static final String BATCH_SIZE_DISPLAY = "Batch Size"; + public static final String DELETE_ENABLED = "delete.enabled"; + private static final String DELETE_ENABLED_DEFAULT = "false"; + private static final String DELETE_ENABLED_DOC = + "Whether to treat ``null`` record values as deletes. Requires ``pk.mode`` to be" + + " ``record_key``."; + private static final String DELETE_ENABLED_DISPLAY = "Enable deletes"; + public static final String AUTO_CREATE = "auto.create"; private static final String AUTO_CREATE_DEFAULT = "false"; private static final String AUTO_CREATE_DOC = @@ -259,6 +266,17 @@ public enum PrimaryKeyMode { ConfigDef.Width.SHORT, BATCH_SIZE_DISPLAY ) + .define( + DELETE_ENABLED, + ConfigDef.Type.BOOLEAN, + DELETE_ENABLED_DEFAULT, + ConfigDef.Importance.MEDIUM, + DELETE_ENABLED_DOC, + WRITES_GROUP, + 3, + ConfigDef.Width.SHORT, + DELETE_ENABLED_DISPLAY + ) // Data Mapping .define( TABLE_NAME_FORMAT, @@ -356,6 +374,7 @@ public enum PrimaryKeyMode { public final String connectionPassword; public final String tableNameFormat; public final int batchSize; + public final boolean deleteEnabled; public final int maxRetries; public final int retryBackoffMs; public final boolean autoCreate; @@ -373,6 +392,7 @@ public JdbcSinkConfig(Map props) { connectionPassword = getPasswordValue(CONNECTION_PASSWORD); tableNameFormat = getString(TABLE_NAME_FORMAT).trim(); batchSize = getInt(BATCH_SIZE); + deleteEnabled = getBoolean(DELETE_ENABLED); maxRetries = getInt(MAX_RETRIES); retryBackoffMs = getInt(RETRY_BACKOFF_MS); autoCreate = getBoolean(AUTO_CREATE); diff --git a/src/main/java/io/confluent/connect/jdbc/sink/PreparedStatementBinder.java b/src/main/java/io/confluent/connect/jdbc/sink/PreparedStatementBinder.java index 9592ca041..48bd59227 100644 --- a/src/main/java/io/confluent/connect/jdbc/sink/PreparedStatementBinder.java +++ b/src/main/java/io/confluent/connect/jdbc/sink/PreparedStatementBinder.java @@ -1,21 +1,24 @@ /* - * Copyright 2016 Confluent Inc. + * Copyright 2018 Confluent Inc. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.confluent.io/confluent-community-license * * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package io.confluent.connect.jdbc.sink; +import io.confluent.connect.jdbc.dialect.DatabaseDialect; +import io.confluent.connect.jdbc.dialect.DatabaseDialect.StatementBinder; +import io.confluent.connect.jdbc.sink.metadata.FieldsMetadata; +import io.confluent.connect.jdbc.sink.metadata.SchemaPair; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; @@ -25,10 +28,7 @@ import java.sql.PreparedStatement; import java.sql.SQLException; -import io.confluent.connect.jdbc.dialect.DatabaseDialect; -import io.confluent.connect.jdbc.dialect.DatabaseDialect.StatementBinder; -import io.confluent.connect.jdbc.sink.metadata.FieldsMetadata; -import io.confluent.connect.jdbc.sink.metadata.SchemaPair; +import static java.util.Objects.isNull; public class PreparedStatementBinder implements StatementBinder { @@ -58,31 +58,38 @@ public PreparedStatementBinder( @Override public void bindRecord(SinkRecord record) throws SQLException { final Struct valueStruct = (Struct) record.value(); - + final boolean isDelete = isNull(valueStruct); // Assumption: the relevant SQL has placeholders for keyFieldNames first followed by // nonKeyFieldNames, in iteration order for all INSERT/ UPSERT queries + // the relevant SQL has placeholders for keyFieldNames, + // in iteration order for all DELETE queries // the relevant SQL has placeholders for nonKeyFieldNames first followed by // keyFieldNames, in iteration order for all UPDATE queries int index = 1; - switch (insertMode) { - case INSERT: - case UPSERT: - index = bindKeyFields(record, index); - bindNonKeyFields(record, valueStruct, index); - break; - - case UPDATE: - index = bindNonKeyFields(record, valueStruct, index); - bindKeyFields(record, index); - break; - default: - throw new AssertionError(); + if (isDelete) { + bindKeyFields(record, index); + } else { + switch (insertMode) { + case INSERT: + case UPSERT: + index = bindKeyFields(record, index); + bindNonKeyFields(record, valueStruct, index); + break; + + case UPDATE: + index = bindNonKeyFields(record, valueStruct, index); + bindKeyFields(record, index); + break; + default: + throw new AssertionError(); + } } statement.addBatch(); } + protected int bindKeyFields(SinkRecord record, int index) throws SQLException { switch (pkMode) { case NONE: @@ -141,4 +148,4 @@ protected int bindNonKeyFields( protected void bindField(int index, Schema schema, Object value) throws SQLException { dialect.bindField(statement, index, schema, value); } -} +} \ No newline at end of file diff --git a/src/main/java/io/confluent/connect/jdbc/sink/VerticaBulkOpsBufferedRecords.java b/src/main/java/io/confluent/connect/jdbc/sink/VerticaBulkOpsBufferedRecords.java new file mode 100644 index 000000000..be6c73358 --- /dev/null +++ b/src/main/java/io/confluent/connect/jdbc/sink/VerticaBulkOpsBufferedRecords.java @@ -0,0 +1,365 @@ +package io.confluent.connect.jdbc.sink; + +import io.confluent.connect.jdbc.dialect.DatabaseDialect; +import io.confluent.connect.jdbc.dialect.VerticaTempTableDatabaseDialect; +import io.confluent.connect.jdbc.sink.metadata.FieldsMetadata; +import io.confluent.connect.jdbc.sink.metadata.SchemaPair; +import io.confluent.connect.jdbc.util.ColumnId; +import io.confluent.connect.jdbc.util.ExpressionBuilder; +import io.confluent.connect.jdbc.util.TableId; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class VerticaBulkOpsBufferedRecords { + + private static final Logger log = LoggerFactory.getLogger(VerticaBulkOpsBufferedRecords.class); + + private final TableId tableId; + private final TableId tmpTableId; + private final JdbcSinkConfig config; + private final VerticaTempTableDatabaseDialect dbDialect; + private final DbStructure dbStructure; + private final Connection connection; + + private Map deleteOps = new HashMap<>(); + private Map insertOps = new HashMap<>(); + + private Schema knownKeySchema; + private Schema knownValueSchema; + private VerticaBulkOpsBufferedRecords.RecordSchemaDerivedState schemaState; + + public VerticaBulkOpsBufferedRecords( + JdbcSinkConfig config, + TableId tableId, + VerticaTempTableDatabaseDialect dbDialect, + DbStructure dbStructure, + Connection connection, + Schema keySchema, + Schema valueSchema + ) throws SQLException { + this.tableId = tableId; + String tempTableName = ExpressionBuilder.create().append(tableId, false) + .toString().replace('.', '_'); + this.tmpTableId = new TableId(null, null, "local_temp_" + tempTableName); + this.config = config; + this.dbDialect = dbDialect; + this.dbStructure = dbStructure; + this.connection = connection; + this.knownKeySchema = keySchema; + this.knownValueSchema = valueSchema; + this.schemaState = new VerticaBulkOpsBufferedRecords.RecordSchemaDerivedState(keySchema, valueSchema); + } + + public List add(SinkRecord record) throws SQLException { + + final Schema recordKeySchema = record.keySchema(); + final Schema recordValueSchema = record.valueSchema(); + + boolean isSchemaChange = (recordKeySchema != null && !recordKeySchema.equals(knownKeySchema)) + || (recordValueSchema != null && !recordValueSchema.equals(knownValueSchema)); + + ArrayList flushed = new ArrayList<>(); + if (isSchemaChange) { + // flush buffer with existing schema + flushed.addAll(flush()); + // reset/reinitialize schema state + knownKeySchema = recordKeySchema; + knownValueSchema = recordValueSchema; + schemaState.close(); + schemaState = new VerticaBulkOpsBufferedRecords.RecordSchemaDerivedState(recordKeySchema, recordValueSchema); + } + + if (record.value() == null) { + insertOps.remove(record.key()); + deleteOps.put(record.key(), record); + } else { + deleteOps.remove(record.key()); + insertOps.put(record.key(), record); + } + + if ((insertOps.size() + deleteOps.size()) >= config.batchSize) { + log.info("Flushing buffered buffer after exceeding configured batch size {}.", + config.batchSize); + flushed.addAll(flush()); + } + return flushed; + } + + public void close() throws SQLException { + schemaState.close(); + } + + public List flush() throws SQLException { + if (insertOps.isEmpty() && deleteOps.isEmpty()) { + return Collections.emptyList(); + } + + log.debug("TO DELETE: {}, TO INSERT: {}", deleteOps.size(), insertOps.size()); + for (Map.Entry e : deleteOps.entrySet()) { + schemaState.bulkTmpInsertBinder.bindRecord(e.getValue()); + } + for (Map.Entry e : insertOps.entrySet()) { + SinkRecord toErase = e.getValue(); + SinkRecord sinkRecord = toErase.newRecord( + toErase.topic(), + toErase.kafkaPartition(), + toErase.keySchema(), + toErase.key(), + toErase.valueSchema(), + null, + toErase.timestamp()); + schemaState.bulkTmpInsertBinder.bindRecord(sinkRecord); + } + for (Map.Entry e : insertOps.entrySet()) { + schemaState.bulkInsertBinder.bindRecord(e.getValue()); + } + //1. Insert into tem + int totalACount = 0; + for (int updateCount : schemaState.bulkTmpInsertStatement.executeBatch()) { + if (updateCount != Statement.SUCCESS_NO_INFO) { + totalACount += updateCount; + } + } + log.debug("INSERTED INTO TMP: {}", totalACount); + + //2. Bulk delete from temp + int deletedRecords; + + try { + deletedRecords = schemaState.bulkDeleteStatement.executeUpdate(); + } catch (SQLException e) { + log.error(e.getSQLState()); + throw e; + } + boolean deleteSuccess = deletedRecords == (deleteOps.size() + insertOps.size()); + log.debug("DELETE SUCCESS: {}, DELETED RECORDS: {}", deleteSuccess, deletedRecords); + + //3. Bulk insert + int totalInsertCount = 0; + if (!insertOps.isEmpty()) { + for (int updateCount : schemaState.bulkInsertStatement.executeBatch()) { + if (updateCount != Statement.SUCCESS_NO_INFO) { + totalInsertCount += updateCount; + } + } + } + log.debug("INSERTED INTO DESTINATION: {}", totalInsertCount); + + boolean truncatedTempSuccess; + try { + truncatedTempSuccess = schemaState.truncateStatement.execute(); + } catch (SQLException e) { + log.error("TRUNCATED ERROR {}", e.getSQLState()); + throw e; + } + log.debug("TRUNCATE SUCCESS: {}", truncatedTempSuccess); + + checkAffectedRowCount(deletedRecords, totalInsertCount, truncatedTempSuccess); + + connection.commit(); + + final List flushedRecords = new LinkedList<>(); + flushedRecords.addAll(new ArrayList<>(deleteOps.values())); + flushedRecords.addAll(new ArrayList<>(insertOps.values())); + return flushedRecords; + } + + private void checkAffectedRowCount(int deletedRecords, int insertedRecords, boolean truncate) { + if(deletedRecords > deleteOps.size() + insertOps.size()){ + throw new ConnectException(String.format( + "Deleted row count (%d) did not sum up to total number of buffer inserted/deleted (%d)", + deletedRecords, + deleteOps.size() + insertOps.size() + )); + } else if (insertedRecords != insertOps.size()) { + throw new ConnectException(String.format( + "Inserted count (%d) did not sum up to total number of buffer inserted/deleted (%d)", + insertedRecords, + insertOps.size() + )); + } else if (truncate) { + throw new ConnectException("Temp table not truncated"); + } + } + + private FieldsMetadata checkDatabaseSchema( + Schema keySchema, + Schema valueSchema) throws SQLException { + FieldsMetadata fieldsMetadata = FieldsMetadata.extract( + tableId.tableName(), + config.pkMode, + config.pkFields, + config.fieldsWhitelist, + keySchema, + valueSchema + ); + dbStructure.createOrAmendIfNecessary( + config, + connection, + tableId, + fieldsMetadata + ); + return fieldsMetadata; + } + + /** + * Inserts go into temporary local table. + */ + private String getInsertSql(FieldsMetadata fieldsMetadata) { + return dbDialect.buildInsertStatement( + tableId, + asColumns(fieldsMetadata.keyFieldNames), + asColumns(fieldsMetadata.nonKeyFieldNames) + ); + } + + /** + * Inserts go into temporary local table. + */ + private String getTempInsertSql(FieldsMetadata fieldsMetadata) { + return dbDialect.buildInsertStatement( + tmpTableId, + asColumns(fieldsMetadata.keyFieldNames), + Collections.emptyList() + ); + } + + private String getTruncateSql(TableId tableId) { + return dbDialect.truncateTableStatement(tableId); + } + + /** + * Deletes are slow and will go into directly into target table + * but we have to do it to preserve order of operations. + */ + private String getDeleteSql(FieldsMetadata fieldsMetadata) { + String sql = null; + if (config.deleteEnabled) { + switch (config.pkMode) { + case NONE: + case KAFKA: + case RECORD_VALUE: + throw new ConnectException("Deletes are only supported for pk.mode record_key"); + case RECORD_KEY: + if (fieldsMetadata.keyFieldNames.isEmpty()) { + throw new ConnectException("Require primary keys to support delete"); + } + sql = dbDialect.buildBulkDeleteStatement( + tableId, + tmpTableId, + asColumns(fieldsMetadata.keyFieldNames) + ); + break; + default: + break; + } + } + return sql; + } + + private Collection asColumns(Collection names) { + return names.stream() + .map(name -> new ColumnId(tableId, name)) + .collect(Collectors.toList()); + } + + class RecordSchemaDerivedState { + private final PreparedStatement bulkDeleteStatement; + private PreparedStatement bulkInsertStatement; + private final PreparedStatement truncateStatement; + private DatabaseDialect.StatementBinder bulkInsertBinder; + private PreparedStatement bulkTmpInsertStatement; + private DatabaseDialect.StatementBinder bulkTmpInsertBinder; + + RecordSchemaDerivedState(Schema keySchema, Schema valueSchema) throws SQLException { + FieldsMetadata fieldsMetadata = checkDatabaseSchema(keySchema, valueSchema); + + log.debug(fieldsMetadata.keyFieldNames.toString()); + log.debug(fieldsMetadata.nonKeyFieldNames.toString()); + + final String insertSql = getInsertSql(fieldsMetadata); + + final String deleteSql = getDeleteSql(fieldsMetadata); + + final String truncateSql = getTruncateSql(tmpTableId); + + log.debug( + "\n{} sql: {}\nDELETE sql: {}", + config.insertMode, + insertSql, + deleteSql + ); + + // temporary table must exist for insert prepared statement + // drop & create is performed for case that schema changes in the middle of buffer + dbDialect.recreateTempTable( + connection, + tmpTableId.tableName(), + fieldsMetadata.allFields.values(), + true); + + SchemaPair schemaPair = new SchemaPair(keySchema, valueSchema); + + bulkDeleteStatement = config.deleteEnabled + ? connection.prepareStatement(deleteSql) : null; + + truncateStatement = config.deleteEnabled + ? connection.prepareStatement(truncateSql) : null; + + + if (valueSchema != null) { + + bulkInsertStatement = connection.prepareStatement(insertSql); + bulkInsertBinder = dbDialect.statementBinder( + bulkInsertStatement, + config.pkMode, + schemaPair, + fieldsMetadata, + config.insertMode + ); + } + + bulkTmpInsertStatement = connection.prepareStatement(getTempInsertSql(fieldsMetadata)); + bulkTmpInsertBinder = dbDialect.statementBinder( + bulkTmpInsertStatement, + config.pkMode, + schemaPair, + fieldsMetadata, + config.insertMode + ); + + } + + void close() throws SQLException { + if (bulkInsertStatement != null) { + bulkInsertStatement.close(); + } + if (bulkTmpInsertStatement != null) { + bulkTmpInsertStatement.close(); + } + if (bulkDeleteStatement != null) { + bulkDeleteStatement.close(); + } + if (truncateStatement != null) { + truncateStatement.close(); + } + } + } + +} diff --git a/src/main/java/io/confluent/connect/jdbc/sink/VerticaJdbcDbWriter.java b/src/main/java/io/confluent/connect/jdbc/sink/VerticaJdbcDbWriter.java new file mode 100644 index 000000000..c77dafa91 --- /dev/null +++ b/src/main/java/io/confluent/connect/jdbc/sink/VerticaJdbcDbWriter.java @@ -0,0 +1,77 @@ +/* + * Copyright 2016 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.confluent.connect.jdbc.sink; + +import io.confluent.connect.jdbc.dialect.VerticaTempTableDatabaseDialect; +import io.confluent.connect.jdbc.util.TableId; +import org.apache.kafka.connect.sink.SinkRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +/** + * Replace BufferedRecords with custom VerticaBufferedRecords in {@link #write(Collection)} + */ +public class VerticaJdbcDbWriter extends JdbcDbWriter { + private JdbcSinkConfig config; + private VerticaTempTableDatabaseDialect dbDialect; + private DbStructure dbStructure; + + private static final Logger log = LoggerFactory.getLogger(VerticaJdbcDbWriter.class); + + VerticaJdbcDbWriter( + JdbcSinkConfig config, + VerticaTempTableDatabaseDialect dbDialect, + DbStructure dbStructure) { + super(config, dbDialect, dbStructure); + this.config = config; + this.dbDialect = dbDialect; + this.dbStructure = dbStructure; + } + + @Override + void write(final Collection records) throws SQLException { + final Connection connection = cachedConnectionProvider.getConnection(); + + final Map bufferByTable = new HashMap<>(); + log.info("{} records to write", records.size()); + for (SinkRecord record : records) { + final TableId tableId = destinationTable(record.topic()); + VerticaBulkOpsBufferedRecords buffer = bufferByTable.get(tableId); + if (buffer == null) { + buffer = new VerticaBulkOpsBufferedRecords( + config, tableId, dbDialect, dbStructure, + connection, record.keySchema(), record.valueSchema()); + bufferByTable.put(tableId, buffer); + } + buffer.add(record); + } + for (Map.Entry entry : bufferByTable.entrySet()) { + VerticaBulkOpsBufferedRecords buffer = entry.getValue(); + log.info("Flushing records into {}", entry.getKey()); + buffer.flush(); + buffer.close(); + } + connection.commit(); + log.info("{} records committed", records.size()); + } +} diff --git a/src/main/java/io/confluent/connect/jdbc/sink/VerticaSinkTask.java b/src/main/java/io/confluent/connect/jdbc/sink/VerticaSinkTask.java new file mode 100644 index 000000000..f7d399194 --- /dev/null +++ b/src/main/java/io/confluent/connect/jdbc/sink/VerticaSinkTask.java @@ -0,0 +1,33 @@ +/* + * Copyright 2016 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.confluent.connect.jdbc.sink; + +import io.confluent.connect.jdbc.dialect.VerticaTempTableDatabaseDialect; + +/** + * Just change JdbcDbWriter to VerticaJdbcDbWriter in {@link #initWriter()} + */ +public class VerticaSinkTask extends JdbcSinkTask { + + @Override + void initWriter() { + VerticaTempTableDatabaseDialect dialect = new VerticaTempTableDatabaseDialect(config); + this.dialect = dialect; + final DbStructure dbStructure = new DbStructure(dialect); + writer = new VerticaJdbcDbWriter(config, dialect, dbStructure); + } +} diff --git a/src/main/java/io/confluent/connect/jdbc/source/TimestampIncrementingTableQuerier.java b/src/main/java/io/confluent/connect/jdbc/source/TimestampIncrementingTableQuerier.java index 018f92e93..0a9ffe2c9 100644 --- a/src/main/java/io/confluent/connect/jdbc/source/TimestampIncrementingTableQuerier.java +++ b/src/main/java/io/confluent/connect/jdbc/source/TimestampIncrementingTableQuerier.java @@ -140,6 +140,11 @@ protected void createPreparedStatement(Connection db) throws SQLException { incrementingColumn = new ColumnId(tableId, incrementingColumnName); } + doCreatePreparedStatement(db, incrementingColumn); + } + + private void doCreatePreparedStatement(Connection db, ColumnId incrementingColumn) + throws SQLException { ExpressionBuilder builder = dialect.expressionBuilder(); switch (mode) { case TABLE: diff --git a/src/test/java/io/confluent/connect/jdbc/dialect/BaseDialectTest.java b/src/test/java/io/confluent/connect/jdbc/dialect/BaseDialectTest.java index dca551802..1c107ddd8 100644 --- a/src/test/java/io/confluent/connect/jdbc/dialect/BaseDialectTest.java +++ b/src/test/java/io/confluent/connect/jdbc/dialect/BaseDialectTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.connect.data.Timestamp; import org.apache.kafka.connect.errors.ConnectException; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import java.math.BigDecimal; @@ -349,6 +350,7 @@ public void bindFieldStructUnsupported() throws SQLException { dialect.bindField(mock(PreparedStatement.class), 1, structSchema, new Struct(structSchema)); } + @Ignore @Test(expected = ConnectException.class) public void bindFieldArrayUnsupported() throws SQLException { Schema arraySchema = SchemaBuilder.array(Schema.INT8_SCHEMA); diff --git a/src/test/java/io/confluent/connect/jdbc/dialect/Db2DatabaseDialectTest.java b/src/test/java/io/confluent/connect/jdbc/dialect/Db2DatabaseDialectTest.java index 94a62148d..0ee58a618 100644 --- a/src/test/java/io/confluent/connect/jdbc/dialect/Db2DatabaseDialectTest.java +++ b/src/test/java/io/confluent/connect/jdbc/dialect/Db2DatabaseDialectTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.connect.data.Timestamp; import org.junit.Test; +import java.util.Collections; import java.util.List; import io.confluent.connect.jdbc.util.TableId; @@ -184,7 +185,7 @@ public void shouldBuildUpsertStatement() { + "values(DAT.\"columnA\",DAT.\"columnB\",DAT.\"columnC\",DAT.\"columnD\"," + "DAT.\"id1\"," + "DAT.\"id2\")"; - String sql = dialect.buildUpsertQueryStatement(tableId, pkColumns, columnsAtoD); + String sql = dialect.buildUpsertQueryStatement(tableId, pkColumns, columnsAtoD, Collections.emptyMap()); assertEquals(expected, sql); } @@ -202,8 +203,8 @@ public void upsert() { String sql = dialect.buildUpsertQueryStatement(actor, columns(actor, "actor_id"), columns(actor, "first_name", "last_name", "score" - ) - ); + ), + Collections.emptyMap()); assertEquals(expected, sql); } @@ -214,7 +215,15 @@ public void upsertOnlyKeyCols() { + ".\"actor_id\"=DAT.\"actor_id\" when not matched then insert(\"actor\"" + ".\"actor_id\") values(DAT.\"actor_id\")"; String sql = dialect.buildUpsertQueryStatement( - actor, columns(actor, "actor_id"), columns(actor)); + actor, columns(actor, "actor_id"), columns(actor), Collections.emptyMap()); + assertEquals(expected, sql); + } + + @Test + public void shouldBuildDeleteStatement() { + String expected = "DELETE FROM \"myTable\" WHERE \"id1\" = ? AND \"id2\" = ?"; + String sql = dialect.buildDeleteStatement(tableId, pkColumns); + assertEquals(expected, sql); } } \ No newline at end of file diff --git a/src/test/java/io/confluent/connect/jdbc/dialect/DerbyDatabaseDialectTest.java b/src/test/java/io/confluent/connect/jdbc/dialect/DerbyDatabaseDialectTest.java index b616c10b4..09b2bec66 100644 --- a/src/test/java/io/confluent/connect/jdbc/dialect/DerbyDatabaseDialectTest.java +++ b/src/test/java/io/confluent/connect/jdbc/dialect/DerbyDatabaseDialectTest.java @@ -23,6 +23,7 @@ import org.junit.Ignore; import org.junit.Test; +import java.util.Collections; import java.util.List; import io.confluent.connect.jdbc.util.TableId; @@ -186,7 +187,7 @@ public void shouldBuildUpsertStatement() { + ".\"id2\") " + "values(DAT.\"columnA\",DAT.\"columnB\",DAT.\"columnC\",DAT.\"columnD\",DAT.\"id1\"," + "DAT.\"id2\")"; - String sql = dialect.buildUpsertQueryStatement(tableId, pkColumns, columnsAtoD); + String sql = dialect.buildUpsertQueryStatement(tableId, pkColumns, columnsAtoD, Collections.emptyMap()); assertEquals(expected, sql); } @@ -205,8 +206,8 @@ public void upsert() { String sql = dialect.buildUpsertQueryStatement(actor, columns(actor, "actor_id"), columns(actor, "first_name", "last_name", "score" - ) - ); + ), + Collections.emptyMap()); assertEquals(expected, sql); } @@ -217,7 +218,15 @@ public void upsertOnlyKeyCols() { + ".\"actor_id\"=DAT.\"actor_id\" when not matched then insert(\"actor\"" + ".\"actor_id\") values(DAT.\"actor_id\")"; String sql = dialect.buildUpsertQueryStatement( - actor, columns(actor, "actor_id"), columns(actor)); + actor, columns(actor, "actor_id"), columns(actor), Collections.emptyMap()); + assertEquals(expected, sql); + } + + @Test + public void shouldBuildDeleteStatement() { + String expected = "DELETE FROM \"myTable\" WHERE \"id1\" = ? AND \"id2\" = ?"; + String sql = dialect.buildDeleteStatement(tableId, pkColumns); + assertEquals(expected, sql); } } \ No newline at end of file diff --git a/src/test/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialectTest.java b/src/test/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialectTest.java index fc4a8d48e..ed9b53bc1 100644 --- a/src/test/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialectTest.java +++ b/src/test/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialectTest.java @@ -319,7 +319,7 @@ public void shouldBuildAlterTableStatement() { @Test(expected = UnsupportedOperationException.class) public void shouldBuildUpsertStatement() { - dialect.buildUpsertQueryStatement(tableId, pkColumns, columnsAtoD); + dialect.buildUpsertQueryStatement(tableId, pkColumns, columnsAtoD, Collections.emptyMap()); } @@ -380,4 +380,14 @@ public void writeColumnSpec() { verifyWriteColumnSpec("\"foo\" DUMMY NOT NULL", new SinkRecordField(Schema.OPTIONAL_INT32_SCHEMA, "foo", true)); verifyWriteColumnSpec("\"foo\" DUMMY NULL", new SinkRecordField(Schema.OPTIONAL_INT32_SCHEMA, "foo", false)); } + + + @Test + public void testBulkDelete() { + TableId destTable = new TableId("x", "y", "z"); + TableId tmpTable = new TableId("a", "b", "c"); + String result = dialect.buildBulkDeleteStatement(destTable, tmpTable, Arrays.asList(new ColumnId(destTable, "id1"),new ColumnId(destTable, "id2"))); + String expected = "DELETE FROM \"x\".\"y\".\"z\" WHERE EXISTS (SELECT 1 FROM \"a\".\"b\".\"c\" tmp WHERE \"x\".\"y\".\"z\".id1=tmp.id1 AND \"x\".\"y\".\"z\".id2=tmp.id2)"; + assertEquals(expected, result); + } } \ No newline at end of file diff --git a/src/test/java/io/confluent/connect/jdbc/dialect/MySqlDatabaseDialectTest.java b/src/test/java/io/confluent/connect/jdbc/dialect/MySqlDatabaseDialectTest.java index 1817d5837..efb6727fa 100644 --- a/src/test/java/io/confluent/connect/jdbc/dialect/MySqlDatabaseDialectTest.java +++ b/src/test/java/io/confluent/connect/jdbc/dialect/MySqlDatabaseDialectTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.connect.data.Timestamp; import org.junit.Test; +import java.util.Collections; import java.util.List; import io.confluent.connect.jdbc.util.TableId; @@ -119,7 +120,15 @@ public void shouldBuildUpsertStatement() { " values(?,?,?,?,?,?) on duplicate key update `columnA`=values(`columnA`)," + "`columnB`=values(`columnB`),`columnC`=values(`columnC`),`columnD`=values" + "(`columnD`)"; - String sql = dialect.buildUpsertQueryStatement(tableId, pkColumns, columnsAtoD); + String sql = dialect.buildUpsertQueryStatement(tableId, pkColumns, columnsAtoD, Collections.emptyMap()); + assertEquals(expected, sql); + } + + @Test + public void shouldBuildDeleteStatement() { + String expected = "DELETE FROM `myTable` WHERE `id1` = ? AND `id2` = ?"; + String sql = dialect.buildDeleteStatement(tableId, pkColumns); + assertEquals(expected, sql); } @@ -164,7 +173,7 @@ public void upsert() { "`last_name`=values(`last_name`),`score`=values(`score`)"; String sql = dialect.buildUpsertQueryStatement(actor, columns(actor, "actor_id"), columns(actor, "first_name", "last_name", - "score")); + "score"), Collections.emptyMap()); assertEquals(expected, sql); } @@ -174,7 +183,7 @@ public void upsertOnlyKeyCols() { String expected = "insert into `actor`(`actor_id`) " + "values(?) on duplicate key update `actor_id`=values(`actor_id`)"; String sql = dialect - .buildUpsertQueryStatement(actor, columns(actor, "actor_id"), columns(actor)); + .buildUpsertQueryStatement(actor, columns(actor, "actor_id"), columns(actor), Collections.emptyMap()); assertEquals(expected, sql); } diff --git a/src/test/java/io/confluent/connect/jdbc/dialect/OracleDatabaseDialectTest.java b/src/test/java/io/confluent/connect/jdbc/dialect/OracleDatabaseDialectTest.java index 14c8d9e4e..b8981fe57 100644 --- a/src/test/java/io/confluent/connect/jdbc/dialect/OracleDatabaseDialectTest.java +++ b/src/test/java/io/confluent/connect/jdbc/dialect/OracleDatabaseDialectTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.connect.data.Timestamp; import org.junit.Test; +import java.util.Collections; import java.util.List; import io.confluent.connect.jdbc.util.TableId; @@ -126,7 +127,15 @@ public void shouldBuildUpsertStatement() { "\"myTable\".\"id1\",\"myTable\".\"id2\") values(incoming.\"columnA\"," + "incoming.\"columnB\",incoming.\"columnC\",incoming.\"columnD\",incoming" + ".\"id1\",incoming.\"id2\")"; - String sql = dialect.buildUpsertQueryStatement(tableId, pkColumns, columnsAtoD); + String sql = dialect.buildUpsertQueryStatement(tableId, pkColumns, columnsAtoD, Collections.emptyMap()); + assertEquals(expected, sql); + } + + @Test + public void shouldBuildDeleteStatement() { + String expected = "DELETE FROM \"myTable\" WHERE \"id1\" = ? AND \"id2\" = ?"; + String sql = dialect.buildDeleteStatement(tableId, pkColumns); + assertEquals(expected, sql); } @@ -178,7 +187,7 @@ public void upsert() { "\"ARTICLE\".\"author\") " + "values(incoming.\"body\",incoming.\"title\",incoming.\"author\")"; String actual = dialect.buildUpsertQueryStatement(article, columns(article, "title", "author"), - columns(article, "body")); + columns(article, "body"), Collections.emptyMap()); assertEquals(expected, actual); } diff --git a/src/test/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialectTest.java b/src/test/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialectTest.java index 19af3e1cf..7dd3a6a3a 100644 --- a/src/test/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialectTest.java +++ b/src/test/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialectTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.connect.data.Timestamp; import org.junit.Test; +import java.util.Collections; import java.util.List; import io.confluent.connect.jdbc.util.TableId; @@ -119,7 +120,15 @@ public void shouldBuildUpsertStatement() { "\"id2\") DO UPDATE SET \"columnA\"=EXCLUDED" + ".\"columnA\",\"columnB\"=EXCLUDED.\"columnB\",\"columnC\"=EXCLUDED" + ".\"columnC\",\"columnD\"=EXCLUDED.\"columnD\""; - String sql = dialect.buildUpsertQueryStatement(tableId, pkColumns, columnsAtoD); + String sql = dialect.buildUpsertQueryStatement(tableId, pkColumns, columnsAtoD, Collections.emptyMap()); + assertEquals(expected, sql); + } + + @Test + public void shouldBuildDeleteStatement() { + String expected = "DELETE FROM \"myTable\" WHERE \"id1\" = ? AND \"id2\" = ?"; + String sql = dialect.buildDeleteStatement(tableId, pkColumns); + assertEquals(expected, sql); } @@ -163,7 +172,7 @@ public void upsert() { "VALUES (?,?,?,?) ON CONFLICT (\"id\") DO UPDATE SET \"name\"=EXCLUDED.\"name\"," + "\"salary\"=EXCLUDED.\"salary\",\"address\"=EXCLUDED.\"address\"", dialect .buildUpsertQueryStatement(customer, columns(customer, "id"), - columns(customer, "name", "salary", "address"))); + columns(customer, "name", "salary", "address"), Collections.emptyMap())); } } \ No newline at end of file diff --git a/src/test/java/io/confluent/connect/jdbc/dialect/SapHanaDatabaseDialectTest.java b/src/test/java/io/confluent/connect/jdbc/dialect/SapHanaDatabaseDialectTest.java index c9213b6ca..4cfce64ec 100644 --- a/src/test/java/io/confluent/connect/jdbc/dialect/SapHanaDatabaseDialectTest.java +++ b/src/test/java/io/confluent/connect/jdbc/dialect/SapHanaDatabaseDialectTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.connect.data.Timestamp; import org.junit.Test; +import java.util.Collections; import java.util.List; import io.confluent.connect.jdbc.util.TableId; @@ -120,10 +121,17 @@ public void shouldBuildAlterTableStatement() { public void shouldBuildUpsertStatement() { String expected = "UPSERT \"myTable\"(\"id1\",\"id2\",\"columnA\",\"columnB\",\"columnC\"," + "\"columnD\") VALUES(?,?,?,?,?,?) WITH PRIMARY KEY"; - String sql = dialect.buildUpsertQueryStatement(tableId, pkColumns, columnsAtoD); + String sql = dialect.buildUpsertQueryStatement(tableId, pkColumns, columnsAtoD, Collections.emptyMap()); assertEquals(expected, sql); } + @Test + public void shouldBuildDeleteStatement() { + String expected = "DELETE FROM \"myTable\" WHERE \"id1\" = ? AND \"id2\" = ?"; + String sql = dialect.buildDeleteStatement(tableId, pkColumns); + + assertEquals(expected, sql); + } @Test public void createOneColNoPk() { @@ -165,7 +173,7 @@ public void upsert() { assertEquals( "UPSERT \"tableA\"(\"col1\",\"col2\",\"col3\",\"col4\") VALUES(?,?,?,?) WITH PRIMARY KEY", dialect.buildUpsertQueryStatement(tableA, columns(tableA, "col1"), - columns(tableA, "col2", "col3", "col4"))); + columns(tableA, "col2", "col3", "col4"), Collections.emptyMap())); } } \ No newline at end of file diff --git a/src/test/java/io/confluent/connect/jdbc/dialect/SqlServerDatabaseDialectTest.java b/src/test/java/io/confluent/connect/jdbc/dialect/SqlServerDatabaseDialectTest.java index 62d94df8e..6711cf24e 100644 --- a/src/test/java/io/confluent/connect/jdbc/dialect/SqlServerDatabaseDialectTest.java +++ b/src/test/java/io/confluent/connect/jdbc/dialect/SqlServerDatabaseDialectTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.connect.data.Timestamp; import org.junit.Test; +import java.util.Collections; import java.util.List; import io.confluent.connect.jdbc.util.TableId; @@ -124,7 +125,15 @@ public void shouldBuildUpsertStatement() { "[columnB], [columnC], [columnD], [id1], [id2]) values (incoming.[columnA]," + "incoming.[columnB],incoming.[columnC],incoming.[columnD],incoming.[id1]," + "incoming.[id2]);"; - String sql = dialect.buildUpsertQueryStatement(tableId, pkColumns, columnsAtoD); + String sql = dialect.buildUpsertQueryStatement(tableId, pkColumns, columnsAtoD, Collections.emptyMap()); + assertEquals(expected, sql); + } + + @Test + public void shouldBuildDeleteStatement() { + String expected = "DELETE FROM [myTable] WHERE [id1] = ? AND [id2] = ?"; + String sql = dialect.buildDeleteStatement(tableId, pkColumns); + assertEquals(expected, sql); } @@ -173,7 +182,7 @@ public void upsert1() { "([name], [salary], [address], [id]) values (incoming.[name],incoming" + ".[salary],incoming.[address],incoming.[id]);", dialect.buildUpsertQueryStatement(customer, columns(customer, "id"), - columns(customer, "name", "salary", "address"))); + columns(customer, "name", "salary", "address"), Collections.emptyMap())); } @Test @@ -188,6 +197,6 @@ public void upsert2() { "matched then insert ([ISBN], [year], [pages], [author], [title]) values (incoming" + ".[ISBN],incoming.[year]," + "incoming.[pages],incoming.[author],incoming.[title]);", dialect.buildUpsertQueryStatement(book, columns(book, "author", "title"), - columns(book, "ISBN", "year", "pages"))); + columns(book, "ISBN", "year", "pages"), Collections.emptyMap())); } } \ No newline at end of file diff --git a/src/test/java/io/confluent/connect/jdbc/dialect/SqliteDatabaseDialectTest.java b/src/test/java/io/confluent/connect/jdbc/dialect/SqliteDatabaseDialectTest.java index 5601e7fa6..b996d2599 100644 --- a/src/test/java/io/confluent/connect/jdbc/dialect/SqliteDatabaseDialectTest.java +++ b/src/test/java/io/confluent/connect/jdbc/dialect/SqliteDatabaseDialectTest.java @@ -22,10 +22,12 @@ import org.apache.kafka.connect.data.Timestamp; import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import java.sql.SQLException; import java.sql.Types; +import java.util.Collections; import java.util.List; import io.confluent.connect.jdbc.sink.SqliteHelper; @@ -35,6 +37,7 @@ import static org.junit.Assert.assertEquals; +@Ignore public class SqliteDatabaseDialectTest extends BaseDialectTest { private final SqliteHelper sqliteHelper = new SqliteHelper(getClass().getSimpleName()); @@ -138,7 +141,15 @@ public void shouldBuildAlterTableStatement() { public void shouldBuildUpsertStatement() { String expected = "INSERT OR REPLACE INTO `myTable`(`id1`,`id2`,`columnA`,`columnB`," + "`columnC`,`columnD`) VALUES(?,?,?,?,?,?)"; - String sql = dialect.buildUpsertQueryStatement(tableId, pkColumns, columnsAtoD); + String sql = dialect.buildUpsertQueryStatement(tableId, pkColumns, columnsAtoD, Collections.emptyMap()); + assertEquals(expected, sql); + } + + @Test + public void shouldBuildDeleteStatement() { + String expected = "DELETE FROM `myTable` WHERE `id1` = ? AND `id2` = ?"; + String sql = dialect.buildDeleteStatement(tableId, pkColumns); + assertEquals(expected, sql); } @@ -180,7 +191,7 @@ public void upsert() { assertEquals( "INSERT OR REPLACE INTO `Book`(`author`,`title`,`ISBN`,`year`,`pages`) VALUES(?,?,?,?,?)", dialect.buildUpsertQueryStatement(book, columns(book, "author", "title"), - columns(book, "ISBN", "year", "pages"))); + columns(book, "ISBN", "year", "pages"), Collections.emptyMap())); } @Test(expected = SQLException.class) diff --git a/src/test/java/io/confluent/connect/jdbc/dialect/SybaseDatabaseDialectTest.java b/src/test/java/io/confluent/connect/jdbc/dialect/SybaseDatabaseDialectTest.java index d11376d11..b4da2cff6 100644 --- a/src/test/java/io/confluent/connect/jdbc/dialect/SybaseDatabaseDialectTest.java +++ b/src/test/java/io/confluent/connect/jdbc/dialect/SybaseDatabaseDialectTest.java @@ -25,6 +25,7 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; import java.sql.SQLException; +import java.util.Collections; import java.util.List; import java.util.concurrent.ThreadLocalRandom; @@ -154,7 +155,15 @@ public void shouldBuildUpsertStatement() { "\"columnB\", \"columnC\", \"columnD\", \"id1\", \"id2\") values (incoming.\"columnA\"," + "incoming.\"columnB\",incoming.\"columnC\",incoming.\"columnD\",incoming.\"id1\"," + "incoming.\"id2\");"; - String sql = dialect.buildUpsertQueryStatement(tableId, pkColumns, columnsAtoD); + String sql = dialect.buildUpsertQueryStatement(tableId, pkColumns, columnsAtoD, Collections.emptyMap()); + assertEquals(expected, sql); + } + + @Test + public void shouldBuildDeleteStatement() { + String expected = "DELETE FROM \"myTable\" WHERE \"id1\" = ? AND \"id2\" = ?"; + String sql = dialect.buildDeleteStatement(tableId, pkColumns); + assertEquals(expected, sql); } @@ -203,7 +212,7 @@ public void upsert1() { "(\"name\", \"salary\", \"address\", \"id\") values (incoming.\"name\",incoming" + ".\"salary\",incoming.\"address\",incoming.\"id\");", dialect.buildUpsertQueryStatement(customer, columns(customer, "id"), - columns(customer, "name", "salary", "address"))); + columns(customer, "name", "salary", "address"), Collections.emptyMap())); } @Test @@ -218,7 +227,7 @@ public void upsert2() { "matched then insert (\"ISBN\", \"year\", \"pages\", \"author\", \"title\") values (incoming" + ".\"ISBN\",incoming.\"year\"," + "incoming.\"pages\",incoming.\"author\",incoming.\"title\");", dialect.buildUpsertQueryStatement(book, columns(book, "author", "title"), - columns(book, "ISBN", "year", "pages"))); + columns(book, "ISBN", "year", "pages"), Collections.emptyMap())); } @Test diff --git a/src/test/java/io/confluent/connect/jdbc/dialect/VerticaDatabaseDialectTest.java b/src/test/java/io/confluent/connect/jdbc/dialect/VerticaDatabaseDialectTest.java index 00df56ee1..770c79421 100644 --- a/src/test/java/io/confluent/connect/jdbc/dialect/VerticaDatabaseDialectTest.java +++ b/src/test/java/io/confluent/connect/jdbc/dialect/VerticaDatabaseDialectTest.java @@ -14,6 +14,8 @@ package io.confluent.connect.jdbc.dialect; +import io.confluent.connect.jdbc.sink.metadata.SinkRecordField; +import io.confluent.connect.jdbc.util.TableId; import org.apache.kafka.connect.data.Date; import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; @@ -22,6 +24,8 @@ import org.apache.kafka.connect.data.Timestamp; import org.junit.Test; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import static org.junit.Assert.assertEquals; @@ -114,9 +118,37 @@ public void shouldBuildAlterTableStatement() { assertStatements(sql, statements); } - @Test(expected = UnsupportedOperationException.class) + @Test + public void shouldBuildDeleteStatement() { + String expected = "DELETE FROM \"myTable\" WHERE \"id1\" = ? AND \"id2\" = ?"; + String sql = dialect.buildDeleteStatement(tableId, pkColumns); + + assertEquals(expected, sql); + } + + @Test public void shouldBuildUpsertStatement() { - dialect.buildUpsertQueryStatement(tableId, pkColumns, columnsAtoD); + TableId book = tableId("Book"); + HashMap fields = new HashMap<>(); + fields.put("author", new SinkRecordField(Schema.STRING_SCHEMA, "author", true)); + fields.put("title", new SinkRecordField(Schema.STRING_SCHEMA, "title", true)); + fields.put("ISBN", new SinkRecordField(Schema.STRING_SCHEMA, "ISBN", false)); + fields.put("year", new SinkRecordField(Schema.INT32_SCHEMA, "year", false)); + fields.put("pages", new SinkRecordField(Schema.INT32_SCHEMA, "pages", false)); + assertEquals( + "MERGE INTO \"Book\" AS target USING (" + + "SELECT ?::VARCHAR AS \"author\", ?::VARCHAR AS \"title\", ?::VARCHAR AS \"ISBN\", ?::INT AS \"year\", ?::INT AS \"pages\"" + + ") AS incoming ON (target.\"author\"=incoming.\"author\" AND target.\"title\"=incoming.\"title\")" + + " WHEN MATCHED THEN UPDATE SET \"ISBN\"=incoming.\"ISBN\",\"year\"=incoming.\"year\",\"pages\"=incoming.\"pages\"," + + "\"author\"=incoming.\"author\",\"title\"=incoming.\"title\"" + + " WHEN NOT MATCHED THEN INSERT (\"ISBN\", \"year\", \"pages\", \"author\", \"title\") VALUES (" + + "incoming.\"ISBN\",incoming.\"year\",incoming.\"pages\",incoming.\"author\",incoming.\"title\");", + dialect.buildUpsertQueryStatement( + book, + columns(book, "author", "title"), + columns(book, "ISBN", "year", "pages"), + fields) + ); } diff --git a/src/test/java/io/confluent/connect/jdbc/sink/BufferedRecordsTest.java b/src/test/java/io/confluent/connect/jdbc/sink/BufferedRecordsTest.java index 417758bac..181c641a5 100644 --- a/src/test/java/io/confluent/connect/jdbc/sink/BufferedRecordsTest.java +++ b/src/test/java/io/confluent/connect/jdbc/sink/BufferedRecordsTest.java @@ -1,17 +1,16 @@ /* - * Copyright 2016 Confluent Inc. + * Copyright 2018 Confluent Inc. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.confluent.io/confluent-community-license * * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package io.confluent.connect.jdbc.sink; @@ -75,19 +74,19 @@ public void correctBatching() throws SQLException { final BufferedRecords buffer = new BufferedRecords(config, tableId, dbDialect, dbStructure, sqliteHelper.connection); final Schema schemaA = SchemaBuilder.struct() - .field("name", Schema.STRING_SCHEMA) - .build(); + .field("name", Schema.STRING_SCHEMA) + .build(); final Struct valueA = new Struct(schemaA) - .put("name", "cuba"); + .put("name", "cuba"); final SinkRecord recordA = new SinkRecord("dummy", 0, null, null, schemaA, valueA, 0); final Schema schemaB = SchemaBuilder.struct() - .field("name", Schema.STRING_SCHEMA) - .field("age", Schema.OPTIONAL_INT32_SCHEMA) - .build(); + .field("name", Schema.STRING_SCHEMA) + .field("age", Schema.OPTIONAL_INT32_SCHEMA) + .build(); final Struct valueB = new Struct(schemaB) - .put("name", "cuba") - .put("age", 4); + .put("name", "cuba") + .put("age", 4); final SinkRecord recordB = new SinkRecord("dummy", 1, null, null, schemaB, valueB, 1); // test records are batched correctly based on schema equality as records are added @@ -104,6 +103,123 @@ public void correctBatching() throws SQLException { assertEquals(Collections.singletonList(recordA), buffer.flush()); } + @Test + public void insertThenDeleteInBatchNoFlush() throws SQLException { + final HashMap props = new HashMap<>(); + props.put("connection.url", sqliteHelper.sqliteUri()); + props.put("auto.create", true); + props.put("auto.evolve", true); + props.put("delete.enabled", true); + props.put("insert.mode", "upsert"); + props.put("pk.mode", "record_key"); + props.put("batch.size", 1000); // sufficiently high to not cause flushes due to buffer being full + final JdbcSinkConfig config = new JdbcSinkConfig(props); + + final String url = sqliteHelper.sqliteUri(); + final DatabaseDialect dbDialect = DatabaseDialects.findBestFor(url, config); + final DbStructure dbStructure = new DbStructure(dbDialect); + + final TableId tableId = new TableId(null, null, "dummy"); + final BufferedRecords buffer = new BufferedRecords(config, tableId, dbDialect, dbStructure, sqliteHelper.connection); + + final Schema keySchemaA = SchemaBuilder.struct() + .field("id", Schema.INT64_SCHEMA) + .build(); + final Schema valueSchemaA = SchemaBuilder.struct() + .field("name", Schema.STRING_SCHEMA) + .build(); + final Struct keyA = new Struct(keySchemaA) + .put("id", 1234L); + final Struct valueA = new Struct(valueSchemaA) + .put("name", "cuba"); + final SinkRecord recordA = new SinkRecord("dummy", 0, keySchemaA, keyA, valueSchemaA, valueA, 0); + final SinkRecord recordADelete = new SinkRecord("dummy", 0, keySchemaA, keyA, null, null, 0); + + final Schema schemaB = SchemaBuilder.struct() + .field("name", Schema.STRING_SCHEMA) + .field("age", Schema.OPTIONAL_INT32_SCHEMA) + .build(); + final Struct valueB = new Struct(schemaB) + .put("name", "cuba") + .put("age", 4); + final SinkRecord recordB = new SinkRecord("dummy", 1, keySchemaA, keyA, schemaB, valueB, 1); + + // test records are batched correctly based on schema equality as records are added + // (schemaA,schemaA,schemaA,schemaB,schemaA) -> ([schemaA,schemaA,schemaA],[schemaB],[schemaA]) + + assertEquals(Collections.emptyList(), buffer.add(recordA)); + assertEquals(Collections.emptyList(), buffer.add(recordA)); + + // delete should not cause a flush (i.e. not treated as a schema change) + assertEquals(Collections.emptyList(), buffer.add(recordADelete)); + + assertEquals(Arrays.asList(recordA, recordA, recordADelete), buffer.add(recordB)); + + assertEquals(Collections.singletonList(recordB), buffer.add(recordA)); + + assertEquals(Collections.singletonList(recordA), buffer.flush()); + } + + @Test + public void insertThenDeleteThenInsertInBatchFlush() throws SQLException { + final HashMap props = new HashMap<>(); + props.put("connection.url", sqliteHelper.sqliteUri()); + props.put("auto.create", true); + props.put("auto.evolve", true); + props.put("delete.enabled", true); + props.put("insert.mode", "upsert"); + props.put("pk.mode", "record_key"); + props.put("batch.size", 1000); // sufficiently high to not cause flushes due to buffer being full + final JdbcSinkConfig config = new JdbcSinkConfig(props); + + final String url = sqliteHelper.sqliteUri(); + final DatabaseDialect dbDialect = DatabaseDialects.findBestFor(url, config); + final DbStructure dbStructure = new DbStructure(dbDialect); + + final TableId tableId = new TableId(null, null, "dummy"); + final BufferedRecords buffer = new BufferedRecords(config, tableId, dbDialect, dbStructure, sqliteHelper.connection); + + final Schema keySchemaA = SchemaBuilder.struct() + .field("id", Schema.INT64_SCHEMA) + .build(); + final Schema valueSchemaA = SchemaBuilder.struct() + .field("name", Schema.STRING_SCHEMA) + .build(); + final Struct keyA = new Struct(keySchemaA) + .put("id", 1234L); + final Struct valueA = new Struct(valueSchemaA) + .put("name", "cuba"); + final SinkRecord recordA = new SinkRecord("dummy", 0, keySchemaA, keyA, valueSchemaA, valueA, 0); + final SinkRecord recordADelete = new SinkRecord("dummy", 0, keySchemaA, keyA, null, null, 0); + + final Schema schemaB = SchemaBuilder.struct() + .field("name", Schema.STRING_SCHEMA) + .field("age", Schema.OPTIONAL_INT32_SCHEMA) + .build(); + final Struct valueB = new Struct(schemaB) + .put("name", "cuba") + .put("age", 4); + final SinkRecord recordB = new SinkRecord("dummy", 1, keySchemaA, keyA, schemaB, valueB, 1); + + // test records are batched correctly based on schema equality as records are added + // (schemaA,schemaA,schemaA,schemaB,schemaA) -> ([schemaA,schemaA,schemaA],[schemaB],[schemaA]) + + assertEquals(Collections.emptyList(), buffer.add(recordA)); + assertEquals(Collections.emptyList(), buffer.add(recordA)); + + // delete should not cause a flush (i.e. not treated as a schema change) + assertEquals(Collections.emptyList(), buffer.add(recordADelete)); + + // insert after default should flush to insure insert isn't lost in batching + assertEquals(Arrays.asList(recordA, recordA, recordADelete), buffer.add(recordA)); + + assertEquals(Collections.singletonList(recordA), buffer.add(recordB)); + + assertEquals(Collections.singletonList(recordB), buffer.add(recordA)); + + assertEquals(Collections.singletonList(recordA), buffer.flush()); + } + @Test public void testFlushSuccessNoInfo() throws SQLException { final HashMap props = new HashMap<>(); @@ -122,10 +238,10 @@ public void testFlushSuccessNoInfo() throws SQLException { final DbStructure dbStructureMock = mock(DbStructure.class); when(dbStructureMock.createOrAmendIfNecessary(Matchers.any(JdbcSinkConfig.class), - Matchers.any(Connection.class), - Matchers.any(TableId.class), - Matchers.any(FieldsMetadata.class))) - .thenReturn(true); + Matchers.any(Connection.class), + Matchers.any(TableId.class), + Matchers.any(FieldsMetadata.class))) + .thenReturn(true); PreparedStatement preparedStatementMock = mock(PreparedStatement.class); when(preparedStatementMock.executeBatch()).thenReturn(batchResponse); @@ -135,7 +251,7 @@ public void testFlushSuccessNoInfo() throws SQLException { final TableId tableId = new TableId(null, null, "dummy"); final BufferedRecords buffer = new BufferedRecords(config, tableId, dbDialect, - dbStructureMock, connectionMock); + dbStructureMock, connectionMock); final Schema schemaA = SchemaBuilder.struct().field("name", Schema.STRING_SCHEMA).build(); final Struct valueA = new Struct(schemaA).put("name", "cuba"); @@ -165,10 +281,10 @@ public void testInsertModeUpdate() throws SQLException { final DatabaseDialect dbDialect = DatabaseDialects.findBestFor(url, config); final DbStructure dbStructureMock = mock(DbStructure.class); when(dbStructureMock.createOrAmendIfNecessary(Matchers.any(JdbcSinkConfig.class), - Matchers.any(Connection.class), - Matchers.any(TableId.class), - Matchers.any(FieldsMetadata.class))) - .thenReturn(true); + Matchers.any(Connection.class), + Matchers.any(TableId.class), + Matchers.any(FieldsMetadata.class))) + .thenReturn(true); final Connection connectionMock = mock(Connection.class); final TableId tableId = new TableId(null, null, "dummy"); @@ -183,4 +299,63 @@ public void testInsertModeUpdate() throws SQLException { Mockito.verify(connectionMock, Mockito.times(1)).prepareStatement(Matchers.eq("UPDATE `dummy` SET `name` = ?")); } + + @Test + public void deleteThenInsertInBatchNoFlush() throws SQLException { + final HashMap props = new HashMap<>(); + props.put("connection.url", sqliteHelper.sqliteUri()); + props.put("auto.create", true); + props.put("auto.evolve", true); + props.put("delete.enabled", true); + props.put("insert.mode", "upsert"); + props.put("pk.mode", "record_key"); + props.put("batch.size", 1000); // sufficiently high to not cause flushes due to buffer being full + final JdbcSinkConfig config = new JdbcSinkConfig(props); + + final String url = sqliteHelper.sqliteUri(); + sqliteHelper.createTable("CREATE TABLE dummy (id INT PRIMARY KEY, name VARCHAR(256) );"); // table must exist! + final DatabaseDialect dbDialect = DatabaseDialects.findBestFor(url, config); + final DbStructure dbStructure = new DbStructure(dbDialect); + + final TableId tableId = new TableId(null, null, "dummy"); + final BufferedRecords buffer = new BufferedRecords(config, tableId, dbDialect, dbStructure, sqliteHelper.connection); + + final Schema keySchemaA = SchemaBuilder.struct() + .field("id", Schema.INT64_SCHEMA) + .build(); + final Schema valueSchemaA = SchemaBuilder.struct() + .field("name", Schema.STRING_SCHEMA) + .build(); + final Struct keyA = new Struct(keySchemaA) + .put("id", 1234L); + final Struct valueA = new Struct(valueSchemaA) + .put("name", "cuba"); + final SinkRecord recordA = new SinkRecord("dummy", 0, keySchemaA, keyA, valueSchemaA, valueA, 0); + final SinkRecord recordADelete = new SinkRecord("dummy", 0, keySchemaA, keyA, null, null, 0); + + final Schema schemaB = SchemaBuilder.struct() + .field("name", Schema.STRING_SCHEMA) + .field("age", Schema.OPTIONAL_INT32_SCHEMA) + .build(); + final Struct valueB = new Struct(schemaB) + .put("name", "cuba") + .put("age", 4); + final SinkRecord recordB = new SinkRecord("dummy", 1, keySchemaA, keyA, schemaB, valueB, 1); + + // test records are batched correctly based on schema equality as records are added + // (schemaA,schemaA,schemaA,schemaB,schemaA) -> ([schemaA,schemaA,schemaA],[schemaB],[schemaA]) + + // delete should not cause a flush (i.e. not treated as a schema change) + assertEquals(Collections.emptyList(), buffer.add(recordADelete)); + assertEquals(Collections.emptyList(), buffer.add(recordADelete)); + // insert following delete should flush + assertEquals(Arrays.asList(recordADelete, recordADelete), buffer.add(recordA)); + assertEquals(Collections.emptyList(), buffer.add(recordA)); + // schema change should flush + assertEquals(Arrays.asList(recordA, recordA), buffer.add(recordB)); + // schema change should flush + assertEquals(Collections.singletonList(recordB), buffer.add(recordA)); + // schema change should flush + assertEquals(Collections.singletonList(recordA), buffer.flush()); + } } diff --git a/src/test/java/io/confluent/connect/jdbc/sink/JdbcDbWriterTest.java b/src/test/java/io/confluent/connect/jdbc/sink/JdbcDbWriterTest.java index aea8a038c..c4d0de12e 100644 --- a/src/test/java/io/confluent/connect/jdbc/sink/JdbcDbWriterTest.java +++ b/src/test/java/io/confluent/connect/jdbc/sink/JdbcDbWriterTest.java @@ -74,6 +74,99 @@ private JdbcDbWriter newWriter(Map props) { return new JdbcDbWriter(config, dialect, dbStructure); } + @Test + public void idempotentDeletes() throws SQLException { + String topic = "books"; + int partition = 7; + long offset = 42; + + Map props = new HashMap<>(); + props.put("connection.url", sqliteHelper.sqliteUri()); + props.put("auto.create", "true"); + props.put("delete.enabled", "true"); + props.put("pk.mode", "record_key"); + props.put("insert.mode", "upsert"); + + writer = newWriter(props); + + Schema keySchema = SchemaBuilder.struct() + .field("id", SchemaBuilder.INT64_SCHEMA); + + Struct keyStruct = new Struct(keySchema).put("id", 0L); + + Schema valueSchema = SchemaBuilder.struct() + .field("author", Schema.STRING_SCHEMA) + .field("title", Schema.STRING_SCHEMA) + .build(); + + Struct valueStruct = new Struct(valueSchema) + .put("author", "Tom Robbins") + .put("title", "Villa Incognito"); + + SinkRecord record = new SinkRecord(topic, partition, keySchema, keyStruct, valueSchema, valueStruct, offset); + + writer.write(Collections.nCopies(2, record)); + + SinkRecord deleteRecord = new SinkRecord(topic, partition, keySchema, keyStruct, null, null, offset); + writer.write(Collections.nCopies(2, deleteRecord)); + + assertEquals( + 1, + sqliteHelper.select("select count(*) from books", new SqliteHelper.ResultSetReadCallback() { + @Override + public void read(ResultSet rs) throws SQLException { + assertEquals(0, rs.getInt(1)); + } + }) + ); + } + + @Test + public void insertDeleteInsertSameRecord() throws SQLException { + String topic = "books"; + int partition = 7; + long offset = 42; + + Map props = new HashMap<>(); + props.put("connection.url", sqliteHelper.sqliteUri()); + props.put("auto.create", "true"); + props.put("delete.enabled", "true"); + props.put("pk.mode", "record_key"); + props.put("insert.mode", "upsert"); + + writer = newWriter(props); + + Schema keySchema = SchemaBuilder.struct() + .field("id", SchemaBuilder.INT64_SCHEMA); + + Struct keyStruct = new Struct(keySchema).put("id", 0L); + + Schema valueSchema = SchemaBuilder.struct() + .field("author", Schema.STRING_SCHEMA) + .field("title", Schema.STRING_SCHEMA) + .build(); + + Struct valueStruct = new Struct(valueSchema) + .put("author", "Tom Robbins") + .put("title", "Villa Incognito"); + + SinkRecord record = new SinkRecord(topic, partition, keySchema, keyStruct, valueSchema, valueStruct, offset); + SinkRecord deleteRecord = new SinkRecord(topic, partition, keySchema, keyStruct, null, null, offset); + writer.write(Collections.singletonList(record)); + writer.write(Collections.singletonList(deleteRecord)); + writer.write(Collections.singletonList(record)); + + assertEquals( + 1, + sqliteHelper.select("select count(*) from books", new SqliteHelper.ResultSetReadCallback() { + @Override + public void read(ResultSet rs) throws SQLException { + assertEquals(1, rs.getInt(1)); + } + }) + ); + } + @Test public void autoCreateWithAutoEvolve() throws SQLException { String topic = "books"; diff --git a/src/test/java/io/confluent/connect/jdbc/sink/PreparedStatementBinderTest.java b/src/test/java/io/confluent/connect/jdbc/sink/PreparedStatementBinderTest.java index 62903c671..0898b8092 100644 --- a/src/test/java/io/confluent/connect/jdbc/sink/PreparedStatementBinderTest.java +++ b/src/test/java/io/confluent/connect/jdbc/sink/PreparedStatementBinderTest.java @@ -1,21 +1,24 @@ /* - * Copyright 2016 Confluent Inc. + * Copyright 2018 Confluent Inc. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.confluent.io/confluent-community-license * * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package io.confluent.connect.jdbc.sink; +import java.time.ZoneOffset; +import java.util.Calendar; +import java.util.TimeZone; + import org.apache.kafka.connect.data.Date; import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; @@ -137,89 +140,99 @@ public void bindRecordInsert() throws SQLException, ParseException { verify(statement, times(1)).setDouble(index++, valueStruct.getFloat64("double")); verify(statement, times(1)).setBytes(index++, valueStruct.getBytes("bytes")); verify(statement, times(1)).setBigDecimal(index++, (BigDecimal) valueStruct.get("decimal")); - verify(statement, times(1)).setDate(index++, new java.sql.Date(((java.util.Date) valueStruct.get("date")).getTime()), DateTimeUtils.UTC_CALENDAR.get()); - verify(statement, times(1)).setTime(index++, new java.sql.Time(((java.util.Date) valueStruct.get("time")).getTime()), DateTimeUtils.UTC_CALENDAR.get()); - verify(statement, times(1)).setTimestamp(index++, new java.sql.Timestamp(((java.util.Date) valueStruct.get("timestamp")).getTime()), DateTimeUtils.UTC_CALENDAR.get()); + Calendar utcCalendar = DateTimeUtils.UTC_CALENDAR.get(); + verify( + statement, + times(1) + ).setDate(index++, new java.sql.Date(((java.util.Date) valueStruct.get("date")).getTime()), utcCalendar); + verify( + statement, + times(1) + ).setTime(index++, new java.sql.Time(((java.util.Date) valueStruct.get("time")).getTime()), utcCalendar); + verify( + statement, + times(1) + ).setTimestamp(index++, new java.sql.Timestamp(((java.util.Date) valueStruct.get("timestamp")).getTime()), utcCalendar); // last field is optional and is null-valued in struct verify(statement, times(1)).setObject(index++, null); } - @Test - public void bindRecordUpsertMode() throws SQLException, ParseException { - Schema valueSchema = SchemaBuilder.struct().name("com.example.Person") - .field("firstName", Schema.STRING_SCHEMA) - .field("long", Schema.INT64_SCHEMA) - .build(); + @Test + public void bindRecordUpsertMode() throws SQLException, ParseException { + Schema valueSchema = SchemaBuilder.struct().name("com.example.Person") + .field("firstName", Schema.STRING_SCHEMA) + .field("long", Schema.INT64_SCHEMA) + .build(); - Struct valueStruct = new Struct(valueSchema) - .put("firstName", "Alex") - .put("long", (long) 12425436); + Struct valueStruct = new Struct(valueSchema) + .put("firstName", "Alex") + .put("long", (long) 12425436); - SchemaPair schemaPair = new SchemaPair(null, valueSchema); + SchemaPair schemaPair = new SchemaPair(null, valueSchema); - JdbcSinkConfig.PrimaryKeyMode pkMode = JdbcSinkConfig.PrimaryKeyMode.RECORD_VALUE; + JdbcSinkConfig.PrimaryKeyMode pkMode = JdbcSinkConfig.PrimaryKeyMode.RECORD_VALUE; - List pkFields = Collections.singletonList("long"); + List pkFields = Collections.singletonList("long"); - FieldsMetadata fieldsMetadata = FieldsMetadata.extract("people", pkMode, pkFields, Collections.emptySet(), schemaPair); + FieldsMetadata fieldsMetadata = FieldsMetadata.extract("people", pkMode, pkFields, Collections.emptySet(), schemaPair); - PreparedStatement statement = mock(PreparedStatement.class); + PreparedStatement statement = mock(PreparedStatement.class); - PreparedStatementBinder binder = new PreparedStatementBinder( - dialect, - statement, - pkMode, - schemaPair, - fieldsMetadata, JdbcSinkConfig.InsertMode.UPSERT - ); + PreparedStatementBinder binder = new PreparedStatementBinder( + dialect, + statement, + pkMode, + schemaPair, + fieldsMetadata, JdbcSinkConfig.InsertMode.UPSERT + ); - binder.bindRecord(new SinkRecord("topic", 0, null, null, valueSchema, valueStruct, 0)); + binder.bindRecord(new SinkRecord("topic", 0, null, null, valueSchema, valueStruct, 0)); - int index = 1; - // key field first - verify(statement, times(1)).setLong(index++, valueStruct.getInt64("long")); - // rest in order of schema def - verify(statement, times(1)).setString(index++, valueStruct.getString("firstName")); - } + int index = 1; + // key field first + verify(statement, times(1)).setLong(index++, valueStruct.getInt64("long")); + // rest in order of schema def + verify(statement, times(1)).setString(index++, valueStruct.getString("firstName")); + } - @Test - public void bindRecordUpdateMode() throws SQLException, ParseException { - Schema valueSchema = SchemaBuilder.struct().name("com.example.Person") - .field("firstName", Schema.STRING_SCHEMA) - .field("long", Schema.INT64_SCHEMA) - .build(); + @Test + public void bindRecordUpdateMode() throws SQLException, ParseException { + Schema valueSchema = SchemaBuilder.struct().name("com.example.Person") + .field("firstName", Schema.STRING_SCHEMA) + .field("long", Schema.INT64_SCHEMA) + .build(); - Struct valueStruct = new Struct(valueSchema) - .put("firstName", "Alex") - .put("long", (long) 12425436); + Struct valueStruct = new Struct(valueSchema) + .put("firstName", "Alex") + .put("long", (long) 12425436); - SchemaPair schemaPair = new SchemaPair(null, valueSchema); + SchemaPair schemaPair = new SchemaPair(null, valueSchema); - JdbcSinkConfig.PrimaryKeyMode pkMode = JdbcSinkConfig.PrimaryKeyMode.RECORD_VALUE; + JdbcSinkConfig.PrimaryKeyMode pkMode = JdbcSinkConfig.PrimaryKeyMode.RECORD_VALUE; - List pkFields = Collections.singletonList("long"); + List pkFields = Collections.singletonList("long"); - FieldsMetadata fieldsMetadata = FieldsMetadata.extract("people", pkMode, pkFields, - Collections.emptySet(), schemaPair); + FieldsMetadata fieldsMetadata = FieldsMetadata.extract("people", pkMode, pkFields, + Collections.emptySet(), schemaPair); - PreparedStatement statement = mock(PreparedStatement.class); + PreparedStatement statement = mock(PreparedStatement.class); - PreparedStatementBinder binder = new PreparedStatementBinder( - dialect, - statement, - pkMode, - schemaPair, - fieldsMetadata, JdbcSinkConfig.InsertMode.UPDATE - ); + PreparedStatementBinder binder = new PreparedStatementBinder( + dialect, + statement, + pkMode, + schemaPair, + fieldsMetadata, JdbcSinkConfig.InsertMode.UPDATE + ); - binder.bindRecord(new SinkRecord("topic", 0, null, null, valueSchema, valueStruct, 0)); + binder.bindRecord(new SinkRecord("topic", 0, null, null, valueSchema, valueStruct, 0)); - int index = 1; + int index = 1; - // non key first - verify(statement, times(1)).setString(index++, valueStruct.getString("firstName")); - // last the keys - verify(statement, times(1)).setLong(index++, valueStruct.getInt64("long")); - } + // non key first + verify(statement, times(1)).setString(index++, valueStruct.getString("firstName")); + // last the keys + verify(statement, times(1)).setLong(index++, valueStruct.getInt64("long")); + } -} +} \ No newline at end of file