diff --git a/pom.xml b/pom.xml
index 5955be3d24b75..49f2fb06338d2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -164,7 +164,7 @@ flexible messaging model and an intuitive client API.
334
2.13
2.13.10
- 1.9.7.Final
+ 2.6.2.Final
42.5.0
8.0.30
diff --git a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
index 9e731fe48bbdb..2a0ca8125b0e4 100644
--- a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
+++ b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
@@ -88,7 +88,7 @@ public void open(Map config, SourceContext sourceContext) throws
setConfigIfNull(config, PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, DEFAULT_CONVERTER);
// database.history : implementation class for database history.
- setConfigIfNull(config, HistorizedRelationalDatabaseConnectorConfig.DATABASE_HISTORY.name(), DEFAULT_HISTORY);
+ setConfigIfNull(config, HistorizedRelationalDatabaseConnectorConfig.SCHEMA_HISTORY.name(), DEFAULT_HISTORY);
// database.history.pulsar.service.url
String pulsarUrl = (String) config.get(PulsarDatabaseHistory.SERVICE_URL.name());
diff --git a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
index 8bad8885a4c55..5d2fbe9a2a368 100644
--- a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
+++ b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
@@ -26,12 +26,12 @@
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.document.DocumentReader;
-import io.debezium.relational.history.AbstractDatabaseHistory;
-import io.debezium.relational.history.DatabaseHistory;
-import io.debezium.relational.history.DatabaseHistoryException;
-import io.debezium.relational.history.DatabaseHistoryListener;
+import io.debezium.relational.history.AbstractSchemaHistory;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.HistoryRecordComparator;
+import io.debezium.relational.history.SchemaHistory;
+import io.debezium.relational.history.SchemaHistoryException;
+import io.debezium.relational.history.SchemaHistoryListener;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
@@ -52,12 +52,12 @@
import org.apache.pulsar.client.api.Schema;
/**
- * A {@link DatabaseHistory} implementation that records schema changes as normal pulsar messages on the specified
+ * A {@link SchemaHistory} implementation that records schema changes as normal pulsar messages on the specified
* topic, and that recovers the history by establishing a Kafka Consumer re-processing all messages on that topic.
*/
@Slf4j
@ThreadSafe
-public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {
+public final class PulsarDatabaseHistory extends AbstractSchemaHistory {
public static final Field TOPIC = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.topic")
.withDisplayName("Database history topic name")
@@ -94,11 +94,11 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {
.withValidation(Field::isOptional);
public static final Field.Set ALL_FIELDS = Field.setOf(
- TOPIC,
- SERVICE_URL,
- CLIENT_BUILDER,
- DatabaseHistory.NAME,
- READER_CONFIG);
+ TOPIC,
+ SERVICE_URL,
+ CLIENT_BUILDER,
+ SchemaHistory.NAME,
+ READER_CONFIG);
private final ObjectMapper mapper = new ObjectMapper();
private final DocumentReader reader = DocumentReader.defaultReader();
@@ -113,7 +113,7 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {
public void configure(
Configuration config,
HistoryRecordComparator comparator,
- DatabaseHistoryListener listener,
+ SchemaHistoryListener listener,
boolean useCatalogBeforeSchema) {
super.configure(config, comparator, listener, useCatalogBeforeSchema);
if (!config.validateAndRecord(ALL_FIELDS, logger::error)) {
@@ -148,7 +148,7 @@ public void configure(
}
// Copy the relevant portions of the configuration and add useful defaults ...
- this.dbHistoryName = config.getString(DatabaseHistory.NAME, UUID.randomUUID().toString());
+ this.dbHistoryName = config.getString(SchemaHistory.NAME, UUID.randomUUID().toString());
log.info("Configure to store the debezium database history {} to pulsar topic {}",
dbHistoryName, topicName);
@@ -201,7 +201,7 @@ public void start() {
}
@Override
- protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException {
+ protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
if (this.producer == null) {
throw new IllegalStateException("No producer is available. Ensure that 'start()'"
+ " is called before storing database history records.");
@@ -212,7 +212,7 @@ protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException
try {
producer.send(record.toString());
} catch (PulsarClientException e) {
- throw new DatabaseHistoryException(e);
+ throw new SchemaHistoryException(e);
}
}
diff --git a/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java b/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java
index 081cfdcc5435a..8f6badd5c194b 100644
--- a/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java
+++ b/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java
@@ -27,8 +27,8 @@
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlParser;
-import io.debezium.relational.history.DatabaseHistory;
-import io.debezium.relational.history.DatabaseHistoryListener;
+import io.debezium.relational.history.SchemaHistory;
+import io.debezium.relational.history.SchemaHistoryListener;
import io.debezium.text.ParsingException;
import io.debezium.util.Collect;
@@ -80,8 +80,8 @@ protected void cleanup() throws Exception {
private void testHistoryTopicContent(boolean skipUnparseableDDL, boolean testWithClientBuilder, boolean testWithReaderConfig) throws Exception {
Configuration.Builder configBuidler = Configuration.create()
.with(PulsarDatabaseHistory.TOPIC, topicName)
- .with(DatabaseHistory.NAME, "my-db-history")
- .with(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, skipUnparseableDDL);
+ .with(SchemaHistory.NAME, "my-db-history")
+ .with(SchemaHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, skipUnparseableDDL);
if (testWithClientBuilder) {
ClientBuilder builder = PulsarClient.builder().serviceUrl(brokerUrl.toString());
@@ -101,7 +101,7 @@ private void testHistoryTopicContent(boolean skipUnparseableDDL, boolean testWit
}
// Start up the history ...
- history.configure(configBuidler.build(), null, DatabaseHistoryListener.NOOP, true);
+ history.configure(configBuidler.build(), null, SchemaHistoryListener.NOOP, true);
history.start();
// Should be able to call start more than once ...
@@ -160,7 +160,7 @@ private void testHistoryTopicContent(boolean skipUnparseableDDL, boolean testWit
// Stop the history (which should stop the producer) ...
history.stop();
history = new PulsarDatabaseHistory();
- history.configure(configBuidler.build(), null, DatabaseHistoryListener.NOOP, true);
+ history.configure(configBuidler.build(), null, SchemaHistoryListener.NOOP, true);
// no need to start
// Recover from the very beginning to just past the first change ...
@@ -240,11 +240,11 @@ public void testExists() throws Exception {
Configuration config = Configuration.create()
.with(PulsarDatabaseHistory.SERVICE_URL, brokerUrl.toString())
.with(PulsarDatabaseHistory.TOPIC, "persistent://my-property/my-ns/dummytopic")
- .with(DatabaseHistory.NAME, "my-db-history")
- .with(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, true)
+ .with(SchemaHistory.NAME, "my-db-history")
+ .with(SchemaHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, true)
.build();
- history.configure(config, null, DatabaseHistoryListener.NOOP, true);
+ history.configure(config, null, SchemaHistoryListener.NOOP, true);
history.start();
// dummytopic should not exist yet
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java
index 696bf9380a8ce..0ef4a6647b455 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java
@@ -50,9 +50,13 @@ public DebeziumMongoDbSourceTester(PulsarCluster cluster) {
sourceConfig.put("mongodb.password", "dbz");
sourceConfig.put("mongodb.task.id","1");
sourceConfig.put("database.include.list", "inventory");
- sourceConfig.put("database.history.pulsar.service.url", pulsarServiceUrl);
+ sourceConfig.put("schema.history.internal.pulsar.service.url", pulsarServiceUrl);
sourceConfig.put("topic.namespace", "debezium/mongodb");
sourceConfig.put("capture.mode", "oplog");
+ sourceConfig.put("connector.class", "io.debezium.connector.mongodb.MongoDbConnector");
+ sourceConfig.put("topic.prefix", "test");
+ sourceConfig.put("collection.include.list", "inventory");
+ sourceConfig.put("mongodb.connection.string", "mongodb://" + DebeziumMongoDbContainer.NAME + ":27017/?replicaSet=rs0");
}
@Override
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMsSqlSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMsSqlSourceTester.java
index f1bd8a824c083..c49bca716478e 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMsSqlSourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMsSqlSourceTester.java
@@ -63,8 +63,12 @@ public DebeziumMsSqlSourceTester(PulsarCluster cluster) {
sourceConfig.put("database.server.name", "mssql");
sourceConfig.put("database.dbname", "TestDB");
sourceConfig.put("snapshot.mode", "schema_only");
- sourceConfig.put("database.history.pulsar.service.url", pulsarServiceUrl);
+ sourceConfig.put("schema.history.internal.pulsar.service.url", pulsarServiceUrl);
sourceConfig.put("topic.namespace", "debezium/mssql");
+ sourceConfig.put("connector.class", "io.debezium.connector.sqlserver.SqlServerConnector");
+ sourceConfig.put("topic.prefix", "test");
+ sourceConfig.put("database.names", "TestDB");
+ sourceConfig.put("table.include.list", "inventory");
}
@Override
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java
index 7958fa019925f..c907dc6772405 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java
@@ -63,12 +63,16 @@ public DebeziumMySqlSourceTester(PulsarCluster cluster, String converterClassNam
sourceConfig.put("database.server.name", "dbserver1");
sourceConfig.put("database.whitelist", "inventory");
if (!testWithClientBuilder) {
- sourceConfig.put("database.history.pulsar.service.url", pulsarServiceUrl);
+ sourceConfig.put("schema.history.internal.pulsar.service.url", pulsarServiceUrl);
}
sourceConfig.put("key.converter", converterClassName);
sourceConfig.put("value.converter", converterClassName);
sourceConfig.put("topic.namespace", "debezium/mysql-" +
(converterClassName.endsWith("AvroConverter") ? "avro" : "json"));
+ sourceConfig.put("connector.class", "io.debezium.connector.mysql.MySqlConnector");
+ sourceConfig.put("topic.prefix", "test");
+ sourceConfig.put("database.include.list", "inventory");
+ sourceConfig.put("include.schema.changes", "true");
}
@Override
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java
index 40078d67365ca..d52bbb97a10f9 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java
@@ -63,10 +63,11 @@ public DebeziumOracleDbSourceTester(PulsarCluster cluster) {
sourceConfig.put("database.server.name", "XE");
sourceConfig.put("database.dbname", "XE");
sourceConfig.put("snapshot.mode", "schema_only");
-
sourceConfig.put("schema.include.list", "inv");
- sourceConfig.put("database.history.pulsar.service.url", pulsarServiceUrl);
+ sourceConfig.put("schema.history.internal.pulsar.service.url", pulsarServiceUrl);
sourceConfig.put("topic.namespace", "debezium/oracle");
+ sourceConfig.put("connector.class", "io.debezium.connector.oracle.OracleConnector");
+ sourceConfig.put("topic.prefix", "test");
}
@Override
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumPostgreSqlSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumPostgreSqlSourceTester.java
index 80401c60b6e9f..283292e9284ba 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumPostgreSqlSourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumPostgreSqlSourceTester.java
@@ -76,8 +76,11 @@ public DebeziumPostgreSqlSourceTester(PulsarCluster cluster) {
sourceConfig.put("database.dbname", "postgres");
sourceConfig.put("schema.whitelist", "inventory");
sourceConfig.put("table.blacklist", "inventory.spatial_ref_sys,inventory.geom");
- sourceConfig.put("database.history.pulsar.service.url", pulsarServiceUrl);
+ sourceConfig.put("schema.history.internal.pulsar.service.url", pulsarServiceUrl);
sourceConfig.put("topic.namespace", "debezium/postgresql");
+ sourceConfig.put("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
+ sourceConfig.put("topic.prefix", "test");
+ sourceConfig.put("table.include.list", "inventory");
}
@Override