diff --git a/tests/integration/kafka/test_consumer.py b/tests/integration/kafka/test_consumer.py index c724c0ed1..bc85fda65 100644 --- a/tests/integration/kafka/test_consumer.py +++ b/tests/integration/kafka/test_consumer.py @@ -137,6 +137,31 @@ def test_commit_offsets( assert committed_partition.partition == 0 assert committed_partition.offset == 2 + def test_commit_offsets_empty( + self, + producer: KafkaProducer, + consumer: KafkaConsumer, + new_topic: NewTopic, + ) -> None: + consumer.subscribe([new_topic.topic]) + first_fut = producer.send(new_topic.topic) + second_fut = producer.send(new_topic.topic) + producer.flush() + first_fut.result() + second_fut.result() + consumer.poll(timeout=POLL_TIMEOUT_S) + consumer.poll(timeout=POLL_TIMEOUT_S) + + [topic_partition] = consumer.commit(offsets=None, message=None) # default parameters + [committed_partition] = consumer.committed([TopicPartition(new_topic.topic, partition=0)]) + + assert topic_partition.topic == new_topic.topic + assert topic_partition.partition == 0 + assert topic_partition.offset == 2 + assert committed_partition.topic == new_topic.topic + assert committed_partition.partition == 0 + assert committed_partition.offset == 2 + def test_commit_raises_for_unknown_partition( self, consumer: KafkaConsumer, diff --git a/tests/integration/test_rest_consumer.py b/tests/integration/test_rest_consumer.py index 1539b15f7..6ff948e5b 100644 --- a/tests/integration/test_rest_consumer.py +++ b/tests/integration/test_rest_consumer.py @@ -251,6 +251,80 @@ async def test_offsets(rest_async_client, admin_client, trail): assert "partition" in data and data["partition"] == 0, f"Unexpected partition {data}" +@pytest.mark.parametrize("trail", ["", "/"]) +async def test_offsets_no_payload(rest_async_client, admin_client, producer, trail): + group_name = "offset_group_no_payload" + fmt = "binary" + header = REST_HEADERS[fmt] + instance_id = await new_consumer( + rest_async_client, + group_name, + fmt=fmt, + trail=trail, + # By default this is true + payload_override={"auto.commit.enable": "false"}, + ) + topic_name = new_topic(admin_client) + offsets_path = f"/consumers/{group_name}/instances/{instance_id}/offsets{trail}" + assign_path = f"/consumers/{group_name}/instances/{instance_id}/assignments{trail}" + consume_path = f"/consumers/{group_name}/instances/{instance_id}/records{trail}?timeout=5000" + + res = await rest_async_client.post( + assign_path, + json={"partitions": [{"topic": topic_name, "partition": 0}]}, + headers=header, + ) + assert res.ok, f"Unexpected response status for assignment {res}" + + producer.send(topic_name, value=b"message-value") + producer.flush() + + resp = await rest_async_client.get(consume_path, headers=header) + assert resp.ok, f"Expected a successful response: {resp}" + + await repeat_until_successful_request( + rest_async_client.post, + offsets_path, + json_data={}, + headers=header, + error_msg="Unexpected response status for offset commit", + timeout=20, + sleep=1, + ) + + res = await rest_async_client.get( + offsets_path, + headers=header, + json={"partitions": [{"topic": topic_name, "partition": 0}]}, + ) + assert res.ok, f"Unexpected response status for {res}" + data = res.json() + assert "offsets" in data and len(data["offsets"]) == 1, f"Unexpected offsets response {res}" + data = data["offsets"][0] + assert "topic" in data and data["topic"] == topic_name, f"Unexpected topic {data}" + assert "offset" in data and data["offset"] == 1, f"Unexpected offset {data}" + assert "partition" in data and data["partition"] == 0, f"Unexpected partition {data}" + res = await rest_async_client.post( + offsets_path, + json={"offsets": [{"topic": topic_name, "partition": 0, "offset": 1}]}, + headers=header, + ) + assert res.ok, f"Unexpected response status for offset commit {res}" + + res = await rest_async_client.get( + offsets_path, + headers=header, + json={"partitions": [{"topic": topic_name, "partition": 0}]}, + ) + assert res.ok, f"Unexpected response status for {res}" + data = res.json() + assert "offsets" in data and len(data["offsets"]) == 1, f"Unexpected offsets response {res}" + data = data["offsets"][0] + assert "topic" in data and data["topic"] == topic_name, f"Unexpected topic {data}" + assert "offset" in data and data["offset"] == 2, f"Unexpected offset {data}" + assert "partition" in data and data["partition"] == 0, f"Unexpected partition {data}" + + @pytest.mark.parametrize("trail", ["", "/"]) async def test_consume(rest_async_client, admin_client, producer, trail): # avro to be handled in a separate testcase ?? diff --git a/tests/utils.py b/tests/utils.py index a48bc55cc..7b4538440 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -180,9 +180,11 @@ } -async def new_consumer(c, group, fmt="avro", trail=""): +async def new_consumer(c, group, fmt="avro", trail="", payload_override=None): payload = copy.copy(consumer_valid_payload) payload["format"] = fmt + if payload_override: + payload.update(payload_override) resp = await c.post(f"/consumers/{group}{trail}", json=payload, headers=REST_HEADERS[fmt]) assert resp.ok return resp.json()["instance_id"]