Skip to content

Commit

Permalink
[eventd] Fix eventd UT flakiness (#17055)
Browse files Browse the repository at this point in the history
### Why I did it

Fix flakiness of eventd UT - run sub after capture service starts

##### Work item tracking
- Microsoft ADO **(number only)**:25650744

#### How I did it

Run sub socket after capture socket is initialized

#### How to verify it

Pipeline
  • Loading branch information
zbud-msft authored and mssonicbld committed Mar 4, 2024
1 parent 23d5c5a commit 2718ecf
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 59 deletions.
2 changes: 1 addition & 1 deletion src/sonic-eventd/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ endif
-include rsyslog_plugin/subdir.mk
-include rsyslog_plugin_tests/subdir.mk

all: sonic-eventd eventd-tool rsyslog-plugin
all: sonic-eventd eventd-tests eventd-tool rsyslog-plugin rsyslog-plugin-tests

sonic-eventd: $(OBJS)
@echo 'Building target: $@'
Expand Down
4 changes: 2 additions & 2 deletions src/sonic-eventd/src/eventd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -546,9 +546,9 @@ capture_service::set_control(capture_control_t ctrl, event_serialized_lst_t *lst
switch(ctrl) {
case INIT_CAPTURE:
m_thr = thread(&capture_service::do_capture, this);
for(int i=0; !m_cap_run && (i < 100); ++i) {
for(int i=0; !m_cap_run && (i < CAPTURE_SERVICE_POLLING_RETRIES); ++i) {
/* Wait max a second for thread to init */
this_thread::sleep_for(chrono::milliseconds(10));
this_thread::sleep_for(chrono::milliseconds(CAPTURE_SERVICE_POLLING_DURATION));
}
RET_ON_ERR(m_cap_run, "Failed to init capture");
m_ctrl = ctrl;
Expand Down
2 changes: 2 additions & 0 deletions src/sonic-eventd/src/eventd.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ typedef enum {

#define EVENTS_STATS_FIELD_NAME "value"
#define STATS_HEARTBEAT_MIN 300
#define CAPTURE_SERVICE_POLLING_DURATION 10
#define CAPTURE_SERVICE_POLLING_RETRIES 100

/*
* Started by eventd_service.
Expand Down
72 changes: 48 additions & 24 deletions src/sonic-eventd/tests/eventd_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,25 +152,23 @@ static const test_data_t ldata[] = {


void run_cap(void *zctx, bool &term, string &read_source,
int &cnt)
int &cnt, bool &should_read_control)
{
void *mock_cap = zmq_socket (zctx, ZMQ_SUB);
string source;
internal_event_t ev_int;
int block_ms = 200;
int i=0;
static int proxy_finished_init = false;

EXPECT_TRUE(NULL != mock_cap);
EXPECT_EQ(0, zmq_connect(mock_cap, get_config(CAPTURE_END_KEY).c_str()));
EXPECT_EQ(0, zmq_setsockopt(mock_cap, ZMQ_SUBSCRIBE, "", 0));
EXPECT_EQ(0, zmq_setsockopt(mock_cap, ZMQ_RCVTIMEO, &block_ms, sizeof (block_ms)));

if(!proxy_finished_init) {
if(should_read_control) {
zmq_msg_t msg;
zmq_msg_init(&msg);
EXPECT_EQ(1, zmq_msg_recv(&msg, mock_cap, 0)); // Subscription message
proxy_finished_init = true;
EXPECT_NE(1, zmq_msg_recv(&msg, mock_cap, 0)); // Subscription message should be read by do_capture
}

while(!term) {
Expand Down Expand Up @@ -227,10 +225,10 @@ void run_pub(void *mock_pub, const string wr_source, internal_events_lst_t &lst)
}
}


TEST(eventd, proxy)
{
printf("Proxy TEST started\n");
bool should_read_control = false;
bool term_sub = false;
bool term_cap = false;
string rd_csource, rd_source, wr_source("hello");
Expand All @@ -247,12 +245,12 @@ TEST(eventd, proxy)
/* Starting proxy */
EXPECT_EQ(0, pxy->init());

/* capture in a thread */
thread thrc(&run_cap, zctx, ref(term_cap), ref(rd_csource), ref(rd_cevts_sz), ref(should_read_control));

/* subscriber in a thread */
thread thr(&run_sub, zctx, ref(term_sub), ref(rd_source), ref(rd_evts), ref(rd_evts_sz));

/* capture in a thread */
thread thrc(&run_cap, zctx, ref(term_cap), ref(rd_csource), ref(rd_cevts_sz));

/* Init pub connection */
void *mock_pub = init_pub(zctx);

Expand All @@ -275,9 +273,6 @@ TEST(eventd, proxy)
}
this_thread::sleep_for(chrono::milliseconds(1000));

delete pxy;
pxy = NULL;

term_sub = true;
term_cap = true;

Expand All @@ -287,6 +282,18 @@ TEST(eventd, proxy)
EXPECT_EQ(rd_cevts_sz, wr_evts.size());

zmq_close(mock_pub);

/* Do control test */

should_read_control = true;

/* capture in a thread */
thread thrcc(&run_cap, zctx, ref(term_cap), ref(rd_csource), ref(rd_cevts_sz), ref(should_read_control));

delete pxy;
pxy = NULL;

thrcc.join();
zmq_ctx_term(zctx);

/* Provide time for async proxy removal to complete */
Expand All @@ -295,7 +302,6 @@ TEST(eventd, proxy)
printf("eventd_proxy is tested GOOD\n");
}


TEST(eventd, capture)
{
printf("Capture TEST started\n");
Expand Down Expand Up @@ -329,9 +335,6 @@ TEST(eventd, capture)
/* Starting proxy */
EXPECT_EQ(0, pxy->init());

/* Run subscriber; Else publisher will drop events on floor, with no subscriber. */
thread thr_sub(&run_sub, zctx, ref(term_sub), ref(sub_source), ref(sub_evts), ref(sub_evts_sz));

/* Create capture service */
capture_service *pcap = new capture_service(zctx, cache_max, &stats_instance);

Expand All @@ -341,6 +344,9 @@ TEST(eventd, capture)
/* Initialize the capture */
EXPECT_EQ(0, pcap->set_control(INIT_CAPTURE));

/* Run subscriber; Else publisher will drop events on floor, with no subscriber. */
thread thr_sub(&run_sub, zctx, ref(term_sub), ref(sub_source), ref(sub_evts), ref(sub_evts_sz));

EXPECT_TRUE(init_cache > 1);
EXPECT_TRUE((cache_max+3) < (int)ARRAY_SIZE(ldata));

Expand Down Expand Up @@ -473,9 +479,6 @@ TEST(eventd, captureCacheMax)
/* Starting proxy */
EXPECT_EQ(0, pxy->init());

/* Run subscriber; Else publisher will drop events on floor, with no subscriber. */
thread thr_sub(&run_sub, zctx, ref(term_sub), ref(sub_source), ref(sub_evts), ref(sub_evts_sz));

/* Create capture service */
capture_service *pcap = new capture_service(zctx, cache_max, &stats_instance);

Expand All @@ -484,6 +487,9 @@ TEST(eventd, captureCacheMax)

EXPECT_TRUE(init_cache > 1);

/* Run subscriber; Else publisher will drop events on floor, with no subscriber. */
thread thr_sub(&run_sub, zctx, ref(term_sub), ref(sub_source), ref(sub_evts), ref(sub_evts_sz));

/* Collect few serailized strings of events for startup cache */
for(int i=0; i < init_cache; ++i) {
internal_event_t ev(create_ev(ldata[i]));
Expand Down Expand Up @@ -595,6 +601,7 @@ TEST(eventd, service)
}

thread thread_service(&run_eventd_service);
this_thread::sleep_for(chrono::milliseconds(CAPTURE_SERVICE_POLLING_DURATION * CAPTURE_SERVICE_POLLING_RETRIES));

/* Need client side service to interact with server side */
EXPECT_EQ(0, service.init_client(zctx));
Expand All @@ -610,7 +617,7 @@ TEST(eventd, service)
string wr_source("hello");

/* Test service startup caching */
event_serialized_lst_t evts_start, evts_read;
event_serialized_lst_t evts_start, evts_read, polled_events;

for(int i=0; i<wr_sz; ++i) {
string evt_str;
Expand All @@ -624,15 +631,32 @@ TEST(eventd, service)
/* Publish events. */
run_pub(mock_pub, wr_source, wr_evts);

/* Published events must have been captured. Give a pause, to ensure sent. */
this_thread::sleep_for(chrono::milliseconds(200));
int max_polling_duration = 2000;
int polling_interval = 100;
auto poll_start_ts = chrono::steady_clock::now();

while(true) {
auto current_ts = chrono::steady_clock::now();
if(chrono::duration_cast<chrono::milliseconds>(current_ts - poll_start_ts).count() >= max_polling_duration) {
break;
}
event_serialized_lst_t read_events;
service.cache_read(read_events);
polled_events.insert(polled_events.end(), read_events.begin(), read_events.end());
if (!read_events.empty()) {
break;
}
this_thread::sleep_for(chrono::milliseconds(polling_interval));
}

EXPECT_EQ(0, service.cache_stop());

/* Read the cache; expect wr_sz events */
/* Read remaining events in cache, if any */
EXPECT_EQ(0, service.cache_read(evts_read));

EXPECT_EQ(evts_read, evts_start);
polled_events.insert(polled_events.end(), evts_read.begin(), evts_read.end());

EXPECT_EQ(polled_events, evts_start);

zmq_close(mock_pub);
}
Expand Down
42 changes: 10 additions & 32 deletions src/sonic-eventd/tests/main.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "gtest/gtest.h"
#include "dbconnector.h"
#include <iostream>
#include <stdexcept>

using namespace std;
using namespace swss;
Expand All @@ -20,57 +21,34 @@ class SwsscommonEnvironment : public ::testing::Environment {
// Override this to define how to set up the environment
void SetUp() override {
// by default , init should be false
cout<<"Default : isInit = "<<SonicDBConfig::isInit()<<endl;
cout << "Default : isInit = " << SonicDBConfig::isInit() << endl;
EXPECT_FALSE(SonicDBConfig::isInit());

// load nonexisting file, should throw exception with NO file existing
try
{
cout<<"INIT: loading nonexisting db config file"<<endl;
SonicDBConfig::initialize(nonexisting_file);
}
catch (exception &e)
{
EXPECT_TRUE(strstr(e.what(), "Sonic database config file doesn't exist"));
}
EXPECT_THROW(SonicDBConfig::initialize(nonexisting_file), runtime_error);

EXPECT_FALSE(SonicDBConfig::isInit());

// load local config file, init should be true
SonicDBConfig::initialize(existing_file);
cout<<"INIT: load local db config file, isInit = "<<SonicDBConfig::isInit()<<endl;
cout << "INIT: load local db config file, isInit = " << SonicDBConfig::isInit() << endl;
EXPECT_TRUE(SonicDBConfig::isInit());

// Test the database_global.json file
// by default , global_init should be false
cout<<"Default : isGlobalInit = "<<SonicDBConfig::isGlobalInit()<<endl;
cout << "Default : isGlobalInit = " << SonicDBConfig::isGlobalInit() << endl;
EXPECT_FALSE(SonicDBConfig::isGlobalInit());

// Call an API which actually needs the data populated by SonicDBConfig::initializeGlobalConfig
try
{
cout<<"INIT: Invoking SonicDBConfig::getDbId(APPL_DB, asic0)"<<endl;
SonicDBConfig::getDbId(TEST_DB, TEST_NAMESPACE);
}
catch (exception &e)
{
EXPECT_TRUE(strstr(e.what(), "Initialize global DB config using API SonicDBConfig::initializeGlobalConfig"));
}
EXPECT_THROW(SonicDBConfig::getDbId(TEST_DB, TEST_NAMESPACE), runtime_error);

// load local global file, init should be true
SonicDBConfig::initializeGlobalConfig(global_existing_file);
cout<<"INIT: load global db config file, isInit = "<<SonicDBConfig::isGlobalInit()<<endl;
cout << "INIT: load global db config file, isInit = " << SonicDBConfig::isGlobalInit() << endl;
EXPECT_TRUE(SonicDBConfig::isGlobalInit());

// Call an API with wrong namespace passed
try
{
cout<<"INIT: Invoking SonicDBConfig::getDbId(APPL_DB, invalid)"<<endl;
SonicDBConfig::getDbId(TEST_DB, INVALID_NAMESPACE);
}
catch (exception &e)
{
EXPECT_TRUE(strstr(e.what(), "Namespace invalid is not a valid namespace name in config file"));
}
cout << "INIT: Invoking SonicDBConfig::getDbId(APPL_DB, invalid)" << endl;
EXPECT_THROW(SonicDBConfig::getDbId(TEST_DB, INVALID_NAMESPACE), out_of_range);

// Get this info handy
try
Expand Down

0 comments on commit 2718ecf

Please sign in to comment.