OrcScan adds bytes_scanned metric (#631)
OrcScan: update the orc-rust dependency to support column pruning.

Co-authored-by: zhangli20 <[email protected]>
richox and zhangli20 authored Oct 28, 2024
1 parent a246ab0 commit bcee889
Showing 3 changed files with 63 additions and 17 deletions.
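
For context before the diffs: the new metric follows DataFusion's standard pattern of registering a Count on the operator's ExecutionPlanMetricsSet, labeled with the file name and attributed to one partition. A minimal sketch of that pattern, not part of the commit; the helper name, partition index, and file name below are illustrative:

use datafusion::physical_plan::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder};

// Build a per-file byte counter on the plan's shared metrics set,
// labeled with the file name and attributed to one partition.
fn bytes_scanned_counter(
    metrics: &ExecutionPlanMetricsSet,
    partition: usize,
    filename: &str,
) -> Count {
    MetricBuilder::new(metrics)
        .with_new_label("filename", filename.to_string())
        .counter("bytes_scanned", partition)
}

fn main() {
    let metrics = ExecutionPlanMetricsSet::new();
    let bytes_scanned = bytes_scanned_counter(&metrics, 0, "part-00000.orc");
    bytes_scanned.add(4096); // e.g. one 4 KiB read from the file
    assert_eq!(bytes_scanned.value(), 4096);
}

The filename label keeps per-file byte counts distinguishable when one partition scans several files.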
11 changes: 9 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
@@ -54,7 +54,7 @@ datafusion-ext-plans = { path = "./native-engine/datafusion-ext-plans" }
# datafusion: branch=v42-blaze
datafusion = { version = "42.0.0" }

-orc-rust = { version = "0.3.1" }
+orc-rust = { version = "0.4.1" }

# arrow: branch=v53-blaze
arrow = { version = "53.0.0", features = ["ffi"]}
@@ -72,7 +72,7 @@ datafusion-expr = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "9a09e14"}
datafusion-execution = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "9a09e14"}
datafusion-optimizer = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "9a09e14"}
datafusion-physical-expr = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "9a09e14"}
-orc-rust = { git = "https://github.com/blaze-init/datafusion-orc.git", rev = "c54bfb5"}
+orc-rust = { git = "https://github.com/blaze-init/datafusion-orc.git", rev = "9c74ac3"}

# arrow: branch=v53-blaze
arrow = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
65 changes: 52 additions & 13 deletions native-engine/datafusion-ext-plans/src/orc_exec.rs
@@ -29,7 +29,10 @@ use datafusion::{
    execution::context::TaskContext,
    physical_expr::EquivalenceProperties,
    physical_plan::{
-        metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricValue, MetricsSet, Time},
+        metrics::{
+            BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue,
+            MetricsSet, Time,
+        },
        stream::RecordBatchStreamAdapter,
        DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Metric, Partitioning,
        PhysicalExpr, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics,
@@ -166,11 +169,11 @@ impl ExecutionPlan for OrcExec {
        };

        let opener = OrcOpener {
+            partition_index,
            projection,
            batch_size: batch_size(),
            _limit: self.base_config.limit,
            table_schema: self.base_config.file_schema.clone(),
-            _metrics: self.metrics.clone(),
+            metrics: self.metrics.clone(),
            fs_provider,
        };

@@ -210,20 +213,31 @@
}

struct OrcOpener {
+    partition_index: usize,
    projection: Vec<usize>,
    batch_size: usize,
    _limit: Option<usize>,
    table_schema: SchemaRef,
-    _metrics: ExecutionPlanMetricsSet,
+    metrics: ExecutionPlanMetricsSet,
    fs_provider: Arc<FsProvider>,
}

impl FileOpener for OrcOpener {
    fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
-        let reader = OrcFileReaderRef(Arc::new(InternalFileReader::try_new(
-            self.fs_provider.clone(),
-            file_meta.object_meta.clone(),
-        )?));
+        let reader = OrcFileReaderRef {
+            inner: Arc::new(InternalFileReader::try_new(
+                self.fs_provider.clone(),
+                file_meta.object_meta.clone(),
+            )?),
+            metrics: OrcFileMetrics::new(
+                self.partition_index,
+                file_meta
+                    .object_meta
+                    .location
+                    .filename()
+                    .unwrap_or("missing filename"),
+                &self.metrics.clone(),
+            ),
+        };
        let batch_size = self.batch_size;
        let projection = self.projection.clone();
        let projected_schema = SchemaRef::from(self.table_schema.project(&projection)?);
@@ -233,7 +247,7 @@ impl FileOpener for OrcOpener {
            let mut builder = ArrowReaderBuilder::try_new_async(reader)
                .await
                .or_else(|err| df_execution_err!("create orc reader error: {err}"))?;
-            if let Some(range) = file_meta.range.clone() {
+            if let Some(range) = &file_meta.range {
                let range = range.start as usize..range.end as usize;
                builder = builder.with_file_byte_range(range);
            }
@@ -265,21 +279,46 @@
}

#[derive(Clone)]
-struct OrcFileReaderRef(Arc<InternalFileReader>);
+struct OrcFileReaderRef {
+    inner: Arc<InternalFileReader>,
+    metrics: OrcFileMetrics,
+}

impl AsyncChunkReader for OrcFileReaderRef {
    fn len(&mut self) -> BoxFuture<'_, std::io::Result<u64>> {
-        async move { Ok(self.0.get_meta().size as u64) }.boxed()
+        async move { Ok(self.inner.get_meta().size as u64) }.boxed()
    }

    fn get_bytes(
        &mut self,
        offset_from_start: u64,
        length: u64,
    ) -> BoxFuture<'_, std::io::Result<Bytes>> {
+        let inner = self.inner.clone();
        let offset_from_start = offset_from_start as usize;
        let length = length as usize;
        let range = offset_from_start..(offset_from_start + length);
-        async move { self.0.read_fully(range).map_err(|e| e.into()) }.boxed()
+        self.metrics.bytes_scanned.add(length);
+        async move {
+            tokio::task::spawn_blocking(move || inner.read_fully(range).map_err(|e| e.into()))
+                .await
+                .expect("tokio spawn_blocking error")
+        }
+        .boxed()
    }
}
+
+#[derive(Clone)]
+struct OrcFileMetrics {
+    bytes_scanned: Count,
+}
+
+impl OrcFileMetrics {
+    pub fn new(partition: usize, filename: &str, metrics: &ExecutionPlanMetricsSet) -> Self {
+        let bytes_scanned = MetricBuilder::new(metrics)
+            .with_new_label("filename", filename.to_string())
+            .counter("bytes_scanned", partition);
+
+        Self { bytes_scanned }
+    }
+}
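
A design note on the get_bytes change above: InternalFileReader::read_fully is a synchronous call, so the new code hands it to Tokio's blocking thread pool instead of running it on the async executor thread, and the bytes_scanned counter is bumped before the read is scheduled. A self-contained sketch of that offloading pattern, assuming the tokio and futures crates; the BlockingFile type and its read_fully helper are hypothetical stand-ins, not the crate's API:

use std::{io, ops::Range, sync::Arc};

use futures::{future::BoxFuture, FutureExt};

// Hypothetical blocking reader standing in for InternalFileReader.
struct BlockingFile;

impl BlockingFile {
    fn read_fully(&self, range: Range<usize>) -> io::Result<Vec<u8>> {
        // A real implementation would seek to `range.start` and read to `range.end`.
        Ok(vec![0u8; range.len()])
    }
}

// Offload the synchronous read so the async executor thread is never blocked,
// mirroring the spawn_blocking pattern in get_bytes() above.
fn read_async(
    file: Arc<BlockingFile>,
    range: Range<usize>,
) -> BoxFuture<'static, io::Result<Vec<u8>>> {
    async move {
        tokio::task::spawn_blocking(move || file.read_fully(range))
            .await
            .expect("spawn_blocking task panicked")
    }
    .boxed()
}

Awaiting the JoinHandle yields the closure's own Result; the expect only fires if the blocking task panics, which matches how the diff handles it.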
