From 7a5ffeb3ce39c929ad42826fbd2c645f7ecea98f Mon Sep 17 00:00:00 2001 From: annie-mac Date: Fri, 29 Mar 2024 08:48:08 -0700 Subject: [PATCH] refactor --- sdk/cosmos/azure-cosmos-kafka-connect/pom.xml | 1 + .../kafka/connect/CosmosSinkConnector.java | 4 +-- .../kafka/connect/CosmosSourceConnector.java | 8 +++--- .../implementation/CosmosClientStore.java | 4 +-- ...fkaCosmosConfig.java => CosmosConfig.java} | 4 +-- ...mosConstants.java => CosmosConstants.java} | 2 +- ...elper.java => CosmosExceptionsHelper.java} | 28 +++++++++---------- ...sSchedulers.java => CosmosSchedulers.java} | 2 +- ...ava => CosmosThroughputControlHelper.java} | 2 +- .../implementation/sink/CosmosSinkConfig.java | 6 ++-- .../implementation/sink/CosmosSinkTask.java | 8 +++--- .../sink/KafkaCosmosBulkWriter.java | 28 +++++++++---------- .../sink/KafkaCosmosPointWriter.java | 26 ++++++++--------- .../sink/KafkaCosmosWriterBase.java | 4 +-- .../source/CosmosSourceConfig.java | 6 ++-- .../source/CosmosSourceTask.java | 16 +++++------ .../source/MetadataMonitorThread.java | 4 +-- 17 files changed, 77 insertions(+), 76 deletions(-) rename sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/{KafkaCosmosConfig.java => CosmosConfig.java} (99%) rename sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/{KafkaCosmosConstants.java => CosmosConstants.java} (97%) rename sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/{KafkaCosmosExceptionsHelper.java => CosmosExceptionsHelper.java} (68%) rename sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/{KafkaCosmosSchedulers.java => CosmosSchedulers.java} (95%) rename sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/{KafkaCosmosThroughputControlHelper.java => CosmosThroughputControlHelper.java} (98%) diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/pom.xml b/sdk/cosmos/azure-cosmos-kafka-connect/pom.xml index 6dc24310792e3..bd0833d7d57b2 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/pom.xml +++ b/sdk/cosmos/azure-cosmos-kafka-connect/pom.xml @@ -48,6 +48,7 @@ Licensed under the MIT License. --add-opens com.azure.cosmos/com.azure.cosmos.implementation.apachecommons.lang=ALL-UNNAMED --add-opens com.azure.cosmos/com.azure.cosmos.implementation.caches=ALL-UNNAMED --add-opens com.azure.cosmos/com.azure.cosmos.implementation.faultinjection=ALL-UNNAMED + --add-opens com.azure.cosmos/com.azure.cosmos.implementation.guava25.base=ALL-UNNAMED --add-opens com.azure.cosmos/com.azure.cosmos.implementation.routing=ALL-UNNAMED --add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect=ALL-UNNAMED --add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect.implementation=ALL-UNNAMED diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSinkConnector.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSinkConnector.java index ef38399c74396..7526b1449ef01 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSinkConnector.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSinkConnector.java @@ -3,7 +3,7 @@ package com.azure.cosmos.kafka.connect; -import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConstants; +import com.azure.cosmos.kafka.connect.implementation.CosmosConstants; import com.azure.cosmos.kafka.connect.implementation.sink.CosmosSinkConfig; import com.azure.cosmos.kafka.connect.implementation.sink.CosmosSinkTask; import org.apache.kafka.common.config.ConfigDef; @@ -58,6 +58,6 @@ public ConfigDef config() { @Override public String version() { - return KafkaCosmosConstants.CURRENT_VERSION; + return CosmosConstants.CURRENT_VERSION; } } diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSourceConnector.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSourceConnector.java index bbf568d9b9fb7..10b2627b8a7c9 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSourceConnector.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSourceConnector.java @@ -8,8 +8,8 @@ import com.azure.cosmos.implementation.ImplementationBridgeHelpers; import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair; import com.azure.cosmos.kafka.connect.implementation.CosmosClientStore; -import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConstants; -import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosExceptionsHelper; +import com.azure.cosmos.kafka.connect.implementation.CosmosConstants; +import com.azure.cosmos.kafka.connect.implementation.CosmosExceptionsHelper; import com.azure.cosmos.kafka.connect.implementation.source.CosmosSourceConfig; import com.azure.cosmos.kafka.connect.implementation.source.CosmosSourceOffsetStorageReader; import com.azure.cosmos.kafka.connect.implementation.source.CosmosSourceTask; @@ -100,7 +100,7 @@ public ConfigDef config() { @Override public String version() { - return KafkaCosmosConstants.CURRENT_VERSION; + return CosmosConstants.CURRENT_VERSION; } private List> getTaskConfigs(int maxTasks) { @@ -315,7 +315,7 @@ private List getFeedRanges(CosmosContainerProperties containerPropert .getContainer(containerProperties.getId()) .getFeedRanges() .onErrorMap(throwable -> - KafkaCosmosExceptionsHelper.convertToConnectException( + CosmosExceptionsHelper.convertToConnectException( throwable, "GetFeedRanges failed for container " + containerProperties.getId())) .block(); diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosClientStore.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosClientStore.java index 10589369d0e8d..40812a54500ca 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosClientStore.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosClientStore.java @@ -36,9 +36,9 @@ public static CosmosAsyncClient getCosmosClient(CosmosAccountConfig accountConfi private static String getUserAgentSuffix(CosmosAccountConfig accountConfig) { if (StringUtils.isNotEmpty(accountConfig.getApplicationName())) { - return KafkaCosmosConstants.USER_AGENT_SUFFIX + "|" + accountConfig.getApplicationName(); + return CosmosConstants.USER_AGENT_SUFFIX + "|" + accountConfig.getApplicationName(); } - return KafkaCosmosConstants.USER_AGENT_SUFFIX; + return CosmosConstants.USER_AGENT_SUFFIX; } } diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosConfig.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosConfig.java similarity index 99% rename from sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosConfig.java rename to sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosConfig.java index 10d115ac247fc..bd37756a0446b 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosConfig.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosConfig.java @@ -22,7 +22,7 @@ /** * Common Configuration for Cosmos DB Kafka source connector and sink connector. */ -public class KafkaCosmosConfig extends AbstractConfig { +public class CosmosConfig extends AbstractConfig { protected static final ConfigDef.Validator NON_EMPTY_STRING = new ConfigDef.NonEmptyString(); private static final String CONFIG_PREFIX = "kafka.connect.cosmos."; @@ -132,7 +132,7 @@ public class KafkaCosmosConfig extends AbstractConfig { private final CosmosAccountConfig accountConfig; private final CosmosThroughputControlConfig throughputControlConfig; - public KafkaCosmosConfig(ConfigDef config, Map parsedConfig) { + public CosmosConfig(ConfigDef config, Map parsedConfig) { super(config, parsedConfig); this.accountConfig = this.parseAccountConfig(); this.throughputControlConfig = this.parseThroughputControlConfig(); diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosConstants.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosConstants.java similarity index 97% rename from sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosConstants.java rename to sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosConstants.java index 50db99a79c634..e7029c4f95669 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosConstants.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosConstants.java @@ -5,7 +5,7 @@ import com.azure.core.util.CoreUtils; -public class KafkaCosmosConstants { +public class CosmosConstants { public static final String PROPERTIES_FILE_NAME = "azure-cosmos-kafka-connect.properties"; public static final String CURRENT_VERSION = CoreUtils.getProperties(PROPERTIES_FILE_NAME).get("version"); public static final String CURRENT_NAME = CoreUtils.getProperties(PROPERTIES_FILE_NAME).get("name"); diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosExceptionsHelper.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosExceptionsHelper.java similarity index 68% rename from sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosExceptionsHelper.java rename to sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosExceptionsHelper.java index a8adfd35a22ab..07574b8076f16 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosExceptionsHelper.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosExceptionsHelper.java @@ -7,13 +7,13 @@ import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.RetriableException; -public class KafkaCosmosExceptionsHelper { +public class CosmosExceptionsHelper { public static boolean isTransientFailure(int statusCode, int substatusCode) { - return statusCode == KafkaCosmosConstants.StatusCodes.GONE - || statusCode == KafkaCosmosConstants.StatusCodes.SERVICE_UNAVAILABLE - || statusCode == KafkaCosmosConstants.StatusCodes.INTERNAL_SERVER_ERROR - || statusCode == KafkaCosmosConstants.StatusCodes.REQUEST_TIMEOUT - || (statusCode == KafkaCosmosConstants.StatusCodes.NOTFOUND && substatusCode == KafkaCosmosConstants.SubStatusCodes.READ_SESSION_NOT_AVAILABLE); + return statusCode == CosmosConstants.StatusCodes.GONE + || statusCode == CosmosConstants.StatusCodes.SERVICE_UNAVAILABLE + || statusCode == CosmosConstants.StatusCodes.INTERNAL_SERVER_ERROR + || statusCode == CosmosConstants.StatusCodes.REQUEST_TIMEOUT + || (statusCode == CosmosConstants.StatusCodes.NOTFOUND && substatusCode == CosmosConstants.SubStatusCodes.READ_SESSION_NOT_AVAILABLE); } @@ -36,13 +36,13 @@ public static boolean isFeedRangeGoneException(Throwable throwable) { } public static boolean isFeedRangeGoneException(int statusCode, int substatusCode) { - return statusCode == KafkaCosmosConstants.StatusCodes.GONE - && (substatusCode == KafkaCosmosConstants.SubStatusCodes.PARTITION_KEY_RANGE_GONE - || substatusCode == KafkaCosmosConstants.SubStatusCodes.COMPLETING_SPLIT_OR_MERGE); + return statusCode == CosmosConstants.StatusCodes.GONE + && (substatusCode == CosmosConstants.SubStatusCodes.PARTITION_KEY_RANGE_GONE + || substatusCode == CosmosConstants.SubStatusCodes.COMPLETING_SPLIT_OR_MERGE); } public static ConnectException convertToConnectException(Throwable throwable, String message) { - if (KafkaCosmosExceptionsHelper.isTransientFailure(throwable)) { + if (CosmosExceptionsHelper.isTransientFailure(throwable)) { return new RetriableException(message, throwable); } @@ -51,7 +51,7 @@ public static ConnectException convertToConnectException(Throwable throwable, St public static boolean isResourceExistsException(Throwable throwable) { if (throwable instanceof CosmosException) { - return ((CosmosException) throwable).getStatusCode() == KafkaCosmosConstants.StatusCodes.CONFLICT; + return ((CosmosException) throwable).getStatusCode() == CosmosConstants.StatusCodes.CONFLICT; } return false; @@ -59,7 +59,7 @@ public static boolean isResourceExistsException(Throwable throwable) { public static boolean isNotFoundException(Throwable throwable) { if (throwable instanceof CosmosException) { - return ((CosmosException) throwable).getStatusCode() == KafkaCosmosConstants.StatusCodes.NOTFOUND; + return ((CosmosException) throwable).getStatusCode() == CosmosConstants.StatusCodes.NOTFOUND; } return false; @@ -67,7 +67,7 @@ public static boolean isNotFoundException(Throwable throwable) { public static boolean isPreconditionFailedException(Throwable throwable) { if (throwable instanceof CosmosException) { - return ((CosmosException) throwable).getStatusCode() == KafkaCosmosConstants.StatusCodes.PRECONDITION_FAILED; + return ((CosmosException) throwable).getStatusCode() == CosmosConstants.StatusCodes.PRECONDITION_FAILED; } return false; @@ -75,7 +75,7 @@ public static boolean isPreconditionFailedException(Throwable throwable) { public static boolean isTimeoutException(Throwable throwable) { if (throwable instanceof CosmosException) { - return ((CosmosException) throwable).getStatusCode() == KafkaCosmosConstants.StatusCodes.REQUEST_TIMEOUT; + return ((CosmosException) throwable).getStatusCode() == CosmosConstants.StatusCodes.REQUEST_TIMEOUT; } return false; diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosSchedulers.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosSchedulers.java similarity index 95% rename from sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosSchedulers.java rename to sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosSchedulers.java index 58784644d9620..24d41151ae138 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosSchedulers.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosSchedulers.java @@ -6,7 +6,7 @@ import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; -public class KafkaCosmosSchedulers { +public class CosmosSchedulers { private static final String SINK_BOUNDED_ELASTIC_THREAD_NAME = "kafka-cosmos-sink-bounded-elastic"; private static final int TTL_FOR_SCHEDULER_WORKER_IN_SECONDS = 60; // same as BoundedElasticScheduler.DEFAULT_TTL_SECONDS public static final Scheduler SINK_BOUNDED_ELASTIC = Schedulers.newBoundedElastic( diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosThroughputControlHelper.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosThroughputControlHelper.java similarity index 98% rename from sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosThroughputControlHelper.java rename to sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosThroughputControlHelper.java index f6719b0a58e0c..f30210f103736 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosThroughputControlHelper.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosThroughputControlHelper.java @@ -14,7 +14,7 @@ import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; -public class KafkaCosmosThroughputControlHelper { +public class CosmosThroughputControlHelper { public static CosmosAsyncContainer tryEnableThroughputControl( CosmosAsyncContainer container, CosmosAsyncClient throughputControlCosmosClient, diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkConfig.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkConfig.java index d029105846a56..91640ab2f1229 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkConfig.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkConfig.java @@ -4,7 +4,7 @@ package com.azure.cosmos.kafka.connect.implementation.sink; import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; -import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConfig; +import com.azure.cosmos.kafka.connect.implementation.CosmosConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; @@ -15,7 +15,7 @@ /** * Common Configuration for Cosmos DB Kafka sink connector. */ -public class CosmosSinkConfig extends KafkaCosmosConfig { +public class CosmosSinkConfig extends CosmosConfig { private static final String SINK_CONFIG_PREFIX = "kafka.connect.cosmos.sink."; // error tolerance @@ -115,7 +115,7 @@ public CosmosSinkConfig(ConfigDef config, Map parsedConfig) { } public static ConfigDef getConfigDef() { - ConfigDef configDef = KafkaCosmosConfig.getConfigDef(); + ConfigDef configDef = CosmosConfig.getConfigDef(); defineWriteConfig(configDef); defineContainersConfig(configDef); diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java index af7731387cf82..c18dbc41dcbc1 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java @@ -7,8 +7,8 @@ import com.azure.cosmos.CosmosAsyncContainer; import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; import com.azure.cosmos.kafka.connect.implementation.CosmosClientStore; -import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConstants; -import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosThroughputControlHelper; +import com.azure.cosmos.kafka.connect.implementation.CosmosConstants; +import com.azure.cosmos.kafka.connect.implementation.CosmosThroughputControlHelper; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; import org.slf4j.Logger; @@ -29,7 +29,7 @@ public class CosmosSinkTask extends SinkTask { @Override public String version() { - return KafkaCosmosConstants.CURRENT_VERSION; + return CosmosConstants.CURRENT_VERSION; } @Override @@ -96,7 +96,7 @@ record -> this.sinkTaskConfig .getDatabase(this.sinkTaskConfig.getContainersConfig().getDatabaseName()) .getContainer(containerName); - KafkaCosmosThroughputControlHelper + CosmosThroughputControlHelper .tryEnableThroughputControl( container, this.throughputControlClient, diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/KafkaCosmosBulkWriter.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/KafkaCosmosBulkWriter.java index ea63c8345401c..44700368ebe4a 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/KafkaCosmosBulkWriter.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/KafkaCosmosBulkWriter.java @@ -10,9 +10,9 @@ import com.azure.cosmos.implementation.ImplementationBridgeHelpers; import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; import com.azure.cosmos.kafka.connect.implementation.CosmosThroughputControlConfig; -import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosExceptionsHelper; -import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosSchedulers; -import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosThroughputControlHelper; +import com.azure.cosmos.kafka.connect.implementation.CosmosExceptionsHelper; +import com.azure.cosmos.kafka.connect.implementation.CosmosSchedulers; +import com.azure.cosmos.kafka.connect.implementation.CosmosThroughputControlHelper; import com.azure.cosmos.models.CosmosBulkExecutionOptions; import com.azure.cosmos.models.CosmosBulkItemRequestOptions; import com.azure.cosmos.models.CosmosBulkItemResponse; @@ -78,7 +78,7 @@ public void writeCore(CosmosAsyncContainer container, List sinkOp .executeBulkOperations( Flux.fromIterable(itemOperations) .mergeWith(bulkRetryEmitter.asFlux()) - .publishOn(KafkaCosmosSchedulers.SINK_BOUNDED_ELASTIC), + .publishOn(CosmosSchedulers.SINK_BOUNDED_ELASTIC), bulkExecutionOptions); return cosmosBulkOperationResponseFlux; }) @@ -120,7 +120,7 @@ public void writeCore(CosmosAsyncContainer container, List sinkOp return Mono.empty(); }) - .subscribeOn(KafkaCosmosSchedulers.SINK_BOUNDED_ELASTIC) + .subscribeOn(CosmosSchedulers.SINK_BOUNDED_ELASTIC) .blockLast(); } @@ -134,7 +134,7 @@ private CosmosBulkExecutionOptions getBulkExecutionOperations() { .setMaxConcurrentCosmosPartitions(bulkExecutionOptions, this.writeConfig.getBulkMaxConcurrentCosmosPartitions()); } - KafkaCosmosThroughputControlHelper.tryPopulateThroughputControlGroupName(bulkExecutionOptions, this.throughputControlConfig); + CosmosThroughputControlHelper.tryPopulateThroughputControlGroupName(bulkExecutionOptions, this.throughputControlConfig); return bulkExecutionOptions; } @@ -247,7 +247,7 @@ private Mono scheduleRetry( return Mono.empty(); }); - if (KafkaCosmosExceptionsHelper.isTimeoutException(exception)) { + if (CosmosExceptionsHelper.isTimeoutException(exception)) { Duration delayDuration = Duration.ofMillis( MIN_DELAY_ON_408_REQUEST_TIMEOUT_IN_MS + RANDOM.nextInt(MAX_DELAY_ON_408_REQUEST_TIMEOUT_IN_MS - MIN_DELAY_ON_408_REQUEST_TIMEOUT_IN_MS)); @@ -288,16 +288,16 @@ BulkOperationFailedException handleErrorStatusCode( private boolean shouldIgnore(BulkOperationFailedException failedException) { switch (this.writeConfig.getItemWriteStrategy()) { case ITEM_APPEND: - return KafkaCosmosExceptionsHelper.isResourceExistsException(failedException); + return CosmosExceptionsHelper.isResourceExistsException(failedException); case ITEM_DELETE: - return KafkaCosmosExceptionsHelper.isNotFoundException(failedException); + return CosmosExceptionsHelper.isNotFoundException(failedException); case ITEM_DELETE_IF_NOT_MODIFIED: - return KafkaCosmosExceptionsHelper.isNotFoundException(failedException) - || KafkaCosmosExceptionsHelper.isPreconditionFailedException(failedException); + return CosmosExceptionsHelper.isNotFoundException(failedException) + || CosmosExceptionsHelper.isPreconditionFailedException(failedException); case ITEM_OVERWRITE_IF_NOT_MODIFIED: - return KafkaCosmosExceptionsHelper.isResourceExistsException(failedException) - || KafkaCosmosExceptionsHelper.isNotFoundException(failedException) - || KafkaCosmosExceptionsHelper.isPreconditionFailedException(failedException); + return CosmosExceptionsHelper.isResourceExistsException(failedException) + || CosmosExceptionsHelper.isNotFoundException(failedException) + || CosmosExceptionsHelper.isPreconditionFailedException(failedException); default: return false; } diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/KafkaCosmosPointWriter.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/KafkaCosmosPointWriter.java index 2bd58242584f3..a60e14e067432 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/KafkaCosmosPointWriter.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/KafkaCosmosPointWriter.java @@ -8,9 +8,9 @@ import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; import com.azure.cosmos.implementation.guava25.base.Function; import com.azure.cosmos.kafka.connect.implementation.CosmosThroughputControlConfig; -import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosExceptionsHelper; -import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosSchedulers; -import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosThroughputControlHelper; +import com.azure.cosmos.kafka.connect.implementation.CosmosExceptionsHelper; +import com.azure.cosmos.kafka.connect.implementation.CosmosSchedulers; +import com.azure.cosmos.kafka.connect.implementation.CosmosThroughputControlHelper; import com.azure.cosmos.models.CosmosItemRequestOptions; import org.apache.kafka.connect.sink.ErrantRecordReporter; import org.slf4j.Logger; @@ -71,7 +71,7 @@ private void upsertWithRetry(CosmosAsyncContainer container, SinkOperation sinkO executeWithRetry( (operation) -> { CosmosItemRequestOptions cosmosItemRequestOptions = new CosmosItemRequestOptions(); - KafkaCosmosThroughputControlHelper.tryPopulateThroughputControlGroupName(cosmosItemRequestOptions, this.throughputControlConfig); + CosmosThroughputControlHelper.tryPopulateThroughputControlGroupName(cosmosItemRequestOptions, this.throughputControlConfig); return container.upsertItem(operation.getSinkRecord().value(), cosmosItemRequestOptions).then(); }, (throwable) -> false, // no exceptions should be ignored @@ -83,10 +83,10 @@ private void createWithRetry(CosmosAsyncContainer container, SinkOperation sinkO executeWithRetry( (operation) -> { CosmosItemRequestOptions cosmosItemRequestOptions = new CosmosItemRequestOptions(); - KafkaCosmosThroughputControlHelper.tryPopulateThroughputControlGroupName(cosmosItemRequestOptions, this.throughputControlConfig); + CosmosThroughputControlHelper.tryPopulateThroughputControlGroupName(cosmosItemRequestOptions, this.throughputControlConfig); return container.createItem(operation.getSinkRecord().value(), cosmosItemRequestOptions).then(); }, - (throwable) -> KafkaCosmosExceptionsHelper.isResourceExistsException(throwable), + (throwable) -> CosmosExceptionsHelper.isResourceExistsException(throwable), sinkOperation ); } @@ -96,7 +96,7 @@ private void replaceIfNotModifiedWithRetry(CosmosAsyncContainer container, SinkO (operation) -> { CosmosItemRequestOptions itemRequestOptions = new CosmosItemRequestOptions(); itemRequestOptions.setIfMatchETag(etag); - KafkaCosmosThroughputControlHelper.tryPopulateThroughputControlGroupName(itemRequestOptions, this.throughputControlConfig); + CosmosThroughputControlHelper.tryPopulateThroughputControlGroupName(itemRequestOptions, this.throughputControlConfig); return ImplementationBridgeHelpers .CosmosAsyncContainerHelper @@ -111,8 +111,8 @@ private void replaceIfNotModifiedWithRetry(CosmosAsyncContainer container, SinkO }); }, (throwable) -> { - return KafkaCosmosExceptionsHelper.isNotFoundException(throwable) - || KafkaCosmosExceptionsHelper.isPreconditionFailedException(throwable); + return CosmosExceptionsHelper.isNotFoundException(throwable) + || CosmosExceptionsHelper.isPreconditionFailedException(throwable); }, sinkOperation ); @@ -129,7 +129,7 @@ private void deleteWithRetry(CosmosAsyncContainer container, SinkOperation sinkO } } - KafkaCosmosThroughputControlHelper.tryPopulateThroughputControlGroupName(itemRequestOptions, this.throughputControlConfig); + CosmosThroughputControlHelper.tryPopulateThroughputControlGroupName(itemRequestOptions, this.throughputControlConfig); return ImplementationBridgeHelpers .CosmosAsyncContainerHelper @@ -144,8 +144,8 @@ private void deleteWithRetry(CosmosAsyncContainer container, SinkOperation sinkO }).then(); }, (throwable) -> { - return KafkaCosmosExceptionsHelper.isNotFoundException(throwable) - || KafkaCosmosExceptionsHelper.isPreconditionFailedException(throwable); + return CosmosExceptionsHelper.isNotFoundException(throwable) + || CosmosExceptionsHelper.isPreconditionFailedException(throwable); }, sinkOperation ); @@ -195,7 +195,7 @@ private void executeWithRetry( }) .repeat(() -> !sinkOperation.isCompleted()) .then() - .subscribeOn(KafkaCosmosSchedulers.SINK_BOUNDED_ELASTIC) + .subscribeOn(CosmosSchedulers.SINK_BOUNDED_ELASTIC) .block(); } } diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/KafkaCosmosWriterBase.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/KafkaCosmosWriterBase.java index c4633cc2df60a..9ddbaff7a6c72 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/KafkaCosmosWriterBase.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/KafkaCosmosWriterBase.java @@ -7,7 +7,7 @@ import com.azure.cosmos.implementation.ImplementationBridgeHelpers; import com.azure.cosmos.implementation.Strings; import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; -import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosExceptionsHelper; +import com.azure.cosmos.kafka.connect.implementation.CosmosExceptionsHelper; import com.azure.cosmos.models.PartitionKey; import com.azure.cosmos.models.PartitionKeyDefinition; import org.apache.kafka.connect.sink.ErrantRecordReporter; @@ -89,7 +89,7 @@ protected boolean shouldRetry(Throwable exception, int attemptedCount, int maxRe return false; } - return KafkaCosmosExceptionsHelper.isTransientFailure(exception); + return CosmosExceptionsHelper.isTransientFailure(exception); } protected void sendToDlqIfConfigured(SinkOperation sinkOperationContext) { diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/CosmosSourceConfig.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/CosmosSourceConfig.java index 6cc1ba55a4506..1ede731b5498c 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/CosmosSourceConfig.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/CosmosSourceConfig.java @@ -5,7 +5,7 @@ import com.azure.cosmos.implementation.Strings; import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; -import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConfig; +import com.azure.cosmos.kafka.connect.implementation.CosmosConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; @@ -18,7 +18,7 @@ /** * Common Configuration for Cosmos DB Kafka source connector. */ -public class CosmosSourceConfig extends KafkaCosmosConfig { +public class CosmosSourceConfig extends CosmosConfig { // configuration only targets to source connector private static final String SOURCE_CONFIG_PREFIX = "kafka.connect.cosmos.source."; @@ -109,7 +109,7 @@ public CosmosSourceConfig(ConfigDef configDef, Map parsedConfigs) { } public static ConfigDef getConfigDef() { - ConfigDef configDef = KafkaCosmosConfig.getConfigDef(); + ConfigDef configDef = CosmosConfig.getConfigDef(); defineContainersConfig(configDef); defineMetadataConfig(configDef); diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/CosmosSourceTask.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/CosmosSourceTask.java index e29aa11b9f708..cfb0c9d3106f9 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/CosmosSourceTask.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/CosmosSourceTask.java @@ -9,9 +9,9 @@ import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair; import com.azure.cosmos.implementation.guava25.base.Stopwatch; import com.azure.cosmos.kafka.connect.implementation.CosmosClientStore; -import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConstants; -import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosExceptionsHelper; -import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosThroughputControlHelper; +import com.azure.cosmos.kafka.connect.implementation.CosmosConstants; +import com.azure.cosmos.kafka.connect.implementation.CosmosExceptionsHelper; +import com.azure.cosmos.kafka.connect.implementation.CosmosThroughputControlHelper; import com.azure.cosmos.models.CosmosChangeFeedRequestOptions; import com.azure.cosmos.models.FeedRange; import com.azure.cosmos.models.FeedResponse; @@ -43,7 +43,7 @@ public class CosmosSourceTask extends SourceTask { @Override public String version() { - return KafkaCosmosConstants.CURRENT_VERSION; + return CosmosConstants.CURRENT_VERSION; } @Override @@ -121,7 +121,7 @@ public List poll() { this.taskUnitsQueue.add(taskUnit); // TODO[Public Preview]: add checking for max retries checking - throw KafkaCosmosExceptionsHelper.convertToConnectException(e, "PollTask failed"); + throw CosmosExceptionsHelper.convertToConnectException(e, "PollTask failed"); } } @@ -167,7 +167,7 @@ private Pair, Boolean> executeFeedRangeTask(FeedRangeTaskUnit this.cosmosClient .getDatabase(feedRangeTaskUnit.getDatabaseName()) .getContainer(feedRangeTaskUnit.getContainerName()); - KafkaCosmosThroughputControlHelper.tryEnableThroughputControl( + CosmosThroughputControlHelper.tryEnableThroughputControl( container, this.throughputControlCosmosClient, this.taskConfig.getThroughputControlConfig()); @@ -178,7 +178,7 @@ private Pair, Boolean> executeFeedRangeTask(FeedRangeTaskUnit // split/merge will be handled in source task ModelBridgeInternal.getChangeFeedIsSplitHandlingDisabled(changeFeedRequestOptions); - KafkaCosmosThroughputControlHelper + CosmosThroughputControlHelper .tryPopulateThroughputControlGroupName( changeFeedRequestOptions, this.taskConfig.getThroughputControlConfig()); @@ -191,7 +191,7 @@ private Pair, Boolean> executeFeedRangeTask(FeedRangeTaskUnit return Pair.of(records, false); }) .onErrorResume(throwable -> { - if (KafkaCosmosExceptionsHelper.isFeedRangeGoneException(throwable)) { + if (CosmosExceptionsHelper.isFeedRangeGoneException(throwable)) { return this.handleFeedRangeGone(feedRangeTaskUnit) .map(shouldRemoveOriginalTaskUnit -> Pair.of(new ArrayList<>(), shouldRemoveOriginalTaskUnit)); } diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/MetadataMonitorThread.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/MetadataMonitorThread.java index 4aadf980dc60a..49901acbfbf07 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/MetadataMonitorThread.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/MetadataMonitorThread.java @@ -6,7 +6,7 @@ import com.azure.cosmos.CosmosAsyncClient; import com.azure.cosmos.CosmosAsyncContainer; import com.azure.cosmos.implementation.ImplementationBridgeHelpers; -import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosExceptionsHelper; +import com.azure.cosmos.kafka.connect.implementation.CosmosExceptionsHelper; import com.azure.cosmos.models.CosmosContainerProperties; import com.azure.cosmos.models.FeedRange; import com.azure.cosmos.models.SqlParameter; @@ -136,7 +136,7 @@ public Mono> getAllContainers() { .byPage() .flatMapIterable(response -> response.getResults()) .collectList() - .onErrorMap(throwable -> KafkaCosmosExceptionsHelper.convertToConnectException(throwable, "getAllContainers failed.")); + .onErrorMap(throwable -> CosmosExceptionsHelper.convertToConnectException(throwable, "getAllContainers failed.")); } public List getContainerRidsFromOffset() {