Skip to content

Commit

Permalink
[sc-28944] Provide config to link user name to email
Browse files Browse the repository at this point in the history
  • Loading branch information
usefulalgorithm committed Sep 24, 2024
1 parent 167ccb7 commit f6070ab
Show file tree
Hide file tree
Showing 9 changed files with 47 additions and 16 deletions.
5 changes: 5 additions & 0 deletions metaphor/postgresql/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ query_log:
excluded_usernames:
- <user_name1>
- <user_name2>

# (Optional) A dict from user name to email. This is for Metaphor to recognize each query's issuer.
username_to_email:
<user1>: <user1 email>
<user2>: <user2 email>
```
See [Filter Configurations](../common/docs/filter.md) for more information on the optional `filter` config.
Expand Down
7 changes: 5 additions & 2 deletions metaphor/postgresql/config.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dataclasses import field
from typing import Optional, Set
from typing import Dict, Optional, Set

from pydantic.dataclasses import dataclass

Expand All @@ -16,7 +16,7 @@ class QueryLogConfig:
lookback_days: int = 1

# Query log filter to exclude certain usernames
excluded_usernames: Set[str] = field(default_factory=lambda: set())
excluded_usernames: Set[str] = field(default_factory=set)

# Config to control query processing
process_query: ProcessQueryConfig = field(
Expand All @@ -25,6 +25,9 @@ class QueryLogConfig:
) # Ignore COMMAND statements by default
)

# Config to link user name to email so that Metaphor can display each query's issuer.
username_to_email: Dict[str, str] = field(default_factory=dict)


@dataclass(config=ConnectorConfig)
class BasePostgreSQLRunConfig(BaseConfig):
Expand Down
5 changes: 4 additions & 1 deletion metaphor/postgresql/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -498,12 +498,15 @@ def _process_cloud_watch_log(

if sql:
sql_hash = md5_digest(sql.encode("utf-8"))
user_id = parsed.user
email = self._query_log_config.username_to_email.get(user_id)
return QueryLog(
id=f"{DataPlatform.POSTGRESQL.name}:{sql_hash}",
query_id=sql_hash,
platform=DataPlatform.POSTGRESQL,
default_database=parsed.database,
user_id=parsed.user,
user_id=user_id,
email=email,
sql=sql,
sql_hash=sql_hash,
duration=duration,
Expand Down
5 changes: 5 additions & 0 deletions metaphor/redshift/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ query_log:
excluded_usernames:
- <user_name1>
- <user_name2>
# (Optional) A dict from user name to email. This is for Metaphor to recognize each query's issuer.
username_to_email:
<user1>: <user1 email>
<user2>: <user2 email>
```

##### Process Query Config
Expand Down
5 changes: 4 additions & 1 deletion metaphor/redshift/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ def _process_record(self, access_event: AccessEvent):
)

if sql:
user_id = access_event.usename
email = self._query_log_config.username_to_email.get(user_id)
query_log = QueryLog(
id=f"{DataPlatform.REDSHIFT.name}:{access_event.query_id}",
query_id=str(access_event.query_id),
Expand All @@ -192,7 +194,8 @@ def _process_record(self, access_event: AccessEvent):
duration=float(
(access_event.end_time - access_event.start_time).total_seconds()
),
user_id=access_event.usename,
user_id=user_id,
email=email,
rows_read=float(access_event.rows),
bytes_read=float(access_event.bytes),
sources=tll.sources,
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "metaphor-connectors"
version = "0.14.107"
version = "0.14.108"
license = "Apache-2.0"
description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app."
authors = ["Metaphor <[email protected]>"]
Expand Down
9 changes: 6 additions & 3 deletions tests/postgresql/test_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ def dummy_config(**args):
password="",
output=OutputConfig(),
**args,
query_log=PostgreSQLQueryLogConfig(excluded_usernames={"foo"}),
query_log=PostgreSQLQueryLogConfig(
excluded_usernames={"foo"},
username_to_email={"metaphor": "[email protected]"},
),
)


