Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade duckdb to 0.10 #1197

Merged
merged 2 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions lilac/data/dataset_duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -1787,7 +1787,7 @@ def stats(self, leaf_path: Path, include_deleted: bool = False) -> StatsResult:
avg_text_length: Optional[int] = None
if leaf.dtype in (STRING, STRING_SPAN):
avg_length_query = f"""
SELECT avg(length(val))
SELECT avg(length(CAST(val AS VARCHAR)))
FROM (SELECT {inner_select} AS val FROM t {where_clause})
USING SAMPLE {SAMPLE_AVG_TEXT_LENGTH};
"""
Expand Down Expand Up @@ -1835,10 +1835,16 @@ def stats(self, leaf_path: Path, include_deleted: bool = False) -> StatsResult:
"""
row = self._query(min_max_query)[0]
result.min_val, result.max_val = row
if is_temporal(leaf.dtype):
sample_where_clause = ''
else:
sample_where_clause = (
f'WHERE val != 0 {'AND NOT isnan(val)' if is_float(leaf.dtype) else ''}'
)
sample_query = f"""
SELECT COALESCE(ARRAY_AGG(val), [])
FROM (SELECT {inner_select} as val FROM t {where_clause})
WHERE val != 0 {'AND NOT isnan(val)' if is_float(leaf.dtype) else ''}
{sample_where_clause}
USING SAMPLE 100;
"""
result.value_samples = list(self._query(sample_query)[0][0])
Expand Down Expand Up @@ -1894,9 +1900,9 @@ def select_groups(
sql_bounds = []
for label, start, end in named_bins:
if start is None:
start = cast(float, "'-Infinity'")
start = cast(float, "'-Infinity'::FLOAT")
if end is None:
end = cast(float, "'Infinity'")
end = cast(float, "'Infinity'::FLOAT")
sql_bounds.append(f"('{label}', {start}, {end})")

bin_index_col = 'col0'
Expand Down
4 changes: 2 additions & 2 deletions lilac/data/dataset_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,8 @@ def test_select_star(make_test_data: TestDataMaker) -> None:
# Select * plus an inner `info.age` column.
result = dataset.select_rows(['*', ('info', 'age')])
assert list(result) == [
{'info.age': 40, 'info.age_2': 40, 'name': 'A'},
{'info.age': 42, 'info.age_2': 42, 'name': 'B'},
{'info.age': 40, 'info.age_1': 40, 'name': 'A'},
{'info.age': 42, 'info.age_1': 42, 'name': 'B'},
]


Expand Down
23 changes: 13 additions & 10 deletions lilac/sources/csv_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,28 +60,31 @@ def setup(self) -> None:

# NOTE: We use duckdb here to increase parallelism for multiple files.
# NOTE: We turn off the parallel reader because of https://github.com/lilacai/lilac/issues/373.
self._con.execute(
f"""
CREATE SEQUENCE serial START 1;
CREATE VIEW t as (SELECT nextval('serial') as "{LINE_NUMBER_COLUMN}", * FROM read_csv_auto(
csv_source_sql = f"""read_csv_auto(
{duckdb_paths},
SAMPLE_SIZE=500000,
HEADER={self.header},
{f'NAMES={self.names},' if self.names else ''}
DELIM='{self.delim or ','}',
IGNORE_ERRORS=true,
PARALLEL=false
));
"""
)

res = self._con.execute('SELECT COUNT(*) FROM t').fetchone()
)"""
res = self._con.execute(f'SELECT COUNT(*) FROM {csv_source_sql}').fetchone()
num_items = cast(tuple[int], res)[0]

self._reader = self._con.execute('SELECT * from t').fetch_record_batch(rows_per_batch=10_000)
self._reader = self._con.execute(
f'SELECT 1::BIGINT as "{LINE_NUMBER_COLUMN}", * from {csv_source_sql}'
).fetch_record_batch(rows_per_batch=10_000)
# Create the source schema in prepare to share it between process and source_schema.
schema = arrow_schema_to_schema(self._reader.schema)
self._source_schema = SourceSchema(fields=schema.fields, num_items=num_items)
self._con.execute(
f"""
CREATE SEQUENCE serial START 1;
CREATE VIEW t as (
SELECT nextval('serial') as "{LINE_NUMBER_COLUMN}", * FROM {csv_source_sql});
"""
)

@override
def source_schema(self) -> SourceSchema:
Expand Down
Loading
Loading