Skip to content

Commit

Permalink
feat:add counters
Browse files Browse the repository at this point in the history
  • Loading branch information
cunla committed Jan 10, 2024
1 parent 706dfac commit 606ccf1
Show file tree
Hide file tree
Showing 8 changed files with 163 additions and 30 deletions.
6 changes: 5 additions & 1 deletion docs/changelog.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
# Changelog

## v1.2.5 🌈
## v1.3.0 🌈

### 🚀 Features

- Add to CronTask and RepeatableTask counters for successful/failed runs.

### 🧰 Maintenance

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ name = "django-tasks-scheduler"
packages = [
{ include = "scheduler" },
]
version = "1.2.5"
version = "1.3.0"
description = "An async job scheduler for django using redis"
readme = "README.md"
keywords = ["redis", "django", "background-jobs", "job-queue", "task-queue", "redis-queue", "scheduled-jobs"]
Expand Down
20 changes: 16 additions & 4 deletions scheduler/admin/task_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,26 @@ class JobKwargInline(HiddenMixin, GenericStackedInline):


_LIST_DISPLAY_EXTRA = dict(
CronTask=('cron_string', 'next_run',),
CronTask=('cron_string', 'next_run', 'successful_runs', 'last_successful_run', 'failed_runs', 'last_failed_run',),
ScheduledTask=('scheduled_time',),
RepeatableTask=('scheduled_time', 'interval_display',),
RepeatableTask=(
'scheduled_time', 'interval_display', 'successful_runs', 'last_successful_run', 'failed_runs',
'last_failed_run',),
)
_FIELDSET_EXTRA = dict(
CronTask=('cron_string', 'repeat', 'timeout', 'result_ttl',),
CronTask=(
'cron_string', 'timeout', 'result_ttl',
('successful_runs', 'last_successful_run',),
('failed_runs', 'last_failed_run',),
),
ScheduledTask=('scheduled_time', 'timeout', 'result_ttl'),
RepeatableTask=('scheduled_time', ('interval', 'interval_unit',), 'repeat', 'timeout', 'result_ttl',),
RepeatableTask=(
'scheduled_time',
('interval', 'interval_unit',),
'repeat', 'timeout', 'result_ttl',
('successful_runs', 'last_successful_run',),
('failed_runs', 'last_failed_run',),
),
)


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Generated by Django 5.0.1 on 2024-01-10 17:39

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('scheduler', '0016_rename_jobarg_taskarg_rename_jobkwarg_taskkwarg_and_more'),
]

operations = [
migrations.RemoveField(
model_name='crontask',
name='repeat',
),
migrations.AddField(
model_name='crontask',
name='failed_runs',
field=models.PositiveIntegerField(default=0, help_text='Number of times the task has failed', verbose_name='failed runs'),
),
migrations.AddField(
model_name='crontask',
name='last_failed_run',
field=models.DateTimeField(blank=True, help_text='Last time the task has failed', null=True, verbose_name='last failed run'),
),
migrations.AddField(
model_name='crontask',
name='last_successful_run',
field=models.DateTimeField(blank=True, help_text='Last time the task has succeeded', null=True, verbose_name='last successful run'),
),
migrations.AddField(
model_name='crontask',
name='successful_runs',
field=models.PositiveIntegerField(default=0, help_text='Number of times the task has succeeded', verbose_name='successful runs'),
),
migrations.AddField(
model_name='repeatabletask',
name='failed_runs',
field=models.PositiveIntegerField(default=0, help_text='Number of times the task has failed', verbose_name='failed runs'),
),
migrations.AddField(
model_name='repeatabletask',
name='last_failed_run',
field=models.DateTimeField(blank=True, help_text='Last time the task has failed', null=True, verbose_name='last failed run'),
),
migrations.AddField(
model_name='repeatabletask',
name='last_successful_run',
field=models.DateTimeField(blank=True, help_text='Last time the task has succeeded', null=True, verbose_name='last successful run'),
),
migrations.AddField(
model_name='repeatabletask',
name='successful_runs',
field=models.PositiveIntegerField(default=0, help_text='Number of times the task has succeeded', verbose_name='successful runs'),
),
]
47 changes: 37 additions & 10 deletions scheduler/models/scheduled_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ def failure_callback(job, connection, result, *args, **kwargs):
mail_admins(f'Task {task.id}/{task.name} has failed',
'See django-admin for logs', )
task.job_id = None
if isinstance(task, (CronTask, RepeatableTask)):
task.failed_runs += 1
task.last_failed_run = timezone.now()
task.save(schedule_job=True)


Expand All @@ -51,6 +54,9 @@ def success_callback(job, connection, result, *args, **kwargs):
if task is None:
return
task.job_id = None
if isinstance(task, (CronTask, RepeatableTask)):
task.successful_runs += 1
task.last_successful_run = timezone.now()
task.save(schedule_job=True)


