
Commit

Fix snippets
VioletM committed Sep 11, 2024
1 parent 9e90d7b commit 6de258b
Showing 2 changed files with 54 additions and 27 deletions.
@@ -4,13 +4,16 @@ description: Use filesystem source as a building block
keywords: [readers source and filesystem, files, filesystem, readers source, cloud storage]
---

The filesystem source actually provides you with the building blocks to facilitate loading data from files. This section aims to give you more information about how you can customize the filesystem source for your use case.
The filesystem source provides the building blocks to load data from files. This section explains how you can customize the filesystem source for your use case.

## Standalone Filesystem Resource

You can use the [standalone filesystem](../../../general-usage/resource#declare-a-standalone-resource) resource to list files in cloud storage or local filesystem. This allows you to customize file readers or manage files using [fsspec](https://filesystem-spec.readthedocs.io/en/latest/index.html).
You can use the [standalone filesystem](../../../general-usage/resource#declare-a-standalone-resource) resource to list files in cloud storage or a local filesystem. This allows you to customize file readers or manage files using [fsspec](https://filesystem-spec.readthedocs.io/en/latest/index.html).

```py
import dlt
from dlt.sources.filesystem import filesystem

pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
files = filesystem(bucket_url="s3://my_bucket/data", file_glob="csv_folder/*.csv")
pipeline.run(files)
```
@@ -43,23 +46,30 @@ When using a nested or recursive glob pattern, `relative_path` will include the

### File manipulation

[FileItem](https://github.com/dlt-hub/dlt/blob/devel/dlt/common/storages/fsspec_filesystem.py#L40), backed by a dictionary implementation, offers these helper methods:
[FileItem](https://github.com/dlt-hub/dlt/blob/devel/dlt/common/storages/fsspec_filesystem.py#L40), backed by a dictionary implementation, offers these helpers:

- `read_bytes()` - returns the file content as bytes.
- `open()` - returns a file object, so the file can be read or streamed.
- `filesystem` - a field that gives access to the authorized `AbstractFileSystem` with standard fsspec methods (see the sketch below).
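A minimal sketch of these helpers used in a custom map step; the bucket URL, glob, and printed fields are placeholders:

```py
from dlt.common.storages.fsspec_filesystem import FileItemDict
from dlt.sources.filesystem import filesystem

def _peek(item: FileItemDict) -> FileItemDict:
    # read_bytes() loads the whole file into memory
    content = item.read_bytes()
    # open() returns a file object, handy for streaming large files
    with item.open() as f:
        first_bytes = f.read(16)
    # the `filesystem` field could be used here for any other fsspec operation
    print(item["file_name"], len(content), first_bytes)
    return item

# `files` can now be piped into a transformer or run in a pipeline
files = filesystem(bucket_url="s3://my_bucket/data", file_glob="csv_folder/*.csv").add_map(_peek)
```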

## Create Your Own Transformer

While the `filesystem` resource yields the files from cloud storage or local filesystem, to get the actual records from the files you need to apply a transformer resource. `dlt` natively supports three file types: `csv`, `parquet`, and `jsonl` (more details in [filesystem transformer resource](../filesystem/basic#2-choose-the-right-transformer-resource)).
Although the `filesystem` resource yields the files from cloud storage or a local filesystem, you need to apply a transformer resource to retrieve the records from files. `dlt` natively supports three file types: `csv`, `parquet`, and `jsonl` (more details in [filesystem transformer resource](../filesystem/basic#2-choose-the-right-transformer-resource)).

You can also easily create your own: all you need is a function that takes a `FileItemDict` iterator as input and yields a list of records (recommended for performance) or individual records, as in the sketch below.
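For illustration, here is a minimal sketch of a custom transformer that turns plain text files into line records; the bucket URL, glob, and table name are placeholders:

```py
from typing import Iterator

import dlt
from dlt.common.storages.fsspec_filesystem import FileItemDict
from dlt.common.typing import TDataItems
from dlt.sources.filesystem import filesystem

@dlt.transformer(standalone=True)
def read_text(items: Iterator[FileItemDict]) -> Iterator[TDataItems]:
    for file_obj in items:
        text = file_obj.read_bytes().decode("utf-8")
        # yield one list of records per file (recommended for performance)
        yield [
            {"file_name": file_obj["file_name"], "line": line}
            for line in text.splitlines()
        ]

pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
files = filesystem(bucket_url="s3://my_bucket/data", file_glob="*.txt")
load_info = pipeline.run((files | read_text()).with_name("text_lines"))
print(load_info)
```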

### Example: Read Data from Excel Files

To set up a pipeline that reads from an Excel file using a standalone transformer:
The code below sets up a pipeline that reads from an Excel file using a standalone transformer:

```py
import dlt
from dlt.common.storages.fsspec_filesystem import FileItemDict
from dlt.common.typing import TDataItems
from dlt.sources.filesystem import filesystem

BUCKET_URL = "s3://my_bucket/data"

# Define a standalone transformer to read data from an Excel file.
@dlt.transformer(standalone=True)
def read_excel(
@@ -80,13 +90,9 @@ example_xls = filesystem(
bucket_url=BUCKET_URL, file_glob="../directory/example.xlsx"
) | read_excel("example_table") # Pass the data through the transformer to read the "example_table" sheet.

pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb", dataset_name="example_xls_data",)
# Execute the pipeline and load the extracted data into the "duckdb" destination.
load_info = dlt.run(
example_xls.with_name("example_xls_data"),
destination="duckdb",
dataset_name="example_xls_data",
)

load_info = pipeline.run(example_xls.with_name("example_xls_data"))
# Print the loading information.
print(load_info)
```
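The body of `read_excel` is collapsed in the hunk above; a minimal version might look like the sketch below, where the pandas-based parsing and the `sheet_name` parameter are assumptions rather than the collapsed code:

```py
from typing import Iterator

import dlt
from dlt.common.storages.fsspec_filesystem import FileItemDict
from dlt.common.typing import TDataItems

@dlt.transformer(standalone=True)
def read_excel(items: Iterator[FileItemDict], sheet_name: str) -> Iterator[TDataItems]:
    # pandas (with an Excel engine such as openpyxl) does the actual parsing
    import pandas as pd

    for file_obj in items:
        with file_obj.open() as file:
            # yield all rows of the requested sheet as a list of dicts
            yield pd.read_excel(file, sheet_name).to_dict(orient="records")
```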
@@ -96,6 +102,13 @@
You can use any third-party library to parse an `xml` file (e.g., [BeautifulSoup](https://pypi.org/project/beautifulsoup4/), [pandas](https://pandas.pydata.org/docs/reference/api/pandas.read_xml.html)). In the following example, we will be using the [xmltodict](https://pypi.org/project/xmltodict/) Python library.

```py
import dlt
from dlt.common.storages.fsspec_filesystem import FileItemDict
from dlt.common.typing import TDataItems
from dlt.sources.filesystem import filesystem

BUCKET_URL = "s3://my_bucket/data"

# Define a standalone transformer to read data from an XML file.
@dlt.transformer(standalone=True)
def read_excel(
@@ -116,12 +129,9 @@ example_xls = filesystem(
bucket_url=BUCKET_URL, file_glob="../directory/example.xml"
) | read_excel("example_table") # Pass the data through the transformer to read the "example_table" sheet.

pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb", dataset_name="example_xml_data")
# Execute the pipeline and load the extracted data into the "duckdb" destination.
load_info = dlt.run(
example_xls.with_name("example_xml_data"),
destination="duckdb",
dataset_name="example_xml_data",
)
load_info = pipeline.run(example_xls.with_name("example_xml_data"))

# Print the loading information.
print(load_info)
@@ -132,11 +142,13 @@
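The XML transformer's body is also collapsed above; a rough sketch of an `xmltodict`-based reader follows. The function name `read_xml`, the `record_path` parameter, and the element layout are illustrative assumptions, not the collapsed code:

```py
from typing import Iterator

import dlt
from dlt.common.storages.fsspec_filesystem import FileItemDict
from dlt.common.typing import TDataItems

@dlt.transformer(standalone=True)
def read_xml(items: Iterator[FileItemDict], record_path: str) -> Iterator[TDataItems]:
    # xmltodict turns the XML document into nested dictionaries
    import xmltodict

    for file_obj in items:
        # read_bytes() returns the raw XML content of the file
        data = xmltodict.parse(file_obj.read_bytes())
        # record_path names the (assumed) top-level element that holds the records
        yield data[record_path]
```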
You can get an fsspec client from the filesystem resource after it has been extracted, for example, to delete processed files. The filesystem module contains a convenient method `fsspec_from_resource` that can be used as follows:

```py
from filesystem import filesystem, fsspec_from_resource
import dlt
from dlt.sources.filesystem import filesystem, fsspec_from_resource, read_csv

# get filesystem source
gs_resource = filesystem("gs://ci-test-bucket/")
# extract files
pipeline.run(gs_resource | read_csv)
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
pipeline.run(gs_resource | read_csv())
# get fs client
fs_client = fsspec_from_resource(gs_resource)
# do any operation
@@ -148,6 +160,13 @@ fs_client.ls("ci-test-bucket/standard_source/samples")
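# as a hypothetical cleanup step, you could also delete files that were
# already processed; `rm` is a standard fsspec method (the path is illustrative)
# fs_client.rm("ci-test-bucket/standard_source/samples/processed_file.csv")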
To copy files locally, add a step to the filesystem resource and then load the listing into the database:

```py
import os

import dlt
from dlt.common.storages.fsspec_filesystem import FileItemDict
from dlt.common.typing import TDataItems
from dlt.sources.filesystem import filesystem

def _copy(item: FileItemDict) -> FileItemDict:
# instantiate fsspec and copy file
dest_file = os.path.join(local_folder, item["file_name"])
@@ -158,6 +177,8 @@ def _copy(item: FileItemDict) -> FileItemDict:
# return file item unchanged
return item

BUCKET_URL = "gs://ci-test-bucket/"

# use recursive glob pattern and add file copy step
downloader = filesystem(BUCKET_URL, file_glob="**").add_map(_copy)

@@ -166,6 +187,7 @@ downloader = filesystem(BUCKET_URL, file_glob="**").add_map(_copy)
listing = list(downloader)
print(listing)
# download to table "listing"
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run(
downloader.with_name("listing"), write_disposition="replace"
)
@@ -14,7 +14,7 @@ To load unstructured data (`.pdf`, `.txt`, e-mail), please refer to the [unstruc

```py
import dlt
from dlt.filesystem import filesystem, read_parquet
from dlt.sources.filesystem import filesystem, read_parquet

filesystem_resource = filesystem(
bucket_url="file://Users/admin/Documents/parquet_files",
@@ -50,7 +50,7 @@ To get started with your data pipeline, follow these steps:
[the pipeline example](https://github.com/dlt-hub/verified-sources/blob/master/sources/filesystem_pipeline.py)
with the filesystem as the source and [duckdb](../../destinations/duckdb.md) as the destination.

2. If you'd like to use a different destination, simply replace `duckdb` with the name of your
2. If you would like to use a different destination, simply replace `duckdb` with the name of your
preferred [destination](../../destinations).

3. After running this command, a new directory will be created with the necessary files and
Expand Down Expand Up @@ -122,7 +122,7 @@ You don't need any credentials for the local filesystem.

### Add credentials to dlt pipeline

To provide credentials to the filesystem source, you can use any method available in `dlt`.
To provide credentials to the filesystem source, you can use [any method available](../../../general-usage/credentials/setup#available-config-providers) in `dlt`.
One of the easiest ways is to use configuration files. The `.dlt` folder in your working directory
contains two files: `config.toml` and `secrets.toml`. Sensitive information, like passwords and
access tokens, should only be put into `secrets.toml`, while any other configuration, like the path to
@@ -238,7 +238,7 @@ Usually, you need two resources:

All parameters of the resource can be specified directly in code:
```py
from dlt.filesystem import filesystem
from dlt.sources.filesystem import filesystem

filesystem_source = filesystem(
bucket_url="file://Users/admin/Documents/csv_files",
@@ -247,7 +247,7 @@ filesystem_source = filesystem(
```
or taken from the config:
```py
from dlt.filesystem import filesystem
from dlt.sources.filesystem import filesystem

filesystem_source = filesystem()
```
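In that case, `dlt` resolves the parameters from your configuration; a minimal sketch of the matching `config.toml` entries might look like this (the path and glob are placeholders):

```toml
[sources.filesystem]
bucket_url = "file://Users/admin/Documents/csv_files"
file_glob = "*.csv"
```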
@@ -267,7 +267,7 @@ You can apply any of the above or create your own [transformer](advanced#create-
resource, use pipe notation `|`:

```py
from dlt.filesystem import filesystem, read_csv
from dlt.sources.filesystem import filesystem, read_csv

filesystem_pipe = filesystem(
bucket_url="file://Users/admin/Documents/csv_files",
@@ -294,7 +294,7 @@ want and that each pipeline uses a

```py
import dlt
from dlt.filesystem import filesystem, read_csv
from dlt.sources.filesystem import filesystem, read_csv

filesystem_pipe = filesystem(bucket_url="file://Users/admin/Documents/csv_files", file_glob="*.csv") | read_csv()
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
@@ -308,7 +308,7 @@ For more information on how to create and run the pipeline, read the [Walkthroug

```py
import dlt
from dlt.filesystem import filesystem, read_csv
from dlt.sources.filesystem import filesystem, read_csv

filesystem_pipe = filesystem(bucket_url="file://Users/admin/Documents/csv_files", file_glob="*.csv") | read_csv()
# tell dlt to merge on date
@@ -326,10 +326,15 @@ To load only new CSV files with [incremental loading](../../../general-usage/incremental-loading):
To load only new CSV files with [incremental loading](../../../general-usage/incremental-loading):

```py
import dlt
from dlt.sources.filesystem import filesystem, read_csv

# This configuration will only consider new csv files
new_files = filesystem(bucket_url=BUCKET_URL, file_glob="directory/*.csv")
new_files = filesystem(bucket_url="s3://bucket_name", file_glob="directory/*.csv")
# add incremental on modification time
new_files.apply_hints(incremental=dlt.sources.incremental("modification_date"))

pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run((new_files | read_csv()).with_name("csv_files"))
print(load_info)
print(pipeline.last_trace.last_normalize_info)
