Skip to content

Commit

Permalink
fix(query): make memory engine a non-local table (#16955)
Browse files Browse the repository at this point in the history
* fix(query): disallow insert into local table in cluster mode

* fix(query): make null table / memory table distributed

* update

* update

* update
  • Loading branch information
sundy-li authored Nov 28, 2024
1 parent fb89467 commit 06c4ab6
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 51 deletions.
4 changes: 4 additions & 0 deletions src/query/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ pub trait Table: Sync + Send {
false
}

/// Whether this table supports a distributed `INSERT INTO ... SELECT`
/// pipeline, i.e. whether the insert may be dispatched across cluster nodes.
/// Defaults to `false`; storage engines that can commit concurrent
/// multi-node inserts (e.g. Fuse) opt in by overriding this.
fn support_distributed_insert(&self) -> bool {
false
}

/// whether table has the exact number of total rows
fn has_exact_total_row_count(&self) -> bool {
false
Expand Down
13 changes: 7 additions & 6 deletions src/query/service/src/interpreters/interpreter_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ impl Interpreter for InsertInterpreter {
}
InsertInputSource::SelectPlan(plan) => {
let table1 = table.clone();
let (mut select_plan, select_column_bindings, metadata) = match plan.as_ref() {
let (select_plan, select_column_bindings, metadata) = match plan.as_ref() {
Plan::Query {
s_expr,
metadata,
Expand All @@ -166,10 +166,11 @@ impl Interpreter for InsertInterpreter {
dml_build_update_stream_req(self.ctx.clone(), metadata).await?;

// here we remove the last exchange merge plan to trigger distribute insert
let insert_select_plan = match select_plan {
PhysicalPlan::Exchange(ref mut exchange) => {
// insert can be dispatched to different nodes
let insert_select_plan = match (select_plan, table.support_distributed_insert()) {
(PhysicalPlan::Exchange(ref mut exchange), true) => {
// insert can be dispatched to different nodes if the table supports distributed insert
let input = exchange.input.clone();

exchange.input = Box::new(PhysicalPlan::DistributedInsertSelect(Box::new(
DistributedInsertSelect {
// TODO(leiysky): we reuse the id of exchange here,
Expand All @@ -183,9 +184,9 @@ impl Interpreter for InsertInterpreter {
cast_needed: self.check_schema_cast(plan)?,
},
)));
select_plan
PhysicalPlan::Exchange(exchange.clone())
}
other_plan => {
(other_plan, _) => {
// insert should wait until all nodes finished
PhysicalPlan::DistributedInsertSelect(Box::new(DistributedInsertSelect {
// TODO: we reuse the id of other plan here,
Expand Down
4 changes: 4 additions & 0 deletions src/query/storages/fuse/src/fuse_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,10 @@ impl Table for FuseTable {
true
}

/// FuseTable commits inserts via snapshot metadata, so the
/// `INSERT INTO ... SELECT` pipeline can safely run on multiple nodes.
fn support_distributed_insert(&self) -> bool {
true
}

fn has_exact_total_row_count(&self) -> bool {
true
}
Expand Down
18 changes: 6 additions & 12 deletions src/query/storages/memory/src/memory_part.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,13 @@ use std::any::Any;
use std::sync::Arc;

use databend_common_catalog::plan::PartInfo;
use databend_common_catalog::plan::PartInfoPtr;

/// Memory table lazy partition information.
#[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct MemoryPartInfo {
pub total: usize,
pub part_start: usize,
pub part_end: usize,
}
pub struct MemoryPartInfo {}

#[typetag::serde(name = "memory")]
#[typetag::serde(name = "memory_part")]
impl PartInfo for MemoryPartInfo {
fn as_any(&self) -> &dyn Any {
self
Expand All @@ -42,11 +40,7 @@ impl PartInfo for MemoryPartInfo {
}

impl MemoryPartInfo {
pub fn create(start: usize, end: usize, total: usize) -> Arc<Box<dyn PartInfo>> {
Arc::new(Box::new(MemoryPartInfo {
total,
part_start: start,
part_end: end,
}))
pub fn create() -> PartInfoPtr {
Arc::new(Box::new(MemoryPartInfo {}))
}
}
53 changes: 20 additions & 33 deletions src/query/storages/memory/src/memory_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,31 +123,6 @@ impl MemoryTable {

Arc::new(Mutex::new(read_data_blocks))
}

/// Split `total` row-blocks into up to `workers` contiguous partitions,
/// starting from offset `start`.
///
/// When `total / workers == 0` (fewer blocks than workers) a single
/// partition covering `[start, total)` is emitted; otherwise each worker
/// gets `part_size` blocks, with the remainder folded into the last
/// partition and `start` shifting only the first partition's begin.
pub fn generate_memory_parts(start: usize, workers: usize, total: usize) -> Partitions {
let part_size = total / workers;
let part_remain = total % workers;

let mut partitions = Vec::with_capacity(workers);
if part_size == 0 {
// Not enough blocks to share: one partition takes everything.
partitions.push(MemoryPartInfo::create(start, total, total));
} else {
for part in 0..workers {
let mut part_begin = part * part_size;
if part == 0 && start > 0 {
// Honour the caller-provided starting offset for the first part.
part_begin = start;
}
let mut part_end = (part + 1) * part_size;
if part == (workers - 1) && part_remain > 0 {
// Last worker absorbs the leftover blocks.
part_end += part_remain;
}

partitions.push(MemoryPartInfo::create(part_begin, part_end, total));
}
}

// Seq shuffle: partitions are handed out in order, not broadcast.
Partitions::create(PartitionsShuffleKind::Seq, partitions)
}
}

#[async_trait::async_trait]
Expand All @@ -160,6 +135,12 @@ impl Table for MemoryTable {
&self.table_info
}

/// MemoryTable can act as a distributed (non-local) table, yet each query
/// still inserts data on a single node, because `commit_insert` does not
/// yet support distributed transactions.
fn is_local(&self) -> bool {
false
}

fn support_column_projection(&self) -> bool {
true
}
Expand All @@ -176,8 +157,7 @@ impl Table for MemoryTable {
_dry_run: bool,
) -> Result<(PartStatistics, Partitions)> {
let blocks = self.blocks.read();

let statistics = match push_downs {
let mut statistics = match push_downs {
Some(push_downs) => {
let projection_filter: Box<dyn Fn(usize) -> bool> = match push_downs.projection {
Some(prj) => {
Expand Down Expand Up @@ -214,12 +194,19 @@ impl Table for MemoryTable {
}
};

let parts = Self::generate_memory_parts(
0,
ctx.get_settings().get_max_threads()? as usize,
blocks.len(),
);
Ok((statistics, parts))
let cluster = ctx.get_cluster();
if !cluster.is_empty() {
statistics.read_bytes = statistics.read_bytes.max(cluster.nodes.len());
statistics.read_rows = statistics.read_rows.max(cluster.nodes.len());
statistics.partitions_total = statistics.partitions_total.max(cluster.nodes.len());
statistics.partitions_scanned = statistics.partitions_scanned.max(cluster.nodes.len());
}

let parts = vec![MemoryPartInfo::create()];
return Ok((
statistics,
Partitions::create(PartitionsShuffleKind::Broadcast, parts),
));
}

fn read_data(
Expand Down
5 changes: 5 additions & 0 deletions src/query/storages/null/src/null_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ impl Table for NullTable {
&self.table_info
}

/// A Null table does not keep any data, so it is safe to treat it as
/// non-local (distributed).
fn is_local(&self) -> bool {
false
}

#[async_backtrace::framed]
async fn read_partitions(
&self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,5 +183,24 @@ select count(*) from test_order;
----
4000000

statement ok
create or replace table test_memory engine = Memory as select number, (number + 1) d from numbers(100000);

query I
select count() from test_memory
----
100000

statement ok
insert into test_memory select number, sum(number) from numbers(100000) group by number;

query I
select count() from test_memory
----
200000

statement ok
drop table test_memory;

statement ok
set enable_distributed_copy_into = 0;

0 comments on commit 06c4ab6

Please sign in to comment.