
predicate/projection pushdown causes ShapeError #19944

Open
mdavis-xyz opened this issue Nov 23, 2024 · 1 comment
Labels: bug (Something isn't working), needs triage (Awaiting prioritization by a maintainer), python (Related to Python Polars)

Comments

mdavis-xyz (Contributor) commented Nov 23, 2024

Checks

  • I have checked that this issue has not already been reported.
  • I have confirmed this bug exists on the latest version of Polars.

Reproducible example

Note that this script downloads about 170 MB of sample files from AWS S3. You shouldn't need any AWS credentials for this to work, since the bucket is public.

import os
import datetime as dt
from tempfile import TemporaryDirectory

import boto3
from botocore.config import Config
from botocore import UNSIGNED
import polars as pl

# download the data
filenames_short = ['bop1.parquet', 'bop2.parquet', 'bdo.parquet']
bucket_name = 'mdavis-xyz-public-mwe'
s3_prefix = 'pl-shape-error/01/'
local_dir = 'mwe-data'
region_name = "eu-west-3"

s3 = boto3.resource(
    's3',
    config=Config(signature_version=UNSIGNED),
    region_name=region_name,
)
bucket = s3.Bucket(bucket_name)

os.makedirs(local_dir, exist_ok=True)
local_paths = []
remote_paths = []
for f in filenames_short:
    local_path = os.path.join(local_dir, f)
    remote_path = os.path.join(s3_prefix, f)
    bucket.download_file(remote_path, local_path)
    local_paths.append(local_path)
    remote_paths.append(os.path.join('s3://' + bucket_name, remote_path))

# define the query which causes problems
def query(bop_paths, bdo_path, storage_options={}, predicate_pushdown=True, projection_pushdown=True):
    bidofferperiod_wide = (
        pl.concat([
            pl.scan_parquet(bop_paths[0], storage_options=storage_options),
            pl.scan_parquet(bop_paths[1], storage_options=storage_options)
        ], how='diagonal_relaxed')
    )
    
    biddayoffer_per_bid_wide = pl.scan_parquet(bdo_path, storage_options=storage_options)
    
    lf = (
        biddayoffer_per_bid_wide
        .rename({
            'SETTLEMENTDATE': 'TRADINGDATE',
            'OFFERDATE': 'OFFERDATETIME',
        })
        .with_columns(pl.col("TRADINGDATE").cast(dt.date))
        .join(bidofferperiod_wide, 
              how='inner', 
              on=['BIDTYPE', 'DIRECTION', 'DUID', 'OFFERDATETIME', 'TRADINGDATE'], 
              join_nulls=True,
              coalesce=True
        )
        .filter(pl.col("MAXAVAIL") > 0) # avoid divide by zero later
        .filter(pl.col("DUID").is_not_null())
    
        
        .tail()
        .group_by("DUID")
        .agg([
             pl.len().alias("LENGTH"),
        ])
    )
    lf.collect(predicate_pushdown=predicate_pushdown, projection_pushdown=projection_pushdown)

# these ones work
query(remote_paths[:2], remote_paths[-1], storage_options={"aws_region": region_name})
query(local_paths[:2], local_paths[-1], predicate_pushdown=False, projection_pushdown=False)
query(local_paths[:2], local_paths[-1], predicate_pushdown=False, projection_pushdown=True)
query(local_paths[:2], local_paths[-1], predicate_pushdown=True, projection_pushdown=False)

# this throws an error
query(local_paths[:2], local_paths[-1], predicate_pushdown=True, projection_pushdown=True)
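
For anyone debugging this, the optimized plan can be inspected before collecting. A sketch, assuming query() is modified to return lf instead of calling lf.collect():

# hypothetical: with query() changed to `return lf`, print the plan
lf = query(local_paths[:2], local_paths[-1])
print(lf.explain(predicate_pushdown=True, projection_pushdown=True))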

Log output

Async thread count: 4
async download_chunk_size: 67108864
join parallel: true
UNION: union is run in parallel
POLARS PREFETCH_SIZE: 96
querying metadata of 1/1 files...
POLARS PREFETCH_SIZE: 96
reading of 1/1 file...
querying metadata of 1/1 files...
POLARS PREFETCH_SIZE: 96
reading of 1/1 file...
querying metadata of 1/1 files...
reading of 1/1 file...
parquet row group must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet row group must be read, statistics not sufficient for predicate.
POLARS ROW_GROUP PREFETCH_SIZE: 128
parquet row group must be read, statistics not sufficient for predicate.
parquet row group must be read, statistics not sufficient for predicate.
parquet row group must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
...
parquet row group must be read, statistics not sufficient for predicate.
parquet row group must be read, statistics not sufficient for predicate.
POLARS ROW_GROUP PREFETCH_SIZE: 128
parquet scan with parallel = Columns
parquet file must be read, statistics not sufficient for predicate.
parquet row group must be read, statistics not sufficient for predicate.
parquet scan with parallel = RowGroups
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet row group must be read, statistics not sufficient for predicate.
...
parquet file must be read, statistics not sufficient for predicate.
parquet row group must be read, statistics not sufficient for predicate.
parquet row group must be read, statistics not sufficient for predicate.
sys:1: CategoricalRemappingWarning: Local categoricals have different encodings, expensive re-encoding is done to perform this merge operation. Consider using a StringCache or an Enum type if the categories are known in advance
parquet scan with parallel = RowGroups
parquet row group must be read, statistics not sufficient for predicate.
parquet row group must be read, statistics not sufficient for predicate.
parquet row group must be read, statistics not sufficient for predicate.
...
parquet row group must be read, statistics not sufficient for predicate.
/home/azureuser/after_holiday/mwe/gh/script.py:63: CategoricalRemappingWarning: Local categoricals have different encodings, expensive re-encoding is done to perform this merge operation. Consider using a StringCache or an Enum type if the categories are known in advance
  lf.collect(predicate_pushdown=predicate_pushdown, projection_pushdown=projection_pushdown)
INNER join dataframes finished
FOUND SORTED KEY: running default HASH AGGREGATION
join parallel: true
UNION: union is run in parallel
parquet scan with parallel = RowGroups
parquet scan with parallel = Columns
parquet scan with parallel = Columns
INNER join dataframes finished
dataframe filtered
dataframe filtered
FOUND SORTED KEY: running default HASH AGGREGATION
join parallel: true
UNION: union is run in parallel
parquet scan with parallel = RowGroups
parquet scan with parallel = Columns
parquet scan with parallel = RowGroups
INNER join dataframes finished
dataframe filtered
dataframe filtered
FOUND SORTED KEY: running default HASH AGGREGATION
join parallel: true
UNION: union is run in parallel
parquet scan with parallel = Prefiltered
parquet live columns = 1, dead columns = 30
parquet scan with parallel = Columns
parquet scan with parallel = Prefiltered
parquet live columns = 2, dead columns = 26
parquet file must be read, statistics not sufficient for predicate.
parquet row group must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
...
parquet file must be read, statistics not sufficient for predicate.
parquet row group must be read, statistics not sufficient for predicate.
parquet row group must be read, statistics not sufficient for predicate.
parquet row group must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet row group must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet row group must be read, statistics not sufficient for predicate.
...
parquet row group must be read, statistics not sufficient for predicate.
parquet row group must be read, statistics not sufficient for predicate.
INNER join dataframes finished
FOUND SORTED KEY: running default HASH AGGREGATION
join parallel: true
UNION: union is run in parallel
parquet scan with parallel = Columns
parquet scan with parallel = Prefiltered
parquet live columns = 1, dead columns = 4
parquet scan with parallel = Prefiltered
parquet live columns = 2, dead columns = 4
parquet file must be read, statistics not sufficient for predicate.
parquet row group must be read, statistics not sufficient for predicate.
parquet row group must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet row group must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
...
parquet file must be read, statistics not sufficient for predicate.
parquet row group must be read, statistics not sufficient for predicate.
Traceback (most recent call last):
  File "/home/azureuser/after_holiday/mwe/gh/script.py", line 69, in <module>
    query(local_paths[:2], local_paths[-1], predicate_pushdown=True, projection_pushdown=True)
  File "/home/azureuser/after_holiday/mwe/gh/script.py", line 63, in query
    lf.collect(predicate_pushdown=predicate_pushdown, projection_pushdown=projection_pushdown)
  File "/home/azureuser/venv/lib/python3.10/site-packages/polars/lazyframe/frame.py", line 2029, in collect
    return wrap_df(ldf.collect(callback))
polars.exceptions.ShapeError: unable to vstack, column names don't match: "BIDTYPE" and "DUID"

Issue description

I get the error:

ShapeError: unable to vstack, column names don't match: "BIDTYPE" and "DUID"

from code that contains no explicit vstack call. I do have a concat, but with how='diagonal_relaxed'.
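
For context, how='diagonal_relaxed' unions the input schemas, null-fills missing columns, and relaxes dtypes where needed, so any vstack happens inside Polars rather than in my code. A tiny illustration:

import polars as pl

df1 = pl.DataFrame({"a": [1], "b": ["x"]})
df2 = pl.DataFrame({"b": ["y"], "c": [2.5]})

# schemas are unioned; columns missing from one input are null-filled
print(pl.concat([df1, df2], how="diagonal_relaxed"))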

Note that if I query the files directly from AWS S3, I don't get the error. If I turn off either predicate or projection pushdown (or both), I also don't get the error. It only occurs when both pushdowns are enabled and the files are read locally.
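
As a temporary workaround, disabling one of the pushdowns at collect time avoids the error, at the cost of reading more data than necessary:

# workaround: same query as the failing call, but with predicate
# pushdown disabled (disabling projection pushdown instead also works)
query(local_paths[:2], local_paths[-1],
      predicate_pushdown=False, projection_pushdown=True)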

I suspect the error may be related to enums/categoricals.
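
Given the CategoricalRemappingWarning in the log, one thing worth trying is running the query under a global string cache, so all categorical columns share one encoding. A sketch; I haven't confirmed whether it avoids the ShapeError:

import polars as pl

# sketch: share one categorical encoding across all inputs.
# this should silence the CategoricalRemappingWarning; whether it
# also avoids the ShapeError is untested.
with pl.StringCache():
    query(local_paths[:2], local_paths[-1],
          predicate_pushdown=True, projection_pushdown=True)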

It may be related to #13381.

Expected behavior

The script should run without throwing an exception.

Querying files locally should give the same result as querying from S3.

Installed versions

--------Version info---------
Polars:              1.14.0
Index type:          UInt32
Platform:            Linux-6.8.0-1018-azure-x86_64-with-glibc2.35
Python:              3.10.12 (main, Nov  6 2024, 20:22:13) [GCC 11.4.0]
LTS CPU:             False

----Optional dependencies----
adbc_driver_manager  <not installed>
altair               <not installed>
boto3                1.35.68
cloudpickle          <not installed>
connectorx           <not installed>
deltalake            <not installed>
fastexcel            0.12.0
fsspec               <not installed>
gevent               <not installed>
google.auth          <not installed>
great_tables         <not installed>
matplotlib           3.9.2
nest_asyncio         1.6.0
numpy                2.0.0
openpyxl             3.1.5
pandas               2.2.3
pyarrow              18.0.0
pydantic             <not installed>
pyiceberg            <not installed>
sqlalchemy           <not installed>
torch                <not installed>
xlsx2csv             0.8.4
xlsxwriter           3.2.0

cmdlineluser (Contributor) commented Nov 23, 2024

Attempted minimal repro:

import polars as pl

filename = '19944.parquet'

n = 1_500_000

values = list('abcdefghijlmno')
dtype = pl.Enum(values)
s = pl.Series(values, dtype=dtype)

pl.select(
    A=pl.lit('a'),
    B=s.sample(n, with_replacement=True),
    C=s.sample(n, with_replacement=True)
).write_parquet(filename)

r1 = pl.LazyFrame({'A': 'a', 'B': 'b'}).cast({'B': dtype})
r2 = pl.scan_parquet(filename).cast({'B': dtype, 'C': dtype})

l = pl.LazyFrame({'B': 'b'}).cast({'B': dtype})
r = pl.concat([r1, r2], how='diagonal_relaxed')

(l.join(r, on='B')
  .filter(A=pl.col('A'))
  .group_by('A')
  .first()
  .collect()
)
# ShapeError: unable to vstack, column names don't match: "B" and "A"

Update: it seems this started erroring in 1.13.0.
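
Presumably, as in the original report, disabling either pushdown avoids the error here too. An untested sketch:

# same minimal repro, but with predicate pushdown disabled
(l.join(r, on='B')
  .filter(A=pl.col('A'))
  .group_by('A')
  .first()
  .collect(predicate_pushdown=False)
)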
