From 0add817b01ac856b5b82c6fa30430e30f1738781 Mon Sep 17 00:00:00 2001 From: Violetta Mishechkina Date: Wed, 11 Sep 2024 17:42:54 +0200 Subject: [PATCH] Rebase on the latest devel --- .../verified-sources/filesystem/advanced.md | 17 +- .../verified-sources/filesystem/basic.md | 160 +++++++++++++++--- .../verified-sources/filesystem/index.md | 6 +- .../dlt-ecosystem/verified-sources/index.md | 4 +- docs/website/sidebars.js | 1 + 5 files changed, 148 insertions(+), 40 deletions(-) diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/filesystem/advanced.md b/docs/website/docs/dlt-ecosystem/verified-sources/filesystem/advanced.md index 3e4d678558..1f8dff684a 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/filesystem/advanced.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/filesystem/advanced.md @@ -1,12 +1,12 @@ --- -title: Advanced filesystem usage +title: Advanced Filesystem Usage description: Use filesystem source as a building block keywords: [readers source and filesystem, files, filesystem, readers source, cloud storage] --- The filesystem source provides the building blocks to load data from files. This section explains how you can customize the filesystem source for your use case. -## Standalone Filesystem Resource +## Standalone filesystem resource You can use the [standalone filesystem](../../../general-usage/resource#declare-a-standalone-resource) resource to list files in cloud storage or a local filesystem. This allows you to customize file readers or manage files using [fsspec](https://filesystem-spec.readthedocs.io/en/latest/index.html). @@ -30,7 +30,7 @@ The filesystem ensures consistent file representation across bucket types and of - File content is typically not loaded (you can control it with the `extract_content` parameter of the filesystem resource). Instead, full file info and methods to access content are available. - Users can request an authenticated [fsspec AbstractFileSystem](https://filesystem-spec.readthedocs.io/en/latest/_modules/fsspec/spec.html#AbstractFileSystem) instance. -#### `FileItem` fields: +#### `FileItem` fields - `file_url` - complete URL of the file (e.g. `s3://bucket-name/path/file`). This field serves as a primary key. - `file_name` - name of the file from the bucket URL. @@ -52,13 +52,13 @@ When using a nested or recursive glob pattern, `relative_path` will include the - `open()` - method which provides a file object when opened. - `filesystem` - field, which gives access to authorized `AbstractFilesystem` with standard fsspec methods. -## Create Your Own Transformer +## Create your own transformer Although the `filesystem` resource yields the files from cloud storage or a local filesystem, you need to apply a transformer resource to retrieve the records from files. `dlt` natively supports three file types: `csv`, `parquet`, and `jsonl` (more details in [filesystem transformer resource](../filesystem/basic#2-choose-the-right-transformer-resource)). But you can easily create your own. In order to do this, you just need a function that takes as input a `FileItemDict` iterator and yields a list of records (recommended for performance) or individual records. -### Example: Read Data from Excel Files +### Example: read data from Excel files The code below sets up a pipeline that reads from an Excel file using a standalone transformer: @@ -97,7 +97,7 @@ load_info = pipeline.run(example_xls.with_name("example_xls_data")) print(load_info) ``` -### Example: Read Data from XML Files +### Example: read data from XML files You can use any third-party library to parse an `xml` file (e.g., [BeautifulSoup](https://pypi.org/project/beautifulsoup4/), [pandas](https://pandas.pydata.org/docs/reference/api/pandas.read_xml.html)). In the following example, we will be using the [xmltodict](https://pypi.org/project/xmltodict/) Python library. @@ -135,7 +135,7 @@ load_info = pipeline.run(example_xml.with_name("example_xml_data")) print(load_info) ``` -## Clean Files After Loading +## Clean files after loading You can get an fsspec client from the filesystem resource after it was extracted, i.e., in order to delete processed files, etc. The filesystem module contains a convenient method `fsspec_from_resource` that can be used as follows: @@ -153,7 +153,7 @@ fs_client = fsspec_from_resource(gs_resource) fs_client.ls("ci-test-bucket/standard_source/samples") ``` -## Copy Files Locally +## Copy files locally To copy files locally, add a step in the filesystem resource and then load the listing to the database: @@ -162,7 +162,6 @@ import os import dlt from dlt.common.storages.fsspec_filesystem import FileItemDict -from dlt.common.typing import TDataItems from dlt.sources.filesystem import filesystem def _copy(item: FileItemDict) -> FileItemDict: diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/filesystem/basic.md b/docs/website/docs/dlt-ecosystem/verified-sources/filesystem/basic.md index 489cb746d6..6dc9010a51 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/filesystem/basic.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/filesystem/basic.md @@ -6,10 +6,18 @@ keywords: [readers source and filesystem, files, filesystem, readers source, clo import Header from '../_source-info-header.md';
-Filesystem source is a generic source that allows loading files from remote locations (AWS S3, Google Cloud Storage, Google Drive, Azure) or the local filesystem seamlessly. Filesystem source natively supports `csv`, `parquet`, and `jsonl` files and allows customization for loading any type of structured files. +Filesystem source allows loading files from remote locations (AWS S3, Google Cloud Storage, Google Drive, Azure) or the local filesystem seamlessly. Filesystem source natively supports `csv`, `parquet`, and `jsonl` files and allows customization for loading any type of structured files. To load unstructured data (`.pdf`, `.txt`, e-mail), please refer to the [unstructured data source](https://github.com/dlt-hub/verified-sources/tree/master/sources/unstructured_data). +## How Filesystem source works? + +The Filesystem source doesn't just give you an easy way to load data from both remote and local files — it also comes with a powerful set of tools that let you customize the loading process to fit your specific needs. + +Filesystem source loads data in two steps: +1. It [accesses the files](#1-initialize-a-filesystem-resource) in your remote or local file storage without actually reading the content yet. At this point, you can [filter files by metadata or name](#6-filter-files). You can also set up [incremental loading](#5-incremental-loading) to load only new files. +2. [The transformer](#2-choose-the-right-transformer-resource) reads the files' content and yields the records. At this step, you can filter out the actual data, enrich records with metadata from files, or [perform incremental loading](#load-new-records-based-on-a-specific-column) based on the file content. + ## Quick example ```py @@ -228,10 +236,10 @@ and default credentials. To learn more about adding credentials to your pipeline ## Usage The filesystem source is quite unique since it provides you with building blocks for loading data from files. -First, it iterates over files in the storage and then process each file to yield the records. +First, it iterates over files in the storage and then processes each file to yield the records. Usually, you need two resources: -1. The filesystem resource enumerates files in a selected bucket using a glob pattern, returning details as `FileInfo` in customizable page sizes. +1. The `filesystem` resource enumerates files in a selected bucket using a glob pattern, returning details as `FileInfo` in customizable page sizes. 2. One of the available transformer resources to process each file in a specific transforming function and yield the records. ### 1. Initialize a `filesystem` resource @@ -246,11 +254,21 @@ filesystem_source = filesystem( ) ``` or taken from the config: -```py -from dlt.sources.filesystem import filesystem -filesystem_source = filesystem() -``` +* python code: + + ```py + from dlt.sources.filesystem import filesystem + + filesystem_source = filesystem() + ``` + +* configuration file: + ```toml + [sources.filesystem] + bucket_url="file://Users/admin/Documents/csv_files" + file_glob="*.csv" + ``` Full list of `filesystem` resource parameters: @@ -263,7 +281,7 @@ Full list of `filesystem` resource parameters: ### 2. Choose the right transformer resource The current implementation of the filesystem source natively supports three file types: `csv`, `parquet`, and `jsonl`. -You can apply any of the above or create your own [transformer](advanced#create-your-own-transformer). To apply the selected transformer +You can apply any of the above or [create your own transformer](advanced#create-your-own-transformer). To apply the selected transformer resource, use pipe notation `|`: ```py @@ -277,10 +295,10 @@ filesystem_pipe = filesystem( #### Available transformers -- `read_csv()` -- `read_jsonl()` -- `read_parquet()` -- `read_csv_duckdb()` +- `read_csv()` - process `csv` files using `pandas` +- `read_jsonl()` - process `jsonl` files chuck by chunk +- `read_parquet()` - process `parquet` files using `pyarrow` +- `read_csv_duckdb()` - this transformer process `csv` files using DuckDB, which usually shows better performance, than `pandas`. :::tip We advise that you give each resource a @@ -318,27 +336,117 @@ filesystem_pipe.apply_hints(write_disposition="merge", merge_key="date") pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb") load_info = pipeline.run(filesystem_pipe.with_name("table_name")) print(load_info) -print(pipeline.last_trace.last_normalize_info) ``` ### 5. Incremental loading -To load only new CSV files with [incremental loading](../../../general-usage/incremental-loading): +Here are a few simple ways to load your data incrementally: + +1. [Load files based on modification date](#load-files-based-on-modification-date). Only load files that have been updated since the last time `dlt` processed them. `dlt` checks the files' metadata (like the modification date) and skips those that haven't changed. +2. [Load new records based on a specific column](#load-new-records-based-on-a-specific-column). You can load only the new or updated records by looking at a specific column, like `updated_at`. Unlike the first method, this approach would read all files every time and then filter the records which was updated. +3. [Combine loading only updated files and records](#combine-loading-only-updated-files-and-records). Finally, you can combine both methods. It could be useful if new records could be added to existing files, so you not only want to filter the modified files, but modified records as well. - ```py - import dlt - from dlt.sources.filesystem import filesystem, read_csv +#### Load files based on modification date +For example, to load only new CSV files with [incremental loading](../../../general-usage/incremental-loading) you can use `apply_hints` method. - # This configuration will only consider new csv files - new_files = filesystem(bucket_url="s3://bucket_name", file_glob="directory/*.csv") - # add incremental on modification time - new_files.apply_hints(incremental=dlt.sources.incremental("modification_date")) +```py +import dlt +from dlt.sources.filesystem import filesystem, read_csv + +# This configuration will only consider new csv files +new_files = filesystem(bucket_url="s3://bucket_name", file_glob="directory/*.csv") +# add incremental on modification time +new_files.apply_hints(incremental=dlt.sources.incremental("modification_date")) + +pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb") +load_info = pipeline.run((new_files | read_csv()).with_name("csv_files")) +print(load_info) +``` + +#### Load new records based on a specific column + +In this example we load only new records based on the field called `updated_at`. This method may be useful if you are not able to +filter files by modification date because for example, all files are modified each time new record is appeared. +```py +import dlt +from dlt.sources.filesystem import filesystem, read_csv + +# We consider all csv files +all_files = filesystem(bucket_url="s3://bucket_name", file_glob="directory/*.csv") + +# But filter out only updated records +filesystem_pipe = (all_files | read_csv()).apply_hints(incremental=dlt.sources.incremental("updated_at")) +pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb") +load_info = pipeline.run(filesystem_pipe) +print(load_info) +``` + +#### Combine loading only updated files and records + +```py +import dlt +from dlt.sources.filesystem import filesystem, read_csv - pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb") - load_info = pipeline.run((new_files | read_csv()).with_name("csv_files")) - print(load_info) - print(pipeline.last_trace.last_normalize_info) - ``` +# This configuration will only consider modified csv files +new_files = filesystem(bucket_url="s3://bucket_name", file_glob="directory/*.csv") +new_files.apply_hints(incremental=dlt.sources.incremental("modification_date")) + +# And in each modified file we filter out only updated records +filesystem_pipe = (new_files | read_csv()).apply_hints(incremental=dlt.sources.incremental("updated_at")) +pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb") +load_info = pipeline.run(filesystem_pipe) +print(load_info) +``` + +### 6. Filter files + +If you need to filter out files based on their metadata, you can easily do this using the `add_filter` method. +Within your filtering function, you'll have access to [any field](advanced#fileitem-fields) of the `FileItem` representation. + +#### Filter by name +To filter only files that have `London` and `Berlin` in their names, you can do the following: +```py +import dlt +from dlt.sources.filesystem import filesystem, read_csv + +# Filter files accessing file_name field +filtered_files = filesystem(bucket_url="s3://bucket_name", file_glob="directory/*.csv") +filtered_files.add_filter(lambda item: ("London" in item.file_name) or ("Berlin" in item.file_name)) + +filesystem_pipe = (filtered_files | read_csv()) +pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb") +load_info = pipeline.run(filesystem_pipe) +print(load_info) +``` + +:::tip +You could also use `file_glob` to filter files by names. It works very well in simple cases, for example, filtering by extention: +```py +from dlt.sources.filesystem import filesystem + +filtered_files = filesystem(bucket_url="s3://bucket_name", file_glob="**/*.json") +``` +::: + +#### Filter by size + +If for some reason you only want to load small files, you can also do that: + +```py +import dlt +from dlt.sources.filesystem import filesystem, read_csv + +MAX_SIZE_IN_BYTES = 10 + +# Filter files accessing size_in_bytes field +filtered_files = filesystem(bucket_url="s3://bucket_name", file_glob="directory/*.csv") +filtered_files.add_filter(lambda item: item.size_in_bytes < MAX_SIZE_IN_BYTES) + +filesystem_pipe = (filtered_files | read_csv()) +pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb") +load_info = pipeline.run(filesystem_pipe) +print(load_info) +``` ## Troubleshooting diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/filesystem/index.md b/docs/website/docs/dlt-ecosystem/verified-sources/filesystem/index.md index b7fb4d67e9..32e0df77c2 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/filesystem/index.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/filesystem/index.md @@ -1,17 +1,17 @@ --- title: Filesystem & Buckets -description: dlt verified source for Filesystem & Buckets +description: dlt-verified source for Filesystem & Buckets keywords: [readers source and filesystem, files, filesystem, readers source, cloud storage] --- -The Filesystem source is a generic source that allows seamless loading files from the following locations: +The Filesystem source allows seamless loading of files from the following locations: * AWS S3 * Google Cloud Storage * Google Drive * Azure * local filesystem -The Filesystem source natively supports `csv`, `parquet`, and `jsonl` files, and allows customization for loading any type of structured files. +The Filesystem source natively supports `csv`, `parquet`, and `jsonl` files and allows customization for loading any type of structured files. import DocCardList from '@theme/DocCardList'; diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/index.md b/docs/website/docs/dlt-ecosystem/verified-sources/index.md index cc54c8dc67..8c67ce206c 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/index.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/index.md @@ -12,7 +12,7 @@ Planning to use `dlt` in production and need a source that isn't listed? We're h ### Core sources item.label === '30+ SQL Databases' || item.label === 'REST API generic source' || item.label === 'Filesystem' +item => item.label === '30+ SQL Databases' || item.label === 'REST API generic source' || item.label === 'Filesystem & buckets' )} /> ### Verified sources @@ -24,7 +24,7 @@ If you couldn't find a source implementation, you can easily create your own, ch ::: item.label !== '30+ SQL Databases' && item.label !== 'REST API generic source'&& item.label !== 'Filesystem' +item => item.label !== '30+ SQL Databases' && item.label !== 'REST API generic source'&& item.label !== 'Filesystem & buckets' )} /> ### What's the difference between core and verified sources? diff --git a/docs/website/sidebars.js b/docs/website/sidebars.js index cd00903004..9fb36c8bae 100644 --- a/docs/website/sidebars.js +++ b/docs/website/sidebars.js @@ -59,6 +59,7 @@ const sidebars = { { type: 'category', label: 'Filesystem & buckets', + description: 'AWS S3, GCP, Azure, local files', link: { type: 'doc', id: 'dlt-ecosystem/verified-sources/filesystem/index',