Added tag filtering and speedup #88

Open · wants to merge 10 commits into base: master
4 changes: 3 additions & 1 deletion discoverx/dx.py
@@ -49,7 +49,9 @@ def __init__(

def _can_read_columns_table(self) -> bool:
    try:
        self.spark.sql(
            f"SELECT * FROM {self.INFORMATION_SCHEMA}.columns WHERE table_catalog = 'system' AND table_schema = 'information_schema' LIMIT 1"
        )
        return True
    except Exception as e:
        self.logger.error(f"Error while reading table {self.INFORMATION_SCHEMA}.columns: {e}")
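The added `table_schema = 'information_schema'` predicate appears to be the "speedup" half of this PR: qualifying both `table_catalog` and `table_schema` narrows the probe so the metastore can prune the information-schema lookup instead of scanning column metadata across all schemas. A hedged sketch of the resulting query, assuming `INFORMATION_SCHEMA` resolves to `system.information_schema`:

# Hypothetical standalone equivalent of the probe above (catalog name assumed)
spark.sql(
    "SELECT * FROM system.information_schema.columns "
    "WHERE table_catalog = 'system' AND table_schema = 'information_schema' LIMIT 1"
)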
21 changes: 19 additions & 2 deletions discoverx/explorer.py
@@ -32,6 +32,7 @@
self._sql_query_template = None
self._max_concurrency = 10
self._with_tags = False
self._having_tags = []
self._data_source_formats = ["DELTA"]

@staticmethod
@@ -72,6 +73,19 @@
new_obj._having_columns.extend(columns)
return new_obj

def having_tag(self, tag_name: str, tag_value: str = None) -> "DataExplorer":
    """Select tables tagged with the provided tag name and, optionally, tag value,
    at table, schema, or catalog level.

    Args:
        tag_name (str): Tag name
        tag_value (str, optional): Tag value. Defaults to None.
    """
    new_obj = copy.deepcopy(self)
    new_obj._having_tags.append(TagInfo(tag_name, tag_value))
    new_obj._with_tags = True
    return new_obj
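For orientation, a hedged usage sketch of the new filter (hypothetical catalog and tag names, chained from the `from_tables` entry point used elsewhere in this PR):

# Keep only tables tagged env=prod at table, schema, or catalog level
dx.from_tables("prod.*.*").having_tag("env", "prod").scan()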


def with_data_source_formats(self, data_source_formats: list[str] = ["DELTA"]) -> "DataExplorer":
"""Filter tables with provided data source formats. Defaults to DELTA only. Possible Values 'DELTA', 'CSV', 'JSON', 'PARQUET', 'TEXT', 'ORC' etc

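A hedged usage sketch, mirroring the example notebook later in this PR (hypothetical catalog name):

# Scan Delta and JSON tables instead of the Delta-only default
dx.from_tables("prod.*.*").with_data_source_formats(["DELTA", "JSON"]).scan()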
@@ -157,6 +171,7 @@
schemas=self._schemas,
tables=self._tables,
columns=self._having_columns,
having_tags=self._having_tags,
data_source_formats=self._data_source_formats,
),
custom_rules=custom_rules,
@@ -181,6 +196,7 @@
self._tables,
self._having_columns,
self._with_tags,
self._having_tags,
self._data_source_formats,
)
with concurrent.futures.ThreadPoolExecutor(max_workers=self._max_concurrency) as executor:
@@ -222,9 +238,9 @@
@staticmethod
def _build_sql(sql_template: str, table_info: TableInfo) -> str:
    if table_info.catalog and table_info.catalog != "None":
        full_table_name = f"`{table_info.catalog}`.`{table_info.schema}`.`{table_info.table}`"
    else:
        full_table_name = f"`{table_info.schema}`.`{table_info.table}`"

    stack_string_columns = DataExplorerActions._get_stack_string_columns_expression(table_info)

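The switch to backtick-quoted identifiers above keeps the generated SQL parseable when names contain characters a bare identifier cannot, such as dashes. A minimal sketch with hypothetical names:

# Without the backticks, the dashes would break the SQL parser
spark.sql("SELECT * FROM `main`.`my-schema`.`my-table` LIMIT 1")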
@@ -263,6 +279,7 @@
data_explorer._tables,
data_explorer._having_columns,
data_explorer._with_tags,
data_explorer._having_tags,
)
sql_commands = [
(
26 changes: 24 additions & 2 deletions discoverx/table_info.py
@@ -110,7 +110,8 @@
schemas: str,
tables: str,
columns: list[str] = [],
with_tags: bool = False,
having_tags: list[TagInfo] = [],
data_source_formats: list[str] = ["DELTA"],
) -> list[TableInfo]:
# Filter tables by matching filter
@@ -121,7 +122,28 @@
if len(filtered_tables) == 0:
    raise ValueError(f"No tables found matching filter: {catalogs}.{schemas}.{tables}")

info_list = self._to_info_list(filtered_tables)
return [info for info in info_list if InfoFetcher._contains_all_tags(info.tags, having_tags)]

@staticmethod
def _contains_all_tags(tags_info: TagsInfo, tags: list[TagInfo]) -> bool:
    if not tags:
        return True
    if not tags_info:
        return False

    all_tags = []

    if tags_info.catalog_tags:
        all_tags.extend(tags_info.catalog_tags)

    if tags_info.schema_tags:
        all_tags.extend(tags_info.schema_tags)

    if tags_info.table_tags:
        all_tags.extend(tags_info.table_tags)

    return all(tag in all_tags for tag in tags)
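To make the matching semantics concrete, a hedged sketch (the `TagsInfo` argument order — column, table, schema, catalog tags — is assumed from the new unit test below):

# A catalog-level tag satisfies the filter because all three levels are merged
info = TagsInfo([], [], [], [TagInfo("env", "prod")])
assert InfoFetcher._contains_all_tags(info, [TagInfo("env", "prod")])
# Values must match exactly: a filter with value None only matches a tag stored without a value
assert not InfoFetcher._contains_all_tags(info, [TagInfo("env", None)])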

def _get_table_list_sql(
self,
53 changes: 27 additions & 26 deletions examples/deep_clone_schema.py
@@ -2,14 +2,14 @@
# MAGIC %md
# MAGIC # Deep Clone a Schema
# MAGIC
# MAGIC Databricks' Deep Clone functionality enables the effortless creation of a data replica with minimal coding and maintenance overhead. Using the `CLONE` command, you can efficiently generate a duplicate of an existing Delta Lake table on Databricks at a designated version. The cloning process is incremental, ensuring that only new changes since the last clone are applied to the table.
# MAGIC
# MAGIC
# MAGIC Deep cloning is applied on a per-table basis, requiring a separate invocation for each table within your schema. In scenarios where automation is desirable, such as when dealing with shared schemas through Delta sharing, replicating the entire schema can be achieved using DiscoverX. This approach eliminates the need to manually inspect and modify your code each time a new table is added to the schema by the provider.
# MAGIC
# MAGIC This notebook serves as an example of utilizing DiscoverX to automate the replication of a schema using Delta Deep Clone.
# MAGIC
# MAGIC Our recommendation is to schedule this notebook as a job on the recipient side.
# MAGIC

# COMMAND ----------
@@ -19,7 +19,7 @@

# COMMAND ----------

dbutils.widgets.text("1.source_catalog", "_discoverx_deep_clone")
dbutils.widgets.text("1.source_catalog", "_discoverx_deep_clone")
dbutils.widgets.text("2.destination_catalog", "_discoverx_deep_clone_replica")

source_catalog = dbutils.widgets.get("1.source_catalog")
@@ -52,31 +52,33 @@

# COMMAND ----------


def clone_tables(table_info):
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {destination_catalog}.{table_info.schema}")
    try:
        spark.sql(
            f"""CREATE OR REPLACE TABLE
            {destination_catalog}.{table_info.schema}.{table_info.table}
            CLONE {table_info.catalog}.{table_info.schema}.{table_info.table}
            """
        )
        result = {
            "source": f"{table_info.catalog}.{table_info.schema}.{table_info.table}",
            "destination": f"{destination_catalog}.{table_info.schema}.{table_info.table}",
            "success": True,
            "info": None,
        }
    # Cloning Views is not supported
    except Exception as error:
        result = {
            "source": f"{table_info.catalog}.{table_info.schema}.{table_info.table}",
            "destination": f"{destination_catalog}.{table_info.schema}.{table_info.table}",
            "success": False,
            "info": error,
        }
    return result


# COMMAND ----------

@@ -86,5 +88,4 @@ def clone_tables(table_info):

# COMMAND ----------

res = dx.from_tables(f"{source_catalog}.*.*").map(clone_tables)
10 changes: 3 additions & 7 deletions examples/scan_with_user_specified_data_source_formats.py
@@ -43,7 +43,7 @@

# COMMAND ----------

# MAGIC %md
# MAGIC ### DiscoverX will scan all delta tables by default

# COMMAND ----------
@@ -52,15 +52,11 @@

# COMMAND ----------

# MAGIC %md
# MAGIC ### User can specify data source formats as follows

# COMMAND ----------

(dx.from_tables(from_table_statement).with_data_source_formats(["DELTA", "JSON"]).scan())

# COMMAND ----------


42 changes: 23 additions & 19 deletions examples/update_owner_of_data_objects.py
@@ -22,9 +22,9 @@
dbutils.widgets.text("catalogs", "*", "Catalogs")
dbutils.widgets.text("schemas", "*", "Schemas")
dbutils.widgets.text("tables", "*", "Tables")
dbutils.widgets.text("owner","[email protected]","owner")
dbutils.widgets.dropdown("if_update_catalog_owner", "YES", ["YES","NO"])
dbutils.widgets.dropdown("if_update_schema_owner", "YES", ["YES","NO"])
dbutils.widgets.text("owner", "[email protected]", "owner")
dbutils.widgets.dropdown("if_update_catalog_owner", "YES", ["YES", "NO"])
dbutils.widgets.dropdown("if_update_schema_owner", "YES", ["YES", "NO"])

# COMMAND ----------

@@ -54,23 +54,27 @@

# COMMAND ----------


def update_owner(table_info):
    catalog_owner_alter_sql = f""" ALTER CATALOG `{table_info.catalog}` SET OWNER TO `{owner}`"""
    schema_owner_alter_sql = f""" ALTER SCHEMA `{table_info.catalog}`.`{table_info.schema}` SET OWNER TO `{owner}`"""
    table_owner_alter_sql = (
        f""" ALTER TABLE `{table_info.catalog}`.`{table_info.schema}`.`{table_info.table}` SET OWNER TO `{owner}`"""
    )
    try:
        if if_update_catalog_owner == "YES":
            print(f"Executing {catalog_owner_alter_sql}")
            spark.sql(catalog_owner_alter_sql)

        if if_update_schema_owner == "YES":
            print(f"Executing {schema_owner_alter_sql}")
            spark.sql(schema_owner_alter_sql)

        print(f"Executing {table_owner_alter_sql}")
        spark.sql(table_owner_alter_sql)
    except Exception as exception:
        print(f" Exception occurred while updating owner: {exception}")


# COMMAND ----------

2 changes: 1 addition & 1 deletion tests/unit/explorer_test.py
@@ -26,7 +26,7 @@ def test_validate_from_components():

def test_build_sql(sample_table_info):
sql_template = "SELECT * FROM {full_table_name}"
expected_sql = "SELECT * FROM catalog1.schema1.table1"
expected_sql = "SELECT * FROM `catalog1`.`schema1`.`table1`"
assert DataExplorerActions._build_sql(sql_template, sample_table_info) == expected_sql


23 changes: 23 additions & 0 deletions tests/unit/table_info_test.py
@@ -0,0 +1,23 @@
import pytest
from discoverx.explorer import InfoFetcher, TagsInfo, TagInfo


def test_validate_from_components():
    info_table = TagsInfo([], [TagInfo("a", "v1")], [], [])
    info_schema = TagsInfo([], [], [TagInfo("a", "v1")], [])
    info_catalog = TagsInfo([], [], [], [TagInfo("a", "v1")])
    info_no_tags = TagsInfo([], [], [], [])

    assert InfoFetcher._contains_all_tags(info_table, [TagInfo("a", "v1")])
    assert not InfoFetcher._contains_all_tags(info_table, [TagInfo("a", "v2")])
    assert not InfoFetcher._contains_all_tags(info_table, [TagInfo("b", "v1")])
    assert not InfoFetcher._contains_all_tags(info_table, [TagInfo("a", None)])
    # If there are no tags to check, the filter should pass
    assert InfoFetcher._contains_all_tags(info_table, [])

    assert InfoFetcher._contains_all_tags(info_schema, [TagInfo("a", "v1")])

    assert InfoFetcher._contains_all_tags(info_catalog, [TagInfo("a", "v1")])

    assert InfoFetcher._contains_all_tags(info_no_tags, [])
    assert not InfoFetcher._contains_all_tags(info_no_tags, [TagInfo("a", "v1")])