Skip to content

Commit

Permalink
[CMIS] Add lane_mask parameter to set_loopback_mode() to enable setti… (
Browse files Browse the repository at this point in the history
#490)

* [CMIS] Add lane_mask parameter to set_loopback_mode() to enable setting loopback mode for individual lanes rather than the entire physical port

Signed-off-by: xinyu <[email protected]>

* [CMIS] Add options to individually clear host/media, input/output loopback modes and log errors for all rejection cases

Signed-off-by: xinyu <[email protected]>

* [CMIS] Reject the loopback mode request if simultaneous host and media loopback is not supported or per-lane loopback is not supported

Signed-off-by: xinyu <[email protected]>

* [CMIS] Log errors for all set loopback mode rejection cases

Signed-off-by: xinyu <[email protected]>

* [CMIS] Improve readability by using individual functions for setting host, media, input, and output loopback modes

Signed-off-by: xinyu <[email protected]>

* [CMIS] Add an enable parameter to eliminate the need for loopback modes with the -none suffix

Signed-off-by: xinyu <[email protected]>

---------

Signed-off-by: xinyu <[email protected]>
  • Loading branch information
xinyulin authored and mssonicbld committed Oct 17, 2024
1 parent 03a0118 commit 049952d
Show file tree
Hide file tree
Showing 2 changed files with 358 additions and 62 deletions.
225 changes: 192 additions & 33 deletions sonic_platform_base/sonic_xcvr/api/public/cmis.py
Original file line number Diff line number Diff line change
Expand Up @@ -1115,50 +1115,209 @@ def get_loopback_capability(self):
loopback_capability['media_side_output_loopback_supported'] = bool((allowed_loopback_result >> 0) & 0x1)
return loopback_capability

def set_loopback_mode(self, loopback_mode):
def set_host_input_loopback(self, lane_mask, enable):
'''
This function sets the module loopback mode.
Loopback mode has to be one of the five:
1. "none" (default)
2. "host-side-input"
3. "host-side-output"
4. "media-side-input"
5. "media-side-output"
The function will look at 13h:128 to check advertized loopback capabilities.
Return True if the provision succeeds, False if it fails
Sets the host-side input loopback mode for specified lanes.
Args:
lane_mask (int): A bitmask indicating which lanes to enable/disable loopback.
- 0xFF: Enable loopback on all lanes.
- Individual bits represent corresponding lanes.
enable (bool): True to enable loopback, False to disable.
Returns:
bool: True if the operation succeeds, False otherwise.
'''
loopback_capability = self.get_loopback_capability()
if loopback_capability is None:
logger.info('Failed to get loopback capabilities')
return False
if loopback_mode == 'none':
status_host_input = self.xcvr_eeprom.write(consts.HOST_INPUT_LOOPBACK, 0)
status_host_output = self.xcvr_eeprom.write(consts.HOST_OUTPUT_LOOPBACK, 0)
status_media_input = self.xcvr_eeprom.write(consts.MEDIA_INPUT_LOOPBACK, 0)
status_media_output = self.xcvr_eeprom.write(consts.MEDIA_OUTPUT_LOOPBACK, 0)
return all([status_host_input, status_host_output, status_media_input, status_media_output])
elif loopback_mode == 'host-side-input':
if loopback_capability['host_side_input_loopback_supported']:
return self.xcvr_eeprom.write(consts.HOST_INPUT_LOOPBACK, 0xff)
else:
return False
elif loopback_mode == 'host-side-output':
if loopback_capability['host_side_output_loopback_supported']:
return self.xcvr_eeprom.write(consts.HOST_OUTPUT_LOOPBACK, 0xff)
else:

if loopback_capability['host_side_input_loopback_supported'] is False:
logger.error('Host input loopback is not supported')
return False

if loopback_capability['per_lane_host_loopback_supported'] is False and lane_mask != 0xff:
logger.error('Per-lane host input loopback is not supported, lane_mask:%#x', lane_mask)
return False

if loopback_capability['simultaneous_host_media_loopback_supported'] is False:
media_input_val = self.xcvr_eeprom.read(consts.MEDIA_INPUT_LOOPBACK)
media_output_val = self.xcvr_eeprom.read(consts.MEDIA_OUTPUT_LOOPBACK)
if media_input_val or media_output_val:
txt = 'Simultaneous host media loopback is not supported\n'
txt += f'media_input_val:{media_input_val:#x}, media_output_val:{media_output_val:#x}'
logger.error(txt)
return False
elif loopback_mode == 'media-side-input':
if loopback_capability['media_side_input_loopback_supported']:
return self.xcvr_eeprom.write(consts.MEDIA_INPUT_LOOPBACK, 0xff)
else:

host_input_val = self.xcvr_eeprom.read(consts.HOST_INPUT_LOOPBACK)
if enable:
return self.xcvr_eeprom.write(consts.HOST_INPUT_LOOPBACK, host_input_val | lane_mask)
else:
return self.xcvr_eeprom.write(consts.HOST_INPUT_LOOPBACK, host_input_val & ~lane_mask)

def set_host_output_loopback(self, lane_mask, enable):
'''
Sets the host-side output loopback mode for specified lanes.
Args:
lane_mask (int): A bitmask indicating which lanes to enable/disable loopback.
- 0xFF: Enable loopback on all lanes.
- Individual bits represent corresponding lanes.
enable (bool): True to enable loopback, False to disable.
Returns:
bool: True if the operation succeeds, False otherwise.
'''
loopback_capability = self.get_loopback_capability()
if loopback_capability is None:
logger.info('Failed to get loopback capabilities')
return False

if loopback_capability['host_side_output_loopback_supported'] is False:
logger.error('Host output loopback is not supported')
return False

if loopback_capability['per_lane_host_loopback_supported'] is False and lane_mask != 0xff:
logger.error('Per-lane host output loopback is not supported, lane_mask:%#x', lane_mask)
return False

if loopback_capability['simultaneous_host_media_loopback_supported'] is False:
media_input_val = self.xcvr_eeprom.read(consts.MEDIA_INPUT_LOOPBACK)
media_output_val = self.xcvr_eeprom.read(consts.MEDIA_OUTPUT_LOOPBACK)
if media_input_val or media_output_val:
txt = 'Simultaneous host media loopback is not supported\n'
txt += f'media_input_val:{media_input_val:x}, media_output_val:{media_output_val:#x}'
logger.error(txt)
return False
elif loopback_mode == 'media-side-output':
if loopback_capability['media_side_output_loopback_supported']:
return self.xcvr_eeprom.write(consts.MEDIA_OUTPUT_LOOPBACK, 0xff)
else:

host_output_val = self.xcvr_eeprom.read(consts.HOST_OUTPUT_LOOPBACK)
if enable:
return self.xcvr_eeprom.write(consts.HOST_OUTPUT_LOOPBACK, host_output_val | lane_mask)
else:
return self.xcvr_eeprom.write(consts.HOST_OUTPUT_LOOPBACK, host_output_val & ~lane_mask)

def set_media_input_loopback(self, lane_mask, enable):
'''
Sets the media-side input loopback mode for specified lanes.
Args:
lane_mask (int): A bitmask indicating which lanes to enable/disable loopback.
- 0xFF: Enable loopback on all lanes.
- Individual bits represent corresponding lanes.
enable (bool): True to enable loopback, False to disable.
Returns:
bool: True if the operation succeeds, False otherwise.
'''
loopback_capability = self.get_loopback_capability()
if loopback_capability is None:
logger.info('Failed to get loopback capabilities')
return False

if loopback_capability['media_side_input_loopback_supported'] is False:
logger.error('Media input loopback is not supported')
return False

if loopback_capability['per_lane_media_loopback_supported'] is False and lane_mask != 0xff:
logger.error('Per-lane media input loopback is not supported, lane_mask:%#x', lane_mask)
return False

if loopback_capability['simultaneous_host_media_loopback_supported'] is False:
host_input_val = self.xcvr_eeprom.read(consts.HOST_INPUT_LOOPBACK)
host_output_val = self.xcvr_eeprom.read(consts.HOST_OUTPUT_LOOPBACK)
if host_input_val or host_output_val:
txt = 'Simultaneous host media loopback is not supported\n'
txt += f'host_input_val:{host_input_val:#x}, host_output_val:{host_output_val:#x}'
logger.error(txt)
return False

media_input_val = self.xcvr_eeprom.read(consts.MEDIA_INPUT_LOOPBACK)
if enable:
return self.xcvr_eeprom.write(consts.MEDIA_INPUT_LOOPBACK, media_input_val | lane_mask)
else:
return self.xcvr_eeprom.write(consts.MEDIA_INPUT_LOOPBACK, media_input_val & ~lane_mask)

def set_media_output_loopback(self, lane_mask, enable):
'''
Sets the media-side output loopback mode for specified lanes.
Args:
lane_mask (int): A bitmask indicating which lanes to enable/disable loopback.
- 0xFF: Enable loopback on all lanes.
- Individual bits represent corresponding lanes.
enable (bool): True to enable loopback, False to disable.
Returns:
bool: True if the operation succeeds, False otherwise.
'''
loopback_capability = self.get_loopback_capability()
if loopback_capability is None:
logger.info('Failed to get loopback capabilities')
return False

if loopback_capability['media_side_output_loopback_supported'] is False:
logger.error('Media output loopback is not supported')
return False

if loopback_capability['per_lane_media_loopback_supported'] is False and lane_mask != 0xff:
logger.error('Per-lane media output loopback is not supported, lane_mask:%#x', lane_mask)
return False

if loopback_capability['simultaneous_host_media_loopback_supported'] is False:
host_input_val = self.xcvr_eeprom.read(consts.HOST_INPUT_LOOPBACK)
host_output_val = self.xcvr_eeprom.read(consts.HOST_OUTPUT_LOOPBACK)
if host_input_val or host_output_val:
txt = 'Simultaneous host media loopback is not supported\n'
txt += f'host_input_val:{host_input_val:#x}, host_output_val:{host_output_val:#x}'
logger.error(txt)
return False

media_output_val = self.xcvr_eeprom.read(consts.MEDIA_OUTPUT_LOOPBACK)
if enable:
return self.xcvr_eeprom.write(consts.MEDIA_OUTPUT_LOOPBACK, media_output_val | lane_mask)
else:
return self.xcvr_eeprom.write(consts.MEDIA_OUTPUT_LOOPBACK, media_output_val & ~lane_mask)

def set_loopback_mode(self, loopback_mode, lane_mask = 0xff, enable = False):
'''
This function sets the module loopback mode.
Args:
- loopback_mode (str): Specifies the loopback mode. It must be one of the following:
1. "none"
2. "host-side-input"
3. "host-side-output"
4. "media-side-input"
5. "media-side-output"
- lane_mask (int): A bitmask representing the lanes to which the loopback mode should
be applied. Default 0xFF applies to all lanes.
- enable (bool): Whether to enable or disable the loopback mode. Default False.
Returns:
- bool: True if the operation succeeds, False otherwise.
'''
loopback_functions = {
'host-side-input': self.set_host_input_loopback,
'host-side-output': self.set_host_output_loopback,
'media-side-input': self.set_media_input_loopback,
'media-side-output': self.set_media_output_loopback,
}

if loopback_mode == 'none':
return all([
self.set_host_input_loopback(0xff, False),
self.set_host_output_loopback(0xff, False),
self.set_media_input_loopback(0xff, False),
self.set_media_output_loopback(0xff, False)
])

func = loopback_functions.get(loopback_mode)
if func:
return func(lane_mask, enable)

logger.error('Invalid loopback mode:%s, lane_mask:%#x', loopback_mode, lane_mask)
return False

def get_vdm(self, field_option=None):
'''
This function returns all the VDM items, including real time monitor value, threholds and flags
Expand Down
Loading

0 comments on commit 049952d

Please sign in to comment.