Errors in scan_delta and write_delta with nested struct schema evolution (aka adding new field) #19915

Open · 2 tasks done
TinoSM opened this issue Nov 21, 2024 · 0 comments
Labels: bug (Something isn't working), needs triage (Awaiting prioritization by a maintainer), python (Related to Python Polars)

TinoSM commented Nov 21, 2024

Checks

  • I have checked that this issue has not already been reported.
  • I have confirmed this bug exists on the latest version of Polars.

Reproducible example

import polars as pl

# Make sure you extract the delta_table_with_nested.zip attachment below
delta_table_path = "./delta_table_with_nested"

# The scan fails after the nested schema evolution (see traceback below)
print(pl.scan_delta(delta_table_path).collect())

# Build a frame matching the evolved schema, including the new nested field
df = pl.DataFrame({
    "id": [1, 2],
    "field1": ["value1", "value2"],
}).with_columns(
    pl.lit(1).alias("field2").cast(pl.Int32),
    pl.col("id").cast(pl.Int32),
)

df = df.with_columns(
    pl.struct(["field1", "field2", pl.lit("x").alias("newcol")]).alias("X")
).select(["id", "X"])

# These writes fail as well. They work fine without the rust engine, but the
# pyarrow engine is being deprecated by deltalake and has other issues with
# nested writes ("fake" nulls in non-null cols, ...) that I'm not covering here.
df.write_delta(delta_table_path, delta_write_options={"engine": "rust"}, mode="append")
df.write_delta(delta_table_path, delta_write_options={"engine": "rust"}, mode="overwrite")

delta_table_with_nested.zip
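For reference, the pyarrow-engine fallback mentioned in the comment above looks roughly like this (a sketch; it assumes the installed deltalake version still accepts engine="pyarrow", which is deprecated):

# Hypothetical fallback: the deprecated pyarrow engine handles the evolved
# nested schema, at the cost of the other nested-write issues mentioned above
df.write_delta(delta_table_path, delta_write_options={"engine": "pyarrow"}, mode="append")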

This is how I generated the table using Spark 3.4.1:

import shutil
from pyspark.sql import SparkSession
import polars as pl
# Initialize Spark session with Delta Lake
spark = SparkSession.builder \
    .appName("DeltaNestedStructure") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Define Delta table path
delta_table_path = "/tmp/delta_table_with_nested"
shutil.rmtree(delta_table_path, ignore_errors=True)

# Step 1: Create Delta table with a nested structure X
create_table_query = """
CREATE TABLE delta_table_with_nested (
    id INT,
    X STRUCT<field1: STRING, field2: INT>
)
USING DELTA
LOCATION '{}'
""".format(delta_table_path)

spark.sql(create_table_query)

# Step 2: Insert some data into the table
insert_data_query = """
INSERT INTO delta_table_with_nested VALUES
(1, STRUCT('value1', 10)),
(2, STRUCT('value2', 20))
"""
spark.sql(insert_data_query)

# Step 3: Add a new field `newcol` to the nested struct X using SQL.
# This operation does not write new data, it just updates the schema.
add_column_query = """
ALTER TABLE delta_table_with_nested
ADD COLUMNS (X.newcol STRING)
"""
spark.sql(add_column_query)

# Verify the schema update
df = spark.read.format("delta").load(delta_table_path)
df.show()
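# After the ALTER TABLE, df.show() prints the evolved struct with the new
# field as null, roughly like this (a sketch, exact formatting may differ):
# +---+------------------+
# | id|                 X|
# +---+------------------+
# |  1|{value1, 10, null}|
# |  2|{value2, 20, null}|
# +---+------------------+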


print(pl.scan_delta(delta_table_path).collect())
df = pl.DataFrame({
    "id": [1, 2],
    "field1": ["value1", "value2"],
}).with_columns(
    pl.lit(1).alias("field2").cast(pl.Int32),
    pl.col("id").cast(pl.Int32),
)

df = df.with_columns(
    pl.struct(["field1", "field2", pl.lit("x").alias("newcol")]).alias("X")
).select(["id", "X"])

# This write fails as well
df.write_delta(delta_table_path, delta_write_options={"engine": "rust"}, mode="append")

spark.stop()

Log output

Traceback (most recent call last):
  File "/Users/FlorentinoSainz/Projects/gdt-opendata/fsainz_test_polars_list.py", line 49, in <module>
    print(pl.scan_delta(delta_table_path).collect())
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/FlorentinoSainz/Projects/gdt-opendata/.venv/lib/python3.11/site-packages/polars/lazyframe/frame.py", line 2029, in collect
    return wrap_df(ldf.collect(callback))
                   ^^^^^^^^^^^^^^^^^^^^^
polars.exceptions.SchemaError: dtypes differ for column X: Struct([Field { name: "field1", dtype: Utf8View, is_nullable: true, metadata: {} }, Field { name: "field2", dtype: Int32, is_nullable: true, metadata: {} }]) != Struct([Field { name: "field1", dtype: Utf8View, is_nullable: true, metadata: {} }, Field { name: "field2", dtype: Int32, is_nullable: true, metadata: {} }, Field { name: "newcol", dtype: Utf8View, is_nullable: true, metadata: {} }])

Issue description

  1. I created the table with a nested struct X
  2. I wrote data into it
  3. I added a new field inside the nested struct X (the typical schema-evolution case of adding a new field)
  4. Scans in Polars no longer work (they work in Spark, and in Polars with the workaround below)
  5. Writes in Polars no longer work either (they work in Spark, and in Polars with the workaround below)

As a workaround I'm doing this instead of using scan_delta. (I also tried to stay on polars scan_parquet as much as possible, but I didn't manage to drop the schema when reading the parquet files from Python code, as the Rust side fails if I do so.)

dl_tbl = _get_delta_lake_table(
    table_path=table_path,
    version=kwargs.get("version", None),
    storage_options=storage_options,
    delta_table_options=kwargs.get("delta_table_options", None),
)
# .... some other code stolen from pl.scan_delta
arrow_dataset = ds.dataset(
    urls,
    filesystem=fs,  # type: ignore
    partitioning=part,
    partition_base_dir=partition_base_dir,
    format="parquet",
)
scan_df = pl.scan_pyarrow_dataset(arrow_dataset)

# This concat fixes the case in which someone added a new column in delta
# but no parquet file contains data with this column yet.
# It also solves the issue of reading empty dataframes.
# Similar to allow_missing_columns, but it also works for nested structs.
schema_from_parquets = scan_df.collect_schema()
aligned = align_schemas(empty_delta_schema_lf.collect_schema(), schema_from_parquets)
# Cast to correct Timestamp(tz=None) vs Timestamp(tz="UTC") mismatches
casted = scan_df.cast(aligned, strict=False)

# The diagonal concat adds the missing nested columns to the data
return pl.concat([empty_delta_schema_lf, casted], how="diagonal_relaxed")
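The align_schemas helper above isn't shown; a minimal sketch of what it could look like (hypothetical: it assumes both arguments are pl.Schema mappings and that the Delta schema is a superset of the parquet schema) is:

def align_schemas(delta_schema: pl.Schema, parquet_schema: pl.Schema) -> dict:
    # For every column the parquet files actually contain, take the dtype
    # from the Delta table schema (which already includes evolved fields),
    # so the later cast fixes tz mismatches and widens nested structs
    return {
        name: delta_schema[name]
        for name in parquet_schema
        if name in delta_schema
    }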

I have this other code for the writing bug as well:

try:
    batch.write_delta(
        target=table_path, mode=mode_batch, storage_options=storage_options,
        delta_write_options=delta_write_options, **kwargs
    )
except Exception as e:
    if isinstance(e, SchemaMismatchError) and table.schema == batch.schema:
        logging.info("Retrying with schema override. "
                     f"Got SchemaMismatchError {str(e)} and the schemas are the same."
                     " This usually happens when a new field is added to the table and "
                     "no parquet file contains it")
        batch.write_delta(
            target=table_path, mode=mode_batch, storage_options=storage_options,
            delta_write_options={**delta_write_options, "schema_mode": "merge"}, **kwargs
        )
    else:
        raise
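For context, SchemaMismatchError here is presumably deltalake's exception (an assumption, since the import isn't shown in the snippet):

# Hypothetical import, assuming a deltalake version that exposes it
from deltalake.exceptions import SchemaMismatchError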

Expected behavior

Polars should read the data, returning NULL for the new field in rows coming from parquet files that don't contain it.
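Roughly, the scan after the schema evolution should produce something like this (a sketch; exact formatting and field order may differ):

# shape: (2, 2)
# ┌─────┬────────────────────┐
# │ id  ┆ X                  │
# │ --- ┆ ---                │
# │ i32 ┆ struct[3]          │
# ╞═════╪════════════════════╡
# │ 1   ┆ {"value1",10,null} │
# │ 2   ┆ {"value2",20,null} │
# └─────┴────────────────────┘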

Installed versions

--------Version info---------
Polars:              1.14.0
Index type:          UInt32
Platform:            macOS-14.7.1-arm64-arm-64bit
Python:              3.11.10 (main, Sep  7 2024, 01:03:31) [Clang 15.0.0 (clang-1500.3.9.4)]
LTS CPU:             False

----Optional dependencies----
adbc_driver_manager  <not installed>
altair               5.4.1
boto3                1.35.36
cloudpickle          <not installed>
connectorx           <not installed>
deltalake            0.17.0
fastexcel            <not installed>
fsspec               2024.10.0
gevent               <not installed>
google.auth          2.36.0
great_tables         <not installed>
matplotlib           <not installed>
nest_asyncio         <not installed>
numpy                1.26.4
openpyxl             <not installed>
pandas               2.2.3
pyarrow              18.0.0
pydantic             2.9.2
pyiceberg            <not installed>
sqlalchemy           <not installed>
torch                <not installed>
xlsx2csv             <not installed>
xlsxwriter           <not installed>