Rebase on the latest devel
VioletM committed Sep 11, 2024
1 parent 277613f commit 0add817
Showing 5 changed files with 148 additions and 40 deletions.
docs/website/docs/dlt-ecosystem/verified-sources/filesystem/advanced.md
---
title: Advanced Filesystem Usage
description: Use filesystem source as a building block
keywords: [readers source and filesystem, files, filesystem, readers source, cloud storage]
---

The filesystem source provides the building blocks to load data from files. This section explains how you can customize the filesystem source for your use case.

## Standalone filesystem resource

You can use the [standalone filesystem](../../../general-usage/resource#declare-a-standalone-resource) resource to list files in cloud storage or a local filesystem. This allows you to customize file readers or manage files using [fsspec](https://filesystem-spec.readthedocs.io/en/latest/index.html).
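
For example, a minimal sketch (with a placeholder bucket URL) that enumerates parquet files and loads the file listing itself into DuckDB:

```py
import dlt
from dlt.sources.filesystem import filesystem

# Enumerate parquet files; file content is not read at this point
all_files = filesystem(bucket_url="s3://my_bucket/data", file_glob="**/*.parquet")

# Load the listing (file name, size, modification date, ...) as a table
pipeline = dlt.pipeline(pipeline_name="list_files", destination="duckdb")
load_info = pipeline.run(all_files.with_name("file_listing"))
print(load_info)
```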

The filesystem ensures consistent file representation across bucket types and offers methods to access and read data.
- File content is typically not loaded (you can control it with the `extract_content` parameter of the filesystem resource). Instead, full file info and methods to access content are available.
- Users can request an authenticated [fsspec AbstractFileSystem](https://filesystem-spec.readthedocs.io/en/latest/_modules/fsspec/spec.html#AbstractFileSystem) instance.

#### `FileItem` fields

- `file_url` - complete URL of the file (e.g. `s3://bucket-name/path/file`). This field serves as a primary key.
- `file_name` - name of the file from the bucket URL.
When using a nested or recursive glob pattern, `relative_path` will include the file's path relative to `bucket_url`.
- `open()` - method that provides a file object.
- `filesystem` - field that gives access to an authorized `AbstractFilesystem` with standard fsspec methods.

## Create your own transformer

Although the `filesystem` resource yields the files from cloud storage or a local filesystem, you need to apply a transformer resource to retrieve the records from files. `dlt` natively supports three file types: `csv`, `parquet`, and `jsonl` (more details in [filesystem transformer resource](../filesystem/basic#2-choose-the-right-transformer-resource)).

But you can easily create your own: all you need is a function that takes a `FileItemDict` iterator as input and yields lists of records (recommended for performance) or individual records.

### Example: read data from Excel files

The code below sets up a pipeline that reads from an Excel file using a standalone transformer:

```py
# ... transformer definition collapsed in this view ...
load_info = pipeline.run(example_xls.with_name("example_xls_data"))
print(load_info)
```
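
A sketch of what such a transformer can look like, assuming `pandas` with `openpyxl` is installed and using placeholder bucket, glob, and sheet names:

```py
from typing import Iterator

import dlt
from dlt.common.storages.fsspec_filesystem import FileItemDict
from dlt.common.typing import TDataItems
from dlt.sources.filesystem import filesystem

@dlt.transformer(standalone=True)
def read_excel(items: Iterator[FileItemDict], sheet_name: str) -> Iterator[TDataItems]:
    import pandas as pd

    for file_obj in items:
        # Open the local or remote file and parse the requested sheet
        with file_obj.open() as file:
            yield pd.read_excel(file, sheet_name).to_dict(orient="records")

example_xls = filesystem(
    bucket_url="s3://my_bucket/data", file_glob="**/*.xlsx"  # placeholders
) | read_excel("example_sheet")

pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run(example_xls.with_name("example_xls_data"))
print(load_info)
```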

### Example: read data from XML files

You can use any third-party library to parse an `xml` file (e.g., [BeautifulSoup](https://pypi.org/project/beautifulsoup4/), [pandas](https://pandas.pydata.org/docs/reference/api/pandas.read_xml.html)). In the following example, we will be using the [xmltodict](https://pypi.org/project/xmltodict/) Python library.

```py
# ... transformer definition collapsed in this view ...
load_info = pipeline.run(example_xml.with_name("example_xml_data"))
print(load_info)
```
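
As above, the transformer body is collapsed; a sketch using `xmltodict`, with placeholder bucket and glob values:

```py
from typing import Iterator

import dlt
from dlt.common.storages.fsspec_filesystem import FileItemDict
from dlt.common.typing import TDataItems
from dlt.sources.filesystem import filesystem

@dlt.transformer(standalone=True)
def read_xml(items: Iterator[FileItemDict]) -> Iterator[TDataItems]:
    import xmltodict

    for file_obj in items:
        # Parse the whole XML document into a dictionary of records
        with file_obj.open() as file:
            yield xmltodict.parse(file.read())

example_xml = filesystem(
    bucket_url="gs://my_bucket/data", file_glob="**/*.xml"  # placeholders
) | read_xml()

pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run(example_xml.with_name("example_xml_data"))
print(load_info)
```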

## Clean files after loading

You can get an fsspec client from the filesystem resource after it has been extracted, e.g., in order to delete processed files. The filesystem module contains a convenient method `fsspec_from_resource`, which can be used as follows:

```py
# ... resource setup collapsed in this view ...
fs_client = fsspec_from_resource(gs_resource)
fs_client.ls("ci-test-bucket/standard_source/samples")
```
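
The returned client is a standard fsspec filesystem, so any of its methods can be used for cleanup, e.g., deleting the processed files (a sketch; `rm` accepts a list of paths):

```py
# List the processed files and remove them from the bucket
processed_files = fs_client.ls("ci-test-bucket/standard_source/samples")
fs_client.rm(processed_files)
```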

## Copy files locally

To copy files locally, add a step in the filesystem resource and then load the listing to the database:

```py
import os

import dlt
from dlt.common.storages.fsspec_filesystem import FileItemDict
from dlt.sources.filesystem import filesystem

def _copy(item: FileItemDict) -> FileItemDict:
    # Sketch of the collapsed body: copy the file to a local folder
    local_folder = "./local_copies"  # hypothetical destination
    dest_file = os.path.join(local_folder, item["file_name"])
    os.makedirs(os.path.dirname(dest_file), exist_ok=True)
    item.fsspec.download(item["file_url"], dest_file)
    # Return the file item unchanged so the listing can still be loaded
    return item
```
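
Continuing from the snippet above, the copy step can be attached to the resource with `add_map` and the listing loaded as usual (a sketch, assuming a local folder of CSV files):

```py
all_files = filesystem(
    bucket_url="file://Users/admin/Documents/csv_files", file_glob="*.csv"
)
# Copy each file locally as it is listed, then load the listing
listing = all_files.add_map(_copy)
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run(listing.with_name("listing"))
print(load_info)
```
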
160 changes: 134 additions & 26 deletions docs/website/docs/dlt-ecosystem/verified-sources/filesystem/basic.md
keywords: [readers source and filesystem, files, filesystem, readers source, cloud storage]
---

import Header from '../_source-info-header.md';
<Header/>

Filesystem source allows loading files from remote locations (AWS S3, Google Cloud Storage, Google Drive, Azure) or the local filesystem seamlessly. Filesystem source natively supports `csv`, `parquet`, and `jsonl` files and allows customization for loading any type of structured files.

To load unstructured data (`.pdf`, `.txt`, e-mail), please refer to the [unstructured data source](https://github.com/dlt-hub/verified-sources/tree/master/sources/unstructured_data).

## How the Filesystem source works

The Filesystem source doesn't just give you an easy way to load data from both remote and local files; it also comes with a powerful set of tools for customizing the loading process to fit your specific needs.

The Filesystem source loads data in two steps:
1. It [accesses the files](#1-initialize-a-filesystem-resource) in your remote or local file storage without actually reading the content yet. At this point, you can [filter files by metadata or name](#6-filter-files). You can also set up [incremental loading](#5-incremental-loading) to load only new files.
2. [The transformer](#2-choose-the-right-transformer-resource) reads the files' content and yields the records. At this step, you can filter out the actual data, enrich records with metadata from files, or [perform incremental loading](#load-new-records-based-on-a-specific-column) based on the file content.

## Quick example
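
The full example is collapsed in this view; a minimal sketch that lists CSV files from a local folder (placeholder path), parses them with `read_csv`, and loads them into DuckDB:

```py
import dlt
from dlt.sources.filesystem import filesystem, read_csv

filesystem_pipe = filesystem(
    bucket_url="file://Users/admin/Documents/csv_files", file_glob="*.csv"
) | read_csv()

pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run(filesystem_pipe.with_name("table_name"))
print(load_info)
```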

## Usage

The filesystem source is unique in that it provides building blocks for loading data from files.
First, it iterates over files in the storage and then processes each file to yield the records.
Usually, you need two resources:

1. The `filesystem` resource enumerates files in a selected bucket using a glob pattern, returning details as `FileItem` in customizable page sizes.
2. One of the available transformer resources processes each file with a specific transforming function and yields the records.

### 1. Initialize a `filesystem` resource
All parameters of the resource can be specified directly in code:

```py
from dlt.sources.filesystem import filesystem

filesystem_source = filesystem(
    bucket_url="file://Users/admin/Documents/csv_files", file_glob="*.csv"
)
```
or taken from the config:
* python code:

  ```py
  from dlt.sources.filesystem import filesystem

  filesystem_source = filesystem()
  ```

* configuration file:

  ```toml
  [sources.filesystem]
  bucket_url="file://Users/admin/Documents/csv_files"
  file_glob="*.csv"
  ```

Full list of `filesystem` resource parameters:

- `bucket_url` - the URL of the bucket or the path to a local folder.
- `credentials` - cloud credentials or an authenticated fsspec `AbstractFilesystem` instance.
- `file_glob` - a glob pattern used to select files.
- `files_per_page` - the number of files returned in each page of the listing.
- `extract_content` - if set to `True`, the file content is read and yielded together with the file info.
### 2. Choose the right transformer resource

The current implementation of the filesystem source natively supports three file types: `csv`, `parquet`, and `jsonl`.
You can apply any of the above or [create your own transformer](advanced#create-your-own-transformer). To apply the selected transformer
resource, use pipe notation `|`:

```py
from dlt.sources.filesystem import filesystem, read_csv

filesystem_pipe = filesystem(
    bucket_url="file://Users/admin/Documents/csv_files", file_glob="*.csv"
) | read_csv()
```

#### Available transformers

- `read_csv()` - processes `csv` files using `pandas`.
- `read_jsonl()` - processes `jsonl` files chunk by chunk.
- `read_parquet()` - processes `parquet` files using `pyarrow`.
- `read_csv_duckdb()` - processes `csv` files using DuckDB, which usually offers better performance than `pandas`.

:::tip
We advise that you give each resource a specific name before loading with `pipeline.run`. This will make sure that data goes to a table with the name you want and that each pipeline uses a separate state for incremental loading.
:::

### 4. Apply hints

To change how the data is loaded, apply hints to the piped resource. For example, to merge the data on the `date` column instead of appending:

```py
import dlt
from dlt.sources.filesystem import filesystem, read_csv

filesystem_pipe = filesystem(
    bucket_url="file://Users/admin/Documents/csv_files", file_glob="*.csv"
) | read_csv()
# tell dlt to merge on the `date` column
filesystem_pipe.apply_hints(write_disposition="merge", merge_key="date")
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run(filesystem_pipe.with_name("table_name"))
print(load_info)
print(pipeline.last_trace.last_normalize_info)
```

### 5. Incremental loading

Here are a few simple ways to load your data incrementally:

1. [Load files based on modification date](#load-files-based-on-modification-date). Only load files that have been updated since the last time `dlt` processed them. `dlt` checks the files' metadata (like the modification date) and skips those that haven't changed.
2. [Load new records based on a specific column](#load-new-records-based-on-a-specific-column). You can load only the new or updated records by looking at a specific column, like `updated_at`. Unlike the first method, this approach reads all files every time and then filters out the records that were updated.
3. [Combine loading only updated files and records](#combine-loading-only-updated-files-and-records). Finally, you can combine both methods. This is useful when new records can be added to existing files, so you want to filter not only the modified files but also the modified records.

#### Load files based on modification date
For example, to load only new CSV files with [incremental loading](../../../general-usage/incremental-loading), you can use the `apply_hints` method.

```py
import dlt
from dlt.sources.filesystem import filesystem, read_csv

# This configuration will only consider new csv files
new_files = filesystem(bucket_url="s3://bucket_name", file_glob="directory/*.csv")
# add incremental on modification time
new_files.apply_hints(incremental=dlt.sources.incremental("modification_date"))

pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run((new_files | read_csv()).with_name("csv_files"))
print(load_info)
```

#### Load new records based on a specific column

In this example, we load only new records based on the field called `updated_at`. This method may be useful if you cannot filter files by modification date, for example, when all files are modified each time a new record appears.
```py
import dlt
from dlt.sources.filesystem import filesystem, read_csv

# We consider all csv files
all_files = filesystem(bucket_url="s3://bucket_name", file_glob="directory/*.csv")

# But filter out only updated records
filesystem_pipe = (all_files | read_csv()).apply_hints(incremental=dlt.sources.incremental("updated_at"))
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run(filesystem_pipe)
print(load_info)
```

#### Combine loading only updated files and records

```py
import dlt
from dlt.sources.filesystem import filesystem, read_csv

# This configuration will only consider modified csv files
new_files = filesystem(bucket_url="s3://bucket_name", file_glob="directory/*.csv")
new_files.apply_hints(incremental=dlt.sources.incremental("modification_date"))

# And in each modified file we filter out only updated records
filesystem_pipe = (new_files | read_csv()).apply_hints(incremental=dlt.sources.incremental("updated_at"))
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run(filesystem_pipe)
print(load_info)
```

### 6. Filter files

If you need to filter out files based on their metadata, you can easily do this using the `add_filter` method.
Within your filtering function, you'll have access to [any field](advanced#fileitem-fields) of the `FileItem` representation.

#### Filter by name
To filter only files that have `London` or `Berlin` in their names, you can do the following:
```py
import dlt
from dlt.sources.filesystem import filesystem, read_csv

# Filter files accessing file_name field
filtered_files = filesystem(bucket_url="s3://bucket_name", file_glob="directory/*.csv")
filtered_files.add_filter(lambda item: ("London" in item.file_name) or ("Berlin" in item.file_name))

filesystem_pipe = (filtered_files | read_csv())
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run(filesystem_pipe)
print(load_info)
```

:::tip
You could also use `file_glob` to filter files by name. It works very well in simple cases, for example, filtering by extension:
```py
from dlt.sources.filesystem import filesystem

filtered_files = filesystem(bucket_url="s3://bucket_name", file_glob="**/*.json")
```
:::

#### Filter by size

If for some reason you only want to load small files, you can also do that:

```py
import dlt
from dlt.sources.filesystem import filesystem, read_csv

MAX_SIZE_IN_BYTES = 10

# Filter files accessing size_in_bytes field
filtered_files = filesystem(bucket_url="s3://bucket_name", file_glob="directory/*.csv")
filtered_files.add_filter(lambda item: item.size_in_bytes < MAX_SIZE_IN_BYTES)

filesystem_pipe = (filtered_files | read_csv())
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run(filesystem_pipe)
print(load_info)
```

## Troubleshooting

---
title: Filesystem & Buckets
description: dlt-verified source for Filesystem & Buckets
keywords: [readers source and filesystem, files, filesystem, readers source, cloud storage]
---

The Filesystem source allows seamless loading of files from the following locations:
* AWS S3
* Google Cloud Storage
* Google Drive
* Azure
* local filesystem

The Filesystem source natively supports `csv`, `parquet`, and `jsonl` files and allows customization for loading any type of structured files.

import DocCardList from '@theme/DocCardList';

4 changes: 2 additions & 2 deletions docs/website/docs/dlt-ecosystem/verified-sources/index.md
Planning to use `dlt` in production and need a source that isn't listed? We're happy to help.
### Core sources

<DocCardList items={useCurrentSidebarCategory().items.filter(
item => item.label === '30+ SQL Databases' || item.label === 'REST API generic source' || item.label === 'Filesystem & buckets'
)} />

### Verified sources
:::tip
If you couldn't find a source implementation, you can easily create your own; check out our guides to learn how.
:::

<DocCardList items={useCurrentSidebarCategory().items.filter(
item => item.label !== '30+ SQL Databases' && item.label !== 'REST API generic source' && item.label !== 'Filesystem & buckets'
)} />

### What's the difference between core and verified sources?
1 change: 1 addition & 0 deletions docs/website/sidebars.js
```js
const sidebars = {
  // ...
  {
    type: 'category',
    label: 'Filesystem & buckets',
    description: 'AWS S3, GCP, Azure, local files',
    link: {
      type: 'doc',
      id: 'dlt-ecosystem/verified-sources/filesystem/index',
      // ... rest collapsed ...
```
