Skip to content

Commit

Permalink
yield batch of messages
Browse files Browse the repository at this point in the history
  • Loading branch information
spicy-sauce committed Sep 24, 2024
1 parent 7150927 commit 7ea4558
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 4 deletions.
6 changes: 5 additions & 1 deletion core/src/datayoga_core/blocks/redis/read_stream/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ async def produce(self) -> AsyncGenerator[List[Message], None]:
read_pending = True
while True:
# Read pending messages (fetched by us before but not acknowledged) in the first time, then consume new messages
messages_batch = []
streams = self.redis_client.xreadgroup(self.consumer_group, self.requesting_consumer, {
self.stream: "0" if read_pending else ">"}, None, 100 if self.snapshot else 0)

Expand All @@ -41,7 +42,10 @@ async def produce(self) -> AsyncGenerator[List[Message], None]:
for key, value in stream[1]:
payload = orjson.loads(value[next(iter(value))])
payload[self.MSG_ID_FIELD] = key
yield [payload]
messages_batch.append(payload)

if messages_batch:
yield messages_batch

# Quit after consuming pending current messages in case of snapshot
if self.snapshot and not read_pending:
Expand Down
6 changes: 3 additions & 3 deletions integration-tests/test_redis_to_relational.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
#
# [1] https://github.com/testcontainers/testcontainers-python/pull/286
@pytest.mark.parametrize("db_type, schema_name", [
("db2", None),
("mysql", "hr"),
("pg", "hr"),
# ("db2", None),
# ("mysql", "hr"),
# ("pg", "hr"),
("oracle", "hr"),
pytest.param("sqlserver", "dbo", marks=pytest.mark.skip(reason="SQLServer test fails"))
])
Expand Down

0 comments on commit 7ea4558

Please sign in to comment.