Fix OrcScan reads missing data column #716

Merged: 6 commits, Dec 25, 2024
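
What the change does (summarized from the diff below; hedged, since the PR page carries no prose description): OrcScan previously went through DataFusion's index-based BlazeSchemaAdapter and offset every projected index by 1 to skip the root, and per the PR title this could read the wrong data when a column in the table schema is missing from a given ORC file. The new SchemaAdapter in orc_exec.rs instead matches columns by name against the file's root data type and records each match's column_index() straight from the ORC metadata. A minimal, self-contained sketch of that name-based mapping, with plain string slices standing in for the real SchemaRef and FileMetadata types (map_by_name, table_cols and file_cols are illustrative names, not part of the patch):

// Simplified illustration of the name-based mapping added in orc_exec.rs below.
fn map_by_name(table_cols: &[&str], file_cols: &[&str]) -> (Vec<Option<usize>>, Vec<usize>) {
    // For each table column: its position in the projected output, or None
    // when the ORC file does not contain that column.
    let mut field_mappings = vec![None; table_cols.len()];
    // File column indices that will actually be read.
    let mut projection = Vec::new();
    for (file_idx, file_col) in file_cols.iter().enumerate() {
        if let Some(table_idx) = table_cols.iter().position(|c| c == file_col) {
            field_mappings[table_idx] = Some(projection.len());
            projection.push(file_idx);
        }
    }
    (field_mappings, projection)
}

fn main() {
    // "b" exists in the table schema but not in this file: it stays None in
    // field_mappings and does not shift the projection of "c".
    let (mappings, projection) = map_by_name(&["a", "b", "c"], &["a", "c"]);
    assert_eq!(mappings, vec![Some(0), None, Some(1)]);
    assert_eq!(projection, vec![0, 1]);
}

In the real adapter the pushed value is named_column.data_type().column_index() from the ORC file metadata rather than a plain enumeration index, and field_mappings is handed to BlazeSchemaMapping so unmatched table columns can still be produced downstream.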
53 changes: 41 additions & 12 deletions native-engine/datafusion-ext-plans/src/orc_exec.rs
@@ -23,7 +23,6 @@ use bytes::Bytes;
use datafusion::{
datasource::{
physical_plan::{FileMeta, FileOpenFuture, FileOpener, FileScanConfig, FileStream},
schema_adapter::SchemaAdapter,
},
error::Result,
execution::context::TaskContext,
@@ -34,17 +33,19 @@ use datafusion::{
PlanProperties, SendableRecordBatchStream, Statistics,
},
};
use datafusion::datasource::schema_adapter::SchemaMapper;
use datafusion_ext_commons::{batch_size, df_execution_err, hadoop_fs::FsProvider};
use futures::{future::BoxFuture, FutureExt, StreamExt};
use futures_util::TryStreamExt;
use once_cell::sync::OnceCell;
use orc_rust::{
arrow_reader::ArrowReaderBuilder, projection::ProjectionMask, reader::AsyncChunkReader,
reader::metadata::FileMetadata,
};

use crate::{
common::execution_context::ExecutionContext,
scan::{internal_file_reader::InternalFileReader, BlazeSchemaAdapter},
scan::{internal_file_reader::InternalFileReader, BlazeSchemaMapping},
};

/// Execution plan for scanning one or more Orc partitions
@@ -208,7 +209,8 @@ impl FileOpener for OrcOpener {
let batch_size = self.batch_size;
let projection = self.projection.clone();
let projected_schema = SchemaRef::from(self.table_schema.project(&projection)?);
let schema_adapter = BlazeSchemaAdapter::new(projected_schema);

let schema_adapter = SchemaAdapter::new(projected_schema, projection);

Ok(Box::pin(async move {
let mut builder = ArrowReaderBuilder::try_new_async(reader)
@@ -218,15 +220,9 @@
let range = range.start as usize..range.end as usize;
builder = builder.with_file_byte_range(range);
}
let file_schema = builder.schema();
let (schema_mapping, adapted_projections) =
schema_adapter.map_schema(file_schema.as_ref())?;

// Offset by 1 since index 0 is the root
let projection = adapted_projections
.iter()
.map(|i| i + 1)
.collect::<Vec<_>>();

let (schema_mapping, projection) = schema_adapter.map_schema(builder.file_metadata())?;

let projection_mask =
ProjectionMask::roots(builder.file_metadata().root_data_type(), projection);
let stream = builder
@@ -264,3 +260,36 @@ impl AsyncChunkReader for OrcFileReaderRef {
async move { self.0.read_fully(range).map_err(|e| e.into()) }.boxed()
}
}

/// Maps the projected table schema onto the columns that are actually present
/// in an ORC file, matching columns by name.
struct SchemaAdapter {
    table_schema: SchemaRef,
    projection: Vec<usize>,
}

impl SchemaAdapter {
    pub fn new(table_schema: SchemaRef, projection: Vec<usize>) -> Self {
        Self { table_schema, projection }
    }

    fn map_schema(
        &self,
        orc_file_meta: &FileMetadata,
    ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
        let projected_schema = SchemaRef::from(self.table_schema.project(&self.projection)?);

        let mut projection = Vec::with_capacity(projected_schema.fields().len());
        let mut field_mappings = vec![None; self.table_schema.fields().len()];

        // Match ORC columns to table columns by name; table columns missing
        // from the file keep `None` in `field_mappings`.
        for named_column in orc_file_meta.root_data_type().children() {
            if let Some((table_idx, _table_field)) =
                projected_schema.fields().find(named_column.name())
            {
                field_mappings[table_idx] = Some(projection.len());
                projection.push(named_column.data_type().column_index());
            }
        }

        Ok((
            Arc::new(BlazeSchemaMapping::new(
                self.table_schema.clone(),
                field_mappings,
            )),
            projection,
        ))
    }
}
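
For completeness, a hedged sketch of what a SchemaMapper such as BlazeSchemaMapping is expected to do with field_mappings when rebuilding a record batch; its actual map_batch body is only partially visible in the scan/mod.rs hunk below, and type casting is omitted. map_batch_sketch is an illustrative name, and the arrow crate APIs used (new_null_array, RecordBatch::try_new) are assumed to be available in this codebase:

use arrow::{
    array::{new_null_array, ArrayRef},
    datatypes::SchemaRef,
    error::ArrowError,
    record_batch::RecordBatch,
};

// For every table field, take the mapped column from the file batch, or fill
// with nulls when the ORC file does not contain that column. Assumes missing
// table columns are nullable, otherwise RecordBatch::try_new rejects the nulls.
fn map_batch_sketch(
    table_schema: SchemaRef,
    field_mappings: &[Option<usize>],
    file_batch: &RecordBatch,
) -> Result<RecordBatch, ArrowError> {
    let columns: Vec<ArrayRef> = table_schema
        .fields()
        .iter()
        .zip(field_mappings)
        .map(|(field, mapping)| match mapping {
            Some(file_idx) => file_batch.column(*file_idx).clone(),
            None => new_null_array(field.data_type(), file_batch.num_rows()),
        })
        .collect();
    RecordBatch::try_new(table_schema, columns)
}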
6 changes: 6 additions & 0 deletions native-engine/datafusion-ext-plans/src/scan/mod.rs
@@ -85,6 +85,12 @@ pub struct BlazeSchemaMapping {
field_mappings: Vec<Option<usize>>,
}

impl BlazeSchemaMapping {
    pub fn new(table_schema: SchemaRef, field_mappings: Vec<Option<usize>>) -> Self {
        Self { table_schema, field_mappings }
    }
}

impl SchemaMapper for BlazeSchemaMapping {
fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
let batch_rows = batch.num_rows();