Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Permalink
projection node working
Browse files Browse the repository at this point in the history
  • Loading branch information
connortsui20 committed Feb 18, 2024
1 parent 01e81c5 commit 8c31000
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 32 deletions.
1 change: 1 addition & 0 deletions sql_dfphysicalplan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ edition = "2021"

[dependencies]
arrow = { version = "50", features = ["prettyprint"] }
async-trait = "0.1"
datafusion = "35"
datafusion-common = "35"
futures = "0.3"
Expand Down
19 changes: 7 additions & 12 deletions sql_dfphysicalplan/queries/test_query.sql
Original file line number Diff line number Diff line change
@@ -1,13 +1,8 @@
SELECT
L_RETURNFLAG,
L_LINESTATUS,
SUM(L_QUANTITY) AS SUM_QTY,
AVG(L_QUANTITY) AS AVG_QTY,
COUNT(*) AS COUNT_ORDER
FROM
LINEITEM
-- WHERE
-- L_SHIPDATE <= DATE '1998-12-01' - INTERVAL '93 DAY'
GROUP BY
L_RETURNFLAG,
L_LINESTATUS
o_totalprice, c_custkey, o_orderdate
FROM
orders, customer
WHERE
orders.o_custkey = customer.c_custkey AND
orders.o_totalprice < 900.00
;
154 changes: 134 additions & 20 deletions sql_dfphysicalplan/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,103 @@
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use arrow::util::pretty;
use async_trait::async_trait;
use datafusion::execution::context::SessionContext;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::Partitioning;
use datafusion::physical_plan::PhysicalExpr;
use datafusion::prelude::*;
use datafusion_common::Result;
use futures::stream::StreamExt;
use std::io;
use std::sync::Arc;
use tokio::sync::broadcast::channel;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::broadcast::Receiver;
use tokio::sync::broadcast::Sender;

async fn remove_projection(physical_plan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
let _ = physical_plan
#[async_trait]
pub(crate) trait UnaryOperator: Send {
type In;
type Out;

fn into_unary(self) -> Box<dyn UnaryOperator<In = Self::In, Out = Self::Out>>;

async fn execute(&self, rx: Receiver<Self::In>, tx: Sender<Self::Out>);
}

struct Projection {
output_expr: Vec<(Arc<dyn PhysicalExpr>, String)>,
input_schema: SchemaRef, // TODO
output_schema: SchemaRef,
}

impl Projection {
fn new(input_schema: SchemaRef, projection_plan: ProjectionExec) -> Self {
Self {
output_expr: Vec::from(projection_plan.expr()),
input_schema,
output_schema: projection_plan.schema(),
}
}

fn apply_projection(&self, rb: RecordBatch) -> Result<RecordBatch> {
assert_eq!(rb.schema(), self.input_schema);

let num_rows = rb.num_rows();

let mut columns = Vec::with_capacity(self.output_expr.len());

for (expr, name) in &self.output_expr {
let col_val = expr.evaluate(&rb).expect("expr.evaluate() fails");
let column = col_val.into_array(num_rows)?;
columns.push((name, column, expr.nullable(&self.input_schema)?));
}

Ok(RecordBatch::try_from_iter_with_nullable(columns)?)
}
}

#[async_trait]
impl UnaryOperator for Projection {
type In = RecordBatch;
type Out = RecordBatch;

fn into_unary(self) -> Box<dyn UnaryOperator<In = Self::In, Out = Self::Out>> {
Box::new(self)
}

async fn execute(&self, mut rx: Receiver<Self::In>, tx: Sender<Self::Out>) {
loop {
match rx.recv().await {
Ok(batch) => {
let projected_batch = self
.apply_projection(batch)
.expect("Unable to apply projection");

tx.send(projected_batch)
.expect("Unable to send the projected batch");
}
Err(e) => match e {
RecvError::Closed => break,
RecvError::Lagged(_) => todo!(),
},
}
}
}
}

fn get_projection_plan(physical_plan: Arc<dyn ExecutionPlan>) -> ProjectionExec {
physical_plan
.clone()
.as_any()
.downcast_ref::<ProjectionExec>()
.expect("Unable to downcast_ref to ProjectionExec");
.expect("Unable to downcast_ref to ProjectionExec")
.clone()
}

async fn remove_top_node(physical_plan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
let children = physical_plan.children();
assert_eq!(children.len(), 1);

Expand All @@ -40,40 +122,72 @@ async fn main() -> Result<()> {

let stdin = io::read_to_string(io::stdin())?;

// let results: Vec<_> = df.collect().await?;

// let pretty_results = pretty::pretty_format_batches(&results)?.to_string();

// println!("{}", pretty_results);

let df = ctx
.sql(&stdin)
.await
.unwrap_or_else(|err| panic!("{:#?}", err));

// let logical_plan = df.clone().into_unoptimized_plan();
// println!("Logical Plan:\n{:#?}", logical_plan);
let results: Vec<_> = df.collect().await?;
let pretty_results = arrow::util::pretty::pretty_format_batches(&results)?.to_string();
println!("{}", pretty_results);
todo!();

let physical_plan = df.clone().create_physical_plan().await?;

let physical_plan = remove_projection(physical_plan).await;
let child_plan = remove_top_node(physical_plan.clone()).await;

let partitioning = physical_plan.output_partitioning();
let partitioning = child_plan.output_partitioning();
let partitions = match partitioning {
Partitioning::RoundRobinBatch(c) => c,
Partitioning::Hash(_, h) => h,
Partitioning::UnknownPartitioning(p) => p,
};

for i in 0..partitions {
let batch_stream = physical_plan.execute(i, Default::default())?;
// Send everything from the vector `batches` into `base_tx`
// move `base_rx` into the Projection node
let (base_tx, base_rx) = channel(64);
let (out_tx, mut out_rx) = channel(64);

let projection_node = Projection::new(
child_plan.schema(),
get_projection_plan(physical_plan.clone()),
);

tokio::spawn(async move { projection_node.execute(base_rx, out_tx).await });

tokio::spawn(async move {
for i in 0..partitions {
let batch_stream = child_plan.execute(i, Default::default()).unwrap();

let results = batch_stream.collect::<Vec<_>>().await;
for batch in results {
let batch = batch.unwrap();
if batch.num_rows() == 0 {
continue;
let results = batch_stream.collect::<Vec<_>>().await;

for batch in results {
let batch = batch.unwrap();
if batch.num_rows() == 0 {
continue;
}

base_tx
.send(batch.clone())
.expect("Unable to send rb to project node");
}
}
});

let pretty_results = pretty::pretty_format_batches(&[batch])?.to_string();
println!("{}", pretty_results);
loop {
match out_rx.recv().await {
Ok(batch) => {
let pretty_results = pretty::pretty_format_batches(&[batch]).unwrap().to_string();
println!("{}", pretty_results);
}
Err(e) => match e {
tokio::sync::broadcast::error::RecvError::Closed => {
break;
}
tokio::sync::broadcast::error::RecvError::Lagged(_) => todo!(),
},
}
}

Expand Down

0 comments on commit 8c31000

Please sign in to comment.