Skip to content

Commit

Permalink
Fix test_events flakiness (sonic-net#14140)
Browse files Browse the repository at this point in the history
## Description of PR

Summary:
Fixes #28728116

## Approach
#### What is the motivation for this PR?

This PR will

1) Add test_events back to PR checker
2) Fix event_down container test flakiness
3) Remove unneeded test_events_cache test as our tests already use cache for testing

#### How did you do it?

Code change

#### How did you verify/test it?

Lab test and Pipeline
  • Loading branch information
zbud-msft authored Sep 4, 2024
1 parent 47f3167 commit 6e633a6
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 93 deletions.
25 changes: 0 additions & 25 deletions tests/telemetry/events/event_utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
import logging
import os
import pytest
import json
import re

import ptf.packet as scapy
import ptf.testutils as testutils
Expand Down Expand Up @@ -72,17 +69,6 @@ def check_monit_running(duthost):
return monit_services_status


def create_ip_file(duthost, data_dir, json_file, start_idx, end_idx):
    """Generate a file of test events (one JSON object per line, indices
    start_idx..end_idx inclusive), then push it to the DUT home directory
    and copy it into the eventd container."""
    local_path = os.path.join(data_dir, json_file)
    event_lines = [
        '{{"test-event-source:test": {{"test_key": "test_val_{}"}}}}'.format(idx)
        for idx in range(start_idx, end_idx + 1)
    ]
    with open(local_path, "w") as handle:
        handle.write("\n".join(event_lines) + "\n")
    remote_path = "~/" + json_file
    duthost.copy(src=local_path, dest=remote_path)
    duthost.shell("docker cp {} eventd:/".format(remote_path))


def event_publish_tool(duthost, json_file='', count=1):
cmd = "docker exec eventd python /usr/bin/events_publish_tool.py"
if json_file == '':
Expand All @@ -93,17 +79,6 @@ def event_publish_tool(duthost, json_file='', count=1):
assert ret["rc"] == 0, "Unable to publish events via events_publish_tool.py"


def verify_received_output(received_file, N):
    """Validate that *received_file* contains exactly N received events and
    that event i (1-based) carries the expected test_val_i payload."""
    key = "test_key"
    with open(received_file, 'r') as fh:
        events = json.load(fh)
    pytest_assert(len(events) == N, "Expected {} events, but found {}".format(N, len(events)))
    for idx, event in enumerate(events, start=1):
        payload = event["test-event-source:test"]
        # Same check as a findall-length test: the expected value must occur in the payload.
        pytest_assert(key in payload and re.search('test_val_{}'.format(idx), payload[key]) is not None,
                      "Missing key or incorrect value")


def restart_eventd(duthost):
if duthost.is_multi_asic:
pytest.skip("Skip eventd testing on multi-asic")
Expand Down
16 changes: 11 additions & 5 deletions tests/telemetry/events/host_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def test_event(duthost, gnxi_path, ptfhost, ptfadapter, data_dir, validate_yang)
"mem_threshold_exceeded.json", "sonic-events-host:mem-threshold-exceeded", tag)
run_test(duthost, gnxi_path, ptfhost, data_dir, validate_yang, restart_container,
"event_stopped_ctr.json", "sonic-events-host:event-stopped-ctr", tag, False)
run_test(duthost, gnxi_path, ptfhost, data_dir, validate_yang, mask_container,
run_test(duthost, gnxi_path, ptfhost, data_dir, validate_yang, stop_container,
"event_down_ctr.json", "sonic-events-host:event-down-ctr", tag, False)
finally:
restore_monit_config(duthost)
Expand Down Expand Up @@ -105,17 +105,23 @@ def restart_container(duthost):
assert is_container_running, "{} not running after restart".format(container)


def mask_container(duthost):
logger.info("Masking container for event down event")
def stop_container(duthost):
logger.info("Stop container for event down event")
container = get_running_container(duthost)
assert container != "", "No available container for testing"

duthost.shell("systemctl mask {}".format(container))
duthost.shell("config feature autorestart {} disabled".format(container))
duthost.shell("docker stop {}".format(container))
output = duthost.shell("sonic-db-cli STATE_DB hget \"FEATURE|{}\" \"container_id\"".format(container))['stdout']
if output:
duthost.shell("sonic-db-cli STATE_DB hset \"FEATURE|{}\" \"container_id\" \"\"".format(container))

time.sleep(30) # Wait 30 seconds for container_checker to fire event

duthost.shell("systemctl unmask {}".format(container))
if output:
duthost.shell("sonic-db-cli STATE_DB hset \"FEATURE|{}\" \"container_id\" {}".format(container, output))

duthost.shell("config feature autorestart {} enabled".format(container))
duthost.shell("systemctl restart {}".format(container))


Expand Down
4 changes: 2 additions & 2 deletions tests/telemetry/events/run_events_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@


def run_test(duthost, gnxi_path, ptfhost, data_dir, validate_yang, trigger, json_file,
filter_event_regex, tag, heartbeat=False, thread_timeout=30, ptfadapter=None):
filter_event_regex, tag, heartbeat=False, timeout=30, ptfadapter=None):
op_file = os.path.join(data_dir, json_file)
if trigger is not None: # no trigger for heartbeat
if ptfadapter is None:
trigger(duthost) # add events to cache
else:
trigger(duthost, ptfadapter)
listen_for_events(duthost, gnxi_path, ptfhost, filter_event_regex, op_file,
thread_timeout) # listen from cache
timeout) # listen from cache
data = {}
with open(op_file, "r") as f:
data = json.load(f)
Expand Down
41 changes: 24 additions & 17 deletions tests/telemetry/telemetry_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

