[core] Add metrics ray_io_context_event_loop_lag_ms. (ray-project#47989)
Adds a metric that monitors event loop lag for `instrumented_io_context`.
By default it is enabled only for the IO contexts in the GCS.

By default, every 250 ms we post an async task and use
`std::chrono::steady_clock` to measure how long it waits in the queue before
running. The metric is a GAUGE tagged with the thread name.

I tested this on my laptop: the metric reads ~0 ms on an idle cluster and
~1000 ms when running with
`RAY_testing_asio_delay_us=event_loop_lag_probe=1000000:1000000`.
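As a quick illustration of the mechanism (a minimal, self-contained sketch rather than the code added by this PR; the real implementation in `instrumented_io_context.cc` below records the value as a metric and reschedules itself):

```cpp
// Sketch: post a task that carries a steady_clock timestamp and report how
// long it waited in the queue before the event loop ran it.
#include <boost/asio.hpp>
#include <chrono>
#include <iostream>

int main() {
  boost::asio::io_context io;
  const auto posted = std::chrono::steady_clock::now();
  boost::asio::post(io, [posted]() {
    const auto lag = std::chrono::duration_cast<std::chrono::milliseconds>(
        std::chrono::steady_clock::now() - posted);
    // The PR records this value as the gauge ray_io_context_event_loop_lag_ms,
    // tagged with the thread name.
    std::cout << "event loop lag: " << lag.count() << " ms\n";
  });
  io.run();  // ~0 ms on an idle loop, larger when the loop is busy or delayed.
  return 0;
}
```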

Signed-off-by: Ruiyang Wang <[email protected]>
rynewang authored Oct 30, 2024
1 parent 956310d commit cb4526a
Showing 12 changed files with 118 additions and 18 deletions.
8 changes: 5 additions & 3 deletions BUILD.bazel
@@ -1259,9 +1259,11 @@ ray_cc_test(
"no_windows",
"team:core",
],
target_compatible_with = [
"@platforms//os:linux",
],
target_compatible_with = select({
"@platforms//os:osx": [],
"@platforms//os:linux": [],
"//conditions:default": ["@platforms//:incompatible"],
}),
deps = [
":core_worker_lib",
":plasma_store_server_lib",
18 changes: 14 additions & 4 deletions src/ray/common/asio/asio_util.h
@@ -54,9 +54,12 @@ class InstrumentedIOContextWithThread {
/**
* Constructor.
* @param thread_name The name of the thread.
* @param enable_lag_probe If true, enables the lag probe. It posts tasks to the
* io_context so that a run() will never return.
*/
explicit InstrumentedIOContextWithThread(const std::string &thread_name)
: io_service_(), work_(io_service_), thread_name_(thread_name) {
explicit InstrumentedIOContextWithThread(const std::string &thread_name,
bool enable_lag_probe = false)
: io_service_(enable_lag_probe), work_(io_service_), thread_name_(thread_name) {
io_thread_ = std::thread([this] {
SetThreadName(this->thread_name_);
io_service_.run();
@@ -106,6 +109,8 @@ class InstrumentedIOContextWithThread {
/// // instrumented_io_context for each. Must be unique and should not contain empty
/// // names.
/// constexpr static std::array<std::string_view, N> kAllDedicatedIOContextNames;
/// // List of bools to enable lag probe for each dedicated io context.
/// constexpr static std::array<bool, N> kAllDedicatedIOContextEnableLagProbe;
///
/// // For a given T, returns an index to kAllDedicatedIOContextNames, or -1 for the
/// // default io context.
@@ -128,8 +133,9 @@ class IOContextProvider {
: default_io_context_(default_io_context) {
for (size_t i = 0; i < Policy::kAllDedicatedIOContextNames.size(); i++) {
const auto &name = Policy::kAllDedicatedIOContextNames[i];
dedicated_io_contexts_[i] =
std::make_unique<InstrumentedIOContextWithThread>(std::string(name));
bool enable_lag_probe = Policy::kAllDedicatedIOContextEnableLagProbe[i];
dedicated_io_contexts_[i] = std::make_unique<InstrumentedIOContextWithThread>(
std::string(name), enable_lag_probe);
}
}

@@ -174,6 +180,10 @@ class IOContextProvider {
"kAllDedicatedIOContextNames must not contain empty strings.");
static_assert(ray::ArrayIsUnique(Policy::kAllDedicatedIOContextNames),
"kAllDedicatedIOContextNames must not contain duplicate elements.");
static_assert(Policy::kAllDedicatedIOContextNames.size() ==
Policy::kAllDedicatedIOContextEnableLagProbe.size(),
"kAllDedicatedIOContextNames and kAllDedicatedIOContextEnableLagProbe "
"must have the same size.");

// Using unique_ptr because the class has no default constructor, so it's not easy
// to initialize objects directly in the array.
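To make the policy requirements documented in `asio_util.h` above concrete, here is a hypothetical policy that satisfies the static_asserts (the type and its names are illustrative only; the real policy touched by this PR is `GcsServerIOContextPolicy`, further down):

```cpp
// Hypothetical example policy; assumes the same ray::IndexOf helper already
// used by GcsServerIOContextPolicy.
#include <array>
#include <string_view>

struct ExampleIOContextPolicy {
  // One dedicated io_context (and thread) per name; names must be unique and
  // non-empty.
  constexpr static std::array<std::string_view, 2> kAllDedicatedIOContextNames{
      "timer_io_context", "rpc_io_context"};
  // Same size as kAllDedicatedIOContextNames; here only the second dedicated
  // io_context gets the lag probe.
  constexpr static std::array<bool, 2> kAllDedicatedIOContextEnableLagProbe{false,
                                                                            true};

  constexpr static size_t IndexOf(std::string_view name) {
    return ray::IndexOf(kAllDedicatedIOContextNames, name);
  }
};
```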
72 changes: 66 additions & 6 deletions src/ray/common/asio/instrumented_io_context.cc
@@ -22,19 +22,79 @@

#include "ray/common/asio/asio_chaos.h"
#include "ray/common/asio/asio_util.h"
#include "ray/stats/metric.h"
#include "ray/stats/metric_defs.h"

namespace {

// Post a probe. Records the lag and schedules another probe.
// Requires: `interval_ms` > 0.
void LagProbeLoop(instrumented_io_context &io_context, int64_t interval_ms) {
auto begin = std::chrono::steady_clock::now();
io_context.post(
[&io_context, begin, interval_ms]() {
auto end = std::chrono::steady_clock::now();
auto duration =
std::chrono::duration_cast<std::chrono::milliseconds>(end - begin);
ray::stats::STATS_io_context_event_loop_lag_ms.Record(
duration.count(),
{
{"Name", GetThreadName()},
});

// Schedule the next probe. If `duration` is larger than `interval_ms`, we
// should schedule the next probe immediately. Otherwise, we should wait
// for `interval_ms - duration`.
auto delay = interval_ms - duration.count();
if (delay <= 0) {
LagProbeLoop(io_context, interval_ms);
} else {
execute_after(
io_context,
[&io_context, interval_ms]() { LagProbeLoop(io_context, interval_ms); },
std::chrono::milliseconds(delay));
}
},
"event_loop_lag_probe");
}

void ScheduleLagProbe(instrumented_io_context &io_context) {
if (!RayConfig::instance().enable_metrics_collection()) {
return;
}
auto interval =
RayConfig::instance().io_context_event_loop_lag_collection_interval_ms();
if (interval <= 0) {
return;
}
RAY_LOG(DEBUG) << "Scheduling lag probe for the io_context on thread "
<< GetThreadName() << " every " << interval << "ms";
// At this time, the `io_context` may not be running yet, so we need to post the
// first probe.
io_context.post([&io_context, interval]() { LagProbeLoop(io_context, interval); },
"event_loop_lag_probe");
}
} // namespace

instrumented_io_context::instrumented_io_context(bool enable_lag_probe)
: event_stats_(std::make_shared<EventTracker>()) {
if (enable_lag_probe) {
ScheduleLagProbe(*this);
}
}

void instrumented_io_context::post(std::function<void()> handler,
const std::string name,
const std::string &name,
int64_t delay_us) {
if (RayConfig::instance().event_stats()) {
// References are only invalidated upon deletion of the corresponding item from the
// table, which we won't do until this io_context is deleted. Provided that
// GuardedHandlerStats synchronizes internal access, we can concurrently write to the
// handler stats it->second from multiple threads without acquiring a table-level
// readers lock in the callback.
const auto stats_handle = event_stats_->RecordStart(name);
auto stats_handle = event_stats_->RecordStart(name);
handler = [handler = std::move(handler), stats_handle = std::move(stats_handle)]() {
EventTracker::RecordExecution(handler, std::move(stats_handle));
EventTracker::RecordExecution(handler, stats_handle);
};
}
delay_us += ray::asio::testing::get_delay_us(name);
@@ -47,18 +107,18 @@ void instrumented_io_context::post(std::function<void()> handler,
}

void instrumented_io_context::dispatch(std::function<void()> handler,
const std::string name) {
const std::string &name) {
if (!RayConfig::instance().event_stats()) {
return boost::asio::io_context::post(std::move(handler));
}
const auto stats_handle = event_stats_->RecordStart(name);
auto stats_handle = event_stats_->RecordStart(name);
// References are only invalidated upon deletion of the corresponding item from the
// table, which we won't do until this io_context is deleted. Provided that
// GuardedHandlerStats synchronizes internal access, we can concurrently write to the
// handler stats it->second from multiple threads without acquiring a table-level
// readers lock in the callback.
boost::asio::io_context::dispatch(
[handler = std::move(handler), stats_handle = std::move(stats_handle)]() {
EventTracker::RecordExecution(handler, std::move(stats_handle));
EventTracker::RecordExecution(handler, stats_handle);
});
}
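A quick worked example of the rescheduling logic in `LagProbeLoop` above, with the default 250 ms interval: if a probe waited 50 ms in the queue, the next probe is scheduled via `execute_after` in 200 ms; if it waited 300 ms (longer than the interval), `delay <= 0` and the next probe is posted immediately.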
10 changes: 7 additions & 3 deletions src/ray/common/asio/instrumented_io_context.h
@@ -28,7 +28,11 @@ class instrumented_io_context : public boost::asio::io_context {
public:
/// Initializes the global stats struct after calling the base constructor.
/// TODO(ekl) allow taking an externally defined event tracker.
instrumented_io_context() : event_stats_(std::make_shared<EventTracker>()) {}
///
/// \param enable_lag_probe If true, and if related Ray configs are set, schedules a
/// probe to measure the event loop lag. After a probe is done, it schedules another one
/// so an io_context.run() call will never return.
explicit instrumented_io_context(bool enable_lag_probe = false);

/// A proxy post function that collects count, queueing, and execution statistics for
/// the given handler.
@@ -37,15 +41,15 @@ class instrumented_io_context : public boost::asio::io_context {
/// \param name A human-readable name for the handler, to be used for viewing stats
/// for the provided handler.
/// \param delay_us Delay time before the handler will be executed.
void post(std::function<void()> handler, const std::string name, int64_t delay_us = 0);
void post(std::function<void()> handler, const std::string &name, int64_t delay_us = 0);

/// A proxy post function that collects count, queueing, and execution statistics for
/// the given handler.
///
/// \param handler The handler to be posted to the event loop.
/// \param name A human-readable name for the handler, to be used for viewing stats
/// for the provided handler.
void dispatch(std::function<void()> handler, const std::string name);
void dispatch(std::function<void()> handler, const std::string &name);

EventTracker &stats() const { return *event_stats_; };

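As a usage sketch of the new constructor flag (assumed setup, loosely mirroring the `gcs_server_main.cc` change further down):

```cpp
// Sketch: name the thread, enable the lag probe, and keep the loop alive.
#include "ray/common/asio/instrumented_io_context.h"
#include "ray/util/util.h"  // SetThreadName

int main() {
  SetThreadName("example_server");
  instrumented_io_context io_service(/*enable_lag_probe=*/true);
  // Without a work guard, run() would return once the queue drains.
  boost::asio::io_service::work work(io_service);
  io_service.post([]() { /* server work */ }, "example_handler");
  // The probe keeps rescheduling itself, so run() does not return on its own.
  io_service.run();
  return 0;
}
```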
10 changes: 10 additions & 0 deletions src/ray/common/ray_config_def.h
@@ -549,6 +549,16 @@ RAY_CONFIG(bool, enable_metrics_collection, true)
/// available *after* a grpc call.
RAY_CONFIG(std::string, enable_grpc_metrics_collection_for, "")

/// Only effective if `enable_metrics_collection` is also true.
///
/// If > 0, we monitor each instrumented_io_context every
/// `io_context_event_loop_lag_collection_interval_ms` milliseconds, by posting a task to
/// the io_context to measure the duration from post to run. The metric is
/// `ray_io_context_event_loop_lag_ms`.
///
/// A probe task is only posted after a previous probe task has completed.
RAY_CONFIG(int64_t, io_context_event_loop_lag_collection_interval_ms, 250)

// Max number bytes of inlined objects in a task rpc request/response.
RAY_CONFIG(int64_t, task_rpc_inlined_bytes_limit, 10 * 1024 * 1024)

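Note: since this is a regular `RAY_CONFIG` entry, it should be overridable through the usual `RAY_<config_name>` environment variable, e.g. `RAY_io_context_event_loop_lag_collection_interval_ms=1000` to probe once per second; a value of 0 or less disables the probe (see the `interval <= 0` check in `ScheduleLagProbe`).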
1 change: 1 addition & 0 deletions src/ray/common/test/ray_syncer_test.cc
@@ -98,6 +98,7 @@ class RaySyncerTest : public ::testing::Test {

void TearDown() override {
work_guard_->reset();
io_context_.stop();
thread_->join();
}

2 changes: 2 additions & 0 deletions src/ray/gcs/gcs_server/gcs_server_io_context_policy.h
@@ -52,6 +52,8 @@ struct GcsServerIOContextPolicy {
// name, or get leaks by creating unused threads.
constexpr static std::array<std::string_view, 3> kAllDedicatedIOContextNames{
"task_io_context", "pubsub_io_context", "ray_syncer_io_context"};
constexpr static std::array<bool, 3> kAllDedicatedIOContextEnableLagProbe{
true, true, true};

constexpr static size_t IndexOf(std::string_view name) {
return ray::IndexOf(kAllDedicatedIOContextNames, name);
3 changes: 2 additions & 1 deletion src/ray/gcs/gcs_server/gcs_server_main.cc
@@ -73,7 +73,8 @@ int main(int argc, char *argv[]) {
ray::rpc::testing::init();

// IO Service for main loop.
instrumented_io_context main_service;
SetThreadName("gcs_server");
instrumented_io_context main_service(/*enable_lag_probe=*/true);
// Ensure that the IO service keeps running. Without this, the main_service will exit
// as soon as there is no more work to be processed.
boost::asio::io_service::work work(main_service);
1 change: 1 addition & 0 deletions src/ray/raylet/main.cc
@@ -184,6 +184,7 @@ int main(int argc, char *argv[]) {
ray::raylet::NodeManagerConfig node_manager_config;
absl::flat_hash_map<std::string, double> static_resource_conf;

SetThreadName("raylet");
// IO Service for node manager.
instrumented_io_context main_service;

6 changes: 6 additions & 0 deletions src/ray/stats/metric_defs.cc
@@ -145,6 +145,12 @@ DEFINE_stats(placement_groups,
/// ===================== INTERNAL SYSTEM METRICS =================================
/// ===============================================================================

DEFINE_stats(io_context_event_loop_lag_ms,
"Latency of a task from post to execution",
("Name"), // Name of the instrumented_io_context.
(),
ray::stats::GAUGE);

/// Event stats
DEFINE_stats(operation_count, "operation count", ("Method"), (), ray::stats::GAUGE);
DEFINE_stats(
3 changes: 3 additions & 0 deletions src/ray/stats/metric_defs.h
@@ -55,6 +55,9 @@ DECLARE_stats(finished_jobs);
/// Placement group stats, broken down by state.
DECLARE_stats(placement_groups);

/// ASIO stats
DECLARE_stats(io_context_event_loop_lag_ms);

/// Event stats
DECLARE_stats(operation_count);
DECLARE_stats(operation_run_time_ms);
2 changes: 1 addition & 1 deletion src/ray/util/util.h
@@ -310,7 +310,7 @@ inline void SetThreadName(const std::string &thread_name) {
}

inline std::string GetThreadName() {
#if defined(__linux__)
#if defined(__linux__) || defined(__APPLE__)
char name[128];
auto rc = pthread_getname_np(pthread_self(), name, sizeof(name));
if (rc != 0) {
