Skip to content

Commit

Permalink
[#16015 Fix]: Cleaning up unused code from snappi_fixtures
Browse files Browse the repository at this point in the history
  • Loading branch information
selldinesh committed Dec 11, 2024
1 parent 4aa835e commit 4ffacc9
Showing 1 changed file with 0 additions and 120 deletions.
120 changes: 0 additions & 120 deletions tests/common/snappi_tests/snappi_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -664,47 +664,6 @@ def setup_dut_ports(
return config, port_config_list, snappi_ports


@pytest.fixture(scope="function")
def get_multidut_snappi_ports(duthosts, conn_graph_facts, fanout_graph_facts): # noqa: F811
"""
Populate tgen ports and connected DUT ports info of T0 testbed and returns as a list
Args:
duthost (pytest fixture): duthost fixture
conn_graph_facts (pytest fixture): connection graph
fanout_graph_facts (pytest fixture): fanout graph
Return:
return tuple of duts and tgen ports
"""
def _get_multidut_snappi_ports(line_card_choice, line_card_info):
host_names = line_card_info['hostname']
asic_info = line_card_info['asic']
asic_port_map = {
"asic0": ['Ethernet%d' % i for i in range(0, 72, 4)],
"asic1": ['Ethernet%d' % i for i in range(72, 144, 4)],
None: ['Ethernet%d' % i for i in range(0, 144, 4)],
}
ports = []
for index, host in enumerate(duthosts):
snappi_fanout_list = SnappiFanoutManager(fanout_graph_facts)
for i in range(len(snappi_fanout_list.fanout_list)):
try:
snappi_fanout_list.get_fanout_device_details(i)
except Exception:
pass
snappi_ports = snappi_fanout_list.get_ports(peer_device=host.hostname)
for port in snappi_ports:
port['location'] = get_snappi_port_location(port)
for hostname in host_names:
for asic in asic_info:
if port["peer_port"] in asic_port_map[asic] and hostname in port['peer_device']:
port['asic_value'] = asic
port['asic_type'] = host.facts["asic_type"]
port['duthost'] = host
ports.append(port)
return ports
return _get_multidut_snappi_ports


def get_tgen_peer_ports(snappi_ports, hostname):
ports = [(port['location'], port['peer_port']) for port in snappi_ports if port['peer_device'] == hostname]
return ports
Expand Down Expand Up @@ -861,85 +820,6 @@ def __intf_config_multidut(config, port_config_list, duthost, snappi_ports, setu
return True


def get_multidut_tgen_peer_port_set(line_card_choice, ports, config_set, number_of_tgen_peer_ports=2):
"""
Configures interfaces of the DUT
Args:
line_card_choice (obj): Line card type defined by the variable file
ports (list): list of Snappi port configuration information
config_set: Comprises of linecard configuration type and asic values
number_of_tgen_peer_ports: number of ports needed for the test
Returns:
The ports for the respective line card choice from the testbed file
"""
linecards = {}
try:
from itertools import product
from itertools import izip_longest as zip_longest
except ImportError:
from itertools import zip_longest

for port in ports:
if port['peer_device'] in linecards:
if port['asic_value'] not in linecards[port['peer_device']]:
linecards[port['peer_device']][port['asic_value']] = []
else:
linecards[port['peer_device']] = {}
linecards[port['peer_device']][port['asic_value']] = []
linecards[port['peer_device']][port['asic_value']].append(port)

if len(ports) < number_of_tgen_peer_ports or not linecards:
raise Exception("Not Enough ports ")
peer_ports = []
if line_card_choice in ['chassis_single_line_card_single_asic', 'non_chassis_single_line_card']:
# same asic ports required
for line_card, asics in linecards.items():
for asic, asic_info in asics.items():
if config_set[line_card_choice]['asic'][0] == asic:
if len(asic_info) >= number_of_tgen_peer_ports:
peer_ports = list(random.sample(asic_info, number_of_tgen_peer_ports))
return peer_ports
else:
raise Exception(
'Error: Not enough ports for line card "%s" and asic "%s"' % (line_card_choice, asic))
elif line_card_choice in ['chassis_single_line_card_multi_asic']:
# need 2 asic minimum one port from each asic
for line_card, asics in linecards.items():
if len(asics.keys()) >= 2:
peer_ports = list(zip_longest(*asics.values()))
peer_ports = [item for sublist in peer_ports for item in sublist]
peer_ports = list(filter(None, peer_ports))
return peer_ports[:number_of_tgen_peer_ports]
else:
raise Exception('Error: Invalid line_card_choice or Not enough ports')

elif line_card_choice in ['chassis_multi_line_card_single_asic', 'non_chassis_multi_line_card']:
# DIfferent line card and minimum one port from same same asic number
if len(linecards.keys()) >= 2:
common_asic_across_line_cards = set(linecards[next(iter(linecards))].keys())
for d in linecards.values():
common_asic_across_line_cards.intersection_update(set(d.keys()))
for asic in common_asic_across_line_cards:
peer_ports = [linecards[line_card][asic] for line_card in linecards.keys()]
peer_ports = list(zip(*peer_ports))
peer_ports = [item for sublist in peer_ports for item in sublist]
return peer_ports[:number_of_tgen_peer_ports]
else:
raise Exception('Error: Not enough line_card_choice')

elif line_card_choice in ['chassis_multi_line_card_multi_asic']:
# Different line card and minimum one port from different asic number
if len(linecards.keys()) >= 2:
host_asic = list(product(config_set[line_card_choice]['hostname'], config_set[line_card_choice]['asic']))
peer_ports = list(zip_longest(*[linecards[host][asic]
for host, asic in host_asic if asic in linecards[host]]))
peer_ports = [item for sublist in peer_ports for item in sublist]
peer_ports = list(filter(None, peer_ports))
return peer_ports[:number_of_tgen_peer_ports]
else:
raise Exception('Error: Not enough line_card_choice')


def create_ip_list(value, count, mask=32, incr=0):
'''
Create a list of ips based on the count provided
Expand Down

0 comments on commit 4ffacc9

Please sign in to comment.