Skip to content

Commit

Permalink
Add new get_catalog_relations macro, Supporting Changes (#8648)
Browse files Browse the repository at this point in the history
* Add new get_catalog_relations macro, allowing dbt to specify which relations in a schema the adapter should return data about

* Implement postgres adapter support for relation filtering on catalog queries

* Code review changes adding feature flag for catalog-by-relation-list support

* Use profile specified in --profile with dbt init (#7450)

* Use profile specified in --profile with dbt init

* Update .changes/unreleased/Fixes-20230424-161642.yaml

Co-authored-by: Doug Beatty <[email protected]>

* Refactor run() method into functions, replace exit() calls with exceptions

* Update help text for profile option

---------

Co-authored-by: Doug Beatty <[email protected]>

* add TestLargeEphemeralCompilation (#8376)

* Fix a couple of issues in the postgres implementation of get_catalog_relations

* Add relation count limit at which to fall back to batch retrieval

* Better feature detection mechanism for adapters.

* Code review changes to get_catalog_relations and adapter feature checking

* Add changelog entry

---------

Co-authored-by: ezraerb <[email protected]>
Co-authored-by: Doug Beatty <[email protected]>
Co-authored-by: Michelle Ark <[email protected]>
  • Loading branch information
4 people authored Sep 29, 2023
1 parent 48c97e8 commit c4f09b1
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 37 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20230929-154743.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Added support for retrieving partial catalog information from a schema
time: 2023-09-29T15:47:43.612438-04:00
custom:
Author: peterallenwebb
Issue: "8521"
107 changes: 89 additions & 18 deletions core/dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
from dbt import deprecations

GET_CATALOG_MACRO_NAME = "get_catalog"
GET_CATALOG_RELATIONS_MACRO_NAME = "get_catalog_relations"
FRESHNESS_MACRO_NAME = "collect_freshness"


Expand Down Expand Up @@ -161,6 +162,14 @@ def submit(self, compiled_code: str) -> Any:
raise NotImplementedError("PythonJobHelper submit function is not implemented yet")


class AdapterFeature(str, Enum):
"""Enumeration of optional adapter features which can be probed using BaseAdapter.has_feature()"""

CatalogByRelations = "CatalogByRelations"
"""Flags support for retrieving catalog information using a list of relations, rather than always retrieving all
the relations in a schema """


class BaseAdapter(metaclass=AdapterMeta):
"""The BaseAdapter provides an abstract base class for adapters.
Expand Down Expand Up @@ -415,6 +424,29 @@ def _get_catalog_schemas(self, manifest: Manifest) -> SchemaSearchMap:
lowercase strings.
"""
info_schema_name_map = SchemaSearchMap()
relations = self._get_catalog_relations(manifest)
for relation in relations:
info_schema_name_map.add(relation)
# result is a map whose keys are information_schema Relations without
# identifiers that have appropriate database prefixes, and whose values
# are sets of lowercase schema names that are valid members of those
# databases
return info_schema_name_map

def _get_catalog_relations_by_info_schema(
self, manifest: Manifest
) -> Dict[InformationSchema, List[BaseRelation]]:
relations = self._get_catalog_relations(manifest)
relations_by_info_schema: Dict[InformationSchema, List[BaseRelation]] = dict()
for relation in relations:
info_schema = relation.information_schema_only()
if info_schema not in relations_by_info_schema:
relations_by_info_schema[info_schema] = []
relations_by_info_schema[info_schema].append(relation)

return relations_by_info_schema

def _get_catalog_relations(self, manifest: Manifest) -> List[BaseRelation]:
nodes: Iterator[ResultNode] = chain(
[
node
Expand All @@ -423,14 +455,9 @@ def _get_catalog_schemas(self, manifest: Manifest) -> SchemaSearchMap:
],
manifest.sources.values(),
)
for node in nodes:
relation = self.Relation.create_from(self.config, node)
info_schema_name_map.add(relation)
# result is a map whose keys are information_schema Relations without
# identifiers that have appropriate database prefixes, and whose values
# are sets of lowercase schema names that are valid members of those
# databases
return info_schema_name_map

relations = [self.Relation.create_from(self.config, n) for n in nodes]
return relations

def _relations_cache_for_schemas(
self, manifest: Manifest, cache_schemas: Optional[Set[BaseRelation]] = None
Expand Down Expand Up @@ -1093,20 +1120,57 @@ def _get_one_catalog(
results = self._catalog_filter_table(table, manifest) # type: ignore[arg-type]
return results

def _get_one_catalog_by_relations(
self,
information_schema: InformationSchema,
relations: List[BaseRelation],
manifest: Manifest,
) -> agate.Table:

kwargs = {
"information_schema": information_schema,
"relations": relations,
}
table = self.execute_macro(
GET_CATALOG_RELATIONS_MACRO_NAME,
kwargs=kwargs,
# pass in the full manifest, so we get any local project
# overrides
manifest=manifest,
)

results = self._catalog_filter_table(table, manifest) # type: ignore[arg-type]
return results

def get_catalog(self, manifest: Manifest) -> Tuple[agate.Table, List[Exception]]:
schema_map = self._get_catalog_schemas(manifest)

with executor(self.config) as tpe:
futures: List[Future[agate.Table]] = []
for info, schemas in schema_map.items():
if len(schemas) == 0:
continue
name = ".".join([str(info.database), "information_schema"])

fut = tpe.submit_connected(
self, name, self._get_one_catalog, info, schemas, manifest
)
futures.append(fut)
relation_count = len(self._get_catalog_relations(manifest))
if relation_count <= 100 and self.has_feature(AdapterFeature.CatalogByRelations):
relations_by_schema = self._get_catalog_relations_by_info_schema(manifest)
for info_schema in relations_by_schema:
name = ".".join([str(info_schema.database), "information_schema"])
relations = relations_by_schema[info_schema]
fut = tpe.submit_connected(
self,
name,
self._get_one_catalog_by_relations,
info_schema,
relations,
manifest,
)
futures.append(fut)
else:
schema_map: SchemaSearchMap = self._get_catalog_schemas(manifest)
for info, schemas in schema_map.items():
if len(schemas) == 0:
continue
name = ".".join([str(info.database), "information_schema"])
fut = tpe.submit_connected(
self, name, self._get_one_catalog, info, schemas, manifest
)
futures.append(fut)

catalogs, exceptions = catch_as_completed(futures)

Expand Down Expand Up @@ -1437,6 +1501,13 @@ def render_model_constraint(cls, constraint: ModelLevelConstraint) -> Optional[s
else:
return None

@classmethod
def has_feature(cls, feature: AdapterFeature) -> bool:
# The base adapter implementation does not implement any optional
# features, so always return false. Adapters which wish to provide
# optional features will have to override this function.
return False


COLUMNS_EQUAL_SQL = """
with diff_count as (
Expand Down
6 changes: 3 additions & 3 deletions core/dbt/adapters/base/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,11 +459,11 @@ def add(self, relation: BaseRelation):
self[key].add(schema)

def search(self) -> Iterator[Tuple[InformationSchema, Optional[str]]]:
for information_schema_name, schemas in self.items():
for information_schema, schemas in self.items():
for schema in schemas:
yield information_schema_name, schema
yield information_schema, schema

def flatten(self, allow_multiple_databases: bool = False):
def flatten(self, allow_multiple_databases: bool = False) -> "SchemaSearchMap":
new = self.__class__()

# make sure we don't have multiple databases if allow_multiple_databases is set to False
Expand Down
13 changes: 13 additions & 0 deletions core/dbt/include/global_project/macros/adapters/metadata.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
{% macro get_catalog_relations(information_schema, relations) -%}
{{ return(adapter.dispatch('get_catalog_relations', 'dbt')(information_schema, relations)) }}
{%- endmacro %}

{% macro default__get_catalog_relations(information_schema, relations) -%}
{% set typename = adapter.type() %}
{% set msg -%}
get_catalog_relations not implemented for {{ typename }}
{%- endset %}

{{ exceptions.raise_compiler_error(msg) }}
{%- endmacro %}

{% macro get_catalog(information_schema, schemas) -%}
{{ return(adapter.dispatch('get_catalog', 'dbt')(information_schema, schemas)) }}
{%- endmacro %}
Expand Down
14 changes: 11 additions & 3 deletions plugins/postgres/dbt/adapters/postgres/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import Optional, Set, List, Any

from dbt.adapters.base.meta import available
from dbt.adapters.base.impl import AdapterConfig, ConstraintSupport
from dbt.adapters.base.impl import AdapterConfig, AdapterFeature, ConstraintSupport
from dbt.adapters.sql import SQLAdapter
from dbt.adapters.postgres import PostgresConnectionManager
from dbt.adapters.postgres.column import PostgresColumn
Expand Down Expand Up @@ -73,6 +73,10 @@ class PostgresAdapter(SQLAdapter):
ConstraintType.foreign_key: ConstraintSupport.ENFORCED,
}

CATALOG_BY_RELATION_SUPPORT = True

SUPPORTED_FEATURES: Set[AdapterFeature] = frozenset([AdapterFeature.CatalogByRelations])

@classmethod
def date_function(cls):
return "now()"
Expand Down Expand Up @@ -113,9 +117,9 @@ def _link_cached_database_relations(self, schemas: Set[str]):

def _get_catalog_schemas(self, manifest):
# postgres only allow one database (the main one)
schemas = super()._get_catalog_schemas(manifest)
schema_search_map = super()._get_catalog_schemas(manifest)
try:
return schemas.flatten()
return schema_search_map.flatten()
except DbtRuntimeError as exc:
raise CrossDbReferenceProhibitedError(self.type(), exc.msg)

Expand Down Expand Up @@ -143,3 +147,7 @@ def valid_incremental_strategies(self):

def debug_query(self):
self.execute("select 1 as id")

@classmethod
def has_feature(cls, feature: AdapterFeature) -> bool:
return feature in cls.SUPPORTED_FEATURES
27 changes: 20 additions & 7 deletions plugins/postgres/dbt/include/postgres/macros/catalog.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

{% macro postgres__get_catalog(information_schema, schemas) -%}

{% macro postgres__get_catalog_relations(information_schema, relations) -%}
{%- call statement('catalog', fetch_result=True) -%}

{#
If the user has multiple databases set and the first one is wrong, this will fail.
But we won't fail in the case where there are multiple quoting-difference-only dbs, which is better.
Expand Down Expand Up @@ -29,12 +29,17 @@
join pg_catalog.pg_attribute col on col.attrelid = tbl.oid
left outer join pg_catalog.pg_description tbl_desc on (tbl_desc.objoid = tbl.oid and tbl_desc.objsubid = 0)
left outer join pg_catalog.pg_description col_desc on (col_desc.objoid = tbl.oid and col_desc.objsubid = col.attnum)
where (
{%- for schema in schemas -%}
upper(sch.nspname) = upper('{{ schema }}'){%- if not loop.last %} or {% endif -%}
{%- endfor -%}
)
{%- for relation in relations -%}
{%- if relation.identifier -%}
(upper(sch.nspname) = upper('{{ relation.schema }}') and
upper(tbl.relname) = upper('{{ relation.identifier }}'))
{%- else-%}
upper(sch.nspname) = upper('{{ relation.schema }}')
{%- endif -%}
{%- if not loop.last %} or {% endif -%}
{%- endfor -%}
)
and not pg_is_other_temp_schema(sch.oid) -- not a temporary schema belonging to another session
and tbl.relpersistence in ('p', 'u') -- [p]ermanent table or [u]nlogged table. Exclude [t]emporary tables
and tbl.relkind in ('r', 'v', 'f', 'p') -- o[r]dinary table, [v]iew, [f]oreign table, [p]artitioned table. Other values are [i]ndex, [S]equence, [c]omposite type, [t]OAST table, [m]aterialized view
Expand All @@ -49,5 +54,13 @@
{%- endcall -%}
{{ return(load_result('catalog').table) }}
{%- endmacro %}
{% macro postgres__get_catalog(information_schema, schemas) -%}
{%- set relations = [] -%}
{%- for schema in schemas -%}
{%- set dummy = relations.append({'schema': schema}) -%}
{%- endfor -%}
{{ return(postgres__get_catalog_relations(information_schema, relations)) }}
{%- endmacro %}
2 changes: 1 addition & 1 deletion tests/functional/artifacts/test_override.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"""

fail_macros__failure_sql = """
{% macro get_catalog(information_schema, schemas) %}
{% macro get_catalog_relations(information_schema, relations) %}
{% do exceptions.raise_compiler_error('rejected: no catalogs for you') %}
{% endmacro %}
Expand Down
14 changes: 9 additions & 5 deletions tests/unit/test_postgres_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,8 +322,8 @@ def test_set_zero_keepalive(self, psycopg2):
)

@mock.patch.object(PostgresAdapter, "execute_macro")
@mock.patch.object(PostgresAdapter, "_get_catalog_schemas")
def test_get_catalog_various_schemas(self, mock_get_schemas, mock_execute):
@mock.patch.object(PostgresAdapter, "_get_catalog_relations_by_info_schema")
def test_get_catalog_various_schemas(self, mock_get_relations, mock_execute):
column_names = ["table_database", "table_schema", "table_name"]
rows = [
("dbt", "foo", "bar"),
Expand All @@ -334,9 +334,13 @@ def test_get_catalog_various_schemas(self, mock_get_schemas, mock_execute):
]
mock_execute.return_value = agate.Table(rows=rows, column_names=column_names)

mock_get_schemas.return_value.items.return_value = [
(mock.MagicMock(database="dbt"), {"foo", "FOO", "quux"})
]
mock_get_relations.return_value = {
mock.MagicMock(database="dbt"): [
mock.MagicMock(schema="foo"),
mock.MagicMock(schema="FOO"),
mock.MagicMock(schema="quux"),
]
}

mock_manifest = mock.MagicMock()
mock_manifest.get_used_schemas.return_value = {("dbt", "foo"), ("dbt", "quux")}
Expand Down

0 comments on commit c4f09b1

Please sign in to comment.