Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
annie-mac committed Mar 29, 2024
1 parent 6070bc6 commit 7a5ffeb
Show file tree
Hide file tree
Showing 17 changed files with 77 additions and 76 deletions.
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-kafka-connect/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ Licensed under the MIT License.
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.apachecommons.lang=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.caches=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.faultinjection=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.guava25.base=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.routing=ALL-UNNAMED
--add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect=ALL-UNNAMED
--add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect.implementation=ALL-UNNAMED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

package com.azure.cosmos.kafka.connect;

import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConstants;
import com.azure.cosmos.kafka.connect.implementation.CosmosConstants;
import com.azure.cosmos.kafka.connect.implementation.sink.CosmosSinkConfig;
import com.azure.cosmos.kafka.connect.implementation.sink.CosmosSinkTask;
import org.apache.kafka.common.config.ConfigDef;
Expand Down Expand Up @@ -58,6 +58,6 @@ public ConfigDef config() {

@Override
public String version() {
return KafkaCosmosConstants.CURRENT_VERSION;
return CosmosConstants.CURRENT_VERSION;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair;
import com.azure.cosmos.kafka.connect.implementation.CosmosClientStore;
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConstants;
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosExceptionsHelper;
import com.azure.cosmos.kafka.connect.implementation.CosmosConstants;
import com.azure.cosmos.kafka.connect.implementation.CosmosExceptionsHelper;
import com.azure.cosmos.kafka.connect.implementation.source.CosmosSourceConfig;
import com.azure.cosmos.kafka.connect.implementation.source.CosmosSourceOffsetStorageReader;
import com.azure.cosmos.kafka.connect.implementation.source.CosmosSourceTask;
Expand Down Expand Up @@ -100,7 +100,7 @@ public ConfigDef config() {

@Override
public String version() {
return KafkaCosmosConstants.CURRENT_VERSION;
return CosmosConstants.CURRENT_VERSION;
}

private List<Map<String, String>> getTaskConfigs(int maxTasks) {
Expand Down Expand Up @@ -315,7 +315,7 @@ private List<FeedRange> getFeedRanges(CosmosContainerProperties containerPropert
.getContainer(containerProperties.getId())
.getFeedRanges()
.onErrorMap(throwable ->
KafkaCosmosExceptionsHelper.convertToConnectException(
CosmosExceptionsHelper.convertToConnectException(
throwable,
"GetFeedRanges failed for container " + containerProperties.getId()))
.block();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ public static CosmosAsyncClient getCosmosClient(CosmosAccountConfig accountConfi

private static String getUserAgentSuffix(CosmosAccountConfig accountConfig) {
if (StringUtils.isNotEmpty(accountConfig.getApplicationName())) {
return KafkaCosmosConstants.USER_AGENT_SUFFIX + "|" + accountConfig.getApplicationName();
return CosmosConstants.USER_AGENT_SUFFIX + "|" + accountConfig.getApplicationName();
}

return KafkaCosmosConstants.USER_AGENT_SUFFIX;
return CosmosConstants.USER_AGENT_SUFFIX;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
/**
* Common Configuration for Cosmos DB Kafka source connector and sink connector.
*/
public class KafkaCosmosConfig extends AbstractConfig {
public class CosmosConfig extends AbstractConfig {
protected static final ConfigDef.Validator NON_EMPTY_STRING = new ConfigDef.NonEmptyString();
private static final String CONFIG_PREFIX = "kafka.connect.cosmos.";

Expand Down Expand Up @@ -132,7 +132,7 @@ public class KafkaCosmosConfig extends AbstractConfig {
private final CosmosAccountConfig accountConfig;
private final CosmosThroughputControlConfig throughputControlConfig;

public KafkaCosmosConfig(ConfigDef config, Map<String, ?> parsedConfig) {
public CosmosConfig(ConfigDef config, Map<String, ?> parsedConfig) {
super(config, parsedConfig);
this.accountConfig = this.parseAccountConfig();
this.throughputControlConfig = this.parseThroughputControlConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import com.azure.core.util.CoreUtils;

public class KafkaCosmosConstants {
public class CosmosConstants {
public static final String PROPERTIES_FILE_NAME = "azure-cosmos-kafka-connect.properties";
public static final String CURRENT_VERSION = CoreUtils.getProperties(PROPERTIES_FILE_NAME).get("version");
public static final String CURRENT_NAME = CoreUtils.getProperties(PROPERTIES_FILE_NAME).get("name");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;

public class KafkaCosmosExceptionsHelper {
public class CosmosExceptionsHelper {
public static boolean isTransientFailure(int statusCode, int substatusCode) {
return statusCode == KafkaCosmosConstants.StatusCodes.GONE
|| statusCode == KafkaCosmosConstants.StatusCodes.SERVICE_UNAVAILABLE
|| statusCode == KafkaCosmosConstants.StatusCodes.INTERNAL_SERVER_ERROR
|| statusCode == KafkaCosmosConstants.StatusCodes.REQUEST_TIMEOUT
|| (statusCode == KafkaCosmosConstants.StatusCodes.NOTFOUND && substatusCode == KafkaCosmosConstants.SubStatusCodes.READ_SESSION_NOT_AVAILABLE);
return statusCode == CosmosConstants.StatusCodes.GONE
|| statusCode == CosmosConstants.StatusCodes.SERVICE_UNAVAILABLE
|| statusCode == CosmosConstants.StatusCodes.INTERNAL_SERVER_ERROR
|| statusCode == CosmosConstants.StatusCodes.REQUEST_TIMEOUT
|| (statusCode == CosmosConstants.StatusCodes.NOTFOUND && substatusCode == CosmosConstants.SubStatusCodes.READ_SESSION_NOT_AVAILABLE);

}

Expand All @@ -36,13 +36,13 @@ public static boolean isFeedRangeGoneException(Throwable throwable) {
}

public static boolean isFeedRangeGoneException(int statusCode, int substatusCode) {
return statusCode == KafkaCosmosConstants.StatusCodes.GONE
&& (substatusCode == KafkaCosmosConstants.SubStatusCodes.PARTITION_KEY_RANGE_GONE
|| substatusCode == KafkaCosmosConstants.SubStatusCodes.COMPLETING_SPLIT_OR_MERGE);
return statusCode == CosmosConstants.StatusCodes.GONE
&& (substatusCode == CosmosConstants.SubStatusCodes.PARTITION_KEY_RANGE_GONE
|| substatusCode == CosmosConstants.SubStatusCodes.COMPLETING_SPLIT_OR_MERGE);
}

public static ConnectException convertToConnectException(Throwable throwable, String message) {
if (KafkaCosmosExceptionsHelper.isTransientFailure(throwable)) {
if (CosmosExceptionsHelper.isTransientFailure(throwable)) {
return new RetriableException(message, throwable);
}

Expand All @@ -51,31 +51,31 @@ public static ConnectException convertToConnectException(Throwable throwable, St

public static boolean isResourceExistsException(Throwable throwable) {
if (throwable instanceof CosmosException) {
return ((CosmosException) throwable).getStatusCode() == KafkaCosmosConstants.StatusCodes.CONFLICT;
return ((CosmosException) throwable).getStatusCode() == CosmosConstants.StatusCodes.CONFLICT;
}

return false;
}

public static boolean isNotFoundException(Throwable throwable) {
if (throwable instanceof CosmosException) {
return ((CosmosException) throwable).getStatusCode() == KafkaCosmosConstants.StatusCodes.NOTFOUND;
return ((CosmosException) throwable).getStatusCode() == CosmosConstants.StatusCodes.NOTFOUND;
}

return false;
}

public static boolean isPreconditionFailedException(Throwable throwable) {
if (throwable instanceof CosmosException) {
return ((CosmosException) throwable).getStatusCode() == KafkaCosmosConstants.StatusCodes.PRECONDITION_FAILED;
return ((CosmosException) throwable).getStatusCode() == CosmosConstants.StatusCodes.PRECONDITION_FAILED;
}

return false;
}

public static boolean isTimeoutException(Throwable throwable) {
if (throwable instanceof CosmosException) {
return ((CosmosException) throwable).getStatusCode() == KafkaCosmosConstants.StatusCodes.REQUEST_TIMEOUT;
return ((CosmosException) throwable).getStatusCode() == CosmosConstants.StatusCodes.REQUEST_TIMEOUT;
}

return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class KafkaCosmosSchedulers {
public class CosmosSchedulers {
private static final String SINK_BOUNDED_ELASTIC_THREAD_NAME = "kafka-cosmos-sink-bounded-elastic";
private static final int TTL_FOR_SCHEDULER_WORKER_IN_SECONDS = 60; // same as BoundedElasticScheduler.DEFAULT_TTL_SECONDS
public static final Scheduler SINK_BOUNDED_ELASTIC = Schedulers.newBoundedElastic(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;

public class KafkaCosmosThroughputControlHelper {
public class CosmosThroughputControlHelper {
public static CosmosAsyncContainer tryEnableThroughputControl(
CosmosAsyncContainer container,
CosmosAsyncClient throughputControlCosmosClient,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
package com.azure.cosmos.kafka.connect.implementation.sink;

import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConfig;
import com.azure.cosmos.kafka.connect.implementation.CosmosConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

Expand All @@ -15,7 +15,7 @@
/**
* Common Configuration for Cosmos DB Kafka sink connector.
*/
public class CosmosSinkConfig extends KafkaCosmosConfig {
public class CosmosSinkConfig extends CosmosConfig {
private static final String SINK_CONFIG_PREFIX = "kafka.connect.cosmos.sink.";

// error tolerance
Expand Down Expand Up @@ -115,7 +115,7 @@ public CosmosSinkConfig(ConfigDef config, Map<String, ?> parsedConfig) {
}

public static ConfigDef getConfigDef() {
ConfigDef configDef = KafkaCosmosConfig.getConfigDef();
ConfigDef configDef = CosmosConfig.getConfigDef();

defineWriteConfig(configDef);
defineContainersConfig(configDef);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.kafka.connect.implementation.CosmosClientStore;
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConstants;
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosThroughputControlHelper;
import com.azure.cosmos.kafka.connect.implementation.CosmosConstants;
import com.azure.cosmos.kafka.connect.implementation.CosmosThroughputControlHelper;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
Expand All @@ -29,7 +29,7 @@ public class CosmosSinkTask extends SinkTask {

@Override
public String version() {
return KafkaCosmosConstants.CURRENT_VERSION;
return CosmosConstants.CURRENT_VERSION;
}

@Override
Expand Down Expand Up @@ -96,7 +96,7 @@ record -> this.sinkTaskConfig
.getDatabase(this.sinkTaskConfig.getContainersConfig().getDatabaseName())
.getContainer(containerName);

KafkaCosmosThroughputControlHelper
CosmosThroughputControlHelper
.tryEnableThroughputControl(
container,
this.throughputControlClient,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.kafka.connect.implementation.CosmosThroughputControlConfig;
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosExceptionsHelper;
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosSchedulers;
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosThroughputControlHelper;
import com.azure.cosmos.kafka.connect.implementation.CosmosExceptionsHelper;
import com.azure.cosmos.kafka.connect.implementation.CosmosSchedulers;
import com.azure.cosmos.kafka.connect.implementation.CosmosThroughputControlHelper;
import com.azure.cosmos.models.CosmosBulkExecutionOptions;
import com.azure.cosmos.models.CosmosBulkItemRequestOptions;
import com.azure.cosmos.models.CosmosBulkItemResponse;
Expand Down Expand Up @@ -78,7 +78,7 @@ public void writeCore(CosmosAsyncContainer container, List<SinkOperation> sinkOp
.executeBulkOperations(
Flux.fromIterable(itemOperations)
.mergeWith(bulkRetryEmitter.asFlux())
.publishOn(KafkaCosmosSchedulers.SINK_BOUNDED_ELASTIC),
.publishOn(CosmosSchedulers.SINK_BOUNDED_ELASTIC),
bulkExecutionOptions);
return cosmosBulkOperationResponseFlux;
})
Expand Down Expand Up @@ -120,7 +120,7 @@ public void writeCore(CosmosAsyncContainer container, List<SinkOperation> sinkOp

return Mono.empty();
})
.subscribeOn(KafkaCosmosSchedulers.SINK_BOUNDED_ELASTIC)
.subscribeOn(CosmosSchedulers.SINK_BOUNDED_ELASTIC)
.blockLast();
}

Expand All @@ -134,7 +134,7 @@ private CosmosBulkExecutionOptions getBulkExecutionOperations() {
.setMaxConcurrentCosmosPartitions(bulkExecutionOptions, this.writeConfig.getBulkMaxConcurrentCosmosPartitions());
}

KafkaCosmosThroughputControlHelper.tryPopulateThroughputControlGroupName(bulkExecutionOptions, this.throughputControlConfig);
CosmosThroughputControlHelper.tryPopulateThroughputControlGroupName(bulkExecutionOptions, this.throughputControlConfig);

return bulkExecutionOptions;
}
Expand Down Expand Up @@ -247,7 +247,7 @@ private Mono<Void> scheduleRetry(
return Mono.empty();
});

if (KafkaCosmosExceptionsHelper.isTimeoutException(exception)) {
if (CosmosExceptionsHelper.isTimeoutException(exception)) {
Duration delayDuration = Duration.ofMillis(
MIN_DELAY_ON_408_REQUEST_TIMEOUT_IN_MS
+ RANDOM.nextInt(MAX_DELAY_ON_408_REQUEST_TIMEOUT_IN_MS - MIN_DELAY_ON_408_REQUEST_TIMEOUT_IN_MS));
Expand Down Expand Up @@ -288,16 +288,16 @@ BulkOperationFailedException handleErrorStatusCode(
private boolean shouldIgnore(BulkOperationFailedException failedException) {
switch (this.writeConfig.getItemWriteStrategy()) {
case ITEM_APPEND:
return KafkaCosmosExceptionsHelper.isResourceExistsException(failedException);
return CosmosExceptionsHelper.isResourceExistsException(failedException);
case ITEM_DELETE:
return KafkaCosmosExceptionsHelper.isNotFoundException(failedException);
return CosmosExceptionsHelper.isNotFoundException(failedException);
case ITEM_DELETE_IF_NOT_MODIFIED:
return KafkaCosmosExceptionsHelper.isNotFoundException(failedException)
|| KafkaCosmosExceptionsHelper.isPreconditionFailedException(failedException);
return CosmosExceptionsHelper.isNotFoundException(failedException)
|| CosmosExceptionsHelper.isPreconditionFailedException(failedException);
case ITEM_OVERWRITE_IF_NOT_MODIFIED:
return KafkaCosmosExceptionsHelper.isResourceExistsException(failedException)
|| KafkaCosmosExceptionsHelper.isNotFoundException(failedException)
|| KafkaCosmosExceptionsHelper.isPreconditionFailedException(failedException);
return CosmosExceptionsHelper.isResourceExistsException(failedException)
|| CosmosExceptionsHelper.isNotFoundException(failedException)
|| CosmosExceptionsHelper.isPreconditionFailedException(failedException);
default:
return false;
}
Expand Down
Loading

0 comments on commit 7a5ffeb

Please sign in to comment.