Skip to content

Commit

Permalink
Merge pull request #141 from mikeengland/master
Browse files Browse the repository at this point in the history
Fix bug when YoY reference is used with week dimension interval
  • Loading branch information
mikeengland authored Sep 21, 2017
2 parents 2af095a + 425a173 commit 927aa3d
Show file tree
Hide file tree
Showing 5 changed files with 716 additions and 647 deletions.
3 changes: 1 addition & 2 deletions fireant/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
# coding: utf-8

__version__ = '{major}.{minor}.{patch}'.format(major=0, minor=17, patch=4)
__version__ = '{major}.{minor}.{patch}'.format(major=0, minor=17, patch=5)
4 changes: 4 additions & 0 deletions fireant/database/mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ class Trunc(terms.Function):

def __init__(self, field, date_format, alias=None):
super(Trunc, self).__init__('dashmore.TRUNC', field, date_format, alias=alias)
# Setting the fields here means we can access the TRUNC args by name.
self.field = field
self.date_format = date_format
self.alias = alias


class MySQLDatabase(Database):
Expand Down
31 changes: 19 additions & 12 deletions fireant/database/vertica.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# coding: utf-8
from pypika import terms, VerticaQuery
from pypika import (
VerticaQuery,
terms,
)

from fireant.database import Database
from fireant.slicer import DatetimeInterval
Expand All @@ -12,6 +15,10 @@ class Trunc(terms.Function):

def __init__(self, field, date_format, alias=None):
super(Trunc, self).__init__('TRUNC', field, date_format, alias=alias)
# Setting the fields here means we can access the TRUNC args by name.
self.field = field
self.date_format = date_format
self.alias = alias


class Interval(terms.Interval):
Expand Down Expand Up @@ -53,7 +60,7 @@ def get_sql(self, **kwargs):
# This has day as its maximum interval granularity, so it does not take into account leap years.
query = 'INTERVAL \'{expr} {unit}\''

return query.format(expr=expr, unit=unit)
return query.format(expr=expr, unit=unit)


class VerticaDatabase(Database):
Expand All @@ -63,6 +70,15 @@ class VerticaDatabase(Database):
# The pypika query class to use for constructing queries
query_cls = VerticaQuery

DATETIME_INTERVALS = {
'hour': DatetimeInterval('HH'),
'day': DatetimeInterval('DD'),
'week': DatetimeInterval('IW'),
'month': DatetimeInterval('MM'),
'quarter': DatetimeInterval('Q'),
'year': DatetimeInterval('Y')
}

def __init__(self, host='localhost', port=5433, database='vertica',
user='vertica', password=None,
read_timeout=None):
Expand All @@ -83,16 +99,7 @@ def connect(self):
)

def trunc_date(self, field, interval):
datetime_intervals = {
'hour': DatetimeInterval('HH'),
'day': DatetimeInterval('DD'),
'week': DatetimeInterval('IW'),
'month': DatetimeInterval('MM'),
'quarter': DatetimeInterval('Q'),
'year': DatetimeInterval('Y')
}

interval = datetime_intervals[interval]
interval = self.DATETIME_INTERVALS[interval]
return Trunc(field, interval.size)

def interval(self, **kwargs):
Expand Down
37 changes: 24 additions & 13 deletions fireant/slicer/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,17 @@
from itertools import chain

import pandas as pd
from pypika import (
JoinType,
MySQLQuery,
)

from fireant import utils
from fireant.slicer.references import (
Delta,
DeltaPercentage,
YoY,
)
from pypika import (
JoinType,
MySQLQuery,
)

query_logger = logging.getLogger('fireant.query_log$')

Expand Down Expand Up @@ -129,7 +130,7 @@ def query_data(self, database, table, joins=None,
raise QueryNotSupportedError("MySQL currently doesn't support ROLLUP operations!")

query = self._build_data_query(
database, table, joins, metrics, dimensions, dfilters, mfilters, references, rollup, pagination
database, table, joins, metrics, dimensions, dfilters, mfilters, references, rollup, pagination
)

dataframe = self._get_dataframe_from_query(database, query)
Expand All @@ -145,8 +146,8 @@ def query_data(self, database, table, joins=None,

if dimensions:
dataframe = dataframe.set_index(
# Removed the reference keys for now
list(dimensions.keys()) # + ['{1}_{0}'.format(*ref) for ref in references.items()]
# Removed the reference keys for now
list(dimensions.keys()) # + ['{1}_{0}'.format(*ref) for ref in references.items()]
)

if references:
Expand All @@ -170,8 +171,8 @@ def _get_dataframe_from_query(self, database, query):
dataframe = database.fetch_dataframe(query_string)

query_logger.info('[duration: {duration} seconds]: {query}'.format(
duration=round(time.time() - start_time, 4),
query=query_string)
duration=round(time.time() - start_time, 4),
query=query_string)
)

return dataframe
Expand Down Expand Up @@ -230,9 +231,19 @@ def _build_reference_query(self, query, database, references, table, joins, metr

# The interval term from pypika does not take into account leap years, therefore the interval
# needs to be replaced with a database specific one when appropriate.
if reference_key in [YoY.key, Delta.generate_key(YoY.key), DeltaPercentage.generate_key(YoY.key)]:
yoy_keys = [YoY.key, Delta.generate_key(YoY.key), DeltaPercentage.generate_key(YoY.key)]
if reference_key in yoy_keys:
interval = database.interval(years=1)

# If YoY reference is used with the week interval, the dates will fail to match in the join
# as the first day of the ISO year is different. Therefore, we truncate the reference date by adding
# a year, truncating it and removing a year so the dates match.
from fireant.slicer import DatetimeDimension
week = DatetimeDimension.week
dim = dimensions[dimension_key]
if hasattr(dim, 'date_format') and dim.date_format in [week, database.DATETIME_INTERVALS[week].size]:
dimensions[dimension_key] = database.trunc_date(dim.field + interval, 'week') - interval

schema['interval'] = interval

# Don't reuse the dfilters arg otherwise intervals will be aggregated on each iteration
Expand All @@ -259,8 +270,8 @@ def _build_reference_query(self, query, database, references, table, joins, metr
get_reference_field = lambda key: ref_query.field(key)

wrapper_query = wrapper_query.select(
*[get_reference_field(key).as_(self._suffix(key, reference_key))
for key in metrics.keys()]
*[get_reference_field(key).as_(self._suffix(key, reference_key))
for key in metrics.keys()]
)

if pagination:
Expand Down Expand Up @@ -386,7 +397,7 @@ def _build_reference_join_criteria(dimension_key, dimensions, interval, query, r
ref_join_criteria = []
if dimension_key in dimensions:
ref_join_criteria.append(
query.field(dimension_key) == ref_query.field(dimension_key) + interval
query.field(dimension_key) == ref_query.field(dimension_key) + interval
)

# The below set is sorted to ensure that the ON part of the join is always consistently ordered
Expand Down
Loading

0 comments on commit 927aa3d

Please sign in to comment.