diff --git a/tests/ops/integration_tests/test_mariadb_task.py b/tests/ops/integration_tests/test_mariadb_task.py new file mode 100644 index 0000000000..3951a2830a --- /dev/null +++ b/tests/ops/integration_tests/test_mariadb_task.py @@ -0,0 +1,101 @@ +import pytest + +from fides.api.models.privacy_request import ExecutionLog + +from ...conftest import access_runner_tester +from ..graph.graph_test_util import assert_rows_match, records_matching_fields +from ..task.traversal_data import integration_db_graph + + +@pytest.mark.integration_mariadb +@pytest.mark.integration +@pytest.mark.asyncio +@pytest.mark.parametrize( + "dsr_version", + ["use_dsr_3_0", "use_dsr_2_0"], +) +async def test_mariadb_access_request_task( + db, + policy, + connection_config_mariadb, + mariadb_integration_db, + dsr_version, + request, + privacy_request, +) -> None: + request.getfixturevalue(dsr_version) # REQUIRED to test both DSR 3.0 and 2.0 + + v = access_runner_tester( + privacy_request, + policy, + integration_db_graph("my_maria_db_1"), + [connection_config_mariadb], + {"email": "customer-1@example.com"}, + db, + ) + + assert_rows_match( + v["my_maria_db_1:address"], + min_size=2, + keys=["id", "street", "city", "state", "zip"], + ) + assert_rows_match( + v["my_maria_db_1:orders"], + min_size=3, + keys=["id", "customer_id", "shipping_address_id", "payment_card_id"], + ) + assert_rows_match( + v["my_maria_db_1:payment_card"], + min_size=2, + keys=["id", "name", "ccn", "customer_id", "billing_address_id"], + ) + assert_rows_match( + v["my_maria_db_1:customer"], + min_size=1, + keys=["id", "name", "email", "address_id"], + ) + + # links + assert v["my_maria_db_1:customer"][0]["email"] == "customer-1@example.com" + + logs = ( + ExecutionLog.query(db=db) + .filter(ExecutionLog.privacy_request_id == privacy_request.id) + .all() + ) + + logs = [log.__dict__ for log in logs] + assert ( + len( + records_matching_fields( + logs, dataset_name="my_maria_db_1", collection_name="customer" + ) + ) + > 0 
+ ) + assert ( + len( + records_matching_fields( + logs, dataset_name="my_maria_db_1", collection_name="address" + ) + ) + > 0 + ) + assert ( + len( + records_matching_fields( + logs, dataset_name="my_maria_db_1", collection_name="orders" + ) + ) + > 0 + ) + assert ( + len( + records_matching_fields( + logs, + dataset_name="my_maria_db_1", + collection_name="payment_card", + ) + ) + > 0 + ) diff --git a/tests/ops/integration_tests/test_mssql_task.py b/tests/ops/integration_tests/test_mssql_task.py new file mode 100644 index 0000000000..6bc23eeda0 --- /dev/null +++ b/tests/ops/integration_tests/test_mssql_task.py @@ -0,0 +1,101 @@ +import pytest + +from fides.api.models.privacy_request import ExecutionLog + +from ...conftest import access_runner_tester +from ..graph.graph_test_util import assert_rows_match, records_matching_fields +from ..task.traversal_data import integration_db_graph + + +@pytest.mark.integration_mssql +@pytest.mark.integration +@pytest.mark.asyncio +@pytest.mark.parametrize( + "dsr_version", + ["use_dsr_3_0", "use_dsr_2_0"], +) +async def test_mssql_access_request_task( + db, + policy, + connection_config_mssql, + mssql_integration_db, + privacy_request, + dsr_version, + request, +) -> None: + request.getfixturevalue(dsr_version) # REQUIRED to test both DSR 3.0 and 2.0 + + v = access_runner_tester( + privacy_request, + policy, + integration_db_graph("my_mssql_db_1"), + [connection_config_mssql], + {"email": "customer-1@example.com"}, + db, + ) + + assert_rows_match( + v["my_mssql_db_1:address"], + min_size=2, + keys=["id", "street", "city", "state", "zip"], + ) + assert_rows_match( + v["my_mssql_db_1:orders"], + min_size=3, + keys=["id", "customer_id", "shipping_address_id", "payment_card_id"], + ) + assert_rows_match( + v["my_mssql_db_1:payment_card"], + min_size=2, + keys=["id", "name", "ccn", "customer_id", "billing_address_id"], + ) + assert_rows_match( + v["my_mssql_db_1:customer"], + min_size=1, + keys=["id", "name", "email", "address_id"], 
+ ) + + # links + assert v["my_mssql_db_1:customer"][0]["email"] == "customer-1@example.com" + + logs = ( + ExecutionLog.query(db=db) + .filter(ExecutionLog.privacy_request_id == privacy_request.id) + .all() + ) + + logs = [log.__dict__ for log in logs] + assert ( + len( + records_matching_fields( + logs, dataset_name="my_mssql_db_1", collection_name="customer" + ) + ) + > 0 + ) + assert ( + len( + records_matching_fields( + logs, dataset_name="my_mssql_db_1", collection_name="address" + ) + ) + > 0 + ) + assert ( + len( + records_matching_fields( + logs, dataset_name="my_mssql_db_1", collection_name="orders" + ) + ) + > 0 + ) + assert ( + len( + records_matching_fields( + logs, + dataset_name="my_mssql_db_1", + collection_name="payment_card", + ) + ) + > 0 + ) diff --git a/tests/ops/integration_tests/test_mysql_task.py b/tests/ops/integration_tests/test_mysql_task.py new file mode 100644 index 0000000000..40551dd4d9 --- /dev/null +++ b/tests/ops/integration_tests/test_mysql_task.py @@ -0,0 +1,101 @@ +import pytest + +from fides.api.models.privacy_request import ExecutionLog + +from ...conftest import access_runner_tester +from ..graph.graph_test_util import assert_rows_match, records_matching_fields +from ..task.traversal_data import integration_db_graph + + +@pytest.mark.integration +@pytest.mark.integration_mysql +@pytest.mark.asyncio +@pytest.mark.parametrize( + "dsr_version", + ["use_dsr_3_0", "use_dsr_2_0"], +) +async def test_mysql_access_request_task( + db, + policy, + connection_config_mysql, + mysql_integration_db, + privacy_request, + dsr_version, + request, +) -> None: + request.getfixturevalue(dsr_version) # REQUIRED to test both DSR 3.0 and 2.0 + + v = access_runner_tester( + privacy_request, + policy, + integration_db_graph("my_mysql_db_1"), + [connection_config_mysql], + {"email": "customer-1@example.com"}, + db, + ) + + assert_rows_match( + v["my_mysql_db_1:address"], + min_size=2, + keys=["id", "street", "city", "state", "zip"], + ) + 
assert_rows_match( + v["my_mysql_db_1:orders"], + min_size=3, + keys=["id", "customer_id", "shipping_address_id", "payment_card_id"], + ) + assert_rows_match( + v["my_mysql_db_1:payment_card"], + min_size=2, + keys=["id", "name", "ccn", "customer_id", "billing_address_id"], + ) + assert_rows_match( + v["my_mysql_db_1:customer"], + min_size=1, + keys=["id", "name", "email", "address_id"], + ) + + # links + assert v["my_mysql_db_1:customer"][0]["email"] == "customer-1@example.com" + + logs = ( + ExecutionLog.query(db=db) + .filter(ExecutionLog.privacy_request_id == privacy_request.id) + .all() + ) + + logs = [log.__dict__ for log in logs] + assert ( + len( + records_matching_fields( + logs, dataset_name="my_mysql_db_1", collection_name="customer" + ) + ) + > 0 + ) + assert ( + len( + records_matching_fields( + logs, dataset_name="my_mysql_db_1", collection_name="address" + ) + ) + > 0 + ) + assert ( + len( + records_matching_fields( + logs, dataset_name="my_mysql_db_1", collection_name="orders" + ) + ) + > 0 + ) + assert ( + len( + records_matching_fields( + logs, + dataset_name="my_mysql_db_1", + collection_name="payment_card", + ) + ) + > 0 + ) diff --git a/tests/ops/integration_tests/test_scylladb_task.py b/tests/ops/integration_tests/test_scylladb_task.py new file mode 100644 index 0000000000..8ced1317ad --- /dev/null +++ b/tests/ops/integration_tests/test_scylladb_task.py @@ -0,0 +1,190 @@ +import pytest +from sqlalchemy.orm import Session + +from fides.api.models.privacy_request import ExecutionLogStatus, PrivacyRequest +from fides.api.service.connectors.scylla_connector import ScyllaConnectorMissingKeyspace +from fides.api.task.graph_task import get_cached_data_for_erasures + +from ...conftest import access_runner_tester, erasure_runner_tester +from ..graph.graph_test_util import assert_rows_match, erasure_policy +from ..task.traversal_data import integration_scylladb_graph + + +@pytest.mark.integration +@pytest.mark.integration_scylladb +@pytest.mark.asyncio 
+class TestScyllaDSRs: + @pytest.mark.parametrize( + "dsr_version", + ["use_dsr_2_0"], + ) + async def test_scylladb_access_request_task_no_keyspace_dsr2( + self, + db: Session, + policy, + integration_scylladb_config, + scylladb_integration_no_keyspace, + privacy_request, + dsr_version, + request, + ) -> None: + request.getfixturevalue(dsr_version) + + with pytest.raises(ScyllaConnectorMissingKeyspace) as err: + v = access_runner_tester( + privacy_request, + policy, + integration_scylladb_graph("scylla_example"), + [integration_scylladb_config], + {"email": "customer-1@example.com"}, + db, + ) + + assert ( + "No keyspace provided in the ScyllaDB configuration for connector scylla_example" + in str(err.value) + ) + + @pytest.mark.parametrize( + "dsr_version", + ["use_dsr_3_0"], + ) + async def test_scylladb_access_request_task_no_keyspace_dsr3( + self, + db, + policy, + integration_scylladb_config, + scylladb_integration_no_keyspace, + privacy_request: PrivacyRequest, + dsr_version, + request, + ) -> None: + request.getfixturevalue(dsr_version) + v = access_runner_tester( + privacy_request, + policy, + integration_scylladb_graph("scylla_example"), + [integration_scylladb_config], + {"email": "customer-1@example.com"}, + db, + ) + + assert v == {} + assert ( + privacy_request.access_tasks.count() == 6 + ) # There's 4 tables plus the root and terminal "dummy" tasks + + # Root task should be completed + assert privacy_request.access_tasks.first().collection_name == "__ROOT__" + assert ( + privacy_request.access_tasks.first().status == ExecutionLogStatus.complete + ) + + # All other tasks should be error + for access_task in privacy_request.access_tasks.offset(1): + assert access_task.status == ExecutionLogStatus.error + + @pytest.mark.parametrize( + "dsr_version", + ["use_dsr_2_0", "use_dsr_3_0"], + ) + async def test_scylladb_access_request_task( + self, + db, + policy, + integration_scylladb_config_with_keyspace, + scylla_reset_db, + 
scylladb_integration_with_keyspace, + privacy_request, + dsr_version, + request, + ) -> None: + request.getfixturevalue(dsr_version) # REQUIRED to test both DSR 3.0 and 2.0 + + results = access_runner_tester( + privacy_request, + policy, + integration_scylladb_graph("scylla_example_with_keyspace"), + [integration_scylladb_config_with_keyspace], + {"email": "customer-1@example.com"}, + db, + ) + + assert_rows_match( + results["scylla_example_with_keyspace:users"], + min_size=1, + keys=[ + "age", + "alternative_contacts", + "do_not_contact", + "email", + "name", + "last_contacted", + "logins", + "states_lived", + ], + ) + assert_rows_match( + results["scylla_example_with_keyspace:user_activity"], + min_size=3, + keys=["timestamp", "user_agent", "activity_type"], + ) + assert_rows_match( + results["scylla_example_with_keyspace:payment_methods"], + min_size=2, + keys=["card_number", "expiration_date"], + ) + assert_rows_match( + results["scylla_example_with_keyspace:orders"], + min_size=2, + keys=["order_amount", "order_date", "order_description"], + ) + + @pytest.mark.parametrize( + "dsr_version", + ["use_dsr_2_0", "use_dsr_3_0"], + ) + async def test_scylladb_erasure_task( + self, + db, + integration_scylladb_config_with_keyspace, + scylladb_integration_with_keyspace, + scylla_reset_db, + privacy_request, + dsr_version, + request, + ): + request.getfixturevalue(dsr_version) # REQUIRED to test both DSR 3.0 and 2.0 + + seed_email = "customer-1@example.com" + + policy = erasure_policy( + db, "user.name", "user.behavior", "user.device", "user.payment" + ) + privacy_request.policy_id = policy.id + privacy_request.save(db) + + graph = integration_scylladb_graph("scylla_example_with_keyspace") + access_runner_tester( + privacy_request, + policy, + integration_scylladb_graph("scylla_example_with_keyspace"), + [integration_scylladb_config_with_keyspace], + {"email": seed_email}, + db, + ) + results = erasure_runner_tester( + privacy_request, + policy, + graph, + 
[integration_scylladb_config_with_keyspace], + {"email": seed_email}, + get_cached_data_for_erasures(privacy_request.id), + db, + ) + assert results == { + "scylla_example_with_keyspace:user_activity": 3, + "scylla_example_with_keyspace:users": 1, + "scylla_example_with_keyspace:payment_methods": 2, + "scylla_example_with_keyspace:orders": 2, + } diff --git a/tests/ops/integration_tests/test_sql_task.py b/tests/ops/integration_tests/test_sql_task.py index 298d77229a..cd4bb1551f 100644 --- a/tests/ops/integration_tests/test_sql_task.py +++ b/tests/ops/integration_tests/test_sql_task.py @@ -6,30 +6,16 @@ import pytest from fideslang import Dataset from sqlalchemy import text -from sqlalchemy.orm import Session - -from fides.api.graph.config import ( - Collection, - CollectionAddress, - FieldAddress, - GraphDataset, - ScalarField, -) + +from fides.api.graph.config import Collection, FieldAddress, GraphDataset, ScalarField from fides.api.graph.data_type import DataType, StringTypeConverter from fides.api.graph.graph import DatasetGraph, Edge, Node from fides.api.graph.traversal import TraversalNode from fides.api.models.connectionconfig import ConnectionConfig from fides.api.models.datasetconfig import convert_dataset_to_graph from fides.api.models.policy import ActionType, Policy, Rule, RuleTarget -from fides.api.models.privacy_request import ( - ExecutionLog, - ExecutionLogStatus, - PrivacyRequest, - PrivacyRequestStatus, - RequestTask, -) +from fides.api.models.privacy_request import ExecutionLog, RequestTask from fides.api.service.connectors import get_connector -from fides.api.service.connectors.scylla_connector import ScyllaConnectorMissingKeyspace from fides.api.task.filter_results import filter_data_categories from fides.api.task.graph_task import get_cached_data_for_erasures from fides.config import CONFIG @@ -42,12 +28,7 @@ field, records_matching_fields, ) -from ..task.traversal_data import ( - integration_db_graph, - integration_scylladb_graph, - 
postgres_db_graph_dataset, - str_converter, -) +from ..task.traversal_data import integration_db_graph, postgres_db_graph_dataset @pytest.mark.integration_postgres @@ -57,7 +38,7 @@ "dsr_version", ["use_dsr_3_0", "use_dsr_2_0"], ) -async def test_sql_erasure_ignores_collections_without_pk( +async def test_sql_erasure_does_not_ignore_collections_without_pk( db, postgres_inserts, integration_postgres_config, @@ -116,7 +97,7 @@ async def test_sql_erasure_ignores_collections_without_pk( .all() ) logs = [log.__dict__ for log in logs] - # since address has no primary_key=True field, it's erasure is skipped + # erasure is not skipped since primary_key is not required assert ( len( records_matching_fields( @@ -126,13 +107,13 @@ async def test_sql_erasure_ignores_collections_without_pk( message="No values were erased since no primary key was defined for this collection", ) ) - == 1 + == 0 ) assert v == { "postgres_example:customer": 1, "postgres_example:payment_card": 0, "postgres_example:orders": 0, - "postgres_example:address": 0, + "postgres_example:address": 2, } @@ -504,468 +485,7 @@ async def test_postgres_privacy_requests_against_non_default_schema( assert johanna_record.name is None # Masked by erasure request -@pytest.mark.integration_mssql -@pytest.mark.integration -@pytest.mark.asyncio -@pytest.mark.parametrize( - "dsr_version", - ["use_dsr_3_0", "use_dsr_2_0"], -) -async def test_mssql_access_request_task( - db, - policy, - connection_config_mssql, - mssql_integration_db, - privacy_request, - dsr_version, - request, -) -> None: - request.getfixturevalue(dsr_version) # REQUIRED to test both DSR 3.0 and 2.0 - - v = access_runner_tester( - privacy_request, - policy, - integration_db_graph("my_mssql_db_1"), - [connection_config_mssql], - {"email": "customer-1@example.com"}, - db, - ) - - assert_rows_match( - v["my_mssql_db_1:address"], - min_size=2, - keys=["id", "street", "city", "state", "zip"], - ) - assert_rows_match( - v["my_mssql_db_1:orders"], - min_size=3, - 
keys=["id", "customer_id", "shipping_address_id", "payment_card_id"], - ) - assert_rows_match( - v["my_mssql_db_1:payment_card"], - min_size=2, - keys=["id", "name", "ccn", "customer_id", "billing_address_id"], - ) - assert_rows_match( - v["my_mssql_db_1:customer"], - min_size=1, - keys=["id", "name", "email", "address_id"], - ) - - # links - assert v["my_mssql_db_1:customer"][0]["email"] == "customer-1@example.com" - - logs = ( - ExecutionLog.query(db=db) - .filter(ExecutionLog.privacy_request_id == privacy_request.id) - .all() - ) - - logs = [log.__dict__ for log in logs] - assert ( - len( - records_matching_fields( - logs, dataset_name="my_mssql_db_1", collection_name="customer" - ) - ) - > 0 - ) - assert ( - len( - records_matching_fields( - logs, dataset_name="my_mssql_db_1", collection_name="address" - ) - ) - > 0 - ) - assert ( - len( - records_matching_fields( - logs, dataset_name="my_mssql_db_1", collection_name="orders" - ) - ) - > 0 - ) - assert ( - len( - records_matching_fields( - logs, - dataset_name="my_mssql_db_1", - collection_name="payment_card", - ) - ) - > 0 - ) - - -@pytest.mark.integration -@pytest.mark.integration_mysql -@pytest.mark.asyncio -@pytest.mark.parametrize( - "dsr_version", - ["use_dsr_3_0", "use_dsr_2_0"], -) -async def test_mysql_access_request_task( - db, - policy, - connection_config_mysql, - mysql_integration_db, - privacy_request, - dsr_version, - request, -) -> None: - request.getfixturevalue(dsr_version) # REQUIRED to test both DSR 3.0 and 2.0 - - v = access_runner_tester( - privacy_request, - policy, - integration_db_graph("my_mysql_db_1"), - [connection_config_mysql], - {"email": "customer-1@example.com"}, - db, - ) - - assert_rows_match( - v["my_mysql_db_1:address"], - min_size=2, - keys=["id", "street", "city", "state", "zip"], - ) - assert_rows_match( - v["my_mysql_db_1:orders"], - min_size=3, - keys=["id", "customer_id", "shipping_address_id", "payment_card_id"], - ) - assert_rows_match( - 
v["my_mysql_db_1:payment_card"], - min_size=2, - keys=["id", "name", "ccn", "customer_id", "billing_address_id"], - ) - assert_rows_match( - v["my_mysql_db_1:customer"], - min_size=1, - keys=["id", "name", "email", "address_id"], - ) - - # links - assert v["my_mysql_db_1:customer"][0]["email"] == "customer-1@example.com" - - logs = ( - ExecutionLog.query(db=db) - .filter(ExecutionLog.privacy_request_id == privacy_request.id) - .all() - ) - - logs = [log.__dict__ for log in logs] - assert ( - len( - records_matching_fields( - logs, dataset_name="my_mysql_db_1", collection_name="customer" - ) - ) - > 0 - ) - assert ( - len( - records_matching_fields( - logs, dataset_name="my_mysql_db_1", collection_name="address" - ) - ) - > 0 - ) - assert ( - len( - records_matching_fields( - logs, dataset_name="my_mysql_db_1", collection_name="orders" - ) - ) - > 0 - ) - assert ( - len( - records_matching_fields( - logs, - dataset_name="my_mysql_db_1", - collection_name="payment_card", - ) - ) - > 0 - ) - - -@pytest.mark.integration_mariadb -@pytest.mark.integration -@pytest.mark.asyncio -@pytest.mark.parametrize( - "dsr_version", - ["use_dsr_3_0", "use_dsr_2_0"], -) -async def test_mariadb_access_request_task( - db, - policy, - connection_config_mariadb, - mariadb_integration_db, - dsr_version, - request, - privacy_request, -) -> None: - request.getfixturevalue(dsr_version) # REQUIRED to test both DSR 3.0 and 2.0 - - v = access_runner_tester( - privacy_request, - policy, - integration_db_graph("my_maria_db_1"), - [connection_config_mariadb], - {"email": "customer-1@example.com"}, - db, - ) - - assert_rows_match( - v["my_maria_db_1:address"], - min_size=2, - keys=["id", "street", "city", "state", "zip"], - ) - assert_rows_match( - v["my_maria_db_1:orders"], - min_size=3, - keys=["id", "customer_id", "shipping_address_id", "payment_card_id"], - ) - assert_rows_match( - v["my_maria_db_1:payment_card"], - min_size=2, - keys=["id", "name", "ccn", "customer_id", "billing_address_id"], - 
) - assert_rows_match( - v["my_maria_db_1:customer"], - min_size=1, - keys=["id", "name", "email", "address_id"], - ) - - # links - assert v["my_maria_db_1:customer"][0]["email"] == "customer-1@example.com" - - logs = ( - ExecutionLog.query(db=db) - .filter(ExecutionLog.privacy_request_id == privacy_request.id) - .all() - ) - - logs = [log.__dict__ for log in logs] - assert ( - len( - records_matching_fields( - logs, dataset_name="my_maria_db_1", collection_name="customer" - ) - ) - > 0 - ) - assert ( - len( - records_matching_fields( - logs, dataset_name="my_maria_db_1", collection_name="address" - ) - ) - > 0 - ) - assert ( - len( - records_matching_fields( - logs, dataset_name="my_maria_db_1", collection_name="orders" - ) - ) - > 0 - ) - assert ( - len( - records_matching_fields( - logs, - dataset_name="my_maria_db_1", - collection_name="payment_card", - ) - ) - > 0 - ) - - -@pytest.mark.integration -@pytest.mark.integration_scylladb -@pytest.mark.asyncio -class TestScyllaDSRs: - @pytest.mark.parametrize( - "dsr_version", - ["use_dsr_2_0"], - ) - async def test_scylladb_access_request_task_no_keyspace_dsr2( - self, - db: Session, - policy, - integration_scylladb_config, - scylladb_integration_no_keyspace, - privacy_request, - dsr_version, - request, - ) -> None: - request.getfixturevalue(dsr_version) - - with pytest.raises(ScyllaConnectorMissingKeyspace) as err: - v = access_runner_tester( - privacy_request, - policy, - integration_scylladb_graph("scylla_example"), - [integration_scylladb_config], - {"email": "customer-1@example.com"}, - db, - ) - - assert ( - "No keyspace provided in the ScyllaDB configuration for connector scylla_example" - in str(err.value) - ) - - @pytest.mark.parametrize( - "dsr_version", - ["use_dsr_3_0"], - ) - async def test_scylladb_access_request_task_no_keyspace_dsr3( - self, - db, - policy, - integration_scylladb_config, - scylladb_integration_no_keyspace, - privacy_request: PrivacyRequest, - dsr_version, - request, - ) -> None: - 
request.getfixturevalue(dsr_version) - v = access_runner_tester( - privacy_request, - policy, - integration_scylladb_graph("scylla_example"), - [integration_scylladb_config], - {"email": "customer-1@example.com"}, - db, - ) - - assert v == {} - assert ( - privacy_request.access_tasks.count() == 6 - ) # There's 4 tables plus the root and terminal "dummy" tasks - - # Root task should be completed - assert privacy_request.access_tasks.first().collection_name == "__ROOT__" - assert ( - privacy_request.access_tasks.first().status == ExecutionLogStatus.complete - ) - - # All other tasks should be error - for access_task in privacy_request.access_tasks.offset(1): - assert access_task.status == ExecutionLogStatus.error - - @pytest.mark.parametrize( - "dsr_version", - ["use_dsr_2_0", "use_dsr_3_0"], - ) - async def test_scylladb_access_request_task( - self, - db, - policy, - integration_scylladb_config_with_keyspace, - scylla_reset_db, - scylladb_integration_with_keyspace, - privacy_request, - dsr_version, - request, - ) -> None: - request.getfixturevalue(dsr_version) # REQUIRED to test both DSR 3.0 and 2.0 - - results = access_runner_tester( - privacy_request, - policy, - integration_scylladb_graph("scylla_example_with_keyspace"), - [integration_scylladb_config_with_keyspace], - {"email": "customer-1@example.com"}, - db, - ) - - assert_rows_match( - results["scylla_example_with_keyspace:users"], - min_size=1, - keys=[ - "age", - "alternative_contacts", - "do_not_contact", - "email", - "name", - "last_contacted", - "logins", - "states_lived", - ], - ) - assert_rows_match( - results["scylla_example_with_keyspace:user_activity"], - min_size=3, - keys=["timestamp", "user_agent", "activity_type"], - ) - assert_rows_match( - results["scylla_example_with_keyspace:payment_methods"], - min_size=2, - keys=["card_number", "expiration_date"], - ) - assert_rows_match( - results["scylla_example_with_keyspace:orders"], - min_size=2, - keys=["order_amount", "order_date", 
"order_description"], - ) - - @pytest.mark.parametrize( - "dsr_version", - ["use_dsr_2_0", "use_dsr_3_0"], - ) - async def test_scylladb_erasure_task( - self, - db, - integration_scylladb_config_with_keyspace, - scylladb_integration_with_keyspace, - scylla_reset_db, - privacy_request, - dsr_version, - request, - ): - request.getfixturevalue(dsr_version) # REQUIRED to test both DSR 3.0 and 2.0 - - seed_email = "customer-1@example.com" - - policy = erasure_policy( - db, "user.name", "user.behavior", "user.device", "user.payment" - ) - privacy_request.policy_id = policy.id - privacy_request.save(db) - - graph = integration_scylladb_graph("scylla_example_with_keyspace") - access_runner_tester( - privacy_request, - policy, - integration_scylladb_graph("scylla_example_with_keyspace"), - [integration_scylladb_config_with_keyspace], - {"email": seed_email}, - db, - ) - results = erasure_runner_tester( - privacy_request, - policy, - graph, - [integration_scylladb_config_with_keyspace], - {"email": seed_email}, - get_cached_data_for_erasures(privacy_request.id), - db, - ) - assert results == { - "scylla_example_with_keyspace:user_activity": 3, - "scylla_example_with_keyspace:users": 1, - "scylla_example_with_keyspace:payment_methods": 2, - "scylla_example_with_keyspace:orders": 2, - } - - +@pytest.mark.integration_postgres @pytest.mark.integration @pytest.mark.asyncio @pytest.mark.parametrize( @@ -1602,280 +1122,3 @@ async def test_retry_erasure( "error", "error", } - - -@pytest.mark.integration_timescale -@pytest.mark.integration -@pytest.mark.asyncio -@pytest.mark.parametrize( - "dsr_version", - ["use_dsr_3_0", "use_dsr_2_0"], -) -async def test_timescale_access_request_task( - db, - policy, - timescale_connection_config, - timescale_integration_db, - privacy_request, - dsr_version, - request, -) -> None: - database_name = "my_timescale_db_1" - request.getfixturevalue(dsr_version) # REQUIRED to test both DSR 3.0 and 2.0 - - v = access_runner_tester( - privacy_request, - 
policy, - integration_db_graph(database_name), - [timescale_connection_config], - {"email": "customer-1@example.com"}, - db, - ) - - assert_rows_match( - v[f"{database_name}:address"], - min_size=2, - keys=["id", "street", "city", "state", "zip"], - ) - assert_rows_match( - v[f"{database_name}:orders"], - min_size=3, - keys=["id", "customer_id", "shipping_address_id", "payment_card_id"], - ) - assert_rows_match( - v[f"{database_name}:payment_card"], - min_size=2, - keys=["id", "name", "ccn", "customer_id", "billing_address_id"], - ) - assert_rows_match( - v[f"{database_name}:customer"], - min_size=1, - keys=["id", "name", "email", "address_id"], - ) - - # links - assert v[f"{database_name}:customer"][0]["email"] == "customer-1@example.com" - - logs = ( - ExecutionLog.query(db=db) - .filter(ExecutionLog.privacy_request_id == privacy_request.id) - .all() - ) - - logs = [log.__dict__ for log in logs] - - assert ( - len( - records_matching_fields( - logs, dataset_name=database_name, collection_name="customer" - ) - ) - > 0 - ) - - assert ( - len( - records_matching_fields( - logs, dataset_name=database_name, collection_name="address" - ) - ) - > 0 - ) - - assert ( - len( - records_matching_fields( - logs, dataset_name=database_name, collection_name="orders" - ) - ) - > 0 - ) - - assert ( - len( - records_matching_fields( - logs, - dataset_name=database_name, - collection_name="payment_card", - ) - ) - > 0 - ) - - -@pytest.mark.integration_timescale -@pytest.mark.integration -@pytest.mark.asyncio -@pytest.mark.parametrize( - "dsr_version", - ["use_dsr_3_0", "use_dsr_2_0"], -) -async def test_timescale_erasure_request_task( - db, - erasure_policy, - timescale_connection_config, - timescale_integration_db, - privacy_request_with_erasure_policy, - dsr_version, - request, -) -> None: - request.getfixturevalue(dsr_version) # REQUIRED to test both DSR 3.0 and 2.0 - - rule = erasure_policy.rules[0] - target = rule.targets[0] - target.data_category = "user" - target.save(db) - 
- database_name = "my_timescale_db_1" - - dataset = postgres_db_graph_dataset(database_name, timescale_connection_config.key) - - # Set some data categories on fields that will be targeted by the policy above - field([dataset], database_name, "customer", "name").data_categories = ["user.name"] - field([dataset], database_name, "address", "street").data_categories = ["user"] - field([dataset], database_name, "payment_card", "ccn").data_categories = ["user"] - - graph = DatasetGraph(dataset) - - v = access_runner_tester( - privacy_request_with_erasure_policy, - erasure_policy, - graph, - [timescale_connection_config], - {"email": "customer-1@example.com"}, - db, - ) - - v = erasure_runner_tester( - privacy_request_with_erasure_policy, - erasure_policy, - graph, - [timescale_connection_config], - {"email": "customer-1@example.com"}, - get_cached_data_for_erasures(privacy_request_with_erasure_policy.id), - db, - ) - assert v == { - f"{database_name}:customer": 1, - f"{database_name}:orders": 0, - f"{database_name}:payment_card": 2, - f"{database_name}:address": 2, - }, "No erasure on orders table - no data categories targeted" - - # Verify masking in appropriate tables - address_cursor = timescale_integration_db.execute( - text("select * from address where id in (1, 2)") - ) - for address in address_cursor: - assert address.street is None # Masked due to matching data category - assert address.state is not None - assert address.city is not None - assert address.zip is not None - - customer_cursor = timescale_integration_db.execute( - text("select * from customer where id = 1") - ) - customer = [customer for customer in customer_cursor][0] - assert customer.name is None # Masked due to matching data category - assert customer.email == "customer-1@example.com" - assert customer.address_id is not None - - payment_card_cursor = timescale_integration_db.execute( - text("select * from payment_card where id in ('pay_aaa-aaa', 'pay_bbb-bbb')") - ) - payment_cards = [card for 
card in payment_card_cursor] - assert all( - [card.ccn is None for card in payment_cards] - ) # Masked due to matching data category - assert not any([card.name is None for card in payment_cards]) is None - - -@pytest.mark.integration_timescale -@pytest.mark.integration -@pytest.mark.asyncio -@pytest.mark.parametrize( - "dsr_version", - ["use_dsr_3_0", "use_dsr_2_0"], -) -async def test_timescale_query_and_mask_hypertable( - db, - erasure_policy, - timescale_connection_config, - timescale_integration_db, - privacy_request_with_erasure_policy, - dsr_version, - request, -) -> None: - request.getfixturevalue(dsr_version) # REQUIRED to test both DSR 3.0 and 2.0 - - database_name = "my_timescale_db_1" - - dataset = postgres_db_graph_dataset(database_name, timescale_connection_config.key) - # For this test, add a new collection to our standard dataset corresponding to the - # "onsite_personnel" timescale hypertable - onsite_personnel_collection = Collection( - name="onsite_personnel", - fields=[ - ScalarField( - name="responsible", data_type_converter=str_converter, identity="email" - ), - ScalarField( - name="time", data_type_converter=str_converter, primary_key=True - ), - ], - ) - - dataset.collections.append(onsite_personnel_collection) - graph = DatasetGraph(dataset) - rule = erasure_policy.rules[0] - target = rule.targets[0] - target.data_category = "user" - target.save(db) - # Update data category on responsible field - field( - [dataset], database_name, "onsite_personnel", "responsible" - ).data_categories = ["user.contact.email"] - - access_results = access_runner_tester( - privacy_request_with_erasure_policy, - erasure_policy, - graph, - [timescale_connection_config], - {"email": "employee-1@example.com"}, - db, - ) - - # Demonstrate hypertable can be queried - assert access_results[f"{database_name}:onsite_personnel"] == [ - {"responsible": "employee-1@example.com", "time": datetime(2022, 1, 1, 9, 0)}, - {"responsible": "employee-1@example.com", "time": 
datetime(2022, 1, 2, 9, 0)}, - {"responsible": "employee-1@example.com", "time": datetime(2022, 1, 3, 9, 0)}, - {"responsible": "employee-1@example.com", "time": datetime(2022, 1, 5, 9, 0)}, - ] - - # Run an erasure on the hypertable targeting the responsible field - v = erasure_runner_tester( - privacy_request_with_erasure_policy, - erasure_policy, - graph, - [timescale_connection_config], - {"email": "employee-1@example.com"}, - get_cached_data_for_erasures(privacy_request_with_erasure_policy.id), - db, - ) - - assert v == { - f"{database_name}:customer": 0, - f"{database_name}:orders": 0, - f"{database_name}:payment_card": 0, - f"{database_name}:address": 0, - f"{database_name}:onsite_personnel": 4, - }, "onsite_personnel.responsible was the only targeted data category" - - personnel_records = timescale_integration_db.execute( - text("select * from onsite_personnel") - ) - for record in personnel_records: - assert ( - record.responsible != "employee-1@example.com" - ) # These emails have all been masked diff --git a/tests/ops/integration_tests/test_timescale_task.py b/tests/ops/integration_tests/test_timescale_task.py new file mode 100644 index 0000000000..97af65ce65 --- /dev/null +++ b/tests/ops/integration_tests/test_timescale_task.py @@ -0,0 +1,294 @@ +from datetime import datetime + +import pytest +from sqlalchemy import text + +from fides.api.graph.config import Collection, ScalarField +from fides.api.graph.graph import DatasetGraph +from fides.api.models.privacy_request import ExecutionLog +from fides.api.task.graph_task import get_cached_data_for_erasures + +from ...conftest import access_runner_tester, erasure_runner_tester +from ..graph.graph_test_util import assert_rows_match, field, records_matching_fields +from ..task.traversal_data import ( + integration_db_graph, + postgres_db_graph_dataset, + str_converter, +) + + +@pytest.mark.integration_timescale +@pytest.mark.integration +@pytest.mark.asyncio +@pytest.mark.parametrize( + "dsr_version", + 
["use_dsr_3_0", "use_dsr_2_0"], +) +async def test_timescale_access_request_task( + db, + policy, + timescale_connection_config, + timescale_integration_db, + privacy_request, + dsr_version, + request, +) -> None: + database_name = "my_timescale_db_1" + request.getfixturevalue(dsr_version) # REQUIRED to test both DSR 3.0 and 2.0 + + v = access_runner_tester( + privacy_request, + policy, + integration_db_graph(database_name), + [timescale_connection_config], + {"email": "customer-1@example.com"}, + db, + ) + + assert_rows_match( + v[f"{database_name}:address"], + min_size=2, + keys=["id", "street", "city", "state", "zip"], + ) + assert_rows_match( + v[f"{database_name}:orders"], + min_size=3, + keys=["id", "customer_id", "shipping_address_id", "payment_card_id"], + ) + assert_rows_match( + v[f"{database_name}:payment_card"], + min_size=2, + keys=["id", "name", "ccn", "customer_id", "billing_address_id"], + ) + assert_rows_match( + v[f"{database_name}:customer"], + min_size=1, + keys=["id", "name", "email", "address_id"], + ) + + # links + assert v[f"{database_name}:customer"][0]["email"] == "customer-1@example.com" + + logs = ( + ExecutionLog.query(db=db) + .filter(ExecutionLog.privacy_request_id == privacy_request.id) + .all() + ) + + logs = [log.__dict__ for log in logs] + + assert ( + len( + records_matching_fields( + logs, dataset_name=database_name, collection_name="customer" + ) + ) + > 0 + ) + + assert ( + len( + records_matching_fields( + logs, dataset_name=database_name, collection_name="address" + ) + ) + > 0 + ) + + assert ( + len( + records_matching_fields( + logs, dataset_name=database_name, collection_name="orders" + ) + ) + > 0 + ) + + assert ( + len( + records_matching_fields( + logs, + dataset_name=database_name, + collection_name="payment_card", + ) + ) + > 0 + ) + + +@pytest.mark.integration_timescale +@pytest.mark.integration +@pytest.mark.asyncio +@pytest.mark.parametrize( + "dsr_version", + ["use_dsr_3_0", "use_dsr_2_0"], +) +async def 
test_timescale_erasure_request_task( + db, + erasure_policy, + timescale_connection_config, + timescale_integration_db, + privacy_request_with_erasure_policy, + dsr_version, + request, +) -> None: + request.getfixturevalue(dsr_version) # REQUIRED to test both DSR 3.0 and 2.0 + + rule = erasure_policy.rules[0] + target = rule.targets[0] + target.data_category = "user" + target.save(db) + + database_name = "my_timescale_db_1" + + dataset = postgres_db_graph_dataset(database_name, timescale_connection_config.key) + + # Set some data categories on fields that will be targeted by the policy above + field([dataset], database_name, "customer", "name").data_categories = ["user.name"] + field([dataset], database_name, "address", "street").data_categories = ["user"] + field([dataset], database_name, "payment_card", "ccn").data_categories = ["user"] + + graph = DatasetGraph(dataset) + + v = access_runner_tester( + privacy_request_with_erasure_policy, + erasure_policy, + graph, + [timescale_connection_config], + {"email": "customer-1@example.com"}, + db, + ) + + v = erasure_runner_tester( + privacy_request_with_erasure_policy, + erasure_policy, + graph, + [timescale_connection_config], + {"email": "customer-1@example.com"}, + get_cached_data_for_erasures(privacy_request_with_erasure_policy.id), + db, + ) + assert v == { + f"{database_name}:customer": 1, + f"{database_name}:orders": 0, + f"{database_name}:payment_card": 2, + f"{database_name}:address": 2, + }, "No erasure on orders table - no data categories targeted" + + # Verify masking in appropriate tables + address_cursor = timescale_integration_db.execute( + text("select * from address where id in (1, 2)") + ) + for address in address_cursor: + assert address.street is None # Masked due to matching data category + assert address.state is not None + assert address.city is not None + assert address.zip is not None + + customer_cursor = timescale_integration_db.execute( + text("select * from customer where id = 1") + ) + 
customer = [customer for customer in customer_cursor][0] + assert customer.name is None # Masked due to matching data category + assert customer.email == "customer-1@example.com" + assert customer.address_id is not None + + payment_card_cursor = timescale_integration_db.execute( + text("select * from payment_card where id in ('pay_aaa-aaa', 'pay_bbb-bbb')") + ) + payment_cards = [card for card in payment_card_cursor] + assert all( + [card.ccn is None for card in payment_cards] + ) # Masked due to matching data category + assert not any([card.name is None for card in payment_cards]) + + +@pytest.mark.integration_timescale +@pytest.mark.integration +@pytest.mark.asyncio +@pytest.mark.parametrize( + "dsr_version", + ["use_dsr_3_0", "use_dsr_2_0"], +) +async def test_timescale_query_and_mask_hypertable( + db, + erasure_policy, + timescale_connection_config, + timescale_integration_db, + privacy_request_with_erasure_policy, + dsr_version, + request, +) -> None: + request.getfixturevalue(dsr_version) # REQUIRED to test both DSR 3.0 and 2.0 + + database_name = "my_timescale_db_1" + + dataset = postgres_db_graph_dataset(database_name, timescale_connection_config.key) + # For this test, add a new collection to our standard dataset corresponding to the + # "onsite_personnel" timescale hypertable + onsite_personnel_collection = Collection( + name="onsite_personnel", + fields=[ + ScalarField( + name="responsible", data_type_converter=str_converter, identity="email" + ), + ScalarField( + name="time", data_type_converter=str_converter, primary_key=True + ), + ], + ) + + dataset.collections.append(onsite_personnel_collection) + graph = DatasetGraph(dataset) + rule = erasure_policy.rules[0] + target = rule.targets[0] + target.data_category = "user" + target.save(db) + # Update data category on responsible field + field( + [dataset], database_name, "onsite_personnel", "responsible" + ).data_categories = ["user.contact.email"] + + access_results = access_runner_tester( + 
privacy_request_with_erasure_policy, + erasure_policy, + graph, + [timescale_connection_config], + {"email": "employee-1@example.com"}, + db, + ) + + # Demonstrate hypertable can be queried + assert access_results[f"{database_name}:onsite_personnel"] == [ + {"responsible": "employee-1@example.com", "time": datetime(2022, 1, 1, 9, 0)}, + {"responsible": "employee-1@example.com", "time": datetime(2022, 1, 2, 9, 0)}, + {"responsible": "employee-1@example.com", "time": datetime(2022, 1, 3, 9, 0)}, + {"responsible": "employee-1@example.com", "time": datetime(2022, 1, 5, 9, 0)}, + ] + + # Run an erasure on the hypertable targeting the responsible field + v = erasure_runner_tester( + privacy_request_with_erasure_policy, + erasure_policy, + graph, + [timescale_connection_config], + {"email": "employee-1@example.com"}, + get_cached_data_for_erasures(privacy_request_with_erasure_policy.id), + db, + ) + + assert v == { + f"{database_name}:customer": 0, + f"{database_name}:orders": 0, + f"{database_name}:payment_card": 0, + f"{database_name}:address": 0, + f"{database_name}:onsite_personnel": 4, + }, "onsite_personnel.responsible was the only targeted data category" + + personnel_records = timescale_integration_db.execute( + text("select * from onsite_personnel") + ) + for record in personnel_records: + assert ( + record.responsible != "employee-1@example.com" + ) # These emails have all been masked diff --git a/tests/ops/service/connectors/test_dynamodb_query_config.py b/tests/ops/service/connectors/test_dynamodb_query_config.py new file mode 100644 index 0000000000..4591ae9385 --- /dev/null +++ b/tests/ops/service/connectors/test_dynamodb_query_config.py @@ -0,0 +1,129 @@ +from datetime import datetime, timezone + +import pytest +from boto3.dynamodb.types import TypeDeserializer +from fideslang.models import Dataset + +from fides.api.graph.config import CollectionAddress +from fides.api.graph.graph import DatasetGraph +from fides.api.graph.traversal import Traversal +from 
fides.api.models.datasetconfig import convert_dataset_to_graph +from fides.api.models.privacy_request import PrivacyRequest +from fides.api.service.connectors.query_configs.dynamodb_query_config import ( + DynamoDBQueryConfig, +) + +privacy_request = PrivacyRequest(id="234544") + + +class TestDynamoDBQueryConfig: + @pytest.fixture(scope="function") + def identity(self): + identity = {"email": "customer-test_uuid@example.com"} + return identity + + @pytest.fixture(scope="function") + def dataset_graph(self, integration_dynamodb_config, example_datasets): + dataset = Dataset(**example_datasets[11]) + dataset_graph = convert_dataset_to_graph( + dataset, integration_dynamodb_config.key + ) + + return DatasetGraph(*[dataset_graph]) + + @pytest.fixture(scope="function") + def traversal(self, identity, dataset_graph): + dynamo_traversal = Traversal(dataset_graph, identity) + return dynamo_traversal + + @pytest.fixture(scope="function") + def customer_node(self, traversal): + return traversal.traversal_node_dict[ + CollectionAddress("dynamodb_example_test_dataset", "customer") + ].to_mock_execution_node() + + @pytest.fixture(scope="function") + def customer_identifier_node(self, traversal): + return traversal.traversal_node_dict[ + CollectionAddress("dynamodb_example_test_dataset", "customer_identifier") + ].to_mock_execution_node() + + @pytest.fixture(scope="function") + def customer_row(self): + row = { + "customer_email": {"S": "customer-1@example.com"}, + "name": {"S": "John Customer"}, + "address_id": {"L": [{"S": "1"}, {"S": "2"}]}, + "personal_info": {"M": {"gender": {"S": "male"}, "age": {"S": "99"}}}, + "id": {"S": "1"}, + } + return row + + @pytest.fixture(scope="function") + def deserialized_customer_row(self, customer_row): + deserialized_customer_row = {} + deserializer = TypeDeserializer() + for key, value in customer_row.items(): + deserialized_customer_row[key] = deserializer.deserialize(value) + return deserialized_customer_row + + 
@pytest.fixture(scope="function") + def customer_identifier_row(self): + row = { + "customer_id": {"S": "customer-1@example.com"}, + "email": {"S": "customer-1@example.com"}, + "name": {"S": "Customer 1"}, + "created": {"S": datetime.now(timezone.utc).isoformat()}, + } + return row + + @pytest.fixture(scope="function") + def deserialized_customer_identifier_row(self, customer_identifier_row): + deserialized_customer_identifier_row = {} + deserializer = TypeDeserializer() + for key, value in customer_identifier_row.items(): + deserialized_customer_identifier_row[key] = deserializer.deserialize(value) + return deserialized_customer_identifier_row + + def test_get_query_param_formatting_single_key( + self, + resources_dict, + customer_node, + ) -> None: + input_data = { + "fidesops_grouped_inputs": [], + "email": ["customer-test_uuid@example.com"], + } + attribute_definitions = [{"AttributeName": "email", "AttributeType": "S"}] + query_config = DynamoDBQueryConfig(customer_node, attribute_definitions) + item = query_config.generate_query( + input_data=input_data, policy=resources_dict["policy"] + ) + assert item["ExpressionAttributeValues"] == { + ":value": {"S": "customer-test_uuid@example.com"} + } + assert item["KeyConditionExpression"] == "email = :value" + + def test_put_query_param_formatting_single_key( + self, + erasure_policy, + customer_node, + deserialized_customer_row, + ) -> None: + input_data = { + "fidesops_grouped_inputs": [], + "email": ["customer-test_uuid@example.com"], + } + attribute_definitions = [{"AttributeName": "email", "AttributeType": "S"}] + query_config = DynamoDBQueryConfig(customer_node, attribute_definitions) + update_item = query_config.generate_update_stmt( + deserialized_customer_row, erasure_policy, privacy_request + ) + + assert update_item == { + "customer_email": {"S": "customer-1@example.com"}, + "name": {"NULL": True}, + "address_id": {"L": [{"S": "1"}, {"S": "2"}]}, + "personal_info": {"M": {"gender": {"S": "male"}, "age": 
{"S": "99"}}}, + "id": {"S": "1"}, + } diff --git a/tests/ops/service/connectors/test_mongo_query_config.py b/tests/ops/service/connectors/test_mongo_query_config.py new file mode 100644 index 0000000000..c0f6079df1 --- /dev/null +++ b/tests/ops/service/connectors/test_mongo_query_config.py @@ -0,0 +1,283 @@ +import pytest +from fideslang.models import Dataset + +from fides.api.graph.config import ( + CollectionAddress, + FieldAddress, + FieldPath, + ObjectField, + ScalarField, +) +from fides.api.graph.graph import DatasetGraph, Edge +from fides.api.graph.traversal import Traversal +from fides.api.models.datasetconfig import convert_dataset_to_graph +from fides.api.models.privacy_request import PrivacyRequest +from fides.api.schemas.masking.masking_configuration import HashMaskingConfiguration +from fides.api.schemas.masking.masking_secrets import MaskingSecretCache, SecretType +from fides.api.service.connectors.query_configs.mongodb_query_config import ( + MongoQueryConfig, +) +from fides.api.service.masking.strategy.masking_strategy_hash import HashMaskingStrategy +from fides.api.util.data_category import DataCategory + +from ...task.traversal_data import combined_mongo_postgresql_graph +from ...test_helpers.cache_secrets_helper import cache_secret + +privacy_request = PrivacyRequest(id="234544") + + +class TestMongoQueryConfig: + @pytest.fixture(scope="function") + def combined_traversal(self, connection_config, integration_mongodb_config): + mongo_dataset, postgres_dataset = combined_mongo_postgresql_graph( + connection_config, integration_mongodb_config + ) + combined_dataset_graph = DatasetGraph(mongo_dataset, postgres_dataset) + combined_traversal = Traversal( + combined_dataset_graph, + {"email": "customer-1@examplecom"}, + ) + return combined_traversal + + @pytest.fixture(scope="function") + def customer_details_node(self, combined_traversal): + return combined_traversal.traversal_node_dict[ + CollectionAddress("mongo_test", "customer_details") + 
].to_mock_execution_node() + + @pytest.fixture(scope="function") + def customer_feedback_node(self, combined_traversal): + return combined_traversal.traversal_node_dict[ + CollectionAddress("mongo_test", "customer_feedback") + ].to_mock_execution_node() + + def test_field_map_nested(self, customer_details_node): + config = MongoQueryConfig(customer_details_node) + + field_map = config.field_map() + assert isinstance(field_map[FieldPath("workplace_info")], ObjectField) + assert isinstance( + field_map[FieldPath("workplace_info", "employer")], ScalarField + ) + + def test_primary_key_field_paths(self, customer_details_node): + config = MongoQueryConfig(customer_details_node) + assert list(config.primary_key_field_paths.keys()) == [FieldPath("_id")] + assert isinstance(config.primary_key_field_paths[FieldPath("_id")], ScalarField) + + def test_nested_query_field_paths( + self, customer_details_node, customer_feedback_node + ): + assert customer_details_node.query_field_paths == { + FieldPath("customer_id"), + } + + assert customer_feedback_node.query_field_paths == { + FieldPath("customer_information", "email") + } + + def test_nested_typed_filtered_values(self, customer_feedback_node): + """Identity data is located on a nested object""" + input_data = { + "customer_information.email": ["test@example.com"], + "ignore": ["abcde"], + } + assert customer_feedback_node.typed_filtered_values(input_data) == { + "customer_information.email": ["test@example.com"] + } + + def test_generate_query( + self, + policy, + example_datasets, + integration_mongodb_config, + connection_config, + ): + dataset_postgres = Dataset(**example_datasets[0]) + graph = convert_dataset_to_graph(dataset_postgres, connection_config.key) + dataset_mongo = Dataset(**example_datasets[1]) + mongo_graph = convert_dataset_to_graph( + dataset_mongo, integration_mongodb_config.key + ) + dataset_graph = DatasetGraph(*[graph, mongo_graph]) + traversal = Traversal(dataset_graph, {"email": 
"customer-1@example.com"}) + # Edge created from Root to nested customer_information.email field + assert ( + Edge( + FieldAddress("__ROOT__", "__ROOT__", "email"), + FieldAddress( + "mongo_test", "customer_feedback", "customer_information", "email" + ), + ) + in traversal.edges + ) + + # Test query on nested field + customer_feedback = traversal.traversal_node_dict[ + CollectionAddress("mongo_test", "customer_feedback") + ].to_mock_execution_node() + config = MongoQueryConfig(customer_feedback) + input_data = {"customer_information.email": ["customer-1@example.com"]} + # Tuple of query, projection - Searching for documents with nested + # customer_information.email = customer-1@example.com + assert config.generate_query(input_data, policy) == ( + {"customer_information.email": "customer-1@example.com"}, + {"_id": 1, "customer_information": 1, "date": 1, "message": 1, "rating": 1}, + ) + + # Test query nested data + customer_details = traversal.traversal_node_dict[ + CollectionAddress("mongo_test", "customer_details") + ].to_mock_execution_node() + config = MongoQueryConfig(customer_details) + input_data = {"customer_id": [1]} + # Tuple of query, projection - Projection is specifying fields at the top-level. Nested data will + # be filtered later. 
+ assert config.generate_query(input_data, policy) == ( + {"customer_id": 1}, + { + "_id": 1, + "birthday": 1, + "comments": 1, + "customer_id": 1, + "customer_uuid": 1, + "emergency_contacts": 1, + "children": 1, + "gender": 1, + "travel_identifiers": 1, + "workplace_info": 1, + }, + ) + + def test_generate_update_stmt_multiple_fields( + self, + erasure_policy, + example_datasets, + integration_mongodb_config, + connection_config, + ): + dataset_postgres = Dataset(**example_datasets[0]) + graph = convert_dataset_to_graph(dataset_postgres, connection_config.key) + dataset_mongo = Dataset(**example_datasets[1]) + mongo_graph = convert_dataset_to_graph( + dataset_mongo, integration_mongodb_config.key + ) + dataset_graph = DatasetGraph(*[graph, mongo_graph]) + + traversal = Traversal(dataset_graph, {"email": "customer-1@example.com"}) + customer_details = traversal.traversal_node_dict[ + CollectionAddress("mongo_test", "customer_details") + ].to_mock_execution_node() + config = MongoQueryConfig(customer_details) + row = { + "birthday": "1988-01-10", + "gender": "male", + "customer_id": 1, + "_id": 1, + "workplace_info": { + "position": "Chief Strategist", + "direct_reports": ["Robbie Margo", "Sully Hunter"], + }, + "emergency_contacts": [{"name": "June Customer", "phone": "444-444-4444"}], + "children": ["Christopher Customer", "Courtney Customer"], + } + + # Make target more broad + rule = erasure_policy.rules[0] + target = rule.targets[0] + target.data_category = DataCategory("user").value + + mongo_statement = config.generate_update_stmt( + row, erasure_policy, privacy_request + ) + + expected_result_0 = {"_id": 1} + expected_result_1 = { + "$set": { + "birthday": None, + "children.0": None, + "children.1": None, + "customer_id": None, + "emergency_contacts.0.name": None, + "workplace_info.direct_reports.0": None, # Both direct reports are masked. 
+ "workplace_info.direct_reports.1": None, + "emergency_contacts.0.phone": None, + "gender": None, + "workplace_info.position": None, + } + } + + print(mongo_statement[1]) + print(expected_result_1) + assert mongo_statement[0] == expected_result_0 + assert mongo_statement[1] == expected_result_1 + + def test_generate_update_stmt_multiple_rules( + self, + erasure_policy_two_rules, + example_datasets, + integration_mongodb_config, + connection_config, + ): + dataset_postgres = Dataset(**example_datasets[0]) + graph = convert_dataset_to_graph(dataset_postgres, connection_config.key) + dataset_mongo = Dataset(**example_datasets[1]) + mongo_graph = convert_dataset_to_graph( + dataset_mongo, integration_mongodb_config.key + ) + dataset_graph = DatasetGraph(*[graph, mongo_graph]) + + traversal = Traversal(dataset_graph, {"email": "customer-1@example.com"}) + + customer_details = traversal.traversal_node_dict[ + CollectionAddress("mongo_test", "customer_details") + ].to_mock_execution_node() + + config = MongoQueryConfig(customer_details) + row = { + "birthday": "1988-01-10", + "gender": "male", + "customer_id": 1, + "_id": 1, + "workplace_info": { + "position": "Chief Strategist", + "direct_reports": ["Robbie Margo", "Sully Hunter"], + }, + "emergency_contacts": [{"name": "June Customer", "phone": "444-444-4444"}], + "children": ["Christopher Customer", "Courtney Customer"], + } + + rule = erasure_policy_two_rules.rules[0] + rule.masking_strategy = { + "strategy": "hash", + "configuration": {"algorithm": "SHA-512"}, + } + target = rule.targets[0] + target.data_category = DataCategory("user.demographic.date_of_birth").value + + rule_two = erasure_policy_two_rules.rules[1] + rule_two.masking_strategy = { + "strategy": "random_string_rewrite", + "configuration": {"length": 30}, + } + target = rule_two.targets[0] + target.data_category = DataCategory("user.demographic.gender").value + # cache secrets for hash strategy + secret = MaskingSecretCache[str]( + secret="adobo", + 
masking_strategy=HashMaskingStrategy.name, + secret_type=SecretType.salt, + ) + cache_secret(secret, privacy_request.id) + + mongo_statement = config.generate_update_stmt( + row, erasure_policy_two_rules, privacy_request + ) + assert mongo_statement[0] == {"_id": 1} + assert len(mongo_statement[1]["$set"]["gender"]) == 30 + assert ( + mongo_statement[1]["$set"]["birthday"] + == HashMaskingStrategy(HashMaskingConfiguration(algorithm="SHA-512")).mask( + ["1988-01-10"], request_id=privacy_request.id + )[0] + ) diff --git a/tests/ops/service/connectors/test_query_config.py b/tests/ops/service/connectors/test_query_config.py index 01d7b9dbd2..2aa0871255 100644 --- a/tests/ops/service/connectors/test_query_config.py +++ b/tests/ops/service/connectors/test_query_config.py @@ -1,43 +1,28 @@ -from datetime import datetime, timezone from typing import Any, Dict, Set from unittest import mock import pytest -from boto3.dynamodb.types import TypeDeserializer from fideslang.models import Dataset from fides.api.common_exceptions import MissingNamespaceSchemaException -from fides.api.graph.config import ( - CollectionAddress, - FieldAddress, - FieldPath, - ObjectField, - ScalarField, -) +from fides.api.graph.config import CollectionAddress, FieldPath from fides.api.graph.execution import ExecutionNode -from fides.api.graph.graph import DatasetGraph, Edge +from fides.api.graph.graph import DatasetGraph from fides.api.graph.traversal import Traversal, TraversalNode from fides.api.models.datasetconfig import convert_dataset_to_graph from fides.api.models.privacy_request import PrivacyRequest from fides.api.schemas.masking.masking_configuration import HashMaskingConfiguration from fides.api.schemas.masking.masking_secrets import MaskingSecretCache, SecretType from fides.api.schemas.namespace_meta.namespace_meta import NamespaceMeta -from fides.api.service.connectors.query_configs.dynamodb_query_config import ( - DynamoDBQueryConfig, -) -from 
fides.api.service.connectors.query_configs.mongodb_query_config import ( - MongoQueryConfig, -) from fides.api.service.connectors.query_configs.query_config import ( QueryConfig, SQLQueryConfig, ) -from fides.api.service.connectors.scylla_query_config import ScyllaDBQueryConfig from fides.api.service.masking.strategy.masking_strategy_hash import HashMaskingStrategy from fides.api.util.data_category import DataCategory from tests.fixtures.application_fixtures import load_dataset -from ...task.traversal_data import combined_mongo_postgresql_graph, integration_db_graph +from ...task.traversal_data import integration_db_graph from ...test_helpers.cache_secrets_helper import cache_secret, clear_cache_secrets # customers -> address, order @@ -286,9 +271,47 @@ def test_generate_update_stmt_one_field( "id": 1, } text_clause = config.generate_update_stmt(row, erasure_policy, privacy_request) - assert text_clause.text == """UPDATE customer SET name = :name WHERE id = :id""" - assert text_clause._bindparams["name"].key == "name" - assert text_clause._bindparams["name"].value is None # Null masking strategy + assert ( + text_clause.text + == """UPDATE customer SET name = :masked_name WHERE email = :email""" + ) + assert text_clause._bindparams["masked_name"].key == "masked_name" + assert ( + text_clause._bindparams["masked_name"].value is None + ) # Null masking strategy + + def test_generate_update_stmt_one_field_inbound_reference( + self, erasure_policy_address_city, example_datasets, connection_config + ): + dataset = Dataset(**example_datasets[0]) + graph = convert_dataset_to_graph(dataset, connection_config.key) + dataset_graph = DatasetGraph(*[graph]) + traversal = Traversal(dataset_graph, {"email": "customer-1@example.com"}) + + address_node = traversal.traversal_node_dict[ + CollectionAddress("postgres_example_test_dataset", "address") + ].to_mock_execution_node() + + config = SQLQueryConfig(address_node) + row = { + "id": 1, + "house": "123", + "street": "Main St", + 
"city": "San Francisco", + "state": "CA", + "zip": "94105", + } + text_clause = config.generate_update_stmt( + row, erasure_policy_address_city, privacy_request + ) + assert ( + text_clause.text + == """UPDATE address SET city = :masked_city WHERE id = :id""" + ) + assert text_clause._bindparams["masked_city"].key == "masked_city" + assert ( + text_clause._bindparams["masked_city"].value is None + ) # Null masking strategy def test_generate_update_stmt_length_truncation( self, @@ -316,11 +339,14 @@ def test_generate_update_stmt_length_truncation( text_clause = config.generate_update_stmt( row, erasure_policy_string_rewrite_long, privacy_request ) - assert text_clause.text == """UPDATE customer SET name = :name WHERE id = :id""" - assert text_clause._bindparams["name"].key == "name" + assert ( + text_clause.text + == """UPDATE customer SET name = :masked_name WHERE email = :email""" + ) + assert text_clause._bindparams["masked_name"].key == "masked_name" # length truncation on name field assert ( - text_clause._bindparams["name"].value + text_clause._bindparams["masked_name"].value == "some rewrite value that is very long and" ) @@ -365,22 +391,23 @@ def test_generate_update_stmt_multiple_fields_same_rule( text_clause = config.generate_update_stmt(row, erasure_policy, privacy_request) assert ( text_clause.text - == "UPDATE customer SET email = :email, name = :name WHERE id = :id" + == "UPDATE customer SET email = :masked_email, name = :masked_name WHERE email = :email" ) - assert text_clause._bindparams["name"].key == "name" + assert text_clause._bindparams["masked_name"].key == "masked_name" # since length is set to 40 in dataset.yml, we expect only first 40 chars of masked val assert ( - text_clause._bindparams["name"].value + text_clause._bindparams["masked_name"].value == HashMaskingStrategy(HashMaskingConfiguration(algorithm="SHA-512")).mask( ["John Customer"], request_id=privacy_request.id )[0][0:40] ) assert ( - text_clause._bindparams["email"].value + 
text_clause._bindparams["masked_email"].value == HashMaskingStrategy(HashMaskingConfiguration(algorithm="SHA-512")).mask( ["customer-1@example.com"], request_id=privacy_request.id )[0] ) + assert text_clause._bindparams["email"].value == "customer-1@example.com" clear_cache_secrets(privacy_request.id) def test_generate_update_stmts_from_multiple_rules( @@ -409,423 +436,15 @@ def test_generate_update_stmts_from_multiple_rules( assert ( text_clause.text - == "UPDATE customer SET email = :email, name = :name WHERE id = :id" + == "UPDATE customer SET email = :masked_email, name = :masked_name WHERE email = :email" ) # Two different masking strategies used for name and email - assert text_clause._bindparams["name"].value is None # Null masking strategy - assert ( - text_clause._bindparams["email"].value == "*****" - ) # String rewrite masking strategy - - -class TestMongoQueryConfig: - @pytest.fixture(scope="function") - def combined_traversal(self, connection_config, integration_mongodb_config): - mongo_dataset, postgres_dataset = combined_mongo_postgresql_graph( - connection_config, integration_mongodb_config - ) - combined_dataset_graph = DatasetGraph(mongo_dataset, postgres_dataset) - combined_traversal = Traversal( - combined_dataset_graph, - {"email": "customer-1@examplecom"}, - ) - return combined_traversal - - @pytest.fixture(scope="function") - def customer_details_node(self, combined_traversal): - return combined_traversal.traversal_node_dict[ - CollectionAddress("mongo_test", "customer_details") - ].to_mock_execution_node() - - @pytest.fixture(scope="function") - def customer_feedback_node(self, combined_traversal): - return combined_traversal.traversal_node_dict[ - CollectionAddress("mongo_test", "customer_feedback") - ].to_mock_execution_node() - - def test_field_map_nested(self, customer_details_node): - config = MongoQueryConfig(customer_details_node) - - field_map = config.field_map() - assert isinstance(field_map[FieldPath("workplace_info")], 
ObjectField) - assert isinstance( - field_map[FieldPath("workplace_info", "employer")], ScalarField - ) - - def test_primary_key_field_paths(self, customer_details_node): - config = MongoQueryConfig(customer_details_node) - assert list(config.primary_key_field_paths.keys()) == [FieldPath("_id")] - assert isinstance(config.primary_key_field_paths[FieldPath("_id")], ScalarField) - - def test_nested_query_field_paths( - self, customer_details_node, customer_feedback_node - ): - assert customer_details_node.query_field_paths == { - FieldPath("customer_id"), - } - - assert customer_feedback_node.query_field_paths == { - FieldPath("customer_information", "email") - } - - def test_nested_typed_filtered_values(self, customer_feedback_node): - """Identity data is located on a nested object""" - input_data = { - "customer_information.email": ["test@example.com"], - "ignore": ["abcde"], - } - assert customer_feedback_node.typed_filtered_values(input_data) == { - "customer_information.email": ["test@example.com"] - } - - def test_generate_query( - self, - policy, - example_datasets, - integration_mongodb_config, - connection_config, - ): - dataset_postgres = Dataset(**example_datasets[0]) - graph = convert_dataset_to_graph(dataset_postgres, connection_config.key) - dataset_mongo = Dataset(**example_datasets[1]) - mongo_graph = convert_dataset_to_graph( - dataset_mongo, integration_mongodb_config.key - ) - dataset_graph = DatasetGraph(*[graph, mongo_graph]) - traversal = Traversal(dataset_graph, {"email": "customer-1@example.com"}) - # Edge created from Root to nested customer_information.email field - assert ( - Edge( - FieldAddress("__ROOT__", "__ROOT__", "email"), - FieldAddress( - "mongo_test", "customer_feedback", "customer_information", "email" - ), - ) - in traversal.edges - ) - - # Test query on nested field - customer_feedback = traversal.traversal_node_dict[ - CollectionAddress("mongo_test", "customer_feedback") - ].to_mock_execution_node() - config = 
MongoQueryConfig(customer_feedback) - input_data = {"customer_information.email": ["customer-1@example.com"]} - # Tuple of query, projection - Searching for documents with nested - # customer_information.email = customer-1@example.com - assert config.generate_query(input_data, policy) == ( - {"customer_information.email": "customer-1@example.com"}, - {"_id": 1, "customer_information": 1, "date": 1, "message": 1, "rating": 1}, - ) - - # Test query nested data - customer_details = traversal.traversal_node_dict[ - CollectionAddress("mongo_test", "customer_details") - ].to_mock_execution_node() - config = MongoQueryConfig(customer_details) - input_data = {"customer_id": [1]} - # Tuple of query, projection - Projection is specifying fields at the top-level. Nested data will - # be filtered later. - assert config.generate_query(input_data, policy) == ( - {"customer_id": 1}, - { - "_id": 1, - "birthday": 1, - "comments": 1, - "customer_id": 1, - "customer_uuid": 1, - "emergency_contacts": 1, - "children": 1, - "gender": 1, - "travel_identifiers": 1, - "workplace_info": 1, - }, - ) - - def test_generate_update_stmt_multiple_fields( - self, - erasure_policy, - example_datasets, - integration_mongodb_config, - connection_config, - ): - dataset_postgres = Dataset(**example_datasets[0]) - graph = convert_dataset_to_graph(dataset_postgres, connection_config.key) - dataset_mongo = Dataset(**example_datasets[1]) - mongo_graph = convert_dataset_to_graph( - dataset_mongo, integration_mongodb_config.key - ) - dataset_graph = DatasetGraph(*[graph, mongo_graph]) - - traversal = Traversal(dataset_graph, {"email": "customer-1@example.com"}) - customer_details = traversal.traversal_node_dict[ - CollectionAddress("mongo_test", "customer_details") - ].to_mock_execution_node() - config = MongoQueryConfig(customer_details) - row = { - "birthday": "1988-01-10", - "gender": "male", - "customer_id": 1, - "_id": 1, - "workplace_info": { - "position": "Chief Strategist", - "direct_reports": 
["Robbie Margo", "Sully Hunter"], - }, - "emergency_contacts": [{"name": "June Customer", "phone": "444-444-4444"}], - "children": ["Christopher Customer", "Courtney Customer"], - } - - # Make target more broad - rule = erasure_policy.rules[0] - target = rule.targets[0] - target.data_category = DataCategory("user").value - - mongo_statement = config.generate_update_stmt( - row, erasure_policy, privacy_request - ) - - expected_result_0 = {"_id": 1} - expected_result_1 = { - "$set": { - "birthday": None, - "children.0": None, - "children.1": None, - "customer_id": None, - "emergency_contacts.0.name": None, - "workplace_info.direct_reports.0": None, # Both direct reports are masked. - "workplace_info.direct_reports.1": None, - "emergency_contacts.0.phone": None, - "gender": None, - "workplace_info.position": None, - } - } - - print(mongo_statement[1]) - print(expected_result_1) - assert mongo_statement[0] == expected_result_0 - assert mongo_statement[1] == expected_result_1 - - def test_generate_update_stmt_multiple_rules( - self, - erasure_policy_two_rules, - example_datasets, - integration_mongodb_config, - connection_config, - ): - dataset_postgres = Dataset(**example_datasets[0]) - graph = convert_dataset_to_graph(dataset_postgres, connection_config.key) - dataset_mongo = Dataset(**example_datasets[1]) - mongo_graph = convert_dataset_to_graph( - dataset_mongo, integration_mongodb_config.key - ) - dataset_graph = DatasetGraph(*[graph, mongo_graph]) - - traversal = Traversal(dataset_graph, {"email": "customer-1@example.com"}) - - customer_details = traversal.traversal_node_dict[ - CollectionAddress("mongo_test", "customer_details") - ].to_mock_execution_node() - - config = MongoQueryConfig(customer_details) - row = { - "birthday": "1988-01-10", - "gender": "male", - "customer_id": 1, - "_id": 1, - "workplace_info": { - "position": "Chief Strategist", - "direct_reports": ["Robbie Margo", "Sully Hunter"], - }, - "emergency_contacts": [{"name": "June Customer", 
"phone": "444-444-4444"}], - "children": ["Christopher Customer", "Courtney Customer"], - } - - rule = erasure_policy_two_rules.rules[0] - rule.masking_strategy = { - "strategy": "hash", - "configuration": {"algorithm": "SHA-512"}, - } - target = rule.targets[0] - target.data_category = DataCategory("user.demographic.date_of_birth").value - - rule_two = erasure_policy_two_rules.rules[1] - rule_two.masking_strategy = { - "strategy": "random_string_rewrite", - "configuration": {"length": 30}, - } - target = rule_two.targets[0] - target.data_category = DataCategory("user.demographic.gender").value - # cache secrets for hash strategy - secret = MaskingSecretCache[str]( - secret="adobo", - masking_strategy=HashMaskingStrategy.name, - secret_type=SecretType.salt, - ) - cache_secret(secret, privacy_request.id) - - mongo_statement = config.generate_update_stmt( - row, erasure_policy_two_rules, privacy_request - ) - assert mongo_statement[0] == {"_id": 1} - assert len(mongo_statement[1]["$set"]["gender"]) == 30 assert ( - mongo_statement[1]["$set"]["birthday"] - == HashMaskingStrategy(HashMaskingConfiguration(algorithm="SHA-512")).mask( - ["1988-01-10"], request_id=privacy_request.id - )[0] - ) - - -class TestDynamoDBQueryConfig: - @pytest.fixture(scope="function") - def identity(self): - identity = {"email": "customer-test_uuid@example.com"} - return identity - - @pytest.fixture(scope="function") - def dataset_graph(self, integration_dynamodb_config, example_datasets): - dataset = Dataset(**example_datasets[11]) - dataset_graph = convert_dataset_to_graph( - dataset, integration_dynamodb_config.key - ) - - return DatasetGraph(*[dataset_graph]) - - @pytest.fixture(scope="function") - def traversal(self, identity, dataset_graph): - dynamo_traversal = Traversal(dataset_graph, identity) - return dynamo_traversal - - @pytest.fixture(scope="function") - def customer_node(self, traversal): - return traversal.traversal_node_dict[ - CollectionAddress("dynamodb_example_test_dataset", 
"customer") - ].to_mock_execution_node() - - @pytest.fixture(scope="function") - def customer_identifier_node(self, traversal): - return traversal.traversal_node_dict[ - CollectionAddress("dynamodb_example_test_dataset", "customer_identifier") - ].to_mock_execution_node() - - @pytest.fixture(scope="function") - def customer_row(self): - row = { - "customer_email": {"S": "customer-1@example.com"}, - "name": {"S": "John Customer"}, - "address_id": {"L": [{"S": "1"}, {"S": "2"}]}, - "personal_info": {"M": {"gender": {"S": "male"}, "age": {"S": "99"}}}, - "id": {"S": "1"}, - } - return row - - @pytest.fixture(scope="function") - def deserialized_customer_row(self, customer_row): - deserialized_customer_row = {} - deserializer = TypeDeserializer() - for key, value in customer_row.items(): - deserialized_customer_row[key] = deserializer.deserialize(value) - return deserialized_customer_row - - @pytest.fixture(scope="function") - def customer_identifier_row(self): - row = { - "customer_id": {"S": "customer-1@example.com"}, - "email": {"S": "customer-1@example.com"}, - "name": {"S": "Customer 1"}, - "created": {"S": datetime.now(timezone.utc).isoformat()}, - } - return row - - @pytest.fixture(scope="function") - def deserialized_customer_identifier_row(self, customer_identifier_row): - deserialized_customer_identifier_row = {} - deserializer = TypeDeserializer() - for key, value in customer_identifier_row.items(): - deserialized_customer_identifier_row[key] = deserializer.deserialize(value) - return deserialized_customer_identifier_row - - def test_get_query_param_formatting_single_key( - self, - resources_dict, - customer_node, - ) -> None: - input_data = { - "fidesops_grouped_inputs": [], - "email": ["customer-test_uuid@example.com"], - } - attribute_definitions = [{"AttributeName": "email", "AttributeType": "S"}] - query_config = DynamoDBQueryConfig(customer_node, attribute_definitions) - item = query_config.generate_query( - input_data=input_data, 
policy=resources_dict["policy"] - ) - assert item["ExpressionAttributeValues"] == { - ":value": {"S": "customer-test_uuid@example.com"} - } - assert item["KeyConditionExpression"] == "email = :value" - - def test_put_query_param_formatting_single_key( - self, - erasure_policy, - customer_node, - deserialized_customer_row, - ) -> None: - input_data = { - "fidesops_grouped_inputs": [], - "email": ["customer-test_uuid@example.com"], - } - attribute_definitions = [{"AttributeName": "email", "AttributeType": "S"}] - query_config = DynamoDBQueryConfig(customer_node, attribute_definitions) - update_item = query_config.generate_update_stmt( - deserialized_customer_row, erasure_policy, privacy_request - ) - - assert update_item == { - "customer_email": {"S": "customer-1@example.com"}, - "name": {"NULL": True}, - "address_id": {"L": [{"S": "1"}, {"S": "2"}]}, - "personal_info": {"M": {"gender": {"S": "male"}, "age": {"S": "99"}}}, - "id": {"S": "1"}, - } - - -class TestScyllaDBQueryConfig: - @pytest.fixture(scope="function") - def complete_execution_node( - self, example_datasets, integration_scylladb_config_with_keyspace - ): - dataset = Dataset(**example_datasets[15]) - graph = convert_dataset_to_graph( - dataset, integration_scylladb_config_with_keyspace.key - ) - dataset_graph = DatasetGraph(*[graph]) - identity = {"email": "customer-1@example.com"} - scylla_traversal = Traversal(dataset_graph, identity) - return scylla_traversal.traversal_node_dict[ - CollectionAddress("scylladb_example_test_dataset", "users") - ].to_mock_execution_node() - - def test_dry_run_query_no_data(self, scylladb_execution_node): - query_config = ScyllaDBQueryConfig(scylladb_execution_node) - dry_run_query = query_config.dry_run_query() - assert dry_run_query is None - - def test_dry_run_query_with_data(self, complete_execution_node): - query_config = ScyllaDBQueryConfig(complete_execution_node) - dry_run_query = query_config.dry_run_query() + text_clause._bindparams["masked_name"].value is None 
+ ) # Null masking strategy assert ( - dry_run_query - == "SELECT age, alternative_contacts, ascii_data, big_int_data, do_not_contact, double_data, duration, email, float_data, last_contacted, logins, name, states_lived, timestamp, user_id, uuid FROM users WHERE email = ? ALLOW FILTERING;" - ) - - def test_query_to_str(self, complete_execution_node): - query_config = ScyllaDBQueryConfig(complete_execution_node) - statement = ( - "SELECT name FROM users WHERE email = %(email)s", - {"email": "test@example.com"}, - ) - query_to_str = query_config.query_to_str(statement, {}) - assert query_to_str == "SELECT name FROM users WHERE email = 'test@example.com'" - + text_clause._bindparams["masked_email"].value == "*****" + ) # String rewrite masking strategy class TestSQLLikeQueryConfig: def test_missing_namespace_meta_schema(self): diff --git a/tests/ops/service/connectors/test_scylladb_query_config.py b/tests/ops/service/connectors/test_scylladb_query_config.py new file mode 100644 index 0000000000..3cbc6f493f --- /dev/null +++ b/tests/ops/service/connectors/test_scylladb_query_config.py @@ -0,0 +1,47 @@ +import pytest +from fideslang.models import Dataset + +from fides.api.graph.config import CollectionAddress +from fides.api.graph.graph import DatasetGraph +from fides.api.graph.traversal import Traversal +from fides.api.models.datasetconfig import convert_dataset_to_graph +from fides.api.service.connectors.scylla_query_config import ScyllaDBQueryConfig + + +class TestScyllaDBQueryConfig: + @pytest.fixture(scope="function") + def complete_execution_node( + self, example_datasets, integration_scylladb_config_with_keyspace + ): + dataset = Dataset(**example_datasets[15]) + graph = convert_dataset_to_graph( + dataset, integration_scylladb_config_with_keyspace.key + ) + dataset_graph = DatasetGraph(*[graph]) + identity = {"email": "customer-1@example.com"} + scylla_traversal = Traversal(dataset_graph, identity) + return scylla_traversal.traversal_node_dict[ + 
CollectionAddress("scylladb_example_test_dataset", "users") + ].to_mock_execution_node() + + def test_dry_run_query_no_data(self, scylladb_execution_node): + query_config = ScyllaDBQueryConfig(scylladb_execution_node) + dry_run_query = query_config.dry_run_query() + assert dry_run_query is None + + def test_dry_run_query_with_data(self, complete_execution_node): + query_config = ScyllaDBQueryConfig(complete_execution_node) + dry_run_query = query_config.dry_run_query() + assert ( + dry_run_query + == "SELECT age, alternative_contacts, ascii_data, big_int_data, do_not_contact, double_data, duration, email, float_data, last_contacted, logins, name, states_lived, timestamp, user_id, uuid FROM users WHERE email = ? ALLOW FILTERING;" + ) + + def test_query_to_str(self, complete_execution_node): + query_config = ScyllaDBQueryConfig(complete_execution_node) + statement = ( + "SELECT name FROM users WHERE email = %(email)s", + {"email": "test@example.com"}, + ) + query_to_str = query_config.query_to_str(statement, {}) + assert query_to_str == "SELECT name FROM users WHERE email = 'test@example.com'"