diff --git a/discoverx/dx.py b/discoverx/dx.py
index 25c49c4..411e1ee 100644
--- a/discoverx/dx.py
+++ b/discoverx/dx.py
@@ -49,7 +49,9 @@ def __init__(
 
     def _can_read_columns_table(self) -> bool:
         try:
-            self.spark.sql(f"SELECT * FROM {self.INFORMATION_SCHEMA}.columns WHERE table_catalog = 'system' LIMIT 1")
+            self.spark.sql(
+                f"SELECT * FROM {self.INFORMATION_SCHEMA}.columns WHERE table_catalog = 'system' AND table_schema = 'information_schema' LIMIT 1"
+            )
             return True
         except Exception as e:
             self.logger.error(f"Error while reading table {self.INFORMATION_SCHEMA}.columns: {e}")
diff --git a/discoverx/explorer.py b/discoverx/explorer.py
index 528c2e0..ab1aea8 100644
--- a/discoverx/explorer.py
+++ b/discoverx/explorer.py
@@ -32,6 +32,7 @@ def __init__(self, from_tables, spark: SparkSession, info_fetcher: InfoFetcher)
         self._sql_query_template = None
         self._max_concurrency = 10
         self._with_tags = False
+        self._having_tags = []
         self._data_source_formats = ["DELTA"]
 
     @staticmethod
@@ -72,6 +73,19 @@ def having_columns(self, *columns) -> "DataExplorer":
         new_obj._having_columns.extend(columns)
         return new_obj
 
+    def having_tag(self, tag_name: str, tag_value: str = None) -> "DataExplorer":
+        """Will select tables tagged with the provided tag name and optionally value
+        either at table, schema, or catalog level.
+
+        Args:
+            tag_name (str): Tag name
+            tag_value (str, optional): Tag value. Defaults to None.
+        """
+        new_obj = copy.deepcopy(self)
+        new_obj._having_tags.append(TagInfo(tag_name, tag_value))
+        new_obj._with_tags = True
+        return new_obj
+
     def with_data_source_formats(self, data_source_formats: list[str] = ["DELTA"]) -> "DataExplorer":
         """Filter tables with provided data source formats. Defaults to DELTA only.
         Possible Values 'DELTA', 'CSV', 'JSON', 'PARQUET', 'TEXT', 'ORC' etc
@@ -157,6 +171,7 @@ def scan(
                 schemas=self._schemas,
                 tables=self._tables,
                 columns=self._having_columns,
+                having_tags=self._having_tags,
                 data_source_formats=self._data_source_formats,
             ),
             custom_rules=custom_rules,
@@ -181,6 +196,7 @@ def map(self, f) -> list[any]:
             self._tables,
             self._having_columns,
             self._with_tags,
+            self._having_tags,
             self._data_source_formats,
         )
         with concurrent.futures.ThreadPoolExecutor(max_workers=self._max_concurrency) as executor:
@@ -222,9 +238,9 @@ def _get_stack_string_columns_expression(table_info: TableInfo) -> str:
     @staticmethod
     def _build_sql(sql_template: str, table_info: TableInfo) -> str:
         if table_info.catalog and table_info.catalog != "None":
-            full_table_name = f"{table_info.catalog}.{table_info.schema}.{table_info.table}"
+            full_table_name = f"`{table_info.catalog}`.`{table_info.schema}`.`{table_info.table}`"
         else:
-            full_table_name = f"{table_info.schema}.{table_info.table}"
+            full_table_name = f"`{table_info.schema}`.`{table_info.table}`"
 
         stack_string_columns = DataExplorerActions._get_stack_string_columns_expression(table_info)
 
@@ -263,6 +279,7 @@ def _get_sql_commands(self, data_explorer: DataExplorer) -> list[tuple[str, Tabl
             data_explorer._tables,
             data_explorer._having_columns,
             data_explorer._with_tags,
+            data_explorer._having_tags,
         )
         sql_commands = [
             (
diff --git a/discoverx/table_info.py b/discoverx/table_info.py
index ee407ec..14d2463 100644
--- a/discoverx/table_info.py
+++ b/discoverx/table_info.py
@@ -110,7 +110,8 @@ def get_tables_info(
         schemas: str,
         tables: str,
         columns: list[str] = [],
-        with_tags=False,
+        with_tags: bool = False,
+        having_tags: list[TagInfo] = [],
         data_source_formats: list[str] = ["DELTA"],
     ) -> list[TableInfo]:
         # Filter tables by matching filter
@@ -121,7 +122,28 @@ def get_tables_info(
         if len(filtered_tables) == 0:
             raise ValueError(f"No tables found matching filter: {catalogs}.{schemas}.{tables}")
 
-        return self._to_info_list(filtered_tables)
+        info_list = self._to_info_list(filtered_tables)
+        return [info for info in info_list if InfoFetcher._contains_all_tags(info.tags, having_tags)]
+
+    @staticmethod
+    def _contains_all_tags(tags_info: TagsInfo, tags: list[TagInfo]) -> bool:
+        if not tags:
+            return True
+        if not tags_info:
+            return False
+
+        all_tags = []
+
+        if tags_info.catalog_tags:
+            all_tags.extend(tags_info.catalog_tags)
+
+        if tags_info.schema_tags:
+            all_tags.extend(tags_info.schema_tags)
+
+        if tags_info.table_tags:
+            all_tags.extend(tags_info.table_tags)
+
+        return all([tag in all_tags for tag in tags])
 
     def _get_table_list_sql(
         self,
diff --git a/examples/deep_clone_schema.py b/examples/deep_clone_schema.py
index 2fa8ed8..9bacc55 100644
--- a/examples/deep_clone_schema.py
+++ b/examples/deep_clone_schema.py
@@ -2,14 +2,14 @@
 # MAGIC %md
 # MAGIC # Deep Clone a Schema
 # MAGIC
-# MAGIC Databricks' Deep Clone functionality enables the effortless creation of a data replica with minimal coding and maintenance overhead. Using the `CLONE` command, you can efficiently generate a duplicate of an existing Delta Lake table on Databricks at a designated version. The cloning process is incremental, ensuring that only new changes since the last clone are applied to the table. 
+# MAGIC Databricks' Deep Clone functionality enables the effortless creation of a data replica with minimal coding and maintenance overhead. Using the `CLONE` command, you can efficiently generate a duplicate of an existing Delta Lake table on Databricks at a designated version. The cloning process is incremental, ensuring that only new changes since the last clone are applied to the table.
 # MAGIC
 # MAGIC
 # MAGIC Deep cloning is applied on a per-table basis, requiring a separate invocation for each table within your schema. In scenarios where automation is desirable, such as when dealing with shared schemas through Delta sharing, replicating the entire schema can be achieved using DiscoverX. This approach eliminates the need to manually inspect and modify your code each time a new table is added to the schema by the provider.
 # MAGIC
 # MAGIC This notebook serves as an example of utilizing DiscoverX to automate the replication of a schema using Delta Deep Clone.
 # MAGIC
-# MAGIC Our recommendation is to schedule this notebook as a job at the recipient side. 
+# MAGIC Our recommendation is to schedule this notebook as a job at the recipient side.
 # MAGIC
 
 # COMMAND ----------
@@ -19,7 +19,7 @@
 
 # COMMAND ----------
 
-dbutils.widgets.text("1.source_catalog", "_discoverx_deep_clone") 
+dbutils.widgets.text("1.source_catalog", "_discoverx_deep_clone")
 dbutils.widgets.text("2.destination_catalog", "_discoverx_deep_clone_replica")
 
 source_catalog = dbutils.widgets.get("1.source_catalog")
@@ -52,31 +52,33 @@
 
 # COMMAND ----------
 
+
 def clone_tables(table_info):
-
-  spark.sql(f"CREATE SCHEMA IF NOT EXISTS {destination_catalog}.{table_info.schema}")
-  try:
-    spark.sql(
-      f"""CREATE OR REPLACE TABLE
+
+    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {destination_catalog}.{table_info.schema}")
+    try:
+        spark.sql(
+            f"""CREATE OR REPLACE TABLE
       {destination_catalog}.{table_info.schema}.{table_info.table}
       CLONE {table_info.catalog}.{table_info.schema}.{table_info.table}
     """
-    )
-    result={
-      "source": f"{table_info.catalog}.{table_info.schema}.{table_info.table}",
-      "destination": f"{destination_catalog}.{table_info.schema}.{table_info.table}",
-      "success":True,
-      "info": None,
-    }
-  # Cloning Views is not supported
-  except Exception as error:
-    result={
-      "source": f"{table_info.catalog}.{table_info.schema}.{table_info.table}",
-      "destination": f"{destination_catalog}.{table_info.schema}.{table_info.table}",
-      "success":False,
-      "info": error,
-    }
-  return result
+        )
+        result = {
+            "source": f"{table_info.catalog}.{table_info.schema}.{table_info.table}",
+            "destination": f"{destination_catalog}.{table_info.schema}.{table_info.table}",
+            "success": True,
+            "info": None,
+        }
+    # Cloning Views is not supported
+    except Exception as error:
+        result = {
+            "source": f"{table_info.catalog}.{table_info.schema}.{table_info.table}",
+            "destination": f"{destination_catalog}.{table_info.schema}.{table_info.table}",
+            "success": False,
+            "info": error,
+        }
+    return result
+
 
 # COMMAND ----------
 
@@ -86,5 +88,4 @@ def clone_tables(table_info):
 
 # COMMAND ----------
 
-res = dx.from_tables(f"{source_catalog}.*.*")\
-    .map(clone_tables)
+res = dx.from_tables(f"{source_catalog}.*.*").map(clone_tables)
diff --git a/examples/scan_with_user_specified_data_source_formats.py b/examples/scan_with_user_specified_data_source_formats.py
index f94f7f7..34c84d1 100644
--- a/examples/scan_with_user_specified_data_source_formats.py
+++ b/examples/scan_with_user_specified_data_source_formats.py
@@ -43,7 +43,7 @@
 
 # COMMAND ----------
 
-# MAGIC %md 
+# MAGIC %md
 # MAGIC ### DiscoverX will scan all delta tables by default
 
 # COMMAND ----------
@@ -52,15 +52,11 @@
 
 # COMMAND ----------
 
-# MAGIC %md 
+# MAGIC %md
 # MAGIC ### User can specify data source formats as follows
 
 # COMMAND ----------
 
-(dx.from_tables(from_table_statement)
-.with_data_source_formats(["DELTA","JSON"])
-.scan())
+(dx.from_tables(from_table_statement).with_data_source_formats(["DELTA", "JSON"]).scan())
 
 # COMMAND ----------
-
-
diff --git a/examples/update_owner_of_data_objects.py b/examples/update_owner_of_data_objects.py
index 4c7e1a9..dc23bf6 100644
--- a/examples/update_owner_of_data_objects.py
+++ b/examples/update_owner_of_data_objects.py
@@ -22,9 +22,9 @@
 dbutils.widgets.text("catalogs", "*", "Catalogs")
 dbutils.widgets.text("schemas", "*", "Schemas")
 dbutils.widgets.text("tables", "*", "Tables")
-dbutils.widgets.text("owner","sourav.gulati@databricks.com","owner")
-dbutils.widgets.dropdown("if_update_catalog_owner", "YES", ["YES","NO"])
-dbutils.widgets.dropdown("if_update_schema_owner", "YES", ["YES","NO"])
+dbutils.widgets.text("owner", "sourav.gulati@databricks.com", "owner")
+dbutils.widgets.dropdown("if_update_catalog_owner", "YES", ["YES", "NO"]) +dbutils.widgets.dropdown("if_update_schema_owner", "YES", ["YES", "NO"]) # COMMAND ---------- @@ -54,23 +54,27 @@ # COMMAND ---------- + def update_owner(table_info): - catalog_owner_alter_sql = f""" ALTER CATALOG `{table_info.catalog}` SET OWNER TO `{owner}`""" - schema_owner_alter_sql = f""" ALTER SCHEMA `{table_info.catalog}`.`{table_info.schema}` SET OWNER TO `{owner}`""" - table_owner_alter_sql = f""" ALTER TABLE `{table_info.catalog}`.`{table_info.schema}`.`{table_info.table}` SET OWNER TO `{owner}`""" - try: - if(if_update_catalog_owner == 'YES'): - print(f"Executing {catalog_owner_alter_sql}") - spark.sql(catalog_owner_alter_sql) - - if(if_update_schema_owner == 'YES'): - print(f"Executing {schema_owner_alter_sql}") - spark.sql(schema_owner_alter_sql) - - print(f"Executing {table_owner_alter_sql}") - spark.sql(table_owner_alter_sql) - except Exception as exception: - print(f" Exception occurred while updating owner: {exception}") + catalog_owner_alter_sql = f""" ALTER CATALOG `{table_info.catalog}` SET OWNER TO `{owner}`""" + schema_owner_alter_sql = f""" ALTER SCHEMA `{table_info.catalog}`.`{table_info.schema}` SET OWNER TO `{owner}`""" + table_owner_alter_sql = ( + f""" ALTER TABLE `{table_info.catalog}`.`{table_info.schema}`.`{table_info.table}` SET OWNER TO `{owner}`""" + ) + try: + if if_update_catalog_owner == "YES": + print(f"Executing {catalog_owner_alter_sql}") + spark.sql(catalog_owner_alter_sql) + + if if_update_schema_owner == "YES": + print(f"Executing {schema_owner_alter_sql}") + spark.sql(schema_owner_alter_sql) + + print(f"Executing {table_owner_alter_sql}") + spark.sql(table_owner_alter_sql) + except Exception as exception: + print(f" Exception occurred while updating owner: {exception}") + # COMMAND ---------- diff --git a/tests/unit/explorer_test.py b/tests/unit/explorer_test.py index ac6d518..acb4771 100644 --- a/tests/unit/explorer_test.py +++ b/tests/unit/explorer_test.py @@ -26,7 +26,7 @@ def test_validate_from_components(): def test_build_sql(sample_table_info): sql_template = "SELECT * FROM {full_table_name}" - expected_sql = "SELECT * FROM catalog1.schema1.table1" + expected_sql = "SELECT * FROM `catalog1`.`schema1`.`table1`" assert DataExplorerActions._build_sql(sql_template, sample_table_info) == expected_sql diff --git a/tests/unit/table_info_test.py b/tests/unit/table_info_test.py new file mode 100644 index 0000000..0bf2328 --- /dev/null +++ b/tests/unit/table_info_test.py @@ -0,0 +1,23 @@ +import pytest +from discoverx.explorer import InfoFetcher, TagsInfo, TagInfo + + +def test_validate_from_components(): + info_table = TagsInfo([], [TagInfo("a", "v1")], [], []) + info_schema = TagsInfo([], [], [TagInfo("a", "v1")], []) + info_catalog = TagsInfo([], [], [], [TagInfo("a", "v1")]) + info_no_tags = TagsInfo([], [], [], []) + + assert InfoFetcher._contains_all_tags(info_table, [TagInfo("a", "v1")]) + assert not InfoFetcher._contains_all_tags(info_table, [TagInfo("a", "v2")]) + assert not InfoFetcher._contains_all_tags(info_table, [TagInfo("b", "v1")]) + assert not InfoFetcher._contains_all_tags(info_table, [TagInfo("a", None)]) + # If no tags to check, then it should be true + assert InfoFetcher._contains_all_tags(info_table, []) + + assert InfoFetcher._contains_all_tags(info_schema, [TagInfo("a", "v1")]) + + assert InfoFetcher._contains_all_tags(info_catalog, [TagInfo("a", "v1")]) + + assert InfoFetcher._contains_all_tags(info_no_tags, []) + assert not 
InfoFetcher._contains_all_tags(info_no_tags, [TagInfo("a", "v1")])
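
---

Usage sketch (not part of the patch): with this change applied, tag-based
filtering chains like any other DataExplorer filter. A minimal sketch, assuming
a catalog named `prod` and a `pii` tag set to `true` on some tables (both names
are hypothetical):

    from discoverx import DX

    dx = DX()

    # Scan only the tables tagged pii=true, whether the tag is set at
    # table, schema, or catalog level.
    result = dx.from_tables("prod.*.*").having_tag("pii", "true").scan()

Note that, per the new unit test, a None tag value is matched exactly rather
than acting as a wildcard: having_tag("pii") selects only tables whose pii tag
carries no value, not every table tagged pii.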
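The same filter composes with map, which is how the bundled examples run
arbitrary SQL per table. A sketch combining the two (the tag name `team` and
its value are hypothetical; `spark` is the notebook-provided SparkSession):

    # Collect DESCRIBE DETAIL output for every table tagged team=analytics,
    # using the backtick-quoted naming this patch also adopts in _build_sql.
    def describe(table_info):
        return spark.sql(
            f"DESCRIBE DETAIL `{table_info.catalog}`.`{table_info.schema}`.`{table_info.table}`"
        )

    details = dx.from_tables("prod.*.*").having_tag("team", "analytics").map(describe)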