From 35f1f223a5a8eb1b8c61313169c6dde6017b0adf Mon Sep 17 00:00:00 2001 From: Zain Budhwani <99770260+zbud-msft@users.noreply.github.com> Date: Thu, 14 Mar 2024 19:00:11 -0700 Subject: [PATCH] [eventd] Add incremental polling when waiting for capture service to start (#18138) ### Why I did it Addresses https://github.com/sonic-net/sonic-buildimage/issues/17350 ### How I did it Instead of a 1 second delay, we poll to check that the thread is available and after each poll increment the delay. There were situations where if there was less memory available, fixed polling would not be effective for starting zmq capture service. Add an incremental delay such that eventd can wait longer to start up capture service if system is too busy or overloaded, but still keep a max duration/retry limit so that we do not wait forever. #### How to verify it UT --- src/sonic-eventd/src/eventd.cpp | 42 ++++++++++++++++++++------------- src/sonic-eventd/src/eventd.h | 2 ++ 2 files changed, 28 insertions(+), 16 deletions(-) diff --git a/src/sonic-eventd/src/eventd.cpp b/src/sonic-eventd/src/eventd.cpp index 36c162453427..eb692d5b3a9b 100644 --- a/src/sonic-eventd/src/eventd.cpp +++ b/src/sonic-eventd/src/eventd.cpp @@ -1,4 +1,5 @@ #include +#include #include "eventd.h" #include "dbconnector.h" #include "zmq.h" @@ -539,6 +540,7 @@ int capture_service::set_control(capture_control_t ctrl, event_serialized_lst_t *lst) { int ret = -1; + int duration = CAPTURE_SERVICE_POLLING_DURATION; /* Can go in single step only. */ RET_ON_ERR((ctrl - m_ctrl) == 1, "m_ctrl(%d)+1 < ctrl(%d)", m_ctrl, ctrl); @@ -547,8 +549,9 @@ capture_service::set_control(capture_control_t ctrl, event_serialized_lst_t *lst case INIT_CAPTURE: m_thr = thread(&capture_service::do_capture, this); for(int i=0; !m_cap_run && (i < CAPTURE_SERVICE_POLLING_RETRIES); ++i) { - /* Wait max a second for thread to init */ - this_thread::sleep_for(chrono::milliseconds(CAPTURE_SERVICE_POLLING_DURATION)); + /* Poll to see if thread has been init, if so exit early. Add delay on every attempt */ + this_thread::sleep_for(chrono::milliseconds(duration)); + duration = min(duration + CAPTURE_SERVICE_POLLING_INCREMENT, CAPTURE_SERVICE_POLLING_MAX_DURATION); } RET_ON_ERR(m_cap_run, "Failed to init capture"); m_ctrl = ctrl; @@ -646,7 +649,8 @@ run_eventd_service() event_service service; stats_collector stats_instance; eventd_proxy *proxy = NULL; - capture_service *capture = NULL; + unique_ptr capture; + bool skip_caching = false; event_serialized_lst_t capture_fifo_events; last_events_t capture_last_events; @@ -676,9 +680,14 @@ run_eventd_service() * events until telemetry starts. * Telemetry will send a stop & collect cache upon startup */ - capture = new capture_service(zctx, cache_max, &stats_instance); - RET_ON_ERR(capture->set_control(INIT_CAPTURE) == 0, "Failed to init capture"); - RET_ON_ERR(capture->set_control(START_CAPTURE) == 0, "Failed to start capture"); + capture = make_unique(zctx, cache_max, &stats_instance); + if (capture->set_control(INIT_CAPTURE) != 0) { + SWSS_LOG_WARN("Failed to initialize capture service, so we skip caching"); + skip_caching = true; + capture.reset(); // Capture service will not be available + } else { + RET_ON_ERR(capture->set_control(START_CAPTURE) == 0, "Failed to start capture"); + } this_thread::sleep_for(chrono::milliseconds(200)); RET_ON_ERR(stats_instance.is_running(), "Failed to start stats instance"); @@ -694,12 +703,12 @@ run_eventd_service() case EVENT_CACHE_INIT: /* connect only*/ if (capture != NULL) { - delete capture; + capture.reset(); } event_serialized_lst_t().swap(capture_fifo_events); last_events_t().swap(capture_last_events); - capture = new capture_service(zctx, cache_max, &stats_instance); + capture = make_unique(zctx, cache_max, &stats_instance); if (capture != NULL) { resp = capture->set_control(INIT_CAPTURE); } @@ -708,7 +717,7 @@ run_eventd_service() case EVENT_CACHE_START: if (capture == NULL) { - SWSS_LOG_ERROR("Cache is not initialized to start"); + SWSS_LOG_WARN("Cache is not initialized to start"); resp = -1; break; } @@ -721,7 +730,7 @@ run_eventd_service() case EVENT_CACHE_STOP: if (capture == NULL) { - SWSS_LOG_ERROR("Cache is not initialized to stop"); + SWSS_LOG_WARN("Cache is not initialized to stop"); resp = -1; break; } @@ -731,8 +740,7 @@ run_eventd_service() resp = capture->read_cache(capture_fifo_events, capture_last_events, overflow); } - delete capture; - capture = NULL; + capture.reset(); /* Unpause heartbeat upon stop caching */ stats_instance.heartbeat_ctrl(); @@ -740,6 +748,11 @@ run_eventd_service() case EVENT_CACHE_READ: + if (skip_caching) { + SWSS_LOG_WARN("Capture service is unavailable, skipping cache read"); + resp = -1; + break; + } if (capture != NULL) { SWSS_LOG_ERROR("Cache is not stopped yet."); resp = -1; @@ -802,13 +815,10 @@ run_eventd_service() if (proxy != NULL) { delete proxy; } - if (capture != NULL) { - delete capture; - } if (zctx != NULL) { zmq_ctx_term(zctx); } - SWSS_LOG_ERROR("Eventd service exiting\n"); + SWSS_LOG_INFO("Eventd service exiting\n"); } void set_unit_testing(bool b) diff --git a/src/sonic-eventd/src/eventd.h b/src/sonic-eventd/src/eventd.h index a7a87f9436a0..960dfb8b8dc1 100644 --- a/src/sonic-eventd/src/eventd.h +++ b/src/sonic-eventd/src/eventd.h @@ -22,6 +22,8 @@ typedef enum { #define EVENTS_STATS_FIELD_NAME "value" #define STATS_HEARTBEAT_MIN 300 #define CAPTURE_SERVICE_POLLING_DURATION 10 +#define CAPTURE_SERVICE_POLLING_INCREMENT 10 +#define CAPTURE_SERVICE_POLLING_MAX_DURATION 100 #define CAPTURE_SERVICE_POLLING_RETRIES 100 /*