diff --git a/pyzeebe/errors/zeebe_errors.py b/pyzeebe/errors/zeebe_errors.py index fb9203d3..bdc23f8f 100644 --- a/pyzeebe/errors/zeebe_errors.py +++ b/pyzeebe/errors/zeebe_errors.py @@ -15,6 +15,10 @@ class ZeebeInternalError(PyZeebeError): pass +class ZeebeDeadlineExceeded(PyZeebeError): + pass + + class UnknownGrpcStatusCodeError(PyZeebeError): def __init__(self, grpc_error: grpc.aio.AioRpcError): super().__init__() diff --git a/pyzeebe/grpc_internals/zeebe_adapter_base.py b/pyzeebe/grpc_internals/zeebe_adapter_base.py index c375a391..4269099f 100644 --- a/pyzeebe/grpc_internals/zeebe_adapter_base.py +++ b/pyzeebe/grpc_internals/zeebe_adapter_base.py @@ -7,6 +7,7 @@ from pyzeebe.errors import ( UnknownGrpcStatusCodeError, ZeebeBackPressureError, + ZeebeDeadlineExceeded, ZeebeGatewayUnavailableError, ZeebeInternalError, ) @@ -32,7 +33,7 @@ async def _handle_grpc_error(self, grpc_error: grpc.aio.AioRpcError) -> NoReturn try: pyzeebe_error = _create_pyzeebe_error_from_grpc_error(grpc_error) raise pyzeebe_error - except (ZeebeGatewayUnavailableError, ZeebeInternalError): + except (ZeebeGatewayUnavailableError, ZeebeInternalError, ZeebeDeadlineExceeded): self._current_connection_retries += 1 if not self._should_retry(): await self._close() @@ -52,4 +53,6 @@ def _create_pyzeebe_error_from_grpc_error(grpc_error: grpc.aio.AioRpcError) -> P return ZeebeGatewayUnavailableError() if is_error_status(grpc_error, grpc.StatusCode.INTERNAL): return ZeebeInternalError() + elif is_error_status(grpc_error, grpc.StatusCode.DEADLINE_EXCEEDED): + return ZeebeDeadlineExceeded() return UnknownGrpcStatusCodeError(grpc_error) diff --git a/pyzeebe/grpc_internals/zeebe_job_adapter.py b/pyzeebe/grpc_internals/zeebe_job_adapter.py index 5fd328b8..3c4bd8b2 100644 --- a/pyzeebe/grpc_internals/zeebe_job_adapter.py +++ b/pyzeebe/grpc_internals/zeebe_job_adapter.py @@ -26,6 +26,8 @@ logger = logging.getLogger(__name__) +DEFAULT_GRPC_REQUEST_TIMEOUT = 20 # This constant represents the fallback timeout value + class ZeebeJobAdapter(ZeebeAdapterBase): async def activate_jobs( @@ -39,6 +41,7 @@ async def activate_jobs( tenant_ids: Optional[Iterable[str]] = None, ) -> AsyncGenerator[Job, None]: try: + grpc_request_timeout = request_timeout / 1000 * 2 if request_timeout > 0 else DEFAULT_GRPC_REQUEST_TIMEOUT async for response in self._gateway_stub.ActivateJobs( ActivateJobsRequest( type=task_type, @@ -48,7 +51,8 @@ async def activate_jobs( fetchVariable=variables_to_fetch, requestTimeout=request_timeout, tenantIds=tenant_ids, - ) + ), + timeout=grpc_request_timeout, ): for raw_job in response.jobs: job = self._create_job_from_raw_job(raw_job) diff --git a/pyzeebe/worker/job_poller.py b/pyzeebe/worker/job_poller.py index 1e044cfb..15417459 100644 --- a/pyzeebe/worker/job_poller.py +++ b/pyzeebe/worker/job_poller.py @@ -5,6 +5,7 @@ from pyzeebe.errors import ( ActivateJobsRequestInvalidError, ZeebeBackPressureError, + ZeebeDeadlineExceeded, ZeebeGatewayUnavailableError, ZeebeInternalError, ) @@ -70,7 +71,12 @@ async def poll_once(self) -> None: except ActivateJobsRequestInvalidError: logger.warning("Activate job requests was invalid for task %s", self.task.type) raise - except (ZeebeBackPressureError, ZeebeGatewayUnavailableError, ZeebeInternalError) as error: + except ( + ZeebeBackPressureError, + ZeebeGatewayUnavailableError, + ZeebeInternalError, + ZeebeDeadlineExceeded, + ) as error: logger.warning( "Failed to activate jobs from the gateway. Exception: %s. Retrying in 5 seconds...", repr(error),