diff --git a/tests/common/snappi_tests/snappi_fixtures.py b/tests/common/snappi_tests/snappi_fixtures.py index 816266fcd3f..5d99964538b 100755 --- a/tests/common/snappi_tests/snappi_fixtures.py +++ b/tests/common/snappi_tests/snappi_fixtures.py @@ -664,47 +664,6 @@ def setup_dut_ports( return config, port_config_list, snappi_ports -@pytest.fixture(scope="function") -def get_multidut_snappi_ports(duthosts, conn_graph_facts, fanout_graph_facts): # noqa: F811 - """ - Populate tgen ports and connected DUT ports info of T0 testbed and returns as a list - Args: - duthost (pytest fixture): duthost fixture - conn_graph_facts (pytest fixture): connection graph - fanout_graph_facts (pytest fixture): fanout graph - Return: - return tuple of duts and tgen ports - """ - def _get_multidut_snappi_ports(line_card_choice, line_card_info): - host_names = line_card_info['hostname'] - asic_info = line_card_info['asic'] - asic_port_map = { - "asic0": ['Ethernet%d' % i for i in range(0, 72, 4)], - "asic1": ['Ethernet%d' % i for i in range(72, 144, 4)], - None: ['Ethernet%d' % i for i in range(0, 144, 4)], - } - ports = [] - for index, host in enumerate(duthosts): - snappi_fanout_list = SnappiFanoutManager(fanout_graph_facts) - for i in range(len(snappi_fanout_list.fanout_list)): - try: - snappi_fanout_list.get_fanout_device_details(i) - except Exception: - pass - snappi_ports = snappi_fanout_list.get_ports(peer_device=host.hostname) - for port in snappi_ports: - port['location'] = get_snappi_port_location(port) - for hostname in host_names: - for asic in asic_info: - if port["peer_port"] in asic_port_map[asic] and hostname in port['peer_device']: - port['asic_value'] = asic - port['asic_type'] = host.facts["asic_type"] - port['duthost'] = host - ports.append(port) - return ports - return _get_multidut_snappi_ports - - def get_tgen_peer_ports(snappi_ports, hostname): ports = [(port['location'], port['peer_port']) for port in snappi_ports if port['peer_device'] == hostname] return ports @@ -861,85 +820,6 @@ def __intf_config_multidut(config, port_config_list, duthost, snappi_ports, setu return True -def get_multidut_tgen_peer_port_set(line_card_choice, ports, config_set, number_of_tgen_peer_ports=2): - """ - Configures interfaces of the DUT - Args: - line_card_choice (obj): Line card type defined by the variable file - ports (list): list of Snappi port configuration information - config_set: Comprises of linecard configuration type and asic values - number_of_tgen_peer_ports: number of ports needed for the test - Returns: - The ports for the respective line card choice from the testbed file - """ - linecards = {} - try: - from itertools import product - from itertools import izip_longest as zip_longest - except ImportError: - from itertools import zip_longest - - for port in ports: - if port['peer_device'] in linecards: - if port['asic_value'] not in linecards[port['peer_device']]: - linecards[port['peer_device']][port['asic_value']] = [] - else: - linecards[port['peer_device']] = {} - linecards[port['peer_device']][port['asic_value']] = [] - linecards[port['peer_device']][port['asic_value']].append(port) - - if len(ports) < number_of_tgen_peer_ports or not linecards: - raise Exception("Not Enough ports ") - peer_ports = [] - if line_card_choice in ['chassis_single_line_card_single_asic', 'non_chassis_single_line_card']: - # same asic ports required - for line_card, asics in linecards.items(): - for asic, asic_info in asics.items(): - if config_set[line_card_choice]['asic'][0] == asic: - if len(asic_info) >= number_of_tgen_peer_ports: - peer_ports = list(random.sample(asic_info, number_of_tgen_peer_ports)) - return peer_ports - else: - raise Exception( - 'Error: Not enough ports for line card "%s" and asic "%s"' % (line_card_choice, asic)) - elif line_card_choice in ['chassis_single_line_card_multi_asic']: - # need 2 asic minimum one port from each asic - for line_card, asics in linecards.items(): - if len(asics.keys()) >= 2: - peer_ports = list(zip_longest(*asics.values())) - peer_ports = [item for sublist in peer_ports for item in sublist] - peer_ports = list(filter(None, peer_ports)) - return peer_ports[:number_of_tgen_peer_ports] - else: - raise Exception('Error: Invalid line_card_choice or Not enough ports') - - elif line_card_choice in ['chassis_multi_line_card_single_asic', 'non_chassis_multi_line_card']: - # DIfferent line card and minimum one port from same same asic number - if len(linecards.keys()) >= 2: - common_asic_across_line_cards = set(linecards[next(iter(linecards))].keys()) - for d in linecards.values(): - common_asic_across_line_cards.intersection_update(set(d.keys())) - for asic in common_asic_across_line_cards: - peer_ports = [linecards[line_card][asic] for line_card in linecards.keys()] - peer_ports = list(zip(*peer_ports)) - peer_ports = [item for sublist in peer_ports for item in sublist] - return peer_ports[:number_of_tgen_peer_ports] - else: - raise Exception('Error: Not enough line_card_choice') - - elif line_card_choice in ['chassis_multi_line_card_multi_asic']: - # Different line card and minimum one port from different asic number - if len(linecards.keys()) >= 2: - host_asic = list(product(config_set[line_card_choice]['hostname'], config_set[line_card_choice]['asic'])) - peer_ports = list(zip_longest(*[linecards[host][asic] - for host, asic in host_asic if asic in linecards[host]])) - peer_ports = [item for sublist in peer_ports for item in sublist] - peer_ports = list(filter(None, peer_ports)) - return peer_ports[:number_of_tgen_peer_ports] - else: - raise Exception('Error: Not enough line_card_choice') - - def create_ip_list(value, count, mask=32, incr=0): ''' Create a list of ips based on the count provided