Expand All @@ -77,7 +80,7 @@ def dummy_config(**args):
default_database="metaphor",
default_schema=None,
duration=55.66,
email=None,
email="[email protected]",
metadata=None,
parsing=None,
platform=DataPlatform.POSTGRESQL,
Expand Down Expand Up @@ -270,7 +273,7 @@ def test_alter_rename():
default_database="metaphor",
default_schema=None,
duration=55.66,
email=None,
email="[email protected]",
metadata=None,
parsing=None,
platform=DataPlatform.POSTGRESQL,
Expand Down
10 changes: 6 additions & 4 deletions tests/redshift/query_logs.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"_id": "REDSHIFT:7086",
"bytesRead": 6500.0,
"duration": -338161298.308308,
"email": "[email protected]",
"platform": "REDSHIFT",
"queryId": "7086",
"rowsRead": 7155.0,
Expand All @@ -17,16 +18,17 @@
"table": "table1"
}
],
"targets": [],
"sql": "select * from schema1.table1",
"sqlHash": "fbbd533367fe766d71a48425c5c48654",
"startTime": "2010-06-26T10:07:18.014673",
"userId": "aZUytYDMFTryuKyWtbEM"
"targets": [],
"userId": "user1"
},
{
"_id": "REDSHIFT:9910",
"bytesRead": 176.0,
"duration": -217012182.227927,
"email": "[email protected]",
"platform": "REDSHIFT",
"queryId": "9910",
"rowsRead": 4617.0,
Expand All @@ -38,11 +40,11 @@
"table": "table2"
}
],
"targets": [],
"sql": "select * from schema2.table2",
"sqlHash": "9f98db863e022a1b0c8b0895c1d4dec1",
"startTime": "2003-10-01T10:15:51.904151",
"userId": "IevfvBUzEVUDrTbaIWKY"
"targets": [],
"userId": "user2"
}
]
}
Expand Down
15 changes: 11 additions & 4 deletions tests/redshift/test_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from metaphor.common.base_config import OutputConfig
from metaphor.common.event_util import EventUtil
from metaphor.models.metadata_change_event import DataPlatform
from metaphor.postgresql.config import QueryLogConfig
from metaphor.redshift.access_event import AccessEvent
from metaphor.redshift.config import RedshiftRunConfig
from metaphor.redshift.extractor import RedshiftExtractor
Expand Down Expand Up @@ -44,12 +45,11 @@ def test_dataset_platform():


def test_collect_query_logs(test_root_dir: str) -> None:
# Random stuff generated with polyfactory
access_events = [
AccessEvent(
user_id=7610,
query_id=7086,
usename="aZUytYDMFTryuKyWtbEM",
usename="user1",
rows=7155,
bytes=6500,
querytxt="select * from schema1.table1",
Expand All @@ -60,7 +60,7 @@ def test_collect_query_logs(test_root_dir: str) -> None:
AccessEvent(
user_id=8902,
query_id=9910,
usename="IevfvBUzEVUDrTbaIWKY",
usename="user2",
rows=4617,
bytes=176,
querytxt="select * from schema2.table2",
Expand Down Expand Up @@ -93,7 +93,14 @@ async def __anext__(self):
mock_connect_database.return_value = MagicMock()
mock_fetch_access_event.return_value = AsyncIterator()

extractor = RedshiftExtractor(dummy_config())
config = dummy_config()
config.query_log = QueryLogConfig(
username_to_email={
"user1": "[email protected]",
"user2": "[email protected]",
}
)
extractor = RedshiftExtractor(config)
extractor._included_databases = included_dbs
extractor._datasets["db1.schema1.table1"] = 1
extractor._datasets["db2.schema2.table2"] = 1
Expand Down

0 comments on commit f6070ab

Please sign in to comment.