Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[action] [PR:18138] [eventd] Add incremental polling when waiting for capture service to start (#18138) #18552

Merged
merged 1 commit into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 26 additions & 16 deletions src/sonic-eventd/src/eventd.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <thread>
#include <memory>
#include "eventd.h"
#include "dbconnector.h"
#include "zmq.h"
Expand Down Expand Up @@ -539,6 +540,7 @@ int
capture_service::set_control(capture_control_t ctrl, event_serialized_lst_t *lst)
{
int ret = -1;
int duration = CAPTURE_SERVICE_POLLING_DURATION;

/* Can go in single step only. */
RET_ON_ERR((ctrl - m_ctrl) == 1, "m_ctrl(%d)+1 < ctrl(%d)", m_ctrl, ctrl);
Expand All @@ -547,8 +549,9 @@ capture_service::set_control(capture_control_t ctrl, event_serialized_lst_t *lst
case INIT_CAPTURE:
m_thr = thread(&capture_service::do_capture, this);
for(int i=0; !m_cap_run && (i < CAPTURE_SERVICE_POLLING_RETRIES); ++i) {
/* Wait max a second for thread to init */
this_thread::sleep_for(chrono::milliseconds(CAPTURE_SERVICE_POLLING_DURATION));
/* Poll to see if thread has been init, if so exit early. Add delay on every attempt */
this_thread::sleep_for(chrono::milliseconds(duration));
duration = min(duration + CAPTURE_SERVICE_POLLING_INCREMENT, CAPTURE_SERVICE_POLLING_MAX_DURATION);
}
RET_ON_ERR(m_cap_run, "Failed to init capture");
m_ctrl = ctrl;
Expand Down Expand Up @@ -646,7 +649,8 @@ run_eventd_service()
event_service service;
stats_collector stats_instance;
eventd_proxy *proxy = NULL;
capture_service *capture = NULL;
unique_ptr<capture_service> capture;
bool skip_caching = false;

event_serialized_lst_t capture_fifo_events;
last_events_t capture_last_events;
Expand Down Expand Up @@ -676,9 +680,14 @@ run_eventd_service()
* events until telemetry starts.
* Telemetry will send a stop & collect cache upon startup
*/
capture = new capture_service(zctx, cache_max, &stats_instance);
RET_ON_ERR(capture->set_control(INIT_CAPTURE) == 0, "Failed to init capture");
RET_ON_ERR(capture->set_control(START_CAPTURE) == 0, "Failed to start capture");
capture = make_unique<capture_service>(zctx, cache_max, &stats_instance);
if (capture->set_control(INIT_CAPTURE) != 0) {
SWSS_LOG_WARN("Failed to initialize capture service, so we skip caching");
skip_caching = true;
capture.reset(); // Capture service will not be available
} else {
RET_ON_ERR(capture->set_control(START_CAPTURE) == 0, "Failed to start capture");
}

this_thread::sleep_for(chrono::milliseconds(200));
RET_ON_ERR(stats_instance.is_running(), "Failed to start stats instance");
Expand All @@ -694,12 +703,12 @@ run_eventd_service()
case EVENT_CACHE_INIT:
/* connect only*/
if (capture != NULL) {
delete capture;
capture.reset();
}
event_serialized_lst_t().swap(capture_fifo_events);
last_events_t().swap(capture_last_events);

capture = new capture_service(zctx, cache_max, &stats_instance);
capture = make_unique<capture_service>(zctx, cache_max, &stats_instance);
if (capture != NULL) {
resp = capture->set_control(INIT_CAPTURE);
}
Expand All @@ -708,7 +717,7 @@ run_eventd_service()

case EVENT_CACHE_START:
if (capture == NULL) {
SWSS_LOG_ERROR("Cache is not initialized to start");
SWSS_LOG_WARN("Cache is not initialized to start");
resp = -1;
break;
}
Expand All @@ -721,7 +730,7 @@ run_eventd_service()

case EVENT_CACHE_STOP:
if (capture == NULL) {
SWSS_LOG_ERROR("Cache is not initialized to stop");
SWSS_LOG_WARN("Cache is not initialized to stop");
resp = -1;
break;
}
Expand All @@ -731,15 +740,19 @@ run_eventd_service()
resp = capture->read_cache(capture_fifo_events, capture_last_events,
overflow);
}
delete capture;
capture = NULL;
capture.reset();

/* Unpause heartbeat upon stop caching */
stats_instance.heartbeat_ctrl();
break;


case EVENT_CACHE_READ:
if (skip_caching) {
SWSS_LOG_WARN("Capture service is unavailable, skipping cache read");
resp = -1;
break;
}
if (capture != NULL) {
SWSS_LOG_ERROR("Cache is not stopped yet.");
resp = -1;
Expand Down Expand Up @@ -802,13 +815,10 @@ run_eventd_service()
if (proxy != NULL) {
delete proxy;
}
if (capture != NULL) {
delete capture;
}
if (zctx != NULL) {
zmq_ctx_term(zctx);
}
SWSS_LOG_ERROR("Eventd service exiting\n");
SWSS_LOG_INFO("Eventd service exiting\n");
}

void set_unit_testing(bool b)
Expand Down
2 changes: 2 additions & 0 deletions src/sonic-eventd/src/eventd.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ typedef enum {
#define EVENTS_STATS_FIELD_NAME "value"
#define STATS_HEARTBEAT_MIN 300
/*
 * Polling parameters used by capture_service::set_control(INIT_CAPTURE)
 * while it waits for the capture thread to report that it is running.
 * The duration values are passed to chrono::milliseconds.
 */
/* Sleep used for the first polling attempt (ms). */
#define CAPTURE_SERVICE_POLLING_DURATION 10
/* Amount added to the sleep after every polling attempt (ms). */
#define CAPTURE_SERVICE_POLLING_INCREMENT 10
/* Upper bound the per-attempt sleep is clamped to (ms). */
#define CAPTURE_SERVICE_POLLING_MAX_DURATION 100
/* Maximum number of polling attempts before init is treated as failed. */
#define CAPTURE_SERVICE_POLLING_RETRIES 100

/*
Expand Down
Loading