Commit
Merge remote-tracking branch 'origin/devel' into docs/update_filesystem_docs
dat-a-man committed May 24, 2024
2 parents 9225803 + 7c07c67 commit 862d634
Showing 33 changed files with 576 additions and 257 deletions.
24 changes: 24 additions & 0 deletions CONTRIBUTING.md
@@ -52,6 +52,29 @@ We use **master** branch for hot fixes (including documentation) that need to be…

On the release day, **devel** branch is merged into **master**. All releases of `dlt` happen only from the **master**.

### Branch naming rules

We want our git history to explain in a human-readable way what has been changed and by which branch or PR. To this end, we use the following branch naming pattern (all lowercase, with dashes and no underscores):

```sh
{category}/{ticket-id}-description-of-the-branch
# example:
feat/4922-add-avro-support
```

#### Branch categories

* **feat** - a new feature that is being implemented (ticket required)
* **fix** - a change that fixes a bug (ticket required)
* **exp** - an experiment where we are testing a new idea or want to demonstrate something to the team, might turn into a `feat` later (ticket encouraged)
* **test** - anything related to the tests (ticket encouraged)
* **blogs** - a new entry to our blog (ticket optional)
* **docs** - a change to our docs (ticket optional)

#### Ticket numbers

We encourage you to attach your branches to a ticket; if none exists, create one and explain what you are doing. For `feat` and `fix` branches, tickets are mandatory; for `exp` and `test` branches, they are encouraged; and for `blogs` and `docs` branches, they are optional.

### Submitting a hotfix
We'll fix critical bugs and release `dlt` outside of the normal schedule. Follow the regular procedure, but make your PR against the **master** branch. Please ping us on Slack if you do it.

@@ -166,3 +189,4 @@ Once the version has been bumped, follow these steps to publish the new release
- [Poetry Documentation](https://python-poetry.org/docs/)

If you have any questions or need help, don't hesitate to reach out to us. We're here to help you succeed in contributing to `dlt`. Happy coding!
3 changes: 3 additions & 0 deletions dlt/common/data_writers/buffered.py
@@ -55,7 +55,10 @@ def __init__(
self.closed_files: List[DataWriterMetrics] = [] # all fully processed files
# buffered items must be less than max items in file
self.buffer_max_items = min(buffer_max_items, file_max_items or buffer_max_items)
# Explicitly configured max size supersedes destination limit
self.file_max_bytes = file_max_bytes
if self.file_max_bytes is None and _caps:
self.file_max_bytes = _caps.recommended_file_size
self.file_max_items = file_max_items
# the open function is either gzip.open or open
self.open = (
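For context on the change above: an explicitly configured `file_max_bytes` takes precedence, and the destination's `recommended_file_size` is only used as a fallback. A minimal sketch of that precedence (the helper name `resolve_file_max_bytes` is illustrative, not part of `dlt`):

```py
from typing import Optional


def resolve_file_max_bytes(configured: Optional[int], recommended: Optional[int]) -> Optional[int]:
    """Explicit configuration supersedes the destination's recommendation; both may be None."""
    return configured if configured is not None else recommended


# an explicit 64 MiB limit wins over a 1 GiB recommendation
assert resolve_file_max_bytes(64 * 1024 * 1024, 1024 * 1024 * 1024) == 64 * 1024 * 1024
# with no explicit limit, the recommendation is used
assert resolve_file_max_bytes(None, 1024 * 1024 * 1024) == 1024 * 1024 * 1024
```
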
2 changes: 2 additions & 0 deletions dlt/common/destination/capabilities.py
@@ -29,6 +29,8 @@ class DestinationCapabilitiesContext(ContainerInjectableContext):

preferred_loader_file_format: TLoaderFileFormat = None
supported_loader_file_formats: Sequence[TLoaderFileFormat] = None
recommended_file_size: Optional[int] = None
"""Recommended file size in bytes when writing extract/load files"""
preferred_staging_file_format: Optional[TLoaderFileFormat] = None
supported_staging_file_formats: Sequence[TLoaderFileFormat] = None
escape_identifier: Callable[[str], str] = None
2 changes: 2 additions & 0 deletions dlt/destinations/impl/bigquery/__init__.py
@@ -12,6 +12,8 @@ def capabilities() -> DestinationCapabilitiesContext:
caps.supported_loader_file_formats = ["jsonl", "parquet"]
caps.preferred_staging_file_format = "parquet"
caps.supported_staging_file_formats = ["parquet", "jsonl"]
# BigQuery's limit is 4 GB, but leave large headroom since the buffered writer does not preemptively check size
caps.recommended_file_size = int(1024 * 1024 * 1024)
caps.escape_identifier = escape_bigquery_identifier
caps.escape_literal = None
caps.format_datetime_literal = format_bigquery_datetime_literal
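The recommendation added above can be read back from the destination's capabilities; a quick sanity check (import path as laid out in this commit):

```py
from dlt.destinations.impl.bigquery import capabilities

caps = capabilities()
# 1 GiB leaves ample headroom under BigQuery's 4 GB file limit
assert caps.recommended_file_size == 1024 * 1024 * 1024
```
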
3 changes: 2 additions & 1 deletion dlt/destinations/impl/mssql/__init__.py
@@ -17,7 +17,8 @@ def capabilities() -> DestinationCapabilitiesContext:
# https://learn.microsoft.com/en-us/sql/sql-server/maximum-capacity-specifications-for-sql-server?view=sql-server-ver16&redirectedfrom=MSDN
caps.max_identifier_length = 128
caps.max_column_identifier_length = 128
caps.max_query_length = 4 * 1024 * 64 * 1024
# A SQL query can be a varchar(max), but is effectively limited to 65,536 * network packet size
caps.max_query_length = 65536 * 10
caps.is_max_query_length_in_bytes = True
caps.max_text_data_type_length = 2**30 - 1
caps.is_max_text_data_type_length_in_bytes = False
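Since `is_max_query_length_in_bytes` is set, callers should measure the encoded size of a statement rather than its character count. A hedged sketch (the `query_fits` helper is illustrative, not part of `dlt`):

```py
def query_fits(query: str, max_length: int, limit_in_bytes: bool, encoding: str = "utf-8") -> bool:
    """Check a statement against a destination's maximum query length."""
    size = len(query.encode(encoding)) if limit_in_bytes else len(query)
    return size <= max_length


# the MSSQL capabilities above: 65,536 * 10 bytes
assert query_fits("SELECT 1", 65536 * 10, limit_in_bytes=True)
```
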
2 changes: 1 addition & 1 deletion dlt/destinations/impl/mssql/mssql.py
@@ -181,7 +181,7 @@ def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None
if c.get(h, False) is True
)
column_name = self.capabilities.escape_identifier(c["name"])
return f"{column_name} {db_type} {hints_str} {self._gen_not_null(c['nullable'])}"
return f"{column_name} {db_type} {hints_str} {self._gen_not_null(c.get('nullable', True))}"

def _create_replace_followup_jobs(
self, table_chain: Sequence[TTableSchema]
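The `c.get('nullable', True)` change means a column schema without an explicit `nullable` hint is treated as nullable instead of raising `KeyError`. A small standalone illustration (not the actual `dlt` helper):

```py
def gen_not_null(nullable: bool) -> str:
    # mirrors the intent of _gen_not_null: emit NOT NULL only for non-nullable columns
    return "" if nullable else "NOT NULL"


column = {"name": "id", "data_type": "bigint"}  # no "nullable" hint present
# column["nullable"] would raise KeyError; .get() with a default treats the column as nullable
assert gen_not_null(column.get("nullable", True)) == ""
```
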
15 changes: 6 additions & 9 deletions dlt/extract/decorators.py
@@ -567,16 +567,13 @@ def _wrap(*args: Any, **kwargs: Any) -> TDltResourceImpl:
compat_wrapper(actual_resource_name, conf_f, sig, *args, **kwargs),
incremental,
)
except InvalidResourceDataTypeFunctionNotAGenerator as gen_ex:
except InvalidResourceDataTypeFunctionNotAGenerator:
# we allow an edge case: resource can return another resource
try:
# actually call the function to see if it contains DltResource
data_ = conf_f(*args, **kwargs)
if not isinstance(data_, DltResource):
raise
r = data_ # type: ignore[assignment]
except Exception:
raise gen_ex from None
# actually call the function to see if it contains DltResource
data_ = conf_f(*args, **kwargs)
if not isinstance(data_, DltResource):
raise
r = data_ # type: ignore[assignment]
# consider transformer arguments bound
r._args_bound = True
# keep explicit args passed
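The edge case the comment describes is a resource function that is not a generator but returns another resource. A hedged sketch of that pattern (names are illustrative):

```py
import dlt


@dlt.resource
def numbers():
    yield from [1, 2, 3]


@dlt.resource
def wrapper():
    # not a generator: returns an already-built resource, which the code above accepts
    return numbers()
```

Calling `wrapper()` should then resolve to the inner resource rather than raising `InvalidResourceDataTypeFunctionNotAGenerator`.
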
3 changes: 3 additions & 0 deletions dlt/load/configuration.py
@@ -15,6 +15,9 @@ class LoaderConfiguration(PoolRunnerConfiguration):
raise_on_max_retries: int = 5
"""When gt 0 will raise when job reaches raise_on_max_retries"""
_load_storage_config: LoadStorageConfiguration = None
# if set to `True`, the staging dataset will be
# truncated after loading the data
truncate_staging_dataset: bool = False

def on_resolved(self) -> None:
self.pool_type = "none" if self.workers == 1 else "thread"
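Assuming this configuration resolves under the `load` section (worth verifying for your dlt version), the new flag could be switched on via `config.toml` or an environment variable; a sketch:

```py
import os

# equivalent to `truncate_staging_dataset=true` under the [load] section of config.toml (assumed section name)
os.environ["LOAD__TRUNCATE_STAGING_DATASET"] = "true"
```
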
35 changes: 34 additions & 1 deletion dlt/load/load.py
@@ -53,7 +53,7 @@
LoadClientUnsupportedWriteDisposition,
LoadClientUnsupportedFileFormats,
)
from dlt.load.utils import get_completed_table_chain, init_client
from dlt.load.utils import _extend_tables_with_table_chain, get_completed_table_chain, init_client


class Load(Runnable[Executor], WithStepInfo[LoadMetrics, LoadInfo]):
@@ -348,6 +348,8 @@ def complete_package(self, load_id: str, schema: Schema, aborted: bool = False)
)
):
job_client.complete_load(load_id)
self._maybe_truncate_staging_dataset(schema, job_client)

self.load_storage.complete_load_package(load_id, aborted)
# collect package info
self._loaded_packages.append(self.load_storage.get_load_package_info(load_id))
@@ -490,6 +492,37 @@ def run(self, pool: Optional[Executor]) -> TRunMetrics:

return TRunMetrics(False, len(self.load_storage.list_normalized_packages()))

def _maybe_truncate_staging_dataset(self, schema: Schema, job_client: JobClientBase) -> None:
"""
Truncate the staging dataset if one is used and the configuration requests truncation.
Args:
schema (Schema): Schema to use for the staging dataset.
job_client (JobClientBase): Job client to use for the staging dataset.
"""
if not (
isinstance(job_client, WithStagingDataset) and self.config.truncate_staging_dataset
):
return

data_tables = schema.data_table_names()
tables = _extend_tables_with_table_chain(
schema, data_tables, data_tables, job_client.should_load_data_to_staging_dataset
)

try:
with self.get_destination_client(schema) as client:
with client.with_staging_dataset(): # type: ignore
client.initialize_storage(truncate_tables=tables)

except Exception as exc:
logger.warning(
f"Staging dataset truncate failed due to the following error: {exc}"
" However, it does not affect data integrity."
)

def get_step_info(
self,
pipeline: SupportsPipeline,
1 change: 1 addition & 0 deletions dlt/pipeline/pipeline.py
@@ -554,6 +554,7 @@ def load(
with signals.delayed_signals():
runner.run_pool(load_step.config, load_step)
info: LoadInfo = self._get_step_info(load_step)

self.first_run = False
return info
except Exception as l_ex:
9 changes: 7 additions & 2 deletions dlt/sources/helpers/rest_client/detector.py
@@ -1,5 +1,6 @@
import re
from typing import List, Dict, Any, Tuple, Union, Optional, Callable, Iterable
from pathlib import PurePosixPath
from typing import List, Dict, Any, Tuple, Union, Callable, Iterable
from urllib.parse import urlparse

from requests import Response
@@ -25,6 +26,7 @@
"payload",
"content",
"objects",
"values",
]
)

@@ -46,7 +48,10 @@

def single_entity_path(path: str) -> bool:
"""Checks if path ends with path param indicating that single object is returned"""
return re.search(r"\{([a-zA-Z_][a-zA-Z0-9_]*)\}/?$", path) is not None
# get last path segment
name = PurePosixPath(path).name
# alphabet for a name taken from https://github.com/OAI/OpenAPI-Specification/blob/main/versions/3.0.3.md#fixed-fields-6
return re.search(r"\{([a-zA-Z0-9\.\-_]+)\}", name) is not None


def matches_any_pattern(key: str, patterns: Iterable[str]) -> bool:
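A few illustrative inputs for the updated check, based on the regex and `PurePosixPath` handling above (import path as in this commit):

```py
from dlt.sources.helpers.rest_client.detector import single_entity_path

assert single_entity_path("/users/{user_id}")  # path parameter in the last segment
assert single_entity_path("/users/{user-id}/")  # dashes and dots are now allowed; trailing slash is ignored
assert not single_entity_path("/users")  # no path parameter
assert not single_entity_path("/users/{user_id}/posts")  # parameter is not in the last segment
```
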
@@ -8,10 +8,10 @@
In this example, you'll find a Python script that demonstrates how to load to BigQuery with the custom destination.
We'll learn how to:
- Use [built-in credentials](../general-usage/credentials/config_specs#gcp-credentials)
- Use the [custom destination](../dlt-ecosystem/destinations/destination.md)
- Use pyarrow tables to create complex column types on BigQuery
- Use BigQuery `autodetect=True` for schema inference from parquet files
- Use [built-in credentials.](../general-usage/credentials/config_specs#gcp-credentials)
- Use the [custom destination.](../dlt-ecosystem/destinations/destination.md)
- Use pyarrow tables to create complex column types on BigQuery.
- Use BigQuery `autodetect=True` for schema inference from parquet files.
"""

2 changes: 1 addition & 1 deletion docs/website/docs/dlt-ecosystem/destinations/snowflake.md
@@ -9,7 +9,7 @@ keywords: [Snowflake, destination, data warehouse]
## Install `dlt` with Snowflake
**To install the `dlt` library with Snowflake dependencies, run:**
```sh
pip install dlt[snowflake]
pip install "dlt[snowflake]"
```

## Setup Guide
36 changes: 33 additions & 3 deletions docs/website/docs/dlt-ecosystem/verified-sources/rest_api.md
@@ -282,7 +282,7 @@ The fields in the endpoint configuration are:
- `json`: The JSON payload to be sent with the request (for POST and PUT requests).
- `paginator`: Pagination configuration for the endpoint. See the [pagination](#pagination) section for more details.
- `data_selector`: A JSONPath to select the data from the response. See the [data selection](#data-selection) section for more details.
- `response_actions`: A list of actions that define how to process the response data.
- `response_actions`: A list of actions that define how to process the response data. See the [response actions](#response-actions) section for more details.
- `incremental`: Configuration for [incremental loading](#incremental-loading).

### Pagination
@@ -414,8 +414,8 @@ Available authentication types:
| Authentication class | String Alias (`type`) | Description |
| ------------------- | ----------- | ----------- |
| [BearerTokenAuth](../../general-usage/http/rest-client.md#bearer-token-authentication) | `bearer` | Bearer token authentication. |
| [HTTPBasicAuth](../../general-usage/http/rest-client.md#http-basic-authentication) | `api_key` | Basic HTTP authentication. |
| [APIKeyAuth](../../general-usage/http/rest-client.md#api-key-authentication) | `http_basic` | API key authentication with key defined in the query parameters or in the headers. |
| [HTTPBasicAuth](../../general-usage/http/rest-client.md#http-basic-authentication) | `http_basic` | Basic HTTP authentication. |
| [APIKeyAuth](../../general-usage/http/rest-client.md#api-key-authentication) | `api_key` | API key authentication with key defined in the query parameters or in the headers. |

To specify the authentication configuration, use the `auth` field in the [client](#client) configuration:
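For instance, a bearer-token client configuration might look like the sketch below (the base URL, secret name, and resource name are placeholders):

```py
{
    "client": {
        "base_url": "https://api.example.com",
        "auth": {
            "type": "bearer",  # string alias from the table above
            "token": dlt.secrets["sources.rest_api.token"],
        },
    },
    "resources": ["issues"],
}
```
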

@@ -586,3 +586,33 @@ See the [incremental loading](../../general-usage/incremental-loading.md#increme
- `root_key` (bool): Enables merging on all resources by propagating root foreign key to child tables. This option is most useful if you plan to change write disposition of a resource to disable/enable merge. Defaults to False.
- `schema_contract`: Schema contract settings that will be applied to this resource.
- `spec`: A specification of configuration and secret values required by the source.

### Response actions

The `response_actions` field in the endpoint configuration allows you to specify how to handle specific responses from the API based on status codes or content substrings. This is useful for handling edge cases, such as ignoring responses under specific conditions.

:::caution Experimental Feature
This is an experimental feature and may change in future releases.
:::

#### Example

```py
{
"path": "issues",
"response_actions": [
{"status_code": 404, "action": "ignore"},
{"content": "Not found", "action": "ignore"},
{"status_code": 200, "content": "some text", "action": "ignore"},
],
}
```

In this example, the source will ignore responses with a status code of 404, responses whose content contains "Not found", and responses with a status code of 200 _and_ content containing "some text".

**Fields:**

- `status_code` (int, optional): The HTTP status code to match.
- `content` (str, optional): A substring to search for in the response content.
- `action` (str): The action to take when the condition is met. Currently supported actions:
- `ignore`: Ignore the response.
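
In a full source configuration, `response_actions` sits inside the endpoint definition of a resource; a brief sketch:

```py
{
    "name": "issues",
    "endpoint": {
        "path": "issues",
        "response_actions": [
            {"status_code": 404, "action": "ignore"},
        ],
    },
}
```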
2 changes: 1 addition & 1 deletion docs/website/docs/dlt-ecosystem/verified-sources/slack.md
@@ -70,7 +70,7 @@ To get started with your data pipeline, follow these steps:

[This command](../../reference/command-line-interface) will initialize
[the pipeline example](https://github.com/dlt-hub/verified-sources/blob/master/sources/slack_pipeline.py)
with Google Sheets as the [source](../../general-usage/source) and
with Slack as the [source](../../general-usage/source) and
[duckdb](../destinations/duckdb.md) as the [destination](../destinations).

1. If you'd like to use a different destination, simply replace `duckdb` with the name of your