From 580af85dd0e59c74c3346673867cc6c6a2717b87 Mon Sep 17 00:00:00 2001
From: wheelly
Date: Sun, 18 Feb 2024 10:44:55 +0200
Subject: [PATCH] #138 - kafka.read - fixing tests

---
 core/src/datayoga_core/blocks/kafka/read/block.py |  4 +++-
 integration-tests/common/kafka_utils.py           | 12 +++++-------
 integration-tests/test_kafka_to_redis.py          |  4 ++--
 integration-tests/test_kafka_to_stdout.py         | 14 ++++++++------
 4 files changed, 18 insertions(+), 16 deletions(-)

diff --git a/core/src/datayoga_core/blocks/kafka/read/block.py b/core/src/datayoga_core/blocks/kafka/read/block.py
index fd7b6a87..7b905765 100644
--- a/core/src/datayoga_core/blocks/kafka/read/block.py
+++ b/core/src/datayoga_core/blocks/kafka/read/block.py
@@ -39,9 +39,11 @@ async def produce(self) -> AsyncGenerator[List[Message], None]:
         consumer = Consumer(**{
             'bootstrap.servers': self.bootstrap_servers,
             'group.id': self.group,
-            'enable.auto.commit': False
+            'enable.auto.commit': False,
+            'auto.offset.reset': 'earliest',
         })
         logger.debug(f"Producing {self.get_block_name()}")
+        #consumer.assign([TopicPartition(self.topic, 0)])
 
         if self.seek_to_beginning:
             def on_assign(c, ps):
diff --git a/integration-tests/common/kafka_utils.py b/integration-tests/common/kafka_utils.py
index 42a789c3..a1b2f0d5 100644
--- a/integration-tests/common/kafka_utils.py
+++ b/integration-tests/common/kafka_utils.py
@@ -1,10 +1,8 @@
-from confluent_kafka import Producer
+from kafka import KafkaProducer
 from testcontainers.kafka import KafkaContainer
 
 def get_kafka_container() -> KafkaContainer:
-    return KafkaContainer().with_bind_ports(KafkaContainer.KAFKA_PORT, KafkaContainer.KAFKA_PORT)
+    return (KafkaContainer(image="confluentinc/cp-kafka:latest")
+            .with_bind_ports(KafkaContainer.KAFKA_PORT, KafkaContainer.KAFKA_PORT))
-def get_kafka_producer(bootstrap_servers: str) -> Producer:
-    return Producer({
-        "bootstrap.servers": bootstrap_servers,
-        "group.id": "integration-tests"
-    })
+def get_kafka_producer(bootstrap_servers: str) -> KafkaProducer:
+    return KafkaProducer(bootstrap_servers=bootstrap_servers)
diff --git a/integration-tests/test_kafka_to_redis.py b/integration-tests/test_kafka_to_redis.py
index 6669e47b..6adfc8d4 100644
--- a/integration-tests/test_kafka_to_redis.py
+++ b/integration-tests/test_kafka_to_redis.py
@@ -17,8 +17,8 @@ def test_kafka_to_redis():
         bootstrap_servers = kafka.get_bootstrap_server()
         producer = kafka_utils.get_kafka_producer(bootstrap_servers)
-        producer.produce("integration-tests", message_one)
-        producer.produce("integration-tests", message_two)
+        producer.send("integration-tests", message_one)
+        producer.send("integration-tests", message_two)
         producer.flush()
 
         run_job("tests.kafka_to_redis")
 
diff --git a/integration-tests/test_kafka_to_stdout.py b/integration-tests/test_kafka_to_stdout.py
index 08512660..357530e2 100644
--- a/integration-tests/test_kafka_to_stdout.py
+++ b/integration-tests/test_kafka_to_stdout.py
@@ -1,6 +1,5 @@
 import logging
 import os
-import time
 
 from common import kafka_utils
 from common.utils import run_job
@@ -13,22 +12,25 @@ def test_kafka_to_stdout(tmpdir):
     kafka_container = kafka_utils.get_kafka_container()
+    output_file = tmpdir.join("tests_kafka_to_stdout.txt")
     try:
+
         with kafka_container as kafka:
             bootstrap_servers = kafka.get_bootstrap_server()
+            #bootstrap_servers = "host.docker.internal:9093"
             producer = kafka_utils.get_kafka_producer(bootstrap_servers)
-            producer.produce("integration-tests", message_one)
-            producer.produce("integration-tests", message_two)
+            producer.send("integration-tests", message_one)
+            producer.send("integration-tests", message_two)
             producer.flush()
-            output_file = tmpdir.join("tests_kafka_to_stdout.txt")
+
             run_job("tests.kafka_to_stdout", None, output_file)
             result = output_file.readlines()
             assert len(result) == 2
             assert result[0].strip().encode() == message_one
             assert result[1].strip().encode() == message_two
 
-            producer.produce("integration-tests", message_three)
-            producer.produce("integration-tests", message_four)
+            producer.send("integration-tests", message_three)
+            producer.send("integration-tests", message_four)
             producer.flush()
 
             run_job("tests.kafka_to_stdout", None, output_file)
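
Note (not part of the patch): after this change the tests produce with kafka-python's KafkaProducer while the block still consumes with confluent_kafka's Consumer, configured with enable.auto.commit=False and the newly added auto.offset.reset=earliest. The following is a minimal, self-contained sketch of that pairing against a Testcontainers broker; the topic name "integration-tests" and the consumer settings come from the patch, while the group id, payloads, and poll loop are illustrative assumptions only.

from confluent_kafka import Consumer
from kafka import KafkaProducer
from testcontainers.kafka import KafkaContainer

with KafkaContainer(image="confluentinc/cp-kafka:latest") as kafka:
    bootstrap_servers = kafka.get_bootstrap_server()

    # Producer side, as in the updated integration tests: send raw bytes, then flush.
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
    producer.send("integration-tests", b"message one")  # illustrative payloads
    producer.send("integration-tests", b"message two")
    producer.flush()

    # Consumer side, mirroring the patched block.py settings: manual commits and
    # auto.offset.reset=earliest, so a group with no committed offsets still reads
    # the messages produced above (the default "latest" would start after them).
    consumer = Consumer({
        "bootstrap.servers": bootstrap_servers,
        "group.id": "sketch-group",  # hypothetical group id
        "enable.auto.commit": False,
        "auto.offset.reset": "earliest",
    })
    consumer.subscribe(["integration-tests"])

    received = []
    for _ in range(60):  # bounded poll loop, enough for the sketch
        msg = consumer.poll(1.0)
        if msg is None or msg.error():
            continue
        received.append(msg.value())
        if len(received) == 2:
            break
    consumer.close()

    assert received == [b"message one", b"message two"]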