Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Synapse destination #900

Merged
merged 25 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
7bc2163
Synapse destination initial commit
Jan 18, 2024
05b0530
make var type consistent
Jan 18, 2024
dc7619a
simplify client init logic
Jan 18, 2024
702dd28
add support for table index type configuration
Jan 21, 2024
db73162
add load concurrency handling and warning
Jan 23, 2024
5891a9a
Merge branch 'devel' of https://github.com/dlt-hub/dlt into 832-synap…
Jan 23, 2024
75be2ce
rewrite naive code to prevent IndexError
Jan 23, 2024
014543a
add support for staged Parquet loading
Jan 25, 2024
7868ca6
made table index type logic Synapse specific through destination adapter
Jan 26, 2024
b4cdd36
moved test function into tests folder and renamed test file
Jan 26, 2024
97f66e2
ensure test data gets removed
Jan 27, 2024
90685e7
add pyarrow to synapse dependencies for parquet loading
Jan 27, 2024
494e45b
added user docs for synapse destination
Jan 27, 2024
e8c6b1d
refactor dbt test skipping to prevent unnecessary venv creation
Jan 28, 2024
e1e9bb3
replace CTAS with CREATE TABLE to eliminate concurrency issues
Jan 28, 2024
99a0718
change test config type to reduce unnecessary tests
Jan 28, 2024
6d14d57
remove trailing whitespace
Jan 28, 2024
b87dd1b
refine staging table indexing
Jan 29, 2024
1c817bd
use generic statement to prevent repeating info
Jan 30, 2024
2dd979e
remove outdated documentation
Feb 1, 2024
da5cdac
add synapse destination to sidebar
Feb 1, 2024
d7d9e35
add support for additional table hints
Feb 1, 2024
e931ffb
Merge branch 'devel' into 832-synapse-destination
jorritsandbrink Feb 5, 2024
bab216d
correct content-hash after merge conflict resolution
Feb 5, 2024
c3efe33
only remove hint if it is None, not if it is empty
Feb 5, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/test_destination_mssql.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@ jobs:
run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml

- run: |
poetry run pytest tests/load --ignore tests/load/pipeline/test_dbt_helper.py
poetry run pytest tests/load
if: runner.os != 'Windows'
name: Run tests Linux/MAC
- run: |
poetry run pytest tests/load --ignore tests/load/pipeline/test_dbt_helper.py
poetry run pytest tests/load
if: runner.os == 'Windows'
name: Run tests Windows
shell: cmd
Expand Down
90 changes: 90 additions & 0 deletions .github/workflows/test_destination_synapse.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
name: test synapse

on:
pull_request:
branches:
- master
- devel
workflow_dispatch:

env:
DLT_SECRETS_TOML: ${{ secrets.DLT_SECRETS_TOML }}

RUNTIME__SENTRY_DSN: https://cf6086f7d263462088b9fb9f9947caee@o4505514867163136.ingest.sentry.io/4505516212682752
RUNTIME__LOG_LEVEL: ERROR

ACTIVE_DESTINATIONS: "[\"synapse\"]"
ALL_FILESYSTEM_DRIVERS: "[\"memory\"]"

jobs:
get_docs_changes:
uses: ./.github/workflows/get_docs_changes.yml
if: ${{ !github.event.pull_request.head.repo.fork }}

run_loader:
name: Tests Synapse loader
needs: get_docs_changes
if: needs.get_docs_changes.outputs.changes_outside_docs == 'true'
strategy:
fail-fast: false
matrix:
os: ["ubuntu-latest"]
defaults:
run:
shell: bash
runs-on: ${{ matrix.os }}

steps:

- name: Check out
uses: actions/checkout@master

- name: Install ODBC driver for SQL Server
run: |
sudo ACCEPT_EULA=Y apt-get install --yes msodbcsql18

- name: Setup Python
uses: actions/setup-python@v4
with:
python-version: "3.10.x"

