Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add configuration field for minimalWaitForWindow in driver #34

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@
<!-- Docs claim java 8 supported, but support is deprecated -->
<kafka.version>3.3.1</kafka.version>
<scylla.driver.version>3.11.2.1</scylla.driver.version>
<scylla.cdc.java.version>1.3.0</scylla.cdc.java.version>
<scylla.cdc.java.version>1.3.1-WINDOW</scylla.cdc.java.version>
<flogger.version>0.5.1</flogger.version>
<!-- added for transitive dependencies -->
<log4j.version>2.17.1</log4j.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,17 @@ public class ScyllaConnectorConfig extends CommonConnectorConfig {
.withValidation(Field::isNonNegativeInteger)
.withDefault(30000);

public static final Field MINIMAL_WAIT_FOR_WINDOW_MS = Field.create("scylla.minimal.wait.for.window.time")
.withDisplayName("Minimal 'waitForWindow' time (ms)")
.withType(ConfigDef.Type.INT)
.withWidth(ConfigDef.Width.MEDIUM)
.withImportance(ConfigDef.Importance.LOW)
.withDescription("Minimal time between reading consecutive CDC log windows. Meant to be used as a simple throttling mechanism " +
"in situations where driver has a lot of old data to catch up with and ends up hogging resources. " +
"Value expressed in milliseconds.")
.withValidation(Field::isNonNegativeInteger)
.withDefault(0);

public static final CQLConfiguration.ConsistencyLevel DEFAULT_CONSISTENCY_LEVEL = CQLConfiguration.ConsistencyLevel.QUORUM;
public static final Field CONSISTENCY_LEVEL = Field.create("scylla.consistency.level")
.withDisplayName("Consistency Level")
Expand Down Expand Up @@ -182,7 +193,7 @@ public class ScyllaConnectorConfig extends CommonConnectorConfig {
CommonConnectorConfig.CONFIG_DEFINITION.edit()
.name("Scylla")
.type(CLUSTER_IP_ADDRESSES, USER, PASSWORD, LOGICAL_NAME, CONSISTENCY_LEVEL, LOCAL_DC_NAME, SSL_ENABLED, SSL_PROVIDER, SSL_TRUSTSTORE_PATH, SSL_TRUSTSTORE_PASSWORD, SSL_KEYSTORE_PATH, SSL_KEYSTORE_PASSWORD,SSL_CIPHER_SUITES, SSL_OPENSLL_KEYCERTCHAIN, SSL_OPENSLL_PRIVATEKEY)
.connector(QUERY_TIME_WINDOW_SIZE, CONFIDENCE_WINDOW_SIZE)
.connector(QUERY_TIME_WINDOW_SIZE, CONFIDENCE_WINDOW_SIZE, MINIMAL_WAIT_FOR_WINDOW_MS)
.events(TABLE_NAMES)
.excluding(Heartbeat.HEARTBEAT_INTERVAL).events(CUSTOM_HEARTBEAT_INTERVAL)
// Exclude some Debezium options, which are not applicable/not supported by
Expand Down Expand Up @@ -270,6 +281,10 @@ public long getConfidenceWindowSizeMs() {
return config.getInteger(ScyllaConnectorConfig.CONFIDENCE_WINDOW_SIZE);
}

public long getMinimalWaitForWindowMs() {
return config.getInteger(ScyllaConnectorConfig.MINIMAL_WAIT_FOR_WINDOW_MS);
}

public long getHeartbeatIntervalMs() {
return config.getInteger(Heartbeat.HEARTBEAT_INTERVAL);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
.withConsumer(changeConsumer)
.withQueryTimeWindowSizeMs(configuration.getQueryTimeWindowSizeMs())
.withConfidenceWindowSizeMs(configuration.getConfidenceWindowSizeMs())
.withMinimalWaitForWindowMs(configuration.getMinimalWaitForWindowMs())
.build();
Worker worker = new Worker(workerConfiguration);
try {
Expand Down