Skip to content

Commit

Permalink
#138 - kafka.read - fixing tests
Browse files Browse the repository at this point in the history
  • Loading branch information
wheelly committed Feb 18, 2024
1 parent 0dfaaf9 commit 580af85
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 16 deletions.
4 changes: 3 additions & 1 deletion core/src/datayoga_core/blocks/kafka/read/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@ async def produce(self) -> AsyncGenerator[List[Message], None]:
consumer = Consumer(**{
'bootstrap.servers': self.bootstrap_servers,
'group.id': self.group,
'enable.auto.commit': False
'enable.auto.commit': False,
'auto.offset.reset': 'earliest',
})
logger.debug(f"Producing {self.get_block_name()}")
#consumer.assign([TopicPartition(self.topic, 0)])

if self.seek_to_beginning:
def on_assign(c, ps):
Expand Down
12 changes: 5 additions & 7 deletions integration-tests/common/kafka_utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
from confluent_kafka import Producer
from kafka import KafkaProducer
from testcontainers.kafka import KafkaContainer
def get_kafka_container() -> KafkaContainer:
return KafkaContainer().with_bind_ports(KafkaContainer.KAFKA_PORT, KafkaContainer.KAFKA_PORT)
return (KafkaContainer(image="confluentinc/cp-kafka:latest")
.with_bind_ports(KafkaContainer.KAFKA_PORT, KafkaContainer.KAFKA_PORT))

def get_kafka_producer(bootstrap_servers: str) -> Producer:
return Producer({
"bootstrap.servers": bootstrap_servers,
"group.id": "integration-tests"
})
def get_kafka_producer(bootstrap_servers: str) -> KafkaProducer:
return KafkaProducer(bootstrap_servers=bootstrap_servers)
4 changes: 2 additions & 2 deletions integration-tests/test_kafka_to_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ def test_kafka_to_redis():

bootstrap_servers = kafka.get_bootstrap_server()
producer = kafka_utils.get_kafka_producer(bootstrap_servers)
producer.produce("integration-tests", message_one)
producer.produce("integration-tests", message_two)
producer.send("integration-tests", message_one)
producer.send("integration-tests", message_two)
producer.flush()
run_job("tests.kafka_to_redis")

Expand Down
14 changes: 8 additions & 6 deletions integration-tests/test_kafka_to_stdout.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import logging
import os
import time

from common import kafka_utils
from common.utils import run_job
Expand All @@ -13,22 +12,25 @@

def test_kafka_to_stdout(tmpdir):
kafka_container = kafka_utils.get_kafka_container()
output_file = tmpdir.join("tests_kafka_to_stdout.txt")
try:

with kafka_container as kafka:
bootstrap_servers = kafka.get_bootstrap_server()
#bootstrap_servers = "host.docker.internal:9093"
producer = kafka_utils.get_kafka_producer(bootstrap_servers)
producer.produce("integration-tests", message_one)
producer.produce("integration-tests", message_two)
producer.send("integration-tests", message_one)
producer.send("integration-tests", message_two)
producer.flush()
output_file = tmpdir.join("tests_kafka_to_stdout.txt")

run_job("tests.kafka_to_stdout", None, output_file)
result = output_file.readlines()
assert len(result) == 2
assert result[0].strip().encode() == message_one
assert result[1].strip().encode() == message_two

producer.produce("integration-tests", message_three)
producer.produce("integration-tests", message_four)
producer.send("integration-tests", message_three)
producer.send("integration-tests", message_four)
producer.flush()

run_job("tests.kafka_to_stdout", None, output_file)
Expand Down

0 comments on commit 580af85

Please sign in to comment.