from pkg_resources import parse_version
from tests.common.helpers.assertions import pytest_assert
from tests.common.utilities import InterruptableThread
from tests.common.helpers.gnmi_utils import GNMIEnvironment

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -97,25 +96,17 @@ def fetch_json_ptf_output(regex, output, match_no):
return match[:match_no+1]


def listen_for_event(ptfhost, cmd, results):
    """Run *cmd* on the PTF host and deposit its stdout into results[0].

    Intended as a thread target, hence the mutable *results* out-parameter.
    """
    response = ptfhost.shell(cmd)
    assert response["rc"] == 0, "PTF docker was not able to query EVENTS path"
    results[0] = response["stdout"]


def listen_for_events(duthost, gnxi_path, ptfhost, filter_event_regex, op_file, thread_timeout, update_count=1,
def listen_for_events(duthost, gnxi_path, ptfhost, filter_event_regex, op_file, timeout, update_count=1,
match_number=0):
cmd = generate_client_cli(duthost=duthost, gnxi_path=gnxi_path, method=METHOD_SUBSCRIBE,
submode=SUBMODE_ONCHANGE, update_count=update_count, xpath="all[heartbeat=2]",
target="EVENTS", filter_event_regex=filter_event_regex)
results = [""]
event_thread = InterruptableThread(target=listen_for_event, args=(ptfhost, cmd, results,))
event_thread.start()
event_thread.join(thread_timeout) # close thread after 30 sec, was not able to find event within reasonable time
assert results[0] != "", "No output from PTF docker, thread timed out after {} seconds".format(thread_timeout)
target="EVENTS", filter_event_regex=filter_event_regex, timeout=timeout)
result = ptfhost.shell(cmd)
assert result["rc"] == 0, "PTF command failed with non zero return code"
output = result["stdout"]
assert len(output) != 0, "No output from PTF docker, thread timed out after {} seconds".format(timeout)
# regex logic and then to write to file
result = results[0]
event_strs = fetch_json_ptf_output(EVENT_REGEX, result, match_number)
event_strs = fetch_json_ptf_output(EVENT_REGEX, output, match_number)
with open(op_file, "w") as f:
f.write("[\n")
for i in range(0, len(event_strs)):
Expand All @@ -139,8 +130,22 @@ def trigger_logger(duthost, log, process, container="", priority="local0.notice"

def generate_client_cli(duthost, gnxi_path, method=METHOD_GET, xpath="COUNTERS/Ethernet0", target="COUNTERS_DB",
subscribe_mode=SUBSCRIBE_MODE_STREAM, submode=SUBMODE_SAMPLE,
intervalms=0, update_count=3, create_connections=1, filter_event_regex=""):
intervalms=0, update_count=3, create_connections=1, filter_event_regex="",
timeout=-1):
""" Generate the py_gnmicli command line based on the given params.
t --target: gNMI target; required
p --port: port of target; required
m --mode: get/subscribe; default get
x --xpath: gnmi path, table name; required
xt --xpath_target: gnmi path prefix, db name
o --host_override, targets hostname for certificate CN
subscribe_mode: 0=STREAM, 1=ONCE, 2=POLL; default 0
submode: 0=TARGET_DEFINED, 1=ON_CHANGE, 2=SAMPLE; default 2
interval: sample interval in milliseconds, default 10000ms
update_count: Max number of streaming updates to receive. 0 means no limit. default 0
create_connections: Creates TCP connections with gNMI server; default 1; -1 for infinite connections
filter_event_regex: Regex to filter event when querying events path
timeout: Subscription duration in seconds; After X seconds, request terminates; default none
"""
env = GNMIEnvironment(duthost, GNMIEnvironment.TELEMETRY_MODE)
cmdFormat = 'python ' + gnxi_path + 'gnmi_cli_py/py_gnmicli.py -g -t {0} -p {1} -m {2} -x {3} -xt {4} -o {5}'
Expand All @@ -153,6 +158,8 @@ def generate_client_cli(duthost, gnxi_path, method=METHOD_GET, xpath="COUNTERS/E
update_count, create_connections)
if filter_event_regex != "":
cmd += " --filter_event_regex {}".format(filter_event_regex)
if timeout > 0:
cmd += " --timeout {}".format(timeout)
return cmd


Expand Down
50 changes: 6 additions & 44 deletions tests/telemetry/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,8 @@
import os
import sys

from tests.common.utilities import InterruptableThread
from telemetry_utils import listen_for_events
from telemetry_utils import skip_201911_and_older
from events.event_utils import create_ip_file
from events.event_utils import event_publish_tool, verify_received_output
from events.event_utils import event_publish_tool
from events.event_utils import reset_event_counters, read_event_counters
from events.event_utils import verify_counter_increase, restart_eventd

Expand Down Expand Up @@ -50,49 +47,14 @@ def test_events(duthosts, enum_rand_one_per_hwsku_hostname, ptfhost, ptfadapter,
for file in os.listdir(EVENTS_TESTS_PATH):
if file.endswith("_events.py") and not file.endswith("eventd_events.py"):
module = __import__(file[:len(file)-3])
module.test_event(duthost, gnxi_path, ptfhost, ptfadapter, DATA_DIR, validate_yang)
try:
module.test_event(duthost, gnxi_path, ptfhost, ptfadapter, DATA_DIR, validate_yang)
except pytest.skip.Exception as e:
logger.info("Skipping test file: {} due to {}".format(file, e))
continue
logger.info("Completed test file: {}".format(os.path.join(EVENTS_TESTS_PATH, file)))


@pytest.mark.parametrize('setup_streaming_telemetry', [False], indirect=True)
@pytest.mark.disable_loganalyzer
def test_events_cache(duthosts, enum_rand_one_per_hwsku_hostname, ptfhost, setup_streaming_telemetry, gnxi_path):
    """Publish a first batch of events before any subscriber exists, then the
    remainder while a listener is attached, and verify all events arrive —
    proving eventd's cache replays the pre-subscription batch."""
    duthost = duthosts[enum_rand_one_per_hwsku_hostname]
    logger.info("Start events cache testing")

    skip_201911_and_older(duthost)
    reset_event_counters(duthost)
    restart_eventd(duthost)
    # Baseline of the published-events counter, checked for an increase at the end.
    published_baseline = read_event_counters(duthost)[1]

    batch_size = 20     # events published before the listener starts (must be cached)
    total_events = 30   # total events the subscriber is expected to receive

    received_op_file = os.path.join(DATA_DIR, "received_op_file")

    # Two input files: events 1..batch_size and batch_size+1..total_events.
    create_ip_file(duthost, DATA_DIR, "first_part_ip_file", 1, batch_size)
    create_ip_file(duthost, DATA_DIR, "second_part_ip_file", batch_size + 1, total_events)

    # Publish the first batch with no subscriber attached; eventd should cache it.
    event_publish_tool(duthost, "first_part_ip_file")

    listener = InterruptableThread(target=listen_for_events,
                                   args=(duthost, gnxi_path, ptfhost, "test-event-source:test",
                                         received_op_file, 30, total_events, total_events - 1))
    listener.start()

    # Publish the remaining events while the subscription is live.
    event_publish_tool(duthost, "second_part_ip_file")

    listener.join(30)

    # Cached + live events together must equal total_events, in order.
    verify_received_output(received_op_file, total_events)

    verify_counter_increase(duthost, published_baseline, total_events, PUBLISHED)


@pytest.mark.parametrize('setup_streaming_telemetry', [False], indirect=True)
@pytest.mark.disable_loganalyzer
def test_events_cache_overflow(duthosts, enum_rand_one_per_hwsku_hostname, ptfhost, setup_streaming_telemetry,
Expand Down

0 comments on commit 6e633a6

Please sign in to comment.