Skip to content

Commit

Permalink
Changes:
Browse files Browse the repository at this point in the history
- improved tests
- bugfixes
- updated store docs
  • Loading branch information
devkral committed Dec 30, 2024
1 parent 8d0a41e commit 099d3f0
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 12 deletions.
3 changes: 2 additions & 1 deletion asyncz/schedulers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1043,7 +1043,8 @@ def real_add_task(
if start_task and self.state == SchedulerState.STATE_RUNNING:
self.wakeup()

def resolve_load_plugin(self, module_name: str) -> Any:
@classmethod
def resolve_load_plugin(cls, module_name: str) -> Any:
"""
Resolve the plugin from its module and attrs.
"""
Expand Down
7 changes: 4 additions & 3 deletions asyncz/schedulers/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
"date": "asyncz.triggers.date:DateTrigger",
"interval": "asyncz.triggers.interval:IntervalTrigger",
"cron": "asyncz.triggers.cron.trigger:CronTrigger",
"and": "asyncz.triggers.combining:AndTrigger",
"or": "asyncz.triggers.combining:OrTrigger",
"and": "asyncz.triggers.combination:AndTrigger",
"or": "asyncz.triggers.combination:OrTrigger",
"shutdown": "asyncz.triggers.shutdown:ShutdownTrigger",
}

Expand All @@ -16,8 +16,9 @@
"asyncio": "asyncz.executors.asyncio:AsyncIOExecutor",
}


stores: dict[str, str] = {
"memory": "asyncz.stores.memory:MemoryTaskStore",
"memory": "asyncz.stores.memory:MemoryStore",
"mongodb": "asyncz.stores.mongo:MongoDBStore",
"redis": "asyncz.stores.redis:RedisStore",
"sqlalchemy": "asyncz.stores.sqlalchemy:SQLAlchemyStore",
Expand Down
2 changes: 1 addition & 1 deletion asyncz/stores/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def __init__(self, **kwargs: Any) -> None:

def create_lock(self) -> LockProtectedProtocol:
"""
Creates a reentrant lock object.
Creates a lock protector.
"""
if not self.scheduler or not self.scheduler.lock_path:
return NullLockProtected()
Expand Down
1 change: 1 addition & 0 deletions docs/release-notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
### Fixed

- `and` was mapped to the wrong trigger.
- Some defaults had wrong module pathes.
- Missing export of NativeAsyncIOScheduler from schedulers.

## 0.12.0
Expand Down
37 changes: 34 additions & 3 deletions docs/stores.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,12 @@ from asyncz.stores.memory import MemoryStore
**Store Alias** - `redis`

```python
from asyncz.schedulers import AsyncIOScheduler
from asyncz.stores.redis import RedisStore
# assuming redis runs on localhost
scheduler = AsyncIOScheduler(stores={"default": RedisStore()})
# or
scheduler = AsyncIOScheduler(stores={"default": {"type": "redis"}})
```

### Parameters
Expand All @@ -64,14 +69,29 @@ highest available.

<sup>Default: `pickle.HIGHEST_PROTOCOL`</sup>

* **host** - Host to connect.

<sup>Default: `localhost`</sup>

* **port** - Port to connect.

<sup>Default: `6379`</sup>

## MongoDBStore

**Store Alias** - `mongo`

```python
from asyncz.schedulers import AsyncIOScheduler
from asyncz.stores.mongo import MongoDBStore
# assuming mongo db runs on localhost
scheduler = AsyncIOScheduler(stores={"default": MongoDBStore()})
# or
scheduler = AsyncIOScheduler(stores={"default": {"type": "mongo"}})
```



### Parameters

* **database** - The database to store the tasks.
Expand All @@ -91,20 +111,31 @@ available.

<sup>Default: `pickle.HIGHEST_PROTOCOL`</sup>

* **host** - Host to connect.

<sup>Default: `localhost`</sup>

* **port** - Port to connect.

<sup>Default: `27017`</sup>


## SQLAlchemyStore

**Store Alias** - `sqlalchemy`

```python
from asyncz.schedulers import AsyncIOScheduler
from asyncz.stores.sqlalchemy import SQLAlchemyStore

scheduler = AsyncIOScheduler(stores={"default": SQLAlchemyStore(database="sqlite:///./test_suite.sqlite3")})
# or
scheduler = AsyncIOScheduler(stores={"default": {"type": "sqlalchemy", "database": "sqlite:///./test_suite.sqlite3"}})
```

### Parameters

* **database** - The database to store the tasks. Can be string url or engine

<sup>Default: `asyncz`</sup>
* **database** - The database to store the tasks. Can be string url or engine.

* **tablename** - The tablename.

Expand Down
9 changes: 7 additions & 2 deletions tests/test_multiprocess.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import contextlib
import os
import time

import pytest
Expand Down Expand Up @@ -31,11 +33,11 @@ def reset_job_called():
@pytest.mark.flaky(reruns=2)
def test_simulated_multiprocess():
scheduler1 = AsyncIOScheduler(
stores={"default": SQLAlchemyStore(database="sqlite:///./test_suite.sqlite3")},
stores={"default": SQLAlchemyStore(database="sqlite:///./test_mp.sqlite3")},
lock_path="/tmp/{store}_asyncz_test.pid",
)
scheduler2 = AsyncIOScheduler(
stores={"default": SQLAlchemyStore(database="sqlite:///./test_suite.sqlite3")},
stores={"default": SQLAlchemyStore(database="sqlite:///./test_mp.sqlite3")},
lock_path="/tmp/{store}_asyncz_test.pid",
)

Expand Down Expand Up @@ -67,5 +69,8 @@ def test_simulated_multiprocess():
# fix CancelledError, by giving scheduler more time to send the tasks to the pool
# if the pool is closed, newly submitted tasks are cancelled
time.sleep(1)
scheduler1.stores["default"].metadata.drop_all(scheduler1.stores["default"].engine)
with contextlib.suppress(FileNotFoundError):
os.remove("./test_mp.sqlite3")

assert dummy_job_called == 3
12 changes: 11 additions & 1 deletion tests/test_schedulers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pickle
import time
from datetime import datetime, timedelta, tzinfo
from inspect import isclass
from threading import Thread
from typing import Any, Optional, Union
from unittest.mock import MagicMock, patch
Expand Down Expand Up @@ -38,6 +39,7 @@
)
from asyncz.executors.base import BaseExecutor
from asyncz.executors.debug import DebugExecutor
from asyncz.schedulers import defaults
from asyncz.schedulers.asyncio import AsyncIOScheduler, NativeAsyncIOScheduler
from asyncz.schedulers.base import BaseScheduler, ClassicLogging
from asyncz.stores.base import BaseStore
Expand Down Expand Up @@ -321,7 +323,7 @@ def test_start(self, real_add_task, dispatch_events, scheduler, create_task):
assert "default" in scheduler.executors
assert "default" in scheduler.stores

