refactor: remove obsolete gRPC read API (#16909)
* refactor: list_background_tasks() use structured return values

* chore: simplify list_udf

* refactor: remove obsolete gRPC read API

* chore: skip damaged version 1.2.287. Use 1.2.288 instead

* chore: try 300

* chore: adapt to new version of sqllogictest

* chore: fixup
drmingdrmer authored Nov 22, 2024
1 parent 6bc928e commit e4bf8b2
Showing 12 changed files with 78 additions and 103 deletions.
3 changes: 2 additions & 1 deletion src/meta/README.md
@@ -86,7 +86,8 @@ The following is an illustration of the latest query-meta compatibility:
| [1.1.32, 1.2.63) |||||
| [1.2.63, 1.2.226) |||||
| [1.2.226, 1.2.258) |||||
| [1.2.258, +∞) |||||
| [1.2.258, 1.2.663) |||||
| [1.2.663, +∞) |||||

History versions that are not included in the above chart:

3 changes: 2 additions & 1 deletion src/meta/api/src/background_api.rs
@@ -13,6 +13,7 @@
// limitations under the License.

use databend_common_meta_app::background::BackgroundJobInfo;
use databend_common_meta_app::background::BackgroundTaskIdent;
use databend_common_meta_app::background::BackgroundTaskInfo;
use databend_common_meta_app::background::CreateBackgroundJobReply;
use databend_common_meta_app::background::CreateBackgroundJobReq;
@@ -70,7 +71,7 @@ pub trait BackgroundApi: Send + Sync {
async fn list_background_tasks(
&self,
req: ListBackgroundTasksReq,
) -> Result<Vec<(u64, String, BackgroundTaskInfo)>, KVAppError>;
) -> Result<Vec<(BackgroundTaskIdent, SeqV<BackgroundTaskInfo>)>, KVAppError>;

async fn update_background_task(
&self,
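
The new signature replaces the loosely typed `(u64, String, BackgroundTaskInfo)` tuple with a `(BackgroundTaskIdent, SeqV<BackgroundTaskInfo>)` pair, so the task's typed identifier and its versioned value travel together. A minimal, self-contained sketch of what a caller sees after this change — the `SeqV` and `BackgroundTaskIdent` below are simplified stand-ins for illustration, not the real databend types:

```rust
// Simplified stand-ins; the real `SeqV` and `BackgroundTaskIdent` live in the
// databend meta crates and carry more metadata than shown here.
struct SeqV<T> {
    seq: u64, // KV sequence number — previously the bare `u64` in the tuple
    data: T,  // decoded value — previously the bare `BackgroundTaskInfo`
}

struct BackgroundTaskIdent {
    name: String, // previously flattened into a bare `String`
}

impl BackgroundTaskIdent {
    fn name(&self) -> &str {
        &self.name
    }
}

fn main() {
    // Shape of the rows returned by the new `list_background_tasks`.
    let rows: Vec<(BackgroundTaskIdent, SeqV<&str>)> = vec![(
        BackgroundTaskIdent { name: "compact-t1".to_string() },
        SeqV { seq: 7, data: "DONE" },
    )];

    for (ident, seq_v) in rows {
        println!("task {} (seq {}): {}", ident.name(), seq_v.seq, seq_v.data);
    }
}
```
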
20 changes: 3 additions & 17 deletions src/meta/api/src/background_api_impl.rs
@@ -42,7 +42,6 @@ use databend_common_meta_kvapi::kvapi::DirName;
use databend_common_meta_kvapi::kvapi::Key;
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
use databend_common_meta_types::seq_value::SeqValue;
use databend_common_meta_types::InvalidReply;
use databend_common_meta_types::MatchSeq::Any;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::MetaSpec;
@@ -54,7 +53,6 @@ use futures::TryStreamExt;
use log::debug;

use crate::background_api::BackgroundApi;
use crate::deserialize_struct;
use crate::fetch_id;
use crate::kv_app_error::KVAppError;
use crate::kv_pb_api::KVPbApi;
@@ -243,22 +241,10 @@ impl<KV: kvapi::KVApi<Error = MetaError>> BackgroundApi for KV {
async fn list_background_tasks(
&self,
req: ListBackgroundTasksReq,
) -> Result<Vec<(u64, String, BackgroundTaskInfo)>, KVAppError> {
) -> Result<Vec<(BackgroundTaskIdent, SeqV<BackgroundTaskInfo>)>, KVAppError> {
let ident = BackgroundTaskIdent::new(&req.tenant, "dummy");
let prefix = ident.tenant_prefix();

let reply = self.prefix_list_kv(&prefix).await?;
let mut res = vec![];
for (k, v) in reply {
let ident = BackgroundTaskIdent::from_str_key(k.as_str()).map_err(|e| {
KVAppError::MetaError(MetaError::from(InvalidReply::new(
"list_background_tasks",
&e,
)))
})?;
let val: BackgroundTaskInfo = deserialize_struct(&v.data)?;
res.push((v.seq, ident.name().to_string(), val));
}
let dir = DirName::new(ident);
let res = self.list_pb_vec(&dir).await?;
Ok(res)
}

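
The deleted loop split raw keys with `from_str_key` and deserialized each value by hand; the replacement builds a `DirName` from a throwaway `"dummy"` ident and lets `list_pb_vec` return typed `(ident, SeqV<value>)` pairs in a single call. A rough, self-contained sketch of that directory-listing idea over a toy in-memory map — `list_dir` and the key layout are illustrative assumptions, not the databend `kvapi`:

```rust
use std::collections::BTreeMap;

// Toy "directory" listing: every key under `dir/` comes back already paired
// with its relative name, instead of raw entries the caller must split and
// deserialize manually (the pattern removed above).
fn list_dir(kv: &BTreeMap<String, String>, dir: &str) -> Vec<(String, String)> {
    let prefix = format!("{dir}/");
    kv.range(prefix.clone()..)
        .take_while(|(k, _)| k.starts_with(prefix.as_str()))
        .map(|(k, v)| (k[prefix.len()..].to_string(), v.clone()))
        .collect()
}

fn main() {
    let mut kv = BTreeMap::new();
    kv.insert("tenant1/background_task/t1".to_string(), "DONE".to_string());
    kv.insert("tenant1/background_task/t2".to_string(), "RUNNING".to_string());
    kv.insert("tenant2/background_task/t1".to_string(), "DONE".to_string());

    // Analogous to `DirName::new(ident)` + `list_pb_vec(&dir)`: one call yields
    // (name, value) pairs scoped to a single tenant's directory.
    for (name, state) in list_dir(&kv, "tenant1/background_task") {
        println!("{name}: {state}");
    }
}
```
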
4 changes: 2 additions & 2 deletions src/meta/api/src/background_api_test_suite.rs
@@ -180,10 +180,10 @@ impl BackgroundApiTestSuite {
info!("update log res: {:?}", res);
let res = res.unwrap();
assert_eq!(1, res.len(), "there is one task");
assert_eq!(task_id, res[0].1, "task name");
assert_eq!(task_id, res[0].0.name(), "task name");
assert_eq!(
BackgroundTaskState::DONE,
res[0].2.task_state,
res[0].1.task_state,
"first state is done"
);
}
14 changes: 0 additions & 14 deletions src/meta/client/src/grpc_action.rs
@@ -53,10 +53,6 @@ pub trait RequestFor: Clone + fmt::Debug {
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, derive_more::From)]
pub enum MetaGrpcReq {
UpsertKV(UpsertKVReq),

GetKV(GetKVReq),
MGetKV(MGetKVReq),
ListKV(ListKVReq),
}

impl TryInto<MetaGrpcReq> for Request<RaftRequest> {
@@ -125,16 +121,6 @@ impl RequestFor for MetaGrpcReadReq {
type Reply = BoxStream<StreamItem>;
}

impl From<MetaGrpcReadReq> for MetaGrpcReq {
fn from(v: MetaGrpcReadReq) -> Self {
match v {
MetaGrpcReadReq::GetKV(v) => MetaGrpcReq::GetKV(v),
MetaGrpcReadReq::MGetKV(v) => MetaGrpcReq::MGetKV(v),
MetaGrpcReadReq::ListKV(v) => MetaGrpcReq::ListKV(v),
}
}
}

impl From<MetaGrpcReadReq> for RaftRequest {
fn from(v: MetaGrpcReadReq) -> Self {
let raft_request = RaftRequest {
5 changes: 5 additions & 0 deletions src/meta/client/src/lib.rs
@@ -105,6 +105,11 @@ pub static METACLI_COMMIT_SEMVER: LazyLock<Version> = LazyLock::new(|| {
/// - 2024-03-04: since: 1.2.361
/// 👥 client: `MetaSpec` use `ttl`, remove `expire_at`, require 1.2.258
///
/// - 2024-11-22: since 1.2.663
/// 🖥 server: remove `MetaGrpcReq::GetKV/MGetKV/ListKV`,
/// require the client to call `kv_read_v1` for get/mget/list,
/// which was added `2024-01-07: since 1.2.287`
///
/// Server feature set:
/// ```yaml
/// server_features:
21 changes: 0 additions & 21 deletions src/meta/service/src/api/grpc/grpc_service.rs
@@ -120,27 +120,6 @@ impl MetaServiceImpl {
.await;
RaftReply::from(res)
}
MetaGrpcReq::GetKV(a) => {
let res = m
.get_kv(&a.key)
.log_elapsed_info(format!("GetKV: {:?}", a))
.await;
RaftReply::from(res)
}
MetaGrpcReq::MGetKV(a) => {
let res = m
.mget_kv(&a.keys)
.log_elapsed_info(format!("MGetKV: {:?}", a))
.await;
RaftReply::from(res)
}
MetaGrpcReq::ListKV(a) => {
let res = m
.prefix_list_kv(&a.prefix)
.log_elapsed_info(format!("ListKV: {:?}", a))
.await;
RaftReply::from(res)
}
};

network_metrics::incr_request_result(reply.error.is_empty());
4 changes: 3 additions & 1 deletion src/meta/service/src/version.rs
@@ -54,7 +54,9 @@ pub static METASRV_SEMVER: LazyLock<Version> = LazyLock::new(|| {
});

/// Oldest compatible nightly meta-client version
pub static MIN_METACLI_SEMVER: Version = Version::new(0, 9, 41);
///
/// It should be 1.2.287, but that release does not contain complete binaries.
pub static MIN_METACLI_SEMVER: Version = Version::new(1, 2, 288);

/// The min meta-server version that can be deployed together in a cluster,
/// i.e., the network APIs are compatible.
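
With the legacy read variants gone from the server, the oldest client the server will talk to jumps from 0.9.41 to 1.2.288 (1.2.287 is the release that introduced `kv_read_v1`, but its binaries are incomplete, so it is skipped). A hedged sketch of the version gate this constant implies, using the `semver` crate; `client_is_supported` is illustrative only — the real handshake check lives elsewhere in meta-service:

```rust
use semver::Version;

// Illustrative helper, not the actual meta-service handshake logic.
fn client_is_supported(client: &Version, min: &Version) -> bool {
    client >= min
}

fn main() {
    // Mirrors MIN_METACLI_SEMVER above: 1.2.287 is skipped because that
    // release does not contain complete binaries.
    let min = Version::new(1, 2, 288);

    assert!(client_is_supported(&Version::new(1, 2, 663), &min)); // knows kv_read_v1
    assert!(!client_is_supported(&Version::new(1, 2, 286), &min)); // predates kv_read_v1
    println!("minimum compatible meta-client: {min}");
}
```
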
29 changes: 9 additions & 20 deletions src/query/management/src/udf/udf_mgr.rs
@@ -24,7 +24,6 @@ use databend_common_meta_app::schema::CreateOption;
use databend_common_meta_app::tenant::Tenant;
use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::DirName;
use databend_common_meta_kvapi::kvapi::Key;
use databend_common_meta_types::seq_value::SeqV;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::MetaError;
@@ -135,25 +134,15 @@ impl UdfMgr {
#[async_backtrace::framed]
#[fastrace::trace]
pub async fn list_udf_fallback(&self) -> Result<Vec<UserDefinedFunction>, ErrorCode> {
let key = UdfIdent::new(&self.tenant, "");
let values = self.kv_api.prefix_list_kv(&key.to_string_key()).await?;

let mut udfs = Vec::with_capacity(values.len());
// At begin udf is serialize to json. https://github.com/datafuselabs/databend/pull/12729/files#diff-9c992028e59caebc313d761b8488b17f142618fb89db64c51c1655689d68c41b
// But we can not deserialize the UserDefinedFunction from json now,
// because add a new field created_on and the field `definition` refactor to a ENUM type.
for (name, value) in values {
let udf = crate::deserialize_struct(&value.data, ErrorCode::IllegalUDFFormat, || {
format!(
"Encountered invalid json data for LambdaUDF '{}', \
please drop this invalid udf and re-create it. \n\
Example: `DROP FUNCTION <invalid_udf>;` then `CREATE FUNCTION <invalid_udf> AS <udf_definition>;`\n\
",
name
)
})?;
udfs.push(udf);
}
let key = UdfIdent::new(&self.tenant, "dummy");
let dir = DirName::new(key);
let udfs = self
.kv_api
.list_pb_values(&dir)
.await?
.try_collect::<Vec<_>>()
.await?;

Ok(udfs)
}

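
`list_udf_fallback` now delegates decoding to `list_pb_values` and simply drains the resulting fallible stream with `try_collect`, instead of JSON-deserializing each raw value in a loop. A small self-contained example of that `try_collect` shape using the `futures` crate — the `Udf` struct and the in-memory items are stand-ins for the protobuf-decoded values coming off the meta store:

```rust
use futures::executor::block_on;
use futures::stream::{self, TryStreamExt};

struct Udf {
    name: String,
}

fn main() -> Result<(), String> {
    // Stand-in for the stream returned by `list_pb_values`: each item is
    // either a decoded value or a decode/transport error.
    let items: Vec<Result<Udf, String>> = vec![
        Ok(Udf { name: "f1".to_string() }),
        Ok(Udf { name: "f2".to_string() }),
    ];

    // Collect the fallible stream, short-circuiting on the first Err — the
    // same shape as `list_pb_values(&dir).await?.try_collect::<Vec<_>>().await?`.
    let udfs: Vec<Udf> = block_on(stream::iter(items).try_collect())?;
    for udf in &udfs {
        println!("loaded udf: {}", udf.name);
    }
    Ok(())
}
```
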
21 changes: 12 additions & 9 deletions src/query/service/src/servers/admin/v1/background_tasks.rs
@@ -61,30 +61,33 @@ async fn load_background_tasks(
tasks.len()
);
let mut task_infos = Vec::with_capacity(tasks.len());
for (_, name, task) in tasks {
if params.task_state.is_some() && task.task_state != *params.task_state.as_ref().unwrap() {
for (name, seq_task) in tasks {
if params.task_state.is_some()
&& seq_task.task_state != *params.task_state.as_ref().unwrap()
{
continue;
}
if params.task_type.is_some() && task.task_type != *params.task_type.as_ref().unwrap() {
if params.task_type.is_some() && seq_task.task_type != *params.task_type.as_ref().unwrap() {
continue;
}
if task.task_type == BackgroundTaskType::COMPACTION {
if task.compaction_task_stats.is_none() {
if seq_task.task_type == BackgroundTaskType::COMPACTION {
if seq_task.compaction_task_stats.is_none() {
continue;
}
if params.table_id.is_some()
&& task.compaction_task_stats.as_ref().unwrap().table_id != params.table_id.unwrap()
&& seq_task.compaction_task_stats.as_ref().unwrap().table_id
!= params.table_id.unwrap()
{
continue;
}
}
if params.timestamp.as_ref().is_some()
&& task.last_updated.is_some()
&& task.last_updated.unwrap() < params.timestamp.unwrap()
&& seq_task.last_updated.is_some()
&& seq_task.last_updated.unwrap() < params.timestamp.unwrap()
{
continue;
}
task_infos.push((name, task));
task_infos.push((name.name().to_string(), seq_task.data));
}
Ok(ListBackgroundTasksResponse {
task_infos,
31 changes: 19 additions & 12 deletions src/query/storages/system/src/background_tasks_table.rs
@@ -74,22 +74,24 @@ impl AsyncSystemTable for BackgroundTaskTable {
let mut trigger = Vec::with_capacity(tasks.len());
let mut create_timestamps = Vec::with_capacity(tasks.len());
let mut update_timestamps = Vec::with_capacity(tasks.len());
for (_, name, task) in tasks {
names.push(name);
types.push(task.task_type.to_string());
stats.push(task.task_state.to_string());
messages.push(task.message);
for (name, seq_task) in tasks {
names.push(name.name().to_string());
types.push(seq_task.task_type.to_string());
stats.push(seq_task.task_state.to_string());
messages.push(seq_task.message.to_string());
compaction_stats.push(
task.compaction_task_stats
seq_task
.compaction_task_stats
.as_ref()
.map(|s| serde_json::to_vec(s).unwrap_or_default()),
);
vacuum_stats.push(
task.vacuum_stats
seq_task
.vacuum_stats
.as_ref()
.map(|s| serde_json::to_vec(s).unwrap_or_default()),
);
if let Some(compact_stats) = task.compaction_task_stats.as_ref() {
if let Some(compact_stats) = seq_task.compaction_task_stats.as_ref() {
database_ids.push(compact_stats.db_id);
table_ids.push(compact_stats.table_id);
task_run_secs.push(compact_stats.total_compaction_time.map(|s| s.as_secs()));
@@ -98,10 +100,15 @@
table_ids.push(0);
task_run_secs.push(None);
}
creators.push(task.creator.map(|s| s.to_string()));
trigger.push(task.manual_trigger.map(|s| s.trigger.display().to_string()));
create_timestamps.push(task.created_at.timestamp_micros());
update_timestamps.push(task.last_updated.unwrap_or_default().timestamp_micros());
creators.push(seq_task.creator.as_ref().map(|s| s.to_string()));
trigger.push(
seq_task
.manual_trigger
.as_ref()
.map(|s| s.trigger.display().to_string()),
);
create_timestamps.push(seq_task.created_at.timestamp_micros());
update_timestamps.push(seq_task.last_updated.unwrap_or_default().timestamp_micros());
}
Ok(DataBlock::new_from_columns(vec![
StringType::from_data(names),
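
In the two files above, the handlers read fields such as `task_state` and `creator` directly off the `SeqV<BackgroundTaskInfo>` wrapper, switch to `.as_ref().map(...)` where the old code consumed the field, and only take ownership of the payload at the end via `seq_task.data`. A minimal sketch of a Deref-style wrapper showing why each of these works — the `SeqV` here is a simplified assumption about how the real type behaves, not its actual definition:

```rust
use std::ops::Deref;

// Simplified stand-in: the real SeqV also carries metadata, but the assumed
// Deref-to-inner behaviour is what lets callers write `seq_task.task_state`
// without unwrapping first.
struct SeqV<T> {
    seq: u64,
    data: T,
}

impl<T> Deref for SeqV<T> {
    type Target = T;
    fn deref(&self) -> &T {
        &self.data
    }
}

struct TaskInfo {
    task_state: &'static str,
    creator: Option<String>,
}

fn main() {
    let seq_task = SeqV {
        seq: 42,
        data: TaskInfo {
            task_state: "DONE",
            creator: Some("bg-service".to_string()),
        },
    };

    // Field access falls through Deref to the inner TaskInfo.
    println!("state = {}, seq = {}", seq_task.task_state, seq_task.seq);

    // Moving `creator` out of a Deref'd place is not allowed, so the column
    // builders borrow with `.as_ref().map(...)` instead.
    let creator = seq_task.creator.as_ref().map(|s| s.to_string());
    println!("creator = {creator:?}");

    // Ownership of the payload is taken explicitly at the end, as in
    // `task_infos.push((name, seq_task.data))`.
    let inner: TaskInfo = seq_task.data;
    println!("unwrapped state = {}", inner.task_state);
}
```
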
26 changes: 21 additions & 5 deletions tests/compat/test-compat.sh
@@ -29,6 +29,11 @@ binary_url() {
echo "https://github.com/datafuselabs/databend/releases/download/v${ver}-nightly/databend-v${ver}-nightly-x86_64-unknown-linux-gnu.tar.gz"
}

test_suite_url() {
local ver="$1"
echo "https://github.com/databendlabs/databend/releases/download/v${ver}-nightly/databend-testsuite-v${ver}-nightly-x86_64-unknown-linux-gnu.tar.gz"
}

# output: 0.7.58
# Without prefix `v` and `-nightly`
find_min_query_ver() {
@@ -104,9 +109,20 @@ git_partial_clone() {
download_test_suite() {
local ver="$1"

echo " === Download test suites from $ver:$query_test_path"
echo " === Download test suites $ver"

local url="$(test_suite_url $ver)"
local fn="databend-testsuite-v${ver}.tar.gz"

git_partial_clone "$bend_repo_url" "v$ver-nightly" "$query_test_path" old_suite
curl --connect-timeout 5 --retry-all-errors --retry 5 --retry-delay 1 -L "$url" -o "$fn"

mkdir -p ./testsuite/$ver
tar -xf "$fn" -C ./testsuite/$ver

echo " === Extracted test suites to ./testsuite/$ver:"
ls ./testsuite/$ver

chmod +x ./testsuite/$ver/bin/*
}

# Download config.toml for a specific version of query.
@@ -221,15 +237,15 @@ run_test() {
$sqllogictests --handlers mysql --run_dir 05_ddl
else
(
# download suites into ./old_suite
# download suites into ./testsuite/$query_ver/{bin/,suites/}
download_test_suite $query_ver
)

# Replace suites
rm -rf "tests/sqllogictests/suites"
mv "old_suite/tests/sqllogictests/suites" "tests/sqllogictests/suites"
mv "./testsuite/$query_ver/suites" "tests/sqllogictests/suites"

$sqllogictests --handlers mysql --run_dir 05_ddl
./testsuite/$query_ver/bin/databend-sqllogictests --handlers mysql --run_dir 05_ddl
cd -
fi
}
