From 6187d54ecf2b40421f1609e5631471248e247168 Mon Sep 17 00:00:00 2001 From: mukesh-ctds Date: Thu, 25 Jul 2024 10:44:51 +0530 Subject: [PATCH] Updated configs --- pom.xml | 2 +- .../integration/containers/DebeziumMySQLContainer.java | 2 +- .../containers/DebeziumPostgreSqlContainer.java | 2 +- .../tests/integration/io/sources/SourceTester.java | 6 ++++++ .../sources/debezium/DebeziumMongoDbSourceTester.java | 10 ++++++++++ .../io/sources/debezium/DebeziumMsSqlSourceTester.java | 4 ++-- .../io/sources/debezium/DebeziumMySqlSourceTester.java | 2 +- .../sources/debezium/DebeziumOracleDbSourceTester.java | 2 +- .../debezium/DebeziumPostgreSqlSourceTester.java | 7 ++----- .../debezium/PulsarDebeziumOracleSourceTest.java | 2 +- .../io/sources/debezium/PulsarDebeziumSourcesTest.java | 4 ++-- 11 files changed, 28 insertions(+), 15 deletions(-) diff --git a/pom.xml b/pom.xml index 49f2fb06338d2..49492423caf24 100644 --- a/pom.xml +++ b/pom.xml @@ -164,7 +164,7 @@ flexible messaging model and an intuitive client API. 334 2.13 2.13.10 - 2.6.2.Final + 2.6.1.Final 42.5.0 8.0.30 diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java index 6d87dd65eef15..7381c8e1b548f 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java @@ -26,7 +26,7 @@ public class DebeziumMySQLContainer extends ChaosContainer { protected int numEntriesToInsert = 1; protected int numEntriesExpectAfterStart = 9; + /* + *In Debezium 2.5, they introduced several new timestamp fields, + * ts_us, and ts_ns, which represent the millisecond-based time values in microseconds and nanoseconds respectively. + */ public static final Set DEBEZIUM_FIELD_SET = new HashSet() {{ add("before"); add("after"); add("source"); add("op"); add("ts_ms"); + add("ts_us"); + add("ts_ns"); add("transaction"); }}; diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java index 9e2e2bb6feddf..99cb45ad2c2fb 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java @@ -44,8 +44,15 @@ public DebeziumMongoDbSourceTester(PulsarCluster cluster) { this.pulsarCluster = cluster; pulsarServiceUrl = "pulsar://pulsar-proxy:" + PulsarContainer.BROKER_PORT; + /* + *The `mongodb.connection.string` property replaces the deprecated `mongodb.hosts` property in release 2.2 + * that was used to provide earlier versions of the connector with the host address of the configuration server replica. + * In the current release, use mongodb.connection.string to provide the connector with the addresses of MongoDB routers, + * also known as mongos. + */ sourceConfig.put("connector.class", "io.debezium.connector.mongodb.MongoDbConnector"); sourceConfig.put("mongodb.connection.string", "mongodb://" + DebeziumMongoDbContainer.NAME + ":27017/?replicaSet=rs0"); + sourceConfig.put("mongodb.name", "dbserver1"); sourceConfig.put("mongodb.user", "debezium"); sourceConfig.put("mongodb.password", "dbz"); sourceConfig.put("mongodb.task.id", "1"); @@ -67,6 +74,9 @@ public void prepareSource() throws Exception { log.info("debezium mongodb server already contains preconfigured data."); } + /* + * mongo is deprecated in 2.6.1.Final release and now we have use mongosh instead + */ @Override public void prepareInsertEvent() throws Exception { this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c", diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMsSqlSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMsSqlSourceTester.java index 6e617d919b398..ca7bcaecae74d 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMsSqlSourceTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMsSqlSourceTester.java @@ -150,12 +150,12 @@ public int initialDelayForMsgReceive() { @Override public String keyContains() { - return "mssql.dbo.customers.Key"; + return "mssql.TestDB.dbo.customers.Key"; } @Override public String valueContains() { - return "mssql.dbo.customers.Value"; + return "mssql.TestDB.dbo.customers.Value"; } @Override diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java index 90ea2a55af679..82344e0fc46af 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java @@ -34,7 +34,7 @@ * It reads binlog from MySQL, and store the debezium output into Pulsar. * This test verify that the target topic contains wanted number messages. * - * Debezium MySQL Container is "debezium/example-mysql:0.8", + * Debezium MySQL Container is "debezium/example-mysql:2.5.0.Final", * which is a MySQL database server preconfigured with an inventory database. */ @Slf4j diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java index d9a4dcd5c68e7..39e26ce5d7045 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java @@ -52,7 +52,7 @@ public DebeziumOracleDbSourceTester(PulsarCluster cluster) { super(NAME); this.pulsarCluster = cluster; this.numEntriesToInsert = 1; - this.numEntriesExpectAfterStart = 0; + this.numEntriesExpectAfterStart = 1; pulsarServiceUrl = "pulsar://pulsar-proxy:" + PulsarContainer.BROKER_PORT; diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumPostgreSqlSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumPostgreSqlSourceTester.java index ecbb5ea901ad4..b316983565f9f 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumPostgreSqlSourceTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumPostgreSqlSourceTester.java @@ -37,7 +37,7 @@ * It reads binlog from Postgres, and store the debezium output into Pulsar. * This test verify that the target topic contains wanted number messages. * - * Debezium Postgresql Container is "debezium/example-postgres:0.10", + * Debezium Postgresql Container is "debezium/example-postgres:2.5.0.Final", * which is a Postgresql database server preconfigured with an inventory database. */ @Slf4j @@ -76,13 +76,10 @@ public DebeziumPostgreSqlSourceTester(PulsarCluster cluster) { sourceConfig.put("database.server.name", "dbserver1"); sourceConfig.put("database.dbname", "postgres"); sourceConfig.put("schema.whitelist", "inventory"); - sourceConfig.put("plugin.name", "pgoutput"); + sourceConfig.put("table.blacklist", "inventory.spatial_ref_sys,inventory.geom"); sourceConfig.put("schema.history.internal.pulsar.service.url", pulsarServiceUrl); - sourceConfig.put("schema.history.internal", "org.apache.pulsar.io.debezium.PulsarDatabaseHistory"); sourceConfig.put("topic.namespace", "debezium/postgresql"); sourceConfig.put("topic.prefix", "dbserver1"); - sourceConfig.put("table.blacklist", "inventory.spatial_ref_sys,inventory.geom"); - sourceConfig.put("table.include.list", "inventory"); } @Override diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumOracleSourceTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumOracleSourceTest.java index 1da0c1243972a..ffece0878b75b 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumOracleSourceTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumOracleSourceTest.java @@ -56,7 +56,7 @@ private void testDebeziumOracleDbConnect(String converterClassName, boolean json final String sourceName = "test-source-debezium-oracle-" + functionRuntimeType + "-" + randomName(8); // This is the event count to be created by prepareSource. - final int numMessages = 1; + final int numMessages = 39; @Cleanup PulsarClient client = PulsarClient.builder() diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java index 2791e3012b214..b3db8dad3c748 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java @@ -93,7 +93,7 @@ private void testDebeziumMySqlConnect(String converterClassName, boolean jsonWit + "-" + functionRuntimeType + "-" + randomName(8); // This is the binlog count that contained in mysql container. - final int numMessages = 47; + final int numMessages = 52; @Cleanup PulsarClient client = PulsarClient.builder() @@ -138,7 +138,7 @@ private void testDebeziumPostgreSqlConnect(String converterClassName, boolean js final String sourceName = "test-source-debezium-postgersql-" + functionRuntimeType + "-" + randomName(8); // This is the binlog count that contained in postgresql container. - final int numMessages = 26; + final int numMessages = 29; @Cleanup PulsarClient client = PulsarClient.builder()