Skip to content

Commit

Permalink
Updated configs
Browse files Browse the repository at this point in the history
  • Loading branch information
mukesh-ctds committed Jul 25, 2024
1 parent 0656cdb commit 6187d54
Show file tree
Hide file tree
Showing 11 changed files with 28 additions and 15 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ flexible messaging model and an intuitive client API.</description>
<presto.version>334</presto.version>
<scala.binary.version>2.13</scala.binary.version>
<scala-library.version>2.13.10</scala-library.version>
<debezium.version>2.6.2.Final</debezium.version>
<debezium.version>2.6.1.Final</debezium.version>
<debezium.postgresql.version>42.5.0</debezium.postgresql.version>
<debezium.mysql.version>8.0.30</debezium.mysql.version>
<!-- Override version that brings CVE-2022-3143 with debezium -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class DebeziumMySQLContainer extends ChaosContainer<DebeziumMySQLContaine
public static final String NAME = "debezium-mysql-example";
static final Integer[] PORTS = { 3306 };

private static final String IMAGE_NAME = "debezium/example-mysql:0.8";
private static final String IMAGE_NAME = "debezium/example-mysql:2.5.0.Final";

public DebeziumMySQLContainer(String clusterName) {
super(clusterName, IMAGE_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class DebeziumPostgreSqlContainer extends ChaosContainer<DebeziumPostgreS
public static final String NAME = "debezium-postgresql-example";
static final Integer[] PORTS = { 5432 };

private static final String IMAGE_NAME = "debezium/example-postgres:0.10";
private static final String IMAGE_NAME = "debezium/example-postgres:2.5.0.Final";

public DebeziumPostgreSqlContainer(String clusterName) {
super(clusterName, IMAGE_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,18 @@ public abstract class SourceTester<ServiceContainerT extends GenericContainer> {
protected int numEntriesToInsert = 1;
protected int numEntriesExpectAfterStart = 9;

/*
*In Debezium 2.5, they introduced several new timestamp fields,
* ts_us, and ts_ns, which represent the millisecond-based time values in microseconds and nanoseconds respectively.
*/
public static final Set<String> DEBEZIUM_FIELD_SET = new HashSet<String>() {{
add("before");
add("after");
add("source");
add("op");
add("ts_ms");
add("ts_us");
add("ts_ns");
add("transaction");
}};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,15 @@ public DebeziumMongoDbSourceTester(PulsarCluster cluster) {
this.pulsarCluster = cluster;
pulsarServiceUrl = "pulsar://pulsar-proxy:" + PulsarContainer.BROKER_PORT;

/*
*The `mongodb.connection.string` property replaces the deprecated `mongodb.hosts` property in release 2.2
* that was used to provide earlier versions of the connector with the host address of the configuration server replica.
* In the current release, use mongodb.connection.string to provide the connector with the addresses of MongoDB routers,
* also known as mongos.
*/
sourceConfig.put("connector.class", "io.debezium.connector.mongodb.MongoDbConnector");
sourceConfig.put("mongodb.connection.string", "mongodb://" + DebeziumMongoDbContainer.NAME + ":27017/?replicaSet=rs0");
sourceConfig.put("mongodb.name", "dbserver1");
sourceConfig.put("mongodb.user", "debezium");
sourceConfig.put("mongodb.password", "dbz");
sourceConfig.put("mongodb.task.id", "1");
Expand All @@ -67,6 +74,9 @@ public void prepareSource() throws Exception {
log.info("debezium mongodb server already contains preconfigured data.");
}

/*
* mongo is deprecated in 2.6.1.Final release and now we have use mongosh instead
*/
@Override
public void prepareInsertEvent() throws Exception {
this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,12 @@ public int initialDelayForMsgReceive() {

@Override
public String keyContains() {
return "mssql.dbo.customers.Key";
return "mssql.TestDB.dbo.customers.Key";
}

@Override
public String valueContains() {
return "mssql.dbo.customers.Value";
return "mssql.TestDB.dbo.customers.Value";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
* It reads binlog from MySQL, and store the debezium output into Pulsar.
* This test verify that the target topic contains wanted number messages.
*
* Debezium MySQL Container is "debezium/example-mysql:0.8",
* Debezium MySQL Container is "debezium/example-mysql:2.5.0.Final",
* which is a MySQL database server preconfigured with an inventory database.
*/
@Slf4j
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public DebeziumOracleDbSourceTester(PulsarCluster cluster) {
super(NAME);
this.pulsarCluster = cluster;
this.numEntriesToInsert = 1;
this.numEntriesExpectAfterStart = 0;
this.numEntriesExpectAfterStart = 1;

pulsarServiceUrl = "pulsar://pulsar-proxy:" + PulsarContainer.BROKER_PORT;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
* It reads binlog from Postgres, and store the debezium output into Pulsar.
* This test verify that the target topic contains wanted number messages.
*
* Debezium Postgresql Container is "debezium/example-postgres:0.10",
* Debezium Postgresql Container is "debezium/example-postgres:2.5.0.Final",
* which is a Postgresql database server preconfigured with an inventory database.
*/
@Slf4j
Expand Down Expand Up @@ -76,13 +76,10 @@ public DebeziumPostgreSqlSourceTester(PulsarCluster cluster) {
sourceConfig.put("database.server.name", "dbserver1");
sourceConfig.put("database.dbname", "postgres");
sourceConfig.put("schema.whitelist", "inventory");
sourceConfig.put("plugin.name", "pgoutput");
sourceConfig.put("table.blacklist", "inventory.spatial_ref_sys,inventory.geom");
sourceConfig.put("schema.history.internal.pulsar.service.url", pulsarServiceUrl);
sourceConfig.put("schema.history.internal", "org.apache.pulsar.io.debezium.PulsarDatabaseHistory");
sourceConfig.put("topic.namespace", "debezium/postgresql");
sourceConfig.put("topic.prefix", "dbserver1");
sourceConfig.put("table.blacklist", "inventory.spatial_ref_sys,inventory.geom");
sourceConfig.put("table.include.list", "inventory");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ private void testDebeziumOracleDbConnect(String converterClassName, boolean json
final String sourceName = "test-source-debezium-oracle-" + functionRuntimeType + "-" + randomName(8);

// This is the event count to be created by prepareSource.
final int numMessages = 1;
final int numMessages = 39;

@Cleanup
PulsarClient client = PulsarClient.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ private void testDebeziumMySqlConnect(String converterClassName, boolean jsonWit
+ "-" + functionRuntimeType + "-" + randomName(8);

// This is the binlog count that contained in mysql container.
final int numMessages = 47;
final int numMessages = 52;

@Cleanup
PulsarClient client = PulsarClient.builder()
Expand Down Expand Up @@ -138,7 +138,7 @@ private void testDebeziumPostgreSqlConnect(String converterClassName, boolean js
final String sourceName = "test-source-debezium-postgersql-" + functionRuntimeType + "-" + randomName(8);

// This is the binlog count that contained in postgresql container.
final int numMessages = 26;
final int numMessages = 29;

@Cleanup
PulsarClient client = PulsarClient.builder()
Expand Down

0 comments on commit 6187d54

Please sign in to comment.