Skip to content

Commit

Permalink
better support for virtual columns
Browse files Browse the repository at this point in the history
  • Loading branch information
Gaetano Guerriero committed Jan 28, 2022
1 parent 9c15f13 commit 8e43199
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 2 deletions.
8 changes: 8 additions & 0 deletions pydruid/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ def topn(self, **kwargs):
:param str dimension: Dimension to run the query against
:param str metric: Metric over which to sort the specified dimension by
:param int threshold: How many of the top items to return
:param list virtual_columns: A list of VirtualColumnSpec or dict to add
to the query
:return: The query result
:rtype: Query
Expand Down Expand Up @@ -137,6 +139,8 @@ def timeseries(self, **kwargs):
:type intervals: str or list
:param dict aggregations: A map from aggregator name to one of the
``pydruid.utils.aggregators`` e.g., ``doublesum``
:param list virtual_columns: A list of VirtualColumnSpec or dict to add
to the query
:return: The query result
:rtype: Query
Expand Down Expand Up @@ -240,6 +244,8 @@ def groupby(self, **kwargs):
:param dict aggregations: A map from aggregator name to one of the
``pydruid.utils.aggregators`` e.g., ``doublesum``
:param list dimensions: The dimensions to group by
:param list virtual_columns: A list of VirtualColumnSpec or dict to add
to the query
:return: The query result
:rtype: Query
Expand Down Expand Up @@ -599,6 +605,8 @@ def scan(self, **kwargs):
empty, all columns are returned
:param list metrics: The list of metrics to select. If left empty,
all metrics are returned
:param list virtual_columns: A list of VirtualColumnSpec or dict to add
to the query
:param dict context: A dict of query context options
:return: The query result
Expand Down
9 changes: 9 additions & 0 deletions pydruid/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from pydruid.utils.having import Having
from pydruid.utils.postaggregator import Postaggregator
from pydruid.utils.query_utils import UnicodeWriter
from pydruid.utils.virtualcolumns import build_virtual_column


class Query(MutableSequence):
Expand Down Expand Up @@ -310,6 +311,10 @@ def build_query(self, query_type, args):
query_dict[key] = build_dimension(val)
elif key == "dimensions":
query_dict[key] = [build_dimension(v) for v in val]
elif key in ("virtualColumns", "virtual_columns"):
query_dict["virtualColumns"] = [
build_virtual_column(virtual_col) for virtual_col in val
]
else:
query_dict[key] = val

Expand Down Expand Up @@ -341,6 +346,7 @@ def topn(self, args):
"threshold",
"metric",
"virtualColumns",
"virtual_columns",
]
self.validate_query(query_type, valid_parts, args)
return self.build_query(query_type, args)
Expand All @@ -365,6 +371,7 @@ def timeseries(self, args):
"post_aggregations",
"intervals",
"virtualColumns",
"virtual_columns",
]
self.validate_query(query_type, valid_parts, args)
return self.build_query(query_type, args)
Expand All @@ -391,6 +398,7 @@ def groupby(self, args):
"dimensions",
"limit_spec",
"virtualColumns",
"virtual_columns",
]
self.validate_query(query_type, valid_parts, args)
return self.build_query(query_type, args)
Expand Down Expand Up @@ -516,6 +524,7 @@ def scan(self, args):
"intervals",
"limit",
"order",
"virtual_columns",
]
self.validate_query(query_type, valid_parts, args)
return self.build_query(query_type, args)
28 changes: 28 additions & 0 deletions pydruid/utils/virtualcolumns.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from __future__ import annotations

from typing import Union


def build_virtual_column(virtual_column: Union[VirtualColumnSpec, dict]) -> dict:
if isinstance(virtual_column, VirtualColumnSpec):
return virtual_column.build()

return virtual_column


class VirtualColumnSpec:
def __init__(self, name: str, expression: str, output_type: str = None):
self._name = name
self._expression = expression
self._output_type = output_type

def build(self) -> dict:
build = {
"type": "expression",
"name": self._name,
"expression": self._expression,
}
if self._output_type is not None:
build["outputType"] = self._output_type

return build
32 changes: 30 additions & 2 deletions tests/test_query.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# -*- coding: UTF-8 -*-
# -*- coding: utf-8 -*-
#
# Copyright 2016 Metamarkets Group Inc.
#
Expand All @@ -23,7 +23,7 @@
from pandas.testing import assert_frame_equal

from pydruid.query import Query, QueryBuilder
from pydruid.utils import aggregators, filters, having, postaggregator
from pydruid.utils import aggregators, filters, having, postaggregator, virtualcolumns


def create_query_with_results():
Expand Down Expand Up @@ -226,6 +226,34 @@ def test_build_subquery(self):
# then
assert subquery_dict == expected_query_dict

def test_topn_virtual_columns(self) -> None:
builder = QueryBuilder()

query_kwargs = {
"datasource": "things",
"intervals": "2013-10-23/2013-10-26",
"granularity": "all",
"dimension": "virtual_id",
"threshold": 5,
"metric": "views",
"virtual_columns": [
virtualcolumns.VirtualColumnSpec(
"virtual_id", "concat('ns:' + real_id)", 'STRING'
)
]
}

query = builder.topn(query_kwargs)
assert query.query_dict['queryType'] == 'topN'

expected_virtual_columns = [
{
'type': 'expression', 'name': 'virtual_id',
'expression': "concat('ns:' + real_id)", 'outputType': 'STRING'
}
]
assert query.query_dict['virtualColumns'] == expected_virtual_columns


class TestQuery:
def test_export_tsv(self, tmpdir):
Expand Down
37 changes: 37 additions & 0 deletions tests/utils/test_virtualcolumns.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from pydruid.utils.virtualcolumns import build_virtual_column, VirtualColumnSpec


class BuildVirtualColumnTC:
def test_with_virtual_column_spec(self) -> None:
virtual_col_spec = VirtualColumnSpec("foo", "bar + 3")
assert build_virtual_column(virtual_col_spec) == virtual_col_spec.build()

def test_with_dict(self) -> None:
virtual_column = {
"type": "expression",
"name": "doubleVote",
"expression": "vote * 2",
"outputType": "LONG",
}
assert build_virtual_column(virtual_column) == virtual_column


class TestVirtualColumnSpec:
def test_default(self) -> None:
spec = VirtualColumnSpec("foo", "concat(bar + '123')")
expected = {
"type": "expression",
"name": "foo",
"expression": "concat(bar + '123')",
}
assert spec.build() == expected

def test_output_type(self) -> None:
spec = VirtualColumnSpec("foo", "bar * 3", "LONG")
expected = {
"type": "expression",
"name": "foo",
"expression": "bar * 3",
"outputType": "LONG",
}
assert spec.build() == expected

0 comments on commit 8e43199

Please sign in to comment.