diff --git a/metaphor/kafka/config.py b/metaphor/kafka/config.py index be7d58ac..382b623e 100644 --- a/metaphor/kafka/config.py +++ b/metaphor/kafka/config.py @@ -141,7 +141,7 @@ class KafkaConfig(BaseConfig): """ bootstrap_servers: List[KafkaBootstrapServer] = dataclass_field( - default_factory=lambda: [] + default_factory=list ) """ The Kafka bootstrap servers / brokers. Cannot be empty. diff --git a/metaphor/postgresql/README.md b/metaphor/postgresql/README.md index a1b4e5df..f77ae3c7 100644 --- a/metaphor/postgresql/README.md +++ b/metaphor/postgresql/README.md @@ -61,6 +61,11 @@ query_log: excluded_usernames: - - + + # (Optional) A dict from user name to email. This is for Metaphor to recognize each query's issuer. + username_to_email: + : + : ``` See [Filter Configurations](../common/docs/filter.md) for more information on the optional `filter` config. diff --git a/metaphor/postgresql/config.py b/metaphor/postgresql/config.py index 23c532b6..8833d487 100644 --- a/metaphor/postgresql/config.py +++ b/metaphor/postgresql/config.py @@ -1,6 +1,7 @@ from dataclasses import field -from typing import Optional, Set +from typing import Dict, Optional, Set +from pydantic import field_validator from pydantic.dataclasses import dataclass from metaphor.common.aws import AwsCredentials @@ -16,7 +17,7 @@ class QueryLogConfig: lookback_days: int = 1 # Query log filter to exclude certain usernames - excluded_usernames: Set[str] = field(default_factory=lambda: set()) + excluded_usernames: Set[str] = field(default_factory=set) # Config to control query processing process_query: ProcessQueryConfig = field( @@ -25,6 +26,13 @@ class QueryLogConfig: ) # Ignore COMMAND statements by default ) + # Config to link user name to email so that Metaphor can display each query's issuer. + username_to_email: Dict[str, str] = field(default_factory=dict) + + @field_validator("username_to_email") + def _normalize_emails(cls, username_to_email: Dict[str, str]): + return {k: v.lower() for k, v in username_to_email.items()} + @dataclass(config=ConnectorConfig) class BasePostgreSQLRunConfig(BaseConfig): diff --git a/metaphor/postgresql/extractor.py b/metaphor/postgresql/extractor.py index b2babe3d..6bd2d835 100644 --- a/metaphor/postgresql/extractor.py +++ b/metaphor/postgresql/extractor.py @@ -498,12 +498,15 @@ def _process_cloud_watch_log( if sql: sql_hash = md5_digest(sql.encode("utf-8")) + user_id = parsed.user + email = self._query_log_config.username_to_email.get(user_id) return QueryLog( id=f"{DataPlatform.POSTGRESQL.name}:{sql_hash}", query_id=sql_hash, platform=DataPlatform.POSTGRESQL, default_database=parsed.database, - user_id=parsed.user, + user_id=user_id, + email=email, sql=sql, sql_hash=sql_hash, duration=duration, diff --git a/metaphor/redshift/README.md b/metaphor/redshift/README.md index d09c70e7..e176514e 100644 --- a/metaphor/redshift/README.md +++ b/metaphor/redshift/README.md @@ -83,6 +83,11 @@ query_log: excluded_usernames: - - + + # (Optional) A dict from user name to email. This is for Metaphor to recognize each query's issuer. + username_to_email: + : + : ``` ##### Process Query Config diff --git a/metaphor/redshift/extractor.py b/metaphor/redshift/extractor.py index 7749a9bc..ef7ac4c4 100644 --- a/metaphor/redshift/extractor.py +++ b/metaphor/redshift/extractor.py @@ -184,6 +184,8 @@ def _process_record(self, access_event: AccessEvent): ) if sql: + user_id = access_event.usename + email = self._query_log_config.username_to_email.get(user_id) query_log = QueryLog( id=f"{DataPlatform.REDSHIFT.name}:{access_event.query_id}", query_id=str(access_event.query_id), @@ -192,7 +194,8 @@ def _process_record(self, access_event: AccessEvent): duration=float( (access_event.end_time - access_event.start_time).total_seconds() ), - user_id=access_event.usename, + user_id=user_id, + email=email, rows_read=float(access_event.rows), bytes_read=float(access_event.bytes), sources=tll.sources, diff --git a/pyproject.toml b/pyproject.toml index 94d0ba3e..d83048fd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "metaphor-connectors" -version = "0.14.108" +version = "0.14.109" license = "Apache-2.0" description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app." authors = ["Metaphor "] diff --git a/tests/postgresql/test_extractor.py b/tests/postgresql/test_extractor.py index 8012f199..c070024e 100644 --- a/tests/postgresql/test_extractor.py +++ b/tests/postgresql/test_extractor.py @@ -64,7 +64,10 @@ def dummy_config(**args): password="", output=OutputConfig(), **args, - query_log=PostgreSQLQueryLogConfig(excluded_usernames={"foo"}), + query_log=PostgreSQLQueryLogConfig( + excluded_usernames={"foo"}, + username_to_email={"metaphor": "admin@metaphor.io"}, + ), ) @@ -77,7 +80,7 @@ def dummy_config(**args): default_database="metaphor", default_schema=None, duration=55.66, - email=None, + email="admin@metaphor.io", metadata=None, parsing=None, platform=DataPlatform.POSTGRESQL, @@ -270,7 +273,7 @@ def test_alter_rename(): default_database="metaphor", default_schema=None, duration=55.66, - email=None, + email="admin@metaphor.io", metadata=None, parsing=None, platform=DataPlatform.POSTGRESQL, diff --git a/tests/redshift/query_logs.json b/tests/redshift/query_logs.json index c52aae11..4e71cb79 100644 --- a/tests/redshift/query_logs.json +++ b/tests/redshift/query_logs.json @@ -6,6 +6,7 @@ "_id": "REDSHIFT:7086", "bytesRead": 6500.0, "duration": -338161298.308308, + "email": "user1@metaphor.io", "platform": "REDSHIFT", "queryId": "7086", "rowsRead": 7155.0, @@ -17,16 +18,17 @@ "table": "table1" } ], - "targets": [], "sql": "select * from schema1.table1", "sqlHash": "fbbd533367fe766d71a48425c5c48654", "startTime": "2010-06-26T10:07:18.014673", - "userId": "aZUytYDMFTryuKyWtbEM" + "targets": [], + "userId": "user1" }, { "_id": "REDSHIFT:9910", "bytesRead": 176.0, "duration": -217012182.227927, + "email": "user2@metaphor.io", "platform": "REDSHIFT", "queryId": "9910", "rowsRead": 4617.0, @@ -38,11 +40,11 @@ "table": "table2" } ], - "targets": [], "sql": "select * from schema2.table2", "sqlHash": "9f98db863e022a1b0c8b0895c1d4dec1", "startTime": "2003-10-01T10:15:51.904151", - "userId": "IevfvBUzEVUDrTbaIWKY" + "targets": [], + "userId": "user2" } ] } diff --git a/tests/redshift/test_extractor.py b/tests/redshift/test_extractor.py index 54992870..03b7e880 100644 --- a/tests/redshift/test_extractor.py +++ b/tests/redshift/test_extractor.py @@ -7,6 +7,7 @@ from metaphor.common.base_config import OutputConfig from metaphor.common.event_util import EventUtil from metaphor.models.metadata_change_event import DataPlatform +from metaphor.postgresql.config import QueryLogConfig from metaphor.redshift.access_event import AccessEvent from metaphor.redshift.config import RedshiftRunConfig from metaphor.redshift.extractor import RedshiftExtractor @@ -44,12 +45,11 @@ def test_dataset_platform(): def test_collect_query_logs(test_root_dir: str) -> None: - # Random stuff generated with polyfactory access_events = [ AccessEvent( user_id=7610, query_id=7086, - usename="aZUytYDMFTryuKyWtbEM", + usename="user1", rows=7155, bytes=6500, querytxt="select * from schema1.table1", @@ -60,7 +60,7 @@ def test_collect_query_logs(test_root_dir: str) -> None: AccessEvent( user_id=8902, query_id=9910, - usename="IevfvBUzEVUDrTbaIWKY", + usename="user2", rows=4617, bytes=176, querytxt="select * from schema2.table2", @@ -93,7 +93,14 @@ async def __anext__(self): mock_connect_database.return_value = MagicMock() mock_fetch_access_event.return_value = AsyncIterator() - extractor = RedshiftExtractor(dummy_config()) + config = dummy_config() + config.query_log = QueryLogConfig( + username_to_email={ + "user1": "user1@metaphor.io", + "user2": "USER2@METAPHOR.IO", + } + ) + extractor = RedshiftExtractor(config) extractor._included_databases = included_dbs extractor._datasets["db1.schema1.table1"] = 1 extractor._datasets["db2.schema2.table2"] = 1