Develop (#82)
* allow paused tasks, use scheduler timezone instead of tzlocal, make tzlocal optional (#80)

Changes:

- clean up the unused to_bool
- pass timezone info from the scheduler to get_next_trigger_time, so trigger times are scheduler-specific
- make tzlocal optional and fall back to UTC (see the timezone sketch after this list)
- allow to_datetime to be used without a timezone
- allow submitting paused tasks
- replace UndefinedType with Undefined and add a shim for backward compatibility
- shrink pending_tasks; store_alias is now set in add_task
- allow updating task attributes while submitting via add_task
- fix incorrect fn references in documentation
- update documentation
- allow updating more attributes in-place in add_task
- remove old types from TaskState
- allow submitting tasks only once (a submitted fuse flag is set on the task)
- add tests
- update release notes
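
For illustration, a minimal sketch of the "optional tzlocal, fall back to UTC" behaviour named above; this is only the shape of the change as described, not the exact asyncz code.

```python
# Minimal sketch (assumption, not copied from asyncz): prefer the local timezone
# from the optional tzlocal package, otherwise fall back to UTC.
from datetime import timezone, tzinfo


def default_timezone() -> tzinfo:
    try:
        from tzlocal import get_localzone  # optional dependency
    except ImportError:
        return timezone.utc
    return get_localzone()


print(default_timezone())  # e.g. a zoneinfo timezone with tzlocal installed, UTC otherwise
```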

* optional loguru, allow switching logging (#81)


- make loguru optional, falling back to standard library logging
- make standard logging selectable by overriding it either per scheduler or
  via default_loggers_class in asyncz.schedulers.base (see the logging sketch after the change list)
- split process_pool out of pool and pass a special logger
- provide tests
- update release notes and docs
* fix test workflow and docker-compose
* fix Python 3.8 typings

* bump version

* add warning about flaky integration tests

* fix spurious integration test failures
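
As a rough illustration of the logging switch: the diff below shows executors resolving loggers through scheduler.loggers[name], and the message above names default_loggers_class in asyncz.schedulers.base as the override point. The class below and the commented wiring are illustrative assumptions, not asyncz's actual implementation.

```python
# Illustrative sketch only -- a dict-like loggers mapping backed by the standard
# logging module, mirroring the `scheduler.loggers[name]` lookups seen in the diff.
import logging
from typing import Dict


class StdLibLoggers(Dict[str, logging.Logger]):
    """Hands out stdlib loggers keyed by name, creating them on first access."""

    def __missing__(self, name: str) -> logging.Logger:
        logger = logging.getLogger(name)
        self[name] = logger
        return logger


# Hypothetical wiring (assumption): per the commit message, the default can be
# overridden globally via default_loggers_class in asyncz.schedulers.base,
# or per scheduler; the exact spelling may differ.
# import asyncz.schedulers.base as schedulers_base
# schedulers_base.default_loggers_class = StdLibLoggers
```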

---------

Co-authored-by: Alexander <[email protected]>
tarsil and devkral authored Aug 2, 2024
1 parent 61e9508 commit 6e842cb
Showing 45 changed files with 850 additions and 378 deletions.
17 changes: 13 additions & 4 deletions .github/workflows/test-suite.yml
@@ -18,6 +18,17 @@ jobs:
strategy:
matrix:
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
services:
redis:
image: redis
ports:
- 127.0.0.1:6379:6379

mongodb:
image: mongo
ports:
- 127.0.0.1:27017:27017

steps:
- uses: "actions/checkout@v4"
- uses: "actions/setup-python@v5"
@@ -29,14 +40,12 @@ jobs:
with:
path: ${{ env.pythonLocation }}
key: ${{ runner.os }}-python-${{ env.pythonLocation }}-${{ hashFiles('pyproject.toml') }}-test-v02
- name: "Start docker services"
run: docker-compose up -d
- name: "Install dependencies"
if: steps.cache.outputs.cache-hit != 'true'
run: "pip install hatch"
- name: "Run linting"
run: "hatch fmt --check"
# - name: "Run mypy"
# run: "hatch run test:check_types"
- name: "Run mypy"
run: "hatch run test:check_types"
- name: "Run tests"
run: "hatch test"
2 changes: 1 addition & 1 deletion asyncz/__init__.py
@@ -1 +1 @@
__version__ = "0.10.1"
__version__ = "0.11.0"
10 changes: 4 additions & 6 deletions asyncz/datastructures.py
@@ -1,5 +1,4 @@
from datetime import datetime, timedelta, tzinfo
from datetime import timezone as dt_timezone
from typing import TYPE_CHECKING, Any, Callable, List, Optional, Union

from pydantic import BaseModel, ConfigDict
@@ -29,10 +28,10 @@ class IntervalState(BaseDatastructureState):
Handles the state for a IntervalTrigger.
"""

timezone: Union[dt_timezone, str, tzinfo]
timezone: Optional[tzinfo] = None
start_at: datetime
end_at: Optional[datetime] = None
interval: Optional[Union[dt_timezone, timedelta]] = None
interval: Optional[timedelta] = None
jitter: Optional[int] = None


@@ -50,7 +49,7 @@ class CronState(BaseDatastructureState):
Handles the state of the CronTrigger.
"""

timezone: Optional[Union[dt_timezone, str, tzinfo]] = None
timezone: Optional[tzinfo] = None
start_at: Optional[datetime] = None
end_at: Optional[datetime] = None
fields: Optional[List[Any]] = None
@@ -65,11 +64,10 @@ class TaskState(BaseDatastructureState): # type: ignore
args: Optional[Any] = None
kwargs: Optional[Any] = None
coalesce: Optional[bool] = None
trigger: Optional[Union[str, TriggerType]] = None
trigger: Optional[TriggerType] = None
executor: Optional[str] = None
mistrigger_grace_time: Optional[int] = None
max_instances: Optional[int] = None
next_run_time: Optional[datetime] = None
scheduler: Optional[Any] = None
store_alias: Optional[str] = None
store: Optional[Union[str, StoreType]] = None
3 changes: 2 additions & 1 deletion asyncz/executors/__init__.py
@@ -1,7 +1,8 @@
from .asyncio import AsyncIOExecutor
from .base import BaseExecutor
from .debug import DebugExecutor
from .pool import ProcessPoolExecutor, ThreadPoolExecutor
from .pool import ThreadPoolExecutor
from .process_pool import ProcessPoolExecutor

__all__ = [
"BaseExecutor",
9 changes: 6 additions & 3 deletions asyncz/executors/asyncio.py
@@ -1,5 +1,5 @@
from datetime import datetime
from typing import TYPE_CHECKING, Any, List, Set
from typing import TYPE_CHECKING, Any, List, Set, cast

from asyncz.executors.base import BaseExecutor, run_coroutine_task, run_task
from asyncz.utils import iscoroutinefunction_partial
@@ -35,6 +35,7 @@ def shutdown(self, wait: bool = True) -> None:
def do_send_task(self, task: "TaskType", run_times: List[datetime]) -> None:
task_id = task.id
assert task_id is not None, "Cannot send decorator type task"
assert self.logger is not None, "logger is None"

def callback(fn: Any) -> None:
self.pending_futures.discard(fn)
@@ -46,11 +47,13 @@ def callback(fn: Any) -> None:
self.run_task_success(task_id, events)

if iscoroutinefunction_partial(task.fn):
coroutine = run_coroutine_task(task, task.store_alias, run_times, self.logger) # type: ignore
coroutine = run_coroutine_task(
task, cast(str, task.store_alias), run_times, self.logger
)
fn = self.event_loop.create_task(coroutine)
else:
fn = self.event_loop.run_in_executor(
None, run_task, task, task.store_alias, run_times, self.logger
None, run_task, task, cast(str, task.store_alias), run_times, self.logger
)

fn.add_done_callback(callback)
41 changes: 18 additions & 23 deletions asyncz/executors/base.py
@@ -1,34 +1,31 @@
import logging
import sys
import traceback
from collections import defaultdict
from datetime import datetime, timedelta
from datetime import timezone as tz
from threading import RLock
from traceback import format_tb
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, cast

from loguru import logger
from loguru._logger import Logger
from typing import TYPE_CHECKING, Any, Callable, Dict, List, cast

from asyncz.events import TaskExecutionEvent
from asyncz.events.constants import TASK_ERROR, TASK_EXECUTED, TASK_MISSED
from asyncz.exceptions import MaximumInstancesError
from asyncz.executors.types import ExecutorType
from asyncz.state import BaseStateExtra

if TYPE_CHECKING:
from asyncz.tasks.types import TaskType


class BaseExecutor(BaseStateExtra, ExecutorType):
class BaseExecutor(ExecutorType):
"""
Base model for the executors. It defines the interface for all the executors used by the Asyncz.
Asyncz uses loguru for its logging as it is more descriptive and intuitive.
"""

def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
super().__init__()
self.instances: Dict[str, int] = defaultdict(lambda: 0)

def start(self, scheduler: Any, alias: str) -> None:
@@ -42,8 +39,9 @@ def start(self, scheduler: Any, alias: str) -> None:
"""
self.scheduler = scheduler
self.lock: RLock = scheduler.create_lock()
self.logger: Any = logger
self.logger.bind(logger_name=f"asyncz.executors.{alias}")
self.logger_name = f"asyncz.executors.{alias}"
# send to tasks
self.logger = self.scheduler.loggers[self.logger_name]

def shutdown(self, wait: bool = True) -> None:
"""
@@ -92,23 +90,23 @@ def run_task_error(self, task_id: str) -> None:
if self.instances[task_id] == 0:
del self.instances[task_id]

self.logger.opt(exception=True).error(f"Error running task {task_id}", exc_info=True)
self.scheduler.loggers[self.logger_name].error(
f"Error running task {task_id}", exc_info=True
)


def run_task(
task: "TaskType",
store_alias: str,
run_times: List[datetime],
_logger: Optional[Any] = None,
logger: logging.Logger,
) -> List[TaskExecutionEvent]:
"""
Called by executors to run the task. Returns a list of scheduler events to be dispatched by the
scheduler.
The run task is made to run in async mode.
"""
if not _logger:
_logger = logger

events = []
for run_time in run_times:
@@ -125,10 +123,10 @@ def run_task(
scheduled_run_time=run_time,
)
)
_logger.warning(f"Run time of task '{task}' was missed by {difference}")
logger.warning(f"Run time of task '{task}' was missed by {difference}")
continue

_logger.info(f'Running task "{task}" (scheduled at {run_time})')
logger.info(f'Running task "{task}" (scheduled at {run_time})')
try:
return_value = cast(Callable[..., Any], task.fn)(*task.args, **task.kwargs)
except Exception as exc:
@@ -156,25 +154,22 @@ def run_task(
return_value=return_value,
)
)
_logger.info(f"Task '{task}' executed successfully.")
logger.info(f"Task '{task}' executed successfully.")
return events


async def run_coroutine_task(
task: "TaskType",
store_alias: str,
run_times: List[datetime],
_logger: Optional["Logger"] = None,
logger: logging.Logger,
) -> List[TaskExecutionEvent]:
"""
Called by executors to run the task. Returns a list of scheduler events to be dispatched by the
scheduler.
The run task is made to run in async mode.
"""
if not _logger:
_logger = logger # type: ignore

events = []
for run_time in run_times:
mistrigger_grace_time = task.mistrigger_grace_time
@@ -190,10 +185,10 @@ async def run_coroutine_task(
scheduled_run_time=run_time,
)
)
_logger.warning(f"Run time of task '{task}' was missed by {difference}") # type: ignore
logger.warning(f"Run time of task '{task}' was missed by {difference}")
continue

_logger.info(f'Running task "{task}" (scheduled at {run_time})') # type: ignore
logger.info(f'Running task "{task}" (scheduled at {run_time})')
try:
return_value = await cast(Callable[..., Any], task.fn)(*task.args, **task.kwargs)
except Exception as exc:
@@ -221,6 +216,6 @@ async def run_coroutine_task(
return_value=return_value,
)
)
_logger.info(f"Task '{task}' executed successfully") # type: ignore
logger.info(f"Task '{task}' executed successfully")

return events
5 changes: 3 additions & 2 deletions asyncz/executors/debug.py
@@ -1,5 +1,5 @@
from datetime import datetime
from typing import TYPE_CHECKING, List
from typing import TYPE_CHECKING, List, cast

from asyncz.executors.base import BaseExecutor, run_task

@@ -19,8 +19,9 @@ def do_send_task(
run_times: List[datetime],
) -> None:
assert task.id is not None, "Cannot send decorator type task"
assert self.logger is not None, "logger is None"
try:
events = run_task(task, task.store_alias, run_times, self.logger) # type: ignore
events = run_task(task, cast(str, task.store_alias), run_times, self.logger)
except Exception:
self.run_task_error(task.id)
else:
24 changes: 4 additions & 20 deletions asyncz/executors/pool.py
@@ -39,9 +39,11 @@ def callback(fn: Any) -> None:
self.run_task_success(task_id, fn.result())

try:
fn = self.pool.submit(run_task, task, task.store_alias, run_times)
fn = self.pool.submit(run_task, task, task.store_alias, run_times, self.logger)
except (BrokenProcessPool, TypeError):
self.logger.warning("Process pool is broken. Replacing pool with a new instance.")
self.scheduler.loggers[self.logger_name].warning(
"Process pool is broken. Replacing pool with a new instance."
)
self.pool = self.pool.__class__(self.pool.max_workers)
fn = self.pool.submit(run_task, task, task.store_alias, run_times, self.logger)

@@ -68,21 +70,3 @@ def __init__(self, max_workers: int = 10, pool_kwargs: Optional[Any] = None, **k
pool_kwargs = pool_kwargs or {}
pool = concurrent.futures.ThreadPoolExecutor(int(max_workers), **pool_kwargs)
super().__init__(pool, **kwargs)


class ProcessPoolExecutor(BasePoolExecutor):
"""
An executor that runs tasks in a concurrent.futures process pool.
Args:
max_workers: The maximum number of spawned processes.
pool_kwargs: Dict of keyword arguments to pass to the underlying
ProcessPoolExecutor constructor.
"""

def __init__(
self, max_workers: int = 10, pool_kwargs: Optional[Any] = None, **kwargs: Any
) -> None:
pool_kwargs = pool_kwargs or {}
pool = concurrent.futures.ProcessPoolExecutor(int(max_workers), **pool_kwargs)
super().__init__(pool, **kwargs)
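
The pool.py hunk above now forwards self.logger into run_task and recreates the pool when it breaks. A standalone sketch of that recover-and-retry pattern follows (not asyncz code; the worker count is passed explicitly here to avoid relying on private executor attributes).

```python
# Standalone sketch of "recreate a broken process pool and retry once" (not asyncz code).
from concurrent.futures import Future, ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool
from typing import Any, Callable, Tuple


def submit_with_recovery(
    pool: ProcessPoolExecutor, max_workers: int, fn: Callable[..., Any], *args: Any
) -> Tuple[Future, ProcessPoolExecutor]:
    try:
        return pool.submit(fn, *args), pool
    except BrokenProcessPool:
        # The pool can no longer accept work (e.g. a worker died); replace it and retry once.
        pool = ProcessPoolExecutor(max_workers)
        return pool.submit(fn, *args), pool
```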
81 changes: 81 additions & 0 deletions asyncz/executors/process_pool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import concurrent.futures
from multiprocessing import Pipe, connection
from threading import Thread
from typing import TYPE_CHECKING, Any, Optional, cast

from asyncz.executors.pool import BasePoolExecutor

if TYPE_CHECKING:
import logging

from asyncz.schedulers.types import SchedulerType

# because multiprocessing is heavy weight, it is split out from pool


class ProcessPoolLoggerSenderFnWrap:
def __init__(self, send_pipe: connection.Connection, fn_name: str) -> None:
self.send_pipe = send_pipe
self.fn_name = fn_name

def __call__(self, *args: Any, **kwargs: Any) -> None:
self.send_pipe.send((self.fn_name, args, kwargs))


class ProcessPoolLoggerSender:
def __init__(self, send_pipe: connection.Connection) -> None:
self.send_pipe = send_pipe

def __getattr__(self, item: str) -> Any:
if item.startswith("_"):
return object.__getattr__(self, item)
return ProcessPoolLoggerSenderFnWrap(self.send_pipe, item)


class ProcessPoolReceiver(Thread):
def __init__(self, receive_pipe: connection.Connection, logger: "logging.Logger") -> None:
super().__init__()
self.receive_pipe = receive_pipe
self.logger = logger
self.start()

def run(self) -> None:
try:
while True:
fn_name, args, kwargs = self.receive_pipe.recv()
if not fn_name.startswith("_"):
getattr(self.logger, fn_name)(*args, **kwargs)
except EOFError:
pass


class ProcessPoolExecutor(BasePoolExecutor):
"""
An executor that runs tasks in a concurrent.futures process pool.
Args:
max_workers: The maximum number of spawned processes.
pool_kwargs: Dict of keyword arguments to pass to the underlying
ProcessPoolExecutor constructor.
"""

def __init__(
self, max_workers: int = 10, pool_kwargs: Optional[Any] = None, **kwargs: Any
) -> None:
self.receive_pipe, self.send_pipe = Pipe(False)
pool_kwargs = pool_kwargs or {}
pool = concurrent.futures.ProcessPoolExecutor(int(max_workers), **pool_kwargs)
super().__init__(pool, **kwargs)

def start(self, scheduler: "SchedulerType", alias: str) -> None:
super().start(scheduler, alias)
assert self.logger is not None, "logger is None"
# move the old logger to logger_receiver
self.logger_receiver = ProcessPoolReceiver(self.receive_pipe, self.logger)
# and send the process logger instead
self.logger = cast("logging.Logger", ProcessPoolLoggerSender(self.send_pipe))

def shutdown(self, wait: bool = True) -> None:
super().shutdown(wait=wait)
self.send_pipe.close()
self.logger_receiver.join()
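
To make the pipe-based logger forwarding above easier to follow, here is a minimal standalone sketch of the same pattern (illustrative, not part of the diff): the child process receives a picklable proxy, every logger call becomes a (method, args, kwargs) message over a Pipe, and a parent-side thread replays it on a real logging.Logger.

```python
# Standalone sketch of the forwarding pattern used by ProcessPoolLoggerSender /
# ProcessPoolReceiver above (illustrative, not asyncz code).
import logging
from multiprocessing import Pipe, Process
from threading import Thread


class PipeLoggerProxy:
    """Picklable stand-in for a logger; forwards each method call through the pipe."""

    def __init__(self, send_pipe) -> None:
        self.send_pipe = send_pipe

    def __getattr__(self, name):
        if name.startswith("_"):  # keep pickling/dunder lookups out of the pipe
            raise AttributeError(name)
        return lambda *args, **kwargs: self.send_pipe.send((name, args, kwargs))


def replay(receive_pipe, logger: logging.Logger) -> None:
    """Parent-side thread: replay forwarded calls on the real logger until EOF."""
    try:
        while True:
            fn_name, args, kwargs = receive_pipe.recv()
            getattr(logger, fn_name)(*args, **kwargs)
    except EOFError:
        pass


def child(proxy: "PipeLoggerProxy") -> None:
    proxy.info("hello from the worker process")


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    receive_pipe, send_pipe = Pipe(False)
    replay_thread = Thread(target=replay, args=(receive_pipe, logging.getLogger("parent")))
    replay_thread.start()
    worker = Process(target=child, args=(PipeLoggerProxy(send_pipe),))
    worker.start()
    worker.join()
    send_pipe.close()  # all writers closed -> replay() hits EOFError and returns
    replay_thread.join()
```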