Skip to content

Commit

Permalink
debugging celery on binding upload
Browse files Browse the repository at this point in the history
  • Loading branch information
cmatKhan committed Jan 5, 2024
1 parent cb47c18 commit d761008
Show file tree
Hide file tree
Showing 14 changed files with 202 additions and 101 deletions.
6 changes: 4 additions & 2 deletions config/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,10 +285,10 @@
CELERY_RESULT_SERIALIZER = "json"
# https://docs.celeryq.dev/en/stable/userguide/configuration.html#task-time-limit
# TODO: set to whatever value is adequate in your circumstances
CELERY_TASK_TIME_LIMIT = 5 * 60
CELERY_TASK_TIME_LIMIT = 20 * 60
# https://docs.celeryq.dev/en/stable/userguide/configuration.html#task-soft-time-limit
# TODO: set to whatever value is adequate in your circumstances
CELERY_TASK_SOFT_TIME_LIMIT = 60
CELERY_TASK_SOFT_TIME_LIMIT = 15 * 60
# https://docs.celeryq.dev/en/stable/userguide/configuration.html#beat-scheduler
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"
# https://docs.celeryq.dev/en/stable/userguide/configuration.html#worker-send-task-events
Expand Down Expand Up @@ -323,6 +323,8 @@
),
"DEFAULT_PERMISSION_CLASSES": ("rest_framework.permissions.IsAuthenticated",),
"DEFAULT_SCHEMA_CLASS": "drf_spectacular.openapi.AutoSchema",
"DEFAULT_PAGINATION_CLASS": "rest_framework.pagination.PageNumberPagination",
"PAGE_SIZE": 100,
}

