Adform changes #2

Open
wants to merge 21 commits into base: 5.0.0
93 changes: 59 additions & 34 deletions pom.xml
@@ -14,19 +14,19 @@
~ See the License for the specific language governing permissions and
~ limitations under the License.
~-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>io.confluent</groupId>
<artifactId>common</artifactId>
<version>5.0.0</version>
<version>5.2.1</version>
</parent>

<groupId>io.confluent</groupId>
<groupId>io.confluent.adform</groupId>
<artifactId>kafka-connect-jdbc</artifactId>
<packaging>jar</packaging>
<name>kafka-connect-jdbc</name>
@@ -36,7 +36,7 @@
</organization>
<url>http://confluent.io</url>
<description>
A Kafka Connect JDBC connector for copying data between databases and Kafka.
A Kafka Connect JDBC connector for copying data between databases and Kafka.
</description>

<licenses>
@@ -83,7 +83,7 @@
<scope>provided</scope>
</dependency>

<!-- JDBC drivers, only included in runtime so they get packaged -->
<!-- JDBC drivers, only included in runtime so they get packaged -->
<dependency>
<groupId>org.xerial</groupId>
<artifactId>sqlite-jdbc</artifactId>
@@ -156,7 +156,7 @@
<configuration>
<compilerArgs>
<arg>-Xlint:all</arg>
<arg>-Werror</arg>
<!--<arg>-Werror</arg>-->
</compilerArgs>
</configuration>
</plugin>
@@ -171,18 +171,32 @@
</goals>
<configuration>
<title>Kafka Connect JDBC</title>
<documentationUrl>https://docs.confluent.io/current/connect/connect-jdbc/docs/index.html</documentationUrl>
<documentationUrl>https://docs.confluent.io/current/connect/connect-jdbc/docs/index.html
</documentationUrl>
<description>
The JDBC source connector allows you to import data from any relational database with a JDBC driver into Kafka topics. By using JDBC, this connector can support a wide variety of databases without requiring custom code for each one.
The JDBC source connector allows you to import data from any relational database with a
JDBC driver into Kafka topics. By using JDBC, this connector can support a wide variety
of databases without requiring custom code for each one.

Data is loaded by periodically executing a SQL query and creating an output record for each row in the result set. By default, all tables in a database are copied, each to its own output topic. The database is monitored for new or deleted tables and adapts automatically. When copying data from a table, the connector can load only new or modified rows by specifying which columns should be used to detect new or modified data.
Data is loaded by periodically executing a SQL query and creating an output record for
each row in the result set. By default, all tables in a database are copied, each to its
own output topic. The database is monitored for new or deleted tables and adapts
automatically. When copying data from a table, the connector can load only new or
modified rows by specifying which columns should be used to detect new or modified data.

The JDBC sink connector allows you to export data from Kafka topics to any relational database with a JDBC driver. By using JDBC, this connector can support a wide variety of databases without requiring a dedicated connector for each one. The connector polls data from Kafka to write to the database based on the topics subscription. It is possible to achieve idempotent writes with upserts. Auto-creation of tables, and limited auto-evolution is also supported.
The JDBC sink connector allows you to export data from Kafka topics to any relational
database with a JDBC driver. By using JDBC, this connector can support a wide variety of
databases without requiring a dedicated connector for each one. The connector polls data
from Kafka to write to the database based on the topics subscription. It is possible to
achieve idempotent writes with upserts. Auto-creation of tables, and limited
auto-evolution is also supported.
</description>
<logo>logos/jdbc.jpg</logo>

<supportProviderName>Confluent, Inc.</supportProviderName>
<supportSummary>Confluent supports the JDBC sink and source connectors alongside community members as part of its Confluent Platform open source offering.</supportSummary>
<supportSummary>Confluent supports the JDBC sink and source connectors alongside community
members as part of its Confluent Platform open source offering.
</supportSummary>
<supportUrl>https://docs.confluent.io/current/</supportUrl>
<supportLogo>logos/confluent.png</supportLogo>

@@ -248,7 +262,7 @@
<executions>
<execution>
<id>validate</id>
<phase>validate</phase>
<phase>none</phase>
<configuration>
<suppressionsLocation>checkstyle/suppressions.xml</suppressionsLocation>
</configuration>
@@ -271,19 +285,19 @@
<profile>
<id>rpm</id>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>rpm-maven-plugin</artifactId>
<version>2.1.5</version>
<executions>
<execution>
<id>generate-rpm</id>
<goals>
<goal>rpm</goal>
</goals>
</execution>
</executions>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>rpm-maven-plugin</artifactId>
<version>2.1.5</version>
<executions>
<execution>
<id>generate-rpm</id>
<goals>
<goal>rpm</goal>
</goals>
</execution>
</executions>
<configuration>
<group>Applications/Internet</group>
<packager>Confluent Packaging</packager>
@@ -298,7 +312,8 @@
<directory>/usr/share/java/${project.artifactId}</directory>
<sources>
<source>
<location>${project.package.home}/share/java/${project.artifactId}</location>
<location>${project.package.home}/share/java/${project.artifactId}
</location>
</source>
</sources>
</mapping>
@@ -356,12 +371,20 @@
<mainClass>io.confluent.licenses.LicenseFinder</mainClass>
<arguments>
<!-- Note use of development instead of package so we pick up all dependencies. -->
<argument>-i ${project.build.directory}/${project.build.finalName}-package/share/java/kafka-connect-jdbc</argument>
<argument>-i
${project.build.directory}/${project.build.finalName}-package/share/java/kafka-connect-jdbc
</argument>
<argument>-o ${project.basedir}/licenses</argument>
<argument>-f</argument>
<argument>-h ${project.build.directory}/${project.build.finalName}-package/share/doc/kafka-connect-jdbc/licenses.html</argument>
<argument>-l ${project.build.directory}/${project.build.finalName}-package/share/doc/kafka-connect-jdbc/licenses</argument>
<argument>-n ${project.build.directory}/${project.build.finalName}-package/share/doc/kafka-connect-jdbc/notices</argument>
<argument>-h
${project.build.directory}/${project.build.finalName}-package/share/doc/kafka-connect-jdbc/licenses.html
</argument>
<argument>-l
${project.build.directory}/${project.build.finalName}-package/share/doc/kafka-connect-jdbc/licenses
</argument>
<argument>-n
${project.build.directory}/${project.build.finalName}-package/share/doc/kafka-connect-jdbc/notices
</argument>
<argument>-t ${project.name}</argument>
<argument>-x licenses-${licenses.version}.jar</argument>
</arguments>
@@ -391,8 +414,8 @@
</plugins>
</build>
</profile>
<profile>
<id>licenses-source</id>
<profile>
<id>licenses-source</id>
<build>
<plugins>
<plugin>
@@ -406,7 +429,9 @@
<mainClass>io.confluent.licenses.LicenseFinder</mainClass>
<arguments>
<!-- Note use of development instead of package so we pick up all dependencies. -->
<argument>-i ${project.build.directory}/${project.build.finalName}-development/share/java/kafka-connect-jdbc</argument>
<argument>-i
${project.build.directory}/${project.build.finalName}-development/share/java/kafka-connect-jdbc
</argument>
<argument>-o ${project.basedir}/licenses</argument>
<argument>-f</argument>
<argument>-h ${project.basedir}/licenses.html</argument>
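The connector description added to the pom.xml above summarizes how the source connector works: tables are polled with a SQL query, and new or modified rows can be detected through designated columns. As a rough, hypothetical illustration (not part of this change), an incremental source configuration built in Java might look like the sketch below; the property names are standard kafka-connect-jdbc options and every value is a placeholder.

import java.util.HashMap;
import java.util.Map;

public class JdbcSourceConfigSketch {
  public static void main(String[] args) {
    // Hypothetical settings illustrating the incremental loading described in the
    // pom <description>; all values here are placeholders.
    Map<String, String> props = new HashMap<>();
    props.put("connector.class", "io.confluent.connect.jdbc.JdbcSourceConnector");
    props.put("connection.url", "jdbc:postgresql://localhost:5432/example");
    props.put("mode", "incrementing");            // load only new rows
    props.put("incrementing.column.name", "id");  // column used to detect new rows
    props.put("table.whitelist", "orders");       // restrict copying to one table
    props.put("topic.prefix", "jdbc-");           // output topic becomes jdbc-orders
    props.forEach((key, value) -> System.out.println(key + "=" + value));
  }
}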
81 changes: 81 additions & 0 deletions src/main/java/io/confluent/connect/jdbc/VerticaSinkConnector.java
@@ -0,0 +1,81 @@
/*
* Copyright 2016 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.confluent.connect.jdbc;

import io.confluent.connect.jdbc.sink.JdbcSinkConfig;
import io.confluent.connect.jdbc.sink.VerticaSinkTask;
import io.confluent.connect.jdbc.util.Version;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
* Cannot inherit from JdbcSinkConnector as it is final
* We only need to override {@link #taskClass()} to return our VerticaSinkTask
*/
public class VerticaSinkConnector extends SinkConnector {

private static final Logger log = LoggerFactory.getLogger(VerticaSinkConnector.class);

private Map<String, String> configProps;

public Class<? extends Task> taskClass() {
return VerticaSinkTask.class;
}

@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
log.info("Setting task configurations for {} workers.", maxTasks);
final List<Map<String, String>> configs = new ArrayList<>(maxTasks);
for (int i = 0; i < maxTasks; ++i) {
configs.add(configProps);
}
return configs;
}

@Override
public void start(Map<String, String> props) {
configProps = props;
}

@Override
public void stop() {
}

@Override
public ConfigDef config() {
return JdbcSinkConfig.CONFIG_DEF;
}

@Override
public Config validate(Map<String, String> connectorConfigs) {
// TODO cross-fields validation here: pkFields against the pkMode
return super.validate(connectorConfigs);
}

@Override
public String version() {
return Version.getVersion();
}
}
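For orientation, here is a minimal, hypothetical sketch (not part of this change) of how the Connect runtime exercises the connector above: start() keeps the supplied properties, taskConfigs(n) hands the same map to every task, and taskClass() points the framework at VerticaSinkTask. The connection settings are placeholders.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import io.confluent.connect.jdbc.VerticaSinkConnector;

public class VerticaSinkConnectorSketch {
  public static void main(String[] args) {
    VerticaSinkConnector connector = new VerticaSinkConnector();

    // Placeholder sink settings; any JdbcSinkConfig option could appear here.
    Map<String, String> props = new HashMap<>();
    props.put("connection.url", "jdbc:vertica://localhost:5433/example");
    props.put("topics", "orders");

    connector.start(props);

    // Every task receives the same map the connector was started with.
    List<Map<String, String>> taskConfigs = connector.taskConfigs(3);
    System.out.println(taskConfigs.size());              // 3
    System.out.println(connector.taskClass().getName()); // io.confluent.connect.jdbc.sink.VerticaSinkTask

    connector.stop();
  }
}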
@@ -1,17 +1,16 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.connect.jdbc.dialect;
@@ -358,9 +357,27 @@ String buildUpsertQueryStatement(
String buildUpsertQueryStatement(
TableId table,
Collection<ColumnId> keyColumns,
Collection<ColumnId> nonKeyColumns
Collection<ColumnId> nonKeyColumns,
Map<String, SinkRecordField> allFields
);

/**
* Build the DELETE prepared statement expression for the given table and its columns. Variables
* for each key column should also appear in the WHERE clause of the statement.
*
* @param table the identifier of the table; may not be null
* @param keyColumns the identifiers of the columns in the primary/unique key; may not be null
* but may be empty
* @return the delete statement; may not be null
* @throws UnsupportedOperationException if the dialect does not support deletes
*/
default String buildDeleteStatement(
TableId table,
Collection<ColumnId> keyColumns
) {
throw new UnsupportedOperationException();
}

/**
* Build the DROP TABLE statement expression for the given table.
*
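The buildDeleteStatement() contract added above only pins down the shape of the statement, so here is a standalone, hypothetical sketch of the prepared statement a dialect would be expected to produce for a composite key. Plain strings stand in for the project's TableId and ColumnId types, and identifier quoting is ignored.

import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

public class DeleteStatementSketch {
  // Builds the kind of statement buildDeleteStatement() describes:
  // one "column = ?" placeholder per key column, joined with AND.
  static String buildDeleteStatement(String table, List<String> keyColumns) {
    if (keyColumns.isEmpty()) {
      // This sketch simply refuses an empty key set to avoid emitting malformed SQL.
      throw new UnsupportedOperationException("delete requires key columns");
    }
    StringJoiner where = new StringJoiner(" AND ");
    for (String column : keyColumns) {
      where.add(column + " = ?");
    }
    return "DELETE FROM " + table + " WHERE " + where;
  }

  public static void main(String[] args) {
    // Prints: DELETE FROM orders WHERE order_id = ? AND line_no = ?
    System.out.println(buildDeleteStatement("orders", Arrays.asList("order_id", "line_no")));
  }
}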
@@ -23,6 +23,7 @@
import org.apache.kafka.connect.data.Timestamp;

import java.util.Collection;
import java.util.Map;

import io.confluent.connect.jdbc.dialect.DatabaseDialectProvider.SubprotocolBasedProvider;
import io.confluent.connect.jdbc.sink.metadata.SinkRecordField;
@@ -112,10 +113,10 @@ protected String getSqlType(SinkRecordField field) {

@Override
public String buildUpsertQueryStatement(
final TableId table,
Collection<ColumnId> keyColumns,
Collection<ColumnId> nonKeyColumns
) {
final TableId table,
Collection<ColumnId> keyColumns,
Collection<ColumnId> nonKeyColumns,
Map<String, SinkRecordField> allFields) {
// http://lpar.ath0.com/2013/08/12/upsert-in-db2/
final Transform<ColumnId> transform = (builder, col) -> {
builder.append(table)
@@ -23,6 +23,7 @@
import org.apache.kafka.connect.data.Timestamp;

import java.util.Collection;
import java.util.Map;

import io.confluent.connect.jdbc.dialect.DatabaseDialectProvider.SubprotocolBasedProvider;
import io.confluent.connect.jdbc.sink.metadata.SinkRecordField;
@@ -112,10 +113,10 @@ protected String getSqlType(SinkRecordField field) {

@Override
public String buildUpsertQueryStatement(
final TableId table,
Collection<ColumnId> keyColumns,
Collection<ColumnId> nonKeyColumns
) {
final TableId table,
Collection<ColumnId> keyColumns,
Collection<ColumnId> nonKeyColumns,
Map<String, SinkRecordField> allFields) {
// http://lpar.ath0.com/2013/08/12/upsert-in-db2/
final Transform<ColumnId> transform = (builder, col) -> {
builder.append(table)
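The two dialect hunks above both update the signature of buildUpsertQueryStatement() in dialects whose upsert is implemented with a single MERGE statement, as described in the linked lpar.ath0.com article. As a rough illustration (an assumption, not the dialect's exact output), the generated SQL has approximately this shape for a table named orders with key column id and value column name.

public class MergeUpsertShape {
  public static void main(String[] args) {
    // Approximate shape of a MERGE-based upsert; placeholders are bound per record,
    // and actual identifier quoting and column handling are dialect-specific.
    String sql =
        "MERGE INTO orders USING (VALUES (?, ?)) AS incoming (id, name) "
            + "ON (orders.id = incoming.id) "
            + "WHEN MATCHED THEN UPDATE SET orders.name = incoming.name "
            + "WHEN NOT MATCHED THEN INSERT (orders.id, orders.name) "
            + "VALUES (incoming.id, incoming.name)";
    System.out.println(sql);
  }
}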