From 7af9930f77022d747d42678f215564141ee8c7bb Mon Sep 17 00:00:00 2001 From: Violetta Mishechkina Date: Mon, 26 Aug 2024 10:00:09 +0200 Subject: [PATCH] Refactor filesystem doc --- .../verified-sources/filesystem.md | 535 ------------------ .../verified-sources/filesystem/advanced.md | 194 +++++++ .../verified-sources/filesystem/index.md | 339 +++++++++++ 3 files changed, 533 insertions(+), 535 deletions(-) delete mode 100644 docs/website/docs/dlt-ecosystem/verified-sources/filesystem.md create mode 100644 docs/website/docs/dlt-ecosystem/verified-sources/filesystem/advanced.md create mode 100644 docs/website/docs/dlt-ecosystem/verified-sources/filesystem/index.md diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/filesystem.md b/docs/website/docs/dlt-ecosystem/verified-sources/filesystem.md deleted file mode 100644 index 7552a0acb2..0000000000 --- a/docs/website/docs/dlt-ecosystem/verified-sources/filesystem.md +++ /dev/null @@ -1,535 +0,0 @@ ---- -title: Filesystem -description: dlt verified source for Readers Source and Filesystem -keywords: [readers source and filesystem, filesystem, readers source] ---- -import Header from './_source-info-header.md'; - -# Readers Source and Filesystem - -
- -This verified source easily streams files from AWS S3, Google Cloud Storage, Google Drive, Azure, or local filesystem using the reader source. - -Sources and resources that can be used with this verified source are: - -| Name | Type | Description | -|--------------|----------------------|---------------------------------------------------------------------------| -| readers | Source | Lists and reads files with resource `filesystem` and readers transformers | -| filesystem | Resource | Lists files in `bucket_url` using `file_glob` pattern | -| read_csv | Resource-transformer | Reads csv file with **Pandas** chunk by chunk | -| read_jsonl | Resource-transformer | Reads jsonl file content and extract the data | -| read_parquet | Resource-transformer | Reads parquet file content and extract the data with **Pyarrow** | - -## Setup Guide - -### Grab credentials - -This source can access various bucket types, including: - -- AWS S3. -- Google Cloud Storage. -- Google Drive. -- Azure Blob Storage. -- Local Storage - -To access these, you'll need secret credentials: - -#### AWS S3 credentials - -To get AWS keys for S3 access: - -1. Access IAM in AWS Console. -2. Select "Users", choose a user, and open "Security credentials". -3. Click "Create access key" for AWS ID and Secret Key. - -For more info, see -[AWS official documentation.](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html) - -#### Google Cloud Storage / Google Drive credentials - -To get GCS/GDrive access: - -1. Log in to [console.cloud.google.com](http://console.cloud.google.com/). -2. Create a [service account](https://cloud.google.com/iam/docs/service-accounts-create#creating). -3. Enable "Cloud Storage API" / "Google Drive API"; see - [Google's guide](https://support.google.com/googleapi/answer/6158841?hl=en). -4. In IAM & Admin > Service Accounts, find your account, click the three-dot menu > "Manage Keys" > - "ADD KEY" > "CREATE" to get a JSON credential file. -5. Grant the service account appropriate permissions for cloud storage access. - -For more info, see how to -[create service account](https://support.google.com/a/answer/7378726?hl=en). - -#### Azure Blob Storage credentials - -To obtain Azure blob storage access: - -1. Go to Azure Portal (portal.azure.com). -2. Select "Storage accounts" > your storage. -3. Click "Settings" > "Access keys". -4. View account name and two keys (primary/secondary). Keep keys confidential. - -For more info, see -[Azure official documentation](https://learn.microsoft.com/en-us/azure/storage/common/storage-account-keys-manage?tabs=azure-portal). - -### Initialize the verified source - -To get started with your data pipeline, follow these steps: - -1. Enter the following command: - - ```sh - dlt init filesystem duckdb - ``` - - [This command](../../reference/command-line-interface) will initialize - [the pipeline example](https://github.com/dlt-hub/verified-sources/blob/master/sources/filesystem_pipeline.py) - with filesystem as the [source](../../general-usage/source) and - [duckdb](../destinations/duckdb.md) as the [destination](../destinations). - -2. If you'd like to use a different destination, simply replace `duckdb` with the name of your - preferred [destination](../destinations). - -3. After running this command, a new directory will be created with the necessary files and - configuration settings to get started. - -For more information, read the -[Walkthrough: Add a verified source.](../../walkthroughs/add-a-verified-source) - -### Add credentials - -1. 
In the `.dlt` folder, there's a file called `secrets.toml`. It's where you store sensitive - information securely, like access tokens. Keep this file safe. Here's its format for service - account authentication: - - ```toml - [sources.filesystem.credentials] # use [sources.readers.credentials] for the "readers" source - # For AWS S3 access: - aws_access_key_id="Please set me up!" - aws_secret_access_key="Please set me up!" - - # For GCS bucket / Google Drive access: - client_email="Please set me up!" - private_key="Please set me up!" - project_id="Please set me up!" - - # For Azure blob storage access: - azure_storage_account_name="Please set me up!" - azure_storage_account_key="Please set me up!" - ``` - -2. Finally, enter credentials for your chosen destination as per the [docs](../destinations/). - -3. You can pass the bucket URL and glob pattern or use `config.toml`. For local filesystems, use - `file://` as follows: - - ```toml - [sources.filesystem] # use [sources.readers.credentials] for the "readers" source - bucket_url='file://Users/admin/Documents/csv_files' - file_glob="*" - ``` - or skip the schema and provide the local path in a format native for your operating system as follows: - - ```toml - [sources.filesystem] # use [sources.readers.credentials] for the "readers" source - bucket_url='~\Documents\csv_files\' - file_glob="*" - ``` - - In the example above we use Windows path to current user's Documents folder. Mind that literal toml string (single quotes) - was used to conveniently use the backslashes without need to escape. - - For remote file systems you need to add the schema, it will be used to get the protocol being - used. The protocols that can be used are: - - - For Azure blob storage - ```toml - [sources.filesystem] # use [sources.readers.credentials] for the "readers" source - bucket_url="az:////" - ``` - - - `az://` indicates the Azure Blob Storage protocol. - - `container_name` is the name of the container. - - `path_to_files/` is a directory path within the container. - - `CAUTION: For Azure, use adlfs>=2023.9.0. Older versions mishandle globs.` - - - For Google Drive - ```toml - [sources.filesystem] # use [sources.readers.credentials] for the "readers" source - bucket_url="gdrive:////" - ``` - - - `gdrive://` indicates that the Google Drive protocol. - - `folder_name` refers to a folder within Google Drive. - - `subfolder_or_file_path/` is a sub-folder or directory path within the my-bucket folder. - - - For Google Storage - ```toml - [sources.filesystem] # use [sources.readers.credentials] for the "readers" source - bucket_url="gs:////" - ``` - - - `gs://` indicates the Google Cloud Storage protocol. - - `bucket_name` is the name of the bucket. - - `path_to_files/` is a directory path within the bucket. - - - For AWS S3 - ```toml - [sources.filesystem] # use [sources.readers.credentials] for the "readers" source - bucket_url="s3:////" - ``` - - - `s3://` indicates the AWS S3 protocol. - - `bucket_name` is the name of the bucket. - - `path_to_files/` is a directory path within the bucket. - -### Use local file system paths -You can use both native local file system paths and in form of `file:` uri. Absolute, relative and UNC Windows paths are supported. -You can find relevant examples in [filesystem destination documentation](../destinations/filesystem.md#local-file-system) which follows -the same rules to specify the `bucket_url`. - -:::caution -Windows supports paths up to 255 characters. 
When you access a path longer than 255 characters you'll see `FileNotFound` exception. - - To go over this limit you can use [extended paths](https://learn.microsoft.com/en-us/windows/win32/fileio/maximum-file-path-limitation?tabs=registry). - **Note that Python glob does not work with extended UNC paths** so you will not be able to use them - -```toml -[sources.filesystem] -bucket_url = '\\?\C:\a\b\c' -``` -::: - -## Run the pipeline - -1. Before running the pipeline, ensure that you have installed all the necessary dependencies by - running the command: - - ```sh - pip install -r requirements.txt - ``` - -2. Install optional modules: - - - For AWS S3: - ```sh - pip install s3fs - ``` - - For Azure blob: - ```sh - pip install adlfs>=2023.9.0 - ``` - - GCS storage: No separate module needed. - -3. You're now ready to run the pipeline! To get started, run the following command: - - ```sh - python filesystem_pipeline.py - ``` - -4. Once the pipeline has finished running, you can verify that everything loaded correctly by using - the following command: - - ```sh - dlt pipeline show - ``` - - For example, the `pipeline_name` for the above pipeline example is `standard_filesystem`, you may - also use any custom name instead. - -For more information, read the [Walkthrough: Run a pipeline](../../walkthroughs/run-a-pipeline). - -## Sources and resources - -`dlt` works on the principle of [sources](../../general-usage/source) and -[resources](../../general-usage/resource). - -### Source `readers` - -This source offers chunked file readers as resources, which can be optionally customized. Provided resources include: - -- `read_csv()` -- `read_jsonl()` -- `read_parquet()` - -```py -@dlt.source(_impl_cls=ReadersSource, spec=FilesystemConfigurationResource) -def readers( - bucket_url: str = dlt.secrets.value, - credentials: Union[FileSystemCredentials, AbstractFileSystem] = dlt.secrets.value, - file_glob: Optional[str] = "*", -) -> Tuple[DltResource, ...]: - ... -``` - -- `bucket_url`: The url to the bucket. -- `credentials`: The credentials to the filesystem of fsspec `AbstractFilesystem` instance. -- `file_glob`: Glob filter for files. Defaults to non-recursive listing in the bucket. - -:::tip -We advise that you give each resource a -[specific name](../../general-usage/resource#duplicate-and-rename-resources) -before loading with `pipeline.run`. This will make sure that data goes to a table with the name you -want and that each pipeline uses a -[separate state for incremental loading.](../../general-usage/state#read-and-write-pipeline-state-in-a-resource) -::: - - -### Resource `filesystem` - -This resource lists files in `bucket_url` based on the `file_glob` pattern, returning them as -[FileItem](https://github.com/dlt-hub/dlt/blob/devel/dlt/common/storages/fsspec_filesystem.py#L22) -with data access methods. These can be paired with transformers for enhanced processing. - -```py -@dlt.resource( - primary_key="file_url", spec=FilesystemConfigurationResource, standalone=True -) -def filesystem( - bucket_url: str = dlt.secrets.value, - credentials: Union[FileSystemCredentials, AbstractFileSystem] = dlt.secrets.value, - file_glob: Optional[str] = "*", - files_per_page: int = DEFAULT_CHUNK_SIZE, - extract_content: bool = False, -) -> Iterator[List[FileItem]]: - ... -``` - -- `bucket_url`: URL of the bucket. -- `credentials`: Filesystem credentials of `AbstractFilesystem` instance. -- `file_glob`: File filter in glob format. Defaults to listing all non-recursive files -in bucket URL. 
-- `files_per_page`: Number of files processed at once. Default: 100. -- `extract_content`: If true, the content of the file will be read and returned in the resource. Default: False. - - -## Filesystem Integration and Data Extraction Guide - -### Filesystem Usage - -- The filesystem tool enumerates files in a selected bucket using a glob pattern, returning details as FileInfo in customizable page sizes. - -- This resource integrates with transform functions and transformers for customized extraction pipelines. - -To load data into a specific table (instead of the default filesystem table), see the snippet below: - -```py -@dlt.transformer(standalone=True) -def read_csv(items, chunksize: int = 15): - """Reads csv file with Pandas chunk by chunk.""" - ... - -# list only the *.csv in specific folder and pass the file items to read_csv() -met_files = ( - filesystem(bucket_url="s3://my_bucket/data", file_glob="csv_folder/*.csv") - | read_csv() -) -# load to met_csv table using with_name() -pipeline.run(met_files.with_name("csv_data")) -``` - -Use the -[standalone filesystem](../../general-usage/resource#declare-a-standalone-resource) -resource to list files in s3, GCS, and Azure buckets. This allows you to customize file readers or -manage files using [fsspec](https://filesystem-spec.readthedocs.io/en/latest/index.html). -```py -files = filesystem(bucket_url="s3://my_bucket/data", file_glob="csv_folder/*.csv") -pipeline.run(files) -``` -The filesystem ensures consistent file representation across bucket types and offers methods to access and read -data. You can quickly build pipelines to: - -- Extract text from PDFs. -- Stream large file content directly from buckets. -- Copy files locally. - -### `FileItem` Representation - -- All dlt sources/resources that yield files follow the [FileItem](https://github.com/dlt-hub/dlt/blob/devel/dlt/common/storages/fsspec_filesystem.py#L22) contract. -- File content is typically not loaded; instead, full file info and methods to access content are - available. -- Users can request an authenticated [fsspec AbstractFileSystem](https://filesystem-spec.readthedocs.io/en/latest/_modules/fsspec/spec.html#AbstractFileSystem) instance. - -#### `FileItem` Fields: - -- `file_url` - Complete URL of the file; also the primary key (e.g. `s3://bucket-name/path/file`). -- `file_name` - Name of the file from the bucket URL. -- `relative_path` - Set when doing `glob`, is a relative path to a `bucket_url` argument. -- `mime_type` - File's mime type; sourced from the bucket provider or inferred from its extension. -- `modification_date` - File's last modification time (format: `pendulum.DateTime`). -- `size_in_bytes` - File size. -- `file_content` - Content, provided upon request. - -:::info -When using a nested or recursive glob pattern, `relative_path` will include the file's path relative to `bucket_url`. For -instance, using the resource: -`filesystem("az://dlt-ci-test-bucket/standard_source/samples", file_glob="met_csv/A801/*.csv")` -will produce file names relative to the `/standard_source/samples` path, such as -`met_csv/A801/A881_20230920.csv`. For local filesystems, POSIX paths (using "/" as separator) are returned. -::: - -### File Manipulation - -[FileItem](https://github.com/dlt-hub/dlt/blob/devel/dlt/common/storages/fsspec_filesystem.py#L22), backed by a dictionary implementation, offers these helper methods: - -- `read_bytes()`: Returns the file content as bytes. -- `open()`: Provides a file object when opened. 
-- `filesystem`: Gives access to an authorized `AbstractFilesystem` with standard fsspec methods. - -## Customization - -### Create your own pipeline - -If you wish to create your own pipelines, you can leverage source and resource methods from this -verified source. - -1. Configure the pipeline by specifying the pipeline name, destination, and dataset as follows: - - ```py - pipeline = dlt.pipeline( - pipeline_name="standard_filesystem", # Use a custom name if desired - destination="duckdb", # Choose the appropriate destination (e.g., duckdb, redshift, post) - dataset_name="filesystem_data_csv" # Use a custom name if desired - ) - ``` - -1. To read and load CSV files: - - ```py - BUCKET_URL = "YOUR_BUCKET_PATH_HERE" # path of the bucket url or local destination - met_files = readers( - bucket_url=BUCKET_URL, file_glob="directory/*.csv" - ).read_csv() - # tell dlt to merge on date - met_files.apply_hints(write_disposition="merge", merge_key="date") - # We load the data into the met_csv table - load_info = pipeline.run(met_files.with_name("table_name")) - print(load_info) - print(pipeline.last_trace.last_normalize_info) - ``` - - - The `file_glob` parameter targets all CSVs in the "met_csv/A801" directory. - - The `print(pipeline.last_trace.last_normalize_info)` line displays the data normalization details from the pipeline's last trace. - - :::info - If you have a default bucket URL set in `.dlt/config.toml`, you can omit the `bucket_url` parameter. - ::: -1. To load only new CSV files with [incremental loading](../../general-usage/incremental-loading): - - ```py - # This configuration will only consider new csv files - new_files = filesystem(bucket_url=BUCKET_URL, file_glob="directory/*.csv") - # add incremental on modification time - new_files.apply_hints(incremental=dlt.sources.incremental("modification_date")) - load_info = pipeline.run((new_files | read_csv()).with_name("csv_files")) - print(load_info) - print(pipeline.last_trace.last_normalize_info) - ``` - -1. To read and load Parquet and JSONL from a bucket: - ```py - jsonl_reader = readers(BUCKET_URL, file_glob="**/*.jsonl").read_jsonl( - chunksize=10000 - ) - # PARQUET reading - parquet_reader = readers(BUCKET_URL, file_glob="**/*.parquet").read_parquet() - # load both folders together to specified tables - load_info = pipeline.run( - [ - jsonl_reader.with_name("jsonl_data"), - parquet_reader.with_name("parquet_data"), - ] - ) - print(load_info) - print(pipeline.last_trace.last_normalize_info) - ``` - - The `file_glob`: Specifies file pattern; reads all JSONL and Parquet files across directories. - - The `chunksize`: Set to 10,000; data read in chunks of 10,000 records each. - - `print(pipeline.last_trace.last_normalize_info)`: Displays the data normalization details from the pipeline's last trace. - -1. To set up a pipeline that reads from an Excel file using a standalone transformer: - - ```py - # Define a standalone transformer to read data from an Excel file. - @dlt.transformer(standalone=True) - def read_excel( - items: Iterator[FileItemDict], sheet_name: str - ) -> Iterator[TDataItems]: - # Import the required pandas library. - import pandas as pd - - # Iterate through each file item. - for file_obj in items: - # Open the file object. - with file_obj.open() as file: - # Read from the Excel file and yield its content as dictionary records. - yield pd.read_excel(file, sheet_name).to_dict(orient="records") - - # Set up the pipeline to fetch a specific Excel file from a filesystem (bucket). 
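    # BUCKET_URL is assumed to be defined earlier in the script
    # (or configured in .dlt/config.toml, in which case bucket_url can be omitted).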
- example_xls = filesystem( - bucket_url=BUCKET_URL, file_glob="../directory/example.xlsx" - ) | read_excel("example_table") # Pass the data through the transformer to read the "example_table" sheet. - - # Execute the pipeline and load the extracted data into the "duckdb" destination. - load_info = dlt.run( - example_xls.with_name("example_xls_data"), - destination="duckdb", - dataset_name="example_xls_data", - ) - - # Print the loading information. - print(load_info) - ``` - - The code loads data from `example.xlsx` into the `duckdb` destination. - -1. To copy files locally, add a step in the filesystem resource and then load the listing to the database: - - ```py - def _copy(item: FileItemDict) -> FileItemDict: - # instantiate fsspec and copy file - dest_file = os.path.join(local_folder, item["file_name"]) - # create dest folder - os.makedirs(os.path.dirname(dest_file), exist_ok=True) - # download file - item.fsspec.download(item["file_url"], dest_file) - # return file item unchanged - return item - - # use recursive glob pattern and add file copy step - downloader = filesystem(BUCKET_URL, file_glob="**").add_map(_copy) - - # NOTE: you do not need to load any data to execute extract, below we obtain - # a list of files in a bucket and also copy them locally - listing = list(downloader) - print(listing) - # download to table "listing" - load_info = pipeline.run( - downloader.with_name("listing"), write_disposition="replace" - ) - # pretty print the information on data that was loaded - print(load_info) - print(listing) - print(pipeline.last_trace.last_normalize_info) - ``` - -1. Cleanup after loading: - - You can get a fsspec client from filesystem resource after it was extracted i.e. in order to delete processed files etc. - The filesystem module contains a convenient method `fsspec_from_resource` that can be used as follows: - - ```py - from filesystem import filesystem, fsspec_from_resource - # get filesystem source - gs_resource = filesystem("gs://ci-test-bucket/") - # extract files - pipeline.run(gs_resource | read_csv) - # get fs client - fs_client = fsspec_from_resource(gs_resource) - # do any operation - fs_client.ls("ci-test-bucket/standard_source/samples") - ``` - - \ No newline at end of file diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/filesystem/advanced.md b/docs/website/docs/dlt-ecosystem/verified-sources/filesystem/advanced.md new file mode 100644 index 0000000000..2c7d554108 --- /dev/null +++ b/docs/website/docs/dlt-ecosystem/verified-sources/filesystem/advanced.md @@ -0,0 +1,194 @@ +--- +title: Filesystem +description: dlt verified source for Readers Source and Filesystem +keywords: [readers source and filesystem, filesystem, readers source] +--- +import Header from './_source-info-header.md'; +
+ +Filesystem source actually provides you with the building blocks to facilitate loading data from the files. +This section aims to give you more information about how you can use and customize the filesystem source for +your use case. + +## Standalone filesystem resource + +You can use the +[standalone filesystem](../../general-usage/resource#declare-a-standalone-resource) +resource to list files in cloud storage or local filesystem. This allows you to customize file readers or +manage files using [fsspec](https://filesystem-spec.readthedocs.io/en/latest/index.html). + +```py +files = filesystem(bucket_url="s3://my_bucket/data", file_glob="csv_folder/*.csv") +pipeline.run(files) +``` +The filesystem ensures consistent file representation across bucket types and offers methods to access and read +data. You can quickly build pipelines to: + +- Extract text from PDFs ([unstructured data source](https://github.com/dlt-hub/verified-sources/tree/master/sources/unstructured_data)). +- Stream large file content directly from buckets. +- Copy files locally ([copy files](#copy-files-locally)) + +### `FileItem` Representation + +- All dlt sources/resources that yield files follow the [FileItem](https://github.com/dlt-hub/dlt/blob/devel/dlt/common/storages/fsspec_filesystem.py#L22) contract. +- File content is typically not loaded (you can control is with `extract_content` parameter of filesystem resource). Instead, full file info and methods to access content are + available. +- Users can request an authenticated [fsspec AbstractFileSystem](https://filesystem-spec.readthedocs.io/en/latest/_modules/fsspec/spec.html#AbstractFileSystem) instance. + +#### `FileItem` Fields: + +- `file_url` - complete URL of the file (e.g. `s3://bucket-name/path/file`). This field serves as a primary key. +- `file_name` - name of the file from the bucket URL. +- `relative_path` - set when doing `glob`, is a relative path to a `bucket_url` argument. +- `mime_type` - file's mime type. It is sourced from the bucket provider or inferred from its extension. +- `modification_date` - file's last modification time (format: `pendulum.DateTime`). +- `size_in_bytes` - file size. +- `file_content` - content, provided upon request. + +:::info +When using a nested or recursive glob pattern, `relative_path` will include the file's path relative to `bucket_url`. For +instance, using the resource: +`filesystem("az://dlt-ci-test-bucket/standard_source/samples", file_glob="met_csv/A801/*.csv")` +will produce file names relative to the `/standard_source/samples` path, such as +`met_csv/A801/A881_20230920.csv`. For local filesystems, POSIX paths (using "/" as separator) are returned. +::: + +### File Manipulation + +[FileItem](https://github.com/dlt-hub/dlt/blob/devel/dlt/common/storages/fsspec_filesystem.py#L22), backed by a dictionary implementation, offers these helper methods: + +- `read_bytes()` - method, which returns the file content as bytes. +- `open()` - method which provides a file object when opened. +- `filesystem` - field, which gives access to authorized `AbstractFilesystem` with standard fsspec methods. + +## Create your own transformer + +While `filesystem` resource yields the files from cloud storage of local filesystem, to get the actual records from the +files you need to apply a transformer resource. `dlt` natively supports three file types: `csv`, `parquet` and `jsonl` +(more details in [filesystem transformer resource](../filesystem/index#2-choose-the-right-transformer-resource)). 
But you can easily create your own. To do this, you just need a function that takes a `FileItemDict` iterator as input and yields a list of records (recommended for performance) or individual records.

### Example: read data from Excel files

To set up a pipeline that reads from an Excel file using a standalone transformer:

```py
# Define a standalone transformer to read data from an Excel file.
@dlt.transformer(standalone=True)
def read_excel(
    items: Iterator[FileItemDict], sheet_name: str
) -> Iterator[TDataItems]:
    # Import the required pandas library.
    import pandas as pd

    # Iterate through each file item.
    for file_obj in items:
        # Open the file object.
        with file_obj.open() as file:
            # Read from the Excel file and yield its content as dictionary records.
            yield pd.read_excel(file, sheet_name).to_dict(orient="records")

# Set up the pipeline to fetch a specific Excel file from a filesystem (bucket).
example_xls = filesystem(
    bucket_url=BUCKET_URL, file_glob="../directory/example.xlsx"
) | read_excel("example_table")  # Pass the data through the transformer to read the "example_table" sheet.

# Execute the pipeline and load the extracted data into the "duckdb" destination.
load_info = dlt.run(
    example_xls.with_name("example_xls_data"),
    destination="duckdb",
    dataset_name="example_xls_data",
)

# Print the loading information.
print(load_info)
```

### Example: read data from XML files

You can use any third-party library to parse an XML file (e.g., [BeautifulSoup](https://pypi.org/project/beautifulsoup4/) or [pandas](https://pandas.pydata.org/docs/reference/api/pandas.read_xml.html)).
In the following example, we use the [xmltodict](https://pypi.org/project/xmltodict/) Python library.

```py
# Define a standalone transformer to read data from an XML file.
@dlt.transformer(standalone=True)
def read_xml(items: Iterator[FileItemDict]) -> Iterator[TDataItems]:
    # Import the required xmltodict library.
    import xmltodict

    # Iterate through each file item.
    for file_obj in items:
        # Open the file object.
        with file_obj.open() as file:
            # Parse the file into dict records.
            yield xmltodict.parse(file.read())

# Set up the pipeline to fetch a specific XML file from a filesystem (bucket).
example_xml = filesystem(
    bucket_url=BUCKET_URL, file_glob="../directory/example.xml"
) | read_xml()  # Pass the data through the transformer.

# Execute the pipeline and load the extracted data into the "duckdb" destination.
load_info = dlt.run(
    example_xml.with_name("example_xml_data"),
    destination="duckdb",
    dataset_name="example_xml_data",
)

# Print the loading information.
print(load_info)
```

## Clean files after loading

You can get an fsspec client from the filesystem resource after it has been extracted, for example, to delete processed files.
+The filesystem module contains a convenient method `fsspec_from_resource` that can be used as follows: + +```py +from filesystem import filesystem, fsspec_from_resource +# get filesystem source +gs_resource = filesystem("gs://ci-test-bucket/") +# extract files +pipeline.run(gs_resource | read_csv) +# get fs client +fs_client = fsspec_from_resource(gs_resource) +# do any operation +fs_client.ls("ci-test-bucket/standard_source/samples") +``` + +## Copy files locally + +To copy files locally, add a step in the filesystem resource and then load the listing to the database: + + ```py + def _copy(item: FileItemDict) -> FileItemDict: + # instantiate fsspec and copy file + dest_file = os.path.join(local_folder, item["file_name"]) + # create dest folder + os.makedirs(os.path.dirname(dest_file), exist_ok=True) + # download file + item.fsspec.download(item["file_url"], dest_file) + # return file item unchanged + return item + + # use recursive glob pattern and add file copy step + downloader = filesystem(BUCKET_URL, file_glob="**").add_map(_copy) + + # NOTE: you do not need to load any data to execute extract, below we obtain + # a list of files in a bucket and also copy them locally + listing = list(downloader) + print(listing) + # download to table "listing" + load_info = pipeline.run( + downloader.with_name("listing"), write_disposition="replace" + ) + # pretty print the information on data that was loaded + print(load_info) + print(listing) + print(pipeline.last_trace.last_normalize_info) + ``` + diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/filesystem/index.md b/docs/website/docs/dlt-ecosystem/verified-sources/filesystem/index.md new file mode 100644 index 0000000000..8b9f7a020b --- /dev/null +++ b/docs/website/docs/dlt-ecosystem/verified-sources/filesystem/index.md @@ -0,0 +1,339 @@ +--- +title: Filesystem +description: dlt verified source for Readers Source and Filesystem +keywords: [readers source and filesystem, filesystem, readers source] +--- +import Header from './_source-info-header.md'; +
+ +Filesystem source is a generic source which allows to load files from remote (AWS S3, Google Cloud Storage, Google Drive, Azure) or local filesystem seamlessly. Filesystem source natively supports `csv`, `parquet` and `jsonl` files and allow customization for loading any type of structured files. + +To load unstructured data (`.pdf`, `.txt`, e-mail) please refer to [unstructured data source](https://github.com/dlt-hub/verified-sources/tree/master/sources/unstructured_data) + +## Setup + +### Prerequisites + +Please make sure `dlt` library is installed. Refer to the [installation guide](../../getting-started). + +### Initialize the filesystem source + +To get started with your data pipeline, follow these steps: + +1. Enter the following command: + + ```sh + dlt init filesystem duckdb + ``` + + [This command](../../reference/command-line-interface) will initialize + [the pipeline example](https://github.com/dlt-hub/verified-sources/blob/master/sources/filesystem_pipeline.py) + with filesystem as the [source](../../general-usage/source) and + [duckdb](../destinations/duckdb.md) as the [destination](../destinations). + +2. If you'd like to use a different destination, simply replace `duckdb` with the name of your + preferred [destination](../destinations). + +3. After running this command, a new directory will be created with the necessary files and + configuration settings to get started. + +## Configuration + +### Get credentials + + + + + +To get AWS keys for S3 access: + +1. Access IAM in AWS Console. +2. Select "Users", choose a user, and open "Security credentials". +3. Click "Create access key" for AWS ID and Secret Key. + +For more info, see +[AWS official documentation.](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html) + + + + + +To get GCS/GDrive access: + +1. Log in to [console.cloud.google.com](http://console.cloud.google.com/). +2. Create a [service account](https://cloud.google.com/iam/docs/service-accounts-create#creating). +3. Enable "Cloud Storage API" / "Google Drive API"; see + [Google's guide](https://support.google.com/googleapi/answer/6158841?hl=en). +4. In IAM & Admin > Service Accounts, find your account, click the three-dot menu > "Manage Keys" > + "ADD KEY" > "CREATE" to get a JSON credential file. +5. Grant the service account appropriate permissions for cloud storage access. + +For more info, see how to +[create service account](https://support.google.com/a/answer/7378726?hl=en). + + + + + +To obtain Azure blob storage access: + +1. Go to Azure Portal (portal.azure.com). +2. Select "Storage accounts" > your storage. +3. Click "Settings" > "Access keys". +4. View account name and two keys (primary/secondary). Keep keys confidential. + +For more info, see +[Azure official documentation](https://learn.microsoft.com/en-us/azure/storage/common/storage-account-keys-manage?tabs=azure-portal). + + + + +You don't need any credentials for local filesystem. + + + + +### Add credentials + +To provide credentials to filesystem source you can use any method available in `dlt`. +One of the easiest way is to use configuration files. `.dlt` folder in your working directory +contains two files: `config.toml` and `secrets.toml`. Sensitive information, like passwords and +access tokens should only be put into `secrets.toml`, while any other configuration, like the path to +a bucket can be specified in `config.toml`. + + + + + +```toml +# secrets.toml +[sources.filesystem.credentials] +aws_access_key_id="Please set me up!" 
+aws_secret_access_key="Please set me up!" + +# config.toml +[sources.filesystem] +bucket_url="s3:////" +``` + + + + +```toml +# secrets.toml +[sources.filesystem.credentials] +azure_storage_account_name="Please set me up!" +azure_storage_account_key="Please set me up!" + +# config.toml +[sources.filesystem] # use [sources.readers.credentials] for the "readers" source +bucket_url="az:////" +``` + + + + +```toml +# secrets.toml +[sources.filesystem.credentials] +client_email="Please set me up!" +private_key="Please set me up!" +project_id="Please set me up!" + +# config.toml +# gdrive +[sources.filesystem] +bucket_url="gdrive:////" + +# Google storage +[sources.filesystem] +bucket_url="gs:////" +``` + + + + +You can use both native local file system paths and in form of `file:` uri. Absolute, relative and UNC Windows paths are supported. +You can find relevant examples in [filesystem destination documentation](../destinations/filesystem.md#local-file-system) which follows +the same rules to specify the `bucket_url`. + +You could provide an absolute filepath: + +```toml +# config.toml +[sources.filesystem] +bucket_url='file://Users/admin/Documents/csv_files' +``` + +Or skip the schema and provide the local path in a format native for your operating system. For example, for Windows: + +```toml +[sources.filesystem] +bucket_url='~\Documents\csv_files\' +``` + + + + + +You can also specify the credentials using Environment variables. The name of the corresponding environment +variable should be slightly different than corresponding name in the `toml` file. Simply replace dots `.` with double +underscores `__`: + +```sh +export SOURCES__FILESYSTEM__AWS_ACCESS_KEY_ID="Please set me up!" +export SOURCES__FILESYSTEM__AWS_SECRET_ACCESS_KEY="Please set me up!" +``` + +:::tip +To know about other ways of adding credentials to your pipeline, please refer to the section +[Configuration and secrets](../../../general-usage/credentials/setup). +::: + +## Usage + +The filesystem source is quite unique, since it provides you with a building blocks for loading data from files. +Usually you need two resources: + +1. The filesystem resource enumerates files in a selected bucket using a glob pattern, returning details as FileInfo in customizable page sizes. +2. One of the available transformer resources to process each file in specific transforming function and yield the records. + +### 1. Initialize a `filesystem` resource + +Initialize a `filesystem` resource. All parameters of the resource can be specified directly in code: +```py +from dlt.filesystem import filesystem + +filesystem_source = filesystem(bucket_url="file://Users/admin/Documents/csv_files", file_glob="*.csv") +``` +or taken from the config: +```py +from dlt.filesystem import filesystem + +filesystem_source = filesystem() +``` + +Full list of `filesystem` resource parameters: + +* `bucket_url` - URL of the bucket (could be relative path in case of local fileystem). +* `credentials` - cloud storage credentials of `AbstractFilesystem` instance (should be empty for local filesystem). +* `file_glob` - file filter in glob format. Defaults to listing all non-recursive files in the bucket URL. +* `files_per_page` - number of files processed at once. Default value is `100`. +* `extract_content` - if true, the content of the file will be read and returned in the resource. Default value is `False`. + +### 2. Choose the right transformer resource + +Current implementation of filesystem source natively supports three file types: `csv`, `parquet` and `jsonl`. 
You can use one of the built-in transformers for these formats or create your own [transformer](#create-your-own-transformer). To apply the selected transformer resource, use the pipe notation `|`:

```py
from dlt.filesystem import filesystem, read_csv

filesystem_pipe = filesystem(bucket_url="file://Users/admin/Documents/csv_files", file_glob="*.csv") | read_csv()
```

#### Available transformers

- `read_csv()`
- `read_jsonl()`
- `read_parquet()`
- `read_csv_duckdb()`

:::tip
We advise that you give each resource a [specific name](../../general-usage/resource#duplicate-and-rename-resources) before loading with `pipeline.run`. This will make sure that data goes to a table with the name you want and that each pipeline uses a [separate state for incremental loading.](../../general-usage/state#read-and-write-pipeline-state-in-a-resource)
:::

### 3. Create and run a pipeline

```py
import dlt
from dlt.filesystem import filesystem, read_csv

filesystem_pipe = filesystem(bucket_url="file://Users/admin/Documents/csv_files", file_glob="*.csv") | read_csv()
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
info = pipeline.run(filesystem_pipe)
print(info)
```

For more information on how to create and run the pipeline, read the [Walkthrough: Run a pipeline](../../walkthroughs/run-a-pipeline).

### 4. Apply hints

You can apply hints to the resource before running the pipeline, for example, to merge incoming records on a key:

```py
import dlt
from dlt.filesystem import filesystem, read_csv

filesystem_pipe = filesystem(bucket_url="file://Users/admin/Documents/csv_files", file_glob="*.csv") | read_csv()
# tell dlt to merge on date
filesystem_pipe.apply_hints(write_disposition="merge", merge_key="date")

# We load the data into the table_name table
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run(filesystem_pipe.with_name("table_name"))
print(load_info)
print(pipeline.last_trace.last_normalize_info)
```

### 5. Incremental loading

To load only new CSV files with [incremental loading](../../general-usage/incremental-loading):

```py
# This configuration will only consider new csv files
new_files = filesystem(bucket_url=BUCKET_URL, file_glob="directory/*.csv")
# add incremental on modification time
new_files.apply_hints(incremental=dlt.sources.incremental("modification_date"))
load_info = pipeline.run((new_files | read_csv()).with_name("csv_files"))
print(load_info)
print(pipeline.last_trace.last_normalize_info)
```

## Troubleshooting

### Access extremely long file paths

Windows supports paths up to 255 characters. When you access a longer path, you'll see a `FileNotFound` exception.

To work around this limit, you can use [extended paths](https://learn.microsoft.com/en-us/windows/win32/fileio/maximum-file-path-limitation?tabs=registry). **Note that Python `glob` does not work with extended UNC paths**, so you will not be able to glob within them:

```toml
[sources.filesystem]
bucket_url = '\\?\C:\a\b\c'
```

### Get empty list of files

If you are running a dlt pipeline with the filesystem source and get zero records, we recommend you check the configuration of the `bucket_url` and `file_glob` parameters.

For example, with Azure Blob Storage, people sometimes mistake the account name for the container name. Make sure you've set up the URL as `"az://<container_name>/<path>"`.

Also, please refer to the [`glob`](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.glob) function to configure the resource correctly. Use `**` to include files recursively.
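For a quick sanity check, you can print what your glob actually matches before wiring up a transformer. This is a minimal sketch (the bucket URL and pattern below are placeholders; the import follows the examples above):

```py
from dlt.filesystem import filesystem

# `**` makes the listing recursive, so files in nested folders are matched as well
files = filesystem(bucket_url="s3://my_bucket/data", file_glob="**/*.csv")

# iterating the resource lists the matched files; an empty result means
# the bucket_url or file_glob does not match anything
print(list(files))
```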
Note that the local filesystem supports the full Python [`glob`](https://docs.python.org/3/library/glob.html#glob.glob) functionality, while cloud storage supports a restricted `fsspec` version.

 \ No newline at end of file