Skip to content

Commit

Permalink
Fix issues
Browse files Browse the repository at this point in the history
  • Loading branch information
azorej committed Oct 10, 2024
1 parent 069ffa4 commit 2dc689c
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 9 deletions.
2 changes: 1 addition & 1 deletion dbxio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
from dbxio.utils import * # noqa: F403
from dbxio.volume import * # noqa: F403

__version__ = '0.5.6' # single source of truth
__version__ = '0.5.0' # single source of truth
8 changes: 5 additions & 3 deletions dbxio/delta/table_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def create_table(table: Union[str, Table], client: 'DbxIOClient', replace: bool
[USING <table_format> LOCATION <location>]
[PARTITIONED BY (col1, col2, ...)]
"""
query = _create_table_query(table, replace, True, True)
query = _create_table_query(table, replace, if_not_exists=True, include_schema=True)
return client.sql(query)


Expand Down Expand Up @@ -230,24 +230,26 @@ def read_files_as_table(
Copy data from blob storage as a table. All files that match the pattern *.{table_format} will be copied.
If force_schema == False it will use schemaHints instead of schema option
"""
create_query = _create_table_query(table, replace, False, False)
create_query = _create_table_query(table, replace, if_not_exists=False, include_schema=False)
options = {
'format': f"'{table_format.value.lower()}'",
}
if include_files_pattern:
options['fileNamePattern'] = f"'*.{table_format.value.lower()}'"
if table.schema:
sql_schema = f"'{table.schema.as_sql()}'"
columns_exp = ', '.join(table.schema.columns)
if force_schema:
options['schema'] = sql_schema
else:
options['schemaHints'] = sql_schema
else:
columns_exp = '*'
options['mergeSchema'] = 'true'

options_query = ',\n'.join([f'{k} => {v}' for k, v in options.items()])
select_query = dedent(f"""
AS SELECT *
AS SELECT {columns_exp}
FROM read_files(
'abfss://{abs_container_name}@{abs_name}.dfs.core.windows.net/{blob_path}',
{options_query}
Expand Down
30 changes: 25 additions & 5 deletions tests/test_table_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@
bulk_write_local_files,
bulk_write_table,
copy_into_table,
read_files_as_table,
create_table,
drop_table,
exists_table,
get_comment_on_table,
get_tags_on_table,
merge_table,
read_files_as_table,
read_table,
save_table_to_files,
set_comment_on_table,
Expand Down Expand Up @@ -168,8 +168,8 @@ def test_read_files_as_table(self, mock_sql):
)
expected_query = dedent("""
CREATE TABLE `catalog`.`schema`.`table`
AS SELECT * FROM read_files( 'abfss://test_abs_container_name@test_abs_name.dfs.core.windows.net/test-bucket/test-path', format => 'parquet', schema => '`a` INT, `b` INT' )
""")
AS SELECT a, b FROM read_files( 'abfss://test_abs_container_name@test_abs_name.dfs.core.windows.net/test-bucket/test-path', format => 'parquet', schema => '`a` INT, `b` INT' )
""") # noqa: E501
observed_query = mock_sql.call_args[0][0].query

assert flatten_query(observed_query) == flatten_query(expected_query)
Expand All @@ -188,8 +188,28 @@ def test_read_files_as_replace_table(self, mock_sql):
)
expected_query = dedent("""
CREATE OR REPLACE TABLE `catalog`.`schema`.`table`
AS SELECT * FROM read_files( 'abfss://test_abs_container_name@test_abs_name.dfs.core.windows.net/test-bucket/test-path', format => 'parquet', schemaHints => '`a` INT, `b` INT' )
""")
AS SELECT a, b FROM read_files( 'abfss://test_abs_container_name@test_abs_name.dfs.core.windows.net/test-bucket/test-path', format => 'parquet', schemaHints => '`a` INT, `b` INT' )
""") # noqa: E501
observed_query = mock_sql.call_args[0][0].query

assert flatten_query(observed_query) == flatten_query(expected_query)

@patch.object(DbxIOClient, 'sql', side_effect=sql_mock)
def test_read_files_as_table_without_schema(self, mock_sql):
read_files_as_table(
self.client,
table=Table('catalog.schema.table'),
blob_path='test-bucket/test-path',
table_format=TableFormat.PARQUET,
abs_name='test_abs_name',
abs_container_name='test_abs_container_name',
replace=True,
force_schema=False,
)
expected_query = dedent("""
CREATE OR REPLACE TABLE `catalog`.`schema`.`table`
AS SELECT * FROM read_files( 'abfss://test_abs_container_name@test_abs_name.dfs.core.windows.net/test-bucket/test-path', format => 'parquet', mergeSchema => true )
""") # noqa: E501
observed_query = mock_sql.call_args[0][0].query

assert flatten_query(observed_query) == flatten_query(expected_query)
Expand Down

0 comments on commit 2dc689c

Please sign in to comment.