Skip to content

Commit

Permalink
feat(query): speed up fetching mysql data from dictionaries via batch…
Browse files Browse the repository at this point in the history
… processing. (#16948)

* opt mysql dictionary via batch processing

* update code

* add todo notes
  • Loading branch information
Dragonliu2018 authored Nov 29, 2024
1 parent 21e1d9a commit 25d615f
Show file tree
Hide file tree
Showing 3 changed files with 526 additions and 190 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::HashSet;
use std::string::String;
use std::sync::Arc;

Expand All @@ -32,6 +33,7 @@ use databend_common_expression::types::StringType;
use databend_common_expression::types::ValueType;
use databend_common_expression::with_integer_mapped_type;
use databend_common_expression::BlockEntry;
use databend_common_expression::Column;
use databend_common_expression::ColumnBuilder;
use databend_common_expression::DataBlock;
use databend_common_expression::Scalar;
Expand All @@ -53,6 +55,92 @@ use crate::sql::plans::DictGetFunctionArgument;
use crate::sql::plans::DictionarySource;
use crate::sql::IndexType;

/// Executes `$sql` on `$pool` and fetches at most one `(key, value)` row,
/// decoded as `($key_type, $val_type)`.
///
/// The key column is discarded; the value column is passed through
/// `$format_val_fn`. Expands to an `Ok(Option<_>)` expression: `None` when the
/// query returned no row. Query errors are propagated with `?`, so the macro
/// must be used inside an `async` function returning a compatible `Result`.
macro_rules! sqlx_fetch_optional {
    ($pool:expr, $sql:expr, $key_type:ty, $val_type:ty, $format_val_fn:expr) => {{
        let maybe_row: Option<($key_type, $val_type)> =
            sqlx::query_as(&$sql).fetch_optional($pool).await?;
        Ok(maybe_row.map(|(_key, value)| $format_val_fn(value)))
    }};
}

/// Fetches a single `(key, value)` row, choosing the Rust type used to decode
/// the key column from the key's `DataType`.
///
/// * `$pool` / `$sql` - connection pool and the complete SQL text to run.
/// * `$key_type` - the key's `DataType` (an expression, not a Rust type).
/// * `$val_type` - Rust type used to decode the value column.
/// * `$format_val_fn` - converts the decoded value into the caller's result type.
///
/// Expands to a `Result<Option<_>>` expression. Boolean, string, integer and
/// float keys are supported; any other key type yields a
/// `DictionarySourceError`.
macro_rules! fetch_single_row_by_sqlx {
    ($pool:expr, $sql:expr, $key_type:expr, $val_type:ty, $format_val_fn:expr) => {{
        match $key_type {
            DataType::Boolean => {
                sqlx_fetch_optional!($pool, $sql, bool, $val_type, $format_val_fn)
            }
            DataType::String => {
                sqlx_fetch_optional!($pool, $sql, String, $val_type, $format_val_fn)
            }
            DataType::Number(num_ty) => with_integer_mapped_type!(|KEY_NUM_TYPE| match num_ty {
                NumberDataType::KEY_NUM_TYPE => {
                    sqlx_fetch_optional!($pool, $sql, KEY_NUM_TYPE, $val_type, $format_val_fn)
                }
                NumberDataType::Float32 => {
                    sqlx_fetch_optional!($pool, $sql, f32, $val_type, $format_val_fn)
                }
                NumberDataType::Float64 => {
                    sqlx_fetch_optional!($pool, $sql, f64, $val_type, $format_val_fn)
                }
            }),
            // This arm rejects the *key* type, so the message must say so;
            // unsupported value types are reported by the calling function.
            _ => Err(ErrorCode::DictionarySourceError(format!(
                "MySQL dictionary operator currently does not support key type {}",
                $key_type,
            ))),
        }
    }};
}

/// Fetches every `(key, value)` row returned by `$sql` and collects the rows
/// into whatever collection the surrounding binding asks for (the callers in
/// this file use `HashMap<String, $val_type>`), keyed by the key rendered as a
/// string.
///
/// * `$pool` / `$sql` - connection pool and the complete SQL text to run.
/// * `$key_type` - the key's `DataType` (an expression, not a Rust type).
/// * `$val_type` - Rust type used to decode the value column.
/// * `$format_key_fn` - renders boolean and string keys; numeric keys are
///   rendered with their `Display` implementation.
///
/// Query errors are propagated with `?`. An unsupported key type makes the
/// *enclosing function* `return` a `DictionarySourceError`.
macro_rules! fetch_all_rows_by_sqlx {
    ($pool:expr, $sql:expr, $key_type:expr, $val_type:ty, $format_key_fn:expr) => {
        match $key_type {
            DataType::Boolean => {
                let rows: Vec<(bool, $val_type)> = sqlx::query_as($sql).fetch_all($pool).await?;
                rows.into_iter()
                    .map(|(k, v)| ($format_key_fn(ScalarRef::Boolean(k)), v))
                    .collect()
            }
            DataType::String => {
                let rows: Vec<(String, $val_type)> = sqlx::query_as($sql).fetch_all($pool).await?;
                rows.into_iter()
                    .map(|(k, v)| ($format_key_fn(ScalarRef::String(&k)), v))
                    .collect()
            }
            DataType::Number(num_ty) => {
                with_integer_mapped_type!(|NUM_TYPE| match num_ty {
                    NumberDataType::NUM_TYPE => {
                        let rows: Vec<(NUM_TYPE, $val_type)> =
                            sqlx::query_as($sql).fetch_all($pool).await?;
                        rows.into_iter().map(|(k, v)| (k.to_string(), v)).collect()
                    }
                    NumberDataType::Float32 => {
                        let rows: Vec<(f32, $val_type)> =
                            sqlx::query_as($sql).fetch_all($pool).await?;
                        rows.into_iter().map(|(k, v)| (k.to_string(), v)).collect()
                    }
                    NumberDataType::Float64 => {
                        let rows: Vec<(f64, $val_type)> =
                            sqlx::query_as($sql).fetch_all($pool).await?;
                        rows.into_iter().map(|(k, v)| (k.to_string(), v)).collect()
                    }
                })
            }
            // This arm rejects the *key* type, so the message must say so;
            // wording kept in line with `fetch_single_row_by_sqlx`.
            _ => {
                return Err(ErrorCode::DictionarySourceError(format!(
                    "MySQL dictionary operator currently does not support key type {}",
                    $key_type
                )));
            }
        }
    };
}

pub(crate) enum DictionaryOperator {
Redis(ConnectionManager),
Mysql((MySqlPool, String)),
Expand Down Expand Up @@ -95,21 +183,14 @@ impl DictionaryOperator {
DictionaryOperator::Mysql((pool, sql)) => match value {
Value::Scalar(scalar) => {
let value = self
.get_data_from_mysql(scalar.as_ref(), data_type, pool, sql)
.get_scalar_value_from_mysql(scalar.as_ref(), data_type, pool, sql)
.await?
.unwrap_or(default_value.clone());
Ok(Value::Scalar(value))
}
Value::Column(column) => {
let mut builder = ColumnBuilder::with_capacity(data_type, column.len());
for scalar_ref in column.iter() {
let value = self
.get_data_from_mysql(scalar_ref, data_type, pool, sql)
.await?
.unwrap_or(default_value.clone());
builder.push(value.as_ref());
}
Ok(Value::Column(builder.build()))
self.get_column_values_from_mysql(column, data_type, default_value, pool, sql)
.await
}
},
}
Expand Down Expand Up @@ -239,72 +320,174 @@ impl DictionaryOperator {
}
}

async fn get_data_from_mysql(
async fn get_scalar_value_from_mysql(
&self,
key: ScalarRef<'_>,
data_type: &DataType,
value_type: &DataType,
pool: &MySqlPool,
sql: &String,
) -> Result<Option<Scalar>> {
if key == ScalarRef::Null {
return Ok(None);
}
match data_type.remove_nullable() {
let new_sql = format!("{} ({}) LIMIT 1", sql, self.format_key(key.clone()));
let key_type = key.infer_data_type().remove_nullable();
match value_type.remove_nullable() {
DataType::Boolean => {
let value: Option<bool> = sqlx::query_scalar(sql)
.bind(self.format_key(key))
.fetch_optional(pool)
.await?;
Ok(value.map(Scalar::Boolean))
fetch_single_row_by_sqlx!(pool, new_sql, key_type, bool, Scalar::Boolean)
}
DataType::String => {
let value: Option<String> = sqlx::query_scalar(sql)
.bind(self.format_key(key))
.fetch_optional(pool)
.await?;
Ok(value.map(Scalar::String))
fetch_single_row_by_sqlx!(pool, new_sql, key_type, String, Scalar::String)
}
DataType::Number(num_ty) => {
with_integer_mapped_type!(|NUM_TYPE| match num_ty {
NumberDataType::NUM_TYPE => {
let value: Option<NUM_TYPE> = sqlx::query_scalar(&sql)
.bind(self.format_key(key))
.fetch_optional(pool)
.await?;
Ok(value.map(|v| Scalar::Number(NUM_TYPE::upcast_scalar(v))))
fetch_single_row_by_sqlx!(pool, new_sql, key_type, NUM_TYPE, |v| {
Scalar::Number(NUM_TYPE::upcast_scalar(v))
})
}
NumberDataType::Float32 => {
let value: Option<f32> = sqlx::query_scalar(sql)
.bind(self.format_key(key))
.fetch_optional(pool)
.await?;
Ok(value.map(|v| Scalar::Number(NumberScalar::Float32(v.into()))))
fetch_single_row_by_sqlx!(pool, new_sql, key_type, f32, |v: f32| {
Scalar::Number(NumberScalar::Float32(v.into()))
})
}
NumberDataType::Float64 => {
let value: Option<f64> = sqlx::query_scalar(sql)
.bind(self.format_key(key))
.fetch_optional(pool)
.await?;
Ok(value.map(|v| Scalar::Number(NumberScalar::Float64(v.into()))))
fetch_single_row_by_sqlx!(pool, new_sql, key_type, f64, |v: f64| {
Scalar::Number(NumberScalar::Float64(v.into()))
})
}
})
}
_ => Err(ErrorCode::DictionarySourceError(format!(
"MySQL dictionary operator currently does not support value type {data_type}"
"MySQL dictionary operator currently does not support value type {value_type}"
))),
}
}

/// Looks up the dictionary values for a whole column of keys with a single
/// MySQL query.
///
/// The distinct non-NULL keys are sent as one key list appended to `sql`; the
/// returned rows are indexed by their formatted key, and the output column is
/// then filled in input order. Keys without a matching row get
/// `default_value`. NULL keys are never sent to MySQL; their formatted form is
/// not expected to match any fetched key, so they also fall back to the
/// default.
///
/// * `column` - lookup keys, one per output row.
/// * `value_type` - type of the output column / value field.
/// * `default_value` - value used for keys with no match.
/// * `pool` - MySQL connection pool.
/// * `sql` - query prefix that the key list is appended to.
///
/// # Errors
/// Returns `DictionarySourceError` for unsupported key or value types; query
/// failures are propagated.
async fn get_column_values_from_mysql(
    &self,
    column: &Column,
    value_type: &DataType,
    default_value: &Scalar,
    pool: &MySqlPool,
    sql: &String,
) -> Result<Value<AnyType>> {
    // todo: The current method formats the key as a string, which causes some performance overhead.
    // The next step is to use the key's native types directly, such as bool, i32, etc.
    let key_cnt = column.len();
    // `all_keys` keeps one formatted key per input row (order preserved);
    // `key_set` holds the distinct non-NULL keys that go into the query.
    let mut all_keys = Vec::with_capacity(key_cnt);
    let mut key_set = HashSet::with_capacity(key_cnt);
    for item in column.iter() {
        if item != ScalarRef::Null {
            key_set.insert(item.clone());
        }
        all_keys.push(self.format_key(item));
    }

    let mut builder = ColumnBuilder::with_capacity(value_type, key_cnt);
    // Nothing to look up (empty column or all NULL): skip the round trip.
    if key_set.is_empty() {
        for _ in 0..key_cnt {
            builder.push(default_value.as_ref());
        }
        return Ok(Value::Column(builder.build()));
    }
    let new_sql = format!("{} ({})", sql, self.format_keys(key_set));
    let key_type = column.data_type().remove_nullable();
    // In every arm the fetched values are pushed as borrowed `ScalarRef`s,
    // which avoids building (and, for strings, cloning into) an owned `Scalar`
    // per row.
    match value_type.remove_nullable() {
        DataType::Boolean => {
            let kv_pairs: HashMap<String, bool> =
                fetch_all_rows_by_sqlx!(pool, &new_sql, key_type, bool, |k| self.format_key(k));
            for key in all_keys {
                match kv_pairs.get(&key) {
                    Some(v) => builder.push(ScalarRef::Boolean(*v)),
                    None => builder.push(default_value.as_ref()),
                }
            }
        }
        DataType::String => {
            let kv_pairs: HashMap<String, String> =
                fetch_all_rows_by_sqlx!(pool, &new_sql, key_type, String, |k| self
                    .format_key(k));
            for key in all_keys {
                match kv_pairs.get(&key) {
                    Some(v) => builder.push(ScalarRef::String(v)),
                    None => builder.push(default_value.as_ref()),
                }
            }
        }
        DataType::Number(num_ty) => {
            with_integer_mapped_type!(|NUM_TYPE| match num_ty {
                NumberDataType::NUM_TYPE => {
                    let kv_pairs: HashMap<String, NUM_TYPE> =
                        fetch_all_rows_by_sqlx!(pool, &new_sql, key_type, NUM_TYPE, |k| self
                            .format_key(k));
                    for key in all_keys {
                        match kv_pairs.get(&key) {
                            Some(v) => {
                                builder.push(ScalarRef::Number(NUM_TYPE::upcast_scalar(*v)))
                            }
                            None => builder.push(default_value.as_ref()),
                        }
                    }
                }
                NumberDataType::Float32 => {
                    let kv_pairs: HashMap<String, f32> =
                        fetch_all_rows_by_sqlx!(pool, &new_sql, key_type, f32, |k| self
                            .format_key(k));
                    for key in all_keys {
                        match kv_pairs.get(&key) {
                            Some(v) => builder
                                .push(ScalarRef::Number(NumberScalar::Float32((*v).into()))),
                            None => builder.push(default_value.as_ref()),
                        }
                    }
                }
                NumberDataType::Float64 => {
                    let kv_pairs: HashMap<String, f64> =
                        fetch_all_rows_by_sqlx!(pool, &new_sql, key_type, f64, |k| self
                            .format_key(k));
                    for key in all_keys {
                        match kv_pairs.get(&key) {
                            Some(v) => builder
                                .push(ScalarRef::Number(NumberScalar::Float64((*v).into()))),
                            None => builder.push(default_value.as_ref()),
                        }
                    }
                }
            })
        }
        _ => {
            return Err(ErrorCode::DictionarySourceError(format!(
                "MySQL dictionary operator currently does not support value type {value_type}"
            )));
        }
    }
    Ok(Value::Column(builder.build()))
}

/// Renders a lookup key as text for inlining into the MySQL query; the same
/// rendering is used as the map key when matching fetched rows back to the
/// requested keys.
///
/// String keys become quoted SQL literals. Backslashes are doubled first so a
/// key cannot smuggle in an escape that swallows the closing quote, and single
/// quotes are doubled (`''`), which MySQL accepts regardless of the
/// `NO_BACKSLASH_ESCAPES` SQL mode. Dates and timestamps are rendered in UTC;
/// every other scalar uses its `Display` form.
// NOTE(review): date/timestamp keys are emitted unquoted, which would not be a
// valid SQL literal — the fetch macros currently reject those key types, but
// confirm before enabling them.
#[inline]
fn format_key(&self, key: ScalarRef<'_>) -> String {
    match key {
        ScalarRef::String(s) => {
            // Order matters: escape backslashes before touching quotes.
            let escaped = s.replace('\\', "\\\\").replace('\'', "''");
            format!("'{escaped}'")
        }
        ScalarRef::Date(d) => date_to_string(d as i64, &TimeZone::UTC).to_string(),
        ScalarRef::Timestamp(t) => timestamp_to_string(t, &TimeZone::UTC).to_string(),
        _ => key.to_string(),
    }
}

/// Renders every key with `format_key` and joins the results with commas,
/// producing the list placed between the parentheses of the query's key list.
/// The order follows the set's iteration order.
#[inline]
fn format_keys(&self, keys: HashSet<ScalarRef>) -> String {
    let rendered: Vec<String> = keys.into_iter().map(|key| self.format_key(key)).collect();
    rendered.join(",")
}
}

impl TransformAsyncFunction {
Expand Down Expand Up @@ -339,8 +522,11 @@ impl TransformAsyncFunction {
sqlx::MySqlPool::connect(&sql_source.connection_url),
)?;
let sql = format!(
"SELECT {} FROM {} WHERE {} = ? LIMIT 1",
&sql_source.value_field, &sql_source.table, &sql_source.key_field
"SELECT {}, {} FROM {} WHERE {} in",
&sql_source.key_field,
&sql_source.value_field,
&sql_source.table,
&sql_source.key_field
);
operators.insert(i, Arc::new(DictionaryOperator::Mysql((mysql_pool, sql))));
}
Expand Down
Loading

0 comments on commit 25d615f

Please sign in to comment.