diff --git a/tests/utils/db/test_helpers.py b/tests/utils/db/test_helpers.py index a82411143..04094a217 100644 --- a/tests/utils/db/test_helpers.py +++ b/tests/utils/db/test_helpers.py @@ -282,7 +282,7 @@ def test_should_merge_table_with_same_structure(mocker, base_table, delta_table_ # THEN: expect a merge statement according to the spec of the tables assert ( - mock_exec.call_args[0][0].text == 'MERGE INTO base_schema.base_table USING delta_schema.delta_table ' + mock_exec.call_args_list[1].args[0].text == 'MERGE INTO base_schema.base_table USING delta_schema.delta_table ' 'ON base_schema.base_table.id = delta_schema.delta_table.id ' 'AND base_schema.base_table.received_at >= :first_duplicate_date ' 'WHEN MATCHED THEN UPDATE SET after = base_table.after ' @@ -291,7 +291,15 @@ def test_should_merge_table_with_same_structure(mocker, base_table, delta_table_ ) # THEN: expect the merge statement to have the correct parameters - assert mock_exec.call_args[0][0].compile().params == {"first_duplicate_date": "2022-12-21T23:55:59"} + assert mock_exec.call_args_list[1].args[0].compile().params == {"first_duplicate_date": "2022-12-21T23:55:59"} + # THEN: expect a delete statement + assert ( + mock_exec.call_args_list[2].args[0].text == 'DELETE FROM delta_schema.delta_table ' + 'WHERE id IN (SELECT id FROM base_schema.base_table WHERE received_at >= :first_duplicate_date)' + ) + + # THEN: expect the delete statement to have the correct parameters + assert mock_exec.call_args_list[2].args[0].compile().params == {"first_duplicate_date": "2022-12-21T23:55:59"} def test_should_not_merge_table_if_delta_empty(mocker, base_table, delta_table_1): @@ -354,7 +362,7 @@ def test_should_add_new_columns_and_merge_table(mocker, base_table, delta_table_ # THEN: expect a merge statement according to the spec of the tables assert ( - mock_exec.call_args[0][0].text == 'MERGE INTO base_schema.base_table USING delta_schema.delta_table ' + mock_exec.call_args_list[2].args[0].text == 'MERGE INTO base_schema.base_table USING delta_schema.delta_table ' 'ON base_schema.base_table.id = delta_schema.delta_table.id ' 'AND base_schema.base_table.received_at >= :first_duplicate_date ' 'WHEN MATCHED THEN UPDATE SET after = base_table.after ' @@ -364,7 +372,16 @@ def test_should_add_new_columns_and_merge_table(mocker, base_table, delta_table_ ) # THEN: expect the merge statement to have the correct parameters - assert mock_exec.call_args[0][0].compile().params == {"first_duplicate_date": "2022-12-21T23:55:59"} + assert mock_exec.call_args_list[2].args[0].compile().params == {"first_duplicate_date": "2022-12-21T23:55:59"} + + # THEN: expect a delete statement + assert ( + mock_exec.call_args_list[3].args[0].text == 'DELETE FROM delta_schema.delta_table ' + 'WHERE id IN (SELECT id FROM base_schema.base_table WHERE received_at >= :first_duplicate_date)' + ) + + # THEN: expect the delete statement to have the correct parameters + assert mock_exec.call_args_list[3].args[0].compile().params == {"first_duplicate_date": "2022-12-21T23:55:59"} def test_load_query(sqlalchemy_memory_engine, test_data): diff --git a/transform/mattermost-analytics/models/staging/mm_telemetry_prod/base/base_events_delta.sql b/transform/mattermost-analytics/models/staging/mm_telemetry_prod/base/base_events_delta.sql index 69a31ff65..8313cf498 100644 --- a/transform/mattermost-analytics/models/staging/mm_telemetry_prod/base/base_events_delta.sql +++ b/transform/mattermost-analytics/models/staging/mm_telemetry_prod/base/base_events_delta.sql @@ -8,7 +8,6 @@ 'cluster_by': ['to_date(received_at)'], 'on_schema_change': 'append_new_columns', 'snowflake_warehouse': 'transform_l', - 'post_hook': 'delete from {{this}} where id in (select id from {{ source(\'rudder_support\', \'base_events\') }} where received_at > dateadd(day, -5, current_date))' }) }} diff --git a/utils/db/helpers.py b/utils/db/helpers.py index dc7312ca8..8838a6a74 100644 --- a/utils/db/helpers.py +++ b/utils/db/helpers.py @@ -320,6 +320,14 @@ def merge_event_delta_table_into( stmt = text(merge.__repr__()).bindparams(first_duplicate_date=first_duplicate_date) conn.execute(stmt) + # Delete rows from delta table that were merged + conn.execute( + text( + f'DELETE FROM {delta_schema}.{delta_table} ' + f'WHERE id IN (SELECT id FROM {base_schema}.{base_table} WHERE received_at >= :first_duplicate_date)' + ).bindparams(first_duplicate_date=first_duplicate_date) + ) + def load_query(conn: Connection, query: str) -> pd.DataFrame: """