Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filtering and sorting #3230

Merged
merged 21 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 44 additions & 2 deletions src/zenml/client.py

Large diffs are not rendered by default.

9 changes: 4 additions & 5 deletions src/zenml/models/v2/base/filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,6 @@ class BaseFilter(BaseModel):
le=PAGE_SIZE_MAXIMUM,
description="Page size",
)

id: Optional[Union[UUID, str]] = Field(
default=None,
description="Id for this resource",
Expand Down Expand Up @@ -491,13 +490,13 @@ def validate_sort_by(cls, value: Any) -> Any:
)
value = column

if column in cls.FILTER_EXCLUDE_FIELDS:
if column in cls.CUSTOM_SORTING_OPTIONS:
return value
elif column in cls.FILTER_EXCLUDE_FIELDS:
raise ValueError(
f"This resource can not be sorted by this field: '{value}'"
)
elif column in cls.model_fields:
return value
elif column in cls.CUSTOM_SORTING_OPTIONS:
if column in cls.model_fields:
return value
else:
raise ValueError(
Expand Down
254 changes: 251 additions & 3 deletions src/zenml/models/v2/base/scoped.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
Optional,
Type,
TypeVar,
Union,
)
from uuid import UUID

Expand Down Expand Up @@ -151,16 +152,32 @@ class UserScopedFilter(BaseFilter):

# Field names that are not handled as plain column filters. Per the visible
# part of `BaseFilter.validate_sort_by`, a name listed here is rejected as a
# sort column unless it also appears in CUSTOM_SORTING_OPTIONS.
FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
*BaseFilter.FILTER_EXCLUDE_FIELDS,
"user",
"scope_user",
]
# NOTE(review): presumably the fields hidden from CLI options; confirm
# against the CLI code, which is not visible here.
CLI_EXCLUDE_FIELDS: ClassVar[List[str]] = [
*BaseFilter.CLI_EXCLUDE_FIELDS,
"user_id",
"scope_user",
]
# Sort keys that `validate_sort_by` accepts before consulting the exclude
# list; "user" is handled by this class's `apply_sorting`.
CUSTOM_SORTING_OPTIONS: ClassVar[List[str]] = [
*BaseFilter.CUSTOM_SORTING_OPTIONS,
"user",
]

# Scoping value; assigned through `set_scope_user`.
scope_user: Optional[UUID] = Field(
default=None,
description="The user to scope this query to.",
)
# `union_mode="left_to_right"` makes pydantic try the UUID member of the
# union first and fall back to `str` only if that fails.
user_id: Optional[Union[UUID, str]] = Field(
default=None,
description="UUID of the user that created the entity.",
union_mode="left_to_right",
)
# Consumed by `get_custom_filters` (name/ID match against UserSchema) and
# usable as the "user" sort key in `apply_sorting`.
user: Optional[Union[UUID, str]] = Field(
default=None,
description="Name/ID of the user that created the entity.",
)

def set_scope_user(self, user_id: UUID) -> None:
"""Set the user that is performing the filtering to scope the response.
Expand All @@ -170,6 +187,73 @@ def set_scope_user(self, user_id: UUID) -> None:
"""
self.scope_user = user_id

def get_custom_filters(
    self,
) -> List["ColumnElement[bool]"]:
    """Collect the custom filter conditions, including the `user` filter.

    Starts from the conditions returned by the parent class. When the
    `user` field is set, one more condition is appended that links the
    entity to a `UserSchema` row and matches `self.user` against that row
    via `generate_name_or_id_query_conditions` (with `full_name` passed as
    an additional column).

    Returns:
        The list of custom filter conditions.
    """
    conditions = super().get_custom_filters()

    from sqlmodel import and_

    from zenml.zen_stores.schemas import (
        PipelineSchema,
        UserSchema,
    )

    # Nothing to add when no user filter was requested.
    if not self.user:
        return conditions

    # NOTE(review): the ownership link is hard-coded to PipelineSchema even
    # though this filter class is generic; confirm whether the queried
    # table should be used here instead.
    owner_link = PipelineSchema.user_id == UserSchema.id
    user_match = self.generate_name_or_id_query_conditions(
        value=self.user,
        table=UserSchema,
        additional_columns=["full_name"],
    )
    conditions.append(and_(owner_link, user_match))

    return conditions

