Skip to content

Commit

Permalink
override list_relations_without_caching using boto3 (Tomme#105)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielcmessias authored Aug 8, 2022
1 parent c704ef7 commit 83f7756
Showing 1 changed file with 63 additions and 2 deletions.
65 changes: 63 additions & 2 deletions dbt/adapters/athena/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
from botocore.exceptions import ClientError
from itertools import chain
from threading import Lock
from typing import Dict, Iterator, Optional, Set
from typing import Dict, Iterator, List, Optional, Set
from uuid import uuid4

from dbt.adapters.base import available
from dbt.adapters.base.impl import GET_CATALOG_MACRO_NAME
from dbt.adapters.base.relation import InformationSchema
from dbt.adapters.base.relation import BaseRelation, InformationSchema
from dbt.adapters.sql import SQLAdapter
from dbt.adapters.athena import AthenaConnectionManager
from dbt.adapters.athena.relation import AthenaRelation, AthenaSchemaSearchMap
Expand Down Expand Up @@ -147,3 +147,64 @@ def _get_catalog_schemas(self, manifest: Manifest) -> AthenaSchemaSearchMap:
relation = self.Relation.create_from(self.config, node)
info_schema_name_map.add(relation)
return info_schema_name_map

def _get_data_catalog(self, catalog_name):
conn = self.connections.get_thread_connection()
client = conn.handle
with boto3_client_lock:
athena_client = boto3.client('athena', region_name=client.region_name)

response = athena_client.get_data_catalog(Name=catalog_name)
return response['DataCatalog']

def list_relations_without_caching(
self, schema_relation: AthenaRelation,
) -> List[BaseRelation]:
catalog_id = None
if schema_relation.database.lower() != 'awsdatacatalog':
data_catalog = self._get_data_catalog(schema_relation.database.lower())
# For non-Glue Data Catalogs, use the original Athena query against INFORMATION_SCHEMA approach
if data_catalog['Type'] != 'GLUE':
return super().list_relations_without_caching(schema_relation)
else:
catalog_id = data_catalog['Parameters']['catalog-id']

conn = self.connections.get_thread_connection()
client = conn.handle
with boto3_client_lock:
glue_client = boto3.client('glue', region_name=client.region_name)
paginator = glue_client.get_paginator('get_tables')

kwargs = {
'DatabaseName': schema_relation.schema,
}
# If the catalog is `awsdatacatalog` we don't need to pass CatalogId as boto3 infers it from the account Id.
if catalog_id:
kwargs['CatalogId'] = catalog_id
page_iterator = paginator.paginate(**kwargs)

relations = []
quote_policy = {
'database': True,
'schema': True,
'identifier': True
}

for page in page_iterator:
tables = page['TableList']
for table in tables:
_type = table['TableType']
if _type == 'VIRTUAL_VIEW':
_type = self.Relation.View
else:
_type = self.Relation.Table

relations.append(self.Relation.create(
schema=table['DatabaseName'],
database=schema_relation.database,
identifier=table['Name'],
quote_policy=quote_policy,
type=_type,
))

return relations

0 comments on commit 83f7756

Please sign in to comment.