Skip to content

Commit

Permalink
Merge pull request #834 from Aiven-Open/matyaskuti/consumer_empty_com…
Browse files Browse the repository at this point in the history
…mit_test

Add tests for empty payload consumer commit
  • Loading branch information
eliax1996 authored Mar 8, 2024
2 parents ea3300d + d00ab8a commit cb6aa95
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 1 deletion.
25 changes: 25 additions & 0 deletions tests/integration/kafka/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,31 @@ def test_commit_offsets(
assert committed_partition.partition == 0
assert committed_partition.offset == 2

def test_commit_offsets_empty(
self,
producer: KafkaProducer,
consumer: KafkaConsumer,
new_topic: NewTopic,
) -> None:
consumer.subscribe([new_topic.topic])
first_fut = producer.send(new_topic.topic)
second_fut = producer.send(new_topic.topic)
producer.flush()
first_fut.result()
second_fut.result()
consumer.poll(timeout=POLL_TIMEOUT_S)
consumer.poll(timeout=POLL_TIMEOUT_S)

[topic_partition] = consumer.commit(offsets=None, message=None) # default parameters
[committed_partition] = consumer.committed([TopicPartition(new_topic.topic, partition=0)])

assert topic_partition.topic == new_topic.topic
assert topic_partition.partition == 0
assert topic_partition.offset == 2
assert committed_partition.topic == new_topic.topic
assert committed_partition.partition == 0
assert committed_partition.offset == 2

def test_commit_raises_for_unknown_partition(
self,
consumer: KafkaConsumer,
Expand Down
74 changes: 74 additions & 0 deletions tests/integration/test_rest_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,80 @@ async def test_offsets(rest_async_client, admin_client, trail):
assert "partition" in data and data["partition"] == 0, f"Unexpected partition {data}"


@pytest.mark.parametrize("trail", ["", "/"])
async def test_offsets_no_payload(rest_async_client, admin_client, producer, trail):
group_name = "offset_group_no_payload"
fmt = "binary"
header = REST_HEADERS[fmt]
instance_id = await new_consumer(
rest_async_client,
group_name,
fmt=fmt,
trail=trail,
# By default this is true
payload_override={"auto.commit.enable": "false"},
)
topic_name = new_topic(admin_client)
offsets_path = f"/consumers/{group_name}/instances/{instance_id}/offsets{trail}"
assign_path = f"/consumers/{group_name}/instances/{instance_id}/assignments{trail}"
consume_path = f"/consumers/{group_name}/instances/{instance_id}/records{trail}?timeout=5000"

res = await rest_async_client.post(
assign_path,
json={"partitions": [{"topic": topic_name, "partition": 0}]},
headers=header,
)
assert res.ok, f"Unexpected response status for assignment {res}"

producer.send(topic_name, value=b"message-value")
producer.flush()

resp = await rest_async_client.get(consume_path, headers=header)
assert resp.ok, f"Expected a successful response: {resp}"

await repeat_until_successful_request(
rest_async_client.post,
offsets_path,
json_data={},
headers=header,
error_msg="Unexpected response status for offset commit",
timeout=20,
sleep=1,
)

res = await rest_async_client.get(
offsets_path,
headers=header,
json={"partitions": [{"topic": topic_name, "partition": 0}]},
)
assert res.ok, f"Unexpected response status for {res}"
data = res.json()
assert "offsets" in data and len(data["offsets"]) == 1, f"Unexpected offsets response {res}"
data = data["offsets"][0]
assert "topic" in data and data["topic"] == topic_name, f"Unexpected topic {data}"
assert "offset" in data and data["offset"] == 1, f"Unexpected offset {data}"
assert "partition" in data and data["partition"] == 0, f"Unexpected partition {data}"
res = await rest_async_client.post(
offsets_path,
json={"offsets": [{"topic": topic_name, "partition": 0, "offset": 1}]},
headers=header,
)
assert res.ok, f"Unexpected response status for offset commit {res}"

res = await rest_async_client.get(
offsets_path,
headers=header,
json={"partitions": [{"topic": topic_name, "partition": 0}]},
)
assert res.ok, f"Unexpected response status for {res}"
data = res.json()
assert "offsets" in data and len(data["offsets"]) == 1, f"Unexpected offsets response {res}"
data = data["offsets"][0]
assert "topic" in data and data["topic"] == topic_name, f"Unexpected topic {data}"
assert "offset" in data and data["offset"] == 2, f"Unexpected offset {data}"
assert "partition" in data and data["partition"] == 0, f"Unexpected partition {data}"


@pytest.mark.parametrize("trail", ["", "/"])
async def test_consume(rest_async_client, admin_client, producer, trail):
# avro to be handled in a separate testcase ??
Expand Down
4 changes: 3 additions & 1 deletion tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,11 @@
}


async def new_consumer(c, group, fmt="avro", trail=""):
async def new_consumer(c, group, fmt="avro", trail="", payload_override=None):
payload = copy.copy(consumer_valid_payload)
payload["format"] = fmt
if payload_override:
payload.update(payload_override)
resp = await c.post(f"/consumers/{group}{trail}", json=payload, headers=REST_HEADERS[fmt])
assert resp.ok
return resp.json()["instance_id"]
Expand Down

0 comments on commit cb6aa95

Please sign in to comment.