refactor: use temp table to refactor materialized cte (#16900)
* refactor: use temp table to refactor materialized cte

* remove useless code

* specify database name

* find reason and add todo

* clear cache

* fix stream

* fix with consume

* temp table uses fuse engine in cluster mode

* fix

* update test

* remove materialized keyword for stream

* make temp table name unique

* clear temp table

* fix all tests

* remove useless code

* address some comments

* add basic table stats for memory table

* update test
xudong963 authored Dec 4, 2024
1 parent 6ede5a6 commit bea64df
Showing 50 changed files with 284 additions and 1,378 deletions.
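In short, the refactor replaces the in-memory materialized-CTE machinery with temp tables: each MATERIALIZED CTE is evaluated once into a uniquely named, session-scoped temp table (memory engine on a single node, fuse engine in cluster mode), consumers scan that table like any other, and the tables are dropped when the query finishes. A minimal sketch of the naming idea behind "make temp table name unique"; the helper and its exact format are assumptions for illustration, not code from this commit:

// Sketch only: derive a collision-free temp-table name for a materialized
// CTE. Suffixing with the query id keeps concurrent queries apart; the
// commit's real scheme may differ.
fn m_cte_temp_table_name(cte_name: &str, query_id: &str) -> String {
    format!("{}_{}", cte_name, query_id.replace('-', "_"))
}

fn main() {
    let name = m_cte_temp_table_name("cte1", "9f2a-41c7");
    assert_eq!(name, "cte1_9f2a_41c7");
    println!("{name}");
}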
22 changes: 6 additions & 16 deletions src/query/catalog/src/table_context.rs
@@ -29,7 +29,6 @@ use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_exception::ResultExt;
 use databend_common_expression::BlockThresholds;
-use databend_common_expression::DataBlock;
 use databend_common_expression::Expr;
 use databend_common_expression::FunctionContext;
 use databend_common_expression::Scalar;
@@ -78,8 +77,6 @@ use crate::runtime_filter_info::RuntimeFilterReady;
 use crate::statistics::data_cache_statistics::DataCacheMetrics;
 use crate::table::Table;
 
-pub type MaterializedCtesBlocks = Arc<RwLock<HashMap<(usize, usize), Arc<RwLock<Vec<DataBlock>>>>>>;
-
 pub struct ContextError;
 
 #[derive(Debug)]
@@ -264,6 +261,8 @@ pub trait TableContext: Send + Sync {
     async fn get_table(&self, catalog: &str, database: &str, table: &str)
         -> Result<Arc<dyn Table>>;
 
+    fn evict_table_from_cache(&self, catalog: &str, database: &str, table: &str) -> Result<()>;
+
     async fn get_table_with_batch(
         &self,
         catalog: &str,
@@ -281,19 +280,6 @@
         max_files: Option<usize>,
     ) -> Result<FilteredCopyFiles>;
 
-    fn set_materialized_cte(
-        &self,
-        idx: (usize, usize),
-        mem_table: Arc<RwLock<Vec<DataBlock>>>,
-    ) -> Result<()>;
-
-    fn get_materialized_cte(
-        &self,
-        idx: (usize, usize),
-    ) -> Result<Option<Arc<RwLock<Vec<DataBlock>>>>>;
-
-    fn get_materialized_ctes(&self) -> MaterializedCtesBlocks;
-
     fn add_segment_location(&self, segment_loc: Location) -> Result<()>;
 
     fn clear_segment_locations(&self) -> Result<()>;
@@ -389,6 +375,10 @@ pub trait TableContext: Send + Sync {
     fn get_shared_settings(&self) -> Arc<Settings>;
 
     fn get_runtime(&self) -> Result<Arc<Runtime>>;
+
+    fn add_m_cte_temp_table(&self, database_name: &str, table_name: &str);
+
+    async fn drop_m_cte_temp_table(&self) -> Result<()>;
 }
 
 pub type AbortChecker = Arc<dyn CheckAbort + Send + Sync>;
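The two new trait methods pair registration with cleanup: the planner records every temp table it creates through add_m_cte_temp_table, and drop_m_cte_temp_table tears them all down once the query ends. A minimal sketch of a conforming implementation, assuming a mutex-guarded list and a logged drop in place of a real catalog call:

use std::sync::Mutex;

// Sketch: track (database, table) pairs and drop them in one sweep.
#[derive(Default)]
pub struct MCteTempTables {
    tables: Mutex<Vec<(String, String)>>,
}

impl MCteTempTables {
    pub fn add_m_cte_temp_table(&self, database_name: &str, table_name: &str) {
        self.tables
            .lock()
            .unwrap()
            .push((database_name.to_string(), table_name.to_string()));
    }

    pub async fn drop_m_cte_temp_table(&self) -> Result<(), String> {
        // Drain under the lock, then do the drops outside it.
        let tables = std::mem::take(&mut *self.tables.lock().unwrap());
        for (db, table) in tables {
            // A real implementation would go through the catalog here.
            println!("DROP TABLE {db}.{table}");
        }
        Ok(())
    }
}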
4 changes: 1 addition & 3 deletions src/query/pipeline/sources/src/async_source.rs
@@ -115,9 +115,7 @@ impl<T: 'static + AsyncSource> Processor for AsyncSourcer<T> {
         match self.inner.generate().await? {
             None => self.is_finish = true,
             Some(data_block) => {
-                // Don't need to record the scan progress of `MaterializedCteSource`
-                // Because it reads data from memory.
-                if !data_block.is_empty() && self.name() != "MaterializedCteSource" {
+                if !data_block.is_empty() {
                     let progress_values = ProgressValues {
                         rows: data_block.num_rows(),
                         bytes: data_block.memory_size(),
4 changes: 1 addition & 3 deletions src/query/pipeline/sources/src/prefetch_async_source.rs
@@ -116,9 +116,7 @@ impl<T: 'static + PrefetchAsyncSource> Processor for PrefetchAsyncSourcer<T> {
         match self.inner.generate().await? {
             None => self.is_inner_finish = true,
             Some(data_block) => {
-                // Don't need to record the scan progress of `MaterializedCteSource`
-                // Because it reads data from memory.
-                if !data_block.is_empty() && self.name() != "MaterializedCteSource" {
+                if !data_block.is_empty() {
                     let progress_values = ProgressValues {
                         rows: data_block.num_rows(),
                         bytes: data_block.memory_size(),
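Both sources drop the MaterializedCteSource carve-out: CTE reads now come from a real temp-table scan, so they count toward scan progress like any other block. For reference, the accounting pattern the kept branch feeds, in isolation; only the ProgressValues fields come from the diff, the Progress type is a stand-in:

// Stand-ins for the progress accounting kept by the sources above.
struct ProgressValues {
    rows: usize,
    bytes: usize,
}

#[derive(Default)]
struct Progress {
    rows: usize,
    bytes: usize,
}

impl Progress {
    // Accumulate one block's worth of scanned rows and bytes.
    fn incr(&mut self, v: &ProgressValues) {
        self.rows += v.rows;
        self.bytes += v.bytes;
    }
}

fn main() {
    let mut scan_progress = Progress::default();
    scan_progress.incr(&ProgressValues { rows: 1024, bytes: 8192 });
    assert_eq!(scan_progress.rows, 1024);
}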
15 changes: 14 additions & 1 deletion src/query/service/src/interpreters/interpreter.rs
@@ -27,6 +27,7 @@ use databend_common_base::base::short_sql;
 use databend_common_base::runtime::profile::get_statistics_desc;
 use databend_common_base::runtime::profile::ProfileDesc;
 use databend_common_base::runtime::profile::ProfileStatisticsName;
+use databend_common_base::runtime::GlobalIORuntime;
 use databend_common_catalog::query_kind::QueryKind;
 use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::ErrorCode;
@@ -230,7 +231,9 @@ async fn plan_sql(
 ) -> Result<(Plan, PlanExtras, AcquireQueueGuard)> {
     let mut planner = Planner::new_with_query_executor(
         ctx.clone(),
-        Arc::new(ServiceQueryExecutor::new(ctx.clone())),
+        Arc::new(ServiceQueryExecutor::new(QueryContext::create_from(
+            ctx.clone(),
+        ))),
     );
 
     // Parse the SQL query, get extract additional information.
@@ -316,6 +319,8 @@ pub fn on_execution_finished(info: &ExecutionInfo, query_ctx: Arc<QueryContext>)
         })?;
     }
 
+    hook_clear_m_cte_temp_table(&query_ctx)?;
+
     hook_vacuum_temp_files(&query_ctx)?;
 
     hook_disk_temp_dir(&query_ctx)?;
@@ -358,3 +363,11 @@ fn need_acquire_lock(ctx: Arc<QueryContext>, stmt: &Statement) -> bool {
         _ => false,
     }
 }
+
+fn hook_clear_m_cte_temp_table(query_ctx: &Arc<QueryContext>) -> Result<()> {
+    let _ = GlobalIORuntime::instance().block_on(async move {
+        query_ctx.drop_m_cte_temp_table().await?;
+        Ok(())
+    });
+    Ok(())
+}
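The hook bridges async cleanup into a synchronous callback by blocking on the global I/O runtime, and the let _ = deliberately discards the result so a failed drop never surfaces as a query error. The same pattern in a self-contained sketch, with tokio standing in for databend's runtime and all names assumed:

async fn drop_m_cte_temp_table() -> std::io::Result<()> {
    // Pretend cleanup work.
    Ok(())
}

fn hook_clear_temp_tables(rt: &tokio::runtime::Runtime) {
    // Block the calling thread until cleanup finishes, but ignore the
    // outcome: best-effort cleanup must not fail the query.
    let _ = rt.block_on(drop_m_cte_temp_table());
}

fn main() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    hook_clear_temp_tables(&rt);
}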
73 changes: 0 additions & 73 deletions src/query/service/src/pipelines/builders/builder_join.rs
@@ -19,19 +19,14 @@ use databend_common_exception::Result;
 use databend_common_pipeline_core::processors::ProcessorPtr;
 use databend_common_pipeline_sinks::Sinker;
 use databend_common_sql::executor::physical_plans::HashJoin;
-use databend_common_sql::executor::physical_plans::MaterializedCte;
 use databend_common_sql::executor::physical_plans::RangeJoin;
 use databend_common_sql::executor::PhysicalPlan;
-use databend_common_sql::ColumnBinding;
-use databend_common_sql::IndexType;
 
 use crate::pipelines::processors::transforms::range_join::RangeJoinState;
 use crate::pipelines::processors::transforms::range_join::TransformRangeJoinLeft;
 use crate::pipelines::processors::transforms::range_join::TransformRangeJoinRight;
 use crate::pipelines::processors::transforms::HashJoinBuildState;
 use crate::pipelines::processors::transforms::HashJoinProbeState;
-use crate::pipelines::processors::transforms::MaterializedCteSink;
-use crate::pipelines::processors::transforms::MaterializedCteState;
 use crate::pipelines::processors::transforms::TransformHashJoinBuild;
 use crate::pipelines::processors::transforms::TransformHashJoinProbe;
 use crate::pipelines::processors::HashJoinDesc;
@@ -77,8 +72,6 @@ impl PipelineBuilder {
             right_side_context,
             self.main_pipeline.get_scopes(),
         );
-        right_side_builder.cte_state = self.cte_state.clone();
-        right_side_builder.cte_scan_offsets = self.cte_scan_offsets.clone();
         right_side_builder.hash_join_states = self.hash_join_states.clone();
 
         let mut right_res = right_side_builder.finalize(&range_join.right)?;
@@ -148,8 +141,6 @@ impl PipelineBuilder {
             build_side_context,
             self.main_pipeline.get_scopes(),
         );
-        build_side_builder.cte_state = self.cte_state.clone();
-        build_side_builder.cte_scan_offsets = self.cte_scan_offsets.clone();
         build_side_builder.hash_join_states = self.hash_join_states.clone();
         let mut build_res = build_side_builder.finalize(build)?;
 
@@ -224,68 +215,4 @@ impl PipelineBuilder {
 
         Ok(())
     }
-
-    pub(crate) fn build_materialized_cte(
-        &mut self,
-        materialized_cte: &MaterializedCte,
-    ) -> Result<()> {
-        self.cte_scan_offsets.insert(
-            materialized_cte.cte_idx,
-            materialized_cte.cte_scan_offset.clone(),
-        );
-        self.expand_materialized_side_pipeline(
-            &materialized_cte.right,
-            materialized_cte.cte_idx,
-            &materialized_cte.materialized_output_columns,
-        )?;
-        self.build_pipeline(&materialized_cte.left)
-    }
-
-    fn expand_materialized_side_pipeline(
-        &mut self,
-        materialized_side: &PhysicalPlan,
-        cte_idx: IndexType,
-        materialized_output_columns: &[ColumnBinding],
-    ) -> Result<()> {
-        let materialized_side_ctx = QueryContext::create_from(self.ctx.clone());
-        let state = Arc::new(MaterializedCteState::new(self.ctx.clone()));
-        self.cte_state.insert(cte_idx, state.clone());
-        let mut materialized_side_builder = PipelineBuilder::create(
-            self.func_ctx.clone(),
-            self.settings.clone(),
-            materialized_side_ctx,
-            self.main_pipeline.get_scopes(),
-        );
-        materialized_side_builder.cte_state = self.cte_state.clone();
-        materialized_side_builder.cte_scan_offsets = self.cte_scan_offsets.clone();
-        materialized_side_builder.hash_join_states = self.hash_join_states.clone();
-        let mut materialized_side_pipeline =
-            materialized_side_builder.finalize(materialized_side)?;
-        assert!(
-            materialized_side_pipeline
-                .main_pipeline
-                .is_pulling_pipeline()?
-        );
-
-        PipelineBuilder::build_result_projection(
-            &self.func_ctx,
-            materialized_side.output_schema()?,
-            materialized_output_columns,
-            &mut materialized_side_pipeline.main_pipeline,
-            false,
-        )?;
-
-        materialized_side_pipeline.main_pipeline.add_sink(|input| {
-            let transform = Sinker::<MaterializedCteSink>::create(
-                input,
-                MaterializedCteSink::create(self.ctx.clone(), cte_idx, state.clone())?,
-            );
-            Ok(ProcessorPtr::create(transform))
-        })?;
-        self.pipelines
-            .push(materialized_side_pipeline.main_pipeline.finalize());
-        self.pipelines
-            .extend(materialized_side_pipeline.sources_pipelines);
-        Ok(())
-    }
 }
22 changes: 0 additions & 22 deletions src/query/service/src/pipelines/builders/builder_scan.rs
@@ -24,14 +24,12 @@ use databend_common_sql::evaluator::BlockOperator;
 use databend_common_sql::evaluator::CompoundBlockOperator;
 use databend_common_sql::executor::physical_plans::CacheScan;
 use databend_common_sql::executor::physical_plans::ConstantTableScan;
-use databend_common_sql::executor::physical_plans::CteScan;
 use databend_common_sql::executor::physical_plans::ExpressionScan;
 use databend_common_sql::executor::physical_plans::TableScan;
 use databend_common_sql::plans::CacheSource;
 
 use crate::pipelines::processors::transforms::CacheSourceState;
 use crate::pipelines::processors::transforms::HashJoinCacheState;
-use crate::pipelines::processors::transforms::MaterializedCteSource;
 use crate::pipelines::processors::transforms::TransformAddInternalColumns;
 use crate::pipelines::processors::transforms::TransformCacheScan;
 use crate::pipelines::processors::transforms::TransformExpressionScan;
@@ -76,26 +74,6 @@ impl PipelineBuilder {
         Ok(())
     }
 
-    pub(crate) fn build_cte_scan(&mut self, cte_scan: &CteScan) -> Result<()> {
-        let max_threads = self.settings.get_max_threads()?;
-        self.main_pipeline.add_source(
-            |output| {
-                MaterializedCteSource::create(
-                    self.ctx.clone(),
-                    output,
-                    cte_scan.cte_idx,
-                    self.cte_state.get(&cte_scan.cte_idx.0).unwrap().clone(),
-                    cte_scan.offsets.clone(),
-                    self.cte_scan_offsets
-                        .get(&cte_scan.cte_idx.0)
-                        .unwrap()
-                        .clone(),
-                )
-            },
-            max_threads as usize,
-        )
-    }
-
     pub(crate) fn build_constant_table_scan(&mut self, scan: &ConstantTableScan) -> Result<()> {
         self.main_pipeline.add_source(
             |output| {
2 changes: 0 additions & 2 deletions src/query/service/src/pipelines/builders/builder_union_all.rs
@@ -78,8 +78,6 @@ impl PipelineBuilder {
             union_ctx,
             self.main_pipeline.get_scopes(),
         );
-        pipeline_builder.cte_state = self.cte_state.clone();
-        pipeline_builder.cte_scan_offsets = self.cte_scan_offsets.clone();
         pipeline_builder.hash_join_states = self.hash_join_states.clone();
 
         let mut build_res = pipeline_builder.finalize(input)?;
13 changes: 0 additions & 13 deletions src/query/service/src/pipelines/pipeline_builder.rs
@@ -25,12 +25,10 @@ use databend_common_pipeline_core::processors::PlanScopeGuard;
 use databend_common_pipeline_core::Pipeline;
 use databend_common_settings::Settings;
 use databend_common_sql::executor::PhysicalPlan;
-use databend_common_sql::IndexType;
 
 use super::PipelineBuilderData;
 use crate::interpreters::CreateTableInterpreter;
 use crate::pipelines::processors::transforms::HashJoinBuildState;
-use crate::pipelines::processors::transforms::MaterializedCteState;
 use crate::pipelines::processors::HashJoinState;
 use crate::pipelines::PipelineBuildResult;
 use crate::servers::flight::v1::exchange::DefaultExchangeInjector;
@@ -49,11 +47,6 @@ pub struct PipelineBuilder {
     pub merge_into_probe_data_fields: Option<Vec<DataField>>,
     pub join_state: Option<Arc<HashJoinBuildState>>,
 
-    // The cte state of each materialized cte.
-    pub cte_state: HashMap<IndexType, Arc<MaterializedCteState>>,
-    // The column offsets used by cte scan
-    pub cte_scan_offsets: HashMap<IndexType, Vec<usize>>,
-
     pub(crate) exchange_injector: Arc<dyn ExchangeInjector>,
 
     pub hash_join_states: HashMap<usize, Arc<HashJoinState>>,
@@ -78,8 +71,6 @@ impl PipelineBuilder {
             pipelines: vec![],
             main_pipeline: Pipeline::with_scopes(scopes),
             exchange_injector: DefaultExchangeInjector::create(),
-            cte_state: HashMap::new(),
-            cte_scan_offsets: HashMap::new(),
             merge_into_probe_data_fields: None,
             join_state: None,
             hash_join_states: HashMap::new(),
@@ -162,7 +153,6 @@ impl PipelineBuilder {
 
         match plan {
             PhysicalPlan::TableScan(scan) => self.build_table_scan(scan),
-            PhysicalPlan::CteScan(scan) => self.build_cte_scan(scan),
             PhysicalPlan::ConstantTableScan(scan) => self.build_constant_table_scan(scan),
             PhysicalPlan::Filter(filter) => self.build_filter(filter),
             PhysicalPlan::EvalScalar(eval_scalar) => self.build_eval_scalar(eval_scalar),
@@ -189,9 +179,6 @@ impl PipelineBuilder {
                 "Invalid physical plan with PhysicalPlan::Exchange",
             )),
             PhysicalPlan::RangeJoin(range_join) => self.build_range_join(range_join),
-            PhysicalPlan::MaterializedCte(materialized_cte) => {
-                self.build_materialized_cte(materialized_cte)
-            }
             PhysicalPlan::CacheScan(cache_scan) => self.build_cache_scan(cache_scan),
             PhysicalPlan::ExpressionScan(expression_scan) => {
                 self.build_expression_scan(expression_scan)
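With the CteScan and MaterializedCte arms removed, a materialized CTE consumer reaches the builder as a plain TableScan over the pre-populated temp table. A rough illustration of that simplification, using hypothetical stand-in types rather than databend's:

// Hypothetical dispatch: the dedicated CteScan arm disappears because a
// CTE consumer is rewritten to an ordinary scan of the temp table that was
// populated before the main plan ran.
enum Plan {
    TableScan { database: String, table: String },
}

fn build(plan: &Plan) {
    match plan {
        Plan::TableScan { database, table } => {
            println!("scan {database}.{table}");
        }
    }
}

fn main() {
    build(&Plan::TableScan {
        database: "default".to_string(),
        table: "cte1_9f2a_41c7".to_string(),
    });
}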
4 changes: 0 additions & 4 deletions src/query/service/src/pipelines/processors/transforms/mod.rs
@@ -28,7 +28,6 @@ mod transform_dictionary;
 mod transform_expression_scan;
 mod transform_filter;
 mod transform_limit;
-mod transform_materialized_cte;
 mod transform_merge_block;
 mod transform_null_if;
 mod transform_recursive_cte_scan;
@@ -55,9 +54,6 @@ pub use transform_create_sets::TransformCreateSets;
 pub use transform_expression_scan::TransformExpressionScan;
 pub use transform_filter::TransformFilter;
 pub use transform_limit::TransformLimit;
-pub use transform_materialized_cte::MaterializedCteSink;
-pub use transform_materialized_cte::MaterializedCteSource;
-pub use transform_materialized_cte::MaterializedCteState;
 pub use transform_merge_block::TransformMergeBlock;
 pub use transform_null_if::TransformNullIf;
 pub use transform_recursive_cte_scan::TransformRecursiveCteScan;
(The remaining 41 changed files are not shown.)