Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: http handler support temp table. #16375

Merged
merged 6 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion scripts/ci/ci-run-sqllogic-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,6 @@ echo "Run suites using argument: $RUN_DIR"

echo "Starting databend-sqllogic tests"
if [ -z "$RUN_DIR" ]; then
target/${BUILD_PROFILE}/databend-sqllogictests --handlers "mysql" --run_dir temp_table --enable_sandbox --parallel 8
target/${BUILD_PROFILE}/databend-sqllogictests --run_dir temp_table --enable_sandbox --parallel 8
fi
target/${BUILD_PROFILE}/databend-sqllogictests --handlers ${TEST_HANDLERS} ${RUN_DIR} --skip_dir management,explain_native,ee,temp_table --enable_sandbox --parallel 8
2 changes: 1 addition & 1 deletion src/query/catalog/src/table_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ pub trait TableContext: Send + Sync {
lock_opt: &LockTableOption,
) -> Result<Option<Arc<LockGuard>>>;

fn get_session_id(&self) -> Result<String>;
fn get_temp_table_prefix(&self) -> Result<String>;

fn session_state(&self) -> SessionState;

Expand Down
1 change: 0 additions & 1 deletion src/query/service/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ impl AuthMgr {
let user_info = user_api.get_user(&tenant, identity.clone()).await?;
session.set_authed_user(user_info, claim.auth_role).await?;
}
session.set_client_session_id(claim.session_id.clone());
SkyFan2002 marked this conversation as resolved.
Show resolved Hide resolved
Ok(Some(claim.session_id))
}
Credential::Jwt {
Expand Down
16 changes: 12 additions & 4 deletions src/query/service/src/interpreters/interpreter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@ use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::PipelineCompleteExecutor;
use crate::pipelines::executor::PipelinePullingExecutor;
use crate::pipelines::PipelineBuildResult;
use crate::servers::http::v1::ClientSessionManager;
use crate::sessions::QueryContext;
use crate::sessions::SessionManager;
use crate::sessions::SessionType;
use crate::stream::DataBlockStream;
use crate::stream::ProgressStream;
use crate::stream::PullingExecutorStream;
Expand Down Expand Up @@ -165,8 +167,8 @@ fn log_query_start(ctx: &QueryContext) {
InterpreterMetrics::record_query_start(ctx);
let now = SystemTime::now();
let session = ctx.get_current_session();

if session.get_type().is_user_session() {
let typ = session.get_type();
if typ.is_user_session() {
SessionManager::instance().status.write().query_start(now);
}

Expand All @@ -182,8 +184,14 @@ fn log_query_finished(ctx: &QueryContext, error: Option<ErrorCode>, has_profiles
let session = ctx.get_current_session();

session.get_status().write().query_finish();
if session.get_type().is_user_session() {
SessionManager::instance().status.write().query_finish(now)
let typ = session.get_type();
if typ.is_user_session() {
SessionManager::instance().status.write().query_finish(now);
if typ == SessionType::HTTPQuery {
if let Some(cid) = session.get_client_session_id() {
ClientSessionManager::instance().on_query_finish(&cid, &session)
}
}
}

if let Err(error) = InterpreterQueryLog::log_finish(ctx, now, error, has_profiles) {
Expand Down
3 changes: 3 additions & 0 deletions src/query/service/src/servers/http/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,9 @@ impl<E> HTTPSessionEndpoint<E> {
}

let client_session_id = self.auth_manager.auth(&mut session, &credential).await?;
if let Some(id) = client_session_id.clone() {
session.set_client_session_id(id)
}
let databend_token = match credential {
Credential::DatabendToken { token, .. } => Some(token),
_ => None,
Expand Down
4 changes: 4 additions & 0 deletions src/query/service/src/servers/http/v1/query/execute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use crate::interpreters::InterpreterQueryLog;
use crate::servers::http::v1::http_query_handlers::QueryResponseField;
use crate::servers::http::v1::query::http_query::ResponseState;
use crate::servers::http::v1::query::sized_spsc::SizedChannelSender;
use crate::servers::http::v1::ClientSessionManager;
use crate::sessions::AcquireQueueGuard;
use crate::sessions::QueriesQueueManager;
use crate::sessions::QueryAffect;
Expand Down Expand Up @@ -332,6 +333,9 @@ impl ExecuteState {
format_settings: Arc<parking_lot::RwLock<Option<FormatSettings>>>,
) -> Result<()> {
info!("http query prepare to plan sql");
if let Some(cid) = session.get_client_session_id() {
ClientSessionManager::instance().on_query_start(&cid, &session)
}

// Use interpreter_plan_sql, we can write the query log if an error occurs.
let (plan, extras) = interpreter_plan_sql(ctx.clone(), &sql)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Duration;

use databend_common_base::base::GlobalInstance;
use databend_common_base::runtime::Thread;
use databend_common_cache::Cache;
use databend_common_cache::LruCache;
use databend_common_config::InnerConfig;
Expand All @@ -26,9 +29,13 @@ use databend_common_meta_app::principal::user_token::QueryTokenInfo;
use databend_common_meta_app::principal::user_token::TokenType;
use databend_common_meta_app::tenant::Tenant;
use databend_common_users::UserApiProvider;
use databend_storages_common_session::drop_all_temp_tables;
use databend_storages_common_session::TempTblMgrRef;
use parking_lot::Mutex;
use parking_lot::RwLock;
use sha2::Digest;
use sha2::Sha256;
use tokio::time::Instant;

use crate::servers::http::v1::session::token::unix_ts;
use crate::servers::http::v1::SessionClaim;
Expand All @@ -54,12 +61,38 @@ fn hash_token(token: &[u8]) -> String {
hex::encode_upper(Sha256::digest(token))
}

/// Activity state of a client session tracked by `ClientSessionManager`.
enum QueryState {
/// Set by `on_query_start`; a session in this state never expires.
InUse,
/// Set by `on_query_finish`; carries the instant at which the session
/// became idle, which `has_expired` compares against the idle timeout.
Idle(Instant),
}

/// Per-client-session record kept in `ClientSessionManager::session_state`.
struct SessionState {
/// Whether a query is currently running, or since when the session is idle.
pub query_state: QueryState,
/// Shared handle to the session's temp table manager. It is re-attached to
/// the `Session` in `on_query_start` and handed to `drop_all_temp_tables`
/// when the record is cleaned up.
pub temp_tbl_mgr: TempTblMgrRef,
}

impl QueryState {
pub fn has_expired(&self, now: &Instant) -> bool {
match self {
QueryState::InUse => false,
QueryState::Idle(t) => (*now - *t) > Duration::from_secs(4 * 3600),
}
}
}

/// Tracks client session tokens, refresh tokens and per-session temp table state.
pub struct ClientSessionManager {
/// Only token hashes are stored, to get a good hit ratio with limited memory.
/// This is feasible because:
/// - a token contains all of its information in itself;
/// - for eviction, LRU alone is enough; expired tokens need no special check.
session_tokens: RwLock<LruCache<String, Option<String>>>,
refresh_tokens: RwLock<LruCache<String, Option<String>>>,

/// Per-session state, keyed by client session id.
///
/// An entry is added when a query finishes while the session holds temp
/// tables (see `on_query_finish`).
///
/// An entry is removed when:
/// - all temp tables of the session have been deleted;
/// - the session is closed;
/// - the session has been idle past the timeout (see `check_timeout`).
session_state: Mutex<BTreeMap<String, SessionState>>,
}

impl ClientSessionManager {
Expand All @@ -69,14 +102,40 @@ impl ClientSessionManager {

#[async_backtrace::framed]
pub async fn init(_cfg: &InnerConfig) -> Result<()> {
GlobalInstance::set(Arc::new(Self {
let mgr = Arc::new(Self {
session_tokens: RwLock::new(LruCache::with_items_capacity(1024)),
refresh_tokens: RwLock::new(LruCache::with_items_capacity(1024)),
}));

session_state: Default::default(),
});
GlobalInstance::set(mgr.clone());
Thread::spawn(move || Self::check_timeout(mgr));
Ok(())
}

/// Background loop that evicts expired sessions and drops their temp tables.
///
/// Never returns: each iteration removes every session whose
/// `query_state.has_expired(..)` is true, drops the temp tables of the removed
/// sessions, then sleeps before scanning again.
async fn check_timeout(self: Arc<Self>) {
    /// Pause between two scans of the session table.
    const CHECK_INTERVAL: Duration = Duration::from_secs(900);

    loop {
        let now = Instant::now();
        // Decide and remove inside one critical section. Doing the check and the
        // removal under two separate lock acquisitions would let `on_query_start`
        // mark a session `InUse` in between and still have it evicted.
        // The guard is dropped at the end of this block, before any `.await`.
        let expired = {
            let mut guard = self.session_state.lock();
            let mut expired = Vec::new();
            guard.retain(|_, state| {
                let is_expired = state.query_state.has_expired(&now);
                if is_expired {
                    expired.push(state.temp_tbl_mgr.clone());
                }
                !is_expired
            });
            expired
        };
        for mgr in expired {
            // Best effort: a failure for one session must not stop the sweeper.
            drop_all_temp_tables(mgr).await.ok();
        }
        tokio::time::sleep(CHECK_INTERVAL).await;
    }
}

/// Used both to issue tokens for a new session and to renew tokens for an existing session.
/// Currently, renewing always returns a new refresh token instead of updating the TTL,
/// because the expiry time is embedded in the token itself and cannot be updated.
Expand Down Expand Up @@ -257,7 +316,43 @@ impl ClientSessionManager {
.drop_client_session_id(&claim.session_id)
.await
.ok();
let state = self.session_state.lock().remove(&claim.session_id);
if let Some(state) = state {
drop_all_temp_tables(state.temp_tbl_mgr).await?;
}
};
Ok(())
}

/// Marks the client session as in use and re-attaches its temp table manager.
///
/// If state is recorded for `client_session_id`, its query state becomes
/// `InUse` and the stored temp table manager is set on `session`. If no state
/// is recorded, this is a no-op.
pub fn on_query_start(&self, client_session_id: &str, session: &Arc<Session>) {
    let mut guard = self.session_state.lock();
    // Plain lookup: nothing is ever inserted here, so the entry API (and the
    // key allocation it requires) is unnecessary.
    if let Some(state) = guard.get_mut(client_session_id) {
        state.query_state = QueryState::InUse;
        session.set_temp_tbl_mgr(state.temp_tbl_mgr.clone());
    }
}
/// Records the session's temp table state once a query has finished.
///
/// `is_empty()` on the temp table manager yields `(is_empty, just_changed)`:
/// - not empty: the session is stored (or refreshed) as idle since now;
/// - empty and just changed: any stored state for the session is removed;
/// - empty and unchanged: nothing is done.
pub fn on_query_finish(&self, client_session_id: &str, session: &Arc<Session>) {
    let temp_tbl_mgr = session.temp_tbl_mgr();
    let (is_empty, just_changed) = temp_tbl_mgr.lock().is_empty();
    if is_empty && !just_changed {
        return;
    }

    let mut sessions = self.session_state.lock();
    match (sessions.entry(client_session_id.to_string()), is_empty) {
        // The last temp table is gone: forget the session.
        (Entry::Occupied(slot), true) => {
            slot.remove();
        }
        // Still has temp tables: restart the idle clock, keep the stored manager.
        (Entry::Occupied(mut slot), false) => {
            slot.get_mut().query_state = QueryState::Idle(Instant::now());
        }
        // First time this session holds temp tables: start tracking it.
        (Entry::Vacant(slot), false) => {
            slot.insert(SessionState {
                query_state: QueryState::Idle(Instant::now()),
                temp_tbl_mgr,
            });
        }
        (Entry::Vacant(_), true) => {}
    }
}
}
35 changes: 35 additions & 0 deletions src/query/service/src/servers/mysql/mysql_interactive_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;

use databend_common_base::base::convert_byte_size;
Expand All @@ -28,6 +29,7 @@ use databend_common_expression::DataBlock;
use databend_common_expression::DataSchemaRef;
use databend_common_expression::SendableDataBlockStream;
use databend_common_io::prelude::FormatSettings;
use databend_common_meta_app::principal::client_session::ClientSession;
use databend_common_meta_app::principal::UserIdentity;
use databend_common_metrics::mysql::*;
use databend_common_users::CertifiedInfo;
Expand Down Expand Up @@ -71,6 +73,7 @@ pub struct InteractiveWorker {
version: String,
salt: [u8; 20],
client_addr: String,
keep_alive_task_started: bool,
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -213,6 +216,9 @@ impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for InteractiveWorke
}

let mut writer = DFQueryResultWriter::create(writer, self.base.session.clone());
if !self.keep_alive_task_started {
self.start_keep_alive().await
}

let instant = Instant::now();
let query_result = self
Expand Down Expand Up @@ -476,8 +482,37 @@ impl InteractiveWorker {
salt: scramble,
version: format!("{}-{}", MYSQL_VERSION, *DATABEND_COMMIT_VERSION),
client_addr,
keep_alive_task_started: false,
}
}

/// Spawns a background task that keeps this connection's client session id
/// registered, and marks the task as started.
///
/// The task re-upserts the session id with a TTL slightly longer than its
/// refresh interval; upsert errors are ignored.
///
/// # Panics
///
/// Panics if the session has no current user.
async fn start_keep_alive(&mut self) {
    /// Pause between two refreshes of the session id.
    const REFRESH_INTERVAL: Duration = Duration::from_secs(3600);
    /// TTL given to each upsert.
    const SESSION_TTL: Duration = Duration::from_secs(3600 + 600);

    let tenant = self.base.session.get_current_tenant();
    let session_id = self.base.session.get_id();
    let user_name = self
        .base
        .session
        .get_current_user()
        .expect("mysql handler should be authed when call")
        .name;
    self.keep_alive_task_started = true;

    // NOTE(review): this loop has no exit condition visible here — confirm the
    // task is stopped when the connection closes.
    let keep_alive = async move {
        loop {
            UserApiProvider::instance()
                .client_session_api(&tenant)
                .upsert_client_session_id(
                    &session_id,
                    ClientSession {
                        user_name: user_name.clone(),
                    },
                    SESSION_TTL,
                )
                .await
                .ok();
            tokio::time::sleep(REFRESH_INTERVAL).await;
        }
    };
    databend_common_base::runtime::spawn(keep_alive);
}
}

struct ContextProgressReporter {
Expand Down
16 changes: 15 additions & 1 deletion src/query/service/src/servers/mysql/mysql_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ use databend_common_base::runtime::TrySpawn;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_exception::ToErrorCode;
use databend_common_users::UserApiProvider;
use databend_storages_common_session::drop_all_temp_tables;
use log::error;
use log::warn;
use opensrv_mysql::plain_run_with_options;
Expand Down Expand Up @@ -64,11 +66,13 @@ impl MySQLConnection {
}
};

let mut interactive_worker = InteractiveWorker::create(session, client_addr);
let mut interactive_worker =
InteractiveWorker::create(session.clone(), client_addr);
let opts = IntermediaryOptions {
process_use_statement_on_query: true,
reject_connection_on_dbname_absence: false,
};

let (r, w) = non_blocking_stream.into_split();
let mut w = BufWriter::with_capacity(DEFAULT_RESULT_SET_WRITE_BUFFER_SIZE, w);

Expand All @@ -87,6 +91,16 @@ impl MySQLConnection {
}
_ => plain_run_with_options(interactive_worker, w, opts, init_params).await,
}
.ok();

let tenant = session.get_current_tenant();
let session_id = session.get_id();
UserApiProvider::instance()
.client_session_api(&tenant)
.drop_client_session_id(&session_id)
.await
.ok();
drop_all_temp_tables(session.temp_tbl_mgr()).await
});
let _ = futures::executor::block_on(join_handle);
});
Expand Down
13 changes: 2 additions & 11 deletions src/query/service/src/sessions/query_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1335,17 +1335,8 @@ impl TableContext for QueryContext {
Ok(lock_guard)
}

fn get_session_id(&self) -> Result<String> {
let session_type = self.shared.session.get_type();
match session_type {
SessionType::MySQL => Ok(self.shared.session.id.clone()),
_ => self
.shared
.session
.session_ctx
.get_client_session_id()
.ok_or(ErrorCode::Internal("No client session id".to_string())),
}
fn get_temp_table_prefix(&self) -> Result<String> {
self.shared.session.get_temp_table_prefix()
}

fn is_temp_table(&self, catalog_name: &str, database_name: &str, table_name: &str) -> bool {
Expand Down
Loading
Loading