Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add timeout to ActivateJobs request #325

Merged
merged 37 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
1040e02
Bump version: 3.0.4 → 3.0.5
EyalCMX Jul 17, 2022
3116567
Bump version: 3.0.5 → 3.0.6
EyalCMX Jul 17, 2022
0f09810
Bump version: 3.0.6 → 3.0.7
EyalCMX Jul 17, 2022
7960e69
Bump version: 3.0.7 → 3.0.8
EyalCMX Jul 18, 2022
b053f8e
add logs
EyalCMX Jul 19, 2022
daa7488
add last poll time var
EyalCMX Jul 27, 2022
c5f86e3
Bump version: 3.0.8 → 3.0.9
EyalCMX Jul 27, 2022
129b7b0
fix import
EyalCMX Jul 27, 2022
e6d9c69
Bump version: 3.0.9 → 3.0.10
EyalCMX Jul 27, 2022
69b331c
remove logs
EyalCMX Aug 2, 2022
48f382e
Bump version: 3.0.10 → 3.0.11
EyalCMX Aug 2, 2022
9f051d2
Bump version: 3.0.11 → 3.0.12
EyalCMX Aug 3, 2022
16d16f5
Bump version: 3.0.12 → 3.0.13
EyalCMX Aug 9, 2022
7b369aa
Bump version: 3.0.13 → 3.0.14
EyalCMX Aug 9, 2022
790ba2c
Bump version: 3.0.14 → 3.0.15
EyalCMX Aug 9, 2022
f858606
add timeout and retry on grpc request
EyalCMX Aug 9, 2022
6d653de
Bump version: 3.0.15 → 3.0.16
EyalCMX Aug 9, 2022
ad4d0c6
Bump version: 3.0.16 → 3.0.17
EyalCMX Aug 9, 2022
b40ebc5
handle ZeebeDeadlineExceeded same as ZeebeInternalError
EyalCMX Aug 9, 2022
71b9485
Bump version: 3.0.17 → 3.0.18
EyalCMX Aug 14, 2022
fc01b08
fix timeout param for grpc polling call
EyalCMX Aug 14, 2022
324a887
Bump version: 3.0.18 → 3.0.19
EyalCMX Aug 21, 2022
39314f8
remove logs
EyalCMX Aug 25, 2022
c9697e0
remove unnecessary changes
EyalCMX Aug 25, 2022
33a3818
fix format
EyalCMX Aug 25, 2022
eacfbf9
run black to fix format
EyalCMX Nov 15, 2022
bb0806a
Merge branch 'master' into pr-branch
EyalCMX Jan 12, 2023
445f99a
Merge branch 'master' into pr-branch
EyalCMX Mar 12, 2023
89e2269
Merge branch 'master' into pr-branch
EyalCMX Sep 19, 2023
efa1c4e
move DEFAULT_GRPC_REQUEST_TIMEOUT to const
EyalCMX Mar 5, 2024
81bb92d
Merge remote-tracking branch 'origin/pr-branch' into pr-branch
EyalCMX Mar 5, 2024
9d391a4
Merge branch 'master' into pr-branch
dimastbk Jun 9, 2024
237532c
Merge branch 'master' into pr-branch
dimastbk Jul 23, 2024
134e414
Merge branch 'master' into pr-branch
dimastbk Sep 26, 2024
dc00354
Update pyzeebe/worker/job_poller.py
dimastbk Sep 26, 2024
15ecb94
Update pyzeebe/grpc_internals/zeebe_adapter_base.py
dimastbk Sep 26, 2024
ab33a86
Update pyzeebe/grpc_internals/zeebe_job_adapter.py
dimastbk Sep 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pyzeebe/errors/zeebe_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ class ZeebeInternalError(PyZeebeError):
pass


class ZeebeDeadlineExceeded(PyZeebeError):
pass


