Skip to content

Commit

Permalink
refactor: replace windowed_query with yield_per (#17361)
Browse files Browse the repository at this point in the history
  • Loading branch information
miketheman authored Jan 6, 2025
1 parent 24a2693 commit 35f9cac
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 149 deletions.
41 changes: 0 additions & 41 deletions tests/unit/utils/db/test_windowed_query.py

This file was deleted.

52 changes: 23 additions & 29 deletions warehouse/search/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
from opensearchpy.helpers import parallel_bulk
from redis.lock import Lock
from sqlalchemy import func, select, text
from sqlalchemy.orm import aliased
from urllib3.util import parse_url

from warehouse import tasks
Expand All @@ -36,28 +35,10 @@
)
from warehouse.packaging.search import Project as ProjectDocument
from warehouse.search.utils import get_index
from warehouse.utils.db import windowed_query


def _project_docs(db, project_name=None):
releases_list = (
select(Release.id)
.filter(Release.yanked.is_(False), Release.files.any())
.order_by(
Release.project_id,
Release.is_prerelease.nullslast(),
Release._pypi_ordering.desc(),
)
.distinct(Release.project_id)
)

if project_name:
releases_list = releases_list.join(Project).filter(Project.name == project_name)

releases_list = releases_list.subquery()
rlist = aliased(Release, releases_list)

classifiers = (
def _project_docs(db, project_name: str | None = None):
classifiers_subquery = (
select(func.array_agg(Classifier.classifier))
.select_from(ReleaseClassifiers)
.join(Classifier, Classifier.id == ReleaseClassifiers.trove_id)
Expand All @@ -66,8 +47,7 @@ def _project_docs(db, project_name=None):
.scalar_subquery()
.label("classifiers")
)

release_data = (
projects_to_index = (
select(
Description.raw.label("description"),
Release.author,
Expand All @@ -80,18 +60,32 @@ def _project_docs(db, project_name=None):
Release.platform,
Release.download_url,
Release.created,
classifiers,
classifiers_subquery,
Project.normalized_name,
Project.name,
)
.select_from(rlist)
.join(Release, Release.id == rlist.id)
.select_from(Release)
.join(Description)
.outerjoin(Release.project)
.join(Project)
.filter(
Release.yanked.is_(False),
Release.files.any(),
# Filter by project_name if provided
Project.name == project_name if project_name else text("TRUE"),
)
.order_by(
Project.name,
Release.is_prerelease.nullslast(),
Release._pypi_ordering.desc(),
)
.distinct(Project.name)
.execution_options(yield_per=25000)
)

for chunk in windowed_query(db, release_data, Project.name, 25000):
for release in chunk:
results = db.execute(projects_to_index)

for partition in results.partitions():
for release in partition:
p = ProjectDocument.from_db(release)
p._index = None
p.full_clean()
Expand Down
4 changes: 2 additions & 2 deletions warehouse/utils/db/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from warehouse.utils.db.windowed_query import windowed_query
from warehouse.utils.db.query_printer import print_query

__all__ = ["windowed_query"]
__all__ = ["print_query"]
77 changes: 0 additions & 77 deletions warehouse/utils/db/windowed_query.py

This file was deleted.

0 comments on commit 35f9cac

Please sign in to comment.