
Commit

update package names
dyllamt committed Feb 23, 2024
1 parent d4c1d4a commit 9c8e330
Showing 19 changed files with 61 additions and 65 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -39,7 +39,7 @@ test-unit:

.PHONY: docker-build
docker-build:
docker build -t coinbase-connector .
docker build -t coinbase-producer .

.PHONY: test-integration
test-integration:
41 changes: 17 additions & 24 deletions README.md
@@ -1,40 +1,33 @@
# coinbase-connector
Connector between coinbase websocket feeds and kafka.
# coinbase-producer
Kafka producer for coinbase feeds.

# Contents
#### Replicas

The application implements an async webserver that subscribes to coinbase websocket feeds and forwards the messages to kafka. It is recommended that you deploy it in a replica set.
If replicas are deployed, message deduplication should be implemented in downstream components.

## Docker
# Packages

There is a docker image of the application published [here](https://github.com/dyllamt/coinbase-connector/pkgs/container/coinbase-connector).

## Helm

There is a packaged chart published [here](https://github.com/dyllamt/coinbase-connector/tree/gh-pages). A starter chart for kafka is also included.
- [Producer image](https://github.com/dyllamt/coinbase-producer/pkgs/container/coinbase-producer)
- [Producer chart](https://github.com/dyllamt/coinbase-producer/tree/gh-pages/coinbase-producer)
- [Kafka chart](https://github.com/dyllamt/coinbase-producer/tree/gh-pages/coinbase-kafka)

# Developer Notes

## CI/CD
- on pull requests: format, unit, and integration tests.
- merge into main: docker and helm release (if [version](https://github.com/dyllamt/coinbase-connector/blob/main/VERSION) bumped).

## Local Testing

- `make install` installs python dependencies.
- `make test-format` tests formatting and performs static type checking.
- `make test-unit` tests live message consumption to a mock kafka stream.
- `make test-integration` tests helm deployment with kafka broker.

## Logging

#### Info
- subscription messages sent to coinbase.
- messages consumed from coinbase.

#### Warnings
- coinbase reconnection errors.

## Replicas
## Local Testing

If multiple replicas are deployed, kafka consumers should implement deduplication logic.
- `make install` installs python dependencies.
- `make test-format` tests formatting and performs static type checking.
- `make test-unit` tests live message consumption to a mock kafka stream.
- `make test-integration` tests helm deployment with kafka broker.

## CI/CD
- `pull request`: format, unit, and integration tests.
- `merge to main`: docker and helm release (if [version](https://github.com/dyllamt/coinbase-producer/blob/main/VERSION) bumped).
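
The replicas note in the README above recommends deduplication in downstream components. A minimal consumer-side sketch (not part of this commit) is shown below; it assumes kafka-python, the chart's default `ticker` topic and bootstrap address, and that coinbase ticker payloads carry `product_id` and a monotonically increasing `sequence` field:

```python
import json

from kafka import KafkaConsumer  # kafka-python, the same client family the producer uses

# Illustrative deduplication sketch; topic, bootstrap address, and payload
# fields are assumptions taken from the chart defaults and the coinbase ticker feed.
consumer = KafkaConsumer(
    "ticker",
    bootstrap_servers="coinbase-kafka-kafka-bootstrap.dev.svc.cluster.local:9092",
    value_deserializer=lambda raw: json.loads(raw.decode("utf-8")),
)

last_seen = {}  # highest sequence number processed per product

for record in consumer:
    ticker = record.value
    product = ticker.get("product_id", "unknown")
    sequence = ticker.get("sequence", -1)
    if sequence <= last_seen.get(product, -1):
        continue  # duplicate forwarded by another producer replica; drop it
    last_seen[product] = sequence
    print(product, ticker.get("price"))  # replace with real downstream processing
```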
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
0.0.5
0.0.6
2 changes: 1 addition & 1 deletion charts/coinbase-kafka/Chart.yaml
@@ -1,4 +1,4 @@
apiVersion: v2
description: "Simple kafka deployment for coinbase data."
description: "Kafka broker chart for coinbase feeds."
name: coinbase-kafka
version: 0.0.1 # dynamic version set by gh-action from VERSION file
4 changes: 2 additions & 2 deletions charts/coinbase-kafka/templates/kafka-cluster.yaml
@@ -1,11 +1,11 @@
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: coinbase-cluster
name: {{ .Values.cluster.name }}
spec:
kafka:
version: 3.6.0
replicas: {{ .Values.cluster.kafka.replicas }}
replicas: {{ .Values.cluster.broker.replicas }}
listeners:
- name: plain
port: 9092
3 changes: 2 additions & 1 deletion charts/coinbase-kafka/templates/kafka-topic.yaml
@@ -1,10 +1,11 @@
{{- $root := . -}}
{{- range .Values.topics }}
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
name: {{ . }}
labels:
strimzi.io/cluster: coinbase-cluster
strimzi.io/cluster: {{ $root.Values.cluster.name }}
spec:
partitions: 1
replicas: 1
6 changes: 4 additions & 2 deletions charts/coinbase-kafka/values.yaml
@@ -1,12 +1,14 @@
cluster:
# -- name of strimzi kafka crd
name: coinbase-kafka
# -- storage class for broker logs
storage: ephemeral
kafka:
broker:
# -- number of broker replicas
replicas: 1
zookeeper:
# -- number of zookeeper replicas
replicas: 1
# -- topics on the cluster
# -- published topics
topics:
- ticker
@@ -1,4 +1,4 @@
apiVersion: v2
description: "Forwarding service from coinbase to kafka."
name: coinbase-connector
description: "Kafka producer for coinbase feeds."
name: coinbase-producer
version: 0.0.1 # dynamic version set by gh-action from VERSION file
@@ -1,16 +1,16 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ .Values.connector.deploymentName }}
name: {{ .Values.producer.deploymentName }}
spec:
replicas: {{ .Values.connector.replicas }}
replicas: {{ .Values.producer.replicas }}
selector:
matchLabels:
app: {{ .Values.connector.appName }}
app: {{ .Values.producer.appName }}
template:
metadata:
labels:
app: {{ .Values.connector.appName }}
app: {{ .Values.producer.appName }}
annotations:
rollme: {{ randAlphaNum 5 | quote }}
spec:
@@ -29,9 +29,9 @@ spec:
- name: KAFKA_CLUSTER_NAME
value: {{ .Values.kafka.clusterName }}
containers:
- name: {{ .Values.connector.appName }}
image: {{ .Values.connector.image }}
imagePullPolicy: {{ .Values.connector.imagePullPolicy }}
- name: {{ .Values.producer.appName }}
image: {{ .Values.producer.image }}
imagePullPolicy: {{ .Values.producer.imagePullPolicy }}
ports:
- containerPort: 8000
env:
@@ -1,11 +1,11 @@
apiVersion: v1
kind: Service
metadata:
name: {{ .Values.connector.serviceName }}
name: {{ .Values.producer.serviceName }}
spec:
# type: LoadBalancer
selector:
app: {{ .Values.connector.appName }}
app: {{ .Values.producer.appName }}
ports:
- protocol: TCP
port: 8000
@@ -1,12 +1,12 @@
connector:
producer:
# -- name of deployment in k8s
deploymentName: coinbase-connector-deployment
deploymentName: coinbase-producer-deployment
# -- name of service in k8s
serviceName: coinbase-connector-service
serviceName: coinbase-producer-service
# -- app name linking deployment and service
appName: coinbase-connector
appName: coinbase-producer
# -- deployment image
image: coinbase-connector:latest # default overwritten by gh-actions
image: coinbase-producer:latest # default overwritten by gh-actions
# -- image pull policy
imagePullPolicy: IfNotPresent
# -- replicas for deployment
@@ -15,8 +15,8 @@ kafka:
# -- namespace where kafka is deployed
clusterNamespace: "dev"
# -- name of the kafka cluster
clusterName: "coinbase-cluster"
clusterName: "coinbase-kafka"
# -- address of the kafka cluster
address: "coinbase-cluster-kafka-bootstrap.dev.svc.cluster.local:9092"
address: "coinbase-kafka-kafka-bootstrap.dev.svc.cluster.local:9092"
# -- the topic to forward data to
topic: "ticker"
2 changes: 1 addition & 1 deletion scripts/setup.sh
@@ -13,4 +13,4 @@ helm install strimzi-operator strimzi/strimzi-kafka-operator --namespace $NAMESP
helm install coinbase-kafka ../charts/coinbase-kafka/ -f ../charts/coinbase-kafka/values.yaml --namespace $NAMESPACE --wait

# coinbase connector
helm install coinbase-connector ../charts/coinbase-connector/ -f ../charts/coinbase-connector/values.yaml --namespace $NAMESPACE --wait
helm install coinbase-producer ../charts/coinbase-producer/ -f ../charts/coinbase-producer/values.yaml --namespace $NAMESPACE --wait
2 changes: 1 addition & 1 deletion scripts/teardown.sh
@@ -1,3 +1,3 @@
helm uninstall -n dev coinbase-connector
helm uninstall -n dev coinbase-producer
helm uninstall -n dev coinbase-kafka
helm uninstall -n dev strimzi-operator
2 changes: 1 addition & 1 deletion scripts/test-consumer.sh
@@ -7,7 +7,7 @@ TOPIC_NAME="ticker"
KAFKA_NAMESPACE="dev"

# Kafka broker pod name (change this to match your Kafka broker pod name)
KAFKA_POD_NAME=coinbase-cluster-kafka-0
KAFKA_POD_NAME=coinbase-kafka-kafka-0

# For stopping message consumption
TIMEOUT_DURATION=30
18 changes: 10 additions & 8 deletions src/coinbase/main.py
@@ -8,7 +8,7 @@

# logging configuration

logger = logging.getLogger("coinbase-connector")
logger = logging.getLogger("coinbase-producer")
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.INFO)
@@ -39,17 +39,19 @@ async def publish_message_to_kafka(producer: kafka.KafkaProducer, topic: str, me
coinbase_subscription_message = {"type": "subscribe", "product_ids": ["ETH-USD", "BTC-USD"], "channels": ["ticker"]}


async def subscribe_to_feeds(websocket: websockets.WebSocketClientProtocol):
async def subscribe_to_feeds(websocket: websockets.WebSocketClientProtocol): # type: ignore
await websocket.send(json.dumps(coinbase_subscription_message))


async def message_handler(websocket: websockets.WebSocketClientProtocol, producer: kafka.KafkaProducer, topic: str):
async def message_handler(
websocket: websockets.WebSocketClientProtocol, producer: kafka.KafkaProducer, topic: str # type: ignore
):
async for message in websocket:
await publish_message_to_kafka(producer, topic, message) # type: ignore


async def connect_and_serve(coinbase_address: str, producer: kafka.KafkaProducer, topic: str):
async with websockets.connect(coinbase_address) as websocket:
async def connect_and_serve_messages(coinbase_address: str, producer: kafka.KafkaProducer, topic: str):
async with websockets.connect(coinbase_address) as websocket: # type: ignore
await subscribe_to_feeds(websocket)
subscription = await websocket.recv() # skip the first message, which is the subscription message
logger.info(f"{subscription}") # type: ignore
@@ -60,7 +62,7 @@ async def connect_and_serve(coinbase_address: str, producer: kafka.KafkaProducer


async def main(coinbase_address: str, kafka_address: str, kafka_topic: str):
"""Connector between coinbase websocket feeds and kafka.
"""Kafka producer for coinbase feeds.
Reconnect error handling and logging are implemented.
@@ -77,8 +79,8 @@ async def main(coinbase_address: str, kafka_address: str, kafka_topic: str):

while True:
try:
await connect_and_serve(coinbase_address, producer, kafka_topic)
except websockets.ConnectionClosed:
await connect_and_serve_messages(coinbase_address, producer, kafka_topic)
except websockets.ConnectionClosed: # type: ignore
logger.warning("Connection closed: reconnecting after 5 seconds...")
await asyncio.sleep(5)

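`message_handler` forwards every websocket message through `publish_message_to_kafka`, whose body is unchanged by this commit and therefore not shown in the diff. A hedged sketch of what such a helper might look like with kafka-python is below; the executor hand-off and the encoding are assumptions, not the repository's actual implementation:

```python
import asyncio

import kafka  # kafka-python


async def publish_message_to_kafka(producer: kafka.KafkaProducer, topic: str, message) -> None:
    # Illustrative only: encode str payloads and hand the blocking send() to the
    # default executor so the websocket consumer loop is not stalled.
    payload = message.encode("utf-8") if isinstance(message, str) else message
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, lambda: producer.send(topic, payload))
```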
1 change: 0 additions & 1 deletion tests/integration/test_integration.py
@@ -4,7 +4,6 @@


def test_messages_in_topic(kubernetes_services):

# executes kafka listener on a broker node
out, err = run_script("scripts/test-consumer.sh")

1 change: 0 additions & 1 deletion tests/unit/conftest.py
@@ -3,7 +3,6 @@


class KafkaMockProducer(kafka.KafkaProducer):

def __init__(self):
self.messages = [] # collects messages sent by producer

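The unit-test fixture above subclasses `kafka.KafkaProducer`, but only its `__init__` appears in this diff. A hedged sketch of how such a mock could capture messages for assertions; the `send` override and the example test are assumptions, not code from the repository:

```python
import kafka


class KafkaMockProducer(kafka.KafkaProducer):
    def __init__(self):
        # Skip KafkaProducer.__init__ on purpose: no broker connection is needed.
        self.messages = []  # collects messages sent by producer

    def send(self, topic, value=None, *args, **kwargs):
        # Assumed override: record the message instead of publishing it.
        self.messages.append((topic, value))


def test_mock_captures_messages():
    producer = KafkaMockProducer()
    producer.send("ticker", b'{"price": "123"}')
    assert producer.messages == [("ticker", b'{"price": "123"}')]
```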
4 changes: 2 additions & 2 deletions tests/unit/test_main.py
@@ -4,14 +4,14 @@
import coinbase.main as main


def test_connect_and_serve(kafka_producer):
def test_connect_and_serve_messages(kafka_producer):
print("cool")

# run streaming loop for 10 seconds
try:
asyncio.run(
asyncio.wait_for(
main.connect_and_serve(main.default_coinbase_address, producer=kafka_producer, topic=""),
main.connect_and_serve_messages(main.default_coinbase_address, producer=kafka_producer, topic=""),
timeout=5,
)
)
