Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[sc-28944] Provide config to link user name to email #987

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion metaphor/kafka/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ class KafkaConfig(BaseConfig):
"""

bootstrap_servers: List[KafkaBootstrapServer] = dataclass_field(
default_factory=lambda: []
default_factory=list
)
"""
The Kafka bootstrap servers / brokers. Cannot be empty.
Expand Down
5 changes: 5 additions & 0 deletions metaphor/postgresql/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ query_log:
excluded_usernames:
- <user_name1>
- <user_name2>

# (Optional) A dict from user name to email. This is for Metaphor to recognize each query's issuer.
username_to_email:
<user1>: <user1 email>
<user2>: <user2 email>
```

See [Filter Configurations](../common/docs/filter.md) for more information on the optional `filter` config.
Expand Down
12 changes: 10 additions & 2 deletions metaphor/postgresql/config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from dataclasses import field
from typing import Optional, Set
from typing import Dict, Optional, Set

from pydantic import field_validator
from pydantic.dataclasses import dataclass

from metaphor.common.aws import AwsCredentials
Expand All @@ -16,7 +17,7 @@ class QueryLogConfig:
lookback_days: int = 1

# Query log filter to exclude certain usernames
excluded_usernames: Set[str] = field(default_factory=lambda: set())
excluded_usernames: Set[str] = field(default_factory=set)

# Config to control query processing
process_query: ProcessQueryConfig = field(
Expand All @@ -25,6 +26,13 @@ class QueryLogConfig:
) # Ignore COMMAND statements by default
)

# Config to link user name to email so that Metaphor can display each query's issuer.
username_to_email: Dict[str, str] = field(default_factory=dict)

@field_validator("username_to_email")
def _normalize_emails(cls, username_to_email: Dict[str, str]):
return {k: v.lower() for k, v in username_to_email.items()}


@dataclass(config=ConnectorConfig)
class BasePostgreSQLRunConfig(BaseConfig):
Expand Down
5 changes: 4 additions & 1 deletion metaphor/postgresql/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -498,12 +498,15 @@ def _process_cloud_watch_log(

if sql:
sql_hash = md5_digest(sql.encode("utf-8"))
user_id = parsed.user
email = self._query_log_config.username_to_email.get(user_id)
usefulalgorithm marked this conversation as resolved.
Show resolved Hide resolved
return QueryLog(
id=f"{DataPlatform.POSTGRESQL.name}:{sql_hash}",
query_id=sql_hash,
platform=DataPlatform.POSTGRESQL,
default_database=parsed.database,
user_id=parsed.user,
user_id=user_id,
email=email,
sql=sql,
sql_hash=sql_hash,
duration=duration,
Expand Down
5 changes: 5 additions & 0 deletions metaphor/redshift/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ query_log:
excluded_usernames:
- <user_name1>
- <user_name2>

# (Optional) A dict from user name to email. This is for Metaphor to recognize each query's issuer.
username_to_email:
<user1>: <user1 email>
<user2>: <user2 email>
```

##### Process Query Config
Expand Down
5 changes: 4 additions & 1 deletion metaphor/redshift/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ def _process_record(self, access_event: AccessEvent):
)

if sql:
user_id = access_event.usename
email = self._query_log_config.username_to_email.get(user_id)
usefulalgorithm marked this conversation as resolved.
Show resolved Hide resolved
query_log = QueryLog(
id=f"{DataPlatform.REDSHIFT.name}:{access_event.query_id}",
query_id=str(access_event.query_id),
Expand All @@ -192,7 +194,8 @@ def _process_record(self, access_event: AccessEvent):
duration=float(
(access_event.end_time - access_event.start_time).total_seconds()
),
user_id=access_event.usename,
user_id=user_id,
email=email,
rows_read=float(access_event.rows),
bytes_read=float(access_event.bytes),
sources=tll.sources,
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "metaphor-connectors"
version = "0.14.108"
version = "0.14.109"
license = "Apache-2.0"
description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app."
authors = ["Metaphor <[email protected]>"]
Expand Down
9 changes: 6 additions & 3 deletions tests/postgresql/test_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ def dummy_config(**args):
password="",
output=OutputConfig(),
**args,
query_log=PostgreSQLQueryLogConfig(excluded_usernames={"foo"}),
query_log=PostgreSQLQueryLogConfig(
excluded_usernames={"foo"},
username_to_email={"metaphor": "[email protected]"},
),
)


