Skip to content

Commit

Permalink
Refactor orgs and job queues
Browse files Browse the repository at this point in the history
  • Loading branch information
bsatoriu committed Sep 4, 2024
1 parent a896fc7 commit f80377e
Show file tree
Hide file tree
Showing 4 changed files with 221 additions and 129 deletions.
20 changes: 3 additions & 17 deletions api/endpoints/algorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
from datetime import datetime
import json

from api.utils import job_queue

log = logging.getLogger(__name__)

ns = api.namespace('mas', description='Operations to register an algorithm')
Expand Down Expand Up @@ -465,24 +467,8 @@ def get(self):
try:
response_body = {"code": None, "message": None}
user = get_authorized_user()
queues = job_queue.get_user_queues(user.id)

queues = []
query = """select jq.queue_name from organization_membership m
inner join public.organization_job_queue ojq on m.org_id = ojq.org_id
inner join public.job_queue jq on jq.id = ojq.job_queue_id
where m.member_id = {}
union
select queue_name
from job_queue
where guest_tier = true""".format(user.id)
queue_list = db.session.execute(sqlalchemy.text(query))

Record = namedtuple('Record', queue_list.keys())
queue_records = [Record(*r) for r in queue_list.fetchall()]
for r in queue_records:
queues.append(r.queue_name)

queues = queues
response_body["code"] = status.HTTP_200_OK
response_body["queues"] = queues
response_body["message"] = "success"
Expand Down
128 changes: 16 additions & 112 deletions api/endpoints/organizations.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import logging

import sqlalchemy
from flask_restx import Resource
from flask import request
from flask_api import status
from collections import namedtuple
from sqlalchemy.exc import SQLAlchemyError
from api.models.job_queue import JobQueue
from api.models.organization import Organization as Organization_db
from api.models.organization_job_queue import OrganizationJobQueue
Expand All @@ -17,9 +17,10 @@
from api.schemas.organization_job_queue_schema import OrganizationJobQueueSchema
from api.schemas.organization_membership_schema import OrganizationMembershipSchema
from api.schemas.organization_schema import OrganizationSchema
from api.schemas.member_schema import MemberSchema
from datetime import datetime
import json

from api.utils import organization
from api.utils.http_util import err_response