- name: Install Poetry
uses: snok/[email protected]
with:
virtualenvs-create: true
virtualenvs-in-project: true
installer-parallel: true

- name: Load cached venv
id: cached-poetry-dependencies
uses: actions/cache@v3
with:
path: .venv
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-gcp

- name: Install dependencies
run: poetry install --no-interaction -E synapse -E parquet --with sentry-sdk --with pipeline

- name: create secrets.toml
run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml

- run: |
poetry run pytest tests/load
if: runner.os != 'Windows'
name: Run tests Linux/MAC
- run: |
poetry run pytest tests/load
if: runner.os == 'Windows'
name: Run tests Windows
shell: cmd

matrix_job_required_check:
name: Synapse loader tests
needs: run_loader
runs-on: ubuntu-latest
if: always()
steps:
- name: Check matrix job results
if: contains(needs.*.result, 'failure') || contains(needs.*.result, 'cancelled')
run: |
echo "One or more matrix job tests failed or were cancelled. You may need to re-run them." && exit 1
10 changes: 8 additions & 2 deletions dlt/common/data_writers/escape.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,14 @@ def escape_mssql_literal(v: Any) -> Any:
json.dumps(v), prefix="N'", escape_dict=MS_SQL_ESCAPE_DICT, escape_re=MS_SQL_ESCAPE_RE
)
if isinstance(v, bytes):
base_64_string = base64.b64encode(v).decode("ascii")
return f"""CAST('' AS XML).value('xs:base64Binary("{base_64_string}")', 'VARBINARY(MAX)')"""
from dlt.destinations.impl.mssql.mssql import VARBINARY_MAX_N

if len(v) <= VARBINARY_MAX_N:
n = str(len(v))
else:
n = "MAX"
return f"CONVERT(VARBINARY({n}), '{v.hex()}', 2)"

if isinstance(v, bool):
return str(int(v))
if v is None:
Expand Down
24 changes: 17 additions & 7 deletions dlt/common/data_writers/writers.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,18 +175,29 @@ def write_header(self, columns_schema: TTableSchemaColumns) -> None:
# do not write INSERT INTO command, this must be added together with table name by the loader
self._f.write("INSERT INTO {}(")
self._f.write(",".join(map(self._caps.escape_identifier, headers)))
self._f.write(")\nVALUES\n")
if self._caps.insert_values_writer_type == "default":
self._f.write(")\nVALUES\n")
elif self._caps.insert_values_writer_type == "select_union":
self._f.write(")\n")

def write_data(self, rows: Sequence[Any]) -> None:
super().write_data(rows)

def write_row(row: StrAny) -> None:
def write_row(row: StrAny, last_row: bool = False) -> None:
output = ["NULL"] * len(self._headers_lookup)
for n, v in row.items():
output[self._headers_lookup[n]] = self._caps.escape_literal(v)
self._f.write("(")
self._f.write(",".join(output))
self._f.write(")")
if self._caps.insert_values_writer_type == "default":
self._f.write("(")
self._f.write(",".join(output))
self._f.write(")")
if not last_row:
self._f.write(",\n")
elif self._caps.insert_values_writer_type == "select_union":
self._f.write("SELECT ")
self._f.write(",".join(output))
if not last_row:
self._f.write("\nUNION ALL\n")

# if next chunk add separator
if self._chunks_written > 0:
Expand All @@ -195,10 +206,9 @@ def write_row(row: StrAny) -> None:
# write rows
for row in rows[:-1]:
write_row(row)
self._f.write(",\n")

# write last row without separator so we can write footer eventually
write_row(rows[-1])
write_row(rows[-1], last_row=True)
self._chunks_written += 1

def write_footer(self) -> None:
Expand Down
1 change: 1 addition & 0 deletions dlt/common/destination/capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class DestinationCapabilitiesContext(ContainerInjectableContext):
schema_supports_numeric_precision: bool = True
timestamp_precision: int = 6
max_rows_per_insert: Optional[int] = None
insert_values_writer_type: str = "default"

