Skip to content

Commit

Permalink
add logging timestamp; server reconnect to manager (#167)
Browse files Browse the repository at this point in the history
  • Loading branch information
luanshaotong authored Aug 21, 2023
1 parent e98fd83 commit 97e15cf
Show file tree
Hide file tree
Showing 13 changed files with 85 additions and 34 deletions.
1 change: 0 additions & 1 deletion intercept/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use sealfs::common::serialization::{
use sealfs::rpc::client::TcpStreamCreator;
use sealfs::{offset_of, rpc};
pub struct Client {
// TODO replace with a thread safe data structure
pub client: Arc<
rpc::client::RpcClient<
tokio::net::tcp::OwnedReadHalf,
Expand Down
3 changes: 2 additions & 1 deletion intercept/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub mod syscall_intercept;
pub mod test_log;

use client::CLIENT;
use env_logger::fmt;
use file_desc::{FdAttr, FdType};
use lazy_static::lazy_static;
use libc::{
Expand Down Expand Up @@ -69,7 +70,7 @@ extern "C" fn initialize() {
let log_level = std::env::var("SEALFS_LOG_LEVEL").unwrap_or("warn".to_string());
let mut builder = env_logger::Builder::from_default_env();
builder
.format_timestamp(None)
.format_timestamp(Some(fmt::TimestampPrecision::Millis))
.filter(None, log::LevelFilter::from_str(&log_level).unwrap());
builder.init();

Expand Down
11 changes: 7 additions & 4 deletions src/bin/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// SPDX-License-Identifier: Apache-2.0

use clap::Parser;
use env_logger::fmt;
use log::{error, info, warn};
use sealfs::manager::manager_service::update_server_status;
use sealfs::{manager::manager_service::ManagerService, rpc::server::RpcServer};
Expand Down Expand Up @@ -92,10 +93,12 @@ async fn main() -> anyhow::Result<()> {
},
};

builder.format_timestamp(None).filter(
None,
log::LevelFilter::from_str(&properties.log_level).unwrap(),
);
builder
.format_timestamp(Some(fmt::TimestampPrecision::Millis))
.filter(
None,
log::LevelFilter::from_str(&properties.log_level).unwrap(),
);
builder.init();

info!("Starting manager with log level: {}", properties.log_level);
Expand Down
21 changes: 10 additions & 11 deletions src/bin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// SPDX-License-Identifier: Apache-2.0

use clap::Parser;
use env_logger::fmt;
use log::info;
use sealfs::server;
use serde::{Deserialize, Serialize};
Expand All @@ -27,8 +28,6 @@ struct Args {
#[arg(required = true, long)]
storage_path: Option<String>,
#[arg(long)]
heartbeat: Option<bool>,
#[arg(long)]
log_level: Option<String>,
}

Expand All @@ -40,7 +39,6 @@ struct Properties {
cache_capacity: usize,
write_buffer_size: usize,
storage_path: String,
heartbeat: bool,
log_level: String,
}

Expand All @@ -56,18 +54,19 @@ async fn main() -> anyhow::Result<(), Box<dyn std::error::Error>> {
cache_capacity: args.cache_capacity.unwrap_or(13421772),
write_buffer_size: args.write_buffer_size.unwrap_or(0x4000000),
storage_path: args.storage_path.unwrap(),
heartbeat: args.heartbeat.unwrap_or(false),
log_level: args.log_level.unwrap_or("warn".to_owned()),
};

let mut builder = env_logger::Builder::from_default_env();
builder.format_timestamp(None).filter(
None,
match log::LevelFilter::from_str(&properties.log_level) {
Ok(level) => level,
Err(_) => log::LevelFilter::Warn,
},
);
builder
.format_timestamp(Some(fmt::TimestampPrecision::Millis))
.filter(
None,
match log::LevelFilter::from_str(&properties.log_level) {
Ok(level) => level,
Err(_) => log::LevelFilter::Warn,
},
);
builder.init();

info!("start server with properties: {:?}", properties);
Expand Down
3 changes: 2 additions & 1 deletion src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub mod daemon;
pub mod fuse_client;

use clap::{Parser, Subcommand};
use env_logger::fmt;
use fuser::{
Filesystem, ReplyAttr, ReplyCreate, ReplyData, ReplyDirectory, ReplyEntry, ReplyOpen,
ReplyWrite, Request,
Expand Down Expand Up @@ -369,7 +370,7 @@ pub async fn run_command() -> Result<(), Box<dyn std::error::Error>> {
};
let mut builder = env_logger::Builder::from_default_env();
builder
.format_timestamp(None)
.format_timestamp(Some(fmt::TimestampPrecision::Millis))
.filter(None, log::LevelFilter::from_str(&log_level).unwrap());
builder.init();

Expand Down
1 change: 1 addition & 0 deletions src/common/info_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ pub trait ClientStatusMonitor: InfoSyncer {
ClusterStatus::PreFinish => self.get_new_address(path),
ClusterStatus::Finishing => self.get_address(path),
ClusterStatus::StatusError => todo!(),
ClusterStatus::Unkown => todo!(),
}
}

Expand Down
6 changes: 6 additions & 0 deletions src/common/serialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ pub enum ClusterStatus {
PreFinish = 306,
Finishing = 307,
StatusError = 308,
Unkown = 309,
}

impl TryFrom<u32> for ClusterStatus {
Expand All @@ -292,6 +293,7 @@ impl TryFrom<u32> for ClusterStatus {
306 => Ok(ClusterStatus::PreFinish),
307 => Ok(ClusterStatus::Finishing),
308 => Ok(ClusterStatus::StatusError),
309 => Ok(ClusterStatus::Unkown),
_ => Err(format!("Unkown value: {}", value)),
}
}
Expand All @@ -309,6 +311,7 @@ impl From<ClusterStatus> for u32 {
ClusterStatus::PreFinish => 306,
ClusterStatus::Finishing => 307,
ClusterStatus::StatusError => 308,
ClusterStatus::Unkown => 309,
}
}
}
Expand All @@ -327,6 +330,7 @@ impl TryFrom<i32> for ClusterStatus {
306 => Ok(ClusterStatus::PreFinish),
307 => Ok(ClusterStatus::Finishing),
308 => Ok(ClusterStatus::StatusError),
309 => Ok(ClusterStatus::Unkown),
_ => Err(format!("Unkown value: {}", value)),
}
}
Expand All @@ -344,6 +348,7 @@ impl From<ClusterStatus> for i32 {
ClusterStatus::PreFinish => 306,
ClusterStatus::Finishing => 307,
ClusterStatus::StatusError => 308,
ClusterStatus::Unkown => 309,
}
}
}
Expand All @@ -360,6 +365,7 @@ impl Display for ClusterStatus {
Self::PreFinish => write!(f, "PreFinish"),
Self::Finishing => write!(f, "DeleteNodes"),
Self::StatusError => write!(f, "StatusError"),
Self::Unkown => write!(f, "Unkown"),
}
}
}
Expand Down
9 changes: 6 additions & 3 deletions src/rpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ impl<
return Ok(());
}
Err(e) => {
error!("wait for callback failed: {}", e);
error!("wait for callback failed: {}, batch: {}, id {}, operation type: {}, path: {}", e, batch, id, operation_type, path);
continue;
}
}
Expand Down Expand Up @@ -293,13 +293,16 @@ pub async fn parse_response<W: AsyncWriteExt + Unpin, R: AsyncReadExt + Unpin>(
let result = {
match pool.lock_if_not_timeout(batch, id) {
Ok(_) => Ok(()),
Err(_) => Err("lock timeout"),
Err(e) => Err(e),
}
};
match result {
Ok(_) => {}
Err(e) => {
error!("parse_response lock timeout: {}", e);
error!(
"parse_response lock timeout: {}, batch: {}, id: {}",
e, batch, id
);
let result = connection
.clean_response(&mut read_stream, total_length)
.await;
Expand Down
7 changes: 7 additions & 0 deletions src/rpc/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,13 @@ impl<W: AsyncWriteExt + Unpin, R: AsyncReadExt + Unpin> ServerConnection<W, R> {
self.name_id.clone()
}

pub async fn close(&self) -> Result<(), String> {
let mut stream = self.write_stream.lock().await;
stream.shutdown().await.map_err(|e| e.to_string())?;
info!("close connection {}", self.name_id);
Ok(())
}

// response
// | batch | id | status | flags | total_length | meta_data_lenght | data_length | meta_data | data |
// | 4Byte | 4Byte | 4Byte | 4Byte | 4Byte | 4Byte | 4Byte | 0~ | 0~ |
Expand Down
10 changes: 7 additions & 3 deletions src/rpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub async fn handle<
connection.id,
header.r#type,
header.flags,
path,
path.clone(),
data,
metadata,
)
Expand All @@ -61,11 +61,15 @@ pub async fn handle<
)
.await
{
error!("handle, send response error: {}", e);
error!("handle connection: {} , send response error: {}, batch: {}, id: {}, operation_type: {}, flags: {}, path: {:?}", connection.id, e, header.batch, header.id, header.r#type, header.flags, std::str::from_utf8(&path));
let _ = connection.close().await;
}
}
Err(e) => {
error!("handle, dispatch error: {}", e);
error!(
"handle connection: {} , dispatch error: {}",
connection.id, e
);
}
}
}
Expand Down
8 changes: 7 additions & 1 deletion src/server/distributed_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ where
meta_engine,
client: client.clone(),
sender: Sender::new(client),
cluster_status: AtomicI32::new(ClusterStatus::Initializing.into()),
cluster_status: AtomicI32::new(ClusterStatus::Unkown.into()),
hash_ring: Arc::new(RwLock::new(None)),
new_hash_ring: Arc::new(RwLock::new(None)),
manager_address: Arc::new(Mutex::new("".to_string())),
Expand Down Expand Up @@ -471,6 +471,7 @@ where
}
ClusterStatus::Finishing => (self.get_address(path), false),
ClusterStatus::StatusError => todo!(),
ClusterStatus::Unkown => todo!(),
//s => panic!("get forward address failed, invalid cluster status: {}", s),
}
}
Expand Down Expand Up @@ -893,6 +894,11 @@ where
) -> Result<Vec<u8>, i32> {
let path = get_full_path(parent, name);

debug!(
"create file, parent_dir: {}, file_name: {}, oflag: {}, umask: {}, mode: {}",
parent, name, oflag, umask, mode
);

if self.lock_file(parent)?.insert(name.to_owned(), 0).is_some() {
if (oflag & O_EXCL) != 0 {
debug!("create file failed, file exists, path: {}", path);
Expand Down
28 changes: 23 additions & 5 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,13 @@ pub async fn run(

tokio::spawn(sync_cluster_status(Arc::clone(&engine)));

while <i32 as TryInto<ClusterStatus>>::try_into(engine.cluster_status.load(Ordering::Relaxed))
.unwrap()
== ClusterStatus::Unkown
{
sleep(Duration::from_secs(1)).await;
}

let handler = Arc::new(FileRequestHandler::new(engine.clone()));
let server = RpcServer::new(handler, &server_address);

Expand Down Expand Up @@ -319,12 +326,23 @@ pub async fn run(
.write()
.replace(HashRing::new(all_servers_address));
info!("Init: Update Hash Ring Success.");
match engine.update_server_status(ServerStatus::Finished).await {
Ok(_) => {
info!("Update Server Status to Finish Success.");

match <i32 as TryInto<ClusterStatus>>::try_into(engine.cluster_status.load(Ordering::Relaxed))
.unwrap()
{
ClusterStatus::Initializing => {
match engine.update_server_status(ServerStatus::Finished).await {
Ok(_) => {
info!("Update Server Status to Finish Success.");
}
Err(e) => {
panic!("Update Server Status to Finish Failed. Error = {}", e);
}
}
}
Err(e) => {
panic!("Update Server Status to Finish Failed. Error = {}", e);
ClusterStatus::Idle => {}
e => {
panic!("Cluster Status Unexpected. Status = {:?}", e as u32);
}
}
info!("Init: Start Transferring Data.");
Expand Down
11 changes: 7 additions & 4 deletions src/server/storage_engine/file_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,11 @@ impl StorageEngine for FileEngine {
return Err(f_errno);
};
debug!(
"read_file path: {}, size: {}, offset: {}, data: {:?}",
path, real_size, offset, data
"read_file path: {}, size: {}, offset: {}, data_length: {:?}",
path,
real_size,
offset,
data.len()
);

// this is a temporary solution, which results in an extra memory copy.
Expand Down Expand Up @@ -154,7 +157,7 @@ impl StorageEngine for FileEngine {
};
if fd < 0 {
let f_errno = errno();
error!("read file error: {:?}", status_to_string(f_errno));
error!("write file error: {:?}", status_to_string(f_errno));
return Err(f_errno);
}
self.cache
Expand Down Expand Up @@ -201,7 +204,7 @@ impl StorageEngine for FileEngine {
};
if fd < 0 {
let f_errno = errno();
error!("read file error: {:?}", status_to_string(f_errno));
error!("create_file error: {:?}", status_to_string(f_errno));
return Err(f_errno);
}
self.cache
Expand Down

0 comments on commit 97e15cf

Please sign in to comment.