From cb4526a4edfcb736a381f8f1193a6abc7eda6fff Mon Sep 17 00:00:00 2001
From: Ruiyang Wang <56065503+rynewang@users.noreply.github.com>
Date: Wed, 30 Oct 2024 00:17:45 -0700
Subject: [PATCH] [core] Add metrics ray_io_context_event_loop_lag_ms. (#47989)

Adds a metric to monitor event loop lag for `instrumented_io_context`. By
default it is applied only to the IO contexts in the GCS.

Every 250ms (configurable via io_context_event_loop_lag_collection_interval_ms)
we post an async task and use std::chrono::steady_clock to record the lag. The
metric is a GAUGE tagged with the thread name.

I tested this on my laptop and it works: it reads ~0 on an idle cluster, and
~1000ms if I use
`RAY_testing_asio_delay_us=event_loop_lag_probe=1000000:1000000`.

Signed-off-by: Ruiyang Wang
---
 BUILD.bazel                                    |  8 ++-
 src/ray/common/asio/asio_util.h                | 18 +++--
 .../common/asio/instrumented_io_context.cc     | 72 +++++++++++++++++--
 src/ray/common/asio/instrumented_io_context.h  | 10 ++-
 src/ray/common/ray_config_def.h                | 10 +++
 src/ray/common/test/ray_syncer_test.cc         |  1 +
 .../gcs_server/gcs_server_io_context_policy.h  |  2 +
 src/ray/gcs/gcs_server/gcs_server_main.cc      |  3 +-
 src/ray/raylet/main.cc                         |  1 +
 src/ray/stats/metric_defs.cc                   |  6 ++
 src/ray/stats/metric_defs.h                    |  3 +
 src/ray/util/util.h                            |  2 +-
 12 files changed, 118 insertions(+), 18 deletions(-)

diff --git a/BUILD.bazel b/BUILD.bazel
index 7f497e3f6de1..4a3c625fa24c 100644
--- a/BUILD.bazel
+++ b/BUILD.bazel
@@ -1259,9 +1259,11 @@ ray_cc_test(
         "no_windows",
         "team:core",
     ],
-    target_compatible_with = [
-        "@platforms//os:linux",
-    ],
+    target_compatible_with = select({
+        "@platforms//os:osx": [],
+        "@platforms//os:linux": [],
+        "//conditions:default": ["@platforms//:incompatible"],
+    }),
     deps = [
         ":core_worker_lib",
         ":plasma_store_server_lib",
diff --git a/src/ray/common/asio/asio_util.h b/src/ray/common/asio/asio_util.h
index c7df71405758..ccd15edc5c65 100644
--- a/src/ray/common/asio/asio_util.h
+++ b/src/ray/common/asio/asio_util.h
@@ -54,9 +54,12 @@ class InstrumentedIOContextWithThread {
   /**
    * Constructor.
    * @param thread_name The name of the thread.
+   * @param enable_lag_probe If true, enables the lag probe. It posts tasks to the
+   * io_context so that a run() will never return.
    */
-  explicit InstrumentedIOContextWithThread(const std::string &thread_name)
-      : io_service_(), work_(io_service_), thread_name_(thread_name) {
+  explicit InstrumentedIOContextWithThread(const std::string &thread_name,
+                                           bool enable_lag_probe = false)
+      : io_service_(enable_lag_probe), work_(io_service_), thread_name_(thread_name) {
     io_thread_ = std::thread([this] {
       SetThreadName(this->thread_name_);
       io_service_.run();
@@ -106,6 +109,8 @@ class InstrumentedIOContextWithThread {
 ///   // instrumented_io_context for each. Must be unique and should not contain empty
 ///   // names.
 ///   constexpr static std::array kAllDedicatedIOContextNames;
+///   // List of bools to enable lag probe for each dedicated io context.
+///   constexpr static std::array kAllDedicatedIOContextEnableLagProbe;
 ///
 ///   // For a given T, returns an index to kAllDedicatedIOContextNames, or -1 for the
 ///   // default io context.
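Reading aid (not part of the patch): the probe that LagProbeLoop in
src/ray/common/asio/instrumented_io_context.cc implements further below can be sketched
with plain boost::asio. A task is posted, its handler measures how long the task sat in
the queue, and the next probe is then scheduled. Ray records that value in the
ray_io_context_event_loop_lag_ms gauge instead of printing it, and uses its own
execute_after helper rather than the bare steady_timer used here.

    #include <boost/asio.hpp>
    #include <chrono>
    #include <iostream>
    #include <thread>

    using Clock = std::chrono::steady_clock;

    // Posts a probe task; the time from post to execution is the loop's current lag.
    // The handler then schedules the next probe, so roughly one probe runs per interval.
    void LagProbeLoop(boost::asio::io_context &io,
                      boost::asio::steady_timer &timer,
                      std::chrono::milliseconds interval) {
      auto posted = Clock::now();
      boost::asio::post(io, [&io, &timer, interval, posted]() {
        auto lag =
            std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - posted);
        std::cout << "event loop lag: " << lag.count() << " ms\n";
        // If the lag already exceeded the interval, probe again immediately;
        // otherwise wait out the remainder of the interval first.
        auto delay = interval - lag;
        if (delay <= std::chrono::milliseconds(0)) {
          LagProbeLoop(io, timer, interval);
        } else {
          timer.expires_after(delay);
          timer.async_wait([&io, &timer, interval](const boost::system::error_code &ec) {
            if (!ec) LagProbeLoop(io, timer, interval);
          });
        }
      });
    }

    int main() {
      boost::asio::io_context io;
      boost::asio::steady_timer timer(io);
      // A handler that blocks the loop for 1s; the probe posted after it waits in the
      // queue behind it, so the first measurement reads roughly 1000 ms.
      boost::asio::post(io, [] { std::this_thread::sleep_for(std::chrono::seconds(1)); });
      LagProbeLoop(io, timer, std::chrono::milliseconds(250));
      io.run_for(std::chrono::seconds(2));  // Bounded run so the sketch terminates.
      return 0;
    }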
@@ -128,8 +133,9 @@ class IOContextProvider {
       : default_io_context_(default_io_context) {
     for (size_t i = 0; i < Policy::kAllDedicatedIOContextNames.size(); i++) {
       const auto &name = Policy::kAllDedicatedIOContextNames[i];
-      dedicated_io_contexts_[i] =
-          std::make_unique<InstrumentedIOContextWithThread>(std::string(name));
+      bool enable_lag_probe = Policy::kAllDedicatedIOContextEnableLagProbe[i];
+      dedicated_io_contexts_[i] = std::make_unique<InstrumentedIOContextWithThread>(
+          std::string(name), enable_lag_probe);
     }
   }
 
@@ -174,6 +180,10 @@ class IOContextProvider {
                 "kAllDedicatedIOContextNames must not contain empty strings.");
   static_assert(ray::ArrayIsUnique(Policy::kAllDedicatedIOContextNames),
                 "kAllDedicatedIOContextNames must not contain duplicate elements.");
+  static_assert(Policy::kAllDedicatedIOContextNames.size() ==
+                    Policy::kAllDedicatedIOContextEnableLagProbe.size(),
+                "kAllDedicatedIOContextNames and kAllDedicatedIOContextEnableLagProbe "
+                "must have the same size.");
 
   // Using unique_ptr because the class has no default constructor, so it's not easy
   // to initialize objects directly in the array.
diff --git a/src/ray/common/asio/instrumented_io_context.cc b/src/ray/common/asio/instrumented_io_context.cc
index 30c3b288ef16..69776022c1bf 100644
--- a/src/ray/common/asio/instrumented_io_context.cc
+++ b/src/ray/common/asio/instrumented_io_context.cc
@@ -22,9 +22,69 @@
 
 #include "ray/common/asio/asio_chaos.h"
 #include "ray/common/asio/asio_util.h"
+#include "ray/stats/metric.h"
+#include "ray/stats/metric_defs.h"
+
+namespace {
+
+// Posts a probe. Records the lag and schedules another probe.
+// Requires: `interval_ms` > 0.
+void LagProbeLoop(instrumented_io_context &io_context, int64_t interval_ms) {
+  auto begin = std::chrono::steady_clock::now();
+  io_context.post(
+      [&io_context, begin, interval_ms]() {
+        auto end = std::chrono::steady_clock::now();
+        auto duration =
+            std::chrono::duration_cast<std::chrono::milliseconds>(end - begin);
+        ray::stats::STATS_io_context_event_loop_lag_ms.Record(
+            duration.count(),
+            {
+                {"Name", GetThreadName()},
+            });
+
+        // Schedule the next probe. If `duration` is larger than `interval_ms`, we
+        // should schedule the next probe immediately. Otherwise, we should wait
+        // for `interval_ms - duration`.
+        auto delay = interval_ms - duration.count();
+        if (delay <= 0) {
+          LagProbeLoop(io_context, interval_ms);
+        } else {
+          execute_after(
+              io_context,
+              [&io_context, interval_ms]() { LagProbeLoop(io_context, interval_ms); },
+              std::chrono::milliseconds(delay));
+        }
+      },
+      "event_loop_lag_probe");
+}
+
+void ScheduleLagProbe(instrumented_io_context &io_context) {
+  if (!RayConfig::instance().enable_metrics_collection()) {
+    return;
+  }
+  auto interval =
+      RayConfig::instance().io_context_event_loop_lag_collection_interval_ms();
+  if (interval <= 0) {
+    return;
+  }
+  RAY_LOG(DEBUG) << "Scheduling lag probe for the io_context on thread "
+                 << GetThreadName() << " every " << interval << "ms";
+  // At this time, the `io_context` may not be running yet, so we need to post the
+  // first probe.
+  io_context.post([&io_context, interval]() { LagProbeLoop(io_context, interval); },
+                  "event_loop_lag_probe");
+}
+
+}  // namespace
+
+instrumented_io_context::instrumented_io_context(bool enable_lag_probe)
+    : event_stats_(std::make_shared<EventTracker>()) {
+  if (enable_lag_probe) {
+    ScheduleLagProbe(*this);
+  }
+}
 
 void instrumented_io_context::post(std::function<void()> handler,
-                                   const std::string name,
+                                   const std::string &name,
                                    int64_t delay_us) {
   if (RayConfig::instance().event_stats()) {
     // References are only invalidated upon deletion of the corresponding item from the
@@ -32,9 +92,9 @@ void instrumented_io_context::post(std::function<void()> handler,
     // GuardedHandlerStats synchronizes internal access, we can concurrently write to the
     // handler stats it->second from multiple threads without acquiring a table-level
     // readers lock in the callback.
-    const auto stats_handle = event_stats_->RecordStart(name);
+    auto stats_handle = event_stats_->RecordStart(name);
     handler = [handler = std::move(handler), stats_handle = std::move(stats_handle)]() {
-      EventTracker::RecordExecution(handler, std::move(stats_handle));
+      EventTracker::RecordExecution(handler, stats_handle);
     };
   }
   delay_us += ray::asio::testing::get_delay_us(name);
@@ -47,11 +107,11 @@ void instrumented_io_context::post(std::function<void()> handler,
 }
 
 void instrumented_io_context::dispatch(std::function<void()> handler,
-                                       const std::string name) {
+                                       const std::string &name) {
   if (!RayConfig::instance().event_stats()) {
     return boost::asio::io_context::post(std::move(handler));
   }
-  const auto stats_handle = event_stats_->RecordStart(name);
+  auto stats_handle = event_stats_->RecordStart(name);
   // References are only invalidated upon deletion of the corresponding item from the
   // table, which we won't do until this io_context is deleted. Provided that
   // GuardedHandlerStats synchronizes internal access, we can concurrently write to the
@@ -59,6 +119,6 @@ void instrumented_io_context::dispatch(std::function<void()> handler,
   // readers lock in the callback.
   boost::asio::io_context::dispatch(
       [handler = std::move(handler), stats_handle = std::move(stats_handle)]() {
-        EventTracker::RecordExecution(handler, std::move(stats_handle));
+        EventTracker::RecordExecution(handler, stats_handle);
       });
 }
diff --git a/src/ray/common/asio/instrumented_io_context.h b/src/ray/common/asio/instrumented_io_context.h
index 088fa557d3ce..b4cb31099acb 100644
--- a/src/ray/common/asio/instrumented_io_context.h
+++ b/src/ray/common/asio/instrumented_io_context.h
@@ -28,7 +28,11 @@ class instrumented_io_context : public boost::asio::io_context {
  public:
   /// Initializes the global stats struct after calling the base contructor.
   /// TODO(ekl) allow taking an externally defined event tracker.
-  instrumented_io_context() : event_stats_(std::make_shared<EventTracker>()) {}
+  ///
+  /// \param enable_lag_probe If true, and if related Ray configs are set, schedule a
+  /// probe to measure the event loop lag. After a probe is done, it schedules another one
+  /// so an io_context.run() call will never return.
+  explicit instrumented_io_context(bool enable_lag_probe = false);
 
   /// A proxy post function that collects count, queueing, and execution statistics for
   /// the given handler.
@@ -37,7 +41,7 @@ class instrumented_io_context : public boost::asio::io_context {
   /// \param name A human-readable name for the handler, to be used for viewing stats
   /// for the provided handler.
   /// \param delay_us Delay time before the handler will be executed.
-  void post(std::function<void()> handler, const std::string name, int64_t delay_us = 0);
+  void post(std::function<void()> handler, const std::string &name, int64_t delay_us = 0);
 
   /// A proxy post function that collects count, queueing, and execution statistics for
   /// the given handler.
@@ -45,7 +49,7 @@ class instrumented_io_context : public boost::asio::io_context {
   /// \param handler The handler to be posted to the event loop.
   /// \param name A human-readable name for the handler, to be used for viewing stats
   /// for the provided handler.
-  void dispatch(std::function<void()> handler, const std::string name);
+  void dispatch(std::function<void()> handler, const std::string &name);
 
   EventTracker &stats() const { return *event_stats_; };
 
diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h
index 674895d4b389..b1a73f79cb1a 100644
--- a/src/ray/common/ray_config_def.h
+++ b/src/ray/common/ray_config_def.h
@@ -549,6 +549,16 @@ RAY_CONFIG(bool, enable_metrics_collection, true)
 /// available *after* a grpc call.
 RAY_CONFIG(std::string, enable_grpc_metrics_collection_for, "")
 
+/// Only effective if `enable_metrics_collection` is also true.
+///
+/// If > 0, we monitor each instrumented_io_context every
+/// `io_context_event_loop_lag_collection_interval_ms` milliseconds, by posting a task to
+/// the io_context to measure the duration from post to run. The metric is
+/// `ray_io_context_event_loop_lag_ms`.
+///
+/// A probe task is only posted after a previous probe task has completed.
+RAY_CONFIG(int64_t, io_context_event_loop_lag_collection_interval_ms, 250)
+
 // Max number bytes of inlined objects in a task rpc request/response.
 RAY_CONFIG(int64_t, task_rpc_inlined_bytes_limit, 10 * 1024 * 1024)
 
diff --git a/src/ray/common/test/ray_syncer_test.cc b/src/ray/common/test/ray_syncer_test.cc
index 0f21e65ee731..cc80af47018d 100644
--- a/src/ray/common/test/ray_syncer_test.cc
+++ b/src/ray/common/test/ray_syncer_test.cc
@@ -98,6 +98,7 @@ class RaySyncerTest : public ::testing::Test {
 
   void TearDown() override {
     work_guard_->reset();
+    io_context_.stop();
     thread_->join();
   }
 
diff --git a/src/ray/gcs/gcs_server/gcs_server_io_context_policy.h b/src/ray/gcs/gcs_server/gcs_server_io_context_policy.h
index 9c146d811353..2461f1ee9dc5 100644
--- a/src/ray/gcs/gcs_server/gcs_server_io_context_policy.h
+++ b/src/ray/gcs/gcs_server/gcs_server_io_context_policy.h
@@ -52,6 +52,8 @@ struct GcsServerIOContextPolicy {
   // name, or get leaks by creating unused threads.
   constexpr static std::array<std::string_view, 3> kAllDedicatedIOContextNames{
       "task_io_context", "pubsub_io_context", "ray_syncer_io_context"};
+  constexpr static std::array<bool, 3> kAllDedicatedIOContextEnableLagProbe{
+      true, true, true};
 
   constexpr static size_t IndexOf(std::string_view name) {
     return ray::IndexOf(kAllDedicatedIOContextNames, name);
diff --git a/src/ray/gcs/gcs_server/gcs_server_main.cc b/src/ray/gcs/gcs_server/gcs_server_main.cc
index 18d7b83d896e..f6b7c4bc4197 100644
--- a/src/ray/gcs/gcs_server/gcs_server_main.cc
+++ b/src/ray/gcs/gcs_server/gcs_server_main.cc
@@ -73,7 +73,8 @@ int main(int argc, char *argv[]) {
   ray::rpc::testing::init();
 
   // IO Service for main loop.
-  instrumented_io_context main_service;
+  SetThreadName("gcs_server");
+  instrumented_io_context main_service(/*enable_lag_probe=*/true);
   // Ensure that the IO service keeps running. Without this, the main_service will exit
   // as soon as there is no more work to be processed.
   boost::asio::io_service::work work(main_service);
diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc
index 2b30f9068b6b..f72e8d2661b2 100644
--- a/src/ray/raylet/main.cc
+++ b/src/ray/raylet/main.cc
@@ -184,6 +184,7 @@ int main(int argc, char *argv[]) {
   ray::raylet::NodeManagerConfig node_manager_config;
   absl::flat_hash_map<std::string, double> static_resource_conf;
 
+  SetThreadName("raylet");
   // IO Service for node manager.
   instrumented_io_context main_service;
 
diff --git a/src/ray/stats/metric_defs.cc b/src/ray/stats/metric_defs.cc
index 5d393acdce8d..18916a19ab89 100644
--- a/src/ray/stats/metric_defs.cc
+++ b/src/ray/stats/metric_defs.cc
@@ -145,6 +145,12 @@ DEFINE_stats(placement_groups,
 /// ===================== INTERNAL SYSTEM METRICS =================================
 /// ===============================================================================
 
+DEFINE_stats(io_context_event_loop_lag_ms,
+             "Latency of a task from post to execution",
+             ("Name"),  // Name of the instrumented_io_context.
+             (),
+             ray::stats::GAUGE);
+
 /// Event stats
 DEFINE_stats(operation_count, "operation count", ("Method"), (), ray::stats::GAUGE);
 DEFINE_stats(
diff --git a/src/ray/stats/metric_defs.h b/src/ray/stats/metric_defs.h
index d76c64e7f42f..ae74cd539392 100644
--- a/src/ray/stats/metric_defs.h
+++ b/src/ray/stats/metric_defs.h
@@ -55,6 +55,9 @@ DECLARE_stats(finished_jobs);
 /// Placement group stats, broken down by state.
 DECLARE_stats(placement_groups);
 
+/// ASIO stats
+DECLARE_stats(io_context_event_loop_lag_ms);
+
 /// Event stats
 DECLARE_stats(operation_count);
 DECLARE_stats(operation_run_time_ms);
diff --git a/src/ray/util/util.h b/src/ray/util/util.h
index e34a3d7da983..3e7ff70d8588 100644
--- a/src/ray/util/util.h
+++ b/src/ray/util/util.h
@@ -310,7 +310,7 @@ inline void SetThreadName(const std::string &thread_name) {
 }
 
 inline std::string GetThreadName() {
-#if defined(__linux__)
+#if defined(__linux__) || defined(__APPLE__)
   char name[128];
   auto rc = pthread_getname_np(pthread_self(), name, sizeof(name));
   if (rc != 0) {
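For reference while reviewing the GcsServerIOContextPolicy change: the IOContextProvider
policy contract extended by this patch is a pair of parallel arrays, one of context names
and one of per-context lag-probe flags, kept in sync by the new static_assert in
asio_util.h. A minimal illustrative policy (the struct and context names below are made
up, not part of Ray):

    #include <array>
    #include <string_view>

    struct ExampleIOContextPolicy {
      // One dedicated thread + instrumented_io_context is created per name.
      constexpr static std::array<std::string_view, 2> kAllDedicatedIOContextNames{
          "example_io_context", "another_io_context"};
      // Parallel array: enable the lag probe only for the first context.
      constexpr static std::array<bool, 2> kAllDedicatedIOContextEnableLagProbe{
          true, false};
    };

    // The same size check IOContextProvider now enforces for any policy it is given.
    static_assert(ExampleIOContextPolicy::kAllDedicatedIOContextNames.size() ==
                  ExampleIOContextPolicy::kAllDedicatedIOContextEnableLagProbe.size());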
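A note on the util.h hunk directly above: pthread_getname_np(pthread_self(), buf, len)
exists on both Linux and macOS, which is what lets the "Name" tag of the gauge be filled
on both platforms. A standalone approximation of the getter (the helper name here is
hypothetical, not Ray's API):

    // glibc only exposes pthread_getname_np with _GNU_SOURCE; harmless on macOS.
    #define _GNU_SOURCE
    #include <pthread.h>

    #include <string>

    std::string CurrentThreadName() {
    #if defined(__linux__) || defined(__APPLE__)
      char name[128];
      if (pthread_getname_np(pthread_self(), name, sizeof(name)) == 0) {
        return std::string(name);
      }
      return "";
    #else
      return "";  // No portable getter assumed on other platforms.
    #endif
    }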