Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Upgrade odbc-api to latest 4.1.0 #1598

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ strength_reduce = { version = "0.2", optional = true }
multiversion = { version = "0.7.3", optional = true }

# For support for odbc
odbc-api = { version = "0.36", optional = true }
odbc-api = { version = "4.1.0", optional = true }

# Faster hashing
ahash = "0.8"
Expand Down
43 changes: 8 additions & 35 deletions arrow-odbc-integration-testing/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ use stdext::function_name;

use arrow2::array::{Array, BinaryArray, BooleanArray, Int32Array, Int64Array, Utf8Array};
use arrow2::chunk::Chunk;
use arrow2::datatypes::{DataType, Field, TimeUnit};
use arrow2::datatypes::{DataType, TimeUnit};
use arrow2::error::Result;
use arrow2::io::odbc::api::{Connection, Cursor};
use arrow2::io::odbc::read::{buffer_from_metadata, deserialize, infer_schema};
use arrow2::io::odbc::api::ConnectionOptions;
use arrow2::io::odbc::read::Reader;

use super::{setup_empty_table, ENV, MSSQL};

Expand Down Expand Up @@ -138,45 +138,18 @@ fn test(
insert: &str,
table_name: &str,
) -> Result<()> {
let connection = ENV.connect_with_connection_string(MSSQL).unwrap();
let connection = ENV
.connect_with_connection_string(MSSQL, ConnectionOptions::default())
.unwrap();
setup_empty_table(&connection, table_name, &[type_]).unwrap();

connection
.execute(&format!("INSERT INTO {table_name} (a) VALUES {insert}"), ())
.unwrap();

// When
let query = format!("SELECT a FROM {table_name} ORDER BY id");

let chunks = read(&connection, &query)?.1;
let chunks = Reader::new(MSSQL.to_string(), query, None, None).read()?;

assert_eq!(chunks, expected);
Ok(())
}

pub fn read(
connection: &Connection<'_>,
query: &str,
) -> Result<(Vec<Field>, Vec<Chunk<Box<dyn Array>>>)> {
let mut a = connection.prepare(query).unwrap();
let fields = infer_schema(&a)?;

let max_batch_size = 100;
let buffer = buffer_from_metadata(&a, max_batch_size).unwrap();

let cursor = a.execute(()).unwrap().unwrap();
let mut cursor = cursor.bind_buffer(buffer).unwrap();

let mut chunks = vec![];
while let Some(batch) = cursor.fetch().unwrap() {
let arrays = (0..batch.num_cols())
.zip(fields.iter())
.map(|(index, field)| {
let column_view = batch.column(index);
deserialize(column_view, field.data_type.clone())
})
.collect::<Vec<_>>();
chunks.push(Chunk::new(arrays));
}

Ok((fields, chunks))
}
30 changes: 14 additions & 16 deletions arrow-odbc-integration-testing/src/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,34 @@ use arrow2::array::{Array, BinaryArray, BooleanArray, Int32Array, Utf8Array};
use arrow2::chunk::Chunk;
use arrow2::datatypes::{DataType, Field};
use arrow2::error::Result;
use arrow2::io::odbc::write::{buffer_from_description, infer_descriptions, serialize};

use super::read::read;
use super::{setup_empty_table, ENV, MSSQL};

use arrow2::io::odbc::api::ConnectionOptions;
use arrow2::io::odbc::read::Reader;
use arrow2::io::odbc::write::Writer;

