Skip to content

Commit

Permalink
fix(diracx-routers): remove sqlalchemy dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
aldbr committed Jan 10, 2025
1 parent 0bc35d3 commit bf087c3
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 46 deletions.
10 changes: 10 additions & 0 deletions diracx-core/src/diracx/core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ def __init__(self, job_id: int, detail: str | None = None):
super().__init__(f"Job {job_id} not found" + (" ({detail})" if detail else ""))


class SandboxNotFoundError(Exception):
def __init__(self, pfn: str, se_name: str, detail: str | None = None):
self.pfn: str = pfn
self.se_name: str = se_name
super().__init__(
f"Sandbox with {pfn} and {se_name} not found"
+ (" ({detail})" if detail else "")
)


class JobError(Exception):
def __init__(self, job_id, detail: str | None = None):
self.job_id: int = job_id
Expand Down
48 changes: 25 additions & 23 deletions diracx-db/src/diracx/db/sql/sandbox_metadata/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

from typing import Any

import sqlalchemy
from sqlalchemy import Executable, delete, insert, literal, select, update
from sqlalchemy.exc import IntegrityError, NoResultFound

from diracx.core.exceptions import SandboxNotFoundError
from diracx.core.models import SandboxInfo, SandboxType, UserInfo
from diracx.db.sql.utils import BaseSQLDB, UTCNow