scheduler.real_add_task.assert_called_once_with(task, False, True)
scheduler.real_add_task.assert_called_once_with(task, False, True, None)
assert scheduler.pending_tasks == []

assert scheduler.dispatch_event.call_count == 3
Expand Down Expand Up @@ -1315,3 +1317,11 @@ async def lifespan_gen():
assert is_setup is True
assert call_count == 1
assert is_exited is True


@pytest.mark.parametrize(
"module_path",
[*defaults.executors.values(), *defaults.stores.values(), *defaults.triggers.values()],
)
def test_defaults_actually_load(module_path):
assert isclass(BaseScheduler.resolve_load_plugin(module_path))
17 changes: 16 additions & 1 deletion tests/test_stores.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pytest

from asyncz.exceptions import ConflictIdError, TaskLookupError
from asyncz.schedulers import AsyncIOScheduler
from asyncz.stores.memory import MemoryStore


Expand Down Expand Up @@ -225,7 +226,7 @@ def test_update_task_clear_next_runtime_when_run_times_are_initially_the_same(
store, create_add_task, next_run_time, timezone, index
):
tasks = [
create_add_task(store, dummy_task, datetime(2020, 2, 26), "task%d" % i) for i in range(3)
create_add_task(store, dummy_task, datetime(2020, 2, 26), f"task{i}") for i in range(3)
]
tasks[index].next_run_time = timezone.localize(next_run_time) if next_run_time else None
store.update_task(tasks[index])
Expand Down Expand Up @@ -327,3 +328,17 @@ def test_mongodb_null_database():
mongodb = pytest.importorskip("asyncz.stores.mongo")
exc = pytest.raises(ValueError, mongodb.MongoDBStore, database="")
assert "database" in str(exc.value)


@pytest.mark.parametrize(
"param",
[
{"type": "sqlalchemy", "database": "sqlite:///:memory:"},
{"type": "memory"},
],
ids=["sqlalchemy", "memory"],
)
def test_store_as_type(param):
scheduler = AsyncIOScheduler(stores={"default": param})
scheduler.start()
scheduler.shutdown()

0 comments on commit 099d3f0

Please sign in to comment.