
Backport upsert mode for Vertica to 5.0.0
Julius Vitkauskas committed Mar 21, 2019
1 parent 24d3273 commit 7d30100
Showing 24 changed files with 196 additions and 76 deletions.

@@ -366,14 +366,15 @@ String buildDeleteStatement(
    *          but may be empty
    * @param nonKeyColumns the identifiers of the other columns in the table; may not be null but may
    *          be empty
+   * @param allFields all table fields; used to get type information for upserts
    * @return the upsert/merge statement; may not be null
    * @throws UnsupportedOperationException if the dialect does not support upserts
    */
   String buildUpsertQueryStatement(
-      TableId table,
-      Collection<ColumnId> keyColumns,
-      Collection<ColumnId> nonKeyColumns
-  );
+      TableId table,
+      Collection<ColumnId> keyColumns,
+      Collection<ColumnId> nonKeyColumns,
+      Map<String, SinkRecordField> allFields);

   /**
    * Build the DROP TABLE statement expression for the given table.

@@ -23,6 +23,7 @@
 import org.apache.kafka.connect.data.Timestamp;

 import java.util.Collection;
+import java.util.Map;

 import io.confluent.connect.jdbc.dialect.DatabaseDialectProvider.SubprotocolBasedProvider;
 import io.confluent.connect.jdbc.sink.metadata.SinkRecordField;
@@ -112,10 +113,10 @@ protected String getSqlType(SinkRecordField field) {

   @Override
   public String buildUpsertQueryStatement(
-      final TableId table,
-      Collection<ColumnId> keyColumns,
-      Collection<ColumnId> nonKeyColumns
-  ) {
+      final TableId table,
+      Collection<ColumnId> keyColumns,
+      Collection<ColumnId> nonKeyColumns,
+      Map<String, SinkRecordField> allFields) {
     // http://lpar.ath0.com/2013/08/12/upsert-in-db2/
     final Transform<ColumnId> transform = (builder, col) -> {
       builder.append(table)

@@ -23,6 +23,7 @@
 import org.apache.kafka.connect.data.Timestamp;

 import java.util.Collection;
+import java.util.Map;

 import io.confluent.connect.jdbc.dialect.DatabaseDialectProvider.SubprotocolBasedProvider;
 import io.confluent.connect.jdbc.sink.metadata.SinkRecordField;
@@ -112,10 +113,10 @@ protected String getSqlType(SinkRecordField field) {

   @Override
   public String buildUpsertQueryStatement(
-      final TableId table,
-      Collection<ColumnId> keyColumns,
-      Collection<ColumnId> nonKeyColumns
-  ) {
+      final TableId table,
+      Collection<ColumnId> keyColumns,
+      Collection<ColumnId> nonKeyColumns,
+      Map<String, SinkRecordField> allFields) {
     // http://lpar.ath0.com/2013/08/12/upsert-in-db2/
     final Transform<ColumnId> transform = (builder, col) -> {
       builder.append(table)

@@ -1391,10 +1391,10 @@ public String buildUpdateStatement(

   @Override
   public String buildUpsertQueryStatement(
-      TableId table,
-      Collection<ColumnId> keyColumns,
-      Collection<ColumnId> nonKeyColumns
-  ) {
+      TableId table,
+      Collection<ColumnId> keyColumns,
+      Collection<ColumnId> nonKeyColumns,
+      Map<String, SinkRecordField> allFields) {
     throw new UnsupportedOperationException();
   }

@@ -27,6 +27,7 @@
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Collection;
+import java.util.Map;

 import io.confluent.connect.jdbc.dialect.DatabaseDialectProvider.SubprotocolBasedProvider;
 import io.confluent.connect.jdbc.sink.metadata.SinkRecordField;
@@ -125,10 +126,10 @@ protected String getSqlType(SinkRecordField field) {

   @Override
   public String buildUpsertQueryStatement(
-      TableId table,
-      Collection<ColumnId> keyColumns,
-      Collection<ColumnId> nonKeyColumns
-  ) {
+      TableId table,
+      Collection<ColumnId> keyColumns,
+      Collection<ColumnId> nonKeyColumns,
+      Map<String, SinkRecordField> allFields) {
     //MySql doesn't support SQL 2003:merge so here how the upsert is handled
     final Transform<ColumnId> transform = (builder, col) -> {
       builder.appendIdentifierQuoted(col.name());

@@ -25,6 +25,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;

 import io.confluent.connect.jdbc.dialect.DatabaseDialectProvider.SubprotocolBasedProvider;
 import io.confluent.connect.jdbc.sink.metadata.SinkRecordField;
@@ -161,10 +162,10 @@ public List<String> buildAlterTable(

   @Override
   public String buildUpsertQueryStatement(
-      final TableId table,
-      Collection<ColumnId> keyColumns,
-      Collection<ColumnId> nonKeyColumns
-  ) {
+      final TableId table,
+      Collection<ColumnId> keyColumns,
+      Collection<ColumnId> nonKeyColumns,
+      Map<String, SinkRecordField> allFields) {
     // https://blogs.oracle.com/cmar/entry/using_merge_to_do_an
     final Transform<ColumnId> transform = (builder, col) -> {
       builder.append(table)

@@ -30,6 +30,7 @@
 import java.sql.SQLException;
 import java.sql.Types;
 import java.util.Collection;
+import java.util.Map;

 import io.confluent.connect.jdbc.dialect.DatabaseDialectProvider.SubprotocolBasedProvider;
 import io.confluent.connect.jdbc.sink.metadata.SinkRecordField;
@@ -219,10 +220,10 @@ protected String getSqlType(SinkRecordField field) {

   @Override
   public String buildUpsertQueryStatement(
-      TableId table,
-      Collection<ColumnId> keyColumns,
-      Collection<ColumnId> nonKeyColumns
-  ) {
+      TableId table,
+      Collection<ColumnId> keyColumns,
+      Collection<ColumnId> nonKeyColumns,
+      Map<String, SinkRecordField> allFields) {
     final Transform<ColumnId> transform = (builder, col) -> {
       builder.appendIdentifierQuoted(col.name())
           .append("=EXCLUDED.")

@@ -25,6 +25,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;

 import io.confluent.connect.jdbc.dialect.DatabaseDialectProvider.SubprotocolBasedProvider;
 import io.confluent.connect.jdbc.sink.metadata.SinkRecordField;
@@ -134,10 +135,10 @@ public List<String> buildAlterTable(

   @Override
   public String buildUpsertQueryStatement(
-      TableId table,
-      Collection<ColumnId> keyColumns,
-      Collection<ColumnId> nonKeyColumns
-  ) {
+      TableId table,
+      Collection<ColumnId> keyColumns,
+      Collection<ColumnId> nonKeyColumns,
+      Map<String, SinkRecordField> allFields) {
     // https://help.sap.com/hana_one/html/sql_replace_upsert.html
     ExpressionBuilder builder = expressionBuilder();
     builder.append("UPSERT ");

@@ -25,6 +25,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;

 import io.confluent.connect.jdbc.dialect.DatabaseDialectProvider.SubprotocolBasedProvider;
 import io.confluent.connect.jdbc.sink.metadata.SinkRecordField;
@@ -144,10 +145,10 @@ public List<String> buildAlterTable(

   @Override
   public String buildUpsertQueryStatement(
-      TableId table,
-      Collection<ColumnId> keyColumns,
-      Collection<ColumnId> nonKeyColumns
-  ) {
+      TableId table,
+      Collection<ColumnId> keyColumns,
+      Collection<ColumnId> nonKeyColumns,
+      Map<String, SinkRecordField> allFields) {
     ExpressionBuilder builder = expressionBuilder();
     builder.append("merge into ");
     builder.append(table);

@@ -26,6 +26,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;

 import io.confluent.connect.jdbc.dialect.DatabaseDialectProvider.SubprotocolBasedProvider;
 import io.confluent.connect.jdbc.sink.metadata.SinkRecordField;
@@ -114,10 +115,10 @@ public List<String> buildAlterTable(

   @Override
   public String buildUpsertQueryStatement(
-      TableId table,
-      Collection<ColumnId> keyColumns,
-      Collection<ColumnId> nonKeyColumns
-  ) {
+      TableId table,
+      Collection<ColumnId> keyColumns,
+      Collection<ColumnId> nonKeyColumns,
+      Map<String, SinkRecordField> allFields) {
     ExpressionBuilder builder = expressionBuilder();
     builder.append("INSERT OR REPLACE INTO ");
     builder.append(table);

@@ -30,6 +30,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;

 import io.confluent.connect.jdbc.dialect.DatabaseDialectProvider.SubprotocolBasedProvider;
@@ -242,10 +243,10 @@ public List<String> buildAlterTable(

   @Override
   public String buildUpsertQueryStatement(
-      TableId table,
-      Collection<ColumnId> keyColumns,
-      Collection<ColumnId> nonKeyColumns
-  ) {
+      TableId table,
+      Collection<ColumnId> keyColumns,
+      Collection<ColumnId> nonKeyColumns,
+      Map<String, SinkRecordField> allFields) {
     ExpressionBuilder builder = expressionBuilder();
     builder.append("merge into ");
     builder.append(table);

@@ -16,6 +16,8 @@

 package io.confluent.connect.jdbc.dialect;

+import io.confluent.connect.jdbc.util.ColumnId;
+import io.confluent.connect.jdbc.util.ExpressionBuilder;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.connect.data.Date;
 import org.apache.kafka.connect.data.Decimal;
@@ -26,6 +28,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;

 import io.confluent.connect.jdbc.dialect.DatabaseDialectProvider.SubprotocolBasedProvider;
 import io.confluent.connect.jdbc.sink.metadata.SinkRecordField;
@@ -61,10 +64,17 @@ public VerticaDatabaseDialect(AbstractConfig config) {

   @Override
   protected String getSqlType(SinkRecordField field) {
+    return getSqlType(field, true);
+  }
+
+  private String getSqlType(SinkRecordField field, boolean sized) {
     if (field.schemaName() != null) {
       switch (field.schemaName()) {
         case Decimal.LOGICAL_NAME:
-          return "DECIMAL(18," + field.schemaParameters().get(Decimal.SCALE_FIELD) + ")";
+          return "DECIMAL" + (
+              sized ? "(18," + field.schemaParameters().get(Decimal.SCALE_FIELD) + ")"
+                  : ""
+          );
         case Date.LOGICAL_NAME:
           return "DATE";
         case Time.LOGICAL_NAME:
@@ -92,9 +102,9 @@ protected String getSqlType(SinkRecordField field) {
       case BOOLEAN:
         return "BOOLEAN";
       case STRING:
-        return "VARCHAR(1024)";
+        return "VARCHAR" + (sized ? "(1024)" : "");
       case BYTES:
-        return "VARBINARY(1024)";
+        return "VARBINARY" + (sized ? "(1024)" : "");
       default:
         return super.getSqlType(field);
     }
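
A note on the getSqlType change above: the new boolean parameter lets one type mapping serve both column DDL and inline cast expressions. As a rough sketch for a STRING field, following the mapping shown in this hunk:

    getSqlType(field)         // sized = true  -> VARCHAR(1024), suitable for CREATE/ALTER TABLE
    getSqlType(field, false)  // sized = false -> VARCHAR, suitable for a ?::VARCHAR cast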

@@ -111,4 +121,70 @@ public List<String> buildAlterTable(
     }
     return queries;
   }
+
+  @Override
+  public String buildUpsertQueryStatement(
+      TableId table,
+      Collection<ColumnId> keyColumns,
+      Collection<ColumnId> nonKeyColumns,
+      Map<String, SinkRecordField> allFields
+  ) {
+    ExpressionBuilder builder = expressionBuilder();
+    builder.append("MERGE INTO ");
+    builder.append(table);
+    builder.append(" AS target USING (SELECT ");
+    builder.appendList()
+        .delimitedBy(", ")
+        .transformedBy((b, input) -> transformTypedParam(b, (ColumnId) input, allFields))
+        .of(keyColumns, nonKeyColumns);
+    builder.append(") AS incoming ON (");
+    builder.appendList()
+        .delimitedBy(" AND ")
+        .transformedBy(this::transformAs)
+        .of(keyColumns);
+    builder.append(")");
+    builder.append(" WHEN MATCHED THEN UPDATE SET ");
+    builder.appendList()
+        .delimitedBy(",")
+        .transformedBy(this::transformUpdate)
+        .of(nonKeyColumns, keyColumns);
+    builder.append(" WHEN NOT MATCHED THEN INSERT (");
+    builder.appendList()
+        .delimitedBy(", ")
+        .transformedBy(ExpressionBuilder.columnNamesWithPrefix(""))
+        .of(nonKeyColumns, keyColumns);
+    builder.append(") VALUES (");
+    builder.appendList()
+        .delimitedBy(",")
+        .transformedBy(ExpressionBuilder.columnNamesWithPrefix("incoming."))
+        .of(nonKeyColumns, keyColumns);
+    builder.append(");");
+    return builder.toString();
+  }
+
+  private void transformAs(ExpressionBuilder builder, ColumnId col) {
+    builder.append("target.")
+        .appendIdentifierQuoted(col.name())
+        .append("=incoming.")
+        .appendIdentifierQuoted(col.name());
+  }
+
+  private void transformUpdate(ExpressionBuilder builder, ColumnId col) {
+    builder.appendIdentifierQuoted(col.name())
+        .append("=incoming.")
+        .appendIdentifierQuoted(col.name());
+  }
+
+  private void transformTypedParam(
+      ExpressionBuilder builder,
+      ColumnId col,
+      Map<String, SinkRecordField> allFields
+  ) {
+    SinkRecordField field = allFields.get(col.name());
+
+    builder.append("?::")
+        .append(getSqlType(field, false))
+        .append(" AS ")
+        .appendIdentifierQuoted(col.name());
+  }
 }
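
Taken together, the new Vertica upsert builder emits a single MERGE statement whose source subquery carries typed placeholders. As a minimal sketch, assuming a hypothetical table "example" with key column id (an INT32 field) and non-key column name (a STRING field), the builder would produce roughly the following, emitted on one line but broken up here for readability:

    MERGE INTO "example" AS target
    USING (SELECT ?::INT AS "id", ?::VARCHAR AS "name") AS incoming
    ON (target."id"=incoming."id")
    WHEN MATCHED THEN UPDATE SET "name"=incoming."name","id"=incoming."id"
    WHEN NOT MATCHED THEN INSERT ("name", "id") VALUES (incoming."name",incoming."id");

The ?::INT and ?::VARCHAR casts are why transformTypedParam needs allFields and the unsized getSqlType variant: each placeholder in the SELECT source list must be given an explicit type. Note that the WHEN MATCHED update list re-assigns the key columns as well, mirroring the .of(nonKeyColumns, keyColumns) call above.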

@@ -257,7 +257,8 @@ private String getInsertSql() {
         return dbDialect.buildUpsertQueryStatement(
             tableId,
             asColumns(fieldsMetadata.keyFieldNames),
-            asColumns(fieldsMetadata.nonKeyFieldNames)
+            asColumns(fieldsMetadata.nonKeyFieldNames),
+            fieldsMetadata.allFields
         );
       } catch (UnsupportedOperationException e) {
         throw new ConnectException(String.format(
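
For context, getInsertSql() reaches this call when the sink is configured with insert.mode=upsert. A hypothetical sink configuration that would exercise the new Vertica path (all values are placeholders; insert.mode, pk.mode, and pk.fields are existing JDBC sink options):

    connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
    connection.url=jdbc:vertica://localhost:5433/mydb
    topics=example
    insert.mode=upsert
    pk.mode=record_key
    pk.fields=id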