Skip to content

Commit

Permalink
[eventd] Add incremental polling when waiting for capture service to …
Browse files Browse the repository at this point in the history
…start (#18138)

### Why I did it

Addresses #17350

### How I did it

Instead of a 1 second delay, we poll to check that the thread is available and after each poll increment the delay. There were situations where if there was less memory available, fixed polling would not be effective for starting zmq capture service. Add an incremental delay such that eventd can wait longer to start up capture service if system is too busy or overloaded, but still keep a max duration/retry limit so that we do not wait forever.

#### How to verify it
UT
  • Loading branch information
zbud-msft authored and mssonicbld committed Apr 4, 2024
1 parent 107019c commit 35f1f22
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 16 deletions.
42 changes: 26 additions & 16 deletions src/sonic-eventd/src/eventd.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <thread>
#include <memory>
#include "eventd.h"
#include "dbconnector.h"
#include "zmq.h"
Expand Down Expand Up @@ -539,6 +540,7 @@ int
capture_service::set_control(capture_control_t ctrl, event_serialized_lst_t *lst)
{
int ret = -1;
int duration = CAPTURE_SERVICE_POLLING_DURATION;

/* Can go in single step only. */
RET_ON_ERR((ctrl - m_ctrl) == 1, "m_ctrl(%d)+1 < ctrl(%d)", m_ctrl, ctrl);
Expand All @@ -547,8 +549,9 @@ capture_service::set_control(capture_control_t ctrl, event_serialized_lst_t *lst
case INIT_CAPTURE:
m_thr = thread(&capture_service::do_capture, this);
for(int i=0; !m_cap_run && (i < CAPTURE_SERVICE_POLLING_RETRIES); ++i) {
/* Wait max a second for thread to init */
this_thread::sleep_for(chrono::milliseconds(CAPTURE_SERVICE_POLLING_DURATION));
/* Poll to see if thread has been init, if so exit early. Add delay on every attempt */
this_thread::sleep_for(chrono::milliseconds(duration));
duration = min(duration + CAPTURE_SERVICE_POLLING_INCREMENT, CAPTURE_SERVICE_POLLING_MAX_DURATION);
}
RET_ON_ERR(m_cap_run, "Failed to init capture");
m_ctrl = ctrl;
Expand Down Expand Up @@ -646,7 +649,8 @@ run_eventd_service()
event_service service;
stats_collector stats_instance;
eventd_proxy *proxy = NULL;
capture_service *capture = NULL;
unique_ptr<capture_service> capture;
bool skip_caching = false;

event_serialized_lst_t capture_fifo_events;
last_events_t capture_last_events;
Expand Down Expand Up @@ -676,9 +680,14 @@ run_eventd_service()
* events until telemetry starts.
* Telemetry will send a stop & collect cache upon startup
*/
capture = new capture_service(zctx, cache_max, &stats_instance);
RET_ON_ERR(capture->set_control(INIT_CAPTURE) == 0, "Failed to init capture");
RET_ON_ERR(capture->set_control(START_CAPTURE) == 0, "Failed to start capture");
capture = make_unique<capture_service>(zctx, cache_max, &stats_instance);
if (capture->set_control(INIT_CAPTURE) != 0) {
SWSS_LOG_WARN("Failed to initialize capture service, so we skip caching");
skip_caching = true;
capture.reset(); // Capture service will not be available
} else {
RET_ON_ERR(capture->set_control(START_CAPTURE) == 0, "Failed to start capture");
}

this_thread::sleep_for(chrono::milliseconds(200));
RET_ON_ERR(stats_instance.is_running(), "Failed to start stats instance");
Expand All @@ -694,12 +703,12 @@ run_eventd_service()
case EVENT_CACHE_INIT:
/* connect only*/
if (capture != NULL) {
delete capture;
capture.reset();
}
event_serialized_lst_t().swap(capture_fifo_events);
last_events_t().swap(capture_last_events);

capture = new capture_service(zctx, cache_max, &stats_instance);
capture = make_unique<capture_service>(zctx, cache_max, &stats_instance);
if (capture != NULL) {
resp = capture->set_control(INIT_CAPTURE);
}
Expand All @@ -708,7 +717,7 @@ run_eventd_service()

case EVENT_CACHE_START:
if (capture == NULL) {
SWSS_LOG_ERROR("Cache is not initialized to start");
SWSS_LOG_WARN("Cache is not initialized to start");
resp = -1;
break;
}
Expand All @@ -721,7 +730,7 @@ run_eventd_service()

case EVENT_CACHE_STOP:
if (capture == NULL) {
SWSS_LOG_ERROR("Cache is not initialized to stop");
SWSS_LOG_WARN("Cache is not initialized to stop");
resp = -1;
break;
}
Expand All @@ -731,15 +740,19 @@ run_eventd_service()
resp = capture->read_cache(capture_fifo_events, capture_last_events,
overflow);
}
delete capture;
capture = NULL;
capture.reset();

/* Unpause heartbeat upon stop caching */
stats_instance.heartbeat_ctrl();
break;


case EVENT_CACHE_READ:
if (skip_caching) {
SWSS_LOG_WARN("Capture service is unavailable, skipping cache read");
resp = -1;
break;
}
if (capture != NULL) {
SWSS_LOG_ERROR("Cache is not stopped yet.");
resp = -1;
Expand Down Expand Up @@ -802,13 +815,10 @@ run_eventd_service()
if (proxy != NULL) {
delete proxy;
}
if (capture != NULL) {
delete capture;
}
if (zctx != NULL) {
zmq_ctx_term(zctx);
}
SWSS_LOG_ERROR("Eventd service exiting\n");
SWSS_LOG_INFO("Eventd service exiting\n");
}

void set_unit_testing(bool b)
Expand Down
2 changes: 2 additions & 0 deletions src/sonic-eventd/src/eventd.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ typedef enum {
#define EVENTS_STATS_FIELD_NAME "value"
#define STATS_HEARTBEAT_MIN 300
#define CAPTURE_SERVICE_POLLING_DURATION 10
#define CAPTURE_SERVICE_POLLING_INCREMENT 10
#define CAPTURE_SERVICE_POLLING_MAX_DURATION 100
#define CAPTURE_SERVICE_POLLING_RETRIES 100

/*
Expand Down

0 comments on commit 35f1f22

Please sign in to comment.