def apply_sorting(
    self,
    query: AnyQuery,
    table: Type["AnySchema"],
) -> AnyQuery:
    """Order the query, with special handling for the `user` sort key.

    Sorting by `user` joins `UserSchema` on the table's `user_id` column
    and orders by `UserSchema.name`. Every other sort key is delegated to
    the parent implementation.

    Args:
        query: The query to which to apply the sorting.
        table: The query table.

    Returns:
        The query with sorting applied.
    """
    from sqlmodel import asc, desc

    from zenml.enums import SorterOps
    from zenml.zen_stores.schemas import UserSchema

    sort_key, sort_direction = self.sorting_params

    # Anything but the custom "user" key is handled by the parent class.
    if sort_key != "user":
        return super().apply_sorting(query=query, table=table)

    name_column = UserSchema.name
    joined = query.join(
        UserSchema, getattr(table, "user_id") == UserSchema.id
    )

    # Any operand other than ASCENDING results in descending order.
    order_fn = asc if sort_direction == SorterOps.ASCENDING else desc
    return joined.order_by(order_fn(name_column))

def apply_filter(
self,
query: AnyQuery,
Expand Down Expand Up @@ -240,21 +324,37 @@ def workspace(self) -> "WorkspaceResponse":
return self.get_metadata().workspace


class WorkspaceScopedFilter(BaseFilter):
class WorkspaceScopedFilter(UserScopedFilter):
"""Model to enable advanced scoping with workspace."""

FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
*BaseFilter.FILTER_EXCLUDE_FIELDS,
*UserScopedFilter.FILTER_EXCLUDE_FIELDS,
"workspace",
"scope_workspace",
]
CLI_EXCLUDE_FIELDS: ClassVar[List[str]] = [
*BaseFilter.CLI_EXCLUDE_FIELDS,
*UserScopedFilter.CLI_EXCLUDE_FIELDS,
"workspace_id",
"workspace",
"scope_workspace",
]
# Sort keys that `validate_sort_by` accepts before consulting the exclude
# list; "workspace" is handled by this class's `apply_sorting`.
CUSTOM_SORTING_OPTIONS: ClassVar[List[str]] = [
*UserScopedFilter.CUSTOM_SORTING_OPTIONS,
"workspace",
]
# Scoping value; assigned through `set_scope_workspace`.
scope_workspace: Optional[UUID] = Field(
default=None,
description="The workspace to scope this query to.",
)
# `union_mode="left_to_right"` makes pydantic try the UUID member of the
# union first and fall back to `str` only if that fails.
workspace_id: Optional[Union[UUID, str]] = Field(
default=None,
description="UUID of the workspace that this entity belongs to.",
union_mode="left_to_right",
)
# Consumed by `get_custom_filters` (name/ID match against WorkspaceSchema)
# and usable as the "workspace" sort key in `apply_sorting`.
workspace: Optional[Union[UUID, str]] = Field(
default=None,
description="Name/ID of the workspace that this entity belongs to.",
)

def set_scope_workspace(self, workspace_id: UUID) -> None:
"""Set the workspace to scope this response.
Expand All @@ -264,6 +364,35 @@ def set_scope_workspace(self, workspace_id: UUID) -> None:
"""
self.scope_workspace = workspace_id

def get_custom_filters(
    self,
) -> List["ColumnElement[bool]"]:
    """Collect the custom filter conditions, including `workspace`.

    Starts from the conditions returned by the parent class. When the
    `workspace` field is set, one more condition is appended that links
    the entity to a `WorkspaceSchema` row and matches `self.workspace`
    against that row via `generate_name_or_id_query_conditions`.

    Returns:
        A list of custom filters.
    """
    custom_filters = super().get_custom_filters()

    from sqlmodel import and_

    from zenml.zen_stores.schemas import (
        PipelineSchema,
        WorkspaceSchema,
    )

    if self.workspace:
        workspace_filter = and_(
            # The workspace primary key must be compared with the
            # workspace foreign key. Comparing it with `user_id` relates
            # a user ID to a workspace ID and never selects the intended
            # rows.
            # NOTE(review): the link is still hard-coded to
            # PipelineSchema; confirm whether the queried table should be
            # used here instead.
            PipelineSchema.workspace_id == WorkspaceSchema.id,
            self.generate_name_or_id_query_conditions(
                value=self.workspace,
                table=WorkspaceSchema,
            ),
        )
        custom_filters.append(workspace_filter)

    return custom_filters