fn test(
expected: Chunk<Box<dyn Array>>,
fields: Vec<Field>,
_fields: Vec<Field>,
type_: &str,
table_name: &str,
) -> Result<()> {
let connection = ENV.connect_with_connection_string(MSSQL).unwrap();
let connection = ENV
.connect_with_connection_string(MSSQL, ConnectionOptions::default())
.unwrap();
setup_empty_table(&connection, table_name, &[type_]).unwrap();

let query = &format!("INSERT INTO {table_name} (a) VALUES (?)");
let mut a = connection.prepare(query).unwrap();

let mut buffer = buffer_from_description(infer_descriptions(&fields)?, expected.len());
let write_query = &format!("INSERT INTO {table_name} (a) VALUES (?)");

// write
buffer.set_num_rows(expected.len());
let array = &expected.columns()[0];
let mut writer = Writer::new(MSSQL.to_string(), write_query.to_string(), None);

serialize(array.as_ref(), &mut buffer.column_mut(0))?;

a.execute(&buffer).unwrap();
writer.write(&expected)?;

// read
let query = format!("SELECT a FROM {table_name} ORDER BY id");
let chunks = read(&connection, &query)?.1;
let read_query = format!("SELECT a FROM {table_name} ORDER BY id");

let chunks = Reader::new(MSSQL.to_string(), read_query, None, None).read()?;

assert_eq!(chunks[0], expected);
Ok(())
Expand Down
83 changes: 0 additions & 83 deletions examples/io_odbc.rs

This file was deleted.

59 changes: 29 additions & 30 deletions src/io/odbc/read/deserialize.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use chrono::{NaiveDate, NaiveDateTime};
use odbc_api::buffers::{BinColumnView, TextColumnView};
use odbc_api::Bit;

use crate::array::{Array, BinaryArray, BooleanArray, PrimitiveArray, Utf8Array};
use crate::bitmap::{Bitmap, MutableBitmap};
Expand All @@ -9,77 +7,78 @@ use crate::datatypes::{DataType, TimeUnit};
use crate::offset::{Offsets, OffsetsBuffer};
use crate::types::NativeType;

use super::super::api::buffers::AnyColumnView;
use super::super::api::buffers::{AnySlice, BinColumnView, TextColumnView};
use super::super::api::Bit;

/// Deserializes a [`AnyColumnView`] into an array of [`DataType`].
/// Deserializes a [`AnySlice`] into an array of [`DataType`].
/// This is CPU-bounded
pub fn deserialize(column: AnyColumnView, data_type: DataType) -> Box<dyn Array> {
pub fn deserialize(column: AnySlice, data_type: DataType) -> Box<dyn Array> {
match column {
AnyColumnView::Text(view) => Box::new(utf8(data_type, view)) as _,
AnyColumnView::WText(_) => todo!(),
AnyColumnView::Binary(view) => Box::new(binary(data_type, view)) as _,
AnyColumnView::Date(values) => Box::new(date(data_type, values)) as _,
AnyColumnView::Time(values) => Box::new(time(data_type, values)) as _,
AnyColumnView::Timestamp(values) => Box::new(timestamp(data_type, values)) as _,
AnyColumnView::F64(values) => Box::new(primitive(data_type, values)) as _,
AnyColumnView::F32(values) => Box::new(primitive(data_type, values)) as _,
AnyColumnView::I8(values) => Box::new(primitive(data_type, values)) as _,
AnyColumnView::I16(values) => Box::new(primitive(data_type, values)) as _,
AnyColumnView::I32(values) => Box::new(primitive(data_type, values)) as _,
AnyColumnView::I64(values) => Box::new(primitive(data_type, values)) as _,
AnyColumnView::U8(values) => Box::new(primitive(data_type, values)) as _,
AnyColumnView::Bit(values) => Box::new(bool(data_type, values)) as _,
AnyColumnView::NullableDate(slice) => Box::new(date_optional(
AnySlice::Text(view) => Box::new(utf8(data_type, view)) as _,
AnySlice::WText(_) => todo!(),
AnySlice::Binary(view) => Box::new(binary(data_type, view)) as _,
AnySlice::Date(values) => Box::new(date(data_type, values)) as _,
AnySlice::Time(values) => Box::new(time(data_type, values)) as _,
AnySlice::Timestamp(values) => Box::new(timestamp(data_type, values)) as _,
AnySlice::F64(values) => Box::new(primitive(data_type, values)) as _,
AnySlice::F32(values) => Box::new(primitive(data_type, values)) as _,
AnySlice::I8(values) => Box::new(primitive(data_type, values)) as _,
AnySlice::I16(values) => Box::new(primitive(data_type, values)) as _,
AnySlice::I32(values) => Box::new(primitive(data_type, values)) as _,
AnySlice::I64(values) => Box::new(primitive(data_type, values)) as _,
AnySlice::U8(values) => Box::new(primitive(data_type, values)) as _,
AnySlice::Bit(values) => Box::new(bool(data_type, values)) as _,
AnySlice::NullableDate(slice) => Box::new(date_optional(
data_type,
slice.raw_values().0,
slice.raw_values().1,
)) as _,
AnyColumnView::NullableTime(slice) => Box::new(time_optional(
AnySlice::NullableTime(slice) => Box::new(time_optional(
data_type,
slice.raw_values().0,
slice.raw_values().1,
)) as _,
AnyColumnView::NullableTimestamp(slice) => Box::new(timestamp_optional(
AnySlice::NullableTimestamp(slice) => Box::new(timestamp_optional(
data_type,
slice.raw_values().0,
slice.raw_values().1,
)) as _,
AnyColumnView::NullableF64(slice) => Box::new(primitive_optional(
AnySlice::NullableF64(slice) => Box::new(primitive_optional(
data_type,
slice.raw_values().0,
slice.raw_values().1,
)) as _,
AnyColumnView::NullableF32(slice) => Box::new(primitive_optional(
AnySlice::NullableF32(slice) => Box::new(primitive_optional(
data_type,
slice.raw_values().0,
slice.raw_values().1,
)) as _,
AnyColumnView::NullableI8(slice) => Box::new(primitive_optional(
AnySlice::NullableI8(slice) => Box::new(primitive_optional(
data_type,
slice.raw_values().0,
slice.raw_values().1,
)) as _,
AnyColumnView::NullableI16(slice) => Box::new(primitive_optional(
AnySlice::NullableI16(slice) => Box::new(primitive_optional(
data_type,
slice.raw_values().0,
slice.raw_values().1,
)) as _,
AnyColumnView::NullableI32(slice) => Box::new(primitive_optional(
AnySlice::NullableI32(slice) => Box::new(primitive_optional(
data_type,
slice.raw_values().0,
slice.raw_values().1,
)) as _,
AnyColumnView::NullableI64(slice) => Box::new(primitive_optional(
AnySlice::NullableI64(slice) => Box::new(primitive_optional(
data_type,
slice.raw_values().0,
slice.raw_values().1,
)) as _,
AnyColumnView::NullableU8(slice) => Box::new(primitive_optional(
AnySlice::NullableU8(slice) => Box::new(primitive_optional(
data_type,
slice.raw_values().0,
slice.raw_values().1,
)) as _,
AnyColumnView::NullableBit(slice) => Box::new(bool_optional(
AnySlice::NullableBit(slice) => Box::new(bool_optional(
data_type,
slice.raw_values().0,
slice.raw_values().1,
Expand Down
Loading