Skip to content

Commit

Permalink
segcache_rs stats cleanup (#355)
Browse files Browse the repository at this point in the history
Changes RSS units to bytes instead of kilobytes.

Changes how segment acquire metrics work.

Adds gauge for count of TCP connections.

Fixes live/dead item tracking, consolidates stats increments and adds
comments about logic.
  • Loading branch information
brayniac authored Nov 1, 2021
1 parent 9aae7a4 commit 9f48b7c
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 22 deletions.
20 changes: 10 additions & 10 deletions src/rust/core/server/src/threads/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ static_metrics! {
static RU_NIVCSW: Counter;
}

const KB: u64 = 1024; // one kilobyte in bytes
const S: u64 = 1_000_000_000; // one second in nanoseconds
const US: u64 = 1_000; // one microsecond in nanoseconds

/// A `Admin` is used to bind to a given socket address and handle out-of-band
/// admin requests.
pub struct Admin {
Expand Down Expand Up @@ -351,16 +355,12 @@ impl Admin {
};

if unsafe { libc::getrusage(libc::RUSAGE_SELF, &mut rusage) } == 0 {
RU_UTIME.set(
rusage.ru_utime.tv_sec as u64 * 1000000000 + rusage.ru_utime.tv_usec as u64 * 1000,
);
RU_STIME.set(
rusage.ru_stime.tv_sec as u64 * 1000000000 + rusage.ru_stime.tv_usec as u64 * 1000,
);
RU_MAXRSS.set(rusage.ru_maxrss as i64);
RU_IXRSS.set(rusage.ru_ixrss as i64);
RU_IDRSS.set(rusage.ru_idrss as i64);
RU_ISRSS.set(rusage.ru_isrss as i64);
RU_UTIME.set(rusage.ru_utime.tv_sec as u64 * S + rusage.ru_utime.tv_usec as u64 * US);
RU_STIME.set(rusage.ru_stime.tv_sec as u64 * S + rusage.ru_stime.tv_usec as u64 * US);
RU_MAXRSS.set(rusage.ru_maxrss * KB as i64);
RU_IXRSS.set(rusage.ru_ixrss * KB as i64);
RU_IDRSS.set(rusage.ru_idrss * KB as i64);
RU_ISRSS.set(rusage.ru_isrss * KB as i64);
RU_MINFLT.set(rusage.ru_minflt as u64);
RU_MAJFLT.set(rusage.ru_majflt as u64);
RU_NSWAP.set(rusage.ru_nswap as u64);
Expand Down
2 changes: 2 additions & 0 deletions src/rust/session/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ static_metrics! {

static TCP_ACCEPT: Counter;
static TCP_CLOSE: Counter;
static TCP_CONN_CURR: Gauge;
static TCP_RECV_BYTE: Counter;
static TCP_SEND_BYTE: Counter;
static TCP_SEND_PARTIAL: Counter;
Expand Down Expand Up @@ -122,6 +123,7 @@ impl Session {
/// Create a new `Session`
fn new(stream: Stream, min_capacity: usize, max_capacity: usize) -> Self {
TCP_ACCEPT.increment();
TCP_CONN_CURR.add(1);
Self {
token: Token(0),
stream,
Expand Down
3 changes: 2 additions & 1 deletion src/rust/session/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::net::SocketAddr;
use boring::ssl::{HandshakeError, MidHandshakeSslStream, SslStream};

use super::TcpStream;
use crate::TCP_CLOSE;
use crate::{TCP_CLOSE, TCP_CONN_CURR};

pub struct Stream {
inner: Option<StreamType>,
Expand Down Expand Up @@ -78,6 +78,7 @@ impl Stream {

pub fn close(&mut self) {
TCP_CLOSE.increment();
TCP_CONN_CURR.sub(1);
if let Some(stream) = self.inner.take() {
self.inner = match stream {
StreamType::Plain(s) => {
Expand Down
19 changes: 19 additions & 0 deletions src/rust/storage/seg/src/seg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,16 @@

use crate::*;

use metrics::{static_metrics, Counter};

const RESERVE_RETRIES: usize = 3;

static_metrics! {
static SEGMENT_REQUEST: Counter;
static SEGMENT_REQUEST_FAILURE: Counter;
static SEGMENT_REQUEST_SUCCESS: Counter;
}

/// A pre-allocated key-value store with eager expiration. It uses a
/// segment-structured design that stores data in fixed-size segments, grouping
/// objects with nearby expiration time into the same segment, and lifting most
Expand Down Expand Up @@ -130,18 +138,29 @@ impl Seg {
return Err(SegError::ItemOversized { size, key });
}
Err(TtlBucketsError::NoFreeSegments) => {
if retries == RESERVE_RETRIES {
// first attempt to acquire a free segment, increment
// the stats
SEGMENT_REQUEST.increment();
}
if self
.segments
.evict(&mut self.ttl_buckets, &mut self.hashtable)
.is_err()
{
retries -= 1;
} else {
// we successfully got a segment, increment the stat and
// return to start of loop to reserve the item
SEGMENT_REQUEST_SUCCESS.increment();
continue;
}
}
}
if retries == 0 {
// segment acquire failed, increment the stat and return with
// an error
SEGMENT_REQUEST_FAILURE.increment();
return Err(SegError::NoFreeSegments);
}
retries -= 1;
Expand Down
33 changes: 27 additions & 6 deletions src/rust/storage/seg/src/segments/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ impl<'a> Segment<'a> {
ITEM_CURRENT.decrement();
ITEM_CURRENT_BYTES.sub(item_size);
ITEM_DEAD.increment();
ITEM_DEAD_BYTES.sub(item_size);
ITEM_DEAD_BYTES.add(item_size);

self.check_magic();
self.decr_item(item_size as i32);
Expand Down Expand Up @@ -345,6 +345,9 @@ impl<'a> Segment<'a> {

let mut write_offset = read_offset;

let mut items_pruned = 0;
let mut bytes_pruned = 0;

while read_offset <= max_offset {
let item = self.get_item_at(read_offset).unwrap();
if item.klen() == 0 && self.live_items() == 0 {
Expand All @@ -357,9 +360,8 @@ impl<'a> Segment<'a> {

// don't copy deleted items
if item.deleted() {
// since the segment won't be cleared, we decrement dead items
ITEM_DEAD.decrement();
ITEM_DEAD_BYTES.sub(item.size() as _);
items_pruned += 1;
bytes_pruned += item.size();
// move read offset forward, leave write offset trailing
read_offset += item_size;
ITEM_COMPACTED.increment();
Expand Down Expand Up @@ -399,6 +401,11 @@ impl<'a> Segment<'a> {
continue;
}

// We have removed dead items, so we must subtract the pruned items from
// the dead item stats.
ITEM_DEAD.sub(items_pruned as _);
ITEM_DEAD_BYTES.sub(bytes_pruned as _);

// updates the write offset to the new position
self.set_write_offset(write_offset as i32);

Expand All @@ -424,6 +431,9 @@ impl<'a> Segment<'a> {
0
};

let mut items_copied = 0;
let mut bytes_copied = 0;

while read_offset <= max_offset {
let item = self.get_item_at(read_offset).unwrap();
if item.klen() == 0 && self.live_items() == 0 {
Expand Down Expand Up @@ -464,8 +474,8 @@ impl<'a> Segment<'a> {
target.header.incr_live_items();
target.header.incr_live_bytes(item_size as i32);
target.set_write_offset(write_offset as i32 + item_size as i32);
ITEM_CURRENT.increment();
ITEM_CURRENT_BYTES.add(item_size as _);
items_copied += 1;
bytes_copied += item_size;
} else {
// TODO(bmartin): figure out if this could happen and make the
// relink function infallible if it can't happen
Expand All @@ -475,6 +485,12 @@ impl<'a> Segment<'a> {
read_offset += item_size;
}

// We need to increment the current bytes, because removing items from
// this segment decrements these as it marks the item as removed. This
// should result in these stats remaining unchanged by this function.
ITEM_CURRENT.add(items_copied);
ITEM_CURRENT_BYTES.add(bytes_copied as _);

Ok(())
}

Expand Down Expand Up @@ -638,6 +654,11 @@ impl<'a> Segment<'a> {
offset += item.size();
}

// At the end of the clear phase above, we have only dead items that we
// are clearing from the segment. The functions that removed the live
// items from the hashtable have decremented the live items, and
// incremented the dead items. So we subtract all items that were in
// this segment from the dead item stats.
ITEM_DEAD.sub(items as _);
ITEM_DEAD_BYTES.sub(bytes as _);

Expand Down
5 changes: 0 additions & 5 deletions src/rust/storage/seg/src/segments/segments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ static_metrics! {
static SEGMENT_EVICT_EX: Counter;
static SEGMENT_RETURN: Counter;
static SEGMENT_FREE: Gauge;
static SEGMENT_REQUEST: Counter;
static SEGMENT_REQUEST_EX: Counter;
static SEGMENT_MERGE: Counter;
static SEGMENT_CURRENT: Gauge;
}
Expand Down Expand Up @@ -380,10 +378,7 @@ impl Segments {
pub(crate) fn pop_free(&mut self) -> Option<NonZeroU32> {
assert!(self.free <= self.cap);

SEGMENT_REQUEST.increment();

if self.free == 0 {
SEGMENT_REQUEST_EX.increment();
None
} else {
SEGMENT_FREE.decrement();
Expand Down

0 comments on commit 9f48b7c

Please sign in to comment.