diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSinkConnector.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSinkConnector.java index 7526b1449ef01..dc05ff76f91ec 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSinkConnector.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSinkConnector.java @@ -3,10 +3,12 @@ package com.azure.cosmos.kafka.connect; -import com.azure.cosmos.kafka.connect.implementation.CosmosConstants; +import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConstants; import com.azure.cosmos.kafka.connect.implementation.sink.CosmosSinkConfig; import com.azure.cosmos.kafka.connect.implementation.sink.CosmosSinkTask; +import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigValue; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.sink.SinkConnector; import org.slf4j.Logger; @@ -15,6 +17,10 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConfig.validateThroughputControlConfig; /** * A Sink connector that publishes topic messages to CosmosDB. @@ -58,6 +64,24 @@ public ConfigDef config() { @Override public String version() { - return CosmosConstants.CURRENT_VERSION; + return KafkaCosmosConstants.CURRENT_VERSION; + } + + @Override + public Config validate(Map connectorConfigs) { + Config config = super.validate(connectorConfigs); + //there are errors based on the config def + if (config.configValues().stream().anyMatch(cv -> !cv.errorMessages().isEmpty())) { + return config; + } + + Map configValues = + config + .configValues() + .stream() + .collect(Collectors.toMap(ConfigValue::name, Function.identity())); + + validateThroughputControlConfig(connectorConfigs, configValues); + return config; } } diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSourceConnector.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSourceConnector.java index 10b2627b8a7c9..4752fdcfb07de 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSourceConnector.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSourceConnector.java @@ -8,8 +8,8 @@ import com.azure.cosmos.implementation.ImplementationBridgeHelpers; import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair; import com.azure.cosmos.kafka.connect.implementation.CosmosClientStore; -import com.azure.cosmos.kafka.connect.implementation.CosmosConstants; -import com.azure.cosmos.kafka.connect.implementation.CosmosExceptionsHelper; +import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConstants; +import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosExceptionsHelper; import com.azure.cosmos.kafka.connect.implementation.source.CosmosSourceConfig; import com.azure.cosmos.kafka.connect.implementation.source.CosmosSourceOffsetStorageReader; import com.azure.cosmos.kafka.connect.implementation.source.CosmosSourceTask; @@ -22,7 +22,9 @@ import com.azure.cosmos.kafka.connect.implementation.source.MetadataTaskUnit; import com.azure.cosmos.models.CosmosContainerProperties; import com.azure.cosmos.models.FeedRange; +import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigValue; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.source.SourceConnector; import org.slf4j.Logger; @@ -38,8 +40,11 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; import java.util.stream.Collectors; +import static com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConfig.validateThroughputControlConfig; + /*** * The CosmosDb source connector. */ @@ -100,7 +105,7 @@ public ConfigDef config() { @Override public String version() { - return CosmosConstants.CURRENT_VERSION; + return KafkaCosmosConstants.CURRENT_VERSION; } private List> getTaskConfigs(int maxTasks) { @@ -315,7 +320,7 @@ private List getFeedRanges(CosmosContainerProperties containerPropert .getContainer(containerProperties.getId()) .getFeedRanges() .onErrorMap(throwable -> - CosmosExceptionsHelper.convertToConnectException( + KafkaCosmosExceptionsHelper.convertToConnectException( throwable, "GetFeedRanges failed for container " + containerProperties.getId())) .block(); @@ -348,6 +353,24 @@ private Map getContainersTopicMap(List connectorConfigs) { + Config config = super.validate(connectorConfigs); + //there are errors based on the config def + if (config.configValues().stream().anyMatch(cv -> !cv.errorMessages().isEmpty())) { + return config; + } + + Map configValues = + config + .configValues() + .stream() + .collect(Collectors.toMap(ConfigValue::name, Function.identity())); + + validateThroughputControlConfig(connectorConfigs, configValues); + return config; + } + @Override public void close() { this.stop(); diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosClientStore.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosClientStore.java index 40812a54500ca..10589369d0e8d 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosClientStore.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosClientStore.java @@ -36,9 +36,9 @@ public static CosmosAsyncClient getCosmosClient(CosmosAccountConfig accountConfi private static String getUserAgentSuffix(CosmosAccountConfig accountConfig) { if (StringUtils.isNotEmpty(accountConfig.getApplicationName())) { - return CosmosConstants.USER_AGENT_SUFFIX + "|" + accountConfig.getApplicationName(); + return KafkaCosmosConstants.USER_AGENT_SUFFIX + "|" + accountConfig.getApplicationName(); } - return CosmosConstants.USER_AGENT_SUFFIX; + return KafkaCosmosConstants.USER_AGENT_SUFFIX; } } diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosConstants.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosConstants.java deleted file mode 100644 index e7029c4f95669..0000000000000 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosConstants.java +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -package com.azure.cosmos.kafka.connect.implementation; - -import com.azure.core.util.CoreUtils; - -public class CosmosConstants { - public static final String PROPERTIES_FILE_NAME = "azure-cosmos-kafka-connect.properties"; - public static final String CURRENT_VERSION = CoreUtils.getProperties(PROPERTIES_FILE_NAME).get("version"); - public static final String CURRENT_NAME = CoreUtils.getProperties(PROPERTIES_FILE_NAME).get("name"); - public static final String USER_AGENT_SUFFIX = String.format("KafkaConnect/%s/%s", CURRENT_NAME, CURRENT_VERSION); - - public static class StatusCodes { - public static final int NOTFOUND = 404; - public static final int REQUEST_TIMEOUT = 408; - public static final int GONE = 410; - public static final int CONFLICT = 409; - public static final int PRECONDITION_FAILED = 412; - public static final int SERVICE_UNAVAILABLE = 503; - public static final int INTERNAL_SERVER_ERROR = 500; - } - - public static class SubStatusCodes { - public static final int READ_SESSION_NOT_AVAILABLE = 1002; - public static final int PARTITION_KEY_RANGE_GONE = 1002; - public static final int COMPLETING_SPLIT_OR_MERGE = 1007; - } -} diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosConfig.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosConfig.java similarity index 99% rename from sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosConfig.java rename to sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosConfig.java index bd37756a0446b..10d115ac247fc 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosConfig.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosConfig.java @@ -22,7 +22,7 @@ /** * Common Configuration for Cosmos DB Kafka source connector and sink connector. */ -public class CosmosConfig extends AbstractConfig { +public class KafkaCosmosConfig extends AbstractConfig { protected static final ConfigDef.Validator NON_EMPTY_STRING = new ConfigDef.NonEmptyString(); private static final String CONFIG_PREFIX = "kafka.connect.cosmos."; @@ -132,7 +132,7 @@ public class CosmosConfig extends AbstractConfig { private final CosmosAccountConfig accountConfig; private final CosmosThroughputControlConfig throughputControlConfig; - public CosmosConfig(ConfigDef config, Map parsedConfig) { + public KafkaCosmosConfig(ConfigDef config, Map parsedConfig) { super(config, parsedConfig); this.accountConfig = this.parseAccountConfig(); this.throughputControlConfig = this.parseThroughputControlConfig(); diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosConstants.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosConstants.java new file mode 100644 index 0000000000000..01a89ef20f532 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosConstants.java @@ -0,0 +1,13 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.kafka.connect.implementation; + +import com.azure.core.util.CoreUtils; + +public class KafkaCosmosConstants { + public static final String PROPERTIES_FILE_NAME = "azure-cosmos-kafka-connect.properties"; + public static final String CURRENT_VERSION = CoreUtils.getProperties(PROPERTIES_FILE_NAME).get("version"); + public static final String CURRENT_NAME = CoreUtils.getProperties(PROPERTIES_FILE_NAME).get("name"); + public static final String USER_AGENT_SUFFIX = String.format("KafkaConnect/%s/%s", CURRENT_NAME, CURRENT_VERSION); +} diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosExceptionsHelper.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosExceptionsHelper.java similarity index 69% rename from sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosExceptionsHelper.java rename to sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosExceptionsHelper.java index 07574b8076f16..0d5a8bb8759e3 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosExceptionsHelper.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosExceptionsHelper.java @@ -4,16 +4,17 @@ package com.azure.cosmos.kafka.connect.implementation; import com.azure.cosmos.CosmosException; +import com.azure.cosmos.implementation.HttpConstants; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.RetriableException; -public class CosmosExceptionsHelper { +public class KafkaCosmosExceptionsHelper { public static boolean isTransientFailure(int statusCode, int substatusCode) { - return statusCode == CosmosConstants.StatusCodes.GONE - || statusCode == CosmosConstants.StatusCodes.SERVICE_UNAVAILABLE - || statusCode == CosmosConstants.StatusCodes.INTERNAL_SERVER_ERROR - || statusCode == CosmosConstants.StatusCodes.REQUEST_TIMEOUT - || (statusCode == CosmosConstants.StatusCodes.NOTFOUND && substatusCode == CosmosConstants.SubStatusCodes.READ_SESSION_NOT_AVAILABLE); + return statusCode == HttpConstants.StatusCodes.GONE + || statusCode == HttpConstants.StatusCodes.SERVICE_UNAVAILABLE + || statusCode == HttpConstants.StatusCodes.INTERNAL_SERVER_ERROR + || statusCode == HttpConstants.StatusCodes.REQUEST_TIMEOUT + || (statusCode == HttpConstants.StatusCodes.NOTFOUND && substatusCode == HttpConstants.SubStatusCodes.READ_SESSION_NOT_AVAILABLE); } @@ -36,13 +37,13 @@ public static boolean isFeedRangeGoneException(Throwable throwable) { } public static boolean isFeedRangeGoneException(int statusCode, int substatusCode) { - return statusCode == CosmosConstants.StatusCodes.GONE - && (substatusCode == CosmosConstants.SubStatusCodes.PARTITION_KEY_RANGE_GONE - || substatusCode == CosmosConstants.SubStatusCodes.COMPLETING_SPLIT_OR_MERGE); + return statusCode == HttpConstants.StatusCodes.GONE + && (substatusCode == HttpConstants.SubStatusCodes.PARTITION_KEY_RANGE_GONE + || substatusCode == HttpConstants.SubStatusCodes.COMPLETING_SPLIT_OR_MERGE); } public static ConnectException convertToConnectException(Throwable throwable, String message) { - if (CosmosExceptionsHelper.isTransientFailure(throwable)) { + if (KafkaCosmosExceptionsHelper.isTransientFailure(throwable)) { return new RetriableException(message, throwable); } @@ -51,7 +52,7 @@ public static ConnectException convertToConnectException(Throwable throwable, St public static boolean isResourceExistsException(Throwable throwable) { if (throwable instanceof CosmosException) { - return ((CosmosException) throwable).getStatusCode() == CosmosConstants.StatusCodes.CONFLICT; + return ((CosmosException) throwable).getStatusCode() == HttpConstants.StatusCodes.CONFLICT; } return false; @@ -59,7 +60,7 @@ public static boolean isResourceExistsException(Throwable throwable) { public static boolean isNotFoundException(Throwable throwable) { if (throwable instanceof CosmosException) { - return ((CosmosException) throwable).getStatusCode() == CosmosConstants.StatusCodes.NOTFOUND; + return ((CosmosException) throwable).getStatusCode() == HttpConstants.StatusCodes.NOTFOUND; } return false; @@ -67,7 +68,7 @@ public static boolean isNotFoundException(Throwable throwable) { public static boolean isPreconditionFailedException(Throwable throwable) { if (throwable instanceof CosmosException) { - return ((CosmosException) throwable).getStatusCode() == CosmosConstants.StatusCodes.PRECONDITION_FAILED; + return ((CosmosException) throwable).getStatusCode() == HttpConstants.StatusCodes.PRECONDITION_FAILED; } return false; @@ -75,7 +76,7 @@ public static boolean isPreconditionFailedException(Throwable throwable) { public static boolean isTimeoutException(Throwable throwable) { if (throwable instanceof CosmosException) { - return ((CosmosException) throwable).getStatusCode() == CosmosConstants.StatusCodes.REQUEST_TIMEOUT; + return ((CosmosException) throwable).getStatusCode() == HttpConstants.StatusCodes.REQUEST_TIMEOUT; } return false; diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosSchedulers.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosSchedulers.java similarity index 95% rename from sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosSchedulers.java rename to sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosSchedulers.java index 24d41151ae138..58784644d9620 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosSchedulers.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosSchedulers.java @@ -6,7 +6,7 @@ import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; -public class CosmosSchedulers { +public class KafkaCosmosSchedulers { private static final String SINK_BOUNDED_ELASTIC_THREAD_NAME = "kafka-cosmos-sink-bounded-elastic"; private static final int TTL_FOR_SCHEDULER_WORKER_IN_SECONDS = 60; // same as BoundedElasticScheduler.DEFAULT_TTL_SECONDS public static final Scheduler SINK_BOUNDED_ELASTIC = Schedulers.newBoundedElastic( diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosDBWriteException.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosDBWriteException.java deleted file mode 100644 index 69f99f35f8baf..0000000000000 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosDBWriteException.java +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -package com.azure.cosmos.kafka.connect.implementation.sink; - -import org.apache.kafka.connect.errors.ConnectException; - -/** - * Generic CosmosDb sink write exceptions. - */ -public class CosmosDBWriteException extends ConnectException { - /** - * - */ - private static final long serialVersionUID = 1L; - - public CosmosDBWriteException(String message) { - super(message); - } -} diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkConfig.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkConfig.java index 91640ab2f1229..d029105846a56 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkConfig.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkConfig.java @@ -4,7 +4,7 @@ package com.azure.cosmos.kafka.connect.implementation.sink; import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; -import com.azure.cosmos.kafka.connect.implementation.CosmosConfig; +import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; @@ -15,7 +15,7 @@ /** * Common Configuration for Cosmos DB Kafka sink connector. */ -public class CosmosSinkConfig extends CosmosConfig { +public class CosmosSinkConfig extends KafkaCosmosConfig { private static final String SINK_CONFIG_PREFIX = "kafka.connect.cosmos.sink."; // error tolerance @@ -115,7 +115,7 @@ public CosmosSinkConfig(ConfigDef config, Map parsedConfig) { } public static ConfigDef getConfigDef() { - ConfigDef configDef = CosmosConfig.getConfigDef(); + ConfigDef configDef = KafkaCosmosConfig.getConfigDef(); defineWriteConfig(configDef); defineContainersConfig(configDef); diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java index c18dbc41dcbc1..9a3bf9e2565bc 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java @@ -7,7 +7,7 @@ import com.azure.cosmos.CosmosAsyncContainer; import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; import com.azure.cosmos.kafka.connect.implementation.CosmosClientStore; -import com.azure.cosmos.kafka.connect.implementation.CosmosConstants; +import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConstants; import com.azure.cosmos.kafka.connect.implementation.CosmosThroughputControlHelper; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; @@ -29,7 +29,7 @@ public class CosmosSinkTask extends SinkTask { @Override public String version() { - return CosmosConstants.CURRENT_VERSION; + return KafkaCosmosConstants.CURRENT_VERSION; } @Override diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/KafkaCosmosBulkWriter.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/KafkaCosmosBulkWriter.java index 44700368ebe4a..61b67b44f9c49 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/KafkaCosmosBulkWriter.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/KafkaCosmosBulkWriter.java @@ -10,8 +10,8 @@ import com.azure.cosmos.implementation.ImplementationBridgeHelpers; import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; import com.azure.cosmos.kafka.connect.implementation.CosmosThroughputControlConfig; -import com.azure.cosmos.kafka.connect.implementation.CosmosExceptionsHelper; -import com.azure.cosmos.kafka.connect.implementation.CosmosSchedulers; +import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosExceptionsHelper; +import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosSchedulers; import com.azure.cosmos.kafka.connect.implementation.CosmosThroughputControlHelper; import com.azure.cosmos.models.CosmosBulkExecutionOptions; import com.azure.cosmos.models.CosmosBulkItemRequestOptions; @@ -78,7 +78,7 @@ public void writeCore(CosmosAsyncContainer container, List sinkOp .executeBulkOperations( Flux.fromIterable(itemOperations) .mergeWith(bulkRetryEmitter.asFlux()) - .publishOn(CosmosSchedulers.SINK_BOUNDED_ELASTIC), + .publishOn(KafkaCosmosSchedulers.SINK_BOUNDED_ELASTIC), bulkExecutionOptions); return cosmosBulkOperationResponseFlux; }) @@ -120,7 +120,7 @@ public void writeCore(CosmosAsyncContainer container, List sinkOp return Mono.empty(); }) - .subscribeOn(CosmosSchedulers.SINK_BOUNDED_ELASTIC) + .subscribeOn(KafkaCosmosSchedulers.SINK_BOUNDED_ELASTIC) .blockLast(); } @@ -247,7 +247,7 @@ private Mono scheduleRetry( return Mono.empty(); }); - if (CosmosExceptionsHelper.isTimeoutException(exception)) { + if (KafkaCosmosExceptionsHelper.isTimeoutException(exception)) { Duration delayDuration = Duration.ofMillis( MIN_DELAY_ON_408_REQUEST_TIMEOUT_IN_MS + RANDOM.nextInt(MAX_DELAY_ON_408_REQUEST_TIMEOUT_IN_MS - MIN_DELAY_ON_408_REQUEST_TIMEOUT_IN_MS)); @@ -288,16 +288,16 @@ BulkOperationFailedException handleErrorStatusCode( private boolean shouldIgnore(BulkOperationFailedException failedException) { switch (this.writeConfig.getItemWriteStrategy()) { case ITEM_APPEND: - return CosmosExceptionsHelper.isResourceExistsException(failedException); + return KafkaCosmosExceptionsHelper.isResourceExistsException(failedException); case ITEM_DELETE: - return CosmosExceptionsHelper.isNotFoundException(failedException); + return KafkaCosmosExceptionsHelper.isNotFoundException(failedException); case ITEM_DELETE_IF_NOT_MODIFIED: - return CosmosExceptionsHelper.isNotFoundException(failedException) - || CosmosExceptionsHelper.isPreconditionFailedException(failedException); + return KafkaCosmosExceptionsHelper.isNotFoundException(failedException) + || KafkaCosmosExceptionsHelper.isPreconditionFailedException(failedException); case ITEM_OVERWRITE_IF_NOT_MODIFIED: - return CosmosExceptionsHelper.isResourceExistsException(failedException) - || CosmosExceptionsHelper.isNotFoundException(failedException) - || CosmosExceptionsHelper.isPreconditionFailedException(failedException); + return KafkaCosmosExceptionsHelper.isResourceExistsException(failedException) + || KafkaCosmosExceptionsHelper.isNotFoundException(failedException) + || KafkaCosmosExceptionsHelper.isPreconditionFailedException(failedException); default: return false; } diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/KafkaCosmosPointWriter.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/KafkaCosmosPointWriter.java index a60e14e067432..62a731ce81565 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/KafkaCosmosPointWriter.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/KafkaCosmosPointWriter.java @@ -8,8 +8,8 @@ import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; import com.azure.cosmos.implementation.guava25.base.Function; import com.azure.cosmos.kafka.connect.implementation.CosmosThroughputControlConfig; -import com.azure.cosmos.kafka.connect.implementation.CosmosExceptionsHelper; -import com.azure.cosmos.kafka.connect.implementation.CosmosSchedulers; +import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosExceptionsHelper; +import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosSchedulers; import com.azure.cosmos.kafka.connect.implementation.CosmosThroughputControlHelper; import com.azure.cosmos.models.CosmosItemRequestOptions; import org.apache.kafka.connect.sink.ErrantRecordReporter; @@ -86,7 +86,7 @@ private void createWithRetry(CosmosAsyncContainer container, SinkOperation sinkO CosmosThroughputControlHelper.tryPopulateThroughputControlGroupName(cosmosItemRequestOptions, this.throughputControlConfig); return container.createItem(operation.getSinkRecord().value(), cosmosItemRequestOptions).then(); }, - (throwable) -> CosmosExceptionsHelper.isResourceExistsException(throwable), + (throwable) -> KafkaCosmosExceptionsHelper.isResourceExistsException(throwable), sinkOperation ); } @@ -111,8 +111,8 @@ private void replaceIfNotModifiedWithRetry(CosmosAsyncContainer container, SinkO }); }, (throwable) -> { - return CosmosExceptionsHelper.isNotFoundException(throwable) - || CosmosExceptionsHelper.isPreconditionFailedException(throwable); + return KafkaCosmosExceptionsHelper.isNotFoundException(throwable) + || KafkaCosmosExceptionsHelper.isPreconditionFailedException(throwable); }, sinkOperation ); @@ -144,8 +144,8 @@ private void deleteWithRetry(CosmosAsyncContainer container, SinkOperation sinkO }).then(); }, (throwable) -> { - return CosmosExceptionsHelper.isNotFoundException(throwable) - || CosmosExceptionsHelper.isPreconditionFailedException(throwable); + return KafkaCosmosExceptionsHelper.isNotFoundException(throwable) + || KafkaCosmosExceptionsHelper.isPreconditionFailedException(throwable); }, sinkOperation ); @@ -195,7 +195,7 @@ private void executeWithRetry( }) .repeat(() -> !sinkOperation.isCompleted()) .then() - .subscribeOn(CosmosSchedulers.SINK_BOUNDED_ELASTIC) + .subscribeOn(KafkaCosmosSchedulers.SINK_BOUNDED_ELASTIC) .block(); } } diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/KafkaCosmosWriterBase.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/KafkaCosmosWriterBase.java index 9ddbaff7a6c72..c4633cc2df60a 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/KafkaCosmosWriterBase.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/KafkaCosmosWriterBase.java @@ -7,7 +7,7 @@ import com.azure.cosmos.implementation.ImplementationBridgeHelpers; import com.azure.cosmos.implementation.Strings; import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; -import com.azure.cosmos.kafka.connect.implementation.CosmosExceptionsHelper; +import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosExceptionsHelper; import com.azure.cosmos.models.PartitionKey; import com.azure.cosmos.models.PartitionKeyDefinition; import org.apache.kafka.connect.sink.ErrantRecordReporter; @@ -89,7 +89,7 @@ protected boolean shouldRetry(Throwable exception, int attemptedCount, int maxRe return false; } - return CosmosExceptionsHelper.isTransientFailure(exception); + return KafkaCosmosExceptionsHelper.isTransientFailure(exception); } protected void sendToDlqIfConfigured(SinkOperation sinkOperationContext) { diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/CosmosSourceConfig.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/CosmosSourceConfig.java index 1ede731b5498c..6cc1ba55a4506 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/CosmosSourceConfig.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/CosmosSourceConfig.java @@ -5,7 +5,7 @@ import com.azure.cosmos.implementation.Strings; import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; -import com.azure.cosmos.kafka.connect.implementation.CosmosConfig; +import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; @@ -18,7 +18,7 @@ /** * Common Configuration for Cosmos DB Kafka source connector. */ -public class CosmosSourceConfig extends CosmosConfig { +public class CosmosSourceConfig extends KafkaCosmosConfig { // configuration only targets to source connector private static final String SOURCE_CONFIG_PREFIX = "kafka.connect.cosmos.source."; @@ -109,7 +109,7 @@ public CosmosSourceConfig(ConfigDef configDef, Map parsedConfigs) { } public static ConfigDef getConfigDef() { - ConfigDef configDef = CosmosConfig.getConfigDef(); + ConfigDef configDef = KafkaCosmosConfig.getConfigDef(); defineContainersConfig(configDef); defineMetadataConfig(configDef); diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/CosmosSourceTask.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/CosmosSourceTask.java index cfb0c9d3106f9..cf4ed6c798c98 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/CosmosSourceTask.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/CosmosSourceTask.java @@ -9,8 +9,8 @@ import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair; import com.azure.cosmos.implementation.guava25.base.Stopwatch; import com.azure.cosmos.kafka.connect.implementation.CosmosClientStore; -import com.azure.cosmos.kafka.connect.implementation.CosmosConstants; -import com.azure.cosmos.kafka.connect.implementation.CosmosExceptionsHelper; +import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConstants; +import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosExceptionsHelper; import com.azure.cosmos.kafka.connect.implementation.CosmosThroughputControlHelper; import com.azure.cosmos.models.CosmosChangeFeedRequestOptions; import com.azure.cosmos.models.FeedRange; @@ -43,7 +43,7 @@ public class CosmosSourceTask extends SourceTask { @Override public String version() { - return CosmosConstants.CURRENT_VERSION; + return KafkaCosmosConstants.CURRENT_VERSION; } @Override @@ -121,7 +121,7 @@ public List poll() { this.taskUnitsQueue.add(taskUnit); // TODO[Public Preview]: add checking for max retries checking - throw CosmosExceptionsHelper.convertToConnectException(e, "PollTask failed"); + throw KafkaCosmosExceptionsHelper.convertToConnectException(e, "PollTask failed"); } } @@ -191,7 +191,7 @@ private Pair, Boolean> executeFeedRangeTask(FeedRangeTaskUnit return Pair.of(records, false); }) .onErrorResume(throwable -> { - if (CosmosExceptionsHelper.isFeedRangeGoneException(throwable)) { + if (KafkaCosmosExceptionsHelper.isFeedRangeGoneException(throwable)) { return this.handleFeedRangeGone(feedRangeTaskUnit) .map(shouldRemoveOriginalTaskUnit -> Pair.of(new ArrayList<>(), shouldRemoveOriginalTaskUnit)); } diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/MetadataMonitorThread.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/MetadataMonitorThread.java index 49901acbfbf07..4aadf980dc60a 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/MetadataMonitorThread.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/MetadataMonitorThread.java @@ -6,7 +6,7 @@ import com.azure.cosmos.CosmosAsyncClient; import com.azure.cosmos.CosmosAsyncContainer; import com.azure.cosmos.implementation.ImplementationBridgeHelpers; -import com.azure.cosmos.kafka.connect.implementation.CosmosExceptionsHelper; +import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosExceptionsHelper; import com.azure.cosmos.models.CosmosContainerProperties; import com.azure.cosmos.models.FeedRange; import com.azure.cosmos.models.SqlParameter; @@ -136,7 +136,7 @@ public Mono> getAllContainers() { .byPage() .flatMapIterable(response -> response.getResults()) .collectList() - .onErrorMap(throwable -> CosmosExceptionsHelper.convertToConnectException(throwable, "getAllContainers failed.")); + .onErrorMap(throwable -> KafkaCosmosExceptionsHelper.convertToConnectException(throwable, "getAllContainers failed.")); } public List getContainerRidsFromOffset() {