def apply_filter(
self,
query: AnyQuery,
Expand Down Expand Up @@ -291,6 +420,44 @@ def apply_filter(

return query

def apply_sorting(
    self,
    query: AnyQuery,
    table: Type["AnySchema"],
) -> AnyQuery:
    """Order the query, with special handling for the `workspace` key.

    Sorting by `workspace` joins `WorkspaceSchema` on the table's
    `workspace_id` column and orders by `WorkspaceSchema.name`. Every
    other sort key is delegated to the parent implementation.

    Args:
        query: The query to which to apply the sorting.
        table: The query table.

    Returns:
        The query with sorting applied.
    """
    from sqlmodel import asc, desc

    from zenml.enums import SorterOps
    from zenml.zen_stores.schemas import WorkspaceSchema

    sort_key, sort_direction = self.sorting_params

    # Anything but the custom "workspace" key goes to the parent class.
    if sort_key != "workspace":
        return super().apply_sorting(query=query, table=table)

    name_column = WorkspaceSchema.name
    joined = query.join(
        WorkspaceSchema,
        getattr(table, "workspace_id") == WorkspaceSchema.id,
    )

    # Any operand other than ASCENDING results in descending order.
    order_fn = asc if sort_direction == SorterOps.ASCENDING else desc
    return joined.order_by(order_fn(name_column))


class WorkspaceScopedTaggableFilter(WorkspaceScopedFilter):
"""Model to enable advanced scoping with workspace and tagging."""
Expand All @@ -304,6 +471,11 @@ class WorkspaceScopedTaggableFilter(WorkspaceScopedFilter):
"tag",
]

# Adds "tag" to the inherited custom sort keys; the "tag" key is handled by
# this class's `apply_sorting`.
CUSTOM_SORTING_OPTIONS: ClassVar[List[str]] = [
*WorkspaceScopedFilter.CUSTOM_SORTING_OPTIONS,
"tag",
]

def apply_filter(
self,
query: AnyQuery,
Expand Down Expand Up @@ -347,3 +519,79 @@ def get_custom_filters(self) -> List["ColumnElement[bool]"]:
)

return custom_filters

def apply_sorting(
    self,
    query: AnyQuery,
    table: Type["AnySchema"],
) -> AnyQuery:
    """Order the query, with special handling for the `tag` sort key.

    Sorting by `tag` outer-joins the tag tables onto the queried table,
    groups by the table's `id` and orders by the comma-joined tag names
    of each row. Every other sort key is delegated to the parent
    implementation.

    Args:
        query: The query to which to apply the sorting.
        table: The query table.

    Returns:
        The query with sorting applied.
    """
    from sqlmodel import asc, desc, func

    from zenml.enums import SorterOps, TaggableResourceTypes
    from zenml.zen_stores.schemas import (
        ArtifactSchema,
        ArtifactVersionSchema,
        ModelSchema,
        ModelVersionSchema,
        PipelineRunSchema,
        PipelineSchema,
        RunTemplateSchema,
        TagResourceSchema,
        TagSchema,
    )

    sort_key, sort_direction = self.sorting_params

    # Anything but the custom "tag" key is handled by the parent class.
    if sort_key != "tag":
        return super().apply_sorting(query=query, table=table)

    # Maps each taggable schema to the resource type stored on its
    # tag-resource rows. A table missing from this mapping raises a
    # KeyError on the lookup below.
    resource_type_mapping = {
        ArtifactSchema: TaggableResourceTypes.ARTIFACT,
        ArtifactVersionSchema: TaggableResourceTypes.ARTIFACT_VERSION,
        ModelSchema: TaggableResourceTypes.MODEL,
        ModelVersionSchema: TaggableResourceTypes.MODEL_VERSION,
        PipelineSchema: TaggableResourceTypes.PIPELINE,
        PipelineRunSchema: TaggableResourceTypes.PIPELINE_RUN,
        RunTemplateSchema: TaggableResourceTypes.RUN_TEMPLATE,
    }

    # Match tag-resource rows by resource ID and by resource type.
    tag_link = (table.id == TagResourceSchema.resource_id) & (
        TagResourceSchema.resource_type == resource_type_mapping[table]
    )
    grouped = (
        query.outerjoin(TagResourceSchema, tag_link)
        .outerjoin(TagSchema, TagResourceSchema.tag_id == TagSchema.id)
        .group_by(table.id)
    )

    # One comma-joined string of tag names per grouped row is the sort key.
    tags_list = func.group_concat(TagSchema.name, ",").label("tags_list")

    # Any operand other than ASCENDING results in descending order.
    order_fn = asc if sort_direction == SorterOps.ASCENDING else desc
    return grouped.order_by(order_fn(tags_list))
Loading
Loading