
Arrow Flight deserialization of batch containing DataType::Struct leads to OutOfSpec #1092

Closed
nielsmeima opened this issue Jun 21, 2022 · 8 comments
Labels
question Further information is requested

Comments

@nielsmeima

I am using arrow2-convert in combination with arrow2::io::flight to go from Rust types -> Arrow -> Flight -> Arrow -> Rust types.

I am trying to serialize and deserialize data via arrow2::io::flight::serialize_batch and arrow2::io::flight::deserialize_batch using the following DataType:

DataType::Struct(vec![
    Field::new(
        "target",
        DataType::Struct(vec![
            Field::new("name", DataType::Utf8, false),
            Field::new("value", DataType::Float64, false)
        ]),
        false
    ),
    Field::new(
        "assignments",
        DataType::List(Box::new(Field::new(
            "item",
            DataType::Struct(vec![
                Field::new("name", DataType::Utf8, false),
                Field::new("value", DataType::Float64, false)
            ]),
            false
        ))),
        false
    )
]);

After serializing data of the above type and trying to deserialize it, the following error arises: OutOfSpec("The slots of the array times the physical size must be smaller or equal to the length of the IPC buffer. However, this array reports 2 slots, which, for physical type \"f64\", corresponds to 16 bytes, which is larger than the buffer length 0").

Is this a misuse of the API, or a bug in either the serialization or the deserialization of a Schema using the fields specified in this DataType?

A repository containing an example demonstrating the above behaviour can be found here: https://github.com/nielsmeima/convert-flight-test/blob/master/src/main.rs
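
For reference, the gist of the example (condensed; the schema round trip that produces schema and ipc_schema is elided, see the repository for the full code):

let options = arrow2::io::ipc::write::WriteOptions { compression: None };

// serialize: `chunk` holds a single StructArray of the DataType above
let (_, batch) = arrow2::io::flight::serialize_batch(&chunk, &[], &options);

// deserialize: this is where the OutOfSpec error above is raised
let roundtripped =
    arrow2::io::flight::deserialize_batch(&batch, &schema.fields, &ipc_schema, &Default::default())?;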

@jorgecarleitao
Owner

jorgecarleitao commented Jun 21, 2022

Great that you are looking into arrow2-convert and arrow-flight! More than happy to help!

I see that there is a design bug in arrow2's API whereby

let (_, serialized_batch) = arrow2::io::flight::serialize_batch(&chunk, &[], &options);

does not error - we should error if the second argument is not correct. In particular, in this case you should pass &arrow2::io::ipc::write::default_ipc_fields(&schema.fields) to it.

Essentially, the second argument needs to be aligned with the fields representing the chunk. In this case it is not, which causes us to write an invalid IPC file (we should error instead).
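
Concretely (a sketch on top of the example's existing schema, chunk and options):

use arrow2::io::ipc::write::default_ipc_fields;

// derive the IpcFields from the same schema that describes the chunk
let ipc_fields = default_ipc_fields(&schema.fields);
let (_, serialized_batch) = arrow2::io::flight::serialize_batch(&chunk, &ipc_fields, &options);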

@nielsmeima
Author

nielsmeima commented Jun 21, 2022

Thanks once again for the quick response!

That change leads to the following new error when serialize_batch is called: DictionaryTracker configured above to not error on replacement: InvalidArgumentError("The number of fields in a struct must equal the number of children in IpcField"). I assume dictionaries are encoded regardless of whether they are actually used; in this case I am passing an empty one in the form of HashMap::new().

I tracked this error down to the encode_dictionary function in src/io/ipc/write/common.rs, specifically the Struct arm of the match expression: if array.fields().len() != fields.len() is true.
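
The shape of that check, paraphrased (not the verbatim source):

// paraphrased sketch of the Struct arm; `fields` are the IpcField children
// that should correspond one-to-one to the struct's fields
DataType::Struct(_) => {
    let array = array.as_any().downcast_ref::<StructArray>().unwrap();
    if array.fields().len() != fields.len() {
        return Err(Error::InvalidArgumentError(
            "The number of fields in a struct must equal the number of children in IpcField".to_string(),
        ));
    }
    // ...recurse into the children to encode any nested dictionaries
}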

I simplified the struct I am trying to serialize to:

#[derive(Debug, Clone, ArrowField, Deserialize)]
pub struct Action {
    pub target: Assignment,
}

and then logged both array.fields() and fields in encode_dictionary, which results in the following output:

array.fields(): [
    Field {
        name: "target",
        data_type: Struct(
            [
                Field {
                    name: "name",
                    data_type: Utf8,
                    is_nullable: false,
                    metadata: {},
                },
                Field {
                    name: "value",
                    data_type: Float64,
                    is_nullable: false,
                    metadata: {},
                },
            ],
        ),
        is_nullable: false,
        metadata: {},
    },
]
fields: [
    IpcField {
        fields: [],
        dictionary_id: None,
    },
    IpcField {
        fields: [],
        dictionary_id: None,
    },
]

There seems to be either a superfluous Field in the schema or an IpcField missing. I tried diving a bit deeper, but could not quickly work out what the issue is.

EDIT: some additional information, these are the schema and ipc_schema passed to the serialize_batch call:

schema: Schema {
    fields: [
        Field {
            name: "target",
            data_type: Struct(
                [
                    Field {
                        name: "name",
                        data_type: Utf8,
                        is_nullable: false,
                        metadata: {},
                    },
                    Field {
                        name: "value",
                        data_type: Float64,
                        is_nullable: false,
                        metadata: {},
                    },
                ],
            ),
            is_nullable: false,
            metadata: {},
        },
    ],
    metadata: {},
}
ipc_schema: [
    IpcField {
        fields: [
            IpcField {
                fields: [],
                dictionary_id: None,
            },
            IpcField {
                fields: [],
                dictionary_id: None,
            },
        ],
        dictionary_id: None,
    },
]

@nielsmeima
Author

nielsmeima commented Jun 21, 2022

Dove a little bit deeper. Disabling the dictionary encoding in encode_chunk results in the serialize_batch call passing. However, then a problem arises in the deserialize_batch call:

OutOfSpec("The slots of the array times the physical size must be smaller or equal to the length of the IPC buffer. However, this array reports 2 slots, which, for physical type \"f64\", corresponds to 16 bytes, which is larger than the buffer length 0",)

which seems again related to the issue above where Schema and IpcSchema are not in sync.

@jorgecarleitao
Owner

I am sorry for the previous information - there is indeed something beyond an incorrect parameter - I will look into this today.

@jorgecarleitao
Owner

I am trying to reproduce the bug. The test I have so far is:

use arrow2::array::*;
use arrow2::datatypes::{DataType, Field, Schema};
use arrow2::chunk::Chunk;
use arrow2::error::Error;

use arrow2::io::flight::*;
use arrow2::io::ipc::write::{default_ipc_fields, WriteOptions};


fn round_trip(schema: Schema, chunk: Chunk<Box<dyn Array>>) -> Result<(), Error> {
    let fields = default_ipc_fields(&schema.fields);
    let serialized = serialize_schema(&schema, Some(&fields));
    let (result, ipc_schema) = deserialize_schemas(&serialized.data_header)?;

    // schema roundtrips
    assert_eq!(schema, result);

    let (_, batch) = serialize_batch(&chunk, &fields, &WriteOptions { compression: None });

    let result = deserialize_batch(&batch, &result.fields, &ipc_schema, &Default::default())?;

    // batch roundtrips
    assert_eq!(result, chunk);
    Ok(())
}


#[test]
fn struct_1() -> Result<(), Error> {
    let array = StructArray::from_data(
        DataType::Struct(vec![Field::new("a", DataType::Float64, true)]),
        vec![PrimitiveArray::<f64>::from_slice([1.1, 2.2, 3.3]).boxed()],
        None,
    );
    let schema = vec![Field::new("b", array.data_type().clone(), false)].into();

    round_trip(schema, Chunk::new(vec![array.boxed()]))?;

    Ok(())
}

but this is passing on my machine.

I can't spot any obvious difference vs what you have. Can you spot any?

@nielsmeima
Author

Your test perfectly illustrated how I was incorrectly using the API. Thank you! I added the two tests below to illustrate my confusion using the API.

My mistake was to assume that giving the schema and Chunk::new(vec![array]) an identical structure would be the correct usage of the deserialize_batch API (see below). I now realize why this assumption is incorrect.

Thanks for the wonderful crates and taking the time to help me out! I really appreciate it!

To make the interoperation between arrow2 and arrow2-convert even smoother, perhaps an ArrowSchema macro could be added. For example:

#[derive(ArrowField)]
struct Root {
    a: f64
}

#[derive(ArrowSchema)]
struct Schema {
    root: Root
}
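
Roughly, I would imagine the derive expanding to something like this (entirely hypothetical, including the method name; just to illustrate the idea):

// hypothetical expansion of #[derive(ArrowSchema)], illustration only
impl Schema {
    fn arrow_schema() -> arrow2::datatypes::Schema {
        vec![arrow2::datatypes::Field::new(
            "root",
            <Root as arrow2_convert::field::ArrowField>::data_type(),
            false,
        )]
        .into()
    }
}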

@jorgecarleitao I could give this a shot and open a PR on the arrow2-convert repo?

I hope the information below helps someone else avoid making this assumption.


Assumption: the Schema and the StructArray look identical in structure, so this should work.

 Schema {
    fields: [
        Field {
            name: "b",
            data_type: Struct(
                [
                    Field {
                        name: "a",
                        data_type: Float64,
                        is_nullable: false,
                        metadata: {},
                    },
                ],
            ),
            is_nullable: false,
            metadata: {},
        },
    ],
    metadata: {},
}

StructArray[{b: {a: 1.1}}, {b: {a: 2.2}}, {b: {a: 3.3}}] 

Tests: struct_3 illustrates the above assumption; struct_2 shows the correct usage by adding a field which actually has the required DataType (in exactly the manner you demonstrated in struct_1).

use std::sync::Arc;

use arrow2::array::*;
use arrow2::chunk::Chunk;
use arrow2::datatypes::{DataType, Field, Schema};
use arrow2::error::Error;
use arrow2::io::flight::*;
use arrow2::io::ipc::write::{default_ipc_fields, WriteOptions};
use arrow2_convert::{serialize::TryIntoArrow, ArrowField};

fn round_trip(schema: Schema, chunk: Chunk<Arc<dyn Array>>) -> Result<(), Error> {
    let fields = default_ipc_fields(&schema.fields);
    let serialized = serialize_schema(&schema, Some(&fields));
    let (result, ipc_schema) = deserialize_schemas(&serialized.data_header)?;

    // schema roundtrips
    assert_eq!(schema, result);

    let (_, batch) = serialize_batch(&chunk, &fields, &WriteOptions { compression: None });

    let result = deserialize_batch(&batch, &result.fields, &ipc_schema, &Default::default())?;

    // batch roundtrips
    assert_eq!(result, chunk);

    Ok(())
}

#[test]
fn struct_1() -> Result<(), Error> {
    let array = StructArray::from_data(
        DataType::Struct(vec![Field::new("a", DataType::Float64, true)]),
        vec![PrimitiveArray::<f64>::from_slice([1.1, 2.2, 3.3]).arced()],
        None,
    );

    let schema = vec![Field::new("b", array.data_type().clone(), false)].into();

    round_trip(schema, Chunk::new(vec![array.arced()]))?;

    Ok(())
}


#[derive(Debug, Clone, ArrowField)]
struct Inner {
    pub a: f64,
}

#[derive(Debug, Clone, ArrowField)]
struct Outer {
    b: Inner,
}


#[test]
fn struct_2() -> Result<(), Error> {
    let data = vec![
        Outer {
            b: Inner { a: 1.1 },
        },
        Outer {
            b: Inner { a: 2.2 },
        },
        Outer {
            b: Inner { a: 3.3 },
        },
    ];

    let array: Arc<dyn Array> = data.try_into_arrow()?;

    let data_type = DataType::Struct(vec![Field::new(
        "b",
        DataType::Struct(vec![Field::new("a", DataType::Float64, false)]),
        false,
    )]);

    assert_eq!(array.data_type().clone(), data_type);

    // adding a field works!
    let schema = vec![Field::new("field", data_type, false)].into();
    
    round_trip(schema, Chunk::new(vec![array]))?;

    Ok(())
}

#[test]
fn struct_3() -> Result<(), Error> {

    let data = vec![
        Outer {
            b: Inner { a: 1.1 },
        },
        Outer {
            b: Inner { a: 2.2 },
        },
        Outer {
            b: Inner { a: 3.3 },
        },
    ];

    let array: Arc<dyn Array> = data.try_into_arrow()?;

    let data_type = DataType::Struct(vec![Field::new(
        "b",
        DataType::Struct(vec![Field::new("a", DataType::Float64, false)]),
        false,
    )]);

    assert_eq!(array.data_type().clone(), data_type);

    let schema = vec![StructArray::get_fields(&data_type).to_vec().pop().unwrap()].into();

    round_trip(schema, Chunk::new(vec![array]))?;

    Ok(())
}

@jorgecarleitao added the question label on Jun 22, 2022
@jorgecarleitao
Owner

So, I do not know xD - my understanding is that a Struct is mapped to a Field, and thus the schema needs to be built from that field, but I can also see the value of generating a Schema where each entry of the struct is a column of the file.
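
In code, the two readings (sketched against struct_1 above, given a struct_array: &StructArray):

// (a) the struct is a single column; the schema is built from that one Field
let schema_a: Schema = vec![Field::new("b", struct_array.data_type().clone(), false)].into();

// (b) each entry of the struct becomes its own column (the flattened reading)
let schema_b: Schema = StructArray::get_fields(struct_array.data_type()).to_vec().into();
let columns = struct_array.values().to_vec(); // children promoted to top-level arrays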

Maybe @ncpenke could pitch in here?

@ncpenke
Contributor

ncpenke commented Jun 25, 2022

Thanks for using these libraries and the feedback @nielsmeima. Opened DataEngineeringLabs/arrow2-convert#40 to track and discuss this further.
