Skip to content

Commit

Permalink
fix test failures
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangli20 committed Dec 25, 2024
1 parent a0cf56c commit 7320b74
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 40 deletions.
17 changes: 10 additions & 7 deletions native-engine/datafusion-ext-plans/src/agg/collect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -706,13 +706,16 @@ mod tests {
acc_col.append_item(2, &ScalarValue::Int32(Some(6)));
acc_col.append_item(2, &ScalarValue::Int32(Some(7)));

let mut spill: Box<dyn Spill> = Box::new(vec![]);
acc_col
.spill(
IdxSelection::Range(0, 3),
&mut spill.get_compressed_writer(),
)
.unwrap();
let spill = {
let mut spill: Box<dyn Spill> = Box::new(vec![]);
let mut spill_writer = spill.get_compressed_writer();
acc_col
.spill(IdxSelection::Range(0, 3), &mut spill_writer)
.unwrap();
spill_writer.finish().unwrap();
drop(spill_writer);
spill
};

let mut acc_col_unspill = AccSetColumn::empty(DataType::Int32);
acc_col_unspill
Expand Down
4 changes: 2 additions & 2 deletions native-engine/datafusion-ext-plans/src/agg_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ mod test {
Arc::new(MemoryExec::try_new(&[vec![batch]], schema, None).unwrap())
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_agg() -> Result<()> {
MemManager::init(10000);

Expand Down Expand Up @@ -647,7 +647,7 @@ mod fuzztest {
memmgr::MemManager,
};

#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn fuzztest() -> Result<()> {
MemManager::init(1000); // small memory config to trigger spill
let session_ctx =
Expand Down
36 changes: 18 additions & 18 deletions native-engine/datafusion-ext-plans/src/joins/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ mod tests {
SHJRightProbed,
];

#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn join_inner_one() -> Result<()> {
for test_type in ALL_TEST_TYPE {
let left = build_table(
Expand Down Expand Up @@ -304,7 +304,7 @@ mod tests {
Ok(())
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn join_inner_two() -> Result<()> {
for test_type in ALL_TEST_TYPE {
let left = build_table(
Expand Down Expand Up @@ -344,7 +344,7 @@ mod tests {
Ok(())
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn join_inner_two_two() -> Result<()> {
for test_type in ALL_TEST_TYPE {
let left = build_table(
Expand Down Expand Up @@ -385,7 +385,7 @@ mod tests {
Ok(())
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn join_inner_with_nulls() -> Result<()> {
for test_type in ALL_TEST_TYPE {
let left = build_table_i32_nullable(
Expand Down Expand Up @@ -425,7 +425,7 @@ mod tests {
Ok(())
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn join_left_one() -> Result<()> {
for test_type in ALL_TEST_TYPE {
let left = build_table(
Expand Down Expand Up @@ -459,7 +459,7 @@ mod tests {
Ok(())
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn join_right_one() -> Result<()> {
for test_type in ALL_TEST_TYPE {
let left = build_table(
Expand Down Expand Up @@ -493,7 +493,7 @@ mod tests {
Ok(())
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn join_full_one() -> Result<()> {
for test_type in ALL_TEST_TYPE {
let left = build_table(
Expand Down Expand Up @@ -530,7 +530,7 @@ mod tests {
Ok(())
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn join_anti() -> Result<()> {
for test_type in ALL_TEST_TYPE {
let left = build_table(
Expand Down Expand Up @@ -563,7 +563,7 @@ mod tests {
Ok(())
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn join_semi() -> Result<()> {
for test_type in ALL_TEST_TYPE {
let left = build_table(
Expand Down Expand Up @@ -597,7 +597,7 @@ mod tests {
Ok(())
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn join_with_duplicated_column_names() -> Result<()> {
for test_type in ALL_TEST_TYPE {
let left = build_table(
Expand Down Expand Up @@ -631,7 +631,7 @@ mod tests {
Ok(())
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn join_date32() -> Result<()> {
for test_type in ALL_TEST_TYPE {
let left = build_date_table(
Expand Down Expand Up @@ -667,7 +667,7 @@ mod tests {
Ok(())
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn join_date64() -> Result<()> {
for test_type in ALL_TEST_TYPE {
let left = build_date64_table(
Expand Down Expand Up @@ -704,7 +704,7 @@ mod tests {
Ok(())
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn join_left_sort_order() -> Result<()> {
for test_type in ALL_TEST_TYPE {
let left = build_table(
Expand Down Expand Up @@ -742,7 +742,7 @@ mod tests {
Ok(())
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn join_right_sort_order() -> Result<()> {
for test_type in ALL_TEST_TYPE {
let left = build_table(
Expand Down Expand Up @@ -776,7 +776,7 @@ mod tests {
Ok(())
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn join_left_multiple_batches() -> Result<()> {
for test_type in ALL_TEST_TYPE {
let left_batch_1 = build_table_i32(
Expand Down Expand Up @@ -827,7 +827,7 @@ mod tests {
Ok(())
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn join_right_multiple_batches() -> Result<()> {
for test_type in ALL_TEST_TYPE {
let right_batch_1 = build_table_i32(
Expand Down Expand Up @@ -878,7 +878,7 @@ mod tests {
Ok(())
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn join_full_multiple_batches() -> Result<()> {
for test_type in ALL_TEST_TYPE {
let left_batch_1 = build_table_i32(
Expand Down Expand Up @@ -931,7 +931,7 @@ mod tests {
Ok(())
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn join_existence_multiple_batches() -> Result<()> {
for test_type in ALL_TEST_TYPE {
let left_batch_1 = build_table_i32(
Expand Down
32 changes: 21 additions & 11 deletions native-engine/datafusion-ext-plans/src/orc_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use bytes::Bytes;
use datafusion::{

Check warning on line 23 in native-engine/datafusion-ext-plans/src/orc_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.2 / Build Blaze JAR

Diff in /home/runner/work/blaze/blaze/native-engine/datafusion-ext-plans/src/orc_exec.rs

Check warning on line 23 in native-engine/datafusion-ext-plans/src/orc_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.0 / Build Blaze JAR

Diff in /home/runner/work/blaze/blaze/native-engine/datafusion-ext-plans/src/orc_exec.rs

Check warning on line 23 in native-engine/datafusion-ext-plans/src/orc_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.1 / Build Blaze JAR

Diff in /home/runner/work/blaze/blaze/native-engine/datafusion-ext-plans/src/orc_exec.rs

Check warning on line 23 in native-engine/datafusion-ext-plans/src/orc_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.3 / Build Blaze JAR

Diff in /home/runner/work/blaze/blaze/native-engine/datafusion-ext-plans/src/orc_exec.rs

Check warning on line 23 in native-engine/datafusion-ext-plans/src/orc_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.4 / Build Blaze JAR

Diff in /home/runner/work/blaze/blaze/native-engine/datafusion-ext-plans/src/orc_exec.rs

Check warning on line 23 in native-engine/datafusion-ext-plans/src/orc_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.5 / Build Blaze JAR

Diff in /home/runner/work/blaze/blaze/native-engine/datafusion-ext-plans/src/orc_exec.rs
datasource::{
physical_plan::{FileMeta, FileOpenFuture, FileOpener, FileScanConfig, FileStream},
schema_adapter::SchemaMapper,
},
error::Result,
execution::context::TaskContext,
Expand All @@ -33,14 +34,14 @@ use datafusion::{
PlanProperties, SendableRecordBatchStream, Statistics,
},
};
use datafusion::datasource::schema_adapter::SchemaMapper;
use datafusion_ext_commons::{batch_size, df_execution_err, hadoop_fs::FsProvider};
use futures::{future::BoxFuture, FutureExt, StreamExt};
use futures_util::TryStreamExt;
use once_cell::sync::OnceCell;

Check warning on line 40 in native-engine/datafusion-ext-plans/src/orc_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.2 / Build Blaze JAR

Diff in /home/runner/work/blaze/blaze/native-engine/datafusion-ext-plans/src/orc_exec.rs

Check warning on line 40 in native-engine/datafusion-ext-plans/src/orc_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.0 / Build Blaze JAR

Diff in /home/runner/work/blaze/blaze/native-engine/datafusion-ext-plans/src/orc_exec.rs

Check warning on line 40 in native-engine/datafusion-ext-plans/src/orc_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.1 / Build Blaze JAR

Diff in /home/runner/work/blaze/blaze/native-engine/datafusion-ext-plans/src/orc_exec.rs

Check warning on line 40 in native-engine/datafusion-ext-plans/src/orc_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.3 / Build Blaze JAR

Diff in /home/runner/work/blaze/blaze/native-engine/datafusion-ext-plans/src/orc_exec.rs

Check warning on line 40 in native-engine/datafusion-ext-plans/src/orc_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.4 / Build Blaze JAR

Diff in /home/runner/work/blaze/blaze/native-engine/datafusion-ext-plans/src/orc_exec.rs

Check warning on line 40 in native-engine/datafusion-ext-plans/src/orc_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.5 / Build Blaze JAR

Diff in /home/runner/work/blaze/blaze/native-engine/datafusion-ext-plans/src/orc_exec.rs
use orc_rust::{
arrow_reader::ArrowReaderBuilder, projection::ProjectionMask, reader::AsyncChunkReader,
reader::metadata::FileMetadata,
arrow_reader::ArrowReaderBuilder,
projection::ProjectionMask,
reader::{metadata::FileMetadata, AsyncChunkReader},
};

use crate::{
Expand Down Expand Up @@ -221,7 +222,8 @@ impl FileOpener for OrcOpener {
builder = builder.with_file_byte_range(range);
}

let (schema_mapping, projection) = schema_adapter.map_schema(builder.file_metadata())?;
let (schema_mapping, projection) =
schema_adapter.map_schema(builder.file_metadata())?;

let projection_mask =
ProjectionMask::roots(builder.file_metadata().root_data_type(), projection);
Expand Down Expand Up @@ -268,26 +270,34 @@ struct SchemaAdapter {

impl SchemaAdapter {
pub fn new(table_schema: SchemaRef, projection: Vec<usize>) -> Self {
Self { table_schema, projection }
Self {
table_schema,
projection,
}
}

fn map_schema(&self, orc_file_meta: &FileMetadata) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
fn map_schema(

Check warning on line 279 in native-engine/datafusion-ext-plans/src/orc_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.2 / Build Blaze JAR

Diff in /home/runner/work/blaze/blaze/native-engine/datafusion-ext-plans/src/orc_exec.rs

Check warning on line 279 in native-engine/datafusion-ext-plans/src/orc_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.0 / Build Blaze JAR

Diff in /home/runner/work/blaze/blaze/native-engine/datafusion-ext-plans/src/orc_exec.rs

Check warning on line 279 in native-engine/datafusion-ext-plans/src/orc_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.1 / Build Blaze JAR

Diff in /home/runner/work/blaze/blaze/native-engine/datafusion-ext-plans/src/orc_exec.rs

Check warning on line 279 in native-engine/datafusion-ext-plans/src/orc_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.3 / Build Blaze JAR

Diff in /home/runner/work/blaze/blaze/native-engine/datafusion-ext-plans/src/orc_exec.rs

Check warning on line 279 in native-engine/datafusion-ext-plans/src/orc_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.4 / Build Blaze JAR

Diff in /home/runner/work/blaze/blaze/native-engine/datafusion-ext-plans/src/orc_exec.rs

Check warning on line 279 in native-engine/datafusion-ext-plans/src/orc_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.5 / Build Blaze JAR

Diff in /home/runner/work/blaze/blaze/native-engine/datafusion-ext-plans/src/orc_exec.rs
&self,
orc_file_meta: &FileMetadata,
) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
let projected_schema = SchemaRef::from(self.table_schema.project(&self.projection)?);

let mut projection = Vec::with_capacity(projected_schema.fields().len());
let mut field_mappings = vec![None; self.table_schema.fields().len()];

Check warning on line 286 in native-engine/datafusion-ext-plans/src/orc_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.2 / Build Blaze JAR

Diff in /home/runner/work/blaze/blaze/native-engine/datafusion-ext-plans/src/orc_exec.rs

Check warning on line 286 in native-engine/datafusion-ext-plans/src/orc_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.0 / Build Blaze JAR

Diff in /home/runner/work/blaze/blaze/native-engine/datafusion-ext-plans/src/orc_exec.rs

Check warning on line 286 in native-engine/datafusion-ext-plans/src/orc_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.1 / Build Blaze JAR

Diff in /home/runner/work/blaze/blaze/native-engine/datafusion-ext-plans/src/orc_exec.rs

Check warning on line 286 in native-engine/datafusion-ext-plans/src/orc_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.3 / Build Blaze JAR

Diff in /home/runner/work/blaze/blaze/native-engine/datafusion-ext-plans/src/orc_exec.rs

Check warning on line 286 in native-engine/datafusion-ext-plans/src/orc_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.4 / Build Blaze JAR

Diff in /home/runner/work/blaze/blaze/native-engine/datafusion-ext-plans/src/orc_exec.rs

Check warning on line 286 in native-engine/datafusion-ext-plans/src/orc_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.5 / Build Blaze JAR

Diff in /home/runner/work/blaze/blaze/native-engine/datafusion-ext-plans/src/orc_exec.rs

for nameColumn in orc_file_meta.root_data_type().children() {
for named_column in orc_file_meta.root_data_type().children() {
if let Some((table_idx, _table_field)) =
projected_schema.fields().find(nameColumn.name()) {
projected_schema.fields().find(named_column.name())
{
field_mappings[table_idx] = Some(projection.len());
projection.push(nameColumn.data_type().column_index());
projection.push(named_column.data_type().column_index());
}
}

Ok((
Arc::new(BlazeSchemaMapping::new(self.table_schema.clone(),
field_mappings,
Arc::new(BlazeSchemaMapping::new(
self.table_schema.clone(),
field_mappings,
)),
projection,
))
Expand Down
5 changes: 4 additions & 1 deletion native-engine/datafusion-ext-plans/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,10 @@ pub struct BlazeSchemaMapping {

Check warning on line 87 in native-engine/datafusion-ext-plans/src/scan/mod.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.2 / Build Blaze JAR

Diff in /home/runner/work/blaze/blaze/native-engine/datafusion-ext-plans/src/scan/mod.rs

Check warning on line 87 in native-engine/datafusion-ext-plans/src/scan/mod.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.0 / Build Blaze JAR

Diff in /home/runner/work/blaze/blaze/native-engine/datafusion-ext-plans/src/scan/mod.rs

Check warning on line 87 in native-engine/datafusion-ext-plans/src/scan/mod.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.1 / Build Blaze JAR

Diff in /home/runner/work/blaze/blaze/native-engine/datafusion-ext-plans/src/scan/mod.rs

Check warning on line 87 in native-engine/datafusion-ext-plans/src/scan/mod.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.3 / Build Blaze JAR

Diff in /home/runner/work/blaze/blaze/native-engine/datafusion-ext-plans/src/scan/mod.rs

Check warning on line 87 in native-engine/datafusion-ext-plans/src/scan/mod.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.4 / Build Blaze JAR

Diff in /home/runner/work/blaze/blaze/native-engine/datafusion-ext-plans/src/scan/mod.rs

Check warning on line 87 in native-engine/datafusion-ext-plans/src/scan/mod.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.5 / Build Blaze JAR

Diff in /home/runner/work/blaze/blaze/native-engine/datafusion-ext-plans/src/scan/mod.rs
impl BlazeSchemaMapping {
pub fn new(table_schema: SchemaRef, field_mappings: Vec<Option<usize>>) -> Self {
Self { table_schema, field_mappings }
Self {
table_schema,
field_mappings,
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,8 +431,8 @@ mod test {
"| 10 | 9 | 4 |",
"| 17 | 2 | 7 |",
"| 13 | 6 | 1 |",
"| 16 | 3 | 8 |",
"| 12 | 7 | 2 |",
"| 16 | 3 | 8 |",
"| 19 | 0 | 5 |",
"| 15 | 4 | 9 |",
"| 11 | 8 | 3 |",
Expand Down

0 comments on commit 7320b74

Please sign in to comment.