Skip to content

Commit

Permalink
Merge pull request #539 from camunda-community-hub/add-ZeebeError
Browse files Browse the repository at this point in the history
feat: add ZeebeError for all Zeebe errors
  • Loading branch information
dimastbk authored Dec 6, 2024
2 parents a60ba59 + d9b2651 commit 3533397
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 18 deletions.
2 changes: 2 additions & 0 deletions docs/errors.rst
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ All ``pyzeebe`` errors inherit from :py:class:`PyZeebeError`

.. autoexception:: pyzeebe.errors.InvalidJSONError

.. autoexception:: pyzeebe.errors.ZeebeError

.. autoexception:: pyzeebe.errors.ZeebeBackPressureError

.. autoexception:: pyzeebe.errors.ZeebeGatewayUnavailableError
Expand Down
2 changes: 2 additions & 0 deletions pyzeebe/errors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
UnknownGrpcStatusCodeError,
ZeebeBackPressureError,
ZeebeDeadlineExceeded,
ZeebeError,
ZeebeGatewayUnavailableError,
ZeebeInternalError,
)
Expand Down Expand Up @@ -52,4 +53,5 @@
"ZeebeDeadlineExceeded",
"ZeebeGatewayUnavailableError",
"ZeebeInternalError",
"ZeebeError",
)
34 changes: 26 additions & 8 deletions pyzeebe/errors/zeebe_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,47 @@
from pyzeebe.errors.pyzeebe_errors import PyZeebeError


class ZeebeBackPressureError(PyZeebeError):
class ZeebeError(PyZeebeError):
"""Base exception for all Zeebe errors."""

def __init__(self, grpc_error: grpc.aio.AioRpcError):
super().__init__()
self.grpc_error = grpc_error

def __str__(self) -> str:
return "{}(grpc_error={}(code={}, details={}, debug_error_string={}))".format(
self.__class__.__qualname__,
self.grpc_error.__class__.__qualname__,
self.grpc_error._code,
self.grpc_error._details,
self.grpc_error._debug_error_string,
)

def __repr__(self) -> str:
return self.__str__()


class ZeebeBackPressureError(ZeebeError):
"""If Zeebe is currently in back pressure (too many requests)
See: https://docs.camunda.io/docs/self-managed/zeebe-deployment/operations/backpressure/
"""


class ZeebeGatewayUnavailableError(PyZeebeError):
class ZeebeGatewayUnavailableError(ZeebeError):
pass


class ZeebeInternalError(PyZeebeError):
class ZeebeInternalError(ZeebeError):
pass


class ZeebeDeadlineExceeded(PyZeebeError):
class ZeebeDeadlineExceeded(ZeebeError):
"""If Zeebe hasn't responded after a certain timeout
See: https://grpc.io/docs/guides/deadlines/
"""


class UnknownGrpcStatusCodeError(PyZeebeError):
def __init__(self, grpc_error: grpc.aio.AioRpcError):
super().__init__()
self.grpc_error = grpc_error
class UnknownGrpcStatusCodeError(ZeebeError):
pass
8 changes: 4 additions & 4 deletions pyzeebe/grpc_internals/zeebe_adapter_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ async def _close(self) -> None:

