diff --git a/inbox/api/filtering.py b/inbox/api/filtering.py index fd81f83d5..a82cbf471 100644 --- a/inbox/api/filtering.py +++ b/inbox/api/filtering.py @@ -22,7 +22,7 @@ from inbox.models.session import session_scope_by_shard_id -def contact_subquery(db_session, namespace_id, email_address, field): +def contact_query(db_session, namespace_id, email_address, field): return ( db_session.query(Message.thread_id) .join(MessageContactAssociation) @@ -32,7 +32,6 @@ def contact_subquery(db_session, namespace_id, email_address, field): Contact.namespace_id == namespace_id, MessageContactAssociation.field == field, ) - .subquery() ) @@ -89,19 +88,19 @@ def threads( query = query.filter(*filters) if from_addr is not None: - from_query = contact_subquery(db_session, namespace_id, from_addr, "from_addr") + from_query = contact_query(db_session, namespace_id, from_addr, "from_addr") query = query.filter(Thread.id.in_(from_query)) if to_addr is not None: - to_query = contact_subquery(db_session, namespace_id, to_addr, "to_addr") + to_query = contact_query(db_session, namespace_id, to_addr, "to_addr") query = query.filter(Thread.id.in_(to_query)) if cc_addr is not None: - cc_query = contact_subquery(db_session, namespace_id, cc_addr, "cc_addr") + cc_query = contact_query(db_session, namespace_id, cc_addr, "cc_addr") query = query.filter(Thread.id.in_(cc_query)) if bcc_addr is not None: - bcc_query = contact_subquery(db_session, namespace_id, bcc_addr, "bcc_addr") + bcc_query = contact_query(db_session, namespace_id, bcc_addr, "bcc_addr") query = query.filter(Thread.id.in_(bcc_query)) if any_email is not None: @@ -113,7 +112,6 @@ def threads( Contact.email_address.in_(any_email), Contact.namespace_id == namespace_id, ) - .subquery() ) query = query.filter(Thread.id.in_(any_contact_query)) @@ -129,7 +127,6 @@ def threads( .join(Part) .join(Block) .filter(Block.filename == filename, Block.namespace_id == namespace_id) - .subquery() ) query = query.filter(Thread.id.in_(files_query)) @@ -146,24 +143,19 @@ def threads( .join(Message.messagecategories) .join(MessageCategory.category) .filter(Category.namespace_id == namespace_id, or_(*category_filters)) - .subquery() ) query = query.filter(Thread.id.in_(category_query)) if unread is not None: read = not unread - unread_query = ( - db_session.query(Message.thread_id) - .filter(Message.namespace_id == namespace_id, Message.is_read == read) - .subquery() + unread_query = db_session.query(Message.thread_id).filter( + Message.namespace_id == namespace_id, Message.is_read == read ) query = query.filter(Thread.id.in_(unread_query)) if starred is not None: - starred_query = ( - db_session.query(Message.thread_id) - .filter(Message.namespace_id == namespace_id, Message.is_starred == starred) - .subquery() + starred_query = db_session.query(Message.thread_id).filter( + Message.namespace_id == namespace_id, Message.is_starred == starred ) query = query.filter(Thread.id.in_(starred_query)) @@ -344,7 +336,6 @@ def messages_or_drafts( Contact.email_address == to_addr, Contact.namespace_id == bindparam("namespace_id"), ) - .subquery() ) query = query.filter(Message.id.in_(to_query)) @@ -357,7 +348,6 @@ def messages_or_drafts( Contact.email_address == from_addr, Contact.namespace_id == bindparam("namespace_id"), ) - .subquery() ) query = query.filter(Message.id.in_(from_query)) @@ -370,7 +360,6 @@ def messages_or_drafts( Contact.email_address == cc_addr, Contact.namespace_id == bindparam("namespace_id"), ) - .subquery() ) query = query.filter(Message.id.in_(cc_query)) @@ -383,7 +372,6 @@ def messages_or_drafts( Contact.email_address == bcc_addr, Contact.namespace_id == bindparam("namespace_id"), ) - .subquery() ) query = query.filter(Message.id.in_(bcc_query)) @@ -395,7 +383,6 @@ def messages_or_drafts( Contact.email_address.in_(any_email), Contact.namespace_id == bindparam("namespace_id"), ) - .subquery() ) query = query.filter(Message.id.in_(any_email_query)) @@ -583,7 +570,7 @@ def recurring_events( if ends_before: # start < end, so event start < ends_before before_criteria.append(RecurringEvent.start < ends_before) - recur_query = recur_query.filter(and_(*before_criteria)) + recur_query = recur_query.filter(*before_criteria) after_criteria = [] if starts_after: after_criteria.append( @@ -594,7 +581,7 @@ def recurring_events( or_(RecurringEvent.until > ends_after, RecurringEvent.until.is_(None)) ) - recur_query = recur_query.filter(and_(*after_criteria)) + recur_query = recur_query.filter(*after_criteria) recur_instances = [] @@ -694,7 +681,6 @@ def events( Contact.namespace_id == namespace_id, EventContactAssociation.field == "title", ) - .subquery() ) event_criteria.append(Event.id.in_(title_email_query)) @@ -707,7 +693,6 @@ def events( Contact.namespace_id == namespace_id, EventContactAssociation.field == "description", ) - .subquery() ) event_criteria.append(Event.id.in_(description_email_query)) @@ -720,7 +705,6 @@ def events( Contact.namespace_id == namespace_id, EventContactAssociation.field == "owner", ) - .subquery() ) event_criteria.append(Event.id.in_(owner_email_query)) @@ -733,7 +717,6 @@ def events( Contact.namespace_id == namespace_id, EventContactAssociation.field == "participant", ) - .subquery() ) event_criteria.append(Event.id.in_(participant_email_query)) @@ -744,7 +727,6 @@ def events( .filter( Contact.email_address == any_email, Contact.namespace_id == namespace_id ) - .subquery() ) event_criteria.append(Event.id.in_(any_email_query)) diff --git a/inbox/models/session.py b/inbox/models/session.py index 22fd644a2..feda46894 100644 --- a/inbox/models/session.py +++ b/inbox/models/session.py @@ -211,7 +211,7 @@ def id_chooser(query, ident): return [str(engine_manager.shard_key_for_id(ident))] -def query_chooser(query): +def execute_chooser(query): return [str(k) for k in engine_manager.engines] @@ -221,7 +221,7 @@ def global_session_scope(): session = ShardedSession( shard_chooser=shard_chooser, id_chooser=id_chooser, - query_chooser=query_chooser, + execute_chooser=execute_chooser, shards=shards, ) # STOPSHIP(emfree): need instrumentation and proper exception handling diff --git a/inbox/transactions/delta_sync.py b/inbox/transactions/delta_sync.py index 7e944ac6e..edc7e5cb6 100644 --- a/inbox/transactions/delta_sync.py +++ b/inbox/transactions/delta_sync.py @@ -57,7 +57,7 @@ def get_transaction_cursor_near_timestamp(namespace_id, timestamp, db_session): .order_by(desc(Transaction.created_at)) .filter(Transaction.created_at < dt, Transaction.namespace_id == namespace_id) .limit(1) - .subquery() + .scalar_subquery() ) latest_transaction = ( db_session.query(Transaction)