diff --git a/native-engine/datafusion-ext-plans/src/orc_exec.rs b/native-engine/datafusion-ext-plans/src/orc_exec.rs index 3acc0bc0..eebbef73 100644 --- a/native-engine/datafusion-ext-plans/src/orc_exec.rs +++ b/native-engine/datafusion-ext-plans/src/orc_exec.rs @@ -23,7 +23,6 @@ use bytes::Bytes; use datafusion::{ datasource::{ physical_plan::{FileMeta, FileOpenFuture, FileOpener, FileScanConfig, FileStream}, - schema_adapter::SchemaAdapter, }, error::Result, execution::context::TaskContext, @@ -34,17 +33,19 @@ use datafusion::{ PlanProperties, SendableRecordBatchStream, Statistics, }, }; +use datafusion::datasource::schema_adapter::SchemaMapper; use datafusion_ext_commons::{batch_size, df_execution_err, hadoop_fs::FsProvider}; use futures::{future::BoxFuture, FutureExt, StreamExt}; use futures_util::TryStreamExt; use once_cell::sync::OnceCell; use orc_rust::{ arrow_reader::ArrowReaderBuilder, projection::ProjectionMask, reader::AsyncChunkReader, + reader::metadata::FileMetadata, }; use crate::{ common::execution_context::ExecutionContext, - scan::{internal_file_reader::InternalFileReader, BlazeSchemaAdapter}, + scan::{internal_file_reader::InternalFileReader, BlazeSchemaMapping}, }; /// Execution plan for scanning one or more Orc partitions @@ -208,7 +209,8 @@ impl FileOpener for OrcOpener { let batch_size = self.batch_size; let projection = self.projection.clone(); let projected_schema = SchemaRef::from(self.table_schema.project(&projection)?); - let schema_adapter = BlazeSchemaAdapter::new(projected_schema); + + let schema_adapter = SchemaAdapter::new(projected_schema, projection); Ok(Box::pin(async move { let mut builder = ArrowReaderBuilder::try_new_async(reader) @@ -218,15 +220,9 @@ impl FileOpener for OrcOpener { let range = range.start as usize..range.end as usize; builder = builder.with_file_byte_range(range); } - let file_schema = builder.schema(); - let (schema_mapping, adapted_projections) = - schema_adapter.map_schema(file_schema.as_ref())?; - - // Offset by 1 since index 0 is the root - let projection = adapted_projections - .iter() - .map(|i| i + 1) - .collect::>(); + + let (schema_mapping, projection) = schema_adapter.map_schema(builder.file_metadata())?; + let projection_mask = ProjectionMask::roots(builder.file_metadata().root_data_type(), projection); let stream = builder @@ -264,3 +260,36 @@ impl AsyncChunkReader for OrcFileReaderRef { async move { self.0.read_fully(range).map_err(|e| e.into()) }.boxed() } } + +struct SchemaAdapter { + table_schema: SchemaRef, + projection: Vec, +} + +impl SchemaAdapter { + pub fn new(table_schema: SchemaRef, projection: Vec) -> Self { + Self { table_schema, projection } + } + + fn map_schema(&self, orc_file_meta: &FileMetadata) -> Result<(Arc, Vec)> { + let projected_schema = SchemaRef::from(self.table_schema.project(&self.projection)?); + + let mut projection = Vec::with_capacity(projected_schema.fields().len()); + let mut field_mappings = vec![None; self.table_schema.fields().len()]; + + for nameColumn in orc_file_meta.root_data_type().children() { + if let Some((table_idx, _table_field)) = + projected_schema.fields().find(nameColumn.name()) { + field_mappings[table_idx] = Some(projection.len()); + projection.push(nameColumn.data_type().column_index()); + } + } + + Ok(( + Arc::new(BlazeSchemaMapping::new(self.table_schema.clone(), + field_mappings, + )), + projection, + )) + } +} diff --git a/native-engine/datafusion-ext-plans/src/scan/mod.rs b/native-engine/datafusion-ext-plans/src/scan/mod.rs index 019b3a1d..068117db 100644 --- a/native-engine/datafusion-ext-plans/src/scan/mod.rs +++ b/native-engine/datafusion-ext-plans/src/scan/mod.rs @@ -85,6 +85,12 @@ pub struct BlazeSchemaMapping { field_mappings: Vec>, } +impl BlazeSchemaMapping { + pub fn new(table_schema: SchemaRef, field_mappings: Vec>) -> Self { + Self { table_schema, field_mappings } + } +} + impl SchemaMapper for BlazeSchemaMapping { fn map_batch(&self, batch: RecordBatch) -> Result { let batch_rows = batch.num_rows();