Skip to content

Commit

Permalink
logger: initial commit of klog functionality for segcache-rs
Browse files Browse the repository at this point in the history
Adds a new logger which has command logger specific functionality.

Adds command logging to pelikan_segcache_rs.

Changes to the memcache wire protocol to support providing the
necessary context for command logging.

Fixes `cas` operation in seg storage implementation.
  • Loading branch information
brayniac committed Aug 31, 2021
1 parent 0f9b93d commit f8d3b75
Show file tree
Hide file tree
Showing 30 changed files with 960 additions and 256 deletions.
282 changes: 154 additions & 128 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ members = [
"src/rust/config",
"src/rust/core/server",
"src/rust/entrystore",
"src/rust/logger",
"src/rust/metrics",
"src/rust/protocol",
"src/rust/queues",
Expand Down
4 changes: 4 additions & 0 deletions config/segcache.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ time_type = "Memcache"
# choose from: error, warn, info, debug, trace
log_level = "info"

[klog]
klog_file = "segcache.cmd"
klog_sample = 100

[sockio]

[tcp]
Expand Down
2 changes: 1 addition & 1 deletion src/rust/config/src/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ fn log_nbuf() -> usize {
}

// struct definitions
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct DebugConfig {
#[serde(with = "LevelDef")]
#[serde(default = "log_level")]
Expand Down
92 changes: 92 additions & 0 deletions src/rust/config/src/klog.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright 2020 Twitter, Inc.
// Licensed under the Apache License, Version 2.0
// http://www.apache.org/licenses/LICENSE-2.0

use serde::{Deserialize, Serialize};

////////////////////////////////////////////////////////////////////////////////
// constants to define default values
////////////////////////////////////////////////////////////////////////////////

// log to the file path
const KLOG_FILE: Option<String> = None;

// flush interval in milliseconds
const KLOG_INTERVAL: usize = 100;

// buffer size in bytes
const KLOG_NBUF: usize = 0;

// log 1 in every N commands
const KLOG_SAMPLE: usize = 100;

////////////////////////////////////////////////////////////////////////////////
// helper functions
////////////////////////////////////////////////////////////////////////////////

fn klog_file() -> Option<String> {
KLOG_FILE
}

fn klog_interval() -> usize {
KLOG_INTERVAL
}

fn klog_nbuf() -> usize {
KLOG_NBUF
}

fn klog_sample() -> usize {
KLOG_SAMPLE
}


////////////////////////////////////////////////////////////////////////////////
// struct definitions
////////////////////////////////////////////////////////////////////////////////

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct KlogConfig {
#[serde(default = "klog_file")]
klog_file: Option<String>,
#[serde(default = "klog_interval")]
klog_interval: usize,
#[serde(default = "klog_nbuf")]
klog_nbuf: usize,
#[serde(default = "klog_sample")]
klog_sample: usize,
}

////////////////////////////////////////////////////////////////////////////////
// implementation
////////////////////////////////////////////////////////////////////////////////

impl KlogConfig {
pub fn file(&self) -> Option<String> {
self.klog_file.clone()
}

pub fn interval(&self) -> usize {
self.klog_interval
}

pub fn nbuf(&self) -> usize {
self.klog_nbuf
}

pub fn sample(&self) -> usize {
self.klog_sample
}
}

// trait implementations
impl Default for KlogConfig {
fn default() -> Self {
Self {
klog_file: klog_file(),
klog_interval: klog_interval(),
klog_nbuf: klog_nbuf(),
klog_sample: klog_sample(),
}
}
}
2 changes: 2 additions & 0 deletions src/rust/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ mod array;
mod buf;
mod dbuf;
mod debug;
mod klog;
mod pingserver;
pub mod seg;
mod segcache;
Expand All @@ -26,6 +27,7 @@ pub use array::ArrayConfig;
pub use buf::BufConfig;
pub use dbuf::DbufConfig;
pub use debug::DebugConfig;
pub use klog::KlogConfig;
pub use pingserver::PingserverConfig;
pub use seg::SegConfig;
pub use segcache::SegcacheConfig;
Expand Down
7 changes: 7 additions & 0 deletions src/rust/config/src/segcache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ pub struct SegcacheConfig {
#[serde(default)]
debug: DebugConfig,
#[serde(default)]
klog: KlogConfig,
#[serde(default)]
sockio: SockioConfig,
#[serde(default)]
tcp: TcpConfig,
Expand Down Expand Up @@ -120,6 +122,10 @@ impl SegcacheConfig {
&self.debug
}

pub fn klog(&self) -> &KlogConfig {
&self.klog
}

pub fn sockio(&self) -> &SockioConfig {
&self.sockio
}
Expand Down Expand Up @@ -153,6 +159,7 @@ impl Default for SegcacheConfig {

buf: Default::default(),
debug: Default::default(),
klog: Default::default(),
sockio: Default::default(),
tcp: Default::default(),
tls: Default::default(),
Expand Down
1 change: 1 addition & 0 deletions src/rust/core/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ common = { path = "../../common" }
config = { path = "../../config" }
crossbeam-channel = "0.5.0"
libc = "0.2.83"
logger = { path = "../../logger" }
metrics = { path = "../../metrics" }
mio = { version = "0.7.7", features = ["os-poll", "tcp"] }
protocol = { path = "../../protocol" }
Expand Down
4 changes: 3 additions & 1 deletion src/rust/core/server/src/process/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the Apache License, Version 2.0
// http://www.apache.org/licenses/LICENSE-2.0

use logger::PelikanLogReceiver;
use super::*;
use crate::threads::*;
use crate::THREAD_PREFIX;
Expand Down Expand Up @@ -46,13 +47,14 @@ where
storage: Storage,
max_buffer_size: usize,
parser: Parser,
logger: PelikanLogReceiver,
) -> Self {
// initialize admin
let ssl_context = common::ssl::ssl_context(tls_config).unwrap_or_else(|e| {
error!("failed to initialize TLS: {}", e);
std::process::exit(1);
});
let admin = Admin::new(admin_config, ssl_context).unwrap_or_else(|e| {
let admin = Admin::new(admin_config, ssl_context, logger).unwrap_or_else(|e| {
error!("failed to initialize admin: {}", e);
std::process::exit(1);
});
Expand Down
9 changes: 7 additions & 2 deletions src/rust/core/server/src/threads/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
//! The admin thread, which handles admin requests to return stats, get version
//! info, etc.

use logger::PelikanLogReceiver;
use super::EventLoop;
use crate::poll::{Poll, LISTENER_TOKEN, WAKER_TOKEN};
use boring::ssl::{HandshakeError, MidHandshakeSslStream, Ssl, SslContext, SslStream};
Expand Down Expand Up @@ -37,11 +38,12 @@ pub struct Admin {
ssl_context: Option<SslContext>,
signal_queue: QueuePairs<(), Signal>,
parser: AdminRequestParser,
log_receiver: PelikanLogReceiver,
}

impl Admin {
/// Creates a new `Admin` event loop.
pub fn new(config: &AdminConfig, ssl_context: Option<SslContext>) -> Result<Self, Error> {
pub fn new(config: &AdminConfig, ssl_context: Option<SslContext>, log_receiver: PelikanLogReceiver) -> Result<Self, Error> {
let addr = config.socket_addr().map_err(|e| {
error!("{}", e);
std::io::Error::new(std::io::ErrorKind::Other, "Bad listen address")
Expand All @@ -68,6 +70,7 @@ impl Admin {
ssl_context,
signal_queue,
parser: AdminRequestParser::new(),
log_receiver,
})
}

Expand Down Expand Up @@ -116,7 +119,7 @@ impl Admin {
Ok((stream, _)) => {
// handle TLS if it is configured
if let Some(ssl_context) = &self.ssl_context {
match Ssl::new(&ssl_context).map(|v| v.accept(stream)) {
match Ssl::new(ssl_context).map(|v| v.accept(stream)) {
// handle case where we have a fully-negotiated
// TLS stream on accept()
Ok(Ok(tls_stream)) => {
Expand Down Expand Up @@ -264,6 +267,8 @@ impl Admin {
}

self.get_rusage();

self.log_receiver.flush();
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/rust/core/server/src/threads/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl Listener {
Ok((stream, _)) => {
// handle TLS if it is configured
if let Some(ssl_context) = &self.ssl_context {
match Ssl::new(&ssl_context).map(|v| v.accept(stream)) {
match Ssl::new(ssl_context).map(|v| v.accept(stream)) {
// handle case where we have a fully-negotiated
// TLS stream on accept()
Ok(Ok(tls_stream)) => {
Expand Down Expand Up @@ -219,7 +219,7 @@ impl Listener {
}
}
_ => {
self.handle_session_event(&event);
self.handle_session_event(event);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/rust/core/server/src/threads/worker/multi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ where
}
}
Token(_) => {
self.handle_event(&event);
self.handle_event(event);
}
}
}
Expand Down
10 changes: 8 additions & 2 deletions src/rust/core/server/src/threads/worker/single.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
//! the request using the backing storage, and then composes a response onto the
//! session buffer.

use rustcommon_time::CoarseInstant;
use super::EventLoop;
use crate::poll::{Poll, WAKER_TOKEN};
use common::signal::Signal;
Expand Down Expand Up @@ -76,11 +77,16 @@ where
/// Run the `Worker` in a loop, handling new session events
pub fn run(&mut self) {
let mut events = Events::with_capacity(self.nevent);
let mut last_expire = CoarseInstant::recent();

loop {
increment_counter!(&Stat::WorkerEventLoop);

self.storage.expire();
let now = CoarseInstant::recent();
if now != last_expire {
self.storage.expire();
last_expire = now;
}

// get events with timeout
if self.poll.poll(&mut events, self.timeout).is_err() {
Expand Down Expand Up @@ -108,7 +114,7 @@ where
}
}
_ => {
self.handle_event(&event);
self.handle_event(event);
}
}
}
Expand Down
Loading

0 comments on commit f8d3b75

Please sign in to comment.