def _create_pyzeebe_error_from_grpc_error(grpc_error: grpc.aio.AioRpcError) -> PyZeebeError:
if is_error_status(grpc_error, grpc.StatusCode.RESOURCE_EXHAUSTED):
return ZeebeBackPressureError()
return ZeebeBackPressureError(grpc_error)
if is_error_status(grpc_error, grpc.StatusCode.UNAVAILABLE, grpc.StatusCode.CANCELLED):
return ZeebeGatewayUnavailableError()
return ZeebeGatewayUnavailableError(grpc_error)
if is_error_status(grpc_error, grpc.StatusCode.INTERNAL):
return ZeebeInternalError()
return ZeebeInternalError(grpc_error)
elif is_error_status(grpc_error, grpc.StatusCode.DEADLINE_EXCEEDED):
return ZeebeDeadlineExceeded()
return ZeebeDeadlineExceeded(grpc_error)
return UnknownGrpcStatusCodeError(grpc_error)
42 changes: 36 additions & 6 deletions tests/unit/grpc_internals/zeebe_adapter_base_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,44 +28,74 @@ def test_returns_false_when_current_retries_over_max(self, zeebe_adapter: ZeebeA
class TestHandleRpcError:
async def test_raises_internal_error_on_internal_error_status(self, zeebe_adapter: ZeebeAdapterBase):
error = grpc.aio.AioRpcError(grpc.StatusCode.INTERNAL, None, None)
with pytest.raises(ZeebeInternalError):
with pytest.raises(ZeebeInternalError) as err:
await zeebe_adapter._handle_grpc_error(error)

assert (
str(err.value)
== "ZeebeInternalError(grpc_error=AioRpcError(code=StatusCode.INTERNAL, details=None, debug_error_string=None))"
)

async def test_raises_back_pressure_error_on_resource_exhausted(self, zeebe_adapter: ZeebeAdapterBase):
error = grpc.aio.AioRpcError(grpc.StatusCode.RESOURCE_EXHAUSTED, None, None)
with pytest.raises(ZeebeBackPressureError):
with pytest.raises(ZeebeBackPressureError) as err:
await zeebe_adapter._handle_grpc_error(error)

assert (
str(err.value)
== "ZeebeBackPressureError(grpc_error=AioRpcError(code=StatusCode.RESOURCE_EXHAUSTED, details=None, debug_error_string=None))"
)

async def test_raises_deadline_exceeded_on_deadline_exceeded(self, zeebe_adapter: ZeebeAdapterBase):
error = grpc.aio.AioRpcError(grpc.StatusCode.DEADLINE_EXCEEDED, None, None)
with pytest.raises(ZeebeDeadlineExceeded):
with pytest.raises(ZeebeDeadlineExceeded) as err:
await zeebe_adapter._handle_grpc_error(error)

assert (
str(err.value)
== "ZeebeDeadlineExceeded(grpc_error=AioRpcError(code=StatusCode.DEADLINE_EXCEEDED, details=None, debug_error_string=None))"
)

async def test_raises_gateway_unavailable_on_unavailable_status(
self,
zeebe_adapter: ZeebeAdapterBase,
):
error = grpc.aio.AioRpcError(grpc.StatusCode.UNAVAILABLE, None, None)
with pytest.raises(ZeebeGatewayUnavailableError):
with pytest.raises(ZeebeGatewayUnavailableError) as err:
await zeebe_adapter._handle_grpc_error(error)

assert (
str(err.value)
== "ZeebeGatewayUnavailableError(grpc_error=AioRpcError(code=StatusCode.UNAVAILABLE, details=None, debug_error_string=None))"
)

async def test_raises_gateway_unavailable_on_cancelled_status(
self,
zeebe_adapter: ZeebeAdapterBase,
):
error = grpc.aio.AioRpcError(grpc.StatusCode.CANCELLED, None, None)

with pytest.raises(ZeebeGatewayUnavailableError):
with pytest.raises(ZeebeGatewayUnavailableError) as err:
await zeebe_adapter._handle_grpc_error(error)

assert (
str(err.value)
== "ZeebeGatewayUnavailableError(grpc_error=AioRpcError(code=StatusCode.CANCELLED, details=None, debug_error_string=None))"
)

async def test_raises_unkown_grpc_status_code_on_unkown_status_code(
self,
zeebe_adapter: ZeebeAdapterBase,
):
error = grpc.aio.AioRpcError("FakeGrpcStatus", None, None)
with pytest.raises(UnknownGrpcStatusCodeError):
with pytest.raises(UnknownGrpcStatusCodeError) as err:
await zeebe_adapter._handle_grpc_error(error)

assert (
str(err.value)
== "UnknownGrpcStatusCodeError(grpc_error=AioRpcError(code=FakeGrpcStatus, details=None, debug_error_string=None))"
)

async def test_closes_after_retries_exceeded(self, zeebe_adapter: ZeebeAdapterBase):
error = grpc.aio.AioRpcError(grpc.StatusCode.UNAVAILABLE, None, None)

Expand Down

0 comments on commit 3533397

Please sign in to comment.