Skip to content

Commit

Permalink
use tenacity for retry
Browse files Browse the repository at this point in the history
  • Loading branch information
cl0ete committed Dec 5, 2024
1 parent 426b3da commit d3e14f7
Showing 1 changed file with 17 additions and 2 deletions.
19 changes: 17 additions & 2 deletions waypoint/services/nats_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
from datetime import datetime, timedelta, timezone

import orjson
import tenacity
from nats.errors import BadSubscriptionError, Error, TimeoutError
from nats.js.api import ConsumerConfig, DeliverPolicy
from nats.js.client import JetStreamContext
from nats.js.errors import FetchTimeoutError
from retry import retry
from tenacity import RetryCallState

from shared.constants import NATS_STATE_STREAM, NATS_STATE_SUBJECT
from shared.log_config import get_logger
Expand All @@ -17,6 +18,15 @@
logger = get_logger(__name__)


def retry_log(retry_state: RetryCallState):
"""Custom logging for retry attempts."""
if retry_state.outcome.failed:
exception = retry_state.outcome.exception()
logger.warning(
f"Retry attempt {retry_state.attempt_number} failed due to {type(exception).__name__}: {exception}"
)


class NatsEventsProcessor:
"""
Class to handle processing of NATS events. Calling the process_events method will
Expand Down Expand Up @@ -55,7 +65,12 @@ async def _subscribe(
opt_start_time=start_time,
)

@retry(TimeoutError, delay=1, backoff=2, max_delay=16, logger=logger)
@tenacity.retry(
retry=tenacity.retry_if_exception_type(TimeoutError),
wait=tenacity.wait_exponential(multiplier=1, max=16),
after=retry_log,
stop=tenacity.stop_never,
)
async def pull_subscribe(config, **kwargs):
try:
logger.trace(
Expand Down

0 comments on commit d3e14f7

Please sign in to comment.