Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Arc in Datatype to reduce memory overhead #3

Merged
merged 15 commits into from
Jan 15, 2024
2 changes: 1 addition & 1 deletion arrow-pyarrow-integration-testing/src/c_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub fn from_rust_iterator(py: Python) -> PyResult<PyObject> {
// initialize an array
let array = Int32Array::from(&[Some(2), None, Some(1), None]);
let array = StructArray::new(
DataType::Struct(vec![Field::new("a", array.data_type().clone(), true)]),
DataType::Struct(vec![Field::new("a", array.data_type().clone(), true)].into()),
vec![array.boxed()],
None,
)
Expand Down
3 changes: 2 additions & 1 deletion examples/csv_read_parallel.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crossbeam_channel::unbounded;

use std::sync::Arc;
use std::thread;
use std::time::SystemTime;

Expand All @@ -18,7 +19,7 @@ fn parallel_read(path: &str) -> Result<Vec<Chunk<Box<dyn Array>>>> {
let mut reader = read::ReaderBuilder::new().from_path(path)?;
let (fields, _) =
read::infer_schema(&mut reader, Some(batch_size * 10), has_header, &read::infer)?;
let fields = Box::new(fields);
let fields = Arc::new(fields);

let start = SystemTime::now();
// spawn a thread to produce `Vec<ByteRecords>` (IO bounded)
Expand Down
7 changes: 5 additions & 2 deletions examples/extension.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::io::{Cursor, Seek, Write};
use std::{
io::{Cursor, Seek, Write},
sync::Arc,
};

use re_arrow2::array::*;
use re_arrow2::chunk::Chunk;
Expand All @@ -10,7 +13,7 @@ use re_arrow2::io::ipc::write;
fn main() -> Result<()> {
// declare an extension.
let extension_type =
DataType::Extension("date16".to_string(), Box::new(DataType::UInt16), None);
DataType::Extension("date16".to_string(), Arc::new(DataType::UInt16), None);

// initialize an array with it.
let array = UInt16Array::from_slice([1, 2]).to(extension_type.clone());
Expand Down
5 changes: 2 additions & 3 deletions src/array/dictionary/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::hash::Hash;
use std::hint::unreachable_unchecked;
use std::{hash::Hash, hint::unreachable_unchecked, sync::Arc};

use crate::{
bitmap::{
Expand Down Expand Up @@ -292,7 +291,7 @@ impl<K: DictionaryKey> DictionaryArray<K> {
}

pub(crate) fn default_data_type(values_datatype: DataType) -> DataType {
DataType::Dictionary(K::KEY_TYPE, Box::new(values_datatype), false)
DataType::Dictionary(K::KEY_TYPE, Arc::new(values_datatype), false)
}

/// Slices this [`DictionaryArray`].
Expand Down
2 changes: 1 addition & 1 deletion src/array/dictionary/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl<K: DictionaryKey, M: MutableArray> MutableDictionaryArray<K, M> {
fn from_value_map(value_map: ValueMap<K, M>) -> Self {
let keys = MutablePrimitiveArray::<K>::new();
let data_type =
DataType::Dictionary(K::KEY_TYPE, Box::new(value_map.data_type().clone()), false);
DataType::Dictionary(K::KEY_TYPE, Arc::new(value_map.data_type().clone()), false);
Self {
data_type,
map: value_map,
Expand Down
4 changes: 3 additions & 1 deletion src/array/fixed_size_list/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use crate::{
bitmap::Bitmap,
datatypes::{DataType, Field},
Expand Down Expand Up @@ -203,7 +205,7 @@ impl FixedSizeListArray {

/// Returns a [`DataType`] consistent with [`FixedSizeListArray`].
pub fn default_datatype(data_type: DataType, size: usize) -> DataType {
let field = Box::new(Field::new("item", data_type, true));
let field = Arc::new(Field::new("item", data_type, true));
DataType::FixedSizeList(field, size)
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/array/fixed_size_list/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl<M: MutableArray> MutableFixedSizeListArray<M> {
/// Creates a new [`MutableFixedSizeListArray`] from a [`MutableArray`] and size.
pub fn new_with_field(values: M, name: &str, nullable: bool, size: usize) -> Self {
let data_type = DataType::FixedSizeList(
Box::new(Field::new(name, values.data_type().clone(), nullable)),
Arc::new(Field::new(name, values.data_type().clone(), nullable)),
size,
);
Self::new_from(values, data_type, size)
Expand Down
4 changes: 3 additions & 1 deletion src/array/list/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use crate::{
bitmap::Bitmap,
datatypes::{DataType, Field},
Expand Down Expand Up @@ -188,7 +190,7 @@ impl<O: Offset> ListArray<O> {
impl<O: Offset> ListArray<O> {
/// Returns a default [`DataType`]: inner field is named "item" and is nullable
pub fn default_datatype(data_type: DataType) -> DataType {
let field = Box::new(Field::new("item", data_type, true));
let field = Arc::new(Field::new("item", data_type, true));
if O::IS_LARGE {
DataType::LargeList(field)
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/array/list/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ impl<O: Offset, M: MutableArray> MutableListArray<O, M> {

/// Creates a new [`MutableListArray`] from a [`MutableArray`].
pub fn new_with_field(values: M, name: &str, nullable: bool) -> Self {
let field = Box::new(Field::new(name, values.data_type().clone(), nullable));
let field = Arc::new(Field::new(name, values.data_type().clone(), nullable));
let data_type = if O::IS_LARGE {
DataType::LargeList(field)
} else {
Expand Down
6 changes: 4 additions & 2 deletions src/array/struct_/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use crate::{
bitmap::Bitmap,
datatypes::{DataType, Field},
Expand Down Expand Up @@ -28,7 +30,7 @@ pub use mutable::*;
/// Field::new("c", DataType::Int32, false),
/// ];
///
/// let array = StructArray::new(DataType::Struct(fields), vec![boolean, int], None);
/// let array = StructArray::new(DataType::Struct(std::sync::Arc::new(fields)), vec![boolean, int], None);
/// ```
#[derive(Clone)]
pub struct StructArray {
Expand Down Expand Up @@ -153,7 +155,7 @@ impl StructArray {
impl StructArray {
/// Deconstructs the [`StructArray`] into its individual components.
#[must_use]
pub fn into_data(self) -> (Vec<Field>, Vec<Box<dyn Array>>, Option<Bitmap>) {
pub fn into_data(self) -> (Arc<Vec<Field>>, Vec<Box<dyn Array>>, Option<Bitmap>) {
let Self {
data_type,
values,
Expand Down
4 changes: 2 additions & 2 deletions src/array/union/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl UnionArray {
.try_for_each(|(index, (data_type, child))| {
if data_type != child {
Err(Error::oos(format!(
"The children DataTypes of a UnionArray must equal the children data types.
"The children DataTypes of a UnionArray must equal the children data types.
However, the field {index} has data type {data_type:?} but the value has data type {child:?}"
)))
} else {
Expand Down Expand Up @@ -352,7 +352,7 @@ impl UnionArray {
fn try_get_all(data_type: &DataType) -> Result<UnionComponents, Error> {
match data_type.to_logical_type() {
DataType::Union(fields, ids, mode) => {
Ok((fields, ids.as_ref().map(|x| x.as_ref()), *mode))
Ok((fields, ids.as_ref().map(|x| x.as_slice()), *mode))
}
_ => Err(Error::oos(
"The UnionArray requires a logical type of DataType::Union",
Expand Down
8 changes: 4 additions & 4 deletions src/compute/arithmetics/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ fn create_scale(lhs: &DataType, rhs: &DataType) -> Result<f64> {
/// ])
/// .to(DataType::Timestamp(
/// TimeUnit::Second,
/// Some("America/New_York".to_string()),
/// Some(std::sync::Arc::new("America/New_York".to_string())),
/// ));
///
/// let duration = PrimitiveArray::from([Some(10i64), Some(20i64), None, Some(30i64)])
Expand All @@ -96,7 +96,7 @@ fn create_scale(lhs: &DataType, rhs: &DataType) -> Result<f64> {
/// ])
/// .to(DataType::Timestamp(
/// TimeUnit::Second,
/// Some("America/New_York".to_string()),
/// Some(std::sync::Arc::new("America/New_York".to_string())),
/// ));
///
/// assert_eq!(result, expected);
Expand Down Expand Up @@ -161,7 +161,7 @@ where
/// ])
/// .to(DataType::Timestamp(
/// TimeUnit::Second,
/// Some("America/New_York".to_string()),
/// Some(std::sync::Arc::new("America/New_York".to_string())),
/// ));
///
/// let duration = PrimitiveArray::from([Some(10i64), Some(20i64), None, Some(30i64)])
Expand All @@ -176,7 +176,7 @@ where
/// ])
/// .to(DataType::Timestamp(
/// TimeUnit::Second,
/// Some("America/New_York".to_string()),
/// Some(std::sync::Arc::new("America/New_York".to_string())),
/// ));
///
/// assert_eq!(result, expected);
Expand Down
4 changes: 2 additions & 2 deletions src/compute/cast/dictionary_to.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ where
} else {
let data_type = DataType::Dictionary(
K2::KEY_TYPE,
Box::new(values.data_type().clone()),
std::sync::Arc::new(values.data_type().clone()),
is_ordered,
);
// Safety: this is safe because given a type `T` that fits in a `usize`, casting it to type `P` either overflows or also fits in a `usize`
Expand All @@ -116,7 +116,7 @@ where
} else {
let data_type = DataType::Dictionary(
K2::KEY_TYPE,
Box::new(values.data_type().clone()),
std::sync::Arc::new(values.data_type().clone()),
is_ordered,
);
// some of the values may not fit in `usize` and thus this needs to be checked
Expand Down
2 changes: 1 addition & 1 deletion src/compute/cast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ fn cast_list_to_fixed_size_list<O: Offset>(
);
let new_values = cast(sliced_values.as_ref(), inner.data_type(), options)?;
Ok(FixedSizeListArray::new(
DataType::FixedSizeList(Box::new(inner.clone()), size),
DataType::FixedSizeList(std::sync::Arc::new(inner.clone()), size),
new_values,
list.validity().cloned(),
))
Expand Down
3 changes: 2 additions & 1 deletion src/compute/cast/primitive_to.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::hash::Hash;
use std::sync::Arc;

use num_traits::{AsPrimitive, Float, ToPrimitive};

Expand Down Expand Up @@ -406,7 +407,7 @@ pub fn timestamp_to_timestamp(
from: &PrimitiveArray<i64>,
from_unit: TimeUnit,
to_unit: TimeUnit,
tz: &Option<String>,
tz: &Option<Arc<String>>,
) -> PrimitiveArray<i64> {
let from_size = time_unit_multiple(from_unit);
let to_size = time_unit_multiple(to_unit);
Expand Down
6 changes: 4 additions & 2 deletions src/compute/cast/utf8_to.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use chrono::Datelike;

use crate::{
Expand Down Expand Up @@ -127,7 +129,7 @@ pub fn utf8_to_naive_timestamp_ns<O: Offset>(from: &Utf8Array<O>) -> PrimitiveAr

pub(super) fn utf8_to_timestamp_ns_dyn<O: Offset>(
from: &dyn Array,
timezone: String,
timezone: Arc<String>,
) -> Result<Box<dyn Array>> {
let from = from.as_any().downcast_ref().unwrap();
utf8_to_timestamp_ns::<O>(from, timezone)
Expand All @@ -138,7 +140,7 @@ pub(super) fn utf8_to_timestamp_ns_dyn<O: Offset>(
/// [`crate::temporal_conversions::utf8_to_timestamp_ns`] applied for RFC3339 formatting
pub fn utf8_to_timestamp_ns<O: Offset>(
from: &Utf8Array<O>,
timezone: String,
timezone: Arc<String>,
) -> Result<PrimitiveArray<i64>> {
utf8_to_timestamp_ns_(from, RFC3339, timezone)
}
Expand Down
Loading
Loading