Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
annie-mac committed Mar 29, 2024
1 parent 7a5ffeb commit 03efa9c
Show file tree
Hide file tree
Showing 17 changed files with 123 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@

package com.azure.cosmos.kafka.connect;

import com.azure.cosmos.kafka.connect.implementation.CosmosConstants;
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConstants;
import com.azure.cosmos.kafka.connect.implementation.sink.CosmosSinkConfig;
import com.azure.cosmos.kafka.connect.implementation.sink.CosmosSinkTask;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
import org.slf4j.Logger;
Expand All @@ -15,6 +17,10 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import static com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConfig.validateThroughputControlConfig;

/**
* A Sink connector that publishes topic messages to CosmosDB.
Expand Down Expand Up @@ -58,6 +64,24 @@ public ConfigDef config() {

@Override
public String version() {
return CosmosConstants.CURRENT_VERSION;
return KafkaCosmosConstants.CURRENT_VERSION;
}

@Override
public Config validate(Map<String, String> connectorConfigs) {
Config config = super.validate(connectorConfigs);
//there are errors based on the config def
if (config.configValues().stream().anyMatch(cv -> !cv.errorMessages().isEmpty())) {
return config;
}

Map<String, ConfigValue> configValues =
config
.configValues()
.stream()
.collect(Collectors.toMap(ConfigValue::name, Function.identity()));

validateThroughputControlConfig(connectorConfigs, configValues);
return config;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair;
import com.azure.cosmos.kafka.connect.implementation.CosmosClientStore;
import com.azure.cosmos.kafka.connect.implementation.CosmosConstants;
import com.azure.cosmos.kafka.connect.implementation.CosmosExceptionsHelper;
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConstants;
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosExceptionsHelper;
import com.azure.cosmos.kafka.connect.implementation.source.CosmosSourceConfig;
import com.azure.cosmos.kafka.connect.implementation.source.CosmosSourceOffsetStorageReader;
import com.azure.cosmos.kafka.connect.implementation.source.CosmosSourceTask;
Expand All @@ -22,7 +22,9 @@
import com.azure.cosmos.kafka.connect.implementation.source.MetadataTaskUnit;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.FeedRange;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
Expand All @@ -38,8 +40,11 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;

import static com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConfig.validateThroughputControlConfig;

/***
* The CosmosDb source connector.
*/
Expand Down Expand Up @@ -100,7 +105,7 @@ public ConfigDef config() {

@Override
public String version() {
return CosmosConstants.CURRENT_VERSION;
return KafkaCosmosConstants.CURRENT_VERSION;
}

private List<Map<String, String>> getTaskConfigs(int maxTasks) {
Expand Down Expand Up @@ -315,7 +320,7 @@ private List<FeedRange> getFeedRanges(CosmosContainerProperties containerPropert
.getContainer(containerProperties.getId())
.getFeedRanges()
.onErrorMap(throwable ->
CosmosExceptionsHelper.convertToConnectException(
KafkaCosmosExceptionsHelper.convertToConnectException(
throwable,
"GetFeedRanges failed for container " + containerProperties.getId()))
.block();
Expand Down Expand Up @@ -348,6 +353,24 @@ private Map<String, String> getContainersTopicMap(List<CosmosContainerProperties
return effectiveContainersTopicMap;
}

@Override
public Config validate(Map<String, String> connectorConfigs) {
Config config = super.validate(connectorConfigs);
//there are errors based on the config def
if (config.configValues().stream().anyMatch(cv -> !cv.errorMessages().isEmpty())) {
return config;
}

Map<String, ConfigValue> configValues =
config
.configValues()
.stream()
.collect(Collectors.toMap(ConfigValue::name, Function.identity()));

validateThroughputControlConfig(connectorConfigs, configValues);
return config;
}

@Override
public void close() {
this.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ public static CosmosAsyncClient getCosmosClient(CosmosAccountConfig accountConfi

private static String getUserAgentSuffix(CosmosAccountConfig accountConfig) {
if (StringUtils.isNotEmpty(accountConfig.getApplicationName())) {
return CosmosConstants.USER_AGENT_SUFFIX + "|" + accountConfig.getApplicationName();
return KafkaCosmosConstants.USER_AGENT_SUFFIX + "|" + accountConfig.getApplicationName();
}

return CosmosConstants.USER_AGENT_SUFFIX;
return KafkaCosmosConstants.USER_AGENT_SUFFIX;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
/**
* Common Configuration for Cosmos DB Kafka source connector and sink connector.
*/
public class CosmosConfig extends AbstractConfig {
public class KafkaCosmosConfig extends AbstractConfig {
protected static final ConfigDef.Validator NON_EMPTY_STRING = new ConfigDef.NonEmptyString();
private static final String CONFIG_PREFIX = "kafka.connect.cosmos.";

Expand Down Expand Up @@ -132,7 +132,7 @@ public class CosmosConfig extends AbstractConfig {
private final CosmosAccountConfig accountConfig;
private final CosmosThroughputControlConfig throughputControlConfig;

public CosmosConfig(ConfigDef config, Map<String, ?> parsedConfig) {
public KafkaCosmosConfig(ConfigDef config, Map<String, ?> parsedConfig) {
super(config, parsedConfig);
this.accountConfig = this.parseAccountConfig();
this.throughputControlConfig = this.parseThroughputControlConfig();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.kafka.connect.implementation;

import com.azure.core.util.CoreUtils;

public class KafkaCosmosConstants {
public static final String PROPERTIES_FILE_NAME = "azure-cosmos-kafka-connect.properties";
public static final String CURRENT_VERSION = CoreUtils.getProperties(PROPERTIES_FILE_NAME).get("version");
public static final String CURRENT_NAME = CoreUtils.getProperties(PROPERTIES_FILE_NAME).get("name");
public static final String USER_AGENT_SUFFIX = String.format("KafkaConnect/%s/%s", CURRENT_NAME, CURRENT_VERSION);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@
package com.azure.cosmos.kafka.connect.implementation;

import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.HttpConstants;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;

public class CosmosExceptionsHelper {
public class KafkaCosmosExceptionsHelper {
public static boolean isTransientFailure(int statusCode, int substatusCode) {
return statusCode == CosmosConstants.StatusCodes.GONE
|| statusCode == CosmosConstants.StatusCodes.SERVICE_UNAVAILABLE
|| statusCode == CosmosConstants.StatusCodes.INTERNAL_SERVER_ERROR
|| statusCode == CosmosConstants.StatusCodes.REQUEST_TIMEOUT
|| (statusCode == CosmosConstants.StatusCodes.NOTFOUND && substatusCode == CosmosConstants.SubStatusCodes.READ_SESSION_NOT_AVAILABLE);
return statusCode == HttpConstants.StatusCodes.GONE
|| statusCode == HttpConstants.StatusCodes.SERVICE_UNAVAILABLE
|| statusCode == HttpConstants.StatusCodes.INTERNAL_SERVER_ERROR
|| statusCode == HttpConstants.StatusCodes.REQUEST_TIMEOUT
|| (statusCode == HttpConstants.StatusCodes.NOTFOUND && substatusCode == HttpConstants.SubStatusCodes.READ_SESSION_NOT_AVAILABLE);

}

Expand All @@ -36,13 +37,13 @@ public static boolean isFeedRangeGoneException(Throwable throwable) {
}

public static boolean isFeedRangeGoneException(int statusCode, int substatusCode) {
return statusCode == CosmosConstants.StatusCodes.GONE
&& (substatusCode == CosmosConstants.SubStatusCodes.PARTITION_KEY_RANGE_GONE
|| substatusCode == CosmosConstants.SubStatusCodes.COMPLETING_SPLIT_OR_MERGE);
return statusCode == HttpConstants.StatusCodes.GONE
&& (substatusCode == HttpConstants.SubStatusCodes.PARTITION_KEY_RANGE_GONE
|| substatusCode == HttpConstants.SubStatusCodes.COMPLETING_SPLIT_OR_MERGE);
}

public static ConnectException convertToConnectException(Throwable throwable, String message) {
if (CosmosExceptionsHelper.isTransientFailure(throwable)) {
if (KafkaCosmosExceptionsHelper.isTransientFailure(throwable)) {
return new RetriableException(message, throwable);
}

Expand All @@ -51,31 +52,31 @@ public static ConnectException convertToConnectException(Throwable throwable, St

public static boolean isResourceExistsException(Throwable throwable) {
if (throwable instanceof CosmosException) {
return ((CosmosException) throwable).getStatusCode() == CosmosConstants.StatusCodes.CONFLICT;
return ((CosmosException) throwable).getStatusCode() == HttpConstants.StatusCodes.CONFLICT;
}

return false;
}

public static boolean isNotFoundException(Throwable throwable) {
if (throwable instanceof CosmosException) {
return ((CosmosException) throwable).getStatusCode() == CosmosConstants.StatusCodes.NOTFOUND;
return ((CosmosException) throwable).getStatusCode() == HttpConstants.StatusCodes.NOTFOUND;
}

return false;
}

public static boolean isPreconditionFailedException(Throwable throwable) {
if (throwable instanceof CosmosException) {
return ((CosmosException) throwable).getStatusCode() == CosmosConstants.StatusCodes.PRECONDITION_FAILED;
return ((CosmosException) throwable).getStatusCode() == HttpConstants.StatusCodes.PRECONDITION_FAILED;
}

return false;
}

public static boolean isTimeoutException(Throwable throwable) {
if (throwable instanceof CosmosException) {
return ((CosmosException) throwable).getStatusCode() == CosmosConstants.StatusCodes.REQUEST_TIMEOUT;
return ((CosmosException) throwable).getStatusCode() == HttpConstants.StatusCodes.REQUEST_TIMEOUT;
}

return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class CosmosSchedulers {
public class KafkaCosmosSchedulers {
private static final String SINK_BOUNDED_ELASTIC_THREAD_NAME = "kafka-cosmos-sink-bounded-elastic";
private static final int TTL_FOR_SCHEDULER_WORKER_IN_SECONDS = 60; // same as BoundedElasticScheduler.DEFAULT_TTL_SECONDS
public static final Scheduler SINK_BOUNDED_ELASTIC = Schedulers.newBoundedElastic(
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
package com.azure.cosmos.kafka.connect.implementation.sink;

import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.kafka.connect.implementation.CosmosConfig;
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

Expand All @@ -15,7 +15,7 @@
/**
* Common Configuration for Cosmos DB Kafka sink connector.
*/
public class CosmosSinkConfig extends CosmosConfig {
public class CosmosSinkConfig extends KafkaCosmosConfig {
private static final String SINK_CONFIG_PREFIX = "kafka.connect.cosmos.sink.";

// error tolerance
Expand Down Expand Up @@ -115,7 +115,7 @@ public CosmosSinkConfig(ConfigDef config, Map<String, ?> parsedConfig) {
}

public static ConfigDef getConfigDef() {
ConfigDef configDef = CosmosConfig.getConfigDef();
ConfigDef configDef = KafkaCosmosConfig.getConfigDef();

defineWriteConfig(configDef);
defineContainersConfig(configDef);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.kafka.connect.implementation.CosmosClientStore;
import com.azure.cosmos.kafka.connect.implementation.CosmosConstants;
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConstants;
import com.azure.cosmos.kafka.connect.implementation.CosmosThroughputControlHelper;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
Expand All @@ -29,7 +29,7 @@ public class CosmosSinkTask extends SinkTask {

@Override
public String version() {
return CosmosConstants.CURRENT_VERSION;
return KafkaCosmosConstants.CURRENT_VERSION;
}

@Override
Expand Down
Loading

0 comments on commit 03efa9c

Please sign in to comment.