diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_queries.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_queries.py index a4e5f07c9f90e..36825dc33fe7d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_queries.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_queries.py @@ -66,6 +66,11 @@ logger = logging.getLogger(__name__) +# Define a type alias +UserName = str +UserEmail = str +UsersMapping = Dict[UserName, UserEmail] + class SnowflakeQueriesExtractorConfig(ConfigModel): # TODO: Support stateful ingestion for the time windows. @@ -268,8 +273,8 @@ def get_workunits_internal( shared_connection.close() audit_log_file.unlink(missing_ok=True) - def fetch_users(self) -> Dict[str, str]: - users: Dict[str, str] = dict() + def fetch_users(self) -> UsersMapping: + users: UsersMapping = dict() with self.structured_reporter.report_exc("Error fetching users from Snowflake"): logger.info("Fetching users from Snowflake") query = SnowflakeQuery.get_all_users() @@ -322,7 +327,7 @@ def fetch_copy_history(self) -> Iterable[KnownLineageMapping]: yield result def fetch_query_log( - self, users: Dict[str, str] + self, users: UsersMapping ) -> Iterable[Union[PreparsedQuery, TableRename, TableSwap]]: query_log_query = _build_enriched_query_log_query( start_time=self.config.window.start_time, @@ -355,7 +360,7 @@ def fetch_query_log( yield entry def _parse_audit_log_row( - self, row: Dict[str, Any], users: Dict[str, str] + self, row: Dict[str, Any], users: UsersMapping ) -> Optional[Union[TableRename, TableSwap, PreparsedQuery]]: json_fields = { "DIRECT_OBJECTS_ACCESSED",