From 1c9646f0a019a167c32b649b6f5e6423c5ba2c9b Mon Sep 17 00:00:00 2001 From: "Michael P. Jung" Date: Thu, 28 Nov 2024 17:32:03 +0100 Subject: [PATCH] Add DISTINCT ON and LIMIT BY support to ClickHouse dialect (#817) --- pypika/dialects.py | 65 ++++++++++++++++++++++-- pypika/queries.py | 4 +- pypika/tests/dialects/test_clickhouse.py | 45 ++++++++++++++++ 3 files changed, 109 insertions(+), 5 deletions(-) diff --git a/pypika/dialects.py b/pypika/dialects.py index 2255243b..57955d85 100644 --- a/pypika/dialects.py +++ b/pypika/dialects.py @@ -1,7 +1,7 @@ import itertools import warnings from copy import copy -from typing import Any, Optional, Union, Tuple as TypedTuple +from typing import Any, Optional, Union, Tuple as TypedTuple, List from pypika.enums import Dialects from pypika.queries import ( @@ -384,7 +384,7 @@ def get_sql(self, *args: Any, **kwargs: Any) -> str: kwargs['groupby_alias'] = False return super().get_sql(*args, **kwargs) - def _apply_pagination(self, querystring: str) -> str: + def _apply_pagination(self, querystring: str, **kwargs) -> str: # Note: Overridden as Oracle specifies offset before the fetch next limit if self._offset: querystring += self._offset_sql() @@ -719,7 +719,7 @@ def top(self, value: Union[str, int], percent: bool = False, with_ties: bool = F self._top_percent: bool = percent self._top_with_ties: bool = with_ties - def _apply_pagination(self, querystring: str) -> str: + def _apply_pagination(self, querystring: str, **kwargs) -> str: # Note: Overridden as MSSQL specifies offset before the fetch next limit if self._limit is not None or self._offset: # Offset has to be present if fetch next is specified in a MSSQL query @@ -794,11 +794,21 @@ def drop_view(self, view: str) -> "ClickHouseDropQueryBuilder": class ClickHouseQueryBuilder(QueryBuilder): QUERY_CLS = ClickHouseQuery + _distinct_on: List[Term] + _limit_by: Optional[TypedTuple[int, int, List[Term]]] + def __init__(self, **kwargs) -> None: super().__init__(**kwargs) self._final = False self._sample = None self._sample_offset = None + self._distinct_on = [] + self._limit_by = None + + def __copy__(self) -> "ClickHouseQueryBuilder": + newone = super().__copy__() + newone._limit_by = copy(self._limit_by) + return newone @builder def final(self) -> "ClickHouseQueryBuilder": @@ -839,6 +849,55 @@ def _set_sql(self, **kwargs: Any) -> str: ) ) + @builder + def distinct_on(self, *fields: Union[str, Term]) -> "ClickHouseQueryBuilder": + for field in fields: + if isinstance(field, str): + self._distinct_on.append(Field(field)) + elif isinstance(field, Term): + self._distinct_on.append(field) + + def _distinct_sql(self, **kwargs: Any) -> str: + if self._distinct_on: + return "DISTINCT ON({distinct_on}) ".format( + distinct_on=",".join(term.get_sql(with_alias=True, **kwargs) for term in self._distinct_on) + ) + return super()._distinct_sql(**kwargs) + + @builder + def limit_by(self, n, *by: Union[str, Term]) -> "ClickHouseQueryBuilder": + self._limit_by = (n, 0, [Field(field) if isinstance(field, str) else field for field in by]) + + @builder + def limit_offset_by(self, n, offset, *by: Union[str, Term]) -> "ClickHouseQueryBuilder": + self._limit_by = (n, offset, [Field(field) if isinstance(field, str) else field for field in by]) + + def _apply_pagination(self, querystring: str, **kwargs) -> str: + # LIMIT BY isn't really a pagination per se but since we need + # to add this to the query right before an actual LIMIT clause + # this is good enough. + if self._limit_by: + querystring += self._limit_by_sql(**kwargs) + return super()._apply_pagination(querystring, **kwargs) + + def _limit_by_sql(self, **kwargs: Any) -> str: + (n, offset, by) = self._limit_by + by = ",".join(term.get_sql(with_alias=True, **kwargs) for term in by) + if offset != 0: + return f" LIMIT {n} OFFSET {offset} BY ({by})" + else: + return f" LIMIT {n} BY ({by})" + + def replace_table(self, current_table: Optional[Table], new_table: Optional[Table]) -> "ClickHouseQueryBuilder": + newone = super().replace_table(current_table, new_table) + if self._limit_by: + newone._limit_by = ( + self._limit_by[0], + self._limit_by[1], + [column.replace_table(current_table, new_table) for column in self._limit_by[2]], + ) + return newone + class ClickHouseDropQueryBuilder(DropQueryBuilder): QUERY_CLS = ClickHouseQuery diff --git a/pypika/queries.py b/pypika/queries.py index d7861200..42c7c459 100644 --- a/pypika/queries.py +++ b/pypika/queries.py @@ -1354,7 +1354,7 @@ def get_sql(self, with_alias: bool = False, subquery: bool = False, **kwargs: An if self._orderbys: querystring += self._orderby_sql(**kwargs) - querystring = self._apply_pagination(querystring) + querystring = self._apply_pagination(querystring, **kwargs) if self._for_update: querystring += self._for_update_sql(**kwargs) @@ -1370,7 +1370,7 @@ def get_sql(self, with_alias: bool = False, subquery: bool = False, **kwargs: An return querystring - def _apply_pagination(self, querystring: str) -> str: + def _apply_pagination(self, querystring: str, **kwargs) -> str: if self._limit is not None: querystring += self._limit_sql() diff --git a/pypika/tests/dialects/test_clickhouse.py b/pypika/tests/dialects/test_clickhouse.py index 62afd7c2..f319c462 100644 --- a/pypika/tests/dialects/test_clickhouse.py +++ b/pypika/tests/dialects/test_clickhouse.py @@ -99,3 +99,48 @@ def test_drop_other(self): self.assertEqual('DROP QUOTA "myquota"', str(q1)) self.assertEqual('DROP USER "myuser"', str(q2)) self.assertEqual('DROP VIEW "myview"', str(q3)) + + +class DistinctOnTests(TestCase): + table_abc = Table("abc") + + def test_distinct_on(self): + q = ClickHouseQuery.from_(self.table_abc).distinct_on("lname", self.table_abc.fname).select("lname", "id") + + self.assertEqual('''SELECT DISTINCT ON("lname","fname") "lname","id" FROM "abc"''', str(q)) + + +class LimitByTests(TestCase): + table_abc = Table("abc") + + def test_limit_by(self): + q = ClickHouseQuery.from_(self.table_abc).limit_by(1, "a", self.table_abc.b).select("a", "b", "c") + + self.assertEqual('''SELECT "a","b","c" FROM "abc" LIMIT 1 BY ("a","b")''', str(q)) + + def test_limit_offset_by(self): + q = ClickHouseQuery.from_(self.table_abc).limit_offset_by(1, 2, "a", self.table_abc.b).select("a", "b", "c") + + self.assertEqual('''SELECT "a","b","c" FROM "abc" LIMIT 1 OFFSET 2 BY ("a","b")''', str(q)) + + def test_limit_offset0_by(self): + q = ClickHouseQuery.from_(self.table_abc).limit_offset_by(1, 0, "a", self.table_abc.b).select("a", "b", "c") + + self.assertEqual('''SELECT "a","b","c" FROM "abc" LIMIT 1 BY ("a","b")''', str(q)) + + def test_rename_table(self): + table_join = Table("join") + + q = ( + ClickHouseQuery.from_(self.table_abc) + .join(table_join) + .using("a") + .limit_by(1, self.table_abc.a, table_join.a) + .select(self.table_abc.b, table_join.b) + ) + q = q.replace_table(self.table_abc, Table("xyz")) + + self.assertEqual( + '''SELECT "xyz"."b","join"."b" FROM "xyz" JOIN "join" USING ("a") LIMIT 1 BY ("xyz"."a","join"."a")''', + str(q), + )