# do not allow to create default value, destination caps must be always explicitly inserted into container
can_create_default: ClassVar[bool] = False
Expand Down
8 changes: 8 additions & 0 deletions dlt/common/schema/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -546,12 +546,20 @@ def data_tables(self, include_incomplete: bool = False) -> List[TTableSchema]:
)
]

def data_table_names(self) -> List[str]:
"""Returns list of table table names. Excludes dlt table names."""
return [t["name"] for t in self.data_tables()]

def dlt_tables(self) -> List[TTableSchema]:
"""Gets dlt tables"""
return [
t for t in self._schema_tables.values() if t["name"].startswith(self._dlt_tables_prefix)
]

def dlt_table_names(self) -> List[str]:
"""Returns list of dlt table names."""
return [t["name"] for t in self.dlt_tables()]

def get_preferred_type(self, col_name: str) -> Optional[TDataType]:
return next((m[1] for m in self._compiled_preferred_types if m[0].search(col_name)), None)

Expand Down
2 changes: 2 additions & 0 deletions dlt/destinations/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from dlt.destinations.impl.qdrant.factory import qdrant
from dlt.destinations.impl.motherduck.factory import motherduck
from dlt.destinations.impl.weaviate.factory import weaviate
from dlt.destinations.impl.synapse.factory import synapse


__all__ = [
Expand All @@ -25,4 +26,5 @@
"qdrant",
"motherduck",
"weaviate",
"synapse",
]
3 changes: 2 additions & 1 deletion dlt/destinations/adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@

from dlt.destinations.impl.weaviate import weaviate_adapter
from dlt.destinations.impl.qdrant import qdrant_adapter
from dlt.destinations.impl.synapse import synapse_adapter

__all__ = ["weaviate_adapter", "qdrant_adapter"]
__all__ = ["weaviate_adapter", "qdrant_adapter", "synapse_adapter"]
31 changes: 19 additions & 12 deletions dlt/destinations/impl/mssql/configuration.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Final, ClassVar, Any, List, Optional, TYPE_CHECKING
from typing import Final, ClassVar, Any, List, Dict, Optional, TYPE_CHECKING
from sqlalchemy.engine import URL

from dlt.common.configuration import configspec
Expand All @@ -10,9 +10,6 @@
from dlt.common.destination.reference import DestinationClientDwhWithStagingConfiguration


SUPPORTED_DRIVERS = ["ODBC Driver 18 for SQL Server", "ODBC Driver 17 for SQL Server"]


@configspec
class MsSqlCredentials(ConnectionStringCredentials):
drivername: Final[str] = "mssql" # type: ignore
Expand All @@ -24,22 +21,27 @@ class MsSqlCredentials(ConnectionStringCredentials):

__config_gen_annotations__: ClassVar[List[str]] = ["port", "connect_timeout"]

SUPPORTED_DRIVERS: ClassVar[List[str]] = [
"ODBC Driver 18 for SQL Server",
"ODBC Driver 17 for SQL Server",
]

def parse_native_representation(self, native_value: Any) -> None:
# TODO: Support ODBC connection string or sqlalchemy URL
super().parse_native_representation(native_value)
if self.query is not None:
self.query = {k.lower(): v for k, v in self.query.items()} # Make case-insensitive.
if "driver" in self.query and self.query.get("driver") not in SUPPORTED_DRIVERS:
raise SystemConfigurationException(
f"""The specified driver "{self.query.get('driver')}" is not supported."""
f" Choose one of the supported drivers: {', '.join(SUPPORTED_DRIVERS)}."
)
self.driver = self.query.get("driver", self.driver)
self.connect_timeout = int(self.query.get("connect_timeout", self.connect_timeout))
if not self.is_partial():
self.resolve()

