Skip to content

Commit

Permalink
[CMIS] Add lane_mask parameter to set_loopback_mode() to enable setti…
Browse files Browse the repository at this point in the history
…ng loopback mode for individual lanes rather than the entire physical port (#499)

Description
This commit introduces two additional input parameters, lane_mask and enable, to the set_loopback_mode() function. By specifying lane_mask, users can enable or disable the loopback mode on individual lanes rather than applying it to the entire physical port

Motivation and Context
Previously, the set_loopback_mode() function affected all lanes of a physical port simultaneously, limiting flexibility. By adding the lane_mask parameter, this update allows callers to target specific lanes, thereby enabling more precise testing, debugging, and configuration of network equipment.

How Has This Been Tested?
Tested on Cisco 8111 with Credo C1 cable.

Signed-off-by: xinyu <[email protected]>
  • Loading branch information
xinyulin authored Sep 12, 2024
1 parent 4db745d commit 8fb25f1
Show file tree
Hide file tree
Showing 2 changed files with 358 additions and 62 deletions.
225 changes: 192 additions & 33 deletions sonic_platform_base/sonic_xcvr/api/public/cmis.py
Original file line number Diff line number Diff line change
Expand Up @@ -1115,50 +1115,209 @@ def get_loopback_capability(self):
loopback_capability['media_side_output_loopback_supported'] = bool((allowed_loopback_result >> 0) & 0x1)
return loopback_capability

def set_loopback_mode(self, loopback_mode):
def set_host_input_loopback(self, lane_mask, enable):
'''
This function sets the module loopback mode.
Loopback mode has to be one of the five:
1. "none" (default)
2. "host-side-input"
3. "host-side-output"
4. "media-side-input"
5. "media-side-output"
The function will look at 13h:128 to check advertized loopback capabilities.
Return True if the provision succeeds, False if it fails
Sets the host-side input loopback mode for specified lanes.
Args:
lane_mask (int): A bitmask indicating which lanes to enable/disable loopback.
- 0xFF: Enable loopback on all lanes.
- Individual bits represent corresponding lanes.
enable (bool): True to enable loopback, False to disable.
Returns:
bool: True if the operation succeeds, False otherwise.
'''
loopback_capability = self.get_loopback_capability()
if loopback_capability is None:
logger.info('Failed to get loopback capabilities')
return False
if loopback_mode == 'none':
status_host_input = self.xcvr_eeprom.write(consts.HOST_INPUT_LOOPBACK, 0)
status_host_output = self.xcvr_eeprom.write(consts.HOST_OUTPUT_LOOPBACK, 0)
status_media_input = self.xcvr_eeprom.write(consts.MEDIA_INPUT_LOOPBACK, 0)
status_media_output = self.xcvr_eeprom.write(consts.MEDIA_OUTPUT_LOOPBACK, 0)
return all([status_host_input, status_host_output, status_media_input, status_media_output])
elif loopback_mode == 'host-side-input':
if loopback_capability['host_side_input_loopback_supported']:
return self.xcvr_eeprom.write(consts.HOST_INPUT_LOOPBACK, 0xff)
else:
return False
elif loopback_mode == 'host-side-output':
if loopback_capability['host_side_output_loopback_supported']:
return self.xcvr_eeprom.write(consts.HOST_OUTPUT_LOOPBACK, 0xff)
else:

if loopback_capability['host_side_input_loopback_supported'] is False:
logger.error('Host input loopback is not supported')
return False

if loopback_capability['per_lane_host_loopback_supported'] is False and lane_mask != 0xff:
logger.error('Per-lane host input loopback is not supported, lane_mask:%#x', lane_mask)
return False

if loopback_capability['simultaneous_host_media_loopback_supported'] is False:
media_input_val = self.xcvr_eeprom.read(consts.MEDIA_INPUT_LOOPBACK)
media_output_val = self.xcvr_eeprom.read(consts.MEDIA_OUTPUT_LOOPBACK)
if media_input_val or media_output_val:
txt = 'Simultaneous host media loopback is not supported\n'
txt += f'media_input_val:{media_input_val:#x}, media_output_val:{media_output_val:#x}'
logger.error(txt)
return False
elif loopback_mode == 'media-side-input':
if loopback_capability['media_side_input_loopback_supported']:
return self.xcvr_eeprom.write(consts.MEDIA_INPUT_LOOPBACK, 0xff)
else:

host_input_val = self.xcvr_eeprom.read(consts.HOST_INPUT_LOOPBACK)
if enable:
return self.xcvr_eeprom.write(consts.HOST_INPUT_LOOPBACK, host_input_val | lane_mask)
else:
return self.xcvr_eeprom.write(consts.HOST_INPUT_LOOPBACK, host_input_val & ~lane_mask)

def set_host_output_loopback(self, lane_mask, enable):
'''
Sets the host-side output loopback mode for specified lanes.
Args:
lane_mask (int): A bitmask indicating which lanes to enable/disable loopback.
- 0xFF: Enable loopback on all lanes.
- Individual bits represent corresponding lanes.
enable (bool): True to enable loopback, False to disable.
Returns:
bool: True if the operation succeeds, False otherwise.
'''
loopback_capability = self.get_loopback_capability()
if loopback_capability is None:
logger.info('Failed to get loopback capabilities')
return False

if loopback_capability['host_side_output_loopback_supported'] is False:
logger.error('Host output loopback is not supported')
return False

if loopback_capability['per_lane_host_loopback_supported'] is False and lane_mask != 0xff:
logger.error('Per-lane host output loopback is not supported, lane_mask:%#x', lane_mask)
return False

if loopback_capability['simultaneous_host_media_loopback_supported'] is False:
media_input_val = self.xcvr_eeprom.read(consts.MEDIA_INPUT_LOOPBACK)
media_output_val = self.xcvr_eeprom.read(consts.MEDIA_OUTPUT_LOOPBACK)
if media_input_val or media_output_val:
txt = 'Simultaneous host media loopback is not supported\n'
txt += f'media_input_val:{media_input_val:x}, media_output_val:{media_output_val:#x}'
logger.error(txt)
return False
elif loopback_mode == 'media-side-output':
if loopback_capability['media_side_output_loopback_supported']:
return self.xcvr_eeprom.write(consts.MEDIA_OUTPUT_LOOPBACK, 0xff)
else:

host_output_val = self.xcvr_eeprom.read(consts.HOST_OUTPUT_LOOPBACK)
if enable:
return self.xcvr_eeprom.write(consts.HOST_OUTPUT_LOOPBACK, host_output_val | lane_mask)
else:
return self.xcvr_eeprom.write(consts.HOST_OUTPUT_LOOPBACK, host_output_val & ~lane_mask)

def set_media_input_loopback(self, lane_mask, enable):
'''
Sets the media-side input loopback mode for specified lanes.
Args:
lane_mask (int): A bitmask indicating which lanes to enable/disable loopback.
- 0xFF: Enable loopback on all lanes.
- Individual bits represent corresponding lanes.
enable (bool): True to enable loopback, False to disable.
Returns:
bool: True if the operation succeeds, False otherwise.
'''
loopback_capability = self.get_loopback_capability()
if loopback_capability is None:
logger.info('Failed to get loopback capabilities')
return False

if loopback_capability['media_side_input_loopback_supported'] is False:
logger.error('Media input loopback is not supported')
return False

if loopback_capability['per_lane_media_loopback_supported'] is False and lane_mask != 0xff:
logger.error('Per-lane media input loopback is not supported, lane_mask:%#x', lane_mask)
return False

if loopback_capability['simultaneous_host_media_loopback_supported'] is False:
host_input_val = self.xcvr_eeprom.read(consts.HOST_INPUT_LOOPBACK)
host_output_val = self.xcvr_eeprom.read(consts.HOST_OUTPUT_LOOPBACK)
if host_input_val or host_output_val:
txt = 'Simultaneous host media loopback is not supported\n'
txt += f'host_input_val:{host_input_val:#x}, host_output_val:{host_output_val:#x}'
logger.error(txt)
return False

media_input_val = self.xcvr_eeprom.read(consts.MEDIA_INPUT_LOOPBACK)
if enable:
return self.xcvr_eeprom.write(consts.MEDIA_INPUT_LOOPBACK, media_input_val | lane_mask)
else:
return self.xcvr_eeprom.write(consts.MEDIA_INPUT_LOOPBACK, media_input_val & ~lane_mask)

def set_media_output_loopback(self, lane_mask, enable):
'''
Sets the media-side output loopback mode for specified lanes.
Args:
lane_mask (int): A bitmask indicating which lanes to enable/disable loopback.
- 0xFF: Enable loopback on all lanes.
- Individual bits represent corresponding lanes.
enable (bool): True to enable loopback, False to disable.
Returns:
bool: True if the operation succeeds, False otherwise.
'''
loopback_capability = self.get_loopback_capability()
if loopback_capability is None:
logger.info('Failed to get loopback capabilities')
return False

if loopback_capability['media_side_output_loopback_supported'] is False:
logger.error('Media output loopback is not supported')
return False

if loopback_capability['per_lane_media_loopback_supported'] is False and lane_mask != 0xff:
logger.error('Per-lane media output loopback is not supported, lane_mask:%#x', lane_mask)
return False

if loopback_capability['simultaneous_host_media_loopback_supported'] is False:
host_input_val = self.xcvr_eeprom.read(consts.HOST_INPUT_LOOPBACK)
host_output_val = self.xcvr_eeprom.read(consts.HOST_OUTPUT_LOOPBACK)
if host_input_val or host_output_val:
txt = 'Simultaneous host media loopback is not supported\n'
txt += f'host_input_val:{host_input_val:#x}, host_output_val:{host_output_val:#x}'
logger.error(txt)
return False

media_output_val = self.xcvr_eeprom.read(consts.MEDIA_OUTPUT_LOOPBACK)
if enable:
return self.xcvr_eeprom.write(consts.MEDIA_OUTPUT_LOOPBACK, media_output_val | lane_mask)
else:
return self.xcvr_eeprom.write(consts.MEDIA_OUTPUT_LOOPBACK, media_output_val & ~lane_mask)

def set_loopback_mode(self, loopback_mode, lane_mask = 0xff, enable = False):
'''
This function sets the module loopback mode.
Args:
- loopback_mode (str): Specifies the loopback mode. It must be one of the following:
1. "none"
2. "host-side-input"
3. "host-side-output"
4. "media-side-input"
5. "media-side-output"
- lane_mask (int): A bitmask representing the lanes to which the loopback mode should
be applied. Default 0xFF applies to all lanes.
- enable (bool): Whether to enable or disable the loopback mode. Default False.
Returns:
- bool: True if the operation succeeds, False otherwise.
'''
loopback_functions = {
'host-side-input': self.set_host_input_loopback,
'host-side-output': self.set_host_output_loopback,
'media-side-input': self.set_media_input_loopback,
'media-side-output': self.set_media_output_loopback,
}

if loopback_mode == 'none':
return all([
self.set_host_input_loopback(0xff, False),
self.set_host_output_loopback(0xff, False),
self.set_media_input_loopback(0xff, False),
self.set_media_output_loopback(0xff, False)
])

func = loopback_functions.get(loopback_mode)
if func:
return func(lane_mask, enable)

logger.error('Invalid loopback mode:%s, lane_mask:%#x', loopback_mode, lane_mask)
return False

def get_vdm(self):
'''
This function returns all the VDM items, including real time monitor value, threholds and flags
Expand Down
Loading

0 comments on commit 8fb25f1

Please sign in to comment.