Skip to content

Commit

Permalink
feat: Support USE CATALOG syntax and current_catalog() function (#16926)
Browse files Browse the repository at this point in the history
* working

* add ut

* format

* add http session catalog& add ut

* fix tests
  • Loading branch information
flashmouse authored Dec 2, 2024
1 parent b58d1eb commit f40b8c3
Show file tree
Hide file tree
Showing 22 changed files with 221 additions and 1 deletion.
5 changes: 5 additions & 0 deletions src/query/ast/src/ast/statements/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ pub enum Statement {
ShowCreateCatalog(ShowCreateCatalogStmt),
CreateCatalog(CreateCatalogStmt),
DropCatalog(DropCatalogStmt),
UseCatalog {
catalog: Identifier,
},

// Databases
ShowDatabases(ShowDatabasesStmt),
Expand Down Expand Up @@ -410,6 +413,7 @@ impl Statement {
| Statement::Update(..)
| Statement::ShowCatalogs(..)
| Statement::ShowCreateCatalog(..)
| Statement::UseCatalog { .. }
| Statement::ShowDatabases(..)
| Statement::ShowDropDatabases(..)
| Statement::ShowCreateDatabase(..)
Expand Down Expand Up @@ -718,6 +722,7 @@ impl Display for Statement {
Statement::ShowCreateCatalog(stmt) => write!(f, "{stmt}")?,
Statement::CreateCatalog(stmt) => write!(f, "{stmt}")?,
Statement::DropCatalog(stmt) => write!(f, "{stmt}")?,
Statement::UseCatalog { catalog } => write!(f, "USE CATALOG {catalog}")?,
Statement::ShowDatabases(stmt) => write!(f, "{stmt}")?,
Statement::ShowDropDatabases(stmt) => write!(f, "{stmt}")?,
Statement::ShowCreateDatabase(stmt) => write!(f, "{stmt}")?,
Expand Down
12 changes: 11 additions & 1 deletion src/query/ast/src/parser/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,12 @@ pub fn statement_body(i: Input) -> IResult<Statement> {
})
},
);
let use_catalog = map(
rule! {
USE ~ CATALOG ~ #ident
},
|(_, _, catalog)| Statement::UseCatalog { catalog },
);

let show_databases = map(
rule! {
Expand Down Expand Up @@ -2281,6 +2287,11 @@ pub fn statement_body(i: Input) -> IResult<Statement> {
| #set_priority: "`SET PRIORITY (HIGH | MEDIUM | LOW) <object_id>`"
| #system_action: "`SYSTEM (ENABLE | DISABLE) EXCEPTION_BACKTRACE`"
),
// use
rule!(
#use_catalog: "`USE CATALOG <catalog>`"
| #use_database : "`USE <database>`"
),
// database
rule!(
#show_databases : "`SHOW [FULL] DATABASES [(FROM | IN) <catalog>] [<show_limit>]`"
Expand All @@ -2290,7 +2301,6 @@ pub fn statement_body(i: Input) -> IResult<Statement> {
| #create_database : "`CREATE [OR REPLACE] DATABASE [IF NOT EXISTS] <database> [ENGINE = <engine>]`"
| #drop_database : "`DROP DATABASE [IF EXISTS] <database>`"
| #alter_database : "`ALTER DATABASE [IF EXISTS] <action>`"
| #use_database : "`USE <database>`"
),
// network policy / password policy
rule!(
Expand Down
2 changes: 2 additions & 0 deletions src/query/ast/tests/it/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ fn test_statement() {
r#"drop table if exists a."b";"#,
r#"use "a";"#,
r#"create catalog ctl type=hive connection=(url='<hive-meta-store>' thrift_protocol='binary');"#,
r#"select current_catalog();"#,
r#"use catalog ctl;"#,
r#"create database if not exists a;"#,
r#"create database ctl.t engine = Default;"#,
r#"create database t engine = Default;"#,
Expand Down
77 changes: 77 additions & 0 deletions src/query/ast/tests/it/testdata/stmt.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2954,6 +2954,83 @@ CreateCatalog(
)


---------- Input ----------
select current_catalog();
---------- Output ---------
SELECT current_catalog()
---------- AST ------------
Query(
Query {
span: Some(
0..24,
),
with: None,
body: Select(
SelectStmt {
span: Some(
0..24,
),
hints: None,
distinct: false,
top_n: None,
select_list: [
AliasedExpr {
expr: FunctionCall {
span: Some(
7..24,
),
func: FunctionCall {
distinct: false,
name: Identifier {
span: Some(
7..22,
),
name: "current_catalog",
quote: None,
ident_type: None,
},
args: [],
params: [],
window: None,
lambda: None,
},
},
alias: None,
},
],
from: [],
selection: None,
group_by: None,
having: None,
window_list: None,
qualify: None,
},
),
order_by: [],
limit: [],
offset: None,
ignore_result: false,
},
)


---------- Input ----------
use catalog ctl;
---------- Output ---------
USE CATALOG ctl
---------- AST ------------
UseCatalog {
catalog: Identifier {
span: Some(
12..15,
),
name: "ctl",
quote: None,
ident_type: None,
},
}


---------- Input ----------
create database if not exists a;
---------- Output ---------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1182,6 +1182,7 @@ impl AccessChecker for PrivilegeAccess {
Plan::ShowCreateCatalog(_)
| Plan::CreateCatalog(_)
| Plan::DropCatalog(_)
| Plan::UseCatalog(_)
| Plan::CreateFileFormat(_)
| Plan::DropFileFormat(_)
| Plan::ShowFileFormats(_)
Expand Down
62 changes: 62 additions & 0 deletions src/query/service/src/interpreters/interpreter_catalog_use.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::DataSchema;
use databend_common_sql::plans::UseCatalogPlan;

use crate::interpreters::Interpreter;
use crate::pipelines::PipelineBuildResult;
use crate::sessions::QueryAffect;
use crate::sessions::QueryContext;

pub struct UseCatalogInterpreter {
ctx: Arc<QueryContext>,
plan: UseCatalogPlan,
}

impl UseCatalogInterpreter {
pub fn create(ctx: Arc<QueryContext>, plan: UseCatalogPlan) -> Self {
UseCatalogInterpreter { ctx, plan }
}
}

#[async_trait::async_trait]
impl Interpreter for UseCatalogInterpreter {
fn name(&self) -> &str {
"UseCatalogInterpreter"
}

fn is_ddl(&self) -> bool {
false
}

#[async_backtrace::framed]
async fn execute2(&self) -> Result<PipelineBuildResult> {
if self.plan.catalog.trim().is_empty() {
return Err(ErrorCode::UnknownCatalog("No catalog selected"));
}
self.ctx
.set_current_catalog(self.plan.catalog.clone())
.await?;
self.ctx.set_affect(QueryAffect::UseCatalog {
name: self.plan.catalog.clone(),
});
let _schema = Arc::new(DataSchema::empty());
Ok(PipelineBuildResult::create())
}
}
3 changes: 3 additions & 0 deletions src/query/service/src/interpreters/interpreter_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ impl InterpreterFactory {
Plan::DropCatalog(plan) => {
Ok(Arc::new(DropCatalogInterpreter::create(ctx, *plan.clone())))
}
Plan::UseCatalog(plan) => {
Ok(Arc::new(UseCatalogInterpreter::create(ctx, *plan.clone())))
}

// Databases
Plan::ShowCreateDatabase(show_create_database) => Ok(Arc::new(
Expand Down
2 changes: 2 additions & 0 deletions src/query/service/src/interpreters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod interpreter;
mod interpreter_catalog_create;
mod interpreter_catalog_drop;
mod interpreter_catalog_show_create;
mod interpreter_catalog_use;
mod interpreter_cluster_key_alter;
mod interpreter_cluster_key_drop;
mod interpreter_clustering_history;
Expand Down Expand Up @@ -153,6 +154,7 @@ pub use hook::HookOperator;
pub use interpreter::interpreter_plan_sql;
pub use interpreter::Interpreter;
pub use interpreter::InterpreterPtr;
pub use interpreter_catalog_use::UseCatalogInterpreter;
pub use interpreter_cluster_key_alter::AlterTableClusterKeyInterpreter;
pub use interpreter_cluster_key_drop::DropTableClusterKeyInterpreter;
pub use interpreter_clustering_history::InterpreterClusteringHistory;
Expand Down
2 changes: 2 additions & 0 deletions src/query/service/src/servers/http/v1/query/execute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ pub struct Executor {
// may store these new session state, and pass it to the next http query request.
#[derive(Debug, Clone)]
pub struct ExecutorSessionState {
pub current_catalog: String,
pub current_database: String,
pub current_role: Option<String>,
pub secondary_roles: Option<Vec<String>>,
Expand All @@ -158,6 +159,7 @@ pub struct ExecutorSessionState {
impl ExecutorSessionState {
pub fn new(session: Arc<Session>) -> Self {
Self {
current_catalog: session.get_current_catalog(),
current_database: session.get_current_database(),
current_role: session.get_current_role().map(|r| r.name),
secondary_roles: session.get_secondary_roles(),
Expand Down
8 changes: 8 additions & 0 deletions src/query/service/src/servers/http/v1/query/http_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,8 @@ where D: Deserializer<'de> {

#[derive(Deserialize, Serialize, Debug, Default, Clone, Eq, PartialEq)]
pub struct HttpSessionConf {
#[serde(skip_serializing_if = "Option::is_none")]
pub catalog: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub database: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
Expand Down Expand Up @@ -412,10 +414,14 @@ impl HttpQuery {

// Read the session variables in the request, and set them to the current session.
// the session variables includes:
// - the current catalog
// - the current database
// - the current role
// - the session-level settings, like max_threads, http_handler_result_timeout_secs, etc.
if let Some(session_conf) = &request.session {
if let Some(catalog) = &session_conf.catalog {
session.set_current_catalog(catalog.clone());
}
if let Some(db) = &session_conf.database {
session.set_current_database(db.clone());
}
Expand Down Expand Up @@ -649,6 +655,7 @@ impl HttpQuery {
.filter(|item| matches!(item.level, ScopeLevel::Session))
.map(|item| (item.name.to_string(), item.user_value.as_string()))
.collect::<BTreeMap<_, _>>();
let catalog = session_state.current_catalog.clone();
let database = session_state.current_database.clone();
let role = session_state.current_role.clone();
let secondary_roles = session_state.secondary_roles.clone();
Expand Down Expand Up @@ -721,6 +728,7 @@ impl HttpQuery {
let need_keep_alive = need_sticky || has_temp_table;

Ok(HttpSessionConf {
catalog: Some(catalog),
database: Some(database),
role,
secondary_roles,
Expand Down
3 changes: 3 additions & 0 deletions src/query/service/src/sessions/query_affect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ pub enum QueryAffect {
UseDB {
name: String,
},
UseCatalog {
name: String,
},
ChangeSettings {
keys: Vec<String>,
values: Vec<String>,
Expand Down
8 changes: 8 additions & 0 deletions src/query/service/src/sessions/query_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,14 @@ impl QueryContext {
StageTable::try_create(table_info.clone())
}

#[async_backtrace::framed]
pub async fn set_current_catalog(&self, new_catalog_name: String) -> Result<()> {
let _catalog = self.get_catalog(&new_catalog_name).await?;
self.shared.set_current_catalog(new_catalog_name);

Ok(())
}

#[async_backtrace::framed]
pub async fn set_current_database(&self, new_database_name: String) -> Result<()> {
let tenant_id = self.get_tenant();
Expand Down
4 changes: 4 additions & 0 deletions src/query/service/src/sessions/query_ctx_shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,10 @@ impl QueryContextShared {
self.session.get_current_catalog()
}

pub fn set_current_catalog(&self, catalog_name: String) {
self.session.set_current_catalog(catalog_name)
}

pub fn get_aborting(&self) -> Arc<AtomicBool> {
self.aborting.clone()
}
Expand Down
4 changes: 4 additions & 0 deletions src/query/service/src/sessions/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@ impl Session {
self.session_ctx.get_current_catalog()
}

pub fn set_current_catalog(&self, catalog_name: String) {
self.session_ctx.set_current_catalog(catalog_name)
}

pub fn get_current_tenant(&self) -> Tenant {
self.session_ctx.get_current_tenant()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1384,6 +1384,7 @@ async fn test_affect() -> Result<()> {
is_globals: vec![false],
}),
Some(HttpSessionConf {
catalog: Some("default".to_string()),
database: Some("default".to_string()),
role: Some("account_admin".to_string()),
secondary_roles: None,
Expand All @@ -1409,6 +1410,7 @@ async fn test_affect() -> Result<()> {
is_globals: vec![false],
}),
Some(HttpSessionConf {
catalog: Some("default".to_string()),
database: Some("default".to_string()),
role: Some("account_admin".to_string()),
secondary_roles: None,
Expand All @@ -1429,6 +1431,7 @@ async fn test_affect() -> Result<()> {
serde_json::json!({"sql": "create database if not exists db2", "session": {"settings": {"max_threads": "6"}}}),
None,
Some(HttpSessionConf {
catalog: Some("default".to_string()),
database: Some("default".to_string()),
role: Some("account_admin".to_string()),
secondary_roles: None,
Expand All @@ -1451,6 +1454,7 @@ async fn test_affect() -> Result<()> {
name: "db2".to_string(),
}),
Some(HttpSessionConf {
catalog: Some("default".to_string()),
database: Some("db2".to_string()),
role: Some("account_admin".to_string()),
secondary_roles: None,
Expand All @@ -1475,6 +1479,7 @@ async fn test_affect() -> Result<()> {
is_globals: vec![true],
}),
Some(HttpSessionConf {
catalog: Some("default".to_string()),
database: Some("default".to_string()),
role: Some("account_admin".to_string()),
secondary_roles: None,
Expand Down
Loading

0 comments on commit f40b8c3

Please sign in to comment.