def on_resolved(self) -> None:
if self.driver not in self.SUPPORTED_DRIVERS:
raise SystemConfigurationException(
f"""The specified driver "{self.driver}" is not supported."""
f" Choose one of the supported drivers: {', '.join(self.SUPPORTED_DRIVERS)}."
)
self.database = self.database.lower()

def to_url(self) -> URL:
Expand All @@ -55,20 +57,21 @@ def on_partial(self) -> None:
def _get_driver(self) -> str:
if self.driver:
return self.driver

# Pick a default driver if available
import pyodbc

available_drivers = pyodbc.drivers()
for d in SUPPORTED_DRIVERS:
for d in self.SUPPORTED_DRIVERS:
if d in available_drivers:
return d
docs_url = "https://learn.microsoft.com/en-us/sql/connect/odbc/download-odbc-driver-for-sql-server?view=sql-server-ver16"
raise SystemConfigurationException(
f"No supported ODBC driver found for MS SQL Server. See {docs_url} for information on"
f" how to install the '{SUPPORTED_DRIVERS[0]}' on your platform."
f" how to install the '{self.SUPPORTED_DRIVERS[0]}' on your platform."
)

def to_odbc_dsn(self) -> str:
def _get_odbc_dsn_dict(self) -> Dict[str, Any]:
params = {
"DRIVER": self.driver,
"SERVER": f"{self.host},{self.port}",
Expand All @@ -78,6 +81,10 @@ def to_odbc_dsn(self) -> str:
}
if self.query is not None:
params.update({k.upper(): v for k, v in self.query.items()})
return params

def to_odbc_dsn(self) -> str:
params = self._get_odbc_dsn_dict()
return ";".join([f"{k}={v}" for k, v in params.items()])


Expand Down
2 changes: 2 additions & 0 deletions dlt/destinations/impl/mssql/mssql.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@


HINT_TO_MSSQL_ATTR: Dict[TColumnHint, str] = {"unique": "UNIQUE"}
VARCHAR_MAX_N: int = 4000
VARBINARY_MAX_N: int = 8000


class MsSqlTypeMapper(TypeMapper):
Expand Down
7 changes: 5 additions & 2 deletions dlt/destinations/impl/mssql/sql_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ def drop_dataset(self) -> None:
)
table_names = [row[0] for row in rows]
self.drop_tables(*table_names)

self.execute_sql("DROP SCHEMA IF EXISTS %s;" % self.fully_qualified_dataset_name())
# Drop schema
self._drop_schema()

def _drop_views(self, *tables: str) -> None:
if not tables:
Expand All @@ -117,6 +117,9 @@ def _drop_views(self, *tables: str) -> None:
]
self.execute_fragments(statements)

def _drop_schema(self) -> None:
self.execute_sql("DROP SCHEMA IF EXISTS %s;" % self.fully_qualified_dataset_name())

def execute_sql(
self, sql: AnyStr, *args: Any, **kwargs: Any
) -> Optional[Sequence[Sequence[Any]]]:
Expand Down
11 changes: 2 additions & 9 deletions dlt/destinations/impl/qdrant/qdrant_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from dlt.common.schema.typing import TColumnNames, TTableSchemaColumns
from dlt.extract import DltResource, resource as make_resource
from dlt.destinations.utils import ensure_resource

VECTORIZE_HINT = "x-qdrant-embed"

Expand Down Expand Up @@ -31,15 +32,7 @@ def qdrant_adapter(
>>> qdrant_adapter(data, embed="description")
[DltResource with hints applied]
"""
# wrap `data` in a resource if not an instance already
resource: DltResource
if not isinstance(data, DltResource):
resource_name: str = None
if not hasattr(data, "__name__"):
resource_name = "content"
resource = make_resource(data, name=resource_name)
else:
resource = data
resource = ensure_resource(data)

column_hints: TTableSchemaColumns = {}

Expand Down
Loading
Loading