Skip to content

Commit

Permalink
Splitting out query configs and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
galvana committed Dec 10, 2024
1 parent b0ef57d commit 77a5770
Show file tree
Hide file tree
Showing 7 changed files with 537 additions and 451 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def generate_update_stmt(
where_clauses: Dict[str, Any] = filter_nonempty_values(
{
field_path.string_path: field.cast(row[field_path.string_path])
for field_path, field in self.incoming_field_paths.items()
for field_path, field in self.primary_key_field_paths.items()
}
)

Expand Down
17 changes: 13 additions & 4 deletions src/fides/api/service/connectors/query_configs/query_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def primary_key_field_paths(self) -> Dict[FieldPath, Field]:
}

@property
def incoming_field_paths(self) -> Dict[FieldPath, Field]:
def reference_field_paths(self) -> Dict[FieldPath, Field]:
"""Mapping of FieldPaths to Fields that have incoming identity or dataset references"""
return {
field_path: field
Expand Down Expand Up @@ -447,10 +447,19 @@ def generate_update_stmt(
) -> Optional[T]:
"""Returns an update statement in generic SQL-ish dialect."""
update_value_map: Dict[str, Any] = self.update_value_map(row, policy, request)

non_empty_primary_key_fields: Dict[str, Field] = filter_nonempty_values(
{
fpath.string_path: fld.cast(row[fpath.string_path])
for fpath, fld in self.primary_key_field_paths.items()
if fpath.string_path in row
}
)

non_empty_reference_fields: Dict[str, Field] = filter_nonempty_values(
{
fpath.string_path: fld.cast(row[fpath.string_path])
for fpath, fld in self.incoming_field_paths.items()
for fpath, fld in self.reference_field_paths.items()
if fpath.string_path in row
}
)
Expand All @@ -463,10 +472,10 @@ def generate_update_stmt(

update_clauses = self.get_update_clauses(
{k: f"masked_{k}" for k in update_value_map},
non_empty_reference_fields,
non_empty_primary_key_fields or non_empty_reference_fields,
)
where_clauses = self.format_key_map_for_update_stmt(
{k: k for k in non_empty_reference_fields}
{k: k for k in non_empty_primary_key_fields or non_empty_reference_fields}
)

valid = len(where_clauses) > 0 and len(update_clauses) > 0
Expand Down
129 changes: 129 additions & 0 deletions tests/ops/service/connectors/test_dynamodb_query_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
from datetime import datetime, timezone

import pytest
from boto3.dynamodb.types import TypeDeserializer
from fideslang.models import Dataset

from fides.api.graph.config import CollectionAddress
from fides.api.graph.graph import DatasetGraph
from fides.api.graph.traversal import Traversal
from fides.api.models.datasetconfig import convert_dataset_to_graph
from fides.api.models.privacy_request import PrivacyRequest
from fides.api.service.connectors.query_configs.dynamodb_query_config import (
DynamoDBQueryConfig,
)

# Module-level PrivacyRequest built with a fixed id; passed as the request
# argument to generate_update_stmt in the tests below.
privacy_request = PrivacyRequest(id="234544")


class TestDynamoDBQueryConfig:
    """Tests for the query and update items produced by DynamoDBQueryConfig."""

    @pytest.fixture(scope="function")
    def identity(self):
        """Identity data used to seed the traversal."""
        return {"email": "[email protected]"}

    @pytest.fixture(scope="function")
    def dataset_graph(self, integration_dynamodb_config, example_datasets):
        """DatasetGraph holding a single dataset bound to the DynamoDB config key."""
        # NOTE(review): index 11 is presumably the DynamoDB example dataset
        # ("dynamodb_example_test_dataset" is looked up below) -- confirm
        # against the example_datasets fixture.
        dataset = Dataset(**example_datasets[11])
        graph_dataset = convert_dataset_to_graph(
            dataset, integration_dynamodb_config.key
        )
        return DatasetGraph(graph_dataset)

    @pytest.fixture(scope="function")
    def traversal(self, identity, dataset_graph):
        """Traversal over the dataset graph, seeded with the test identity."""
        return Traversal(dataset_graph, identity)

    @pytest.fixture(scope="function")
    def customer_node(self, traversal):
        """Mock execution node for the ``customer`` collection."""
        return traversal.traversal_node_dict[
            CollectionAddress("dynamodb_example_test_dataset", "customer")
        ].to_mock_execution_node()

    @pytest.fixture(scope="function")
    def customer_identifier_node(self, traversal):
        """Mock execution node for the ``customer_identifier`` collection."""
        return traversal.traversal_node_dict[
            CollectionAddress("dynamodb_example_test_dataset", "customer_identifier")
        ].to_mock_execution_node()

    @pytest.fixture(scope="function")
    def customer_row(self):
        """A ``customer`` item in DynamoDB's typed attribute-value format."""
        return {
            "customer_email": {"S": "[email protected]"},
            "name": {"S": "John Customer"},
            "address_id": {"L": [{"S": "1"}, {"S": "2"}]},
            "personal_info": {"M": {"gender": {"S": "male"}, "age": {"S": "99"}}},
            "id": {"S": "1"},
        }

    @pytest.fixture(scope="function")
    def deserialized_customer_row(self, customer_row):
        """``customer_row`` with each attribute value run through TypeDeserializer."""
        deserializer = TypeDeserializer()
        return {
            key: deserializer.deserialize(value)
            for key, value in customer_row.items()
        }

    @pytest.fixture(scope="function")
    def customer_identifier_row(self):
        """A ``customer_identifier`` item in DynamoDB's typed attribute-value format."""
        return {
            "customer_id": {"S": "[email protected]"},
            "email": {"S": "[email protected]"},
            "name": {"S": "Customer 1"},
            "created": {"S": datetime.now(timezone.utc).isoformat()},
        }

    @pytest.fixture(scope="function")
    def deserialized_customer_identifier_row(self, customer_identifier_row):
        """``customer_identifier_row`` with each value run through TypeDeserializer."""
        deserializer = TypeDeserializer()
        return {
            key: deserializer.deserialize(value)
            for key, value in customer_identifier_row.items()
        }

    def test_get_query_param_formatting_single_key(
        self,
        resources_dict,
        customer_node,
    ) -> None:
        """generate_query yields a key condition on ``email`` with a typed value."""
        input_data = {
            "fidesops_grouped_inputs": [],
            "email": ["[email protected]"],
        }
        attribute_definitions = [{"AttributeName": "email", "AttributeType": "S"}]
        query_config = DynamoDBQueryConfig(customer_node, attribute_definitions)
        item = query_config.generate_query(
            input_data=input_data, policy=resources_dict["policy"]
        )
        assert item["ExpressionAttributeValues"] == {
            ":value": {"S": "[email protected]"}
        }
        assert item["KeyConditionExpression"] == "email = :value"

    def test_put_query_param_formatting_single_key(
        self,
        erasure_policy,
        customer_node,
        deserialized_customer_row,
    ) -> None:
        """generate_update_stmt returns the full typed item with ``name`` set to NULL."""
        attribute_definitions = [{"AttributeName": "email", "AttributeType": "S"}]
        query_config = DynamoDBQueryConfig(customer_node, attribute_definitions)
        update_item = query_config.generate_update_stmt(
            deserialized_customer_row, erasure_policy, privacy_request
        )

        # Every attribute matches customer_row except ``name``, which is
        # expected to come back as a DynamoDB NULL.
        assert update_item == {
            "customer_email": {"S": "[email protected]"},
            "name": {"NULL": True},
            "address_id": {"L": [{"S": "1"}, {"S": "2"}]},
            "personal_info": {"M": {"gender": {"S": "male"}, "age": {"S": "99"}}},
            "id": {"S": "1"},
        }
Loading

0 comments on commit 77a5770

Please sign in to comment.