log = logging.getLogger(__name__)
Expand All @@ -35,60 +36,8 @@ def get(self):
Lists the hierarchy of organizations using MAAP
:return:
"""

result = []
otree = db.session.execute(sqlalchemy.text('select * from org_tree order by row_number'))

queues_query = db.session.query(
JobQueue, OrganizationJobQueue,
).filter(
JobQueue.id == OrganizationJobQueue.job_queue_id
).order_by(JobQueue.queue_name).all()

membership_query = db.session.query(
Member, OrganizationMembership_db,
).filter(
Member.id == OrganizationMembership_db.member_id
).order_by(Member.first_name).all()

Record = namedtuple('Record', otree.keys())
org_tree_records = [Record(*r) for r in otree.fetchall()]
for r in org_tree_records:
org = {
'id': r.id,
'parent_org_id': r.parent_org_id,
'name': r.name,
'depth': r.depth,
'member_count': r.member_count,
'default_job_limit_count': r.default_job_limit_count,
'default_job_limit_hours': r.default_job_limit_hours,
'job_queues': [],
'members': [],
'creation_date': r.creation_date.strftime('%m/%d/%Y'),
}

for q in queues_query:
if q.OrganizationJobQueue.org_id == r.id:
org['job_queues'].append({
'id': q.JobQueue.id,
'queue_name': q.JobQueue.queue_name,
'queue_description': q.JobQueue.queue_description
})

for m in membership_query:
if m.OrganizationMembership.org_id == r.id:
org['members'].append({
'id': m.Member.id,
'first_name': m.Member.first_name,
'last_name': m.Member.last_name,
'username': m.Member.username,
'email': m.Member.email,
'maintainer': m.OrganizationMembership.org_maintainer
})

result.append(org)

return result
orgs = organization.get_organizations()
return orgs

@api.doc(security='ApiKeyAuth')
@login_required()
Expand All @@ -114,26 +63,14 @@ def post(self):
parent_org_id = req_data.get("parent_org_id", root_org.id)
if parent_org_id is None:
parent_org_id = root_org.id

default_job_limit_count = req_data.get("default_job_limit_count", None)
default_job_limit_hours = req_data.get("default_job_limit_hours", None)

new_org = Organization_db(name=name, parent_org_id=parent_org_id, default_job_limit_count=default_job_limit_count,
default_job_limit_hours=default_job_limit_hours, creation_date=datetime.utcnow())

db.session.add(new_org)
db.session.commit()

org_members = []
members = req_data.get("members", [])
for org_member in members:
org_members.append(OrganizationMembership_db(member_id=org_member['member_id'], org_id=new_org.id, org_maintainer=org_member['maintainer'], creation_date=datetime.utcnow()))

if len(org_members) > 0:
db.session.add_all(org_members)
db.session.commit()
new_org = organization.create_organization(name, parent_org_id, default_job_limit_count, default_job_limit_hours, members)

org_schema = OrganizationSchema()
return json.loads(org_schema.dumps(new_org))
return new_org


@ns.route('/<int:org_id>')
Expand All @@ -145,19 +82,12 @@ def get(self, org_id):
"""
Retrieve organization
"""

org = db.session \
.query(Organization_db) \
.filter_by(id=org_id) \
.first()
org = organization.get_organization(org_id)

if org is None:
return err_response(msg="No organization found with id " + org_id, code=status.HTTP_404_NOT_FOUND)

org_schema = OrganizationSchema()
result = json.loads(org_schema.dumps(org))

return result
return org

@api.doc(security='ApiKeyAuth')
@login_required()
Expand All @@ -183,25 +113,12 @@ def put(self, org_id):
org.parent_org_id = req_data.get("parent_org_id", org.parent_org_id)
org.default_job_limit_count = req_data.get("default_job_limit_count", org.default_job_limit_count)
org.default_job_limit_hours = req_data.get("default_job_limit_hours", org.default_job_limit_hours)
db.session.commit()

# Update membership
db.session.execute(
db.delete(OrganizationMembership_db).filter_by(org_id=org_id)
)
db.session.commit()

org_members = []
members = req_data.get("members", [])
for org_member in members:
org_members.append(OrganizationMembership_db(member_id=org_member['member_id'], org_id=org_id, org_maintainer=org_member['maintainer'], creation_date=datetime.utcnow()))

if len(org_members) > 0:
db.session.add_all(org_members)
db.session.commit()
updated_org = organization.update_organization(org, members)
return updated_org


org_schema = OrganizationSchema()
return json.loads(org_schema.dumps(org))

@api.doc(security='ApiKeyAuth')
@login_required()
Expand All @@ -210,26 +127,13 @@ def delete(self, org_id):
Delete organization
"""

org = db.session.query(Organization_db).filter_by(id=org_id).first()
org_name = org.name
org = organization.get_organization(org_id)

if org is None:
return err_response(msg="Organization does not exist")

# Clear membership
db.session.execute(
db.delete(OrganizationMembership_db).filter_by(org_id=org_id)
)
db.session.commit()

# Clear job queues
db.session.execute(
db.delete(OrganizationJobQueue).filter_by(org_id=org_id)
)
db.session.commit()

db.session.query(Organization_db).filter_by(id=org_id).delete()
db.session.commit()
org_name = org.name
organization.delete_organization(org.id)

return {"code": status.HTTP_200_OK, "message": "Successfully deleted {}.".format(org_name)}

Expand Down
35 changes: 35 additions & 0 deletions api/utils/job_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import logging
from collections import namedtuple
import sqlalchemy
from sqlalchemy.exc import SQLAlchemyError
from api.maap_database import db

log = logging.getLogger(__name__)


def get_user_queues(user_id):

try:
user_queues = []
query = """select jq.queue_name from organization_membership m
inner join public.organization_job_queue ojq on m.org_id = ojq.org_id
inner join public.job_queue jq on jq.id = ojq.job_queue_id
where m.member_id = {}
union
select queue_name
from job_queue
where guest_tier = true""".format(user_id)
queue_list = db.session.execute(sqlalchemy.text(query))

Record = namedtuple('Record', queue_list.keys())
queue_records = [Record(*r) for r in queue_list.fetchall()]

for r in queue_records:
user_queues.append(r.queue_name)

return user_queues

except SQLAlchemyError as ex:
raise ex
except:
raise Exception("Couldn't get list of available queues")
Loading

0 comments on commit f80377e

Please sign in to comment.