Skip to content

Commit

Permalink
add test
Browse files Browse the repository at this point in the history
  • Loading branch information
dimastbk committed Nov 4, 2024
1 parent 47212a6 commit a595db8
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 7 deletions.
11 changes: 7 additions & 4 deletions tests/unit/grpc_internals/zeebe_adapter_base_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,21 @@ async def test_raises_unkown_grpc_status_code_on_unkown_status_code(
async def test_closes_after_retries_exceeded(self, zeebe_adapter: ZeebeAdapterBase):
error = grpc.aio.AioRpcError(grpc.StatusCode.UNAVAILABLE, None, None)

zeebe_adapter._close = AsyncMock()
zeebe_adapter._channel.close = AsyncMock()
zeebe_adapter._max_connection_retries = 1
with pytest.raises(ZeebeGatewayUnavailableError):
await zeebe_adapter._handle_grpc_error(error)

zeebe_adapter._close.assert_called_once()
assert zeebe_adapter.connected is False
zeebe_adapter._channel.close.assert_awaited_once()

async def test_closes_after_internal_error(self, zeebe_adapter: ZeebeAdapterBase):
error = grpc.aio.AioRpcError(grpc.StatusCode.INTERNAL, None, None)
zeebe_adapter._close = AsyncMock()

zeebe_adapter._channel.close = AsyncMock()
zeebe_adapter._max_connection_retries = 1
with pytest.raises(ZeebeInternalError):
await zeebe_adapter._handle_grpc_error(error)

zeebe_adapter._close.assert_called_once()
assert zeebe_adapter.connected is False
zeebe_adapter._channel.close.assert_awaited_once()
7 changes: 4 additions & 3 deletions tests/unit/worker/worker_test.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import asyncio
from unittest.mock import AsyncMock, Mock
from uuid import uuid4

Expand Down Expand Up @@ -277,12 +278,12 @@ async def test_poller_failed(self, zeebe_worker: ZeebeWorker):
async def test_second_poller_should_cancel(self, zeebe_worker: ZeebeWorker):
zeebe_worker._init_tasks = Mock()

poller2_cancel_event = anyio.Event()
poller2_cancel_event = asyncio.Event()

async def poll2():
try:
await anyio.Event().wait()
except anyio.get_cancelled_exc_class():
await asyncio.Event().wait()
except asyncio.CancelledError:
poller2_cancel_event.set()

poller_mock = AsyncMock(spec_set=JobPoller, poll=AsyncMock(side_effect=[Exception("test_exception")]))
Expand Down

0 comments on commit a595db8

Please sign in to comment.