diff --git a/.vscode/cspell.json b/.vscode/cspell.json
index d1ce21af57393..32f0621d9435b 100644
--- a/.vscode/cspell.json
+++ b/.vscode/cspell.json
@@ -140,6 +140,7 @@
"sdk/cosmos/azure-cosmos-encryption/**",
"sdk/cosmos/azure-cosmos-spark_3_2-12/**",
"sdk/spring/azure-spring-data-cosmos/**",
+ "sdk/cosmos/azure-cosmos-kafka-connect/**",
"sdk/deviceupdate/azure-iot-deviceupdate/**",
"sdk/e2e/src/**",
"sdk/eventgrid/azure-messaging-eventgrid-cloudnative-cloudevents/**",
@@ -723,7 +724,7 @@
"words": [
"Pfast",
"Pdirect",
- "Pmulti",
+ "Pmulti",
"Psplit",
"Pquery",
"Pcfp",
diff --git a/eng/code-quality-reports/src/main/resources/checkstyle/checkstyle-suppressions.xml b/eng/code-quality-reports/src/main/resources/checkstyle/checkstyle-suppressions.xml
index f8ac3f2776646..db3789c74a934 100755
--- a/eng/code-quality-reports/src/main/resources/checkstyle/checkstyle-suppressions.xml
+++ b/eng/code-quality-reports/src/main/resources/checkstyle/checkstyle-suppressions.xml
@@ -316,6 +316,9 @@ the main ServiceBusClientBuilder. -->
files="com.azure.cosmos.ClientSideRequestStatistics"/>
+
+
diff --git a/eng/versioning/external_dependencies.txt b/eng/versioning/external_dependencies.txt
index a75828c48d2cc..183fb2585631f 100644
--- a/eng/versioning/external_dependencies.txt
+++ b/eng/versioning/external_dependencies.txt
@@ -395,6 +395,10 @@ cosmos_org.scalastyle:scalastyle-maven-plugin;1.0.0
# Cosmos Kafka connector runtime dependencies
cosmos_org.apache.kafka:connect-api;3.6.0
# Cosmos Kafka connector tests only
+cosmos_org.apache.kafka:connect-runtime;3.6.0
+cosmos_org.testcontainers:testcontainers;1.19.5
+cosmos_org.testcontainers:kafka;1.19.5
+cosmos_org.sourcelab:kafka-connect-client;4.0.4
# Maven Tools for Cosmos Kafka connector only
cosmos_io.confluent:kafka-connect-maven-plugin;0.12.0
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md b/sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md
index 8d7cb3d876bc7..532c376c58a94 100644
--- a/sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md
@@ -3,6 +3,7 @@
### 1.0.0-beta.1 (Unreleased)
#### Features Added
+* Added Source connector. See [PR 39410](https://github.com/Azure/azure-sdk-for-java/pull/39410)
#### Breaking Changes
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/doc/configuration-reference.md b/sdk/cosmos/azure-cosmos-kafka-connect/doc/configuration-reference.md
new file mode 100644
index 0000000000000..8512ba8d7af79
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/doc/configuration-reference.md
@@ -0,0 +1,25 @@
+## Configuration Reference:
+
+## Generic Configuration
+| Config Property Name | Default | Description |
+|:---------------------------------------------|:--------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `kafka.connect.cosmos.accountEndpoint` | None | Cosmos DB Account Endpoint Uri |
+| `kafka.connect.cosmos.accountKey`            | None    | Cosmos DB Account Key                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |
+| `kafka.connect.cosmos.useGatewayMode` | `false` | Flag to indicate whether to use gateway mode. By default it is false. |
+| `kafka.connect.cosmos.preferredRegionsList` | `[]` | Preferred regions list to be used for a multi region Cosmos DB account. This is a comma separated value (e.g., `[East US, West US]` or `East US, West US`) provided preferred regions will be used as hint. You should use a collocated kafka cluster with your Cosmos DB account and pass the kafka cluster region as preferred region. See list of azure regions [here](https://docs.microsoft.com/dotnet/api/microsoft.azure.documents.locationnames?view=azure-dotnet&preserve-view=true). |
+| `kafka.connect.cosmos.applicationName` | `""` | Application name. Will be added as the userAgent suffix. |
+
+## Source Connector Configuration
+| Config Property Name | Default | Description |
+|:----------------------------------------------------------|:-------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `kafka.connect.cosmos.source.database.name` | None | Cosmos DB database name. |
+| `kafka.connect.cosmos.source.containers.includeAll` | `false` | Flag to indicate whether reading from all containers. |
+| `kafka.connect.cosmos.source.containers.includedList`      | `[]`                     | Containers included. This config will be ignored if kafka.connect.cosmos.source.containers.includeAll is true.                                                                                                                                        |
+| `kafka.connect.cosmos.source.containers.topicMap` | `[]` | A comma delimited list of Kafka topics mapped to Cosmos containers. For example: topic1#con1,topic2#con2. By default, container name is used as the name of the kafka topic to publish data to, can use this property to override the default config |
+| `kafka.connect.cosmos.source.changeFeed.startFrom` | `Beginning` | ChangeFeed Start from settings (Now, Beginning or a certain point in time (UTC) for example 2020-02-10T14:15:03) - the default value is 'Beginning'. |
+| `kafka.connect.cosmos.source.changeFeed.mode` | `LatestVersion` | ChangeFeed mode (LatestVersion or AllVersionsAndDeletes). |
+| `kafka.connect.cosmos.source.changeFeed.maxItemCountHint` | `1000` | The maximum number of documents returned in a single change feed request. But the number of items received might be higher than the specified value if multiple items are changed by the same transaction. |
+| `kafka.connect.cosmos.source.metadata.poll.delay.ms` | `300000` | Indicates how often to check the metadata changes (including container split/merge, adding/removing/recreated containers). When changes are detected, it will reconfigure the tasks. Default is 5 minutes. |
+| `kafka.connect.cosmos.source.metadata.storage.topic` | `_cosmos.metadata.topic` | The name of the topic where the metadata are stored. The metadata topic will be created if it does not already exist, else it will use the pre-created topic. |
+| `kafka.connect.cosmos.source.messageKey.enabled` | `true` | Whether to set the kafka record message key. |
+| `kafka.connect.cosmos.source.messageKey.field` | `id` | The field to use as the message key. |
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/pom.xml b/sdk/cosmos/azure-cosmos-kafka-connect/pom.xml
index 47496e3a0ea3f..76bdce066b676 100644
--- a/sdk/cosmos/azure-cosmos-kafka-connect/pom.xml
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/pom.xml
@@ -37,8 +37,6 @@ Licensed under the MIT License.
UTF-8
0.01
0.02
- 11
- 11
azure_cosmos_kafka_connect
@@ -48,7 +46,12 @@ Licensed under the MIT License.
--add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect=ALL-UNNAMED
--add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect.implementation=ALL-UNNAMED
- --add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect.models=ALL-UNNAMED
+ --add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect.implementation.source=com.fasterxml.jackson.databind,ALL-UNNAMED
+ --add-opens com.azure.cosmos/com.azure.cosmos.implementation.routing=ALL-UNNAMED
+ --add-opens com.azure.cosmos/com.azure.cosmos.implementation.apachecommons.lang=ALL-UNNAMED
+ --add-exports com.azure.cosmos/com.azure.cosmos.implementation.changefeed.common=com.azure.cosmos.kafka.connect
+ --add-exports com.azure.cosmos/com.azure.cosmos.implementation.feedranges=com.azure.cosmos.kafka.connect
+ --add-exports com.azure.cosmos/com.azure.cosmos.implementation.query=com.azure.cosmos.kafka.connect
@@ -94,6 +97,19 @@ Licensed under the MIT License.
1.10.0
+
+ org.apache.kafka
+ connect-runtime
+ 3.6.0
+ test
+
+
+ jackson-jaxrs-json-provider
+ com.fasterxml.jackson.jaxrs
+
+
+
+
org.testng
testng
@@ -160,6 +176,24 @@ Licensed under the MIT License.
1.14.12
test
+
+ org.testcontainers
+ testcontainers
+ 1.19.5
+ test
+
+
+ org.testcontainers
+ kafka
+ 1.19.5
+ test
+
+
+ org.sourcelab
+ kafka-connect-client
+ 4.0.4
+ test
+
@@ -204,6 +238,7 @@ Licensed under the MIT License.
com.azure:*
org.apache.kafka:connect-api:[3.6.0]
io.confluent:kafka-connect-maven-plugin:[0.12.0]
+ org.sourcelab:kafka-connect-client:[4.0.4]
@@ -221,6 +256,7 @@ Licensed under the MIT License.
shade
+ ${project.artifactId}-${project.version}-jar-with-dependencies
*:*:*:*
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/docker/.gitignore b/sdk/cosmos/azure-cosmos-kafka-connect/src/docker/.gitignore
new file mode 100644
index 0000000000000..b170e557735bb
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/docker/.gitignore
@@ -0,0 +1,8 @@
+connectors/
+log.txt
+
+# Exclude all temporary files in resources
+!resources/*example
+resources/sink.properties
+resources/source.properties
+resources/standalone.properties
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/docker/Dockerfile b/sdk/cosmos/azure-cosmos-kafka-connect/src/docker/Dockerfile
new file mode 100644
index 0000000000000..37da2123ab868
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/docker/Dockerfile
@@ -0,0 +1,7 @@
+# Build the Cosmos DB Connectors on top of the Kafka Connect image
+FROM confluentinc/cp-kafka-connect:7.5.0
+
+# Install datagen connector
+RUN confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:latest
+
+COPY connectors/ /etc/kafka-connect/jars
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/docker/docker-compose.yml b/sdk/cosmos/azure-cosmos-kafka-connect/src/docker/docker-compose.yml
new file mode 100644
index 0000000000000..6f733fee3ab79
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/docker/docker-compose.yml
@@ -0,0 +1,191 @@
+# Adapted from https://github.com/confluentinc/cp-all-in-one and https://github.com/simplesteph/kafka-stack-docker-compose
+version: '2.1'
+
+services:
+ zookeeper:
+ image: confluentinc/cp-zookeeper:7.5.0
+ restart: unless-stopped
+ hostname: zookeeper
+ container_name: zookeeper
+ ports:
+ - "2181:2181"
+ environment:
+ ZOOKEEPER_CLIENT_PORT: 2181
+ ZOOKEEPER_TICK_TIME: 2000
+
+ broker:
+ image: confluentinc/cp-server:7.5.0
+ hostname: broker
+ container_name: broker
+ ports:
+ - "9092:9092"
+ - "9101:9101"
+ environment:
+ KAFKA_BROKER_ID: 1
+ KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+ KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
+ KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+ KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
+ KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
+ KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
+ KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
+ KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
+ KAFKA_JMX_PORT: 9101
+ KAFKA_JMX_HOSTNAME: localhost
+ KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
+ CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
+ CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
+ CONFLUENT_METRICS_ENABLE: 'true'
+ CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
+ depends_on:
+ - zookeeper
+
+ schema-registry:
+ image: confluentinc/cp-schema-registry:7.5.0
+ hostname: schema-registry
+ container_name: schema-registry
+ ports:
+ - "8081:8081"
+ environment:
+ SCHEMA_REGISTRY_HOST_NAME: schema-registry
+ SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
+ SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
+ depends_on:
+ - broker
+
+ schema-registry-ui:
+ image: landoop/schema-registry-ui:0.9.4
+ hostname: schema-registry-ui
+ container_name: schema-registry-ui
+ ports:
+ - "9001:8000"
+ environment:
+ SCHEMAREGISTRY_URL: http://schema-registry:8081/
+ PROXY: "true"
+ depends_on:
+ - schema-registry
+
+ rest-proxy:
+ image: confluentinc/cp-kafka-rest:7.5.0
+ ports:
+ - "8082:8082"
+ hostname: rest-proxy
+ container_name: rest-proxy
+ environment:
+ KAFKA_REST_HOST_NAME: rest-proxy
+ KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
+ KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
+ KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
+ depends_on:
+ - broker
+ - schema-registry
+
+ kafka-topics-ui:
+ image: landoop/kafka-topics-ui:0.9.4
+ hostname: kafka-topics-ui
+ container_name: kafka-topics-ui
+ ports:
+ - "9000:8000"
+ environment:
+ KAFKA_REST_PROXY_URL: "http://rest-proxy:8082/"
+ PROXY: "true"
+ depends_on:
+ - zookeeper
+ - broker
+ - schema-registry
+ - rest-proxy
+
+ connect:
+ # Using modified version of confluentinc/cp-kafka-connect:6.0.0 to avoid dealing with volume mounts
+ image: cosmosdb-kafka-connect:latest
+ hostname: connect
+ container_name: connect
+ ports:
+ - "8083:8083"
+ environment:
+ CONNECT_BOOTSTRAP_SERVERS: "broker:29092"
+ CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
+ CONNECT_REST_PORT: 8083
+ CONNECT_GROUP_ID: compose-connect-group
+ CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
+ CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
+ CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
+ CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
+ CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
+ CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
+ CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
+ CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
+ CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
+ CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
+ CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
+ CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
+ CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
+ # CLASSPATH required due to CC-2422
+ CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-6.0.0.jar
+ CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
+ CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
+ CONNECT_LOG4J_ROOT_LOGLEVEL: "WARN"
+ CONNECT_LOG4J_LOGGERS: "org.apache.kafka=INFO,org.reflections=ERROR,com.azure.cosmos.kafka=DEBUG"
+ CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components,/etc/kafka-connect/jars'
+ depends_on:
+ - zookeeper
+ - broker
+ - schema-registry
+ - rest-proxy
+
+ control-center:
+ image: confluentinc/cp-enterprise-control-center:7.5.0
+ hostname: control-center
+ container_name: control-center
+ ports:
+ - "9021:9021"
+ environment:
+ CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
+ CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'http://connect:8083'
+ CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
+ CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
+ CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
+ CONTROL_CENTER_REPLICATION_FACTOR: 1
+ CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
+ CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
+ CONTROL_CENTER_CONNECT_HEALTHCHECK_ENDPOINT: '/connectors'
+ CONFLUENT_METRICS_TOPIC_REPLICATION: 1
+ PORT: 9021
+ depends_on:
+ - broker
+ - schema-registry
+ - connect
+
+ ksqldb-server:
+ image: confluentinc/cp-ksqldb-server:7.5.0
+ hostname: ksqldb-server
+ container_name: ksqldb-server
+ ports:
+ - "8088:8088"
+ environment:
+ KSQL_CONFIG_DIR: "/etc/ksql"
+ KSQL_BOOTSTRAP_SERVERS: "broker:29092"
+ KSQL_HOST_NAME: ksqldb-server
+ KSQL_LISTENERS: "http://0.0.0.0:8088"
+ KSQL_CACHE_MAX_BYTES_BUFFERING: 0
+ KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
+ KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
+ KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
+ KSQL_KSQL_CONNECT_URL: "http://connect:8083"
+ KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
+ KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
+ KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'
+ depends_on:
+ - broker
+ - connect
+
+ zoonavigator:
+ image: elkozmon/zoonavigator:0.8.0
+ container_name: zoonavigator
+ ports:
+ - "9004:8000"
+ environment:
+ HTTP_PORT: 8000
+ AUTO_CONNECT_CONNECTION_STRING: zookeeper:2181
\ No newline at end of file
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/docker/startup.ps1 b/sdk/cosmos/azure-cosmos-kafka-connect/src/docker/startup.ps1
new file mode 100644
index 0000000000000..9cb5c13150cdc
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/docker/startup.ps1
@@ -0,0 +1,29 @@
+#!/usr/bin/env pwsh
+$ErrorActionPreference='Stop'
+cd $PSScriptRoot
+Write-Host "Shutting down Docker Compose orchestration..."
+docker-compose down
+
+Write-Host "Deleting prior Cosmos DB connectors..."
+rm -rf "$PSScriptRoot/connectors"
+New-Item -Path "$PSScriptRoot" -ItemType "directory" -Name "connectors" -Force | Out-Null
+cd $PSScriptRoot/../..
+
+Write-Host "Rebuilding Cosmos DB connectors..."
+mvn clean package -DskipTests -Dmaven.javadoc.skip
+copy target\*-jar-with-dependencies.jar $PSScriptRoot/connectors
+cd $PSScriptRoot
+
+Write-Host "Adding custom Insert UUID SMT"
+cd $PSScriptRoot/connectors
+git clone https://github.com/confluentinc/kafka-connect-insert-uuid.git insertuuid -q && cd insertuuid
+mvn clean package -DskipTests=true
+copy target\*.jar $PSScriptRoot/connectors
+rm -rf "$PSScriptRoot/connectors/insertuuid"
+cd $PSScriptRoot
+
+Write-Host "Building Cosmos DB Kafka Connect Docker image"
+docker build . -t cosmosdb-kafka-connect:latest
+
+Write-Host "Starting Docker Compose..."
+docker-compose up -d
\ No newline at end of file
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/docker/startup.sh b/sdk/cosmos/azure-cosmos-kafka-connect/src/docker/startup.sh
new file mode 100755
index 0000000000000..1f5dbd0566482
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/docker/startup.sh
@@ -0,0 +1,27 @@
+#!/bin/bash
+echo "Shutting down Docker Compose orchestration..."
+docker-compose down
+
+echo "Deleting prior Cosmos DB connectors..."
+rm -rf connectors
+mkdir connectors
+cd ../../
+
+echo "Rebuilding Cosmos DB connectors..."
+mvn clean package -DskipTests=true -Dmaven.javadoc.skip=true
+cp target/*-jar-with-dependencies.jar src/docker/connectors
+cd src/docker
+
+echo "Adding custom Insert UUID SMT"
+cd connectors
+git clone https://github.com/confluentinc/kafka-connect-insert-uuid.git insertuuid -q && cd insertuuid
+mvn clean package -DskipTests=true
+cp target/*.jar ../
+cd .. && rm -rf insertuuid
+cd ../
+
+echo "Building Cosmos DB Kafka Connect Docker image"
+docker build . -t cosmosdb-kafka-connect:latest
+
+echo "Starting Docker Compose..."
+docker-compose up -d
\ No newline at end of file
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosConfig.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosConfig.java
deleted file mode 100644
index 41a7703fde72d..0000000000000
--- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosConfig.java
+++ /dev/null
@@ -1,26 +0,0 @@
-// Copyright (c) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License.
-
-package com.azure.cosmos.kafka.connect;
-
-import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ConfigDef;
-
-import java.util.Map;
-
-/**
- * Configuration for Cosmos DB Kafka connector
- */
-public class CosmosConfig extends AbstractConfig {
-
- /**
- * Initializes a new instance of the Cosmos DB Kafka Connector configuration
- * @param definition The configuration definition
- * @param originals The original config values
- * @param configProviderProps The configuration overrides for this provider
- * @param doLog Flag indicating whether the configuration should be logged
- */
- public CosmosConfig(ConfigDef definition, Map, ?> originals, Map configProviderProps, boolean doLog) {
- super(definition, originals, configProviderProps, doLog);
- }
-}
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosDBSourceConnector.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosDBSourceConnector.java
new file mode 100644
index 0000000000000..d9f92e3731cd1
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosDBSourceConnector.java
@@ -0,0 +1,350 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.cosmos.kafka.connect;
+
+import com.azure.cosmos.CosmosAsyncClient;
+import com.azure.cosmos.CosmosAsyncContainer;
+import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
+import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair;
+import com.azure.cosmos.kafka.connect.implementation.CosmosClientStore;
+import com.azure.cosmos.kafka.connect.implementation.CosmosConstants;
+import com.azure.cosmos.kafka.connect.implementation.CosmosExceptionsHelper;
+import com.azure.cosmos.kafka.connect.implementation.source.CosmosSourceConfig;
+import com.azure.cosmos.kafka.connect.implementation.source.CosmosSourceOffsetStorageReader;
+import com.azure.cosmos.kafka.connect.implementation.source.CosmosSourceTask;
+import com.azure.cosmos.kafka.connect.implementation.source.CosmosSourceTaskConfig;
+import com.azure.cosmos.kafka.connect.implementation.source.FeedRangeContinuationTopicOffset;
+import com.azure.cosmos.kafka.connect.implementation.source.FeedRangeTaskUnit;
+import com.azure.cosmos.kafka.connect.implementation.source.FeedRangesMetadataTopicOffset;
+import com.azure.cosmos.kafka.connect.implementation.source.KafkaCosmosChangeFeedState;
+import com.azure.cosmos.kafka.connect.implementation.source.MetadataMonitorThread;
+import com.azure.cosmos.kafka.connect.implementation.source.MetadataTaskUnit;
+import com.azure.cosmos.models.CosmosContainerProperties;
+import com.azure.cosmos.models.FeedRange;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.source.SourceConnector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/***
+ * The CosmosDb source connector.
+ */
+public class CosmosDBSourceConnector extends SourceConnector {
+ private static final Logger LOGGER = LoggerFactory.getLogger(CosmosDBSourceConnector.class);
+ private CosmosSourceConfig config;
+ private CosmosAsyncClient cosmosClient;
+ private MetadataMonitorThread monitorThread;
+ private CosmosSourceOffsetStorageReader offsetStorageReader;
+
+ @Override
+ public void start(Map props) {
+ LOGGER.info("Starting the kafka cosmos source connector");
+ this.config = new CosmosSourceConfig(props);
+ this.cosmosClient = CosmosClientStore.getCosmosClient(this.config.getAccountConfig());
+ this.offsetStorageReader = new CosmosSourceOffsetStorageReader(this.context().offsetStorageReader());
+ this.monitorThread = new MetadataMonitorThread(
+ this.config.getContainersConfig(),
+ this.config.getMetadataConfig(),
+ this.context(),
+ this.offsetStorageReader,
+ this.cosmosClient
+ );
+
+ this.monitorThread.start();
+ }
+
+ @Override
+ public Class extends Task> taskClass() {
+ return CosmosSourceTask.class;
+ }
+
+ @Override
+ public List