From 7d2d9ffa2236fc2717726d846776b1f63f33b3c9 Mon Sep 17 00:00:00 2001 From: Erni Durdevic Date: Wed, 8 Nov 2023 21:22:29 +0100 Subject: [PATCH 1/9] Added more filtering on initial checks --- discoverx/dx.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/discoverx/dx.py b/discoverx/dx.py index 01d02cc..992de21 100644 --- a/discoverx/dx.py +++ b/discoverx/dx.py @@ -49,7 +49,9 @@ def __init__( def _can_read_columns_table(self) -> bool: try: - self.spark.sql(f"SELECT * FROM {self.INFORMATION_SCHEMA}.columns WHERE table_catalog = 'system' LIMIT 1") + self.spark.sql( + f"SELECT * FROM {self.INFORMATION_SCHEMA}.columns WHERE table_catalog = 'system' AND table_schema = 'columns' LIMIT 1" + ) return True except Exception as e: self.logger.error(f"Error while reading table {self.INFORMATION_SCHEMA}.columns: {e}") From 4960e3f374b73d7889358342b2cf3ad5c6f07803 Mon Sep 17 00:00:00 2001 From: Erni Durdevic Date: Wed, 8 Nov 2023 21:27:58 +0100 Subject: [PATCH 2/9] Fixed full table name for dash in table name --- discoverx/explorer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/discoverx/explorer.py b/discoverx/explorer.py index 46f5a14..3845fde 100644 --- a/discoverx/explorer.py +++ b/discoverx/explorer.py @@ -203,9 +203,9 @@ def _get_stack_string_columns_expression(table_info: TableInfo) -> str: @staticmethod def _build_sql(sql_template: str, table_info: TableInfo) -> str: if table_info.catalog and table_info.catalog != "None": - full_table_name = f"{table_info.catalog}.{table_info.schema}.{table_info.table}" + full_table_name = f"`{table_info.catalog}`.`{table_info.schema}`.`{table_info.table}`" else: - full_table_name = f"{table_info.schema}.{table_info.table}" + full_table_name = f"`{table_info.schema}`.`{table_info.table}`" stack_string_columns = DataExplorerActions._get_stack_string_columns_expression(table_info) From c64e53704e76e4325f74585d734fc1676d45bcb8 Mon Sep 17 00:00:00 2001 From: Erni Durdevic Date: Wed, 8 Nov 2023 22:26:16 +0100 Subject: [PATCH 3/9] Added having_tag --- discoverx/explorer.py | 20 +++++++++++++++++++- discoverx/table_info.py | 24 +++++++++++++++++++++++- tests/unit/table_info_test.py | 23 +++++++++++++++++++++++ 3 files changed, 65 insertions(+), 2 deletions(-) create mode 100644 tests/unit/table_info_test.py diff --git a/discoverx/explorer.py b/discoverx/explorer.py index 3845fde..8e79074 100644 --- a/discoverx/explorer.py +++ b/discoverx/explorer.py @@ -32,6 +32,7 @@ def __init__(self, from_tables, spark: SparkSession, info_fetcher: InfoFetcher) self._sql_query_template = None self._max_concurrency = 10 self._with_tags = False + self._having_tags = [] @staticmethod def validate_from_components(from_tables: str): @@ -70,6 +71,19 @@ def having_columns(self, *columns) -> "DataExplorer": new_obj._having_columns.extend(columns) return new_obj + def having_tag(self, tag_name: str, tag_value: str = None) -> "DataExplorer": + """Will select tables tagged with the provided tag name and optionally value + either at table, schema, or catalog level. + + Args: + tag_name (str): Tag name + tag_value (str, optional): Tag value. Defaults to None. + """ + new_obj = copy.deepcopy(self) + new_obj._having_tags.extend(TagInfo(tag_name, tag_value)) + new_obj._with_tags = True + return new_obj + def with_concurrency(self, max_concurrency) -> "DataExplorer": """Sets the maximum number of concurrent queries to run""" new_obj = copy.deepcopy(self) @@ -140,7 +154,9 @@ def scan( self._catalogs, self._schemas, self._tables, - self._info_fetcher.get_tables_info(self._catalogs, self._schemas, self._tables, self._having_columns), + self._info_fetcher.get_tables_info( + self._catalogs, self._schemas, self._tables, self._having_columns, self._having_tags + ), custom_rules=custom_rules, locale=locale, ) @@ -163,6 +179,7 @@ def map(self, f) -> list[any]: self._tables, self._having_columns, self._with_tags, + self._having_tags, ) with concurrent.futures.ThreadPoolExecutor(max_workers=self._max_concurrency) as executor: # Submit tasks to the thread pool @@ -244,6 +261,7 @@ def _get_sql_commands(self, data_explorer: DataExplorer) -> list[tuple[str, Tabl data_explorer._tables, data_explorer._having_columns, data_explorer._with_tags, + data_explorer._having_tags, ) sql_commands = [ ( diff --git a/discoverx/table_info.py b/discoverx/table_info.py index 86e7f8f..837d982 100644 --- a/discoverx/table_info.py +++ b/discoverx/table_info.py @@ -111,6 +111,7 @@ def get_tables_info( tables: str, columns: list[str] = [], with_tags=False, + having_tags=[], ) -> list[TableInfo]: # Filter tables by matching filter table_list_sql = self._get_table_list_sql(catalogs, schemas, tables, columns, with_tags) @@ -120,7 +121,28 @@ def get_tables_info( if len(filtered_tables) == 0: raise ValueError(f"No tables found matching filter: {catalogs}.{schemas}.{tables}") - return self._to_info_list(filtered_tables) + info_list = self._to_info_list(filtered_tables) + return [info for info in info_list if InfoFetcher._contains_all_tags(info.tags, having_tags)] + + @staticmethod + def _contains_all_tags(tags_info: TagsInfo, tags: list[TagInfo]) -> bool: + if not tags_info: + return False + if not tags: + return True + + all_tags = [] + + if tags_info.catalog_tags: + all_tags.extend(tags_info.catalog_tags) + + if tags_info.schema_tags: + all_tags.extend(tags_info.schema_tags) + + if tags_info.table_tags: + all_tags.extend(tags_info.table_tags) + + return all([tag in all_tags for tag in tags]) def _get_table_list_sql( self, diff --git a/tests/unit/table_info_test.py b/tests/unit/table_info_test.py new file mode 100644 index 0000000..0bf2328 --- /dev/null +++ b/tests/unit/table_info_test.py @@ -0,0 +1,23 @@ +import pytest +from discoverx.explorer import InfoFetcher, TagsInfo, TagInfo + + +def test_validate_from_components(): + info_table = TagsInfo([], [TagInfo("a", "v1")], [], []) + info_schema = TagsInfo([], [], [TagInfo("a", "v1")], []) + info_catalog = TagsInfo([], [], [], [TagInfo("a", "v1")]) + info_no_tags = TagsInfo([], [], [], []) + + assert InfoFetcher._contains_all_tags(info_table, [TagInfo("a", "v1")]) + assert not InfoFetcher._contains_all_tags(info_table, [TagInfo("a", "v2")]) + assert not InfoFetcher._contains_all_tags(info_table, [TagInfo("b", "v1")]) + assert not InfoFetcher._contains_all_tags(info_table, [TagInfo("a", None)]) + # If no tags to check, then it should be true + assert InfoFetcher._contains_all_tags(info_table, []) + + assert InfoFetcher._contains_all_tags(info_schema, [TagInfo("a", "v1")]) + + assert InfoFetcher._contains_all_tags(info_catalog, [TagInfo("a", "v1")]) + + assert InfoFetcher._contains_all_tags(info_no_tags, []) + assert not InfoFetcher._contains_all_tags(info_no_tags, [TagInfo("a", "v1")]) From 62d25bed4bd1859bffd6c8a06d2086d6d036e559 Mon Sep 17 00:00:00 2001 From: Erni Durdevic Date: Wed, 8 Nov 2023 22:43:47 +0100 Subject: [PATCH 4/9] Fixed tags check ordering --- discoverx/table_info.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/discoverx/table_info.py b/discoverx/table_info.py index 837d982..bdb1122 100644 --- a/discoverx/table_info.py +++ b/discoverx/table_info.py @@ -110,8 +110,8 @@ def get_tables_info( schemas: str, tables: str, columns: list[str] = [], - with_tags=False, - having_tags=[], + with_tags: bool = False, + having_tags: list[TagInfo] = [], ) -> list[TableInfo]: # Filter tables by matching filter table_list_sql = self._get_table_list_sql(catalogs, schemas, tables, columns, with_tags) @@ -126,10 +126,10 @@ def get_tables_info( @staticmethod def _contains_all_tags(tags_info: TagsInfo, tags: list[TagInfo]) -> bool: - if not tags_info: - return False if not tags: return True + if not tags_info: + return False all_tags = [] From 2d891beddf5c5d0e1ff4c3623a9249914d09f806 Mon Sep 17 00:00:00 2001 From: Erni Durdevic Date: Wed, 8 Nov 2023 22:48:51 +0100 Subject: [PATCH 5/9] Fixed SQL check unit test --- tests/unit/explorer_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/explorer_test.py b/tests/unit/explorer_test.py index 8475599..e8dd53e 100644 --- a/tests/unit/explorer_test.py +++ b/tests/unit/explorer_test.py @@ -26,7 +26,7 @@ def test_validate_from_components(): def test_build_sql(sample_table_info): sql_template = "SELECT * FROM {full_table_name}" - expected_sql = "SELECT * FROM catalog1.schema1.table1" + expected_sql = "SELECT * FROM `catalog1`.`schema1`.`table1`" assert DataExplorerActions._build_sql(sql_template, sample_table_info) == expected_sql From 823ea4bd03403fbbaef222049f009f85be29161d Mon Sep 17 00:00:00 2001 From: edurdevic Date: Mon, 8 Jan 2024 21:10:33 +0000 Subject: [PATCH 6/9] Updated schema name --- discoverx/dx.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/discoverx/dx.py b/discoverx/dx.py index 3369d90..411e1ee 100644 --- a/discoverx/dx.py +++ b/discoverx/dx.py @@ -50,7 +50,7 @@ def __init__( def _can_read_columns_table(self) -> bool: try: self.spark.sql( - f"SELECT * FROM {self.INFORMATION_SCHEMA}.columns WHERE table_catalog = 'system' AND table_schema = 'columns' LIMIT 1" + f"SELECT * FROM {self.INFORMATION_SCHEMA}.columns WHERE table_catalog = 'system' AND table_schema = 'information_schema' LIMIT 1" ) return True except Exception as e: From 7ce5edee72b4c349cffcef3e7e33b1bb2f4f7205 Mon Sep 17 00:00:00 2001 From: Erni Durdevic Date: Tue, 9 Jan 2024 01:14:50 +0400 Subject: [PATCH 7/9] Black reformat --- discoverx/explorer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/discoverx/explorer.py b/discoverx/explorer.py index 37f8331..e13cbb5 100644 --- a/discoverx/explorer.py +++ b/discoverx/explorer.py @@ -94,7 +94,7 @@ def with_data_source_formats(self, data_source_formats: list[str] = ["DELTA"]) - """ new_obj = copy.deepcopy(self) new_obj._data_source_formats = data_source_formats - + def with_concurrency(self, max_concurrency) -> "DataExplorer": """Sets the maximum number of concurrent queries to run""" new_obj = copy.deepcopy(self) From 8ff270d7757332960413fcdfcd36dcf02c34d392 Mon Sep 17 00:00:00 2001 From: Erni Durdevic Date: Tue, 9 Jan 2024 08:52:55 +0400 Subject: [PATCH 8/9] Fixed merge issue --- discoverx/explorer.py | 1 + 1 file changed, 1 insertion(+) diff --git a/discoverx/explorer.py b/discoverx/explorer.py index e13cbb5..ab1aea8 100644 --- a/discoverx/explorer.py +++ b/discoverx/explorer.py @@ -94,6 +94,7 @@ def with_data_source_formats(self, data_source_formats: list[str] = ["DELTA"]) - """ new_obj = copy.deepcopy(self) new_obj._data_source_formats = data_source_formats + return new_obj def with_concurrency(self, max_concurrency) -> "DataExplorer": """Sets the maximum number of concurrent queries to run""" From 511f7c998f4addb03132be9e13467a9bf49dd2bd Mon Sep 17 00:00:00 2001 From: Erni Durdevic Date: Tue, 9 Jan 2024 08:53:03 +0400 Subject: [PATCH 9/9] Black format --- examples/deep_clone_schema.py | 53 ++++++++++--------- ...with_user_specified_data_source_formats.py | 10 ++-- examples/update_owner_of_data_objects.py | 42 ++++++++------- 3 files changed, 53 insertions(+), 52 deletions(-) diff --git a/examples/deep_clone_schema.py b/examples/deep_clone_schema.py index 2fa8ed8..9bacc55 100644 --- a/examples/deep_clone_schema.py +++ b/examples/deep_clone_schema.py @@ -2,14 +2,14 @@ # MAGIC %md # MAGIC # Deep Clone a Schema # MAGIC -# MAGIC Databricks' Deep Clone functionality enables the effortless creation of a data replica with minimal coding and maintenance overhead. Using the `CLONE` command, you can efficiently generate a duplicate of an existing Delta Lake table on Databricks at a designated version. The cloning process is incremental, ensuring that only new changes since the last clone are applied to the table. +# MAGIC Databricks' Deep Clone functionality enables the effortless creation of a data replica with minimal coding and maintenance overhead. Using the `CLONE` command, you can efficiently generate a duplicate of an existing Delta Lake table on Databricks at a designated version. The cloning process is incremental, ensuring that only new changes since the last clone are applied to the table. # MAGIC # MAGIC # MAGIC Deep cloning is applied on a per-table basis, requiring a separate invocation for each table within your schema. In scenarios where automation is desirable, such as when dealing with shared schemas through Delta sharing, replicating the entire schema can be achieved using DiscoverX. This approach eliminates the need to manually inspect and modify your code each time a new table is added to the schema by the provider. # MAGIC # MAGIC This notebook serves as an example of utilizing DiscoverX to automate the replication of a schema using Delta Deep Clone. # MAGIC -# MAGIC Our recommendation is to schedule this notebook as a job at the recipient side. +# MAGIC Our recommendation is to schedule this notebook as a job at the recipient side. # MAGIC # COMMAND ---------- @@ -19,7 +19,7 @@ # COMMAND ---------- -dbutils.widgets.text("1.source_catalog", "_discoverx_deep_clone") +dbutils.widgets.text("1.source_catalog", "_discoverx_deep_clone") dbutils.widgets.text("2.destination_catalog", "_discoverx_deep_clone_replica") source_catalog = dbutils.widgets.get("1.source_catalog") @@ -52,31 +52,33 @@ # COMMAND ---------- + def clone_tables(table_info): - - spark.sql(f"CREATE SCHEMA IF NOT EXISTS {destination_catalog}.{table_info.schema}") - try: - spark.sql( - f"""CREATE OR REPLACE TABLE + + spark.sql(f"CREATE SCHEMA IF NOT EXISTS {destination_catalog}.{table_info.schema}") + try: + spark.sql( + f"""CREATE OR REPLACE TABLE {destination_catalog}.{table_info.schema}.{table_info.table} CLONE {table_info.catalog}.{table_info.schema}.{table_info.table} """ - ) - result={ - "source": f"{table_info.catalog}.{table_info.schema}.{table_info.table}", - "destination": f"{destination_catalog}.{table_info.schema}.{table_info.table}", - "success":True, - "info": None, - } - # Cloning Views is not supported - except Exception as error: - result={ - "source": f"{table_info.catalog}.{table_info.schema}.{table_info.table}", - "destination": f"{destination_catalog}.{table_info.schema}.{table_info.table}", - "success":False, - "info": error, - } - return result + ) + result = { + "source": f"{table_info.catalog}.{table_info.schema}.{table_info.table}", + "destination": f"{destination_catalog}.{table_info.schema}.{table_info.table}", + "success": True, + "info": None, + } + # Cloning Views is not supported + except Exception as error: + result = { + "source": f"{table_info.catalog}.{table_info.schema}.{table_info.table}", + "destination": f"{destination_catalog}.{table_info.schema}.{table_info.table}", + "success": False, + "info": error, + } + return result + # COMMAND ---------- @@ -86,5 +88,4 @@ def clone_tables(table_info): # COMMAND ---------- -res = dx.from_tables(f"{source_catalog}.*.*")\ - .map(clone_tables) +res = dx.from_tables(f"{source_catalog}.*.*").map(clone_tables) diff --git a/examples/scan_with_user_specified_data_source_formats.py b/examples/scan_with_user_specified_data_source_formats.py index f94f7f7..34c84d1 100644 --- a/examples/scan_with_user_specified_data_source_formats.py +++ b/examples/scan_with_user_specified_data_source_formats.py @@ -43,7 +43,7 @@ # COMMAND ---------- -# MAGIC %md +# MAGIC %md # MAGIC ### DiscoverX will scan all delta tables by default # COMMAND ---------- @@ -52,15 +52,11 @@ # COMMAND ---------- -# MAGIC %md +# MAGIC %md # MAGIC ### User can specify data source formats as follows # COMMAND ---------- -(dx.from_tables(from_table_statement) -.with_data_source_formats(["DELTA","JSON"]) -.scan()) +(dx.from_tables(from_table_statement).with_data_source_formats(["DELTA", "JSON"]).scan()) # COMMAND ---------- - - diff --git a/examples/update_owner_of_data_objects.py b/examples/update_owner_of_data_objects.py index 4c7e1a9..dc23bf6 100644 --- a/examples/update_owner_of_data_objects.py +++ b/examples/update_owner_of_data_objects.py @@ -22,9 +22,9 @@ dbutils.widgets.text("catalogs", "*", "Catalogs") dbutils.widgets.text("schemas", "*", "Schemas") dbutils.widgets.text("tables", "*", "Tables") -dbutils.widgets.text("owner","sourav.gulati@databricks.com","owner") -dbutils.widgets.dropdown("if_update_catalog_owner", "YES", ["YES","NO"]) -dbutils.widgets.dropdown("if_update_schema_owner", "YES", ["YES","NO"]) +dbutils.widgets.text("owner", "sourav.gulati@databricks.com", "owner") +dbutils.widgets.dropdown("if_update_catalog_owner", "YES", ["YES", "NO"]) +dbutils.widgets.dropdown("if_update_schema_owner", "YES", ["YES", "NO"]) # COMMAND ---------- @@ -54,23 +54,27 @@ # COMMAND ---------- + def update_owner(table_info): - catalog_owner_alter_sql = f""" ALTER CATALOG `{table_info.catalog}` SET OWNER TO `{owner}`""" - schema_owner_alter_sql = f""" ALTER SCHEMA `{table_info.catalog}`.`{table_info.schema}` SET OWNER TO `{owner}`""" - table_owner_alter_sql = f""" ALTER TABLE `{table_info.catalog}`.`{table_info.schema}`.`{table_info.table}` SET OWNER TO `{owner}`""" - try: - if(if_update_catalog_owner == 'YES'): - print(f"Executing {catalog_owner_alter_sql}") - spark.sql(catalog_owner_alter_sql) - - if(if_update_schema_owner == 'YES'): - print(f"Executing {schema_owner_alter_sql}") - spark.sql(schema_owner_alter_sql) - - print(f"Executing {table_owner_alter_sql}") - spark.sql(table_owner_alter_sql) - except Exception as exception: - print(f" Exception occurred while updating owner: {exception}") + catalog_owner_alter_sql = f""" ALTER CATALOG `{table_info.catalog}` SET OWNER TO `{owner}`""" + schema_owner_alter_sql = f""" ALTER SCHEMA `{table_info.catalog}`.`{table_info.schema}` SET OWNER TO `{owner}`""" + table_owner_alter_sql = ( + f""" ALTER TABLE `{table_info.catalog}`.`{table_info.schema}`.`{table_info.table}` SET OWNER TO `{owner}`""" + ) + try: + if if_update_catalog_owner == "YES": + print(f"Executing {catalog_owner_alter_sql}") + spark.sql(catalog_owner_alter_sql) + + if if_update_schema_owner == "YES": + print(f"Executing {schema_owner_alter_sql}") + spark.sql(schema_owner_alter_sql) + + print(f"Executing {table_owner_alter_sql}") + spark.sql(table_owner_alter_sql) + except Exception as exception: + print(f" Exception occurred while updating owner: {exception}") + # COMMAND ----------