Synapse destination #900

Merged
merged 25 commits into devel from 832-synapse-destination
Feb 6, 2024
Changes from 7 commits

Commits (25)
7bc2163
Synapse destination initial commit
Jan 18, 2024
05b0530
make var type consistent
Jan 18, 2024
dc7619a
simplify client init logic
Jan 18, 2024
702dd28
add support for table index type configuration
Jan 21, 2024
db73162
add load concurrency handling and warning
Jan 23, 2024
5891a9a
Merge branch 'devel' of https://github.com/dlt-hub/dlt into 832-synap…
Jan 23, 2024
75be2ce
rewrite naive code to prevent IndexError
Jan 23, 2024
014543a
add support for staged Parquet loading
Jan 25, 2024
7868ca6
made table index type logic Synapse specific through destination adapter
Jan 26, 2024
b4cdd36
moved test function into tests folder and renamed test file
Jan 26, 2024
97f66e2
ensure test data gets removed
Jan 27, 2024
90685e7
add pyarrow to synapse dependencies for parquet loading
Jan 27, 2024
494e45b
added user docs for synapse destination
Jan 27, 2024
e8c6b1d
refactor dbt test skipping to prevent unnecessary venv creation
Jan 28, 2024
e1e9bb3
replace CTAS with CREATE TABLE to eliminate concurrency issues
Jan 28, 2024
99a0718
change test config type to reduce unnecessary tests
Jan 28, 2024
6d14d57
remove trailing whitespace
Jan 28, 2024
b87dd1b
refine staging table indexing
Jan 29, 2024
1c817bd
use generic statement to prevent repeating info
Jan 30, 2024
2dd979e
remove outdated documentation
Feb 1, 2024
da5cdac
add synapse destination to sidebar
Feb 1, 2024
d7d9e35
add support for additional table hints
Feb 1, 2024
e931ffb
Merge branch 'devel' into 832-synapse-destination
jorritsandbrink Feb 5, 2024
bab216d
correct content-hash after merge conflict resolution
Feb 5, 2024
c3efe33
only remove hint if it is None, not if it is empty
Feb 5, 2024
90 changes: 90 additions & 0 deletions .github/workflows/test_destination_synapse.yml
@@ -0,0 +1,90 @@
name: test synapse

on:
pull_request:
branches:
- master
- devel
workflow_dispatch:

env:
DLT_SECRETS_TOML: ${{ secrets.DLT_SECRETS_TOML }}

RUNTIME__SENTRY_DSN: https://cf6086f7d263462088b9fb9f9947caee@o4505514867163136.ingest.sentry.io/4505516212682752
RUNTIME__LOG_LEVEL: ERROR

ACTIVE_DESTINATIONS: "[\"synapse\"]"
ALL_FILESYSTEM_DRIVERS: "[\"memory\"]"

jobs:
get_docs_changes:
uses: ./.github/workflows/get_docs_changes.yml
if: ${{ !github.event.pull_request.head.repo.fork }}

run_loader:
name: Tests Synapse loader
needs: get_docs_changes
if: needs.get_docs_changes.outputs.changes_outside_docs == 'true'
strategy:
fail-fast: false
matrix:
os: ["ubuntu-latest"]
defaults:
run:
shell: bash
runs-on: ${{ matrix.os }}

steps:

- name: Check out
uses: actions/checkout@master

- name: Install ODBC driver for SQL Server
run: |
sudo ACCEPT_EULA=Y apt-get install --yes msodbcsql18

- name: Setup Python
uses: actions/setup-python@v4
with:
python-version: "3.10.x"

- name: Install Poetry
uses: snok/[email protected]
with:
virtualenvs-create: true
virtualenvs-in-project: true
installer-parallel: true

- name: Load cached venv
id: cached-poetry-dependencies
uses: actions/cache@v3
with:
path: .venv
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-gcp

- name: Install dependencies
run: poetry install --no-interaction -E synapse -E parquet --with sentry-sdk --with pipeline

- name: create secrets.toml
run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml

- run: |
poetry run pytest tests/load
if: runner.os != 'Windows'
name: Run tests Linux/MAC
- run: |
poetry run pytest tests/load
if: runner.os == 'Windows'
name: Run tests Windows
shell: cmd

matrix_job_required_check:
name: Synapse loader tests
needs: run_loader
runs-on: ubuntu-latest
if: always()
steps:
- name: Check matrix job results
if: contains(needs.*.result, 'failure') || contains(needs.*.result, 'cancelled')
run: |
echo "One or more matrix job tests failed or were cancelled. You may need to re-run them." && exit 1
10 changes: 8 additions & 2 deletions dlt/common/data_writers/escape.py
@@ -98,8 +98,14 @@ def escape_mssql_literal(v: Any) -> Any:
json.dumps(v), prefix="N'", escape_dict=MS_SQL_ESCAPE_DICT, escape_re=MS_SQL_ESCAPE_RE
)
if isinstance(v, bytes):
base_64_string = base64.b64encode(v).decode("ascii")
return f"""CAST('' AS XML).value('xs:base64Binary("{base_64_string}")', 'VARBINARY(MAX)')"""
from dlt.destinations.impl.mssql.mssql import VARBINARY_MAX_N

if len(v) <= VARBINARY_MAX_N:
n = str(len(v))
else:
n = "MAX"
return f"CONVERT(VARBINARY({n}), '{v.hex()}', 2)"

if isinstance(v, bool):
return str(int(v))
if v is None:
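For illustration, a minimal standalone sketch of just the new bytes branch above (the real code lives inside `escape_mssql_literal` and imports `VARBINARY_MAX_N` from the mssql module added in this PR; values here are illustrative):

```python
# Sketch of the hex-based escaping introduced above; VARBINARY_MAX_N = 8000
# as defined in dlt/destinations/impl/mssql/mssql.py in this PR.
VARBINARY_MAX_N = 8000

def escape_mssql_bytes(v: bytes) -> str:
    # CONVERT style 2 parses the string as plain hex, without a "0x" prefix
    n = str(len(v)) if len(v) <= VARBINARY_MAX_N else "MAX"
    return f"CONVERT(VARBINARY({n}), '{v.hex()}', 2)"

print(escape_mssql_bytes(b"\x01\xff"))  # CONVERT(VARBINARY(2), '01ff', 2)
```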
24 changes: 17 additions & 7 deletions dlt/common/data_writers/writers.py
@@ -175,18 +175,29 @@ def write_header(self, columns_schema: TTableSchemaColumns) -> None:
# do not write INSERT INTO command, this must be added together with table name by the loader
self._f.write("INSERT INTO {}(")
self._f.write(",".join(map(self._caps.escape_identifier, headers)))
self._f.write(")\nVALUES\n")
if self._caps.insert_values_writer_type == "default":
self._f.write(")\nVALUES\n")
elif self._caps.insert_values_writer_type == "select_union":
self._f.write(")\n")

def write_data(self, rows: Sequence[Any]) -> None:
super().write_data(rows)

def write_row(row: StrAny) -> None:
def write_row(row: StrAny, last_row: bool = False) -> None:
output = ["NULL"] * len(self._headers_lookup)
for n, v in row.items():
output[self._headers_lookup[n]] = self._caps.escape_literal(v)
self._f.write("(")
self._f.write(",".join(output))
self._f.write(")")
if self._caps.insert_values_writer_type == "default":
self._f.write("(")
self._f.write(",".join(output))
self._f.write(")")
if not last_row:
self._f.write(",\n")
elif self._caps.insert_values_writer_type == "select_union":
self._f.write("SELECT ")
self._f.write(",".join(output))
if not last_row:
self._f.write("\nUNION ALL\n")

# if next chunk add separator
if self._chunks_written > 0:
@@ -195,10 +206,9 @@ def write_row(row: StrAny) -> None:
# write rows
for row in rows[:-1]:
write_row(row)
self._f.write(",\n")

# write last row without separator so we can write footer eventually
write_row(rows[-1])
write_row(rows[-1], last_row=True)
self._chunks_written += 1

def write_footer(self) -> None:
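A rough sketch of the two output shapes the writer now produces (the `render_insert` helper is hypothetical; the real writer streams through `self._f` and escapes values via the destination capabilities). The "select_union" form joins rows with UNION ALL instead of a multi-row VALUES list, which the target reportedly does not accept:

```python
from typing import Sequence

def render_insert(rows: Sequence[str], writer_type: str = "default") -> str:
    # rows are pre-escaped value lists, e.g. "1,N'a'"
    header = "INSERT INTO {}(id,name)\n"
    if writer_type == "default":
        return header + "VALUES\n" + ",\n".join(f"({r})" for r in rows)
    if writer_type == "select_union":
        return header + "\nUNION ALL\n".join(f"SELECT {r}" for r in rows)
    raise ValueError(writer_type)

print(render_insert(["1,N'a'", "2,N'b'"], "select_union"))
# INSERT INTO {}(id,name)
# SELECT 1,N'a'
# UNION ALL
# SELECT 2,N'b'
```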
1 change: 1 addition & 0 deletions dlt/common/destination/capabilities.py
@@ -52,6 +52,7 @@ class DestinationCapabilitiesContext(ContainerInjectableContext):
schema_supports_numeric_precision: bool = True
timestamp_precision: int = 6
max_rows_per_insert: Optional[int] = None
insert_values_writer_type: str = "default"

# do not allow to create default value, destination caps must be always explicitly inserted into container
can_create_default: ClassVar[bool] = False
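If helpful, a sketch of how a destination could opt into the new writer type through its capabilities (the Synapse factory presumably does something similar; the `generic_capabilities` helper usage is an assumption here):

```python
from dlt.common.destination.capabilities import DestinationCapabilitiesContext

# assumed helper: builds a default caps object for a preferred loader file format
caps = DestinationCapabilitiesContext.generic_capabilities("insert_values")
caps.insert_values_writer_type = "select_union"  # default is "default"
caps.max_rows_per_insert = 1000  # illustrative; can be combined with the new writer type
```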
4 changes: 3 additions & 1 deletion dlt/common/destination/reference.py
@@ -34,7 +34,7 @@
from dlt.common.schema import Schema, TTableSchema, TSchemaTables
from dlt.common.schema.typing import TWriteDisposition
from dlt.common.schema.exceptions import InvalidDatasetName
from dlt.common.schema.utils import get_write_disposition, get_table_format
from dlt.common.schema.utils import get_write_disposition, get_table_format, get_table_index_type
from dlt.common.configuration import configspec, with_config, resolve_configuration, known_sections
from dlt.common.configuration.specs import BaseConfiguration, CredentialsConfiguration
from dlt.common.configuration.accessors import config
@@ -372,6 +372,8 @@ def get_load_table(self, table_name: str, prepare_for_staging: bool = False) ->
table["write_disposition"] = get_write_disposition(self.schema.tables, table_name)
if "table_format" not in table:
table["table_format"] = get_table_format(self.schema.tables, table_name)
if "table_index_type" not in table:
table["table_index_type"] = get_table_index_type(self.schema.tables, table_name)
return table
except KeyError:
raise UnknownTableException(table_name)
8 changes: 8 additions & 0 deletions dlt/common/schema/schema.py
@@ -546,12 +546,20 @@ def data_tables(self, include_incomplete: bool = False) -> List[TTableSchema]:
)
]

def data_table_names(self) -> List[str]:
"""Returns list of table table names. Excludes dlt table names."""
return [t["name"] for t in self.data_tables()]

def dlt_tables(self) -> List[TTableSchema]:
"""Gets dlt tables"""
return [
t for t in self._schema_tables.values() if t["name"].startswith(self._dlt_tables_prefix)
]

def dlt_table_names(self) -> List[str]:
"""Returns list of dlt table names."""
return [t["name"] for t in self.dlt_tables()]

def get_preferred_type(self, col_name: str) -> Optional[TDataType]:
return next((m[1] for m in self._compiled_preferred_types if m[0].search(col_name)), None)

3 changes: 3 additions & 0 deletions dlt/common/schema/typing.py
@@ -62,6 +62,8 @@
"""Known hints of a column used to declare hint regexes."""
TWriteDisposition = Literal["skip", "append", "replace", "merge"]
TTableFormat = Literal["iceberg"]
TTableIndexType = Literal["heap", "clustered_columnstore_index"]
"Table index type. Currently only used for Synapse destination."
TTypeDetections = Literal[
"timestamp", "iso_timestamp", "iso_date", "large_integer", "hexbytes_to_text", "wei_to_double"
]
@@ -165,6 +167,7 @@ class TTableSchema(TypedDict, total=False):
columns: TTableSchemaColumns
resource: Optional[str]
table_format: Optional[TTableFormat]
table_index_type: Optional[TTableIndexType]
Collaborator
@rudolfix we can switch between two index types (https://learn.microsoft.com/en-us/azure/synapse-analytics/sql-data-warehouse/sql-data-warehouse-tables-index) and we have a new table hint here. I am wondering if we can re-use table_format or whether this needs to be a separate hint. I am not quite sure.

Collaborator Author
My idea was to keep table_format limited to table formats such as Iceberg, Delta, and Hudi. Then within a certain table format, one can still choose between different table indexing types supported by that table format. Look at Hudi for example, which supports a bunch of different indexing types.

Collaborator
This looks like a case for a destination adapter. It sets per-destination options on a resource using x- hints. Please look into:
https://github.com/dlt-hub/dlt/blob/devel/dlt/destinations/adapters.py
and the bigquery adapter that @Pipboyguy is doing: #855

Collaborator Author
I rewrote the code to make use of a destination adapter. Looked at qdrant_adapter and weaviate_adapter for inspiration. The synapse_adapter is slightly different because it works with a table hint, not column hints like qdrant and weaviate. Since x-table-index-type is not defined on TResourceHints, I have to ignore mypy's typeddict-unknown-key errors. Is this what you had in mind @rudolfix?
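
For context, a hedged sketch of the adapter usage described in this thread (the import path and exact signature are assumptions; the hint values come from `TTableIndexType` above):

```python
import dlt
from dlt.destinations.adapters import synapse_adapter  # assumed import path

data = dlt.resource(
    [{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}],
    name="customers",
)
# sets the per-destination "x-table-index-type" hint on the resource's table;
# valid values per TTableIndexType: "heap" or "clustered_columnstore_index"
synapse_adapter(data, table_index_type="heap")

pipeline = dlt.pipeline(pipeline_name="synapse_demo", destination="synapse")
pipeline.run(data)
```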



class TPartialTableSchema(TTableSchema):
12 changes: 12 additions & 0 deletions dlt/common/schema/utils.py
@@ -32,6 +32,7 @@
TColumnSchema,
TColumnProp,
TTableFormat,
TTableIndexType,
TColumnHint,
TTypeDetectionFunc,
TTypeDetections,
@@ -618,6 +619,14 @@ def get_table_format(tables: TSchemaTables, table_name: str) -> TTableFormat:
)


def get_table_index_type(tables: TSchemaTables, table_name: str) -> TTableIndexType:
"""Returns table index type of a table if present. If not, looks up into parent table."""
return cast(
TTableIndexType,
get_inherited_table_hint(tables, table_name, "table_index_type", allow_none=True),
)


def table_schema_has_type(table: TTableSchema, _typ: TDataType) -> bool:
"""Checks if `table` schema contains column with type _typ"""
return any(c.get("data_type") == _typ for c in table["columns"].values())
@@ -724,6 +733,7 @@ def new_table(
resource: str = None,
schema_contract: TSchemaContract = None,
table_format: TTableFormat = None,
table_index_type: TTableIndexType = None,
) -> TTableSchema:
table: TTableSchema = {
"name": table_name,
@@ -742,6 +752,8 @@
table["schema_contract"] = schema_contract
if table_format:
table["table_format"] = table_format
if table_index_type is not None:
table["table_index_type"] = table_index_type
if validate_schema:
validate_dict_ignoring_xkeys(
spec=TColumnSchema,
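A small usage sketch of the extended `new_table` (simplified call; only the argument relevant to this PR is shown, other parameters keep their defaults):

```python
from dlt.common.schema.utils import new_table

# the new keyword argument lands in the table schema as "table_index_type"
table = new_table("events", table_index_type="clustered_columnstore_index")
assert table["table_index_type"] == "clustered_columnstore_index"

# tables without an explicit hint fall back to the parent table (or None)
# via get_table_index_type / get_inherited_table_hint shown above
```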
2 changes: 2 additions & 0 deletions dlt/destinations/__init__.py
@@ -10,6 +10,7 @@
from dlt.destinations.impl.qdrant.factory import qdrant
from dlt.destinations.impl.motherduck.factory import motherduck
from dlt.destinations.impl.weaviate.factory import weaviate
from dlt.destinations.impl.synapse.factory import synapse


__all__ = [
@@ -25,4 +26,5 @@
"qdrant",
"motherduck",
"weaviate",
"synapse",
]
31 changes: 19 additions & 12 deletions dlt/destinations/impl/mssql/configuration.py
@@ -1,4 +1,4 @@
from typing import Final, ClassVar, Any, List, Optional, TYPE_CHECKING
from typing import Final, ClassVar, Any, List, Dict, Optional, TYPE_CHECKING
from sqlalchemy.engine import URL

from dlt.common.configuration import configspec
@@ -10,9 +10,6 @@
from dlt.common.destination.reference import DestinationClientDwhWithStagingConfiguration


SUPPORTED_DRIVERS = ["ODBC Driver 18 for SQL Server", "ODBC Driver 17 for SQL Server"]


@configspec
class MsSqlCredentials(ConnectionStringCredentials):
drivername: Final[str] = "mssql" # type: ignore
@@ -24,22 +21,27 @@ class MsSqlCredentials(ConnectionStringCredentials):

__config_gen_annotations__: ClassVar[List[str]] = ["port", "connect_timeout"]

SUPPORTED_DRIVERS: ClassVar[List[str]] = [
"ODBC Driver 18 for SQL Server",
"ODBC Driver 17 for SQL Server",
]

def parse_native_representation(self, native_value: Any) -> None:
# TODO: Support ODBC connection string or sqlalchemy URL
super().parse_native_representation(native_value)
if self.query is not None:
self.query = {k.lower(): v for k, v in self.query.items()} # Make case-insensitive.
if "driver" in self.query and self.query.get("driver") not in SUPPORTED_DRIVERS:
raise SystemConfigurationException(
f"""The specified driver "{self.query.get('driver')}" is not supported."""
f" Choose one of the supported drivers: {', '.join(SUPPORTED_DRIVERS)}."
)
self.driver = self.query.get("driver", self.driver)
self.connect_timeout = int(self.query.get("connect_timeout", self.connect_timeout))
if not self.is_partial():
self.resolve()

def on_resolved(self) -> None:
if self.driver not in self.SUPPORTED_DRIVERS:
raise SystemConfigurationException(
f"""The specified driver "{self.driver}" is not supported."""
f" Choose one of the supported drivers: {', '.join(self.SUPPORTED_DRIVERS)}."
)
self.database = self.database.lower()

def to_url(self) -> URL:
@@ -55,20 +57,21 @@ def on_partial(self) -> None:
def _get_driver(self) -> str:
if self.driver:
return self.driver

# Pick a default driver if available
import pyodbc

available_drivers = pyodbc.drivers()
for d in SUPPORTED_DRIVERS:
for d in self.SUPPORTED_DRIVERS:
if d in available_drivers:
return d
docs_url = "https://learn.microsoft.com/en-us/sql/connect/odbc/download-odbc-driver-for-sql-server?view=sql-server-ver16"
raise SystemConfigurationException(
f"No supported ODBC driver found for MS SQL Server. See {docs_url} for information on"
f" how to install the '{SUPPORTED_DRIVERS[0]}' on your platform."
f" how to install the '{self.SUPPORTED_DRIVERS[0]}' on your platform."
)

def to_odbc_dsn(self) -> str:
def _get_odbc_dsn_dict(self) -> Dict[str, Any]:
params = {
"DRIVER": self.driver,
"SERVER": f"{self.host},{self.port}",
@@ -78,6 +81,10 @@ def to_odbc_dsn(self) -> str:
}
if self.query is not None:
params.update({k.upper(): v for k, v in self.query.items()})
return params

def to_odbc_dsn(self) -> str:
params = self._get_odbc_dsn_dict()
return ";".join([f"{k}={v}" for k, v in params.items()])


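A standalone sketch of the relocated driver validation (an illustrative class, not the real `MsSqlCredentials`; the check now runs in `on_resolved` against the class-level `SUPPORTED_DRIVERS` rather than while parsing the connection string):

```python
from typing import ClassVar, List

class FakeCredentials:
    SUPPORTED_DRIVERS: ClassVar[List[str]] = [
        "ODBC Driver 18 for SQL Server",
        "ODBC Driver 17 for SQL Server",
    ]

    def __init__(self, driver: str, database: str) -> None:
        self.driver = driver
        self.database = database

    def on_resolved(self) -> None:
        # mirrors the on_resolved check added in this PR
        if self.driver not in self.SUPPORTED_DRIVERS:
            raise ValueError(
                f'The specified driver "{self.driver}" is not supported. '
                f"Choose one of the supported drivers: {', '.join(self.SUPPORTED_DRIVERS)}."
            )
        self.database = self.database.lower()

creds = FakeCredentials("ODBC Driver 18 for SQL Server", "MyDb")
creds.on_resolved()
assert creds.database == "mydb"
```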
2 changes: 2 additions & 0 deletions dlt/destinations/impl/mssql/mssql.py
@@ -20,6 +20,8 @@


HINT_TO_MSSQL_ATTR: Dict[TColumnHint, str] = {"unique": "UNIQUE"}
VARCHAR_MAX_N: int = 4000
VARBINARY_MAX_N: int = 8000


class MsSqlTypeMapper(TypeMapper):
7 changes: 5 additions & 2 deletions dlt/destinations/impl/mssql/sql_client.py
@@ -106,8 +106,8 @@ def drop_dataset(self) -> None:
)
table_names = [row[0] for row in rows]
self.drop_tables(*table_names)

self.execute_sql("DROP SCHEMA IF EXISTS %s;" % self.fully_qualified_dataset_name())
# Drop schema
self._drop_schema()

def _drop_views(self, *tables: str) -> None:
if not tables:
@@ -117,6 +117,9 @@ def _drop_views(self, *tables: str) -> None:
]
self.execute_fragments(statements)

def _drop_schema(self) -> None:
self.execute_sql("DROP SCHEMA IF EXISTS %s;" % self.fully_qualified_dataset_name())

def execute_sql(
self, sql: AnyStr, *args: Any, **kwargs: Any
) -> Optional[Sequence[Sequence[Any]]]: