diff --git a/src/sonic-eventd/Makefile b/src/sonic-eventd/Makefile index 9faeb3027196..ebc1aa7ee9e8 100644 --- a/src/sonic-eventd/Makefile +++ b/src/sonic-eventd/Makefile @@ -29,7 +29,7 @@ endif -include rsyslog_plugin/subdir.mk -include rsyslog_plugin_tests/subdir.mk -all: sonic-eventd eventd-tool rsyslog-plugin +all: sonic-eventd eventd-tests eventd-tool rsyslog-plugin rsyslog-plugin-tests sonic-eventd: $(OBJS) @echo 'Building target: $@' diff --git a/src/sonic-eventd/src/eventd.cpp b/src/sonic-eventd/src/eventd.cpp index 953fe9b7c491..36c162453427 100644 --- a/src/sonic-eventd/src/eventd.cpp +++ b/src/sonic-eventd/src/eventd.cpp @@ -546,9 +546,9 @@ capture_service::set_control(capture_control_t ctrl, event_serialized_lst_t *lst switch(ctrl) { case INIT_CAPTURE: m_thr = thread(&capture_service::do_capture, this); - for(int i=0; !m_cap_run && (i < 100); ++i) { + for(int i=0; !m_cap_run && (i < CAPTURE_SERVICE_POLLING_RETRIES); ++i) { /* Wait max a second for thread to init */ - this_thread::sleep_for(chrono::milliseconds(10)); + this_thread::sleep_for(chrono::milliseconds(CAPTURE_SERVICE_POLLING_DURATION)); } RET_ON_ERR(m_cap_run, "Failed to init capture"); m_ctrl = ctrl; diff --git a/src/sonic-eventd/src/eventd.h b/src/sonic-eventd/src/eventd.h index 706863667eea..a7a87f9436a0 100644 --- a/src/sonic-eventd/src/eventd.h +++ b/src/sonic-eventd/src/eventd.h @@ -21,6 +21,8 @@ typedef enum { #define EVENTS_STATS_FIELD_NAME "value" #define STATS_HEARTBEAT_MIN 300 +#define CAPTURE_SERVICE_POLLING_DURATION 10 +#define CAPTURE_SERVICE_POLLING_RETRIES 100 /* * Started by eventd_service. diff --git a/src/sonic-eventd/tests/eventd_ut.cpp b/src/sonic-eventd/tests/eventd_ut.cpp index db46845b0480..729563fcd39d 100644 --- a/src/sonic-eventd/tests/eventd_ut.cpp +++ b/src/sonic-eventd/tests/eventd_ut.cpp @@ -152,25 +152,23 @@ static const test_data_t ldata[] = { void run_cap(void *zctx, bool &term, string &read_source, - int &cnt) + int &cnt, bool &should_read_control) { void *mock_cap = zmq_socket (zctx, ZMQ_SUB); string source; internal_event_t ev_int; int block_ms = 200; int i=0; - static int proxy_finished_init = false; EXPECT_TRUE(NULL != mock_cap); EXPECT_EQ(0, zmq_connect(mock_cap, get_config(CAPTURE_END_KEY).c_str())); EXPECT_EQ(0, zmq_setsockopt(mock_cap, ZMQ_SUBSCRIBE, "", 0)); EXPECT_EQ(0, zmq_setsockopt(mock_cap, ZMQ_RCVTIMEO, &block_ms, sizeof (block_ms))); - if(!proxy_finished_init) { + if(should_read_control) { zmq_msg_t msg; zmq_msg_init(&msg); - EXPECT_EQ(1, zmq_msg_recv(&msg, mock_cap, 0)); // Subscription message - proxy_finished_init = true; + EXPECT_NE(1, zmq_msg_recv(&msg, mock_cap, 0)); // Subscription message should be read by do_capture } while(!term) { @@ -227,10 +225,10 @@ void run_pub(void *mock_pub, const string wr_source, internal_events_lst_t &lst) } } - TEST(eventd, proxy) { printf("Proxy TEST started\n"); + bool should_read_control = false; bool term_sub = false; bool term_cap = false; string rd_csource, rd_source, wr_source("hello"); @@ -247,12 +245,12 @@ TEST(eventd, proxy) /* Starting proxy */ EXPECT_EQ(0, pxy->init()); + /* capture in a thread */ + thread thrc(&run_cap, zctx, ref(term_cap), ref(rd_csource), ref(rd_cevts_sz), ref(should_read_control)); + /* subscriber in a thread */ thread thr(&run_sub, zctx, ref(term_sub), ref(rd_source), ref(rd_evts), ref(rd_evts_sz)); - /* capture in a thread */ - thread thrc(&run_cap, zctx, ref(term_cap), ref(rd_csource), ref(rd_cevts_sz)); - /* Init pub connection */ void *mock_pub = init_pub(zctx); @@ -275,9 +273,6 @@ TEST(eventd, proxy) } this_thread::sleep_for(chrono::milliseconds(1000)); - delete pxy; - pxy = NULL; - term_sub = true; term_cap = true; @@ -287,6 +282,18 @@ TEST(eventd, proxy) EXPECT_EQ(rd_cevts_sz, wr_evts.size()); zmq_close(mock_pub); + + /* Do control test */ + + should_read_control = true; + + /* capture in a thread */ + thread thrcc(&run_cap, zctx, ref(term_cap), ref(rd_csource), ref(rd_cevts_sz), ref(should_read_control)); + + delete pxy; + pxy = NULL; + + thrcc.join(); zmq_ctx_term(zctx); /* Provide time for async proxy removal to complete */ @@ -295,7 +302,6 @@ TEST(eventd, proxy) printf("eventd_proxy is tested GOOD\n"); } - TEST(eventd, capture) { printf("Capture TEST started\n"); @@ -329,9 +335,6 @@ TEST(eventd, capture) /* Starting proxy */ EXPECT_EQ(0, pxy->init()); - /* Run subscriber; Else publisher will drop events on floor, with no subscriber. */ - thread thr_sub(&run_sub, zctx, ref(term_sub), ref(sub_source), ref(sub_evts), ref(sub_evts_sz)); - /* Create capture service */ capture_service *pcap = new capture_service(zctx, cache_max, &stats_instance); @@ -341,6 +344,9 @@ TEST(eventd, capture) /* Initialize the capture */ EXPECT_EQ(0, pcap->set_control(INIT_CAPTURE)); + /* Run subscriber; Else publisher will drop events on floor, with no subscriber. */ + thread thr_sub(&run_sub, zctx, ref(term_sub), ref(sub_source), ref(sub_evts), ref(sub_evts_sz)); + EXPECT_TRUE(init_cache > 1); EXPECT_TRUE((cache_max+3) < (int)ARRAY_SIZE(ldata)); @@ -473,9 +479,6 @@ TEST(eventd, captureCacheMax) /* Starting proxy */ EXPECT_EQ(0, pxy->init()); - /* Run subscriber; Else publisher will drop events on floor, with no subscriber. */ - thread thr_sub(&run_sub, zctx, ref(term_sub), ref(sub_source), ref(sub_evts), ref(sub_evts_sz)); - /* Create capture service */ capture_service *pcap = new capture_service(zctx, cache_max, &stats_instance); @@ -484,6 +487,9 @@ TEST(eventd, captureCacheMax) EXPECT_TRUE(init_cache > 1); + /* Run subscriber; Else publisher will drop events on floor, with no subscriber. */ + thread thr_sub(&run_sub, zctx, ref(term_sub), ref(sub_source), ref(sub_evts), ref(sub_evts_sz)); + /* Collect few serailized strings of events for startup cache */ for(int i=0; i < init_cache; ++i) { internal_event_t ev(create_ev(ldata[i])); @@ -595,6 +601,7 @@ TEST(eventd, service) } thread thread_service(&run_eventd_service); + this_thread::sleep_for(chrono::milliseconds(CAPTURE_SERVICE_POLLING_DURATION * CAPTURE_SERVICE_POLLING_RETRIES)); /* Need client side service to interact with server side */ EXPECT_EQ(0, service.init_client(zctx)); @@ -610,7 +617,7 @@ TEST(eventd, service) string wr_source("hello"); /* Test service startup caching */ - event_serialized_lst_t evts_start, evts_read; + event_serialized_lst_t evts_start, evts_read, polled_events; for(int i=0; i(current_ts - poll_start_ts).count() >= max_polling_duration) { + break; + } + event_serialized_lst_t read_events; + service.cache_read(read_events); + polled_events.insert(polled_events.end(), read_events.begin(), read_events.end()); + if (!read_events.empty()) { + break; + } + this_thread::sleep_for(chrono::milliseconds(polling_interval)); + } EXPECT_EQ(0, service.cache_stop()); - /* Read the cache; expect wr_sz events */ + /* Read remaining events in cache, if any */ EXPECT_EQ(0, service.cache_read(evts_read)); - EXPECT_EQ(evts_read, evts_start); + polled_events.insert(polled_events.end(), evts_read.begin(), evts_read.end()); + + EXPECT_EQ(polled_events, evts_start); zmq_close(mock_pub); } diff --git a/src/sonic-eventd/tests/main.cpp b/src/sonic-eventd/tests/main.cpp index fdc175e645d8..f0d011305a50 100644 --- a/src/sonic-eventd/tests/main.cpp +++ b/src/sonic-eventd/tests/main.cpp @@ -1,6 +1,7 @@ #include "gtest/gtest.h" #include "dbconnector.h" #include +#include using namespace std; using namespace swss; @@ -20,57 +21,34 @@ class SwsscommonEnvironment : public ::testing::Environment { // Override this to define how to set up the environment void SetUp() override { // by default , init should be false - cout<<"Default : isInit = "<