# django-cors-headers - https://github.com/adamchainz/django-cors-headers#setup
Expand Down
10 changes: 9 additions & 1 deletion yeastregulatorydb/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,15 @@ def datasource(db) -> DataSource:
def fileformat_data() -> dict:
# harb, hu both csvs
return {
"array": ({"gene_id": "int", "effect": "float", "pval": "float"}, ",", "effect", 0.0, "pval", 1.0, "gene_id"),
"array": (
{"gene_id": "int", "effect": "float", "pval": "float"},
",",
"effect",
0.0,
"pval",
1.0,
"gene_id",
),
"qbed": (
{"chr": "str", "start": "int", "end": "int", "depth": "int", "strand": "str"},
"\t",
Expand Down
105 changes: 61 additions & 44 deletions yeastregulatorydb/regulatory_data/api/views/BindingViewSet.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from django.conf import settings
from django.core.cache import cache
from django.core.files.storage import default_storage
from django.db import transaction
from django_filters.rest_framework import DjangoFilterBackend
from rest_framework import viewsets
Expand Down Expand Up @@ -27,53 +28,69 @@ class BindingViewSet(BulkUploadMixin, UpdateModifiedMixin, viewsets.ModelViewSet

@transaction.atomic
def perform_create(self, serializer):
instance = serializer.save()
try:
instance = serializer.save()

# if the source.name is in the settings NULL_BINDING_FILE_DATASOURCES,
# then the `file` needs to be added to the promotersetsig table
# with the FK to the binding instance
if instance.source.name in settings.NULL_BINDING_FILE_DATASOURCES:
promotersetsig_serializer = PromoterSetSigSerializer(
data={
"binding": instance.id,
"fileformat": instance.source.fileformat.id,
"file": self.request.data.get("file"),
},
context={"request": self.request},
)
promotersetsig_serializer.is_valid(raise_exception=True)
promotersetsiginstance = promotersetsig_serializer.save()
# if the promotersetsiginstance was successfully saved, then try
# to launch a rankresponse task
lock_id = "add_data_lock"
acquire_lock = lambda: cache.add(lock_id, True, timeout=60 * 60) # flake8: noqa: E731
release_lock = lambda: cache.delete(lock_id) # flake8: noqa: E731
# if the source.name is in the settings NULL_BINDING_FILE_DATASOURCES,
# then the `file` needs to be added to the promotersetsig table
# with the FK to the binding instance
if instance.source.name in settings.NULL_BINDING_FILE_DATASOURCES:
promotersetsig_serializer = PromoterSetSigSerializer(
data={
"binding": instance.id,
"fileformat": instance.source.fileformat.id,
"file": self.request.data.get("file"),
},
context={"request": self.request},
)
promotersetsig_serializer.is_valid(raise_exception=True)
promotersetsiginstance = promotersetsig_serializer.save()
# if the promotersetsiginstance was successfully saved, then try
# to launch a rankresponse task
lock_id = "add_data_lock"
acquire_lock = lambda: cache.add(lock_id, True, timeout=60 * 60) # flake8: noqa: E731
release_lock = lambda: cache.delete(lock_id) # flake8: noqa: E731

if acquire_lock():
if acquire_lock():
promotersetsiginstance.rankresponse_processing = True
promotersetsiginstance.save()
try:
rank_response_task.delay(promotersetsiginstance.id, self.request.user.id)
finally:
release_lock()

# if the source.assay is recognized as one associated with a task,
# set the promotersetsig_processing attribute
promotersetsig_format = None
if instance.source.assay == "chipexo":
if instance.source.name == "chipexo_pugh_allevents":
promotersetsig_format = settings.CHIPEXO_PROMOTER_SIG_FORMAT
elif instance.source.assay == "callingcards":
promotersetsig_format = settings.CALLINGCARDS_PROMOTER_SIG_FORMAT

if promotersetsig_format:
# this attribute is added to the returned serialized data
instance.promotersetsig_processing = True
try:
rank_response_task(promotersetsiginstance.id, self.request.user.id)
finally:
release_lock()
lock_id = "add_data_lock"
acquire_lock = lambda: cache.add(lock_id, True, timeout=60 * 60) # flake8: noqa: E731
release_lock = lambda: cache.delete(lock_id) # flake8: noqa: E731

# if the source.assay is recognized as one associated with a task,
# set the promotersetsig_processing attribute
promotersetsig_format = None
if instance.source.assay == "chipexo":
if instance.source.name == "chipexo_pugh_allevents":
promotersetsig_format = settings.CHIPEXO_PROMOTER_SIG_FORMAT
elif instance.source.assay == "callingcards":
promotersetsig_format = settings.CALLINGCARDS_PROMOTER_SIG_FORMAT
if acquire_lock():
try:
promotersetsig_rankedresponse_chained(instance.id, self.request.user.id, promotersetsig_format)
finally:
release_lock()
except:
# Delete the file of the instance if an exception occurs
if instance.file and default_storage.exists(instance.file.name):
default_storage.delete(instance.file.name)

if promotersetsig_format:
# this attribute is added to the returned serialized data
instance.promotersetsig_processing = True
lock_id = "add_data_lock"
acquire_lock = lambda: cache.add(lock_id, True, timeout=60 * 60) # flake8: noqa: E731
release_lock = lambda: cache.delete(lock_id) # flake8: noqa: E731
# Delete the file of the promotersetsiginstance if it exists and an exception occurs
if (
promotersetsiginstance
and promotersetsiginstance.file
and default_storage.exists(promotersetsiginstance.file.name)
):
default_storage.delete(promotersetsiginstance.file.name)

if acquire_lock():
try:
promotersetsig_rankedresponse_chained(instance.id, self.request.user.id, promotersetsig_format)
finally:
release_lock()
raise
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class PromoterSetSigViewSet(UpdateModifiedMixin, viewsets.ModelViewSet):
A viewset for viewing and editing PromoterSetSig instances.
"""

queryset = PromoterSetSig.objects.all()
queryset = PromoterSetSig.objects.all().order_by("id")
authentication_classes = [SessionAuthentication, TokenAuthentication]
permission_classes = [IsAuthenticated]
serializer_class = PromoterSetSigSerializer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class RankResponseViewSet(UpdateModifiedMixin, viewsets.ModelViewSet):
A viewset for viewing and editing RankResponse instances.
"""

queryset = RankResponse.objects.all()
queryset = RankResponse.objects.all().order_by("id")
authentication_classes = [SessionAuthentication, TokenAuthentication]
permission_classes = [IsAuthenticated]
serializer_class = RankResponseSerializer
Expand Down
29 changes: 21 additions & 8 deletions yeastregulatorydb/regulatory_data/models/Binding.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,17 +89,30 @@ class Meta:
db_table = "binding"
unique_together = ("regulator", "batch", "replicate", "source")

# pylint: disable=R0801
def save(self, *args, **kwargs):
# Store the old file path
old_file_name = self.file.name if self.file else None
is_create = self.pk is None
super().save(*args, **kwargs)
self.update_file_name("file", f"binding/{self.source.name}")
new_file_name = self.file.name
super().save(update_fields=["file"])
# If the file name changed, delete the old file
if old_file_name and old_file_name != new_file_name:
default_storage.delete(old_file_name)
if is_create:
old_file_name = self.file.name if self.file else None
self.update_file_name("file", f"binding/{self.source.name}")
new_file_name = self.file.name
super().save(update_fields=["file"])
# If the file name changed, delete the old file
# if old_file_name and old_file_name != new_file_name:
# default_storage.delete(old_file_name)

# pylint: disable=R0801
# def save(self, *args, **kwargs):
# # Store the old file path
# old_file_name = self.file.name if self.file else None
# super().save(*args, **kwargs)
# self.update_file_name("file", f"binding/{self.source.name}")
# new_file_name = self.file.name
# super().save(update_fields=["file"])
# # If the file name changed, delete the old file
# if old_file_name and old_file_name != new_file_name:
# default_storage.delete(old_file_name)

# pylint: enable=R0801

Expand Down
29 changes: 21 additions & 8 deletions yeastregulatorydb/regulatory_data/models/CallingCardsBackground.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,30 @@ def __str__(self):
class Meta:
db_table = "callingcardsbackground"

# pylint:disable=R0801
def save(self, *args, **kwargs):
# Store the old file path
old_file_name = self.file.name if self.file else None
is_create = self.pk is None
super().save(*args, **kwargs)
self.update_file_name("file", "callingcards/background", "tsv.gz")
new_file_name = self.file.name
super().save(update_fields=["file"])
# If the file name changed, delete the old file
if old_file_name and old_file_name != new_file_name:
default_storage.delete(old_file_name)
if is_create:
old_file_name = self.file.name if self.file else None
self.update_file_name("file", "callingcards/background", "qbed.gz")
new_file_name = self.file.name
super().save(update_fields=["file"])
# If the file name changed, delete the old file
# if old_file_name and old_file_name != new_file_name:
# default_storage.delete(old_file_name)

# pylint:disable=R0801
# def save(self, *args, **kwargs):
# # Store the old file path
# old_file_name = self.file.name if self.file else None
# super().save(*args, **kwargs)
# self.update_file_name("file", "callingcards/background", "qbed.gz")
# new_file_name = self.file.name
# super().save(update_fields=["file"])
# # If the file name changed, delete the old file
# if old_file_name and old_file_name != new_file_name:
# default_storage.delete(old_file_name)

# pylint:enable=R0801

Expand Down
29 changes: 21 additions & 8 deletions yeastregulatorydb/regulatory_data/models/Expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,30 @@ def __str__(self):
class Meta:
db_table = "expression"

# pylint: disable=R0801
def save(self, *args, **kwargs):
# Store the old file path
old_file_name = self.file.name if self.file else None
is_create = self.pk is None
super().save(*args, **kwargs)
self.update_file_name("file", f"expression/{self.source.name}", "tsv.gz")
new_file_name = self.file.name
super().save(update_fields=["file"])
# If the file name changed, delete the old file
if old_file_name and old_file_name != new_file_name:
default_storage.delete(old_file_name)
if is_create:
old_file_name = self.file.name if self.file else None
self.update_file_name("file", f"expression/{self.source.name}", "csv.gz")
new_file_name = self.file.name
super().save(update_fields=["file"])
# If the file name changed, delete the old file
# if old_file_name and old_file_name != new_file_name:
# default_storage.delete(old_file_name)

# pylint: disable=R0801
# def save(self, *args, **kwargs):
# # Store the old file path
# old_file_name = self.file.name if self.file else None
# super().save(*args, **kwargs)
# self.update_file_name("file", f"expression/{self.source.name}", "csv.gz")
# new_file_name = self.file.name
# super().save(update_fields=["file"])
# # If the file name changed, delete the old file
# if old_file_name and old_file_name != new_file_name:
# default_storage.delete(old_file_name)

# pylint: enable=R0801

Expand Down
27 changes: 20 additions & 7 deletions yeastregulatorydb/regulatory_data/models/PromoterSetSig.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,27 @@ class Meta:
# pylint:disable=R0801
def save(self, *args, **kwargs):
# Store the old file path
old_file_name = self.file.name if self.file else None
is_create = self.pk is None
super().save(*args, **kwargs)
self.update_file_name("file", "promotersetsig", "csv.gz")
new_file_name = self.file.name
super().save(update_fields=["file"])
# If the file name changed, delete the old file
if old_file_name and old_file_name != new_file_name:
default_storage.delete(old_file_name)
if is_create:
old_file_name = self.file.name if self.file else None
self.update_file_name("file", "promotersetsig", "csv.gz")
new_file_name = self.file.name
super().save(update_fields=["file"])
# If the file name changed, delete the old file
# if old_file_name and old_file_name != new_file_name:
# default_storage.delete(old_file_name)

# def save(self, *args, **kwargs):
# # Store the old file path
# old_file_name = self.file.name if self.file else None
# super().save(*args, **kwargs)
# self.update_file_name("file", "promotersetsig", "csv.gz")
# new_file_name = self.file.name
# super().save(update_fields=["file"])
# # If the file name changed, delete the old file
# if old_file_name and old_file_name != new_file_name:
# default_storage.delete(old_file_name)

# pylint:enable=R0801

Expand Down
18 changes: 10 additions & 8 deletions yeastregulatorydb/regulatory_data/models/RankResponse.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,22 +56,24 @@ class RankResponse(BaseModel, GzipFileUploadWithIdMixin):
)

def __str__(self):
return f"pk:{self.pk};promotersetsig:{self.binding};expression:{self.expression}"
return f"pk:{self.pk};promotersetsig:{self.promotersetsig};expression:{self.expression}"

class Meta:
db_table = "rankresponse"

# pylint:disable=R0801
def save(self, *args, **kwargs):
# Store the old file path
old_file_name = self.file.name if self.file else None
is_create = self.pk is None
super().save(*args, **kwargs)
self.update_file_name("file", "rankresponse", "csv.gz")
new_file_name = self.file.name
super().save(update_fields=["file"])
# If the file name changed, delete the old file
if old_file_name and old_file_name != new_file_name:
default_storage.delete(old_file_name)
if is_create:
old_file_name = self.file.name if self.file else None
self.update_file_name("file", "rankresponse", "csv.gz")
new_file_name = self.file.name
super().save(update_fields=["file"])
# If the file name changed, delete the old file
# if old_file_name and old_file_name != new_file_name:
# default_storage.delete(old_file_name)

# pylint:enable=R0801

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class Hu_s3(BaseModel, GzipFileUploadWithIdMixin):
def save(self, *args, **kwargs):
super().save(*args, **kwargs)
self.update_file_name('file', 'hu', 'tsv.gz')
self.update_file_name('file', 'hu', 'csv.gz')
super().save(update_fields=['file'])
# Other fields and methods...
Expand Down Expand Up @@ -73,13 +73,16 @@ def update_file_name(self, file_field_name: str, upload_dir: str, extension: str
except AttributeError:
logger.info('No file field name provided. Skipping update_file_name for "%s"', self)
else:
extension = ".".join(file_name_parts[-2:]) if len(file_name_parts) > 1 else file_name_parts[1:]
if not extension:
logger.warning(
'Could not extract extension from file name "%s". Setting to `.txt.gz`',
getattr(self, file_field_name).name,
)
extension = ".txt.gz"
if extension:
logger.debug("Using provided extension: %s", extension)
else:
extension = ".".join(file_name_parts[-2:]) if len(file_name_parts) > 1 else file_name_parts[1:]
if not extension:
logger.warning(
'Could not extract extension from file name "%s". Setting to `.txt.gz`',
getattr(self, file_field_name).name,
)
extension = ".txt.gz"
# Cast self to HasPkProtocol to assure mypy that self has a pk attribute
self_with_pk = cast(HasPkProtocol, self)
# raise AttributeError if self does not have a pk attribute
Expand Down
Loading

0 comments on commit d761008

Please sign in to comment.