From 814648d6bcfc1c96588e6ef3473066bdd66e4b40 Mon Sep 17 00:00:00 2001 From: wheelly Date: Wed, 14 Feb 2024 13:39:25 +0200 Subject: [PATCH] #138 - kafka.read - kafka to redis --- .../resources/jobs/tests/kafka_to_redis.yaml | 16 ++++++++ integration-tests/test_kafka_to_redis.py | 41 +++++++++++++++++++ 2 files changed, 57 insertions(+) create mode 100644 integration-tests/resources/jobs/tests/kafka_to_redis.yaml create mode 100644 integration-tests/test_kafka_to_redis.py diff --git a/integration-tests/resources/jobs/tests/kafka_to_redis.yaml b/integration-tests/resources/jobs/tests/kafka_to_redis.yaml new file mode 100644 index 00000000..44b9c9be --- /dev/null +++ b/integration-tests/resources/jobs/tests/kafka_to_redis.yaml @@ -0,0 +1,16 @@ +input: + uses: kafka.read + with: + bootstrap_servers: kafka + topic: "integration-tests" + group: "integration-tests" + snapshot: true + seek_to_beginning: true +steps: + - uses: redis.write + with: + connection: cache + command: HSET + key: + expression: id + language: jmespath \ No newline at end of file diff --git a/integration-tests/test_kafka_to_redis.py b/integration-tests/test_kafka_to_redis.py new file mode 100644 index 00000000..6669e47b --- /dev/null +++ b/integration-tests/test_kafka_to_redis.py @@ -0,0 +1,41 @@ +import json +import logging + +from common import kafka_utils, redis_utils +from common.utils import run_job + +logger = logging.getLogger("dy") +message_one = b'{"id":"1","name":"Boris"}' +message_two = b'{"id":"2","name":"Ivan"}' + +def test_kafka_to_redis(): + kafka_container = kafka_utils.get_kafka_container() + try: + with kafka_container as kafka: + redis_container = redis_utils.get_redis_oss_container(redis_utils.REDIS_PORT) + redis_container.start() + + bootstrap_servers = kafka.get_bootstrap_server() + producer = kafka_utils.get_kafka_producer(bootstrap_servers) + producer.produce("integration-tests", message_one) + producer.produce("integration-tests", message_two) + producer.flush() + run_job("tests.kafka_to_redis") + + redis_client = redis_utils.get_redis_client("localhost", redis_utils.REDIS_PORT) + + assert len(redis_client.keys()) == 2 + + boris = redis_client.hgetall("1") + ivan = redis_client.hgetall("2") + + assert boris == json.loads(message_one.decode()) + assert ivan == json.loads(message_two.decode()) + finally: + redis_container.stop() + + + + + +