From 351b37c83abf0e78f0a413e6f89608ee9a5ddfe4 Mon Sep 17 00:00:00 2001 From: liupeiyue Date: Mon, 23 Dec 2024 15:32:11 +0800 Subject: [PATCH 1/6] fix OrcScan reads missing data columns --- .../datafusion-ext-plans/src/orc_exec.rs | 45 ++++++++++++++----- .../datafusion-ext-plans/src/scan/mod.rs | 6 +++ 2 files changed, 39 insertions(+), 12 deletions(-) diff --git a/native-engine/datafusion-ext-plans/src/orc_exec.rs b/native-engine/datafusion-ext-plans/src/orc_exec.rs index 3acc0bc0..a2d1ca68 100644 --- a/native-engine/datafusion-ext-plans/src/orc_exec.rs +++ b/native-engine/datafusion-ext-plans/src/orc_exec.rs @@ -34,18 +34,21 @@ use datafusion::{ PlanProperties, SendableRecordBatchStream, Statistics, }, }; +use datafusion::datasource::schema_adapter::SchemaMapper; use datafusion_ext_commons::{batch_size, df_execution_err, hadoop_fs::FsProvider}; use futures::{future::BoxFuture, FutureExt, StreamExt}; use futures_util::TryStreamExt; use once_cell::sync::OnceCell; use orc_rust::{ arrow_reader::ArrowReaderBuilder, projection::ProjectionMask, reader::AsyncChunkReader, + reader::metadata::FileMetadata, }; use crate::{ common::execution_context::ExecutionContext, scan::{internal_file_reader::InternalFileReader, BlazeSchemaAdapter}, }; +use crate::scan::BlazeSchemaMapping; /// Execution plan for scanning one or more Orc partitions #[derive(Debug, Clone)] @@ -199,6 +202,31 @@ struct OrcOpener { fs_provider: Arc, } +impl OrcOpener { + fn map_schema(&self, orc_file_meta: &FileMetadata) -> Result<(Arc, Vec)> { + let projection = self.projection.clone(); + let projected_schema = SchemaRef::from(self.table_schema.project(&projection)?); + + let mut projection = Vec::with_capacity(projected_schema.fields().len()); + let mut field_mappings = vec![None; self.table_schema.fields().len()]; + + for nameColumn in orc_file_meta.root_data_type().children() { + if let Some((table_idx, _table_field)) = + projected_schema.fields().find(nameColumn.name()) { + field_mappings[table_idx] = Some(projection.len()); + projection.push(nameColumn.data_type().column_index()); + } + } + + Ok(( + Arc::new(BlazeSchemaMapping::new(self.table_schema.clone(), + field_mappings, + )), + projection, + )) + } +} + impl FileOpener for OrcOpener { fn open(&self, file_meta: FileMeta) -> Result { let reader = OrcFileReaderRef(Arc::new(InternalFileReader::try_new( @@ -206,9 +234,7 @@ impl FileOpener for OrcOpener { file_meta.object_meta.clone(), )?)); let batch_size = self.batch_size; - let projection = self.projection.clone(); - let projected_schema = SchemaRef::from(self.table_schema.project(&projection)?); - let schema_adapter = BlazeSchemaAdapter::new(projected_schema); + Ok(Box::pin(async move { let mut builder = ArrowReaderBuilder::try_new_async(reader) @@ -218,15 +244,10 @@ impl FileOpener for OrcOpener { let range = range.start as usize..range.end as usize; builder = builder.with_file_byte_range(range); } - let file_schema = builder.schema(); - let (schema_mapping, adapted_projections) = - schema_adapter.map_schema(file_schema.as_ref())?; - - // Offset by 1 since index 0 is the root - let projection = adapted_projections - .iter() - .map(|i| i + 1) - .collect::>(); + + let (schema_mapping, projection) = + self.map_schema(builder.file_metadata())?; + let projection_mask = ProjectionMask::roots(builder.file_metadata().root_data_type(), projection); let stream = builder diff --git a/native-engine/datafusion-ext-plans/src/scan/mod.rs b/native-engine/datafusion-ext-plans/src/scan/mod.rs index 019b3a1d..068117db 100644 --- a/native-engine/datafusion-ext-plans/src/scan/mod.rs +++ b/native-engine/datafusion-ext-plans/src/scan/mod.rs @@ -85,6 +85,12 @@ pub struct BlazeSchemaMapping { field_mappings: Vec>, } +impl BlazeSchemaMapping { + pub fn new(table_schema: SchemaRef, field_mappings: Vec>) -> Self { + Self { table_schema, field_mappings } + } +} + impl SchemaMapper for BlazeSchemaMapping { fn map_batch(&self, batch: RecordBatch) -> Result { let batch_rows = batch.num_rows(); From 436a201c8a30029c33ed238b943c25e0731114d0 Mon Sep 17 00:00:00 2001 From: liupeiyue Date: Mon, 23 Dec 2024 16:23:59 +0800 Subject: [PATCH 2/6] fix OrcScan reads missing data columns --- native-engine/datafusion-ext-plans/src/orc_exec.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/native-engine/datafusion-ext-plans/src/orc_exec.rs b/native-engine/datafusion-ext-plans/src/orc_exec.rs index a2d1ca68..f78f5ffa 100644 --- a/native-engine/datafusion-ext-plans/src/orc_exec.rs +++ b/native-engine/datafusion-ext-plans/src/orc_exec.rs @@ -46,9 +46,8 @@ use orc_rust::{ use crate::{ common::execution_context::ExecutionContext, - scan::{internal_file_reader::InternalFileReader, BlazeSchemaAdapter}, + scan::{internal_file_reader::InternalFileReader, BlazeSchemaMapping}, }; -use crate::scan::BlazeSchemaMapping; /// Execution plan for scanning one or more Orc partitions #[derive(Debug, Clone)] From 0f28ec05251572a08ad51602943e390c900346cb Mon Sep 17 00:00:00 2001 From: liupeiyue Date: Mon, 23 Dec 2024 16:54:01 +0800 Subject: [PATCH 3/6] fix OrcScan reads missing data columns --- native-engine/datafusion-ext-plans/src/orc_exec.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/native-engine/datafusion-ext-plans/src/orc_exec.rs b/native-engine/datafusion-ext-plans/src/orc_exec.rs index f78f5ffa..286a64eb 100644 --- a/native-engine/datafusion-ext-plans/src/orc_exec.rs +++ b/native-engine/datafusion-ext-plans/src/orc_exec.rs @@ -23,7 +23,6 @@ use bytes::Bytes; use datafusion::{ datasource::{ physical_plan::{FileMeta, FileOpenFuture, FileOpener, FileScanConfig, FileStream}, - schema_adapter::SchemaAdapter, }, error::Result, execution::context::TaskContext, @@ -202,7 +201,7 @@ struct OrcOpener { } impl OrcOpener { - fn map_schema(&self, orc_file_meta: &FileMetadata) -> Result<(Arc, Vec)> { + fn map_schema(&self, orc_file_meta: FileMetadata) -> Result<(Arc, Vec)> { let projection = self.projection.clone(); let projected_schema = SchemaRef::from(self.table_schema.project(&projection)?); @@ -245,7 +244,7 @@ impl FileOpener for OrcOpener { } let (schema_mapping, projection) = - self.map_schema(builder.file_metadata())?; + self.map_schema(builder.file_metadata().clone())?; let projection_mask = ProjectionMask::roots(builder.file_metadata().root_data_type(), projection); From da27d5b8ca07904b0ea944200a84afe0683b28b2 Mon Sep 17 00:00:00 2001 From: liupeiyue Date: Mon, 23 Dec 2024 18:29:10 +0800 Subject: [PATCH 4/6] fix OrcScan reads missing data columns --- .../datafusion-ext-plans/src/orc_exec.rs | 61 +++++++++++-------- 1 file changed, 34 insertions(+), 27 deletions(-) diff --git a/native-engine/datafusion-ext-plans/src/orc_exec.rs b/native-engine/datafusion-ext-plans/src/orc_exec.rs index 286a64eb..1509279a 100644 --- a/native-engine/datafusion-ext-plans/src/orc_exec.rs +++ b/native-engine/datafusion-ext-plans/src/orc_exec.rs @@ -200,31 +200,6 @@ struct OrcOpener { fs_provider: Arc, } -impl OrcOpener { - fn map_schema(&self, orc_file_meta: FileMetadata) -> Result<(Arc, Vec)> { - let projection = self.projection.clone(); - let projected_schema = SchemaRef::from(self.table_schema.project(&projection)?); - - let mut projection = Vec::with_capacity(projected_schema.fields().len()); - let mut field_mappings = vec![None; self.table_schema.fields().len()]; - - for nameColumn in orc_file_meta.root_data_type().children() { - if let Some((table_idx, _table_field)) = - projected_schema.fields().find(nameColumn.name()) { - field_mappings[table_idx] = Some(projection.len()); - projection.push(nameColumn.data_type().column_index()); - } - } - - Ok(( - Arc::new(BlazeSchemaMapping::new(self.table_schema.clone(), - field_mappings, - )), - projection, - )) - } -} - impl FileOpener for OrcOpener { fn open(&self, file_meta: FileMeta) -> Result { let reader = OrcFileReaderRef(Arc::new(InternalFileReader::try_new( @@ -232,7 +207,10 @@ impl FileOpener for OrcOpener { file_meta.object_meta.clone(), )?)); let batch_size = self.batch_size; + let projection = self.projection.clone(); + let table_schema = self.table_schema; + let schema_adapter = SchemaAdapter { projection, table_schema }; Ok(Box::pin(async move { let mut builder = ArrowReaderBuilder::try_new_async(reader) @@ -243,8 +221,7 @@ impl FileOpener for OrcOpener { builder = builder.with_file_byte_range(range); } - let (schema_mapping, projection) = - self.map_schema(builder.file_metadata().clone())?; + let (schema_mapping, projection) = schema_adapter.map_schema(builder.file_metadata())?; let projection_mask = ProjectionMask::roots(builder.file_metadata().root_data_type(), projection); @@ -283,3 +260,33 @@ impl AsyncChunkReader for OrcFileReaderRef { async move { self.0.read_fully(range).map_err(|e| e.into()) }.boxed() } } + +struct SchemaAdapter { + table_schema: SchemaRef, + projection: Vec, +} + +impl SchemaAdapter { + fn map_schema(&self, orc_file_meta: &FileMetadata) -> Result<(Arc, Vec)> { + let projection = self.projection.clone(); + let projected_schema = SchemaRef::from(self.table_schema.project(&projection)?); + + let mut projection = Vec::with_capacity(projected_schema.fields().len()); + let mut field_mappings = vec![None; self.table_schema.fields().len()]; + + for nameColumn in orc_file_meta.root_data_type().children() { + if let Some((table_idx, _table_field)) = + projected_schema.fields().find(nameColumn.name()) { + field_mappings[table_idx] = Some(projection.len()); + projection.push(nameColumn.data_type().column_index()); + } + } + + Ok(( + Arc::new(BlazeSchemaMapping::new(self.table_schema.clone(), + field_mappings, + )), + projection, + )) + } +} From 3fdade470e868f5dd3d99d262fd8f603e23c8c9f Mon Sep 17 00:00:00 2001 From: liupeiyue Date: Mon, 23 Dec 2024 20:39:33 +0800 Subject: [PATCH 5/6] fix OrcScan reads missing data columns --- native-engine/datafusion-ext-plans/src/orc_exec.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/native-engine/datafusion-ext-plans/src/orc_exec.rs b/native-engine/datafusion-ext-plans/src/orc_exec.rs index 1509279a..a57d4c35 100644 --- a/native-engine/datafusion-ext-plans/src/orc_exec.rs +++ b/native-engine/datafusion-ext-plans/src/orc_exec.rs @@ -208,9 +208,9 @@ impl FileOpener for OrcOpener { )?)); let batch_size = self.batch_size; let projection = self.projection.clone(); - let table_schema = self.table_schema; + let projected_schema = SchemaRef::from(self.table_schema.project(&projection)?); - let schema_adapter = SchemaAdapter { projection, table_schema }; + let schema_adapter = SchemaAdapter::new(projected_schema, projection); Ok(Box::pin(async move { let mut builder = ArrowReaderBuilder::try_new_async(reader) @@ -267,9 +267,12 @@ struct SchemaAdapter { } impl SchemaAdapter { + pub fn new(table_schema: SchemaRef, projection: Vec) -> Self { + Self { table_schema, projection } + } + fn map_schema(&self, orc_file_meta: &FileMetadata) -> Result<(Arc, Vec)> { - let projection = self.projection.clone(); - let projected_schema = SchemaRef::from(self.table_schema.project(&projection)?); + let projected_schema = SchemaRef::from(self.table_schema.project(&self.projection)?); let mut projection = Vec::with_capacity(projected_schema.fields().len()); let mut field_mappings = vec![None; self.table_schema.fields().len()]; @@ -283,7 +286,7 @@ impl SchemaAdapter { } Ok(( - Arc::new(BlazeSchemaMapping::new(self.table_schema.clone(), + Arc::new(BlazeSchemaMapping::new(self.table_schema, field_mappings, )), projection, From 3a7089eea0d6fc59198b17d6d63488a5d4180e25 Mon Sep 17 00:00:00 2001 From: liupeiyue Date: Mon, 23 Dec 2024 20:44:09 +0800 Subject: [PATCH 6/6] fix OrcScan reads missing data columns --- native-engine/datafusion-ext-plans/src/orc_exec.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/native-engine/datafusion-ext-plans/src/orc_exec.rs b/native-engine/datafusion-ext-plans/src/orc_exec.rs index a57d4c35..eebbef73 100644 --- a/native-engine/datafusion-ext-plans/src/orc_exec.rs +++ b/native-engine/datafusion-ext-plans/src/orc_exec.rs @@ -286,7 +286,7 @@ impl SchemaAdapter { } Ok(( - Arc::new(BlazeSchemaMapping::new(self.table_schema, + Arc::new(BlazeSchemaMapping::new(self.table_schema.clone(), field_mappings, )), projection,