add support for parquet files and compression on jsonl files in filesystem dataframe implementation

sh-rp committed Jun 26, 2024
1 parent 36e94af commit 20bf9ce
Showing 2 changed files with 34 additions and 8 deletions.
31 changes: 27 additions & 4 deletions dlt/destinations/impl/filesystem/filesystem.py
@@ -564,20 +564,43 @@ def iter_df(
     ) -> Generator[DataFrame, None, None]:
         """Provide dataframes via duckdb"""
         import duckdb
-
-        duckdb.register_filesystem(self.fs_client)
+        from duckdb import InvalidInputException
+
         # create in memory table, for now we read all available files
         db = duckdb.connect(":memory:")
+        db.register_filesystem(self.fs_client)
         files = self.list_table_files(table)
         if not files:
             return None
+
+        file_type = os.path.splitext(files[0])[1][1:]
+        if file_type == "jsonl":
+            read_command = "read_json"
+        elif file_type == "parquet":
+            read_command = "read_parquet"
+        else:
+            raise AssertionError("Unknown filetype")
+
         protocol = "" if self.is_local_filesystem else f"{self.config.protocol}://"
         files_string = ",".join([f"'{protocol}{f}'" for f in files])
-        db.sql(f"CREATE TABLE {table} AS SELECT * FROM read_json([{files_string}]);")
+
+        def _build_sql_string(read_params: str = "") -> str:
+            return (
+                f"SELECT * FROM {read_command}([{files_string}]{read_params}) OFFSET {offset} LIMIT"
+                f" {batch_size}"
+            )
+
         # yield in batches
         offset = 0
         while True:
-            df = db.sql(f"SELECT * FROM {table} OFFSET {offset} LIMIT {batch_size}").df()
+            try:
+                df = db.sql(_build_sql_string()).df()
+            except InvalidInputException:
+                # if jsonl and could not read, try with gzip setting
+                if file_type == "jsonl":
+                    df = db.sql(_build_sql_string(read_params=", compression = 'gzip'")).df()
+                else:
+                    raise
             if len(df.index) == 0:
                 break
             yield df
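
The fallback above works because DuckDB's read_json accepts an explicit compression parameter. A minimal standalone sketch of the same retry pattern, assuming a hypothetical local items.jsonl whose payload may or may not be gzipped:

import duckdb

con = duckdb.connect(":memory:")
try:
    # first attempt: let read_json treat the file as plain jsonl
    df = con.sql("SELECT * FROM read_json(['items.jsonl']) LIMIT 5").df()
except duckdb.InvalidInputException:
    # a gzipped payload fails the plain read, so retry with a compression hint
    df = con.sql(
        "SELECT * FROM read_json(['items.jsonl'], compression = 'gzip') LIMIT 5"
    ).df()
print(df.shape)
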
11 changes: 7 additions & 4 deletions tests/load/test_read_interfaces.py
@@ -79,15 +79,18 @@ def test_read_interfaces_filesystem(destination_config: DestinationTestConfigura
     # we force multiple files per table, they may only hold 50 items
     os.environ["DATA_WRITER__FILE_MAX_ITEMS"] = "50"
 
+    if destination_config.file_format not in ["parquet", "jsonl"]:
+        pytest.skip("Test only works for jsonl and parquet")
+
     pipeline = destination_config.setup_pipeline(
-        "read_pipeline", dataset_name="read_test", dev_mode=True
+        "read_pipeline",
+        dataset_name="read_test",
+        dev_mode=True,
     )
 
     # run source
     s = source()
-    pipeline.run(
-        s,
-    )
+    pipeline.run(s, loader_file_format=destination_config.file_format)
 
     # get one df
     df = pipeline.dataset.df(table="items", batch_size=5)
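
The batch_size=5 used here feeds the OFFSET/LIMIT paging in iter_df above. A self-contained sketch of that paging pattern, assuming a made-up in-memory table (names and row counts are illustrative):

import duckdb

con = duckdb.connect(":memory:")
con.sql("CREATE TABLE items AS SELECT range AS n FROM range(12)")

batch_size, offset = 5, 0
while True:
    batch = con.sql(f"SELECT * FROM items OFFSET {offset} LIMIT {batch_size}").df()
    if len(batch.index) == 0:
        # an empty frame means we have paged past the last row
        break
    print(f"{len(batch.index)} rows starting at offset {offset}")
    offset += batch_size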