diff --git a/Cargo.lock b/Cargo.lock index 829bd058b945..fc797c7259f8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -61,7 +61,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bf6ccdb167abbf410dcb915cabd428929d7f6a04980b54a11f26a39f1c7f7107" dependencies = [ "cfg-if", - "const-random", "getrandom", "once_cell", "serde", @@ -192,15 +191,14 @@ dependencies = [ [[package]] name = "arrow2" -version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a4c5b03335bc1cb0fd9f5297f8fd3bbfd6fb04f3cb0bc7d6c91b7128cb8336a" +version = "0.17.0" +source = "git+https://github.com/rerun-io/arrow2?branch=cmc/arc_datatype#3c3c6ed183fb7c1f625ed2bcae6b00265141b727" dependencies = [ "ahash 0.8.2", "arrow-format", "bytemuck", "chrono", - "comfy-table 5.0.1", + "comfy-table", "dyn-clone", "either", "ethnum", @@ -217,9 +215,8 @@ dependencies = [ [[package]] name = "arrow2_convert" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5c08c10a0089a07e63614a21e9c581175a934bbbecc0f41983b83810fe6fc97" +version = "0.5.0" +source = "git+https://github.com/rerun-io/arrow2-convert?branch=cmc/arc_datatype#55f4afd287306ee2f3b909195883f9faf4393515" dependencies = [ "arrow2", "arrow2_convert_derive", @@ -229,9 +226,8 @@ dependencies = [ [[package]] name = "arrow2_convert_derive" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9847142964fcd99c30d56bf51ab4e1b0a3daa1ff7d59f44527088cdd42fad49f" +version = "0.5.0" +source = "git+https://github.com/rerun-io/arrow2-convert?branch=cmc/arc_datatype#55f4afd287306ee2f3b909195883f9faf4393515" dependencies = [ "proc-macro-error", "proc-macro2", @@ -634,13 +630,8 @@ version = "0.4.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "16b0a3d9ed01224b22057780a37bb8c5dbfe1be8ba48678e7bf57ec4b385411f" dependencies = [ - "iana-time-zone", - "js-sys", "num-integer", "num-traits", - "time 0.1.44", - "wasm-bindgen", - "winapi", ] [[package]] @@ -808,17 +799,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "comfy-table" -version = "5.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b103d85ca6e209388771bfb7aa6b68a7aeec4afbf6f0a0264bfbf50360e5212e" -dependencies = [ - "strum 0.23.0", - "strum_macros 0.23.1", - "unicode-width", -] - [[package]] name = "comfy-table" version = "6.1.4" @@ -826,8 +806,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e7b787b0dc42e8111badfdbe4c3059158ccb2db8780352fa1b01e8ccf45cc4d" dependencies = [ "crossterm", - "strum 0.24.1", - "strum_macros 0.24.3", + "strum", + "strum_macros", "unicode-width", ] @@ -863,28 +843,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "const-random" -version = "0.1.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "368a7a772ead6ce7e1de82bfb04c485f3db8ec744f72925af5735e29a22cc18e" -dependencies = [ - "const-random-macro", - "proc-macro-hack", -] - -[[package]] -name = "const-random-macro" -version = "0.1.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d7d6ab3c3a2282db210df5f02c4dab6e0a7057af0fb7ebd4070f30fe05c0ddb" -dependencies = [ - "getrandom", - "once_cell", - "proc-macro-hack", - "tiny-keccak", -] - [[package]] name = "core-foundation" version = "0.9.3" @@ -1104,50 +1062,6 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b365fabc795046672053e29c954733ec3b05e4be654ab130fe8f1f94d7051f35" -[[package]] -name = "cxx" -version = "1.0.82" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4a41a86530d0fe7f5d9ea779916b7cadd2d4f9add748b99c2c029cbbdfaf453" -dependencies = [ - "cc", - "cxxbridge-flags", - "cxxbridge-macro", - "link-cplusplus", -] - -[[package]] -name = "cxx-build" -version = "1.0.82" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06416d667ff3e3ad2df1cd8cd8afae5da26cf9cec4d0825040f88b5ca659a2f0" -dependencies = [ - "cc", - "codespan-reporting", - "once_cell", - "proc-macro2", - "quote", - "scratch", - "syn", -] - -[[package]] -name = "cxxbridge-flags" -version = "1.0.82" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "820a9a2af1669deeef27cb271f476ffd196a2c4b6731336011e0ba63e2c7cf71" - -[[package]] -name = "cxxbridge-macro" -version = "1.0.82" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a08a6e2fcc370a089ad3b4aaf54db3b1b4cee38ddabce5896b33eb693275f470" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "d3d12" version = "0.6.0" @@ -1872,7 +1786,7 @@ dependencies = [ "cfg-if", "js-sys", "libc", - "wasi 0.11.0+wasi-snapshot-preview1", + "wasi", "wasm-bindgen", ] @@ -2186,30 +2100,6 @@ dependencies = [ "want", ] -[[package]] -name = "iana-time-zone" -version = "0.1.53" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64c122667b287044802d6ce17ee2ddf13207ed924c712de9a66a5814d5b64765" -dependencies = [ - "android_system_properties", - "core-foundation-sys", - "iana-time-zone-haiku", - "js-sys", - "wasm-bindgen", - "winapi", -] - -[[package]] -name = "iana-time-zone-haiku" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0703ae284fc167426161c2e3f1da3ea71d94b21bedbcc9494e92b28e334e3dca" -dependencies = [ - "cxx", - "cxx-build", -] - [[package]] name = "id-arena" version = "2.2.1" @@ -2541,15 +2431,6 @@ dependencies = [ "libc", ] -[[package]] -name = "link-cplusplus" -version = "1.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9272ab7b96c9046fbc5bc56c06c117cb639fe2d509df0c421cad82d2915cf369" -dependencies = [ - "cc", -] - [[package]] name = "linked-hash-map" version = "0.5.6" @@ -2752,7 +2633,7 @@ checksum = "e5d732bc30207a6423068df043e3d02e0735b155ad7ce1a6f76fe2baa5b158de" dependencies = [ "libc", "log", - "wasi 0.11.0+wasi-snapshot-preview1", + "wasi", "windows-sys 0.42.0", ] @@ -2764,22 +2645,24 @@ checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" [[package]] name = "multiversion" -version = "0.6.1" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "025c962a3dd3cc5e0e520aa9c612201d127dcdf28616974961a649dca64f5373" +checksum = "e6a87eede2251ca235e5573086d01d2ab6b59dfaea54c2be10f9320980f7e8f7" dependencies = [ "multiversion-macros", + "target-features", ] [[package]] name = "multiversion-macros" -version = "0.6.1" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8a3e2bde382ebf960c1f3e79689fa5941625fe9bf694a1cb64af3e85faff3af" +checksum = "1af1abf82261d780d114014eff4b555e47d823f3b84f893c4388572b40e089fb" dependencies = [ "proc-macro2", "quote", "syn", + "target-features", ] [[package]] @@ -2923,31 +2806,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "num" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43db66d1170d347f9a065114077f7dccb00c1b9478c89384490a3425279a4606" -dependencies = [ - "num-bigint", - "num-complex", - "num-integer", - "num-iter", - "num-rational", - "num-traits", -] - -[[package]] -name = "num-bigint" -version = "0.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f93ab6289c7b344a8a9f60f88d80aa20032336fe78da341afc91c8a2341fc75f" -dependencies = [ - "autocfg", - "num-integer", - "num-traits", -] - [[package]] name = "num-complex" version = "0.4.2" @@ -2978,17 +2836,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "num-iter" -version = "0.1.43" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d03e6c028c5dc5cac6e2dec0efda81fc887605bb3d884578bb6d6bf7514e252" -dependencies = [ - "autocfg", - "num-integer", - "num-traits", -] - [[package]] name = "num-rational" version = "0.4.1" @@ -2996,7 +2843,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0638a1c9d0a3c0914158145bc76cff373a75a627e6ecbfb71cbe6f453a5a19b0" dependencies = [ "autocfg", - "num-bigint", "num-integer", "num-traits", ] @@ -3362,33 +3208,35 @@ dependencies = [ [[package]] name = "polars-arrow" -version = "0.27.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38fad08b4d4d3e21e8935d5d3d4dfcbb5ca65ffc5fa19364c254751bc1d62f93" +version = "0.28.0" +source = "git+https://github.com/rerun-io/polars?branch=cmc/arc_datatype#4a3e5e6d61f42f8c9d33da88b950a40833835ae6" dependencies = [ "arrow2", "hashbrown 0.13.1", - "num", + "multiversion", + "num-traits", + "polars-error", "thiserror", ] [[package]] name = "polars-core" -version = "0.27.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2256086865cfa7db31af5e66a088f0089bff1ba9692f7195d1661497cebdca05" +version = "0.28.0" +source = "git+https://github.com/rerun-io/polars?branch=cmc/arc_datatype#4a3e5e6d61f42f8c9d33da88b950a40833835ae6" dependencies = [ "ahash 0.8.2", - "anyhow", "arrow2", "bitflags", "chrono", - "comfy-table 6.1.4", + "comfy-table", + "either", "hashbrown 0.13.1", "indexmap", - "num", + "num-traits", "once_cell", "polars-arrow", + "polars-error", + "polars-row", "polars-utils", "rayon", "regex", @@ -3398,26 +3246,48 @@ dependencies = [ "xxhash-rust", ] +[[package]] +name = "polars-error" +version = "0.28.0" +source = "git+https://github.com/rerun-io/polars?branch=cmc/arc_datatype#4a3e5e6d61f42f8c9d33da88b950a40833835ae6" +dependencies = [ + "arrow2", + "regex", + "thiserror", +] + [[package]] name = "polars-ops" -version = "0.27.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31168b91a6edddb8dc4dcc4b53af15e9ffefc23f0bff054c1975423e01f3ae15" +version = "0.28.0" +source = "git+https://github.com/rerun-io/polars?branch=cmc/arc_datatype#4a3e5e6d61f42f8c9d33da88b950a40833835ae6" dependencies = [ "arrow2", + "either", + "memchr", "polars-arrow", "polars-core", "polars-utils", + "smartstring", +] + +[[package]] +name = "polars-row" +version = "0.28.0" +source = "git+https://github.com/rerun-io/polars?branch=cmc/arc_datatype#4a3e5e6d61f42f8c9d33da88b950a40833835ae6" +dependencies = [ + "arrow2", + "polars-error", + "polars-utils", ] [[package]] name = "polars-utils" -version = "0.27.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bda7fb126f8c77d0a106620fd525bc4fdd1c7e32cb100aa9a82ba3f7c969485a" +version = "0.28.0" +source = "git+https://github.com/rerun-io/polars?branch=cmc/arc_datatype#4a3e5e6d61f42f8c9d33da88b950a40833835ae6" dependencies = [ "once_cell", "rayon", + "smartstring", ] [[package]] @@ -3500,12 +3370,6 @@ dependencies = [ "version_check", ] -[[package]] -name = "proc-macro-hack" -version = "0.5.20+deprecated" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" - [[package]] name = "proc-macro2" version = "1.0.47" @@ -3783,7 +3647,7 @@ dependencies = [ "serde_json", "sha2", "thiserror", - "time 0.3.20", + "time", "ureq", "uuid", "web-sys", @@ -3821,7 +3685,7 @@ name = "re_build_build_info" version = "0.4.0" dependencies = [ "anyhow", - "time 0.3.20", + "time", ] [[package]] @@ -3871,7 +3735,7 @@ version = "0.4.0" dependencies = [ "arrow2", "arrow2_convert", - "comfy-table 6.1.4", + "comfy-table", "re_tuid", ] @@ -3957,7 +3821,7 @@ dependencies = [ "serde_bytes", "smallvec", "thiserror", - "time 0.3.20", + "time", "typenum", "uuid", ] @@ -4145,8 +4009,8 @@ dependencies = [ "re_log", "serde", "serde_json", - "strum 0.24.1", - "strum_macros 0.24.3", + "strum", + "strum_macros", "sublime_fuzzy", ] @@ -4201,7 +4065,7 @@ dependencies = [ "slotmap", "smallvec", "thiserror", - "time 0.3.20", + "time", "uuid", "vec1", "wasm-bindgen-futures", @@ -4555,12 +4419,6 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" -[[package]] -name = "scratch" -version = "1.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c8132065adcfd6e02db789d9285a0deb2f3fcb04002865ab67d5fb103533898" - [[package]] name = "sct" version = "0.7.0" @@ -4856,32 +4714,13 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" -[[package]] -name = "strum" -version = "0.23.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cae14b91c7d11c9a851d3fbc80a963198998c2a64eec840477fa92d8ce9b70bb" - [[package]] name = "strum" version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "063e6045c0e62079840579a7e47a355ae92f60eb74daaf156fb1e84ba164e63f" dependencies = [ - "strum_macros 0.24.3", -] - -[[package]] -name = "strum_macros" -version = "0.23.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5bb0dc7ee9c15cea6199cde9a127fa16a4c5819af85395457ad72d68edc85a38" -dependencies = [ - "heck 0.3.3", - "proc-macro2", - "quote", - "rustversion", - "syn", + "strum_macros", ] [[package]] @@ -4940,6 +4779,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "target-features" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24840de800c1707d75c800893dbd727a5e1501ce921944e602f0698167491e36" + [[package]] name = "target-lexicon" version = "0.12.5" @@ -5035,17 +4880,6 @@ dependencies = [ "weezl", ] -[[package]] -name = "time" -version = "0.1.44" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255" -dependencies = [ - "libc", - "wasi 0.10.0+wasi-snapshot-preview1", - "winapi", -] - [[package]] name = "time" version = "0.3.20" @@ -5074,15 +4908,6 @@ dependencies = [ "time-core", ] -[[package]] -name = "tiny-keccak" -version = "2.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" -dependencies = [ - "crunchy", -] - [[package]] name = "tiny-skia" version = "0.8.2" @@ -5503,12 +5328,6 @@ dependencies = [ "try-lock", ] -[[package]] -name = "wasi" -version = "0.10.0+wasi-snapshot-preview1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" - [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" diff --git a/Cargo.toml b/Cargo.toml index 6a433c6954ee..a5bf5f0e9098 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,8 +52,8 @@ rerun = { path = "crates/rerun", version = "0.4.0" } ahash = "0.8" anyhow = "1.0" -arrow2 = "0.16" -arrow2_convert = "0.4.2" +arrow2 = "0.17" +arrow2_convert = "0.5.0" clap = "4.0" comfy-table = { version = "6.1", default-features = false } ctrlc = { version = "3.0", features = ["termination"] } @@ -76,9 +76,9 @@ macaw = "0.18" mimalloc = "0.1.29" ndarray = "0.15" parking_lot = "0.12" -polars-core = "0.27.1" -polars-lazy = "0.27.1" -polars-ops = "0.27.1" +polars-core = "0.28.0" +polars-lazy = "0.28.0" +polars-ops = "0.28.0" puffin = "0.14" smallvec = { version = "1.0", features = ["const_generics", "union"] } thiserror = "1.0" @@ -112,3 +112,7 @@ debug = true # If that is not possible, patch to a branch that has a PR open on the upstream repo. # As a last resport, patch with a commit to our own repository. # ALWAYS document what PR the commit hash is part of, or when it was merged into the upstream trunk. +arrow2 = { git = "https://github.com/rerun-io/arrow2", branch = "cmc/arc_datatype" } +arrow2_convert = { git = "https://github.com/rerun-io/arrow2-convert", branch = "cmc/arc_datatype" } +polars-core = { git = "https://github.com/rerun-io/polars", branch = "cmc/arc_datatype" } +polars-ops = { git = "https://github.com/rerun-io/polars", branch = "cmc/arc_datatype" } diff --git a/crates/re_arrow_store/benches/data_store.rs b/crates/re_arrow_store/benches/data_store.rs index 6e9aeda7a922..3f20e2b9a7de 100644 --- a/crates/re_arrow_store/benches/data_store.rs +++ b/crates/re_arrow_store/benches/data_store.rs @@ -322,7 +322,7 @@ fn build_table(n: usize, packed: bool) -> DataTable { // Do a serialization roundtrip to pack everything in contiguous memory. if packed { let (schema, columns) = table.serialize().unwrap(); - table = DataTable::deserialize(TableId::ZERO, &schema, &columns).unwrap(); + table = DataTable::deserialize(TableId::ZERO, &schema, columns, None).unwrap(); } table diff --git a/crates/re_arrow_store/src/arrow_util.rs b/crates/re_arrow_store/src/arrow_util.rs index ef119bd51b31..3555545089ab 100644 --- a/crates/re_arrow_store/src/arrow_util.rs +++ b/crates/re_arrow_store/src/arrow_util.rs @@ -1,3 +1,6 @@ +use std::sync::Arc; + +use ahash::HashSet; use arrow2::{ array::{ growable::make_growable, Array, FixedSizeListArray, ListArray, StructArray, UnionArray, @@ -63,7 +66,7 @@ impl ArrayExt for dyn Array { // Recursively clean the contents let typed_arr = self.as_any().downcast_ref::>().unwrap(); let clean_vals = typed_arr.values().as_ref().clean_for_polars(); - let clean_data = DataType::List(Box::new(Field::new( + let clean_data = DataType::List(Arc::new(Field::new( &field.name, clean_vals.data_type().clone(), field.is_nullable, @@ -81,7 +84,7 @@ impl ArrayExt for dyn Array { // Recursively clean the contents let typed_arr = self.as_any().downcast_ref::>().unwrap(); let clean_vals = typed_arr.values().as_ref().clean_for_polars(); - let clean_data = DataType::LargeList(Box::new(Field::new( + let clean_data = DataType::LargeList(Arc::new(Field::new( &field.name, clean_vals.data_type().clone(), field.is_nullable, @@ -99,7 +102,7 @@ impl ArrayExt for dyn Array { // Recursively clean the contents and convert `FixedSizeListArray` -> `ListArray` let typed_arr = self.as_any().downcast_ref::().unwrap(); let clean_vals = typed_arr.values().as_ref().clean_for_polars(); - let clean_data = DataType::List(Box::new(Field::new( + let clean_data = DataType::List(Arc::new(Field::new( &field.name, clean_vals.data_type().clone(), field.is_nullable, @@ -123,10 +126,10 @@ impl ArrayExt for dyn Array { .iter() .map(|v| v.as_ref().clean_for_polars()) .collect_vec(); - let clean_fields = itertools::izip!(fields, &clean_vals) + let clean_fields = itertools::izip!(fields.iter(), &clean_vals) .map(|(f, v)| Field::new(&f.name, v.data_type().clone(), f.is_nullable)) .collect_vec(); - let clean_data = DataType::Struct(clean_fields); + let clean_data = DataType::Struct(Arc::new(clean_fields)); StructArray::try_new(clean_data, clean_vals, typed_arr.validity().cloned()) .unwrap() .boxed() @@ -144,12 +147,12 @@ impl ArrayExt for dyn Array { let ids = ids .clone() - .unwrap_or_else(|| (0i32..(clean_vals.len() as i32)).collect_vec()); + .unwrap_or_else(|| Arc::new((0i32..(clean_vals.len() as i32)).collect_vec())); // For Dense Unions, the value-arrays need to be padded to the // correct length, which we do by growing using the existing type // table. - let padded_vals = itertools::izip!(&clean_vals, &ids) + let padded_vals = itertools::izip!(clean_vals.iter(), ids.iter()) .map(|(dense, id)| { let mut next = 0; let mut grow = make_growable(&[dense.as_ref()], true, self.len()); @@ -165,12 +168,12 @@ impl ArrayExt for dyn Array { }) .collect_vec(); - let clean_field_types = itertools::izip!(fields, &clean_vals) + let clean_field_types = itertools::izip!(fields.iter(), &clean_vals) .map(|(f, v)| Field::new(&f.name, v.data_type().clone(), f.is_nullable)) .collect_vec(); // The new type will be a struct - let clean_data = DataType::Struct(clean_field_types); + let clean_data = DataType::Struct(Arc::new(clean_field_types)); StructArray::try_new(clean_data, padded_vals, typed_arr.validity().cloned()) .unwrap() @@ -189,11 +192,11 @@ impl ArrayExt for dyn Array { let ids = ids .clone() - .unwrap_or_else(|| (0i32..(clean_vals.len() as i32)).collect_vec()); + .unwrap_or_else(|| Arc::new((0i32..(clean_vals.len() as i32)).collect_vec())); // For Sparse Unions, the value-arrays is already the right // correct length, but should have a validity derived from the types array. - let padded_vals = itertools::izip!(&clean_vals, &ids) + let padded_vals = itertools::izip!(&clean_vals, ids.iter()) .map(|(sparse, id)| { let validity = Bitmap::from( typed_arr @@ -206,12 +209,12 @@ impl ArrayExt for dyn Array { }) .collect_vec(); - let clean_field_types = itertools::izip!(fields, &clean_vals) + let clean_field_types = itertools::izip!(fields.iter(), &clean_vals) .map(|(f, v)| Field::new(&f.name, v.data_type().clone(), f.is_nullable)) .collect_vec(); // The new type will be a struct - let clean_data = DataType::Struct(clean_field_types); + let clean_data = DataType::Struct(Arc::new(clean_field_types)); StructArray::try_new(clean_data, padded_vals, typed_arr.validity().cloned()) .unwrap() @@ -246,15 +249,15 @@ fn test_clean_for_polars_modify() { assert_eq!( *cell.datatype(), DataType::Union( - vec![ + Arc::new(vec![ Field::new("Unknown", DataType::Boolean, false), Field::new( "Rigid3", - DataType::Struct(vec![ + DataType::Struct(Arc::new(vec![ Field::new( "rotation", DataType::FixedSizeList( - Box::new(Field::new("item", DataType::Float32, false)), + Arc::new(Field::new("item", DataType::Float32, false)), 4 ), false @@ -262,21 +265,21 @@ fn test_clean_for_polars_modify() { Field::new( "translation", DataType::FixedSizeList( - Box::new(Field::new("item", DataType::Float32, false)), + Arc::new(Field::new("item", DataType::Float32, false)), 3 ), false ) - ]), + ])), false ), Field::new( "Pinhole", - DataType::Struct(vec![ + DataType::Struct(Arc::new(vec![ Field::new( "image_from_cam", DataType::FixedSizeList( - Box::new(Field::new("item", DataType::Float32, false)), + Arc::new(Field::new("item", DataType::Float32, false)), 9 ), false, @@ -284,15 +287,15 @@ fn test_clean_for_polars_modify() { Field::new( "resolution", DataType::FixedSizeList( - Box::new(Field::new("item", DataType::Float32, false)), + Arc::new(Field::new("item", DataType::Float32, false)), 2 ), true, ), - ]), + ])), false ) - ], + ]), None, UnionMode::Dense ), @@ -302,40 +305,40 @@ fn test_clean_for_polars_modify() { assert_eq!( *cleaned.data_type(), - DataType::Struct(vec![ + DataType::Struct(Arc::new(vec![ Field::new("Unknown", DataType::Boolean, false), Field::new( "Rigid3", - DataType::Struct(vec![ + DataType::Struct(Arc::new(vec![ Field::new( "rotation", - DataType::List(Box::new(Field::new("item", DataType::Float32, false)),), + DataType::List(Arc::new(Field::new("item", DataType::Float32, false)),), false ), Field::new( "translation", - DataType::List(Box::new(Field::new("item", DataType::Float32, false)),), + DataType::List(Arc::new(Field::new("item", DataType::Float32, false)),), false ) - ]), + ])), false ), Field::new( "Pinhole", - DataType::Struct(vec![ + DataType::Struct(Arc::new(vec![ Field::new( "image_from_cam", - DataType::List(Box::new(Field::new("item", DataType::Float32, false))), + DataType::List(Arc::new(Field::new("item", DataType::Float32, false))), false, ), Field::new( "resolution", - DataType::List(Box::new(Field::new("item", DataType::Float32, false))), + DataType::List(Arc::new(Field::new("item", DataType::Float32, false))), true, ), - ]), + ])), false ) - ],), + ],)), ); } diff --git a/crates/re_data_store/examples/memory_usage.rs b/crates/re_data_store/examples/memory_usage.rs index 3e62a48367bb..d87337fc5cce 100644 --- a/crates/re_data_store/examples/memory_usage.rs +++ b/crates/re_data_store/examples/memory_usage.rs @@ -1,5 +1,11 @@ use std::sync::atomic::{AtomicUsize, Ordering::Relaxed}; +use re_data_store::log_db::collect_datatypes; +use re_log_types::{ + datagen::{build_frame_nr, build_some_point2d}, + DataCell, LogMsg, TimeInt, TimePoint, Timeline, +}; + thread_local! { static LIVE_BYTES_IN_THREAD: AtomicUsize = AtomicUsize::new(0); } @@ -48,18 +54,13 @@ fn live_bytes() -> usize { // ---------------------------------------------------------------------------- -use re_log_types::{entity_path, DataRow, RecordingId, RowId}; +use re_log_types::{entity_path, DataRow, DataTable, RecordingId, RowId, TableId}; fn main() { log_messages(); } fn log_messages() { - use re_log_types::{ - datagen::{build_frame_nr, build_some_point2d}, - LogMsg, TimeInt, TimePoint, Timeline, - }; - // Note: we use Box in this function so that we also count the "static" // part of all the data, i.e. its `std::mem::size_of`. @@ -89,6 +90,7 @@ fn log_messages() { bytes_used } + const NUM_ROWS: usize = 100_000; const NUM_POINTS: usize = 1_000; let recording_id = RecordingId::random(); @@ -104,55 +106,67 @@ fn log_messages() { drop(entity_path); } - { + fn arrow_payload(recording_id: RecordingId, num_rows: usize, num_points: usize, packed: bool) { + println!("--- {num_rows} rows each containing {num_points} points (packed={packed}) ---"); let used_bytes_start = live_bytes(); - let table = Box::new( - DataRow::from_cells1( - RowId::random(), - entity_path!("points"), - [build_frame_nr(0.into())], - 1, - build_some_point2d(1), - ) - .into_table(), - ); + let table = Box::new(create_table(num_rows, num_points, packed)); let table_bytes = live_bytes() - used_bytes_start; let log_msg = Box::new(LogMsg::ArrowMsg( recording_id, table.to_arrow_msg().unwrap(), )); let log_msg_bytes = live_bytes() - used_bytes_start; - println!("Arrow payload containing a Pos2 uses {table_bytes} bytes in RAM"); + println!( + "Arrow payload containing {num_points}x Pos2 uses {} bytes in RAM", + re_format::format_bytes(table_bytes as _) + ); let encoded = encode_log_msg(&log_msg); println!( - "Arrow LogMsg containing a Pos2 uses {}-{log_msg_bytes} bytes in RAM, and {} bytes encoded", - size_decoded(&encoded), encoded.len() + "Arrow LogMsg containing {num_points}x Pos2 uses {}-{} bytes in RAM, and {} bytes encoded", + re_format::format_bytes(size_decoded(&encoded) as _), + re_format::format_bytes(log_msg_bytes as _), + re_format::format_bytes(encoded.len() as _), ); + println!(); } + let num_rows = [1, NUM_ROWS]; + let num_points = [1, NUM_POINTS]; + let packed = [false, true]; + + for (num_rows, num_points, packed) in num_rows + .into_iter() + .flat_map(|num_row| std::iter::repeat(num_row).zip(num_points)) + .flat_map(|num_row| std::iter::repeat(num_row).zip(packed)) + .map(|((a, b), c)| (a, b, c)) { - let used_bytes_start = live_bytes(); - let table = Box::new( - DataRow::from_cells1( - RowId::random(), - entity_path!("points"), - [build_frame_nr(0.into())], - NUM_POINTS as _, - build_some_point2d(NUM_POINTS), - ) - .into_table(), - ); - let table_bytes = live_bytes() - used_bytes_start; - let log_msg = Box::new(LogMsg::ArrowMsg( - recording_id, - table.to_arrow_msg().unwrap(), - )); - let log_msg_bytes = live_bytes() - used_bytes_start; - println!("Arrow payload containing a Pos2 uses {table_bytes} bytes in RAM"); - let encoded = encode_log_msg(&log_msg); - println!( - "Arrow LogMsg containing {NUM_POINTS}x Pos2 uses {}-{log_msg_bytes} bytes in RAM, and {} bytes encoded", - size_decoded(&encoded), encoded.len() - ); + arrow_payload(recording_id, num_rows, num_points, packed); } } + +fn create_table(num_rows: usize, num_points: usize, packed: bool) -> DataTable { + let rows = (0..num_rows).map(|i| { + DataRow::from_cells1( + RowId::random(), + entity_path!("points"), + [build_frame_nr((i as i64).into())], + num_points as _, + build_some_point2d(num_points), + ) + }); + let mut table = DataTable::from_rows(TableId::random(), rows); + + // Do a serialization roundtrip to pack everything in contiguous memory. + if packed { + let (schema, columns) = table.serialize().unwrap(); + + let mut datatypes = Default::default(); + for column in columns.arrays() { + collect_datatypes(&mut datatypes, &**column); + } + + table = DataTable::deserialize(TableId::ZERO, &schema, columns, Some(&datatypes)).unwrap(); + } + + table +} diff --git a/crates/re_data_store/src/log_db.rs b/crates/re_data_store/src/log_db.rs index b520262aa4f9..a4d9e6de3c92 100644 --- a/crates/re_data_store/src/log_db.rs +++ b/crates/re_data_store/src/log_db.rs @@ -1,12 +1,15 @@ use std::collections::BTreeMap; +use ahash::HashSet; use nohash_hasher::IntMap; use re_arrow_store::{DataStoreConfig, TimeInt}; use re_log_types::{ - component_types::InstanceKey, ArrowMsg, BeginRecordingMsg, Component as _, ComponentPath, - DataCell, DataRow, DataTable, EntityPath, EntityPathHash, EntityPathOpMsg, LogMsg, PathOp, - RecordingId, RecordingInfo, RowId, TimePoint, Timeline, + component_types::InstanceKey, + external::arrow2::{chunk::Chunk, datatypes::DataType}, + ArrowMsg, BeginRecordingMsg, Component as _, ComponentPath, DataCell, DataRow, DataTable, + EntityPath, EntityPathHash, EntityPathOpMsg, LogMsg, PathOp, RecordingId, RecordingInfo, RowId, + TimePoint, Timeline, }; use crate::{Error, TimesPerTimeline}; @@ -26,6 +29,8 @@ pub struct EntityDb { /// Stores all components for all entities for all timelines. pub data_store: re_arrow_store::DataStore, + + pub datatypes: HashSet, } impl Default for EntityDb { @@ -38,6 +43,7 @@ impl Default for EntityDb { InstanceKey::name(), DataStoreConfig::default(), ), + datatypes: Default::default(), } } } @@ -54,11 +60,34 @@ impl EntityDb { .or_insert_with(|| entity_path.clone()); } - fn try_add_arrow_msg(&mut self, msg: &ArrowMsg) -> Result<(), Error> { + fn try_add_arrow_msg(&mut self, msg: ArrowMsg) -> Result<(), Error> { crate::profile_function!(); + let ArrowMsg { + table_id, + timepoint_max, + schema, + chunk, + } = msg; + + // TODO: move everything in datatable? + // TODO: this obviously cannot be doing all these crazy clones + { + crate::profile_scope!("collect_datatypes"); + for column in chunk.arrays() { + collect_datatypes(&mut self.datatypes, &**column); + } + } + + let msg = ArrowMsg { + table_id, + timepoint_max, + schema, + chunk, + }; + // TODO(#1760): Compute the size of the datacells in the batching threads on the clients. - let mut table = DataTable::from_arrow_msg(msg)?; + let mut table = DataTable::from_arrow_msg(msg, Some(&self.datatypes))?; table.compute_all_size_bytes(); // TODO(#1619): batch all of this @@ -142,6 +171,7 @@ impl EntityDb { times_per_timeline, tree, data_store: _, // purged before this function is called + datatypes: _, } = self; { @@ -212,17 +242,17 @@ impl LogDb { self.num_rows() == 0 } - pub fn add(&mut self, msg: &LogMsg) -> Result<(), Error> { + pub fn add(&mut self, msg: LogMsg) -> Result<(), Error> { crate::profile_function!(); - match &msg { + match msg { LogMsg::BeginRecordingMsg(msg) => self.add_begin_recording_msg(msg), LogMsg::EntityPathOpMsg(_, msg) => { let EntityPathOpMsg { row_id, time_point, path_op, - } = msg; + } = &msg; self.entity_op_msgs.insert(*row_id, msg.clone()); self.entity_db.add_path_op(*row_id, time_point, path_op); } @@ -233,8 +263,8 @@ impl LogDb { Ok(()) } - fn add_begin_recording_msg(&mut self, msg: &BeginRecordingMsg) { - self.recording_msg = Some(msg.clone()); + fn add_begin_recording_msg(&mut self, msg: BeginRecordingMsg) { + self.recording_msg = Some(msg); } /// Returns an iterator over all [`EntityPathOpMsg`]s that have been written to this `LogDb`. @@ -278,3 +308,140 @@ impl LogDb { entity_db.purge(&cutoff_times, &drop_row_ids); } } + +// --- + +use itertools::Itertools; +use re_log_types::external::arrow2::{ + array::{ + growable::make_growable, Array, FixedSizeListArray, ListArray, StructArray, UnionArray, + }, + bitmap::Bitmap, + datatypes::{Field, UnionMode}, + offset::Offsets, +}; +use std::sync::Arc; + +pub fn collect_datatypes(datatypes: &mut HashSet, array: &dyn Array) { + crate::profile_function!(); + + fn fill(datatypes: &mut HashSet, array: &dyn Array) { + let datatype = array.data_type().clone(); + + if !datatypes.insert(datatype.clone()) { + return; + } + + match &datatype { + DataType::List(_) => { + let array = array.as_any().downcast_ref::>().unwrap(); + fill(datatypes, array.values().as_ref()); + } + DataType::LargeList(_) => { + let array = array.as_any().downcast_ref::>().unwrap(); + fill(datatypes, array.values().as_ref()); + } + DataType::FixedSizeList(_, _) => { + let array = array.as_any().downcast_ref::().unwrap(); + fill(datatypes, array.values().as_ref()); + } + DataType::Struct(_) => { + let array = array.as_any().downcast_ref::().unwrap(); + array + .values() + .iter() + .map(|v| fill(datatypes, v.as_ref())) + .for_each(|_| {}); + } + DataType::Union(_, _, _) => { + let array = array.as_any().downcast_ref::().unwrap(); + array + .fields() + .iter() + .map(|v| fill(datatypes, v.as_ref())) + .for_each(|_| {}); + } + _ => {} + } + } + + fill(datatypes, array); +} + +// TODO +// - we shouldnt have to pay for crazy expensive virtual clones... datatype should be overridable +fn dedupe_datatypes(datatypes: &mut HashSet, array: &dyn Array) -> Box { + crate::profile_function!(); + + fn fill(datatypes: &mut HashSet, mut array: &dyn Array) -> Box { + let datatype = if let Some(datatype) = datatypes.get(array.data_type()) { + datatype.clone() + } else { + let datatype = array.data_type().clone(); + datatypes.insert(datatype.clone()); + datatype + }; + + match array.data_type() { + DataType::List(_) => { + let array = array.as_any().downcast_ref::>().unwrap(); + ListArray::::new( + datatype, + array.offsets().clone(), + fill(datatypes, array.values().as_ref()), + array.validity().cloned(), + ) + .boxed() + } + DataType::LargeList(_) => { + let array = array.as_any().downcast_ref::>().unwrap(); + ListArray::::new( + datatype, + array.offsets().clone(), + fill(datatypes, array.values().as_ref()), + array.validity().cloned(), + ) + .boxed() + } + DataType::FixedSizeList(_, _) => { + let array = array.as_any().downcast_ref::().unwrap(); + FixedSizeListArray::new( + datatype, + fill(datatypes, array.values().as_ref()), + array.validity().cloned(), + ) + .boxed() + } + DataType::Struct(_) => { + let array = array.as_any().downcast_ref::().unwrap(); + StructArray::new( + datatype, + array + .values() + .iter() + .map(|v| fill(datatypes, v.as_ref())) + .collect_vec(), + array.validity().cloned(), + ) + .boxed() + } + DataType::Union(_, _, _) => { + let array = array.as_any().downcast_ref::().unwrap(); + UnionArray::new( + datatype, + array.types().clone(), + array + .fields() + .iter() + .map(|v| fill(datatypes, v.as_ref())) + .collect_vec(), + array.offsets().cloned(), + ) + .boxed() + } + _ => array.to_boxed(), + } + } + + fill(datatypes, array) +} diff --git a/crates/re_format/src/arrow.rs b/crates/re_format/src/arrow.rs index fcc8a4133cee..434168ef8b63 100644 --- a/crates/re_format/src/arrow.rs +++ b/crates/re_format/src/arrow.rs @@ -183,7 +183,10 @@ impl std::fmt::Display for DisplayDataType { DataType::Decimal(_, _) => "decimal", DataType::Decimal256(_, _) => "decimal256", DataType::Extension(name, data_type, _) => { - let s = format!("extension<{name}>[{}]", DisplayDataType(*data_type.clone())); + let s = format!( + "extension<{name}>[{}]", + DisplayDataType((**data_type).clone()) + ); return f.write_str(&s); } }; diff --git a/crates/re_log_encoding/benches/msg_encode_benchmark.rs b/crates/re_log_encoding/benches/msg_encode_benchmark.rs index ddc7fc9740d9..abddad67b499 100644 --- a/crates/re_log_encoding/benches/msg_encode_benchmark.rs +++ b/crates/re_log_encoding/benches/msg_encode_benchmark.rs @@ -54,7 +54,7 @@ fn decode_tables(messages: &[LogMsg]) -> Vec { .iter() .map(|log_msg| { if let LogMsg::ArrowMsg(_, arrow_msg) = log_msg { - DataTable::from_arrow_msg(arrow_msg).unwrap() + DataTable::from_arrow_msg(arrow_msg.clone(), None).unwrap() // TODO } else { unreachable!() } diff --git a/crates/re_log_types/src/arrow_msg.rs b/crates/re_log_types/src/arrow_msg.rs index 54e5c01b68e5..8de641aa114f 100644 --- a/crates/re_log_types/src/arrow_msg.rs +++ b/crates/re_log_types/src/arrow_msg.rs @@ -155,7 +155,7 @@ mod tests { let buf = rmp_serde::to_vec(&msg_in).unwrap(); let msg_out: ArrowMsg = rmp_serde::from_slice(&buf).unwrap(); let table_out = { - let mut table = DataTable::from_arrow_msg(&msg_out).unwrap(); + let mut table = DataTable::from_arrow_msg(msg_out.clone(), None).unwrap(); table.compute_all_size_bytes(); table }; diff --git a/crates/re_log_types/src/component_types/arrow_convert_shims.rs b/crates/re_log_types/src/component_types/arrow_convert_shims.rs index 8d196842169d..7088b7f66109 100644 --- a/crates/re_log_types/src/component_types/arrow_convert_shims.rs +++ b/crates/re_log_types/src/component_types/arrow_convert_shims.rs @@ -11,6 +11,7 @@ use arrow2_convert::{ ArrowField, ArrowSerialize, }; +// TODO: ? /// Shim to enable zero-copy arrow deserialization for `Buffer` /// Can be removed when: [arrow2-convert#103](https://github.com/DataEngineeringLabs/arrow2-convert/pull/103) lands #[derive(Clone, Debug, PartialEq, ArrowField, ArrowSerialize)] @@ -71,7 +72,7 @@ impl<'a> Iterator for BufferBinaryArrayIter<'a> { } let (start, end) = self.array.offsets().start_end(self.index); self.index += 1; - Some(Some(self.array.values().clone().slice(start, end - start))) + Some(Some(self.array.values().clone().sliced(start, end - start))) } } } diff --git a/crates/re_log_types/src/component_types/mod.rs b/crates/re_log_types/src/component_types/mod.rs index 562357f64a1b..1498af735bbb 100644 --- a/crates/re_log_types/src/component_types/mod.rs +++ b/crates/re_log_types/src/component_types/mod.rs @@ -3,6 +3,8 @@ //! The SDK is responsible for submitting component columns that conforms to these schemas. The //! schemas are additionally documented in doctests. +use std::sync::Arc; + use arrow2::{ array::{FixedSizeListArray, MutableFixedSizeListArray, PrimitiveArray}, datatypes::{DataType, Field}, @@ -140,7 +142,7 @@ where #[inline] fn data_type() -> DataType { - arrow2::datatypes::DataType::FixedSizeList(Box::new(::field("item")), SIZE) + arrow2::datatypes::DataType::FixedSizeList(Arc::new(::field("item")), SIZE) } } diff --git a/crates/re_log_types/src/data_cell.rs b/crates/re_log_types/src/data_cell.rs index d8b19a9eabc4..ba4d4823d193 100644 --- a/crates/re_log_types/src/data_cell.rs +++ b/crates/re_log_types/src/data_cell.rs @@ -717,10 +717,10 @@ fn test_arrow_estimated_size_bytes() { let array = { let x = Float64Array::from_vec(data.iter().map(|p| p.x).collect()).boxed(); let y = Float64Array::from_vec(data.iter().map(|p| p.y).collect()).boxed(); - let fields = vec![ + let fields = Arc::new(vec![ Field::new("x", DataType::Float64, false), Field::new("y", DataType::Float64, false), - ]; + ]); StructArray::new(DataType::Struct(fields), vec![x, y], None).boxed() }; @@ -754,10 +754,10 @@ fn test_arrow_estimated_size_bytes() { Float64Array::from_vec(data.iter().flatten().map(|p| p.x).collect()).boxed(); let y = Float64Array::from_vec(data.iter().flatten().map(|p| p.y).collect()).boxed(); - let fields = vec![ + let fields = Arc::new(vec![ Field::new("x", DataType::Float64, false), Field::new("y", DataType::Float64, false), - ]; + ]); StructArray::new(DataType::Struct(fields), vec![x, y], None) }; diff --git a/crates/re_log_types/src/data_table.rs b/crates/re_log_types/src/data_table.rs index adc9df0e0344..3168a89591c0 100644 --- a/crates/re_log_types/src/data_table.rs +++ b/crates/re_log_types/src/data_table.rs @@ -1,6 +1,6 @@ -use std::collections::BTreeMap; +use std::{collections::BTreeMap, sync::Arc}; -use ahash::HashMap; +use ahash::{HashMap, HashSet}; use itertools::Itertools as _; use nohash_hasher::{IntMap, IntSet}; use smallvec::SmallVec; @@ -820,7 +820,7 @@ impl DataTable { field }; - let datatype = DataType::List(Box::new(field)); + let datatype = DataType::List(Arc::new(field)); let offsets = Offsets::try_from_lengths(column.iter().map(|cell| { cell.as_ref() .map_or(0, |cell| cell.num_instances() as usize) @@ -879,7 +879,8 @@ impl DataTable { pub fn deserialize( table_id: TableId, schema: &Schema, - chunk: &Chunk>, + chunk: Chunk>, + datatypes: Option<&HashSet>, ) -> DataTableResult { crate::profile_function!(); @@ -933,22 +934,25 @@ impl DataTable { // --- Components --- + let mut columns = chunk.into_arrays(); + let columns: DataTableResult<_> = schema .fields .iter() .enumerate() .filter_map(|(i, field)| { - field.metadata.get(METADATA_KIND).and_then(|kind| { - (kind == METADATA_KIND_DATA).then_some((field.name.as_str(), i)) - }) + field + .metadata + .get(METADATA_KIND) + .and_then(|kind| (kind == METADATA_KIND_DATA).then_some((field, i))) }) - .map(|(name, index)| { - let component: ComponentName = name.into(); - chunk - .get(index) - .ok_or(DataTableError::MissingColumn(name.to_owned())) + .map(|(field, index)| { + let component: ComponentName = field.name.as_str().into(); + columns + .get_mut(index) + .ok_or(DataTableError::MissingColumn(field.name.clone())) .and_then(|column| { - Self::deserialize_data_column(component, &**column) + Self::deserialize_data_column(component, &**column, datatypes) .map(|data| (component, data)) }) }) @@ -998,17 +1002,43 @@ impl DataTable { fn deserialize_data_column( component: ComponentName, column: &dyn Array, + datatypes: Option<&HashSet>, ) -> DataTableResult { crate::profile_function!(); + + let column = column + .as_any() + .downcast_ref::>() + .ok_or(DataTableError::NotAColumn(component.to_string()))?; + + // let datatype = datatypes.and_then(|datatypes| { + // datatypes.get(ListArray::::get_child_type(column.data_type())) + // }); + + // let chunk = { + // crate::profile_scope!("dedupe_datatypes"); + // let mut columns = chunk.into_arrays(); + // for column in &mut columns { + // *column = dedupe_datatypes(&mut self.datatypes, column.as_ref()); + // } + // Chunk::new(columns) + // }; + Ok(DataCellColumn( column - .as_any() - .downcast_ref::>() - .ok_or(DataTableError::NotAColumn(component.to_string()))? .iter() // TODO(#1805): Schema metadata gets cloned in every single array. // This'll become a problem as soon as we enable batching. - .map(|array| array.map(|values| DataCell::from_arrow(component, values))) + .map(|array| { + array.map(|values| { + if let Some(datatypes) = datatypes { + let values = swap_datatypes(datatypes, values); + DataCell::from_arrow(component, values) + } else { + DataCell::from_arrow(component, values) + } + }) + }) .collect(), )) } @@ -1019,7 +1049,10 @@ impl DataTable { impl DataTable { /// Deserializes the contents of an [`ArrowMsg`] into a `DataTable`. #[inline] - pub fn from_arrow_msg(msg: &ArrowMsg) -> DataTableResult { + pub fn from_arrow_msg( + msg: ArrowMsg, + datatypes: Option<&HashSet>, + ) -> DataTableResult { let ArrowMsg { table_id, timepoint_max: _, @@ -1027,7 +1060,7 @@ impl DataTable { chunk, } = msg; - Self::deserialize(*table_id, schema, chunk) + Self::deserialize(table_id, &schema, chunk, datatypes) } /// Serializes the contents of a `DataTable` into an [`ArrowMsg`]. @@ -1130,3 +1163,83 @@ impl DataTable { table } } + +// --- Deduplication --- + +use arrow2::array::{FixedSizeListArray, StructArray, UnionArray}; +use itertools::Itertools; + +// TODO +// - we shouldnt have to pay for crazy expensive virtual clones... datatype should be overridable +fn swap_datatypes(datatypes: &HashSet, array: Box) -> Box { + crate::profile_function!(); + + fn fill(datatypes: &HashSet, array: &Box) -> Box { + let datatype = datatypes + .get(array.data_type()) + .cloned() + .unwrap_or(array.data_type().clone()); + + match array.data_type() { + DataType::List(_) => { + let array = array.as_any().downcast_ref::>().unwrap(); + ListArray::::new( + datatype, + array.offsets().clone(), + fill(datatypes, array.values()), + array.validity().cloned(), + ) + .boxed() + } + DataType::LargeList(_) => { + let array = array.as_any().downcast_ref::>().unwrap(); + ListArray::::new( + datatype, + array.offsets().clone(), + fill(datatypes, array.values()), + array.validity().cloned(), + ) + .boxed() + } + DataType::FixedSizeList(_, _) => { + let array = array.as_any().downcast_ref::().unwrap(); + FixedSizeListArray::new( + datatype, + fill(datatypes, array.values()), + array.validity().cloned(), + ) + .boxed() + } + DataType::Struct(_) => { + let array = array.as_any().downcast_ref::().unwrap(); + StructArray::new( + datatype, + array + .values() + .iter() + .map(|v| fill(datatypes, v)) + .collect_vec(), + array.validity().cloned(), + ) + .boxed() + } + DataType::Union(_, _, _) => { + let array = array.as_any().downcast_ref::().unwrap(); + UnionArray::new( + datatype, + array.types().clone(), + array + .fields() + .iter() + .map(|v| fill(datatypes, v)) + .collect_vec(), + array.offsets().cloned(), + ) + .boxed() + } + _ => array.to_boxed(), + } + } + + fill(datatypes, &array) +} diff --git a/crates/re_log_types/src/path/entity_path.rs b/crates/re_log_types/src/path/entity_path.rs index d23b257aa64b..3d2a0b46a543 100644 --- a/crates/re_log_types/src/path/entity_path.rs +++ b/crates/re_log_types/src/path/entity_path.rs @@ -234,7 +234,7 @@ impl ArrowField for EntityPath { fn data_type() -> DataType { DataType::Extension( "rerun.entity_path".to_owned(), - Box::new(DataType::Utf8), + Arc::new(DataType::Utf8), None, ) } diff --git a/crates/re_log_types/src/size_bytes.rs b/crates/re_log_types/src/size_bytes.rs index a670eee44d74..8521f93b06ac 100644 --- a/crates/re_log_types/src/size_bytes.rs +++ b/crates/re_log_types/src/size_bytes.rs @@ -1,4 +1,7 @@ -use std::collections::{BTreeMap, HashMap}; +use std::{ + collections::{BTreeMap, HashMap}, + sync::Arc, +}; use arrow2::datatypes::{DataType, Field}; use smallvec::SmallVec; @@ -87,6 +90,13 @@ impl SizeBytes for Option { } } +impl SizeBytes for Arc { + #[inline] + fn heap_size_bytes(&self) -> u64 { + (**self).heap_size_bytes() + } +} + // NOTE: `impl SizeBytesExt for T {}` would be nice but violates orphan rules. macro_rules! impl_size_bytes_pod { ($ty:ty) => { diff --git a/crates/re_query/src/entity_view.rs b/crates/re_query/src/entity_view.rs index a9bf71e39f1a..9b2335435afc 100644 --- a/crates/re_query/src/entity_view.rs +++ b/crates/re_query/src/entity_view.rs @@ -110,7 +110,7 @@ impl ComponentWithInstances { keys.binary_search(&instance_key.0).ok()? as u32 }; - Some(self.values.as_arrow_ref().slice(offset as _, 1)) + Some(self.values.as_arrow_ref().sliced(offset as _, 1)) } /// Produce a `ComponentWithInstances` from native component types diff --git a/crates/re_tuid/src/lib.rs b/crates/re_tuid/src/lib.rs index a23a971fbc32..856d6c7d34db 100644 --- a/crates/re_tuid/src/lib.rs +++ b/crates/re_tuid/src/lib.rs @@ -6,6 +6,8 @@ #![doc = document_features::document_features!()] //! +use std::sync::Arc; + #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] #[cfg_attr( feature = "arrow2_convert", @@ -30,11 +32,11 @@ impl arrow2_convert::field::ArrowField for Tuid { type Type = Self; fn data_type() -> arrow2::datatypes::DataType { - let datatype = arrow2::datatypes::DataType::Struct(<[_]>::into_vec(Box::new([ + let datatype = arrow2::datatypes::DataType::Struct(Arc::new(<[_]>::into_vec(Box::new([ ::field("time_ns"), ::field("inc"), - ]))); - arrow2::datatypes::DataType::Extension("rerun.tuid".into(), Box::new(datatype), None) + ])))); + arrow2::datatypes::DataType::Extension("rerun.tuid".into(), Arc::new(datatype), None) } } diff --git a/crates/re_viewer/src/app.rs b/crates/re_viewer/src/app.rs index 092c612305bd..06b454136ed2 100644 --- a/crates/re_viewer/src/app.rs +++ b/crates/re_viewer/src/app.rs @@ -714,7 +714,7 @@ impl App { log_db.data_source = Some(self.rx.source().clone()); } - if let Err(err) = log_db.add(&msg) { + if let Err(err) = log_db.add(msg) { re_log::error!("Failed to add incoming msg: {err}"); }; @@ -1820,7 +1820,7 @@ fn load_rrd_to_log_db(mut read: impl std::io::Read) -> anyhow::Result { let mut log_db = LogDb::default(); for msg in decoder { - log_db.add(&msg?)?; + log_db.add(msg?)?; } Ok(log_db) } diff --git a/crates/re_viewer/src/ui/data_ui/log_msg.rs b/crates/re_viewer/src/ui/data_ui/log_msg.rs index c4c982c2d066..4c886d5a779c 100644 --- a/crates/re_viewer/src/ui/data_ui/log_msg.rs +++ b/crates/re_viewer/src/ui/data_ui/log_msg.rs @@ -101,7 +101,7 @@ impl DataUi for ArrowMsg { verbosity: UiVerbosity, query: &re_arrow_store::LatestAtQuery, ) { - let table = match DataTable::from_arrow_msg(self) { + let table = match DataTable::from_arrow_msg(self.clone() /* TODO */, None) { Ok(table) => table, Err(err) => { ui.label( diff --git a/crates/rerun/src/run.rs b/crates/rerun/src/run.rs index 979c05705ff7..85c3aa6fa6ee 100644 --- a/crates/rerun/src/run.rs +++ b/crates/rerun/src/run.rs @@ -469,7 +469,7 @@ fn receive_into_log_db(rx: &Receiver) -> anyhow::Result { re_log::info_once!("Received first message."); let is_goodbye = matches!(msg, re_log_types::LogMsg::Goodbye(_)); - db.add(&msg)?; + db.add(msg)?; num_messages += 1; if is_goodbye { db.entity_db.data_store.sanity_check()?;