refactor: make test_watch_expired_events() less sensitive to inaccurate time (#16966)

* refactor: remove unused sled related config from databend-meta

* chore: adjust gRPC logging level

* refactor: make test_watch_expired_events() less sensitive to inaccurate time

- Fix: #16942

* refactor: add short sleep to test_meta_node_join_with_state to ensure server quits completely
drmingdrmer authored Nov 28, 2024
1 parent 7cff135 commit 0c3c0e1
Showing 11 changed files with 44 additions and 93 deletions.
4 changes: 2 additions & 2 deletions src/meta/binaries/metactl/import.rs
@@ -134,7 +134,7 @@ async fn import_v003(
raft_config: RaftConfig,
lines: impl IntoIterator<Item = Result<String, io::Error>>,
) -> anyhow::Result<Option<LogId>> {
- let db = init_get_sled_db(raft_config.raft_dir.clone(), raft_config.sled_cache_size());
+ let db = init_get_sled_db(raft_config.raft_dir.clone(), 1024 * 1024 * 1024);

let mut n = 0;
let mut max_log_id: Option<LogId> = None;
@@ -386,7 +386,7 @@ async fn init_new_cluster(
fn clear(args: &ImportArgs) -> anyhow::Result<()> {
eprintln!();
eprintln!("Clear All Sled Trees Before Import:");
- let db = init_get_sled_db(args.raft_dir.clone().unwrap(), 64 * 1024 * 1024 * 1024);
+ let db = init_get_sled_db(args.raft_dir.clone().unwrap(), 1024 * 1024 * 1024);

let tree_names = db.tree_names();
for n in tree_names.iter() {
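
A minimal sketch of what the hard-coded 1024 * 1024 * 1024 (1 GiB) means at the sled layer. It assumes init_get_sled_db forwards its second argument to sled's cache_capacity; the wrapper's internals are not shown in this diff, so the function below is an illustration, not the actual implementation:

    use sled::Config;

    /// Open (or create) a sled database with a fixed in-memory cache budget.
    /// After this commit, call sites pass a hard-coded byte count instead of
    /// the removed `sled_max_cache_size_mb` setting.
    fn open_sled_db(dir: &str, cache_bytes: u64) -> sled::Result<sled::Db> {
        Config::new()
            .path(dir)
            .cache_capacity(cache_bytes) // sled's page-cache budget, in bytes
            .open()
    }

    fn main() -> sled::Result<()> {
        let _db = open_sled_db("/tmp/raft_dir_demo", 1024 * 1024 * 1024)?;
        Ok(())
    }
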
2 changes: 1 addition & 1 deletion src/meta/process/src/process_meta_dir.rs
@@ -25,7 +25,7 @@ pub fn process_sled_db<F>(config: &Config, convert: F) -> anyhow::Result<()>
where F: Fn(RaftStoreEntry) -> Result<Option<RaftStoreEntry>, anyhow::Error> {
let raft_config = &config.raft_config;

- let db = init_get_sled_db(raft_config.raft_dir.clone(), 64 * 1024 * 1024 * 1024);
+ let db = init_get_sled_db(raft_config.raft_dir.clone(), 1024 * 1024 * 1024);

let mut tree_names = db.tree_names();
tree_names.sort();
20 changes: 0 additions & 20 deletions src/meta/raft-store/src/config.rs
@@ -135,12 +135,6 @@ pub struct RaftConfig {
/// Otherwise this argument is ignored.
pub id: NodeId,

- /// For test only: specifies the tree name prefix
- pub sled_tree_prefix: String,
-
- /// The maximum memory in MB that sled can use for caching.
- pub sled_max_cache_size_mb: u64,
-
/// The node name. If the user specifies a name,
/// the user-supplied name is used, if not, the default name is used.
pub cluster_name: String,
@@ -190,8 +184,6 @@ impl Default for RaftConfig {
leave_via: vec![],
leave_id: None,
id: 0,
sled_tree_prefix: "".to_string(),
sled_max_cache_size_mb: 10 * 1024,
cluster_name: "foo_cluster".to_string(),
wait_leader_timeout: 70000,
}
@@ -298,16 +290,4 @@ impl RaftConfig {
}
Ok(())
}
-
- /// Create a unique sled::Tree name by prepending a unique prefix.
- /// So that multiple instance that depends on a sled::Tree can be used in one process.
- /// sled does not allow to open multiple `sled::Db` in one process.
- pub fn tree_name(&self, name: impl std::fmt::Display) -> String {
- format!("{}{}", self.sled_tree_prefix, name)
- }
-
- /// Return the size of sled cache in bytes.
- pub fn sled_cache_size(&self) -> u64 {
- self.sled_max_cache_size_mb * 1024 * 1024
- }
}
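
The deleted tree_name helper existed because sled does not allow opening multiple sled::Db instances in one process: tests shared a single db and kept instances apart by prefixing tree names. A short sketch of the removed mechanism, with illustrative values (the "test-7-" prefix is hypothetical):

    /// Prepend a per-instance prefix so several logical stores can share
    /// one sled::Db without colliding on tree names.
    fn tree_name(sled_tree_prefix: &str, name: impl std::fmt::Display) -> String {
        format!("{}{}", sled_tree_prefix, name)
    }

    fn main() {
        // The test context used a prefix of the form "test-{config_id}-".
        assert_eq!(tree_name("test-7-", "header"), "test-7-header");
    }
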
5 changes: 2 additions & 3 deletions src/meta/raft-store/src/ondisk/mod.rs
@@ -159,10 +159,9 @@ impl OnDisk {
return Ok(());
}

- let db = init_get_sled_db(config.raft_dir.clone(), config.sled_cache_size());
+ let db = init_get_sled_db(config.raft_dir.clone(), 1024 * 1024 * 1024);

- let tree_name = config.tree_name(TREE_HEADER);
- let tree = SledTree::open(&db, &tree_name, config.is_sync())?;
+ let tree = SledTree::open(&db, TREE_HEADER, config.is_sync())?;
let ks = tree.key_space::<DataHeader>();

let header = ks.get(&Self::KEY_HEADER.to_string()).map_err(|e| {
4 changes: 2 additions & 2 deletions src/meta/raft-store/src/ondisk/upgrade_to_v004.rs
@@ -59,7 +59,7 @@ impl OnDisk {
let raft_log = RaftLogV004::open(raft_log_config)?;
let mut importer = importer::Importer::new(raft_log);

- let db = init_get_sled_db(self.config.raft_dir.clone(), self.config.sled_cache_size());
+ let db = init_get_sled_db(self.config.raft_dir.clone(), 1024 * 1024 * 1024);

// Read the purged index
let first_log_index = {
@@ -213,7 +213,7 @@ impl OnDisk {

self.progress(format_args!(" Remove V003 log from sled db",));

- let db = init_get_sled_db(self.config.raft_dir.clone(), self.config.sled_cache_size());
+ let db = init_get_sled_db(self.config.raft_dir.clone(), 1024 * 1024 * 1024);
for tree_name in db.tree_names() {
if tree_name == "__sled__default" {
continue;
7 changes: 3 additions & 4 deletions src/meta/service/src/api/grpc/grpc_service.rs
@@ -57,7 +57,6 @@ use futures::stream::TryChunksError;
use futures::StreamExt;
use futures::TryStreamExt;
use log::debug;
- use log::info;
use prost::Message;
use tokio_stream;
use tokio_stream::Stream;
@@ -109,7 +108,7 @@ impl MetaServiceImpl {
#[fastrace::trace]
async fn handle_kv_api(&self, request: Request<RaftRequest>) -> Result<RaftReply, Status> {
let req: MetaGrpcReq = request.try_into()?;
info!("{}: Received MetaGrpcReq: {:?}", func_name!(), req);
debug!("{}: Received MetaGrpcReq: {:?}", func_name!(), req);

let m = &self.meta_node;
let reply = match &req {
@@ -134,7 +133,7 @@ impl MetaServiceImpl {
) -> Result<(Option<Endpoint>, BoxStream<StreamItem>), Status> {
let req: MetaGrpcReadReq = GrpcHelper::parse_req(request)?;

info!("{}: Received ReadRequest: {:?}", func_name!(), req);
debug!("{}: Received ReadRequest: {:?}", func_name!(), req);

let req = ForwardRequest::new(1, req);

@@ -156,7 +155,7 @@ impl MetaServiceImpl {
) -> Result<(Option<Endpoint>, TxnReply), Status> {
let txn = request.into_inner();

info!("{}: Received TxnRequest: {}", func_name!(), txn);
debug!("{}: Received TxnRequest: {}", func_name!(), txn);

let ent = LogEntry::new(Cmd::Transaction(txn.clone()));

20 changes: 1 addition & 19 deletions src/meta/service/src/configs/outer_v0.rs
@@ -305,8 +305,6 @@ pub struct ConfigViaEnv {
pub kvsrv_single: bool,
pub metasrv_join: Vec<String>,
pub kvsrv_id: u64,
- pub sled_tree_prefix: String,
- pub sled_max_cache_size_mb: u64,
pub cluster_name: String,
}

@@ -363,8 +361,6 @@ impl From<Config> for ConfigViaEnv {
kvsrv_single: cfg.raft_config.single,
metasrv_join: cfg.raft_config.join,
kvsrv_id: cfg.raft_config.id,
- sled_tree_prefix: cfg.raft_config.sled_tree_prefix,
- sled_max_cache_size_mb: cfg.raft_config.sled_max_cache_size_mb,
cluster_name: cfg.raft_config.cluster_name,
}
}
@@ -405,8 +401,6 @@ impl Into<Config> for ConfigViaEnv {
// Do not allow to leave via environment variable
leave_id: None,
id: self.kvsrv_id,
- sled_tree_prefix: self.sled_tree_prefix,
- sled_max_cache_size_mb: self.sled_max_cache_size_mb,
cluster_name: self.cluster_name,
};
let log_config = LogConfig {
@@ -539,7 +533,7 @@ pub struct RaftConfig {

/// The total cache size for snapshot blocks.
///
- /// By default it is 1GB.
+ /// By default, it is 1GB.
#[clap(long, default_value = "1073741824")]
pub snapshot_db_block_cache_size: u64,

@@ -574,14 +568,6 @@ pub struct RaftConfig {
#[clap(long, default_value = "0")]
pub id: u64,

- /// For test only: specifies the tree name prefix
- #[clap(long, default_value = "")]
- pub sled_tree_prefix: String,
-
- /// The maximum memory in MB that sled can use for caching. Default is 10GB
- #[clap(long, default_value = "10240")]
- pub sled_max_cache_size_mb: u64,
-
/// The node name. If the user specifies a name, the user-supplied name is used,
/// if not, the default name is used
#[clap(long, default_value = "foo_cluster")]
@@ -630,8 +616,6 @@ impl From<RaftConfig> for InnerRaftConfig {
leave_via: x.leave_via,
leave_id: x.leave_id,
id: x.id,
- sled_tree_prefix: x.sled_tree_prefix,
- sled_max_cache_size_mb: x.sled_max_cache_size_mb,
cluster_name: x.cluster_name,
wait_leader_timeout: x.wait_leader_timeout,
}
@@ -668,8 +652,6 @@ impl From<InnerRaftConfig> for RaftConfig {
leave_via: inner.leave_via,
leave_id: inner.leave_id,
id: inner.id,
- sled_tree_prefix: inner.sled_tree_prefix,
- sled_max_cache_size_mb: inner.sled_max_cache_size_mb,
cluster_name: inner.cluster_name,
wait_leader_timeout: inner.wait_leader_timeout,
}
1 change: 0 additions & 1 deletion src/meta/service/tests/it/configs.rs
@@ -89,7 +89,6 @@ cluster_name = "foo_cluster"
assert!(!cfg.raft_config.single);
assert_eq!(cfg.raft_config.join, vec!["j1", "j2"]);
assert_eq!(cfg.raft_config.id, 20);
- assert_eq!(cfg.raft_config.sled_tree_prefix, "sled_foo");
assert_eq!(cfg.raft_config.cluster_name, "foo_cluster");
});

48 changes: 19 additions & 29 deletions src/meta/service/tests/it/grpc/metasrv_grpc_watch.rs
@@ -311,12 +311,14 @@ async fn test_watch_expired_events() -> anyhow::Result<()> {
// - Before applying, 32 expired keys will be cleaned.
// - When applying, touched expired keys will be cleaned.

+ fn sec(x: u64) -> Duration {
+ Duration::from_secs(x)
+ }
+
let (_tc, addr) = crate::tests::start_metasrv().await?;

let watch_prefix = "w_";
let now_sec = now();
- let expire = now_sec + 11;
- // dbg!(now_sec, expire);

info!("--- prepare data that are gonna expire");
{
Expand All @@ -332,31 +334,19 @@ async fn test_watch_expired_events() -> anyhow::Result<()> {
for i in 0..(32 + 1) {
let k = format!("w_auto_gc_{}", i);
txn.if_then
.push(TxnOp::put_with_ttl(&k, b(&k), Some(Duration::from_secs(1))));
.push(TxnOp::put_with_ttl(&k, b(&k), Some(sec(1))));
}

// Expired key won't be cleaned when they are read, although read returns None.

- txn.if_then.push(TxnOp::put_with_ttl(
- "w_b1",
- b("w_b1"),
- Some(Duration::from_secs(6)),
- ));
- txn.if_then.push(TxnOp::put_with_ttl(
- "w_b2",
- b("w_b2"),
- Some(Duration::from_secs(6)),
- ));
- txn.if_then.push(TxnOp::put_with_ttl(
- "w_b3a",
- b("w_b3a"),
- Some(Duration::from_secs(6)),
- ));
- txn.if_then.push(TxnOp::put_with_ttl(
- "w_b3b",
- b("w_b3b"),
- Some(Duration::from_secs(11)),
- ));
+ txn.if_then
+ .push(TxnOp::put_with_ttl("w_b1", b("w_b1"), Some(sec(6))));
+ txn.if_then
+ .push(TxnOp::put_with_ttl("w_b2", b("w_b2"), Some(sec(6))));
+ txn.if_then
+ .push(TxnOp::put_with_ttl("w_b3a", b("w_b3a"), Some(sec(6))));
+ txn.if_then
+ .push(TxnOp::put_with_ttl("w_b3b", b("w_b3b"), Some(sec(15))));

client.transaction(txn).await?;
}
@@ -373,8 +363,8 @@ async fn test_watch_expired_events() -> anyhow::Result<()> {
watch_client.request(watch).await?
};

info!("--- sleep {} for expiration", expire - now_sec);
tokio::time::sleep(Duration::from_secs(10)).await;
info!("--- sleep 10 for expiration");
tokio::time::sleep(sec(10)).await;

info!("--- apply another txn in another thread to override keys");
{
@@ -430,20 +420,20 @@ async fn test_watch_expired_events() -> anyhow::Result<()> {
"w_b3b",
seq + 3,
"w_b3b",
- Some(KvMeta::new_expire(now_sec + 16)),
+ Some(KvMeta::new_expire(now_sec + 15)),
), // expired
];

- // remove the millisecond part of expire_at
+ // The evaluated expire_at could not equal to the real expire_at, so we need to tidy the expire_at.
fn tidy(mut ev: Event) -> Event {
if let Some(ref mut prev) = ev.prev {
if let Some(ref mut meta) = prev.meta {
- meta.expire_at = meta.expire_at.map(|x| x / 1000 * 1000);
+ meta.expire_at = meta.expire_at.map(|x| x / 10 * 10);
}
}
if let Some(ref mut current) = ev.current {
if let Some(ref mut meta) = current.meta {
- meta.expire_at = meta.expire_at.map(|x| x / 1000 * 1000);
+ meta.expire_at = meta.expire_at.map(|x| x / 10 * 10);
}
}
ev
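
The change that makes this test tolerant of inaccurate time is the last hunk above: expected and observed expire_at values are both rounded down to a 10-second bucket before comparison, so a few seconds of drift between the test's clock and the server's clock no longer fail the assertion. A standalone sketch of the idea (the function names are illustrative, not from the commit):

    /// Round a second-resolution timestamp down to its 10-second bucket.
    fn bucket_10s(expire_at_secs: u64) -> u64 {
        expire_at_secs / 10 * 10
    }

    fn main() {
        // These timestamps differ by 4 seconds but land in the same bucket,
        // so an assertion on the bucketed values passes despite the drift.
        assert_eq!(bucket_10s(1_732_752_007), bucket_10s(1_732_752_003));
    }
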
23 changes: 14 additions & 9 deletions src/meta/service/tests/it/meta_node/meta_node_lifecycle.rs
@@ -319,20 +319,20 @@ async fn test_meta_node_join_with_state() -> anyhow::Result<()> {
tc2.config.raft_config.single = false;
tc2.config.raft_config.join = vec![tc0.config.raft_config.raft_api_addr().await?.to_string()];

let meta_node = MetaNode::start(&tc0.config).await?;
// Initial log, leader blank log, add node-0.
let n1 = MetaNode::start(&tc0.config).await?;
// Initial membership log, leader blank log, add node-0 log.
let mut log_index = 3;

- let res = meta_node
+ let res = n1
.join_cluster(
&tc0.config.raft_config,
tc0.config.grpc_api_advertise_address(),
)
.await?;
assert_eq!(Err("Did not join: --join is empty".to_string()), res);

- let meta_node1 = MetaNode::start(&tc1.config).await?;
- let res = meta_node1
+ let n1 = MetaNode::start(&tc1.config).await?;
+ let res = n1
.join_cluster(
&tc1.config.raft_config,
tc1.config.grpc_api_advertise_address(),
Expand All @@ -342,8 +342,7 @@ async fn test_meta_node_join_with_state() -> anyhow::Result<()> {

// Two membership logs, one add-node log;
log_index += 3;
- meta_node1
- .raft
+ n1.raft
.wait(timeout())
.applied_index(Some(log_index), "node-1 join cluster")
.await?;
@@ -354,6 +353,9 @@ async fn test_meta_node_join_with_state() -> anyhow::Result<()> {
n2.stop().await?;
}

+ // Wait a second to ensure server quits completely.
+ sleep(Duration::from_secs(1)).await;
+
info!("--- Allow to join node-2 with initialized store");
{
let n2 = MetaNode::start(&tc2.config).await?;
@@ -368,8 +370,8 @@
// Two membership logs, one add-node log;
log_index += 3;

- // Add this barrier to ensure all of the logs are applied before quit.
- // Otherwise the next time node-2 starts it can not see the applied
+ // Add this barrier to ensure all the logs are applied before quit.
+ // Otherwise, the next time node-2 starts it can not see the applied
// membership and believes it has not yet joined into a cluster.
n2.raft
.wait(timeout())
Expand All @@ -379,6 +381,9 @@ async fn test_meta_node_join_with_state() -> anyhow::Result<()> {
n2.stop().await?;
}

+ // Wait a second to ensure server quits completely.
+ sleep(Duration::from_secs(1)).await;
+
info!("--- Not allowed to join node-2 with store with membership");
{
let n2 = MetaNode::start(&tc2.config).await?;
Expand Down
3 changes: 0 additions & 3 deletions src/meta/service/tests/it/tests/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,6 @@ impl MetaSrvTestContext {

let host = "127.0.0.1";

- // We use a single sled db for all unit test. Every unit test need a unique prefix so that it opens different tree.
- config.raft_config.sled_tree_prefix = format!("test-{}-", config_id);
-
{
let grpc_port = next_port();
config.grpc_api_address = format!("{}:{}", host, grpc_port);
