Skip to content

Commit

Permalink
shutdown trigger & lifecycle tasks & (#83)
Browse files Browse the repository at this point in the history
Changes:

- add shutdown trigger type
- add lifecycle task pattern
- fix wrong Exception raised when maximal instances were surpassed
- remove broken exception (wrong name and context)
- fix docs
- update tests
- bump to python >=3.9
- remove duplicate keyword in pyproject.toml keywords
- fix instances calculation
- don't ignore invalid kwargs to executor anymore
- bump version
  • Loading branch information
devkral authored Oct 9, 2024
1 parent 6e842cb commit 79a17f5
Show file tree
Hide file tree
Showing 42 changed files with 644 additions and 175 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test-suite.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:

strategy:
matrix:
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
python-version: ["3.9", "3.10", "3.11", "3.12"]
services:
redis:
image: redis
Expand Down
2 changes: 1 addition & 1 deletion asyncz/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.11.0"
__version__ = "0.12.0"
6 changes: 3 additions & 3 deletions asyncz/datastructures.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from datetime import datetime, timedelta, tzinfo
from typing import TYPE_CHECKING, Any, Callable, List, Optional, Union
from typing import TYPE_CHECKING, Any, Callable, Optional, Union

from pydantic import BaseModel, ConfigDict

Expand Down Expand Up @@ -40,7 +40,7 @@ class CombinationState(BaseDatastructureState):
Handles the state of the BaseCombination.
"""

triggers: List[Any]
triggers: list[Any]
jitter: Optional[int] = None


Expand All @@ -52,7 +52,7 @@ class CronState(BaseDatastructureState):
timezone: Optional[tzinfo] = None
start_at: Optional[datetime] = None
end_at: Optional[datetime] = None
fields: Optional[List[Any]] = None
fields: Optional[list[Any]] = None
jitter: Optional[int] = None


Expand Down
4 changes: 2 additions & 2 deletions asyncz/events/base.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from datetime import datetime
from typing import Any, List, Optional, Union
from typing import Any, Optional, Union

from pydantic import BaseModel, ConfigDict

Expand Down Expand Up @@ -42,7 +42,7 @@ class TaskSubmissionEvent(TaskEvent):
scheduled_run_times: List of datetimes when the task is supposed to run.
"""

scheduled_run_times: List[datetime]
scheduled_run_times: list[datetime]


class TaskExecutionEvent(TaskEvent):
Expand Down
9 changes: 2 additions & 7 deletions asyncz/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from typing import Any, Optional, Union
from uuid import UUID
from typing import Any, Optional


class AsynczException(Exception):
Expand Down Expand Up @@ -76,14 +75,10 @@ def __init__(self, task_id: Optional[str]) -> None:
super().__init__(detail)


class MaxInterationsReached(AsynczException):
detail = "Maximum number of iterations has been reached."


class MaximumInstancesError(AsynczException):
detail = "The task by the id of {id} reached its maximum number of instances {total}."

def __init__(self, _id: Union[UUID, str, int], total: int) -> None:
def __init__(self, _id: str, total: int) -> None:
detail = self.detail.format(id=_id, total=total)
super().__init__(detail=detail)

Expand Down
10 changes: 5 additions & 5 deletions asyncz/executors/asyncio.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import inspect
from datetime import datetime
from typing import TYPE_CHECKING, Any, List, Set, cast
from typing import TYPE_CHECKING, Any, cast

from asyncz.executors.base import BaseExecutor, run_coroutine_task, run_task
from asyncz.utils import iscoroutinefunction_partial

if TYPE_CHECKING:
from asyncz.tasks.types import TaskType
Expand All @@ -23,7 +23,7 @@ class AsyncIOExecutor(BaseExecutor):
def start(self, scheduler: Any, alias: str) -> None:
super().start(scheduler, alias)
self.event_loop = scheduler.event_loop
self.pending_futures: Set[Any] = set()
self.pending_futures: set[Any] = set()

def shutdown(self, wait: bool = True) -> None:
for f in self.pending_futures:
Expand All @@ -32,7 +32,7 @@ def shutdown(self, wait: bool = True) -> None:

self.pending_futures.clear()

def do_send_task(self, task: "TaskType", run_times: List[datetime]) -> None:
def do_send_task(self, task: "TaskType", run_times: list[datetime]) -> None:
task_id = task.id
assert task_id is not None, "Cannot send decorator type task"
assert self.logger is not None, "logger is None"
Expand All @@ -46,7 +46,7 @@ def callback(fn: Any) -> None:
else:
self.run_task_success(task_id, events)

if iscoroutinefunction_partial(task.fn):
if inspect.iscoroutinefunction(task.fn):
coroutine = run_coroutine_task(
task, cast(str, task.store_alias), run_times, self.logger
)
Expand Down
23 changes: 12 additions & 11 deletions asyncz/executors/base.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
import logging
import sys
import traceback
from collections import defaultdict
from datetime import datetime, timedelta
from datetime import timezone as tz
from threading import RLock
from traceback import format_tb
from typing import TYPE_CHECKING, Any, Callable, Dict, List, cast
from typing import TYPE_CHECKING, Any, Callable, cast

from asyncz.events import TaskExecutionEvent
from asyncz.events.constants import TASK_ERROR, TASK_EXECUTED, TASK_MISSED
from asyncz.exceptions import MaximumInstancesError
from asyncz.executors.types import ExecutorType

if TYPE_CHECKING:
from asyncz.schedulers.types import SchedulerType
from asyncz.tasks.types import TaskType


Expand All @@ -24,9 +24,10 @@ class BaseExecutor(ExecutorType):
Asyncz uses loguru for its logging as it is more descriptive and intuitive.
"""

def __init__(self, **kwargs: Any) -> None:
super().__init__()
self.instances: Dict[str, int] = defaultdict(lambda: 0)
@property
def instances(self) -> dict[str, int]:
assert self.scheduler is not None
return cast("SchedulerType", self.scheduler).instances

def start(self, scheduler: Any, alias: str) -> None:
"""
Expand All @@ -51,7 +52,7 @@ def shutdown(self, wait: bool = True) -> None:
wait - Boolean indicating to wait until all submitted tasks have been executed.
"""

def send_task(self, task: "TaskType", run_times: List[datetime]) -> None:
def send_task(self, task: "TaskType", run_times: list[datetime]) -> None:
"""
Sends the task for execution.
Expand All @@ -68,7 +69,7 @@ def send_task(self, task: "TaskType", run_times: List[datetime]) -> None:
self.do_send_task(task, run_times)
self.instances[task.id] += 1

def run_task_success(self, task_id: str, events: List[TaskExecutionEvent]) -> None:
def run_task_success(self, task_id: str, events: list[TaskExecutionEvent]) -> None:
"""
Called by the executor with the list of generated events when the function run_task has
been successfully executed.
Expand Down Expand Up @@ -98,9 +99,9 @@ def run_task_error(self, task_id: str) -> None:
def run_task(
task: "TaskType",
store_alias: str,
run_times: List[datetime],
run_times: list[datetime],
logger: logging.Logger,
) -> List[TaskExecutionEvent]:
) -> list[TaskExecutionEvent]:
"""
Called by executors to run the task. Returns a list of scheduler events to be dispatched by the
scheduler.
Expand Down Expand Up @@ -161,9 +162,9 @@ def run_task(
async def run_coroutine_task(
task: "TaskType",
store_alias: str,
run_times: List[datetime],
run_times: list[datetime],
logger: logging.Logger,
) -> List[TaskExecutionEvent]:
) -> list[TaskExecutionEvent]:
"""
Called by executors to run the task. Returns a list of scheduler events to be dispatched by the
scheduler.
Expand Down
4 changes: 2 additions & 2 deletions asyncz/executors/debug.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from datetime import datetime
from typing import TYPE_CHECKING, List, cast
from typing import TYPE_CHECKING, cast

from asyncz.executors.base import BaseExecutor, run_task

Expand All @@ -16,7 +16,7 @@ class DebugExecutor(BaseExecutor):
def do_send_task(
self,
task: "TaskType",
run_times: List[datetime],
run_times: list[datetime],
) -> None:
assert task.id is not None, "Cannot send decorator type task"
assert self.logger is not None, "logger is None"
Expand Down
8 changes: 3 additions & 5 deletions asyncz/executors/pool.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import concurrent.futures
import sys
from abc import abstractmethod
from concurrent.futures.process import BrokenProcessPool
from datetime import datetime
from typing import TYPE_CHECKING, Any, List, Optional
from typing import TYPE_CHECKING, Any, Optional

from pydantic import ConfigDict

Expand All @@ -23,7 +22,7 @@ def __init__(self, pool: Any, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.pool = pool

def do_send_task(self, task: "TaskType", run_times: List[datetime]) -> Any:
def do_send_task(self, task: "TaskType", run_times: list[datetime]) -> Any:
task_id = task.id
assert task_id is not None, "Cannot send decorator type task"

Expand Down Expand Up @@ -52,8 +51,7 @@ def callback(fn: Any) -> None:
def shutdown(self, wait: bool = True) -> None:
if self.overwrite_wait is not None:
wait = self.overwrite_wait
if sys.version_info >= (3, 9):
self.pool.shutdown(wait, cancel_futures=self.cancel_futures)
self.pool.shutdown(wait, cancel_futures=self.cancel_futures)
self.pool.shutdown(wait)


Expand Down
8 changes: 4 additions & 4 deletions asyncz/executors/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from abc import ABC, abstractmethod
from datetime import datetime
from typing import TYPE_CHECKING, Any, List, Optional
from typing import TYPE_CHECKING, Any, Optional

if TYPE_CHECKING:
import logging
Expand Down Expand Up @@ -36,7 +36,7 @@ def shutdown(self, wait: bool = True) -> None:
"""

@abstractmethod
def send_task(self, task: TaskType, run_times: List[datetime]) -> None:
def send_task(self, task: TaskType, run_times: list[datetime]) -> None:
"""
Sends the task for execution.
Expand All @@ -46,14 +46,14 @@ def send_task(self, task: TaskType, run_times: List[datetime]) -> None:
"""

@abstractmethod
def do_send_task(self, task: TaskType, run_times: List[datetime]) -> Any:
def do_send_task(self, task: TaskType, run_times: list[datetime]) -> Any:
"""
Executes the actual task of scheduling `run_task` to be called.
"""
...

@abstractmethod
def run_task_success(self, task_id: str, events: List[TaskExecutionEvent]) -> None:
def run_task_success(self, task_id: str, events: list[TaskExecutionEvent]) -> None:
"""
Called by the executor with the list of generated events when the function run_task has
been successfully executed.
Expand Down
16 changes: 11 additions & 5 deletions asyncz/schedulers/asgi.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
from __future__ import annotations

from collections.abc import Awaitable
from contextlib import suppress
from dataclasses import dataclass
from typing import TYPE_CHECKING, Awaitable, Callable
from inspect import isawaitable
from typing import TYPE_CHECKING, Callable

from asyncz.typing import DictStrAny

if TYPE_CHECKING:
from asyncz.schedulers.base import BaseScheduler
from asyncz.schedulers.types import SchedulerType

ASGIApp = Callable[
[
Expand All @@ -26,7 +28,7 @@ class MuteInteruptException(BaseException):
@dataclass
class ASGIHelper:
app: ASGIApp
scheduler: BaseScheduler
scheduler: SchedulerType
handle_lifespan: bool = False
wait: bool = True

Expand All @@ -43,13 +45,17 @@ async def receive() -> DictStrAny:
message = await original_receive()
if message["type"] == "lifespan.startup":
try:
self.scheduler.start()
result = self.scheduler.start()
if isawaitable(result):
await result
except Exception as exc:
await send({"type": "lifespan.startup.failed", "msg": str(exc)})
raise MuteInteruptException from None
elif message["type"] == "lifespan.shutdown":
try:
self.scheduler.shutdown(self.wait)
result = self.scheduler.shutdown(self.wait)
if isawaitable(result):
await result
except Exception as exc:
await send({"type": "lifespan.shutdown.failed", "msg": str(exc)})
raise MuteInteruptException from None
Expand Down
Loading

0 comments on commit 79a17f5

Please sign in to comment.