class UnknownGrpcStatusCodeError(PyZeebeError):
def __init__(self, grpc_error: grpc.aio.AioRpcError):
super().__init__()
Expand Down
5 changes: 4 additions & 1 deletion pyzeebe/grpc_internals/zeebe_adapter_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
ZeebeBackPressureError,
ZeebeGatewayUnavailableError,
ZeebeInternalError,
ZeebeDeadlineExceeded,
dimastbk marked this conversation as resolved.
Show resolved Hide resolved
)
from pyzeebe.errors.pyzeebe_errors import PyZeebeError
from pyzeebe.grpc_internals.grpc_utils import is_error_status
Expand All @@ -32,7 +33,7 @@ async def _handle_grpc_error(self, grpc_error: grpc.aio.AioRpcError) -> NoReturn
try:
pyzeebe_error = _create_pyzeebe_error_from_grpc_error(grpc_error)
raise pyzeebe_error
except (ZeebeGatewayUnavailableError, ZeebeInternalError):
except (ZeebeGatewayUnavailableError, ZeebeInternalError, ZeebeDeadlineExceeded):
self._current_connection_retries += 1
if not self._should_retry():
await self._close()
Expand All @@ -52,4 +53,6 @@ def _create_pyzeebe_error_from_grpc_error(grpc_error: grpc.aio.AioRpcError) -> P
return ZeebeGatewayUnavailableError()
if is_error_status(grpc_error, grpc.StatusCode.INTERNAL):
return ZeebeInternalError()
elif is_error_status(grpc_error, grpc.StatusCode.DEADLINE_EXCEEDED):
return ZeebeDeadlineExceeded()
return UnknownGrpcStatusCodeError(grpc_error)
5 changes: 4 additions & 1 deletion pyzeebe/grpc_internals/zeebe_job_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

logger = logging.getLogger(__name__)

DEFAULT_GRPC_REQUEST_TIMEOUT = 20 # This constant represents the fallback timeout value

dimastbk marked this conversation as resolved.
Show resolved Hide resolved
class ZeebeJobAdapter(ZeebeAdapterBase):
async def activate_jobs(
Expand All @@ -39,6 +40,7 @@ async def activate_jobs(
tenant_ids: Optional[Iterable[str]] = None,
) -> AsyncGenerator[Job, None]:
try:
grpc_request_timeout = request_timeout / 1000 * 2 if request_timeout > 0 else DEFAULT_GRPC_REQUEST_TIMEOUT
async for response in self._gateway_stub.ActivateJobs(
ActivateJobsRequest(
type=task_type,
Expand All @@ -48,7 +50,8 @@ async def activate_jobs(
fetchVariable=variables_to_fetch,
requestTimeout=request_timeout,
tenantIds=tenant_ids,
)
),
timeout=grpc_request_timeout,
):
for raw_job in response.jobs:
job = self._create_job_from_raw_job(raw_job)
Expand Down
8 changes: 7 additions & 1 deletion pyzeebe/worker/job_poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
ZeebeBackPressureError,
ZeebeGatewayUnavailableError,
ZeebeInternalError,
ZeebeDeadlineExceeded,
dimastbk marked this conversation as resolved.
Show resolved Hide resolved
)
from pyzeebe.grpc_internals.zeebe_job_adapter import ZeebeJobAdapter
from pyzeebe.job.job import Job
Expand Down Expand Up @@ -70,7 +71,12 @@ async def poll_once(self) -> None:
except ActivateJobsRequestInvalidError:
logger.warning("Activate job requests was invalid for task %s", self.task.type)
raise
except (ZeebeBackPressureError, ZeebeGatewayUnavailableError, ZeebeInternalError) as error:
except (
ZeebeBackPressureError,
ZeebeGatewayUnavailableError,
ZeebeInternalError,
ZeebeDeadlineExceeded,
) as error:
logger.warning(
"Failed to activate jobs from the gateway. Exception: %s. Retrying in 5 seconds...",
repr(error),
Expand Down
Loading