Skip to content

Commit

Permalink
Upgrading to polars 0.44 (#14478)
Browse files Browse the repository at this point in the history
Upgrading to polars 0.44
  • Loading branch information
ayax79 authored Nov 30, 2024
1 parent e1f74a6 commit 0172ad8
Show file tree
Hide file tree
Showing 12 changed files with 847 additions and 243 deletions.
905 changes: 753 additions & 152 deletions Cargo.lock

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions crates/nu_plugin_polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ mimalloc = { version = "0.1.42" }
num = {version = "0.4"}
serde = { version = "1.0", features = ["derive"] }
sqlparser = { version = "0.49"}
polars-io = { version = "0.43", features = ["avro"]}
polars-arrow = { version = "0.43"}
polars-ops = { version = "0.43", features = ["pivot"]}
polars-plan = { version = "0.43", features = ["regex"]}
polars-utils = { version = "0.43"}
polars-io = { version = "0.44", features = ["avro"]}
polars-arrow = { version = "0.44"}
polars-ops = { version = "0.44", features = ["pivot"]}
polars-plan = { version = "0.44", features = ["regex"]}
polars-utils = { version = "0.44"}
typetag = "0.2"
env_logger = "0.11.3"
log.workspace = true
Expand Down Expand Up @@ -81,7 +81,7 @@ features = [
"to_dummies",
]
optional = false
version = "0.43"
version = "0.44"

[dev-dependencies]
nu-cmd-lang = { path = "../nu-cmd-lang", version = "0.100.1" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use nu_protocol::{
Category, Example, LabeledError, PipelineData, ShellError, Signature, Span, SyntaxShape, Type,
Value,
};
use polars::prelude::{lit, QuantileInterpolOptions};
use polars::prelude::{lit, QuantileMethod};

#[derive(Clone)]
pub struct LazyQuantile;
Expand Down Expand Up @@ -109,7 +109,7 @@ impl PluginCommand for LazyQuantile {
PolarsPluginObject::NuExpression(expr) => {
let expr: NuExpression = expr
.into_polars()
.quantile(lit(quantile), QuantileInterpolOptions::default())
.quantile(lit(quantile), QuantileMethod::default())
.into();
expr.to_pipeline_data(plugin, engine, call.head)
}
Expand All @@ -136,7 +136,7 @@ fn command(
let lazy = NuLazyFrame::new(
lazy.from_eager,
lazy.to_polars()
.quantile(lit(quantile), QuantileInterpolOptions::default()),
.quantile(lit(quantile), QuantileMethod::default()),
);

lazy.to_pipeline_data(plugin, engine, call.head)
Expand Down
19 changes: 2 additions & 17 deletions crates/nu_plugin_polars/src/dataframe/command/core/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ use polars::{
},
};

use polars_io::{
avro::AvroReader, csv::read::CsvReadOptions, prelude::ParallelStrategy, HiveOptions,
};
use polars_io::{avro::AvroReader, csv::read::CsvReadOptions, HiveOptions};

const DEFAULT_INFER_SCHEMA: usize = 100;

Expand Down Expand Up @@ -179,20 +177,7 @@ fn from_parquet(
) -> Result<Value, ShellError> {
if !call.has_flag("eager")? {
let file: String = call.req(0)?;
let args = ScanArgsParquet {
n_rows: None,
cache: true,
parallel: ParallelStrategy::Auto,
rechunk: false,
row_index: None,
low_memory: false,
cloud_options: None,
use_statistics: false,
hive_options: HiveOptions::default(),
glob: true,
include_file_paths: None,
};

let args = ScanArgsParquet::default();
let df: NuLazyFrame = LazyFrame::scan_parquet(file, args)
.map_err(|e| ShellError::GenericError {
error: "Parquet reader error".into(),
Expand Down
12 changes: 7 additions & 5 deletions crates/nu_plugin_polars/src/dataframe/command/core/summary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use nu_protocol::{
use polars::{
chunked_array::ChunkedArray,
prelude::{
AnyValue, DataFrame, DataType, Float64Type, IntoSeries, NewChunkedArray,
QuantileInterpolOptions, Series, StringType,
AnyValue, Column as PolarsColumn, DataFrame, DataType, Float64Type, IntoSeries,
NewChunkedArray, QuantileMethod, StringType,
},
};

Expand Down Expand Up @@ -184,7 +184,6 @@ fn command(

let tail = df
.as_ref()
.get_columns()
.iter()
.filter(|col| !matches!(col.dtype(), &DataType::Object("object", _)))
.map(|col| {
Expand All @@ -200,7 +199,7 @@ fn command(
.clone()
.into_iter()
.map(|q| {
col.quantile_reduce(q, QuantileInterpolOptions::default())
col.quantile_reduce(q, QuantileMethod::default())
.ok()
.map(|s| s.into_series("quantile".into()))
.and_then(|ca| ca.cast(&DataType::Float64).ok())
Expand All @@ -221,7 +220,10 @@ fn command(
ChunkedArray::<Float64Type>::from_slice_options(name.into(), &descriptors).into_series()
});

let res = head.chain(tail).collect::<Vec<Series>>();
let res = head
.chain(tail)
.map(PolarsColumn::from)
.collect::<Vec<PolarsColumn>>();

let polars_df = DataFrame::new(res).map_err(|e| ShellError::GenericError {
error: "Dataframe Error".into(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ fn command_lazy(
}

let lazy = lazy.to_polars();
let lazy: NuLazyFrame = lazy.rename(&columns, &new_names).into();
let lazy: NuLazyFrame = lazy.rename(&columns, &new_names, true).into();

lazy.to_pipeline_data(plugin, engine, call.head)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,13 +160,14 @@ fn command_df(
df: NuDataFrame,
) -> Result<PipelineData, ShellError> {
let start: i64 = call.req(0)?;
let start = Series::new("".into(), &[start]);
let start = Series::new("".into(), &[start]).into();

let length: Option<i64> = call.get_flag("length")?;
let length = match length {
Some(v) => Series::new("".into(), &[v as u64]),
None => Series::new_null("".into(), 1),
};
}
.into();

let series = df.as_series(call.head)?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use nu_protocol::{
};
use num::Zero;
use polars::prelude::{
BooleanType, ChunkCompare, ChunkedArray, DataType, Float64Type, Int64Type, IntoSeries,
NumOpsDispatchChecked, PolarsError, Series, StringNameSpaceImpl,
BooleanType, ChunkCompareEq, ChunkCompareIneq, ChunkedArray, DataType, Float64Type, Int64Type,
IntoSeries, NumOpsDispatchChecked, PolarsError, Series, StringNameSpaceImpl,
};
use std::ops::{Add, BitAnd, BitOr, Div, Mul, Sub};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ use polars::chunked_array::ChunkedArray;
use polars::datatypes::{AnyValue, PlSmallStr};
use polars::export::arrow::Either;
use polars::prelude::{
ChunkAnyValue, DataFrame, DataType, DatetimeChunked, Float32Type, Float64Type, Int16Type,
Int32Type, Int64Type, Int8Type, IntoSeries, ListBooleanChunkedBuilder, ListBuilderTrait,
ListPrimitiveChunkedBuilder, ListStringChunkedBuilder, ListType, NamedFrom, NewChunkedArray,
ObjectType, PolarsError, Schema, SchemaExt, Series, StructChunked, TemporalMethods, TimeUnit,
UInt16Type, UInt32Type, UInt64Type, UInt8Type,
ChunkAnyValue, Column as PolarsColumn, DataFrame, DataType, DatetimeChunked, Float32Type,
Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, IntoSeries, ListBooleanChunkedBuilder,
ListBuilderTrait, ListPrimitiveChunkedBuilder, ListStringChunkedBuilder, ListType, NamedFrom,
NewChunkedArray, ObjectType, PolarsError, Schema, SchemaExt, Series, StructChunked,
TemporalMethods, TimeUnit, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
};

use nu_protocol::{Record, ShellError, Span, Value};
Expand Down Expand Up @@ -146,6 +146,16 @@ impl DerefMut for TypedColumn {
pub type ColumnMap = IndexMap<PlSmallStr, TypedColumn>;

/// Converts a slice of rows from a polars `Column` into a Nushell `Column`.
///
/// Introduced for polars 0.44, where `DataFrame` holds `Column` values rather
/// than `Series`: the column is first materialized into its backing `Series`
/// (may be a no-op view, presumably; confirm against polars 0.44 semantics),
/// then the existing series-based conversion handles rows `from_row..to_row`.
pub fn create_column(
    column: &PolarsColumn,
    from_row: usize,
    to_row: usize,
    span: Span,
) -> Result<Column, ShellError> {
    // Materialize the Column into a concrete Series so the pre-0.44
    // series-based conversion path can be reused unchanged.
    let series = column.as_materialized_series();
    create_column_from_series(series, from_row, to_row, span)
}

pub fn create_column_from_series(
series: &Series,
from_row: usize,
to_row: usize,
Expand Down Expand Up @@ -497,15 +507,19 @@ fn typed_column_to_series(name: PlSmallStr, column: TypedColumn) -> Result<Serie
insert_record(&mut column_values, record.clone(), &schema)?;
let df = from_parsed_columns(column_values)?;
for name in df.df.get_column_names() {
let series = df.df.column(name).map_err(|e| ShellError::GenericError {
error: format!(
"Error creating struct, could not get column name {name}: {e}"
),
msg: "".into(),
span: None,
help: None,
inner: vec![],
})?;
let series = df
.df
.column(name)
.map_err(|e| ShellError::GenericError {
error: format!(
"Error creating struct, could not get column name {name}: {e}"
),
msg: "".into(),
span: None,
help: None,
inner: vec![],
})?
.as_materialized_series();

if let Some(v) = structs.get_mut(name) {
let _ = v.append(series)
Expand All @@ -524,15 +538,18 @@ fn typed_column_to_series(name: PlSmallStr, column: TypedColumn) -> Result<Serie

let structs: Vec<Series> = structs.into_values().collect();

let chunked =
StructChunked::from_series(column.name().to_owned(), structs.as_slice())
.map_err(|e| ShellError::GenericError {
error: format!("Error creating struct: {e}"),
msg: "".into(),
span: None,
help: None,
inner: vec![],
})?;
let chunked = StructChunked::from_series(
column.name().to_owned(),
structs.len(),
structs.iter(),
)
.map_err(|e| ShellError::GenericError {
error: format!("Error creating struct: {e}"),
msg: "".into(),
span: None,
help: None,
inner: vec![],
})?;
Ok(chunked.into_series())
}
_ => Err(ShellError::GenericError {
Expand All @@ -558,13 +575,13 @@ fn typed_column_to_series(name: PlSmallStr, column: TypedColumn) -> Result<Serie
// This data can be used to create a Series object that can initialize
// the dataframe based on the type of data that is found
pub fn from_parsed_columns(column_values: ColumnMap) -> Result<NuDataFrame, ShellError> {
let mut df_series: Vec<Series> = Vec::new();
let mut df_columns: Vec<PolarsColumn> = Vec::new();
for (name, column) in column_values {
let series = typed_column_to_series(name, column)?;
df_series.push(series);
df_columns.push(series.into());
}

DataFrame::new(df_series)
DataFrame::new(df_columns)
.map(|df| NuDataFrame::new(false, df))
.map_err(|e| ShellError::GenericError {
error: "Error creating dataframe".into(),
Expand Down Expand Up @@ -1245,7 +1262,8 @@ fn any_value_to_value(any_value: &AnyValue, span: Span) -> Result<Value, ShellEr
}
AnyValue::Datetime(a, time_unit, tz) => {
let nanos = nanos_from_timeunit(*a, *time_unit);
datetime_from_epoch_nanos(nanos, tz, span).map(|datetime| Value::date(datetime, span))
datetime_from_epoch_nanos(nanos, &tz.cloned(), span)
.map(|datetime| Value::date(datetime, span))
}
AnyValue::Duration(a, time_unit) => {
let nanos = match time_unit {
Expand All @@ -1264,17 +1282,7 @@ fn any_value_to_value(any_value: &AnyValue, span: Span) -> Result<Value, ShellEr
}
AnyValue::Struct(_idx, _struct_array, _s_fields) => {
// This should convert to a StructOwned object.
let static_value =
any_value
.clone()
.into_static()
.map_err(|e| ShellError::GenericError {
error: "Cannot convert polars struct to static value".into(),
msg: e.to_string(),
span: Some(span),
help: None,
inner: Vec::new(),
})?;
let static_value = any_value.clone().into_static();
any_value_to_value(&static_value, span)
}
AnyValue::StructOwned(struct_tuple) => {
Expand Down Expand Up @@ -1485,7 +1493,7 @@ mod tests {
let test_millis = 946_684_800_000;
assert_eq!(
any_value_to_value(
&AnyValue::Datetime(test_millis, TimeUnit::Milliseconds, &None),
&AnyValue::Datetime(test_millis, TimeUnit::Milliseconds, None),
span
)?,
Value::date(comparison_date, span)
Expand Down Expand Up @@ -1575,6 +1583,7 @@ mod tests {
let test_bool_arr = BooleanArray::from([Some(true)]);
let test_struct_arr = StructArray::new(
DataType::Struct(fields.clone()).to_arrow(CompatLevel::newest()),
1,
vec![Box::new(test_int_arr), Box::new(test_bool_arr)],
None,
);
Expand Down
28 changes: 17 additions & 11 deletions crates/nu_plugin_polars/src/dataframe/values/nu_dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ pub use operations::Axis;

use indexmap::map::IndexMap;
use nu_protocol::{did_you_mean, PipelineData, Record, ShellError, Span, Value};
use polars::prelude::{DataFrame, DataType, IntoLazy, PolarsObject, Series};
use polars::prelude::{
Column as PolarsColumn, DataFrame, DataType, IntoLazy, PolarsObject, Series,
};
use polars_plan::prelude::{lit, Expr, Null};
use polars_utils::total_ord::{TotalEq, TotalHash};
use std::{
Expand Down Expand Up @@ -135,7 +137,7 @@ impl NuDataFrame {
}

pub fn try_from_series(series: Series, span: Span) -> Result<Self, ShellError> {
match DataFrame::new(vec![series]) {
match DataFrame::new(vec![series.into()]) {
Ok(dataframe) => Ok(NuDataFrame::new(false, dataframe)),
Err(e) => Err(ShellError::GenericError {
error: "Error creating dataframe".into(),
Expand Down Expand Up @@ -191,13 +193,16 @@ impl NuDataFrame {
}

pub fn try_from_series_vec(columns: Vec<Series>, span: Span) -> Result<Self, ShellError> {
let dataframe = DataFrame::new(columns).map_err(|e| ShellError::GenericError {
error: "Error creating dataframe".into(),
msg: format!("Unable to create DataFrame: {e}"),
span: Some(span),
help: None,
inner: vec![],
})?;
let columns_converted: Vec<PolarsColumn> = columns.into_iter().map(Into::into).collect();

let dataframe =
DataFrame::new(columns_converted).map_err(|e| ShellError::GenericError {
error: "Error creating dataframe".into(),
msg: format!("Unable to create DataFrame: {e}"),
span: Some(span),
help: None,
inner: vec![],
})?;

Ok(Self::new(false, dataframe))
}
Expand Down Expand Up @@ -295,14 +300,15 @@ impl NuDataFrame {
.df
.get_columns()
.first()
.expect("We have already checked that the width is 1");
.expect("We have already checked that the width is 1")
.as_materialized_series();

Ok(series.clone())
}

pub fn get_value(&self, row: usize, span: Span) -> Result<Value, ShellError> {
let series = self.as_series(span)?;
let column = conversion::create_column(&series, row, row + 1, span)?;
let column = conversion::create_column_from_series(&series, row, row + 1, span)?;

if column.len() == 0 {
Err(ShellError::AccessEmptyContent { span })
Expand Down
Loading

0 comments on commit 0172ad8

Please sign in to comment.