From bfab1da2059edb14574ed82f6ff09a9a7f3914cc Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Sun, 19 Nov 2023 22:36:29 -0500 Subject: [PATCH 1/2] wip improved arrow1 async api --- Cargo.lock | 24 +-- Cargo.toml | 2 +- src/arrow1/metadata.rs | 230 +++++++++++++++++++++++++++ src/arrow1/mod.rs | 12 +- src/arrow1/reader_async.rs | 311 ++++++++++++++++++++++++++++++++++++- 5 files changed, 553 insertions(+), 26 deletions(-) create mode 100644 src/arrow1/metadata.rs diff --git a/Cargo.lock b/Cargo.lock index 5ed649ff..748e6976 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -339,9 +339,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.73" +version = "0.1.74" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" +checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9" dependencies = [ "proc-macro2", "quote", @@ -1741,9 +1741,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.87" +version = "0.2.88" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7706a72ab36d8cb1f80ffbf0e071533974a60d0a308d01a5d0375bf60499a342" +checksum = "7daec296f25a1bae309c0cd5c29c4b260e510e6d813c286b19eaadf409d40fce" dependencies = [ "cfg-if", "serde", @@ -1753,9 +1753,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.87" +version = "0.2.88" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ef2b6d3c510e9625e5fe6f509ab07d66a760f0885d858736483c32ed7809abd" +checksum = "e397f4664c0e4e428e8313a469aaa58310d302159845980fd23b0f22a847f217" dependencies = [ "bumpalo", "log", @@ -1780,9 +1780,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.87" +version = "0.2.88" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dee495e55982a3bd48105a7b947fd2a9b4a8ae3010041b9e0faab3f9cd028f1d" +checksum = "5961017b3b08ad5f3fe39f1e79877f8ee7c23c5e5fd5eb80de95abc41f1f16b2" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -1790,9 +1790,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.87" +version = "0.2.88" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" +checksum = "c5353b8dab669f5e10f5bd76df26a9360c748f054f862ff5f3f8aae0c7fb3907" dependencies = [ "proc-macro2", "quote", @@ -1803,9 +1803,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.87" +version = "0.2.88" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1" +checksum = "0d046c5d029ba91a1ed14da14dca44b68bf2f124cfbaf741c54151fdb3e0750b" [[package]] name = "wasm-bindgen-test" diff --git a/Cargo.toml b/Cargo.toml index 9e9fd7d8..bb915afe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,7 +58,7 @@ full = [ ] [dependencies] -wasm-bindgen = { version = "0.2.83", features = ["serde-serialize"] } +wasm-bindgen = { version = "0.2.88", features = ["serde-serialize"] } # The `console_error_panic_hook` crate provides better debugging of panics by # logging them with `console.error`. This is great for development, but requires diff --git a/src/arrow1/metadata.rs b/src/arrow1/metadata.rs new file mode 100644 index 00000000..6db18781 --- /dev/null +++ b/src/arrow1/metadata.rs @@ -0,0 +1,230 @@ +use crate::arrow1::error::WasmResult; +use wasm_bindgen::prelude::*; + +/// Global Parquet metadata. +#[derive(Debug, Clone)] +#[wasm_bindgen] +pub struct ParquetMetaData(parquet::file::metadata::ParquetMetaData); + +#[wasm_bindgen] +impl ParquetMetaData { + /// Returns file metadata as reference. + #[wasm_bindgen] + pub fn file_metadata(&self) -> FileMetaData { + self.0.file_metadata().clone().into() + } + + /// Returns number of row groups in this file. + #[wasm_bindgen] + pub fn num_row_groups(&self) -> usize { + self.0.num_row_groups() + } + + /// Returns row group metadata for `i`th position. + /// Position should be less than number of row groups `num_row_groups`. + #[wasm_bindgen] + pub fn row_group(&self, i: usize) -> RowGroupMetaData { + self.0.row_group(i).clone().into() + } + + // /// Returns the column index for this file if loaded + // pub fn column_index(&self) -> Option { + // self.0.column_index() + // } +} + +impl From for ParquetMetaData { + fn from(value: parquet::file::metadata::ParquetMetaData) -> Self { + Self(value) + } +} + +impl From for parquet::file::metadata::ParquetMetaData { + fn from(value: ParquetMetaData) -> Self { + value.0 + } +} + +/// Metadata for a Parquet file. +#[derive(Debug, Clone)] +#[wasm_bindgen] +pub struct FileMetaData(parquet::file::metadata::FileMetaData); + +#[wasm_bindgen] +impl FileMetaData { + /// Returns version of this file. + #[wasm_bindgen] + pub fn version(&self) -> i32 { + self.0.version() + } + + /// Returns number of rows in the file. + #[wasm_bindgen] + pub fn num_rows(&self) -> i64 { + self.0.num_rows() + } + + /// String message for application that wrote this file. + /// + /// This should have the following format: + /// ` version (build )`. + /// + /// ```shell + /// parquet-mr version 1.8.0 (build 0fda28af84b9746396014ad6a415b90592a98b3b) + /// ``` + #[wasm_bindgen] + pub fn created_by(&self) -> Option { + self.0.created_by().map(|s| s.to_string()) + } + + /// Returns key_value_metadata of this file. + #[wasm_bindgen] + pub fn key_value_metadata(&self) -> Result { + let map = js_sys::Map::new(); + if let Some(metadata) = self.0.key_value_metadata() { + for meta in metadata { + if let Some(value) = &meta.value { + map.set(&JsValue::from_str(&meta.key), &JsValue::from_str(value)); + } + } + } + Ok(map) + } +} + +impl From for FileMetaData { + fn from(value: parquet::file::metadata::FileMetaData) -> Self { + Self(value) + } +} + +impl From for parquet::file::metadata::FileMetaData { + fn from(value: FileMetaData) -> Self { + value.0 + } +} + +/// Metadata for a Parquet row group. +#[derive(Debug, Clone)] +#[wasm_bindgen] +pub struct RowGroupMetaData(parquet::file::metadata::RowGroupMetaData); + +#[wasm_bindgen] +impl RowGroupMetaData { + /// Number of columns in this row group. + #[wasm_bindgen] + pub fn num_columns(&self) -> usize { + self.0.num_columns() + } + + /// Returns column chunk metadata for `i`th column. + #[wasm_bindgen] + pub fn column(&self, i: usize) -> ColumnChunkMetaData { + self.0.column(i).clone().into() + } + + /// Number of rows in this row group. + #[wasm_bindgen] + pub fn num_rows(&self) -> i64 { + self.0.num_rows() + } + + /// Total byte size of all uncompressed column data in this row group. + #[wasm_bindgen] + pub fn total_byte_size(&self) -> i64 { + self.0.total_byte_size() + } + + /// Total size of all compressed column data in this row group. + #[wasm_bindgen] + pub fn compressed_size(&self) -> i64 { + self.0.compressed_size() + } +} + +impl From for RowGroupMetaData { + fn from(value: parquet::file::metadata::RowGroupMetaData) -> Self { + Self(value) + } +} + +impl From for parquet::file::metadata::RowGroupMetaData { + fn from(value: RowGroupMetaData) -> Self { + value.0 + } +} + +/// Metadata for a Parquet column chunk. +#[derive(Debug, Clone)] +#[wasm_bindgen] +pub struct ColumnChunkMetaData(parquet::file::metadata::ColumnChunkMetaData); + +#[wasm_bindgen] +impl ColumnChunkMetaData { + /// File where the column chunk is stored. + /// + /// If not set, assumed to belong to the same file as the metadata. + /// This path is relative to the current file. + #[wasm_bindgen] + pub fn file_path(&self) -> Option { + self.0.file_path().map(|s| s.to_string()) + } + + /// Byte offset in `file_path()`. + #[wasm_bindgen] + pub fn file_offset(&self) -> i64 { + self.0.file_offset() + } + + // /// Type of this column. Must be primitive. + // pub fn column_type(&self) -> Type { + // self.column_descr.physical_type() + // } + + /// Path (or identifier) of this column. + #[wasm_bindgen] + pub fn column_path(&self) -> Vec { + let path = self.0.column_path(); + path.parts().to_vec() + } + + // /// All encodings used for this column. + // pub fn encodings(&self) -> &Vec { + // &self.encodings + // } + + /// Total number of values in this column chunk. + #[wasm_bindgen] + pub fn num_values(&self) -> i64 { + self.0.num_values() + } + + // /// Compression for this column. + // pub fn compression(&self) -> Compression { + // self.compression + // } + + /// Returns the total compressed data size of this column chunk. + #[wasm_bindgen] + pub fn compressed_size(&self) -> i64 { + self.0.compressed_size() + } + + /// Returns the total uncompressed data size of this column chunk. + #[wasm_bindgen] + pub fn uncompressed_size(&self) -> i64 { + self.0.uncompressed_size() + } +} + +impl From for ColumnChunkMetaData { + fn from(value: parquet::file::metadata::ColumnChunkMetaData) -> Self { + Self(value) + } +} + +impl From for parquet::file::metadata::ColumnChunkMetaData { + fn from(value: ColumnChunkMetaData) -> Self { + value.0 + } +} diff --git a/src/arrow1/mod.rs b/src/arrow1/mod.rs index e9edc85c..34682629 100644 --- a/src/arrow1/mod.rs +++ b/src/arrow1/mod.rs @@ -1,15 +1,11 @@ +pub mod error; +pub mod metadata; #[cfg(feature = "reader")] pub mod reader; - +#[cfg(all(feature = "reader", feature = "async"))] +pub mod reader_async; pub mod wasm; - #[cfg(feature = "writer")] pub mod writer; - #[cfg(feature = "writer")] pub mod writer_properties; - -pub mod error; - -#[cfg(all(feature = "reader", feature = "async"))] -pub mod reader_async; diff --git a/src/arrow1/reader_async.rs b/src/arrow1/reader_async.rs index bef91365..1b70e60f 100644 --- a/src/arrow1/reader_async.rs +++ b/src/arrow1/reader_async.rs @@ -1,15 +1,316 @@ +use futures::channel::oneshot; +use futures::future::BoxFuture; +use std::ops::Range; use std::sync::Arc; +use wasm_bindgen::prelude::*; +use wasm_bindgen_futures::spawn_local; -use crate::arrow1::error::Result; -use crate::common::fetch::{create_reader, get_content_length}; +use crate::arrow1::error::{Result, WasmResult}; +use crate::common::fetch::{ + create_reader, get_content_length, range_from_end, range_from_start_and_length, +}; use arrow::ipc::writer::StreamWriter; -use futures::StreamExt; -use parquet::arrow::async_reader::{ParquetRecordBatchStream, ParquetRecordBatchStreamBuilder}; +use arrow_wasm::arrow1::{RecordBatch, Table}; +use bytes::Bytes; +use futures::TryStreamExt; +use futures::{stream, FutureExt, StreamExt}; +use parquet::arrow::arrow_reader::ArrowReaderMetadata; +use parquet::arrow::async_reader::{ + AsyncFileReader, ParquetRecordBatchStream, ParquetRecordBatchStreamBuilder, +}; use async_compat::{Compat, CompatExt}; -use parquet::file::metadata::FileMetaData; +use async_trait::async_trait; +use parquet::file::footer::{decode_footer, decode_metadata}; +use parquet::file::metadata::{FileMetaData, ParquetMetaData}; use range_reader::RangedAsyncReader; +use reqwest::Client; +use wasm_bindgen::prelude::*; + +#[wasm_bindgen] +pub struct AsyncParquetFile { + reader: HTTPFileReader, + meta: ArrowReaderMetadata, +} + +#[wasm_bindgen] +impl AsyncParquetFile { + #[wasm_bindgen(constructor)] + pub async fn new(url: String) -> WasmResult { + let client = Client::new(); + let mut reader = HTTPFileReader::new(url.clone(), client.clone(), 1024); + let meta = ArrowReaderMetadata::load_async(&mut reader, Default::default()).await?; + Ok(Self { reader, meta }) + } + + #[wasm_bindgen] + pub fn metadata(&self) -> WasmResult { + Ok(self.meta.metadata().as_ref().to_owned().into()) + } + + #[wasm_bindgen] + pub async fn read_row_group(&self, i: usize) -> WasmResult { + let builder = ParquetRecordBatchStreamBuilder::new_with_metadata( + self.reader.clone(), + self.meta.clone(), + ); + let stream = builder.with_row_groups(vec![i]).build()?; + let mut results = stream.try_collect::>().await.unwrap(); + + // NOTE: This is not only one batch by default due to arrow-rs's default rechunking. + // assert_eq!(results.len(), 1, "Expected one record batch"); + // Ok(RecordBatch::new(results.pop().unwrap())) + Ok(Table::new(results)) + } +} + +#[derive(Debug, Clone)] +pub struct HTTPFileReader { + url: String, + client: Client, + coalesce_byte_size: usize, +} + +impl HTTPFileReader { + pub fn new(url: String, client: Client, coalesce_byte_size: usize) -> Self { + Self { + url, + client, + coalesce_byte_size, + } + } +} + +#[async_trait] +impl AsyncFileReader for HTTPFileReader { + fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, parquet::errors::Result> { + async move { + let range_str = + range_from_start_and_length(range.start as u64, (range.end - range.start) as u64); + + // Map reqwest error to parquet error + // let map_err = |err| parquet::errors::ParquetError::External(Box::new(err)); + + let bytes = make_range_request_with_client( + self.url.to_string(), + self.client.clone(), + range_str, + ) + .await + .unwrap(); + + Ok(bytes) + } + .boxed() + } + + fn get_byte_ranges( + &mut self, + ranges: Vec>, + ) -> BoxFuture<'_, parquet::errors::Result>> { + let fetch_ranges = merge_ranges(&ranges, self.coalesce_byte_size); + + // NOTE: This still does _sequential_ requests, but it should be _fewer_ requests if they + // can be merged. + async move { + let mut fetched = Vec::with_capacity(ranges.len()); + + for range in fetch_ranges.iter() { + let data = self.get_bytes(range.clone()).await?; + fetched.push(data); + } + + Ok(ranges + .iter() + .map(|range| { + let idx = fetch_ranges.partition_point(|v| v.start <= range.start) - 1; + let fetch_range = &fetch_ranges[idx]; + let fetch_bytes = &fetched[idx]; + + let start = range.start - fetch_range.start; + let end = range.end - fetch_range.start; + fetch_bytes.slice(start..end) + }) + .collect()) + } + .boxed() + } + + fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result>> { + async move { + let meta = fetch_parquet_metadata(self.url.as_str(), &self.client, None).await?; + Ok(Arc::new(meta)) + } + .boxed() + } +} + +pub async fn make_range_request_with_client( + url: String, + client: Client, + range_str: String, +) -> std::result::Result { + let (sender, receiver) = oneshot::channel(); + spawn_local(async move { + let resp = client + .get(url) + .header("Range", range_str) + .send() + .await + .unwrap() + .error_for_status() + .unwrap(); + let bytes = resp.bytes().await.unwrap(); + sender.send(bytes).unwrap(); + }); + let data = receiver.await.unwrap(); + Ok(data) +} + +// async fn make_request( +// url: &str, +// client: &Client, +// range: Range, +// ) -> parquet::errors::Result { +// todo!() +// } + +// async fn get_bytes<'a>( +// url: &'a str, +// client: &'a Client, +// range: Range, +// ) -> BoxFuture<'a, parquet::errors::Result> { +// async move { +// let range_str = +// range_from_start_and_length(range.start as u64, (range.end - range.start) as u64); + +// // Map reqwest error to parquet error +// let map_err = |err| parquet::errors::ParquetError::External(Box::new(err)); + +// let resp = client +// .get(url) +// .header("Range", range_str) +// .send() +// .await +// .map_err(map_err)? +// .error_for_status() +// .map_err(map_err)?; +// let bytes = resp.bytes().await.map_err(map_err)?; +// Ok(bytes) +// } +// .boxed() +// } + +// async fn get_byte_ranges<'a>( +// url: &'a str, +// client: &'a Client, +// ranges: Vec>, +// coalesce_byte_size: usize, +// ) -> BoxFuture<'a, parquet::errors::Result>> { +// let fetch_ranges = merge_ranges(&ranges, coalesce_byte_size); + +// let fetched: Vec<_> = futures::stream::iter(fetch_ranges.iter().cloned()) +// .map(move |range| get_bytes(url, client, range)) +// .buffered(10) +// .try_collect() +// .await?; + +// todo!() +// // let bodies = stream::iter(fetch_ranges) +// // .map(|range| { +// // let client = &client; +// // async move { +// // let resp = client.get(url).send().await?; +// // resp.bytes().await +// // } +// // }) +// // .buffer_unordered(10); +// } + +/// Returns a sorted list of ranges that cover `ranges` +/// +/// Copied from object-store +/// https://github.com/apache/arrow-rs/blob/61da64a0557c80af5bb43b5f15c6d8bb6a314cb2/object_store/src/util.rs#L132C1-L169C1 +fn merge_ranges(ranges: &[Range], coalesce: usize) -> Vec> { + if ranges.is_empty() { + return vec![]; + } + + let mut ranges = ranges.to_vec(); + ranges.sort_unstable_by_key(|range| range.start); + + let mut ret = Vec::with_capacity(ranges.len()); + let mut start_idx = 0; + let mut end_idx = 1; + + while start_idx != ranges.len() { + let mut range_end = ranges[start_idx].end; + + while end_idx != ranges.len() + && ranges[end_idx] + .start + .checked_sub(range_end) + .map(|delta| delta <= coalesce) + .unwrap_or(true) + { + range_end = range_end.max(ranges[end_idx].end); + end_idx += 1; + } + + let start = ranges[start_idx].start; + let end = range_end; + ret.push(start..end); + + start_idx = end_idx; + end_idx += 1; + } + + ret +} + +// Derived from: +// https://github.com/apache/arrow-rs/blob/61da64a0557c80af5bb43b5f15c6d8bb6a314cb2/parquet/src/arrow/async_reader/metadata.rs#L54-L57 +pub async fn fetch_parquet_metadata( + url: &str, + client: &Client, + prefetch: Option, +) -> parquet::errors::Result { + let suffix_length = prefetch.unwrap_or(8); + let range_str = range_from_end(suffix_length as u64); + + // Map reqwest error to parquet error + // let map_err = |err| parquet::errors::ParquetError::External(Box::new(err)); + + let suffix = make_range_request_with_client(url.to_string(), client.clone(), range_str) + .await + .unwrap(); + let suffix_len = suffix.len(); + + let mut footer = [0; 8]; + footer.copy_from_slice(&suffix[suffix_len - 8..suffix_len]); + + let metadata_byte_length = decode_footer(&footer)?; + + // Did not fetch the entire file metadata in the initial read, need to make a second request + let metadata = if metadata_byte_length > suffix_len - 8 { + let metadata_range_str = range_from_end((metadata_byte_length + 8) as u64); + + let meta_bytes = + make_range_request_with_client(url.to_string(), client.clone(), metadata_range_str) + .await + .unwrap(); + + decode_metadata(&meta_bytes[0..meta_bytes.len() - 8])? + } else { + let metadata_start = suffix_len - metadata_byte_length - 8; + + let slice = &suffix[metadata_start..suffix_len - 8]; + decode_metadata(slice)? + }; + + Ok(metadata) +} pub async fn read_metadata_async( url: String, From 118f48139c17050d819e13284ccec17712566602 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Sun, 19 Nov 2023 22:49:25 -0500 Subject: [PATCH 2/2] remove async_trait --- src/arrow1/reader_async.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/arrow1/reader_async.rs b/src/arrow1/reader_async.rs index 1b70e60f..4f44b055 100644 --- a/src/arrow1/reader_async.rs +++ b/src/arrow1/reader_async.rs @@ -21,7 +21,6 @@ use parquet::arrow::async_reader::{ }; use async_compat::{Compat, CompatExt}; -use async_trait::async_trait; use parquet::file::footer::{decode_footer, decode_metadata}; use parquet::file::metadata::{FileMetaData, ParquetMetaData}; use range_reader::RangedAsyncReader; @@ -82,7 +81,6 @@ impl HTTPFileReader { } } -#[async_trait] impl AsyncFileReader for HTTPFileReader { fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, parquet::errors::Result> { async move {