Expand All @@ -17,7 +19,7 @@ class SandboxMetadataDB(BaseSQLDB):
async def upsert_owner(self, user: UserInfo) -> int:
"""Get the id of the owner from the database."""
# TODO: Follow https://github.com/DIRACGrid/diracx/issues/49
stmt = sqlalchemy.select(SBOwners.OwnerID).where(
stmt = select(SBOwners.OwnerID).where(
SBOwners.Owner == user.preferred_username,
SBOwners.OwnerGroup == user.dirac_group,
SBOwners.VO == user.vo,
Expand All @@ -26,7 +28,7 @@ async def upsert_owner(self, user: UserInfo) -> int:
if owner_id := result.scalar_one_or_none():
return owner_id

stmt = sqlalchemy.insert(SBOwners).values(
stmt = insert(SBOwners).values(
Owner=user.preferred_username,
OwnerGroup=user.dirac_group,
VO=user.vo,
Expand All @@ -53,7 +55,7 @@ async def insert_sandbox(
"""Add a new sandbox in SandboxMetadataDB."""
# TODO: Follow https://github.com/DIRACGrid/diracx/issues/49
owner_id = await self.upsert_owner(user)
stmt = sqlalchemy.insert(SandBoxes).values(
stmt = insert(SandBoxes).values(
OwnerId=owner_id,
SEName=se_name,
SEPFN=pfn,
Expand All @@ -63,27 +65,31 @@ async def insert_sandbox(
)
try:
result = await self.conn.execute(stmt)
except sqlalchemy.exc.IntegrityError:
except IntegrityError:
await self.update_sandbox_last_access_time(se_name, pfn)
else:
assert result.rowcount == 1

async def update_sandbox_last_access_time(self, se_name: str, pfn: str) -> None:
stmt = (
sqlalchemy.update(SandBoxes)
update(SandBoxes)
.where(SandBoxes.SEName == se_name, SandBoxes.SEPFN == pfn)
.values(LastAccessTime=UTCNow())
)
result = await self.conn.execute(stmt)
assert result.rowcount == 1

async def sandbox_is_assigned(self, pfn: str, se_name: str) -> bool:
async def sandbox_is_assigned(self, pfn: str, se_name: str) -> bool | None:
"""Checks if a sandbox exists and has been assigned."""
stmt: sqlalchemy.Executable = sqlalchemy.select(SandBoxes.Assigned).where(
stmt: Executable = select(SandBoxes.Assigned).where(
SandBoxes.SEName == se_name, SandBoxes.SEPFN == pfn
)
result = await self.conn.execute(stmt)
is_assigned = result.scalar_one()
try:
is_assigned = result.scalar_one()
except NoResultFound as e:
raise SandboxNotFoundError(pfn, se_name) from e

return is_assigned

@staticmethod
Expand All @@ -97,7 +103,7 @@ async def get_sandbox_assigned_to_job(
"""Get the sandbox assign to job."""
entity_id = self.jobid_to_entity_id(job_id)
stmt = (
sqlalchemy.select(SandBoxes.SEPFN)
select(SandBoxes.SEPFN)
.where(SandBoxes.SBId == SBEntityMapping.SBId)
.where(
SBEntityMapping.EntityId == entity_id,
Expand All @@ -118,32 +124,28 @@ async def assign_sandbox_to_jobs(
for job_id in jobs_ids:
# Define the entity id as 'Entity:entity_id' due to the DB definition:
entity_id = self.jobid_to_entity_id(job_id)
select_sb_id = sqlalchemy.select(
select_sb_id = select(
SandBoxes.SBId,
sqlalchemy.literal(entity_id).label("EntityId"),
sqlalchemy.literal(sb_type).label("Type"),
literal(entity_id).label("EntityId"),
literal(sb_type).label("Type"),
).where(
SandBoxes.SEName == se_name,
SandBoxes.SEPFN == pfn,
)
stmt = sqlalchemy.insert(SBEntityMapping).from_select(
stmt = insert(SBEntityMapping).from_select(
["SBId", "EntityId", "Type"], select_sb_id
)
await self.conn.execute(stmt)

stmt = (
sqlalchemy.update(SandBoxes)
.where(SandBoxes.SEPFN == pfn)
.values(Assigned=True)
)
stmt = update(SandBoxes).where(SandBoxes.SEPFN == pfn).values(Assigned=True)
result = await self.conn.execute(stmt)
assert result.rowcount == 1

async def unassign_sandboxes_to_jobs(self, jobs_ids: list[int]) -> None:
"""Delete mapping between jobs and sandboxes."""
for job_id in jobs_ids:
entity_id = self.jobid_to_entity_id(job_id)
sb_sel_stmt = sqlalchemy.select(SandBoxes.SBId)
sb_sel_stmt = select(SandBoxes.SBId)
sb_sel_stmt = sb_sel_stmt.join(
SBEntityMapping, SBEntityMapping.SBId == SandBoxes.SBId
)
Expand All @@ -152,19 +154,19 @@ async def unassign_sandboxes_to_jobs(self, jobs_ids: list[int]) -> None:
result = await self.conn.execute(sb_sel_stmt)
sb_ids = [row.SBId for row in result]

del_stmt = sqlalchemy.delete(SBEntityMapping).where(
del_stmt = delete(SBEntityMapping).where(
SBEntityMapping.EntityId == entity_id
)
await self.conn.execute(del_stmt)

sb_entity_sel_stmt = sqlalchemy.select(SBEntityMapping.SBId).where(
sb_entity_sel_stmt = select(SBEntityMapping.SBId).where(
SBEntityMapping.SBId.in_(sb_ids)
)
result = await self.conn.execute(sb_entity_sel_stmt)
remaining_sb_ids = [row.SBId for row in result]
if not remaining_sb_ids:
unassign_stmt = (
sqlalchemy.update(SandBoxes)
update(SandBoxes)
.where(SandBoxes.SBId.in_(sb_ids))
.values(Assigned=False)
)
Expand Down
3 changes: 2 additions & 1 deletion diracx-db/tests/jobs/test_sandbox_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import pytest
import sqlalchemy

from diracx.core.exceptions import SandboxNotFoundError
from diracx.core.models import SandboxInfo, UserInfo
from diracx.db.sql.sandbox_metadata.db import SandboxMetadataDB
from diracx.db.sql.sandbox_metadata.schema import SandBoxes, SBEntityMapping
Expand Down Expand Up @@ -48,7 +49,7 @@ async def test_insert_sandbox(sandbox_metadata_db: SandboxMetadataDB):
db_contents = await _dump_db(sandbox_metadata_db)
assert pfn1 not in db_contents
async with sandbox_metadata_db:
with pytest.raises(sqlalchemy.exc.NoResultFound):
with pytest.raises(SandboxNotFoundError):
await sandbox_metadata_db.sandbox_is_assigned(pfn1, "SandboxSE")

# Insert the sandbox
Expand Down
1 change: 0 additions & 1 deletion diracx-routers/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ dependencies = [
"httpx",
"pydantic >=2.10",
"uvicorn",
"sqlalchemy",
"opentelemetry-api",
"opentelemetry-exporter-otlp",
"opentelemetry-instrumentation-fastapi",
Expand Down
35 changes: 14 additions & 21 deletions diracx-routers/src/diracx/routers/jobs/sandboxes.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from pydantic import BaseModel, PrivateAttr
from pydantic_settings import SettingsConfigDict
from pyparsing import Any
from sqlalchemy.exc import NoResultFound

from diracx.core.models import (
SandboxInfo,
Expand Down Expand Up @@ -121,26 +120,20 @@ async def initiate_sandbox_upload(
detail=f"Sandbox too large. Max size is {MAX_SANDBOX_SIZE_BYTES} bytes",
)

try:
exists_and_assigned = await sandbox_metadata_db.sandbox_is_assigned(
pfn, settings.se_name
)
except NoResultFound:
# The sandbox doesn't exist in the database
pass
else:
# As sandboxes are registered in the DB before uploading to the storage
# backend we can't rely on their existence in the database to determine if
# they have been uploaded. Instead we check if the sandbox has been
# assigned to a job. If it has then we know it has been uploaded and we
# can avoid communicating with the storage backend.
if exists_and_assigned or s3_object_exists(
settings.s3_client, settings.bucket_name, pfn_to_key(pfn)
):
await sandbox_metadata_db.update_sandbox_last_access_time(
settings.se_name, pfn
)
return SandboxUploadResponse(pfn=full_pfn)
exists_and_assigned = await sandbox_metadata_db.sandbox_is_assigned(
pfn, settings.se_name
)

# As sandboxes are registered in the DB before uploading to the storage
# backend we can't rely on their existence in the database to determine if
# they have been uploaded. Instead we check if the sandbox has been
# assigned to a job. If it has then we know it has been uploaded and we
# can avoid communicating with the storage backend.
if exists_and_assigned or s3_object_exists(
settings.s3_client, settings.bucket_name, pfn_to_key(pfn)
):
await sandbox_metadata_db.update_sandbox_last_access_time(settings.se_name, pfn)
return SandboxUploadResponse(pfn=full_pfn)

upload_info = await generate_presigned_upload(
settings.s3_client,
Expand Down

0 comments on commit bf087c3

Please sign in to comment.