Expand All @@ -77,7 +80,7 @@ def dummy_config(**args):
default_database="metaphor",
default_schema=None,
duration=55.66,
email=None,
email="[email protected]",
metadata=None,
parsing=None,
platform=DataPlatform.POSTGRESQL,
Expand Down Expand Up @@ -270,7 +273,7 @@ def test_alter_rename():
default_database="metaphor",
default_schema=None,
duration=55.66,
email=None,
email="[email protected]",
metadata=None,
parsing=None,
platform=DataPlatform.POSTGRESQL,
Expand Down
10 changes: 6 additions & 4 deletions tests/redshift/query_logs.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"_id": "REDSHIFT:7086",
"bytesRead": 6500.0,
"duration": -338161298.308308,
"email": "[email protected]",
"platform": "REDSHIFT",
"queryId": "7086",
"rowsRead": 7155.0,
Expand All @@ -17,16 +18,17 @@
"table": "table1"
}
],
"targets": [],
"sql": "select * from schema1.table1",
"sqlHash": "fbbd533367fe766d71a48425c5c48654",
"startTime": "2010-06-26T10:07:18.014673",
"userId": "aZUytYDMFTryuKyWtbEM"
"targets": [],
"userId": "user1"
},
{
"_id": "REDSHIFT:9910",
"bytesRead": 176.0,
"duration": -217012182.227927,
"email": "[email protected]",
"platform": "REDSHIFT",
"queryId": "9910",
"rowsRead": 4617.0,
Expand All @@ -38,11 +40,11 @@
"table": "table2"
}
],
"targets": [],
"sql": "select * from schema2.table2",
"sqlHash": "9f98db863e022a1b0c8b0895c1d4dec1",
"startTime": "2003-10-01T10:15:51.904151",
"userId": "IevfvBUzEVUDrTbaIWKY"
"targets": [],
"userId": "user2"
}
]
}
Expand Down
15 changes: 11 additions & 4 deletions tests/redshift/test_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from metaphor.common.base_config import OutputConfig
from metaphor.common.event_util import EventUtil
from metaphor.models.metadata_change_event import DataPlatform
from metaphor.postgresql.config import QueryLogConfig
from metaphor.redshift.access_event import AccessEvent
from metaphor.redshift.config import RedshiftRunConfig
from metaphor.redshift.extractor import RedshiftExtractor
Expand Down Expand Up @@ -44,12 +45,11 @@ def test_dataset_platform():


def test_collect_query_logs(test_root_dir: str) -> None:
# Random stuff generated with polyfactory
access_events = [
AccessEvent(
user_id=7610,
query_id=7086,
usename="aZUytYDMFTryuKyWtbEM",
usename="user1",
rows=7155,
bytes=6500,
querytxt="select * from schema1.table1",
Expand All @@ -60,7 +60,7 @@ def test_collect_query_logs(test_root_dir: str) -> None:
AccessEvent(
user_id=8902,
query_id=9910,
usename="IevfvBUzEVUDrTbaIWKY",
usename="user2",
rows=4617,
bytes=176,
querytxt="select * from schema2.table2",
Expand Down Expand Up @@ -93,7 +93,14 @@ async def __anext__(self):
mock_connect_database.return_value = MagicMock()
mock_fetch_access_event.return_value = AsyncIterator()

extractor = RedshiftExtractor(dummy_config())
config = dummy_config()
config.query_log = QueryLogConfig(
username_to_email={
"user1": "[email protected]",
"user2": "[email protected]",
}
)
extractor = RedshiftExtractor(config)
extractor._included_databases = included_dbs
extractor._datasets["db1.schema1.table1"] = 1
extractor._datasets["db2.schema2.table2"] = 1
Expand Down
Loading