Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reimplement Kolmogorov Smirnov query logic with sqlalchemy's Language Expression API #44

Merged
merged 20 commits into from
Aug 1, 2022
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/datajudge/constraints/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,10 @@ def check_acceptance(
def c(alpha: float):
return math.sqrt(-math.log(alpha / 2.0 + 1e-10) * 0.5)

return d_statistic <= c(accepted_level) * math.sqrt(
threshold = c(accepted_level) * math.sqrt(
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Slightly more convenient to debug.

(n_samples + m_samples) / (n_samples * m_samples)
)
return d_statistic <= threshold

@staticmethod
def calculate_statistic(
Expand Down
172 changes: 111 additions & 61 deletions src/datajudge/db_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -904,6 +904,33 @@ def get_column_array_agg(
return result, selections


def _get_cdf_selection(engine, ref: DataReference, cdf_label: str, value_label: str):
col = ref.get_column(engine)
selection = ref.get_selection(engine).subquery()

# Step 1: Calculate the CDF over the value column.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious: Is possible to merge the two steps? Like so

sa.select([
  selection.c[col],
  sa.func.max(sa.func.cume_dist().over(order_by=col)),
])
.group_by(selection.c[col])

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wondered the same and doing it that way leads to an error:
sqlalchemy.exc.ProgrammingError: (psycopg2.errors.GroupingError) aggregate function calls cannot contain window function calls

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add in the docstring a bit more information about the objective/idea behind this method?

It's great to have the comments on the step-by-step like in the SQL version before, but a summary would be a great addition to it, particularly clarifying and being explicit about the meaning of the arguments.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea.
f396053

cdf_selection = sa.select(
[
selection.c[col].label(value_label),
sa.func.cume_dist().over(order_by=col).label(cdf_label),
]
).subquery()

# Step 2: Aggregate rows s.t. every value occurs only once.
grouped_cdf_selection = (
sa.select(
[
cdf_selection.c[value_label],
sa.func.max(cdf_selection.c[cdf_label]).label(cdf_label),
]
)
.group_by(cdf_selection.c[value_label])
.subquery()
)

return grouped_cdf_selection


def get_ks_2sample(
engine: sa.engine.Engine,
ref1: DataReference,
Expand All @@ -912,68 +939,91 @@ def get_ks_2sample(
"""
Runs the query for the two-sample Kolmogorov-Smirnov test and returns the test statistic d.
"""
# For mssql: "tempdb.dbo".table_name -> tempdb.dbo.table_name
table1_str = str(ref1.data_source.get_clause(engine)).replace('"', "")
col1 = ref1.get_column(engine)
table2_str = str(ref2.data_source.get_clause(engine)).replace('"', "")
col2 = ref2.get_column(engine)

# for a more extensive explanation, see:
# https://github.com/Quantco/datajudge/pull/28#issuecomment-1165587929
ks_query_string = f"""
WITH
tab1 AS ( -- Step 0: Prepare data source and value column
SELECT {col1} as val FROM {table1_str}
),
tab2 AS (
SELECT {col2} as val FROM {table2_str}
),
tab1_cdf AS ( -- Step 1: Calculate the CDF over the value column
SELECT val, cume_dist() over (order by val) as cdf
FROM tab1
),
tab2_cdf AS (
SELECT val, cume_dist() over (order by val) as cdf
FROM tab2
),
tab1_grouped AS ( -- Step 2: Remove unnecessary values, s.t. we have (x, cdf(x)) rows only
SELECT val, MAX(cdf) as cdf
FROM tab1_cdf
GROUP BY val
),
tab2_grouped AS (
SELECT val, MAX(cdf) as cdf
FROM tab2_cdf
GROUP BY val
),
joined_cdf AS ( -- Step 3: combine the cdfs
SELECT coalesce(tab1_grouped.val, tab2_grouped.val) as v, tab1_grouped.cdf as cdf1, tab2_grouped.cdf as cdf2
FROM tab1_grouped FULL OUTER JOIN tab2_grouped ON tab1_grouped.val = tab2_grouped.val
),
-- Step 4: Create a grouper id based on the value count; this is just a helper for forward-filling
grouped_cdf AS (
SELECT v,
COUNT(cdf1) over (order by v) as _grp1,
cdf1,
COUNT(cdf2) over (order by v) as _grp2,
cdf2
FROM joined_cdf
),
-- Step 5: Forward-Filling: Select first non-null value per group (defined in the prev. step)
filled_cdf AS (
SELECT v,
first_value(cdf1) over (partition by _grp1 order by v) as cdf1_filled,
first_value(cdf2) over (partition by _grp2 order by v) as cdf2_filled
FROM grouped_cdf),
-- Step 6: Replace NULL values (at the beginning) with 0 to calculate difference
replaced_nulls AS (
SELECT coalesce(cdf1_filled, 0) as cdf1, coalesce(cdf2_filled, 0) as cdf2
FROM filled_cdf)
-- Step 7: Calculate final statistic as max. distance
SELECT MAX(ABS(cdf1 - cdf2)) FROM replaced_nulls;
"""
cdf_label = "cdf"
value_label = "val"
cdf_selection1 = _get_cdf_selection(engine, ref1, cdf_label, value_label)
cdf_selection2 = _get_cdf_selection(engine, ref2, cdf_label, value_label)

cdf_label1 = cdf_label + "1"
cdf_label2 = cdf_label + "2"

# Step 3: Combine the cdfs.
join = (
sa.select(
sa.func.coalesce(
cdf_selection1.c[value_label], cdf_selection2.c[value_label]
).label(value_label),
cdf_selection1.c[cdf_label].label(cdf_label1),
cdf_selection2.c[cdf_label].label(cdf_label2),
)
.select_from(
cdf_selection1.join(
cdf_selection2,
cdf_selection1.c[value_label] == cdf_selection2.c[value_label],
isouter=True,
full=True,
)
)
.subquery()
)

group_label1 = "_grp1"
group_label2 = "_grp2"

def _cdf_count(table, value_label, cdf_label, group_label):
return (
sa.func.count(table.c[cdf_label])
.over(order_by=table.c[value_label])
.label(group_label)
)

# Step 4: Create a grouper id based on the value count; this is just a helper for forward-filling.
pooled_cdf = sa.select(
[
join.c[value_label],
_cdf_count(join, value_label, cdf_label1, group_label1),
join.c[cdf_label1],
_cdf_count(join, value_label, cdf_label2, group_label2),
join.c[cdf_label2],
]
).subquery()

def _forward_filled_cdf_column(table, cdf_label, value_label, group_label):
return (
# Step 6: Replace NULL values at the beginning with 0 to enable computation of difference.
sa.func.coalesce(
(
# Step 5: Forward-Filling: Select first non-NULL value per group (defined in the prev. step).
sa.func.first_value(table.c[cdf_label]).over(
partition_by=table.c[group_label], order_by=table.c[value_label]
)
),
0,
).label(cdf_label)
)

replaced_nulls = sa.select(
[
pooled_cdf.c[value_label],
_forward_filled_cdf_column(
pooled_cdf, cdf_label1, value_label, group_label1
),
_forward_filled_cdf_column(
pooled_cdf, cdf_label2, value_label, group_label2
),
]
).subquery()

# Step 7: Calculate final statistic: maximal distance.
final_selection = sa.select(
sa.func.max(
sa.func.abs(replaced_nulls.c[cdf_label1] - replaced_nulls.c[cdf_label2])
)
)

with engine.connect() as connection:
d_statistic = connection.execute(final_selection).scalar()

d_statistic = engine.execute(ks_query_string).scalar()
return d_statistic


Expand Down
17 changes: 11 additions & 6 deletions tests/integration/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -1902,8 +1902,8 @@ def test_diff_average_between():
identity,
"col_int",
"col_int",
Condition("col_int >= 3"),
Condition("col_int >= 3"),
Condition(raw_string="col_int >= 3"),
Condition(raw_string="col_int >= 3"),
1.0,
),
],
Expand All @@ -1925,15 +1925,20 @@ def test_ks_2sample_constraint_perfect_between(engine, int_table1, data):
assert operation(test_result.outcome), test_result.failure_message


# TODO: Enable this test once the bug is fixed.
@pytest.mark.skip(reason="This is a known bug and unintended behaviour.")
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No longer skipping this test (as well as adding further examples) should indicate that this PR solves #42

@pytest.mark.parametrize(
"data",
[
(negation, "col_int", "col_int", None, Condition("col_int >= 10"), 1.0),
(
negation,
"col_int",
"col_int",
None,
Condition(raw_string="col_int >= 10"),
1.0,
),
],
)
def test_ks_2sample_constraint_perfect_between_different_condition(
def test_ks_2sample_constraint_perfect_between_different_conditions(
engine, int_table1, data
):
"""
Expand Down