Skip to content

Commit

Permalink
Periodically report memory usage
Browse files Browse the repository at this point in the history
  • Loading branch information
andyleiserson committed Dec 19, 2024
1 parent 1cdc2f7 commit f27beaa
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 1 deletion.
9 changes: 8 additions & 1 deletion ipa-core/src/seq_join/multi_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use futures::{stream::Fuse, Stream, StreamExt};
use pin_project::pin_project;
use tracing::{Instrument, Span};

use crate::telemetry::memory::periodic_memory_report;

#[cfg(feature = "shuttle")]
mod shuttle_spawner {
use std::future::Future;
Expand Down Expand Up @@ -62,6 +64,7 @@ where
#[pin]
source: Fuse<S>,
capacity: usize,
spawned: usize,
}

impl<S, F> SequentialFutures<'_, S, F>
Expand All @@ -75,6 +78,7 @@ where
spawner: unsafe { create_spawner() },
source: source.fuse(),
capacity: active.get(),
spawned: 0,
}
}
}
Expand Down Expand Up @@ -103,11 +107,14 @@ where
// a dependency between futures, pending one will never complete.
// Cancellable futures will be cancelled when spawner is dropped which is
// the behavior we want.
let task_index = this.spawner.len();
let task_index = *this.spawned;
this.spawner
.spawn_cancellable(f.into_future().instrument(Span::current()), move || {
panic!("SequentialFutures: spawned task {task_index} cancelled")
});

periodic_memory_report(*this.spawned);
*this.spawned += 1;
} else {
break;
}
Expand Down
56 changes: 56 additions & 0 deletions ipa-core/src/telemetry/memory.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#[cfg(not(jemalloc))]
pub fn periodic_memory_report() { }

#[cfg(jemalloc)]
pub use jemalloc::periodic_memory_report;

#[cfg(jemalloc)]
mod jemalloc {
use std::sync::LazyLock;

use tikv_jemalloc_ctl::{epoch_mib, stats::allocated_mib};

const MB: usize = 2 << 20;

// In an unfortunate acronym collision, `mib` in the names of the jemalloc
// statistics stands for "Management Information Base", not "mebibytes".
// The reporting unit is bytes.

static EPOCH: LazyLock<epoch_mib> = LazyLock::new(|| {
tikv_jemalloc_ctl::epoch::mib().unwrap()
});

static ALLOCATED: LazyLock<allocated_mib> = LazyLock::new(|| {
tikv_jemalloc_ctl::stats::allocated::mib().unwrap()
});

fn report_memory_usage(count: usize) {
// Some of the information jemalloc uses when reporting statistics is cached, and
// refreshed only upon advancing the epoch.
EPOCH.advance().unwrap();
let allocated = ALLOCATED.read().unwrap() / MB;
tracing::debug!("i={count}: {allocated} MiB allocated");
}

fn should_print_report(count: usize) -> bool {
if count == 0 {
return true;
}

let bits = count.ilog2();
let report_interval_log2 = std::cmp::max(bits.saturating_sub(2), 8);
let report_interval_mask = (1 << report_interval_log2) - 1;
(count & report_interval_mask) == 0
}

/// Print a memory report periodically, based on the value of `count`.
///
/// As `count` increases, so does the report interval. This results in
/// a tolerable amount of log messages for loops with many iterations,
/// while still providing some reporting for shorter loops.
pub fn periodic_memory_report(count: usize) {
if should_print_report(count) {
report_memory_usage(count);
}
}
}
1 change: 1 addition & 0 deletions ipa-core/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod memory;
pub mod stats;
mod step_stats;

Expand Down

0 comments on commit f27beaa

Please sign in to comment.