Expand All @@ -76,9 +82,6 @@ class BaseTask(models.Model):
job_id = models.CharField(
_('job id'), max_length=128, editable=False, blank=True, null=True,
help_text=_('Current job_id on queue'))
repeat = models.PositiveIntegerField(
_('repeat'), blank=True, null=True,
help_text=_('Number of times to run the job. Leaving this blank means it will run forever.'), )
at_front = models.BooleanField(
_('At front'), default=False, blank=True, null=True,
help_text=_('When queuing the job, add it in the front of the queue'), )
Expand All @@ -104,14 +107,14 @@ def is_scheduled(self) -> bool:
"""Check whether a next job for this task is queued/scheduled to be executed"""
if self.job_id is None: # no job_id => is not scheduled
return False
# check whether job_id is in scheduled/enqueued/active jobs
# check whether job_id is in scheduled/queued/active jobs
scheduled_jobs = self.rqueue.scheduled_job_registry.get_job_ids()
enqueued_jobs = self.rqueue.get_job_ids()
active_jobs = self.rqueue.started_job_registry.get_job_ids()
res = ((self.job_id in scheduled_jobs)
or (self.job_id in enqueued_jobs)
or (self.job_id in active_jobs))
# If the job_id is not scheduled/enqueued/started,
# If the job_id is not scheduled/queued/started,
# update the job_id to None. (The job_id belongs to a previous run which is completed)
if not res:
self.job_id = None
Expand Down Expand Up @@ -152,7 +155,6 @@ def _enqueue_args(self) -> Dict:
"""
res = dict(
meta=dict(
repeat=self.repeat,
task_type=self.TASK_TYPE,
scheduled_task_id=self.id,
),
Expand Down Expand Up @@ -249,14 +251,18 @@ def to_dict(self) -> Dict:
for arg in self.callable_kwargs.all()],
enabled=self.enabled,
queue=self.queue,
repeat=self.repeat,
repeat=getattr(self, 'repeat', None),
at_front=self.at_front,
timeout=self.timeout,
result_ttl=self.result_ttl,
cron_string=getattr(self, 'cron_string', None),
scheduled_time=self._schedule_time().isoformat(),
interval=getattr(self, 'interval', None),
interval_unit=getattr(self, 'interval_unit', None),
successful_runs=getattr(self, 'successful_runs', None),
failed_runs=getattr(self, 'failed_runs', None),
last_successful_run=getattr(self, 'last_successful_run', None),
last_failed_run=getattr(self, 'last_failed_run', None),
)
return res

Expand Down Expand Up @@ -315,8 +321,25 @@ class Meta:
abstract = True


class RepeatableMixin(models.Model):
failed_runs = models.PositiveIntegerField(
_('failed runs'), default=0,
help_text=_('Number of times the task has failed'), )
successful_runs = models.PositiveIntegerField(
_('successful runs'), default=0,
help_text=_('Number of times the task has succeeded'), )
last_successful_run = models.DateTimeField(
_('last successful run'), blank=True, null=True,
help_text=_('Last time the task has succeeded'), )
last_failed_run = models.DateTimeField(
_('last failed run'), blank=True, null=True,
help_text=_('Last time the task has failed'), )

class Meta:
abstract = True


class ScheduledTask(ScheduledTimeMixin, BaseTask):
repeat = None
TASK_TYPE = 'ScheduledTask'

def ready_for_schedule(self) -> bool:
Expand All @@ -330,7 +353,7 @@ class Meta:
ordering = ('name',)


class RepeatableTask(ScheduledTimeMixin, BaseTask):
class RepeatableTask(RepeatableMixin, ScheduledTimeMixin, BaseTask):
class TimeUnits(models.TextChoices):
SECONDS = 'seconds', _('seconds')
MINUTES = 'minutes', _('minutes')
Expand All @@ -342,6 +365,9 @@ class TimeUnits(models.TextChoices):
interval_unit = models.CharField(
_('interval unit'), max_length=12, choices=TimeUnits.choices, default=TimeUnits.HOURS
)
repeat = models.PositiveIntegerField(
_('repeat'), blank=True, null=True,
help_text=_('Number of times to run the job. Leaving this blank means it will run forever.'), )
TASK_TYPE = 'RepeatableTask'

def clean(self):
Expand Down Expand Up @@ -384,6 +410,7 @@ def interval_seconds(self):
def _enqueue_args(self):
res = super(RepeatableTask, self)._enqueue_args()
res['meta']['interval'] = self.interval_seconds()
res['meta']['repeat'] = self.repeat
return res

def _schedule_time(self):
Expand All @@ -409,7 +436,7 @@ class Meta:
ordering = ('name',)


class CronTask(BaseTask):
class CronTask(RepeatableMixin, BaseTask):
TASK_TYPE = 'CronTask'

cron_string = models.CharField(
Expand Down
18 changes: 12 additions & 6 deletions scheduler/tests/test_cron_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,28 +27,34 @@ def test_clean_cron_string_invalid(self):
with self.assertRaises(ValidationError):
task.clean_cron_string()

def test_repeat(self):
task = task_factory(CronTask, repeat=10)
entry = _get_job_from_scheduled_registry(task)
self.assertEqual(entry.meta['repeat'], 10)

def test_check_rescheduled_after_execution(self):
task = task_factory(CronTask, )
queue = task.rqueue
first_run_id = task.job_id
entry = queue.fetch_job(first_run_id)
queue.run_sync(entry)
task.refresh_from_db()
self.assertEqual(task.failed_runs, 0)
self.assertIsNone(task.last_failed_run)
self.assertEqual(task.successful_runs, 1)
self.assertIsNotNone(task.last_successful_run)
self.assertTrue(task.is_scheduled())
self.assertNotEqual(task.job_id, first_run_id)

def test_check_rescheduled_after_failed_execution(self):
task = task_factory(CronTask, callable_name="scheduler.tests.jobs.scheduler.tests.jobs.test_job", )
task = task_factory(
CronTask,
callable_name="scheduler.tests.jobs.scheduler.tests.jobs.test_job",
)
queue = task.rqueue
first_run_id = task.job_id
entry = queue.fetch_job(first_run_id)
queue.run_sync(entry)
task.refresh_from_db()
self.assertEqual(task.failed_runs, 1)
self.assertIsNotNone(task.last_failed_run)
self.assertEqual(task.successful_runs, 0)
self.assertIsNone(task.last_successful_run)
self.assertTrue(task.is_scheduled())
self.assertNotEqual(task.job_id, first_run_id)

Expand Down
33 changes: 30 additions & 3 deletions scheduler/tests/test_repeatable_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,22 +153,49 @@ def test_repeat_none_interval_2_min(self):
self.assertTrue(job.is_scheduled())

def test_check_rescheduled_after_execution(self):
task = task_factory(self.TaskModelClass, scheduled_time=timezone.now() + timedelta(seconds=1))
task = task_factory(self.TaskModelClass, scheduled_time=timezone.now() + timedelta(seconds=1), repeat=10)
queue = task.rqueue
first_run_id = task.job_id
entry = queue.fetch_job(first_run_id)
queue.run_sync(entry)
task.refresh_from_db()
self.assertEqual(task.failed_runs, 0)
self.assertIsNone(task.last_failed_run)
self.assertEqual(task.successful_runs, 1)
self.assertIsNotNone(task.last_successful_run)
self.assertTrue(task.is_scheduled())
self.assertNotEqual(task.job_id, first_run_id)

def test_check_rescheduled_after_execution_failed_job(self):
task = task_factory(self.TaskModelClass, callable_name='scheduler.tests.jobs.failing_job',
scheduled_time=timezone.now() + timedelta(seconds=1))
task = task_factory(
self.TaskModelClass, callable_name='scheduler.tests.jobs.failing_job',
scheduled_time=timezone.now() + timedelta(seconds=1),
repeat=10, )
queue = task.rqueue
first_run_id = task.job_id
entry = queue.fetch_job(first_run_id)
queue.run_sync(entry)
task.refresh_from_db()
self.assertEqual(task.failed_runs, 1)
self.assertIsNotNone(task.last_failed_run)
self.assertEqual(task.successful_runs, 0)
self.assertIsNone(task.last_successful_run)
self.assertTrue(task.is_scheduled())
self.assertNotEqual(task.job_id, first_run_id)

def test_check_not_rescheduled_after_last_repeat(self):
task = task_factory(
self.TaskModelClass,
scheduled_time=timezone.now() + timedelta(seconds=1),
repeat=1,
)
queue = task.rqueue
first_run_id = task.job_id
entry = queue.fetch_job(first_run_id)
queue.run_sync(entry)
task.refresh_from_db()
self.assertEqual(task.failed_runs, 0)
self.assertIsNone(task.last_failed_run)
self.assertEqual(task.successful_runs, 1)
self.assertIsNotNone(task.last_successful_run)
self.assertNotEqual(task.job_id, first_run_id)
10 changes: 5 additions & 5 deletions scheduler/tests/testtools.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def task_factory(cls, callable_name: str = 'scheduler.tests.jobs.test_job', inst
repeat=None,
scheduled_time=timezone.now() + timedelta(days=1), ))
elif cls == CronTask:
values.update(dict(cron_string="0 0 * * *", repeat=None, ))
values.update(dict(cron_string="0 0 * * *", ))
values.update(kwargs)
if instance_only:
instance = cls(**values)
Expand All @@ -73,10 +73,10 @@ def taskarg_factory(cls, **kwargs):
return instance


def _get_job_from_scheduled_registry(django_job: BaseTask):
jobs_to_schedule = django_job.rqueue.scheduled_job_registry.get_job_ids()
entry = next(i for i in jobs_to_schedule if i == django_job.job_id)
return django_job.rqueue.fetch_job(entry)
def _get_job_from_scheduled_registry(django_task: BaseTask):
jobs_to_schedule = django_task.rqueue.scheduled_job_registry.get_job_ids()
entry = next(i for i in jobs_to_schedule if i == django_task.job_id)
return django_task.rqueue.fetch_job(entry)


def _get_executions(django_job: BaseTask):
Expand Down

0 comments on commit 606ccf1

Please sign in to comment.