Skip to content

Commit

Permalink
[CMIS] Add options to individually clear host/media, input/output loo…
Browse files Browse the repository at this point in the history
…pback modes and log errors for all rejection cases

Signed-off-by: xinyu <[email protected]>
  • Loading branch information
xinyulin committed Aug 18, 2024
1 parent 24f94a5 commit a65614b
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 49 deletions.
89 changes: 62 additions & 27 deletions sonic_platform_base/sonic_xcvr/api/public/cmis.py
Original file line number Diff line number Diff line change
Expand Up @@ -1119,11 +1119,15 @@ def set_loopback_mode(self, loopback_mode, lane_mask = 0xff):
'''
This function sets the module loopback mode.
loopback_mode: Loopback mode has to be one of the five:
1. "none" (default)
2. "host-side-input"
3. "host-side-output"
4. "media-side-input"
5. "media-side-output"
1. "none"
2. "host-side-input-none"
3. "host-side-output-none",
4. "media-side-input-none"
5. "media-side-output-none"
6. "host-side-input"
7. "host-side-output"
8. "media-side-input"
9. "media-side-output"
lane_mask: A bitmask representing which lanes to apply the loopback mode to.
The default value of 0xFF indicates that the mode should be applied to all lanes.
Expand All @@ -1132,42 +1136,73 @@ def set_loopback_mode(self, loopback_mode, lane_mask = 0xff):
'''
loopback_capability = self.get_loopback_capability()
if loopback_capability is None:
logger.info('Failed to get loopback capabilities')
return False

status_host_input = self.xcvr_eeprom.read(consts.HOST_INPUT_LOOPBACK)
status_host_output = self.xcvr_eeprom.read(consts.HOST_OUTPUT_LOOPBACK)
status_media_input = self.xcvr_eeprom.read(consts.MEDIA_INPUT_LOOPBACK)
status_media_output = self.xcvr_eeprom.read(consts.MEDIA_OUTPUT_LOOPBACK)

if loopback_mode == 'none':
status_host_input = self.xcvr_eeprom.write(consts.HOST_INPUT_LOOPBACK, 0)
status_host_output = self.xcvr_eeprom.write(consts.HOST_OUTPUT_LOOPBACK, 0)
status_media_input = self.xcvr_eeprom.write(consts.MEDIA_INPUT_LOOPBACK, 0)
status_media_output = self.xcvr_eeprom.write(consts.MEDIA_OUTPUT_LOOPBACK, 0)
return all([status_host_input, status_host_output, status_media_input, status_media_output])
host_input_val = self.xcvr_eeprom.read(consts.HOST_INPUT_LOOPBACK)
host_output_val = self.xcvr_eeprom.read(consts.HOST_OUTPUT_LOOPBACK)
media_input_val = self.xcvr_eeprom.read(consts.MEDIA_INPUT_LOOPBACK)
media_output_val = self.xcvr_eeprom.read(consts.MEDIA_OUTPUT_LOOPBACK)

if lane_mask != 0xff:
if any([loopback_capability['per_lane_host_loopback_supported'] is False and 'host' in loopback_mode,
loopback_capability['per_lane_media_loopback_supported'] is False and 'media' in loopback_mode]):
lane_mask = 0xff
txt = f'Per-lane {loopback_mode} loopback is not supported, lane_mask is automatically set to 0xff\n'
logger.error(txt)

if 'none' in loopback_mode:
if loopback_mode == 'none':
status_host_input = self.xcvr_eeprom.write(consts.HOST_INPUT_LOOPBACK, 0)
status_host_output = self.xcvr_eeprom.write(consts.HOST_OUTPUT_LOOPBACK, 0)
status_media_input = self.xcvr_eeprom.write(consts.MEDIA_INPUT_LOOPBACK, 0)
status_media_output = self.xcvr_eeprom.write(consts.MEDIA_OUTPUT_LOOPBACK, 0)
return all([status_host_input, status_host_output, status_media_input, status_media_output])

if loopback_mode == 'host-side-input-none':
return self.xcvr_eeprom.write(consts.HOST_INPUT_LOOPBACK, host_input_val & ~lane_mask)

if loopback_mode == 'host-side-output-none':
return self.xcvr_eeprom.write(consts.HOST_OUTPUT_LOOPBACK, host_output_val & ~lane_mask)

if loopback_mode == 'media-side-input-none':
return self.xcvr_eeprom.write(consts.MEDIA_INPUT_LOOPBACK, media_input_val & ~lane_mask)

if loopback_mode == 'media-side-output-none':
return self.xcvr_eeprom.write(consts.MEDIA_OUTPUT_LOOPBACK, media_output_val & ~lane_mask)
else:
if loopback_capability['simultaneous_host_media_loopback_supported'] is False:
if loopback_mode in ['host-side-input', 'host-side-output'] and \
(status_media_input or status_media_output):
return False

if loopback_mode in ['media-side-input', 'media-side-output'] and \
(status_host_output or status_host_input):
if any([loopback_mode in ['host-side-input', 'host-side-output'] and (media_input_val or media_output_val),
loopback_mode in ['media-side-input', 'media-side-output'] and (host_input_val or host_output_val)]):
txt = 'Simultaneous host media loopback is not supported\n'
txt += f'host_input_val:{host_input_val:02x}, host_output_val:{host_output_val:02x}, '
txt += f'media_input_val:{media_input_val:02x}, media_output_val:{media_output_val:02x}\n'
logger.error(txt)
return False

if loopback_mode == 'host-side-input':
if loopback_capability['host_side_input_loopback_supported']:
return self.xcvr_eeprom.write(consts.HOST_INPUT_LOOPBACK, lane_mask)
return self.xcvr_eeprom.write(consts.HOST_INPUT_LOOPBACK, host_input_val | lane_mask)
elif loopback_mode == 'host-side-output':
if loopback_capability['host_side_output_loopback_supported']:
return self.xcvr_eeprom.write(consts.HOST_OUTPUT_LOOPBACK, lane_mask)
return self.xcvr_eeprom.write(consts.HOST_OUTPUT_LOOPBACK, host_output_val | lane_mask)
elif loopback_mode == 'media-side-input':
if loopback_capability['media_side_input_loopback_supported']:
return self.xcvr_eeprom.write(consts.MEDIA_INPUT_LOOPBACK, lane_mask)
return self.xcvr_eeprom.write(consts.MEDIA_INPUT_LOOPBACK, media_input_val | lane_mask)
elif loopback_mode == 'media-side-output':
if loopback_capability['media_side_output_loopback_supported']:
return self.xcvr_eeprom.write(consts.MEDIA_OUTPUT_LOOPBACK, lane_mask)

return self.xcvr_eeprom.write(consts.MEDIA_OUTPUT_LOOPBACK, media_output_val | lane_mask)

txt = f'Failed to set {loopback_mode} loopback, lane_mask:{lane_mask:02x}\n'
host_input_support = loopback_capability['host_side_input_loopback_supported']
host_output_support = loopback_capability['host_side_output_loopback_supported']
media_input_support = loopback_capability['media_side_input_loopback_supported']
media_output_support = loopback_capability['media_side_output_loopback_supported']
txt += f'host_input_support:{host_input_support}, host_output_support:{host_output_support}, '
txt += f'media_input_support:{media_input_support}, media_output_support:{media_output_support}\n'
txt += f'host_input_val:{host_input_val:02x}, host_output_val:{host_output_val:02x}, '
txt += f'media_input_val:{media_input_val:02x}, media_output_val:{media_output_val:02x}\n'
logger.error(txt)
return False

def get_vdm(self, field_option=None):
Expand Down
64 changes: 42 additions & 22 deletions tests/sonic_xcvr/test_cmis.py
Original file line number Diff line number Diff line change
Expand Up @@ -1052,50 +1052,70 @@ def test_get_loopback_capability(self, mock_response, expected):
result = self.api.get_loopback_capability()
assert result == expected

@pytest.mark.parametrize("input_param, mock_response",[
('none', {
@pytest.mark.parametrize("input_param, mock_response, expected",[
(['none',0x0f], {
'host_side_input_loopback_supported': True,
'host_side_output_loopback_supported': True,
'media_side_input_loopback_supported': True,
'media_side_output_loopback_supported': True,
'simultaneous_host_media_loopback_supported': True
}),
('host-side-input', {
'simultaneous_host_media_loopback_supported': True,
'per_lane_host_loopback_supported': True,
'per_lane_media_loopback_supported': True
}, True),
(['host-side-input', 0x0f], {
'host_side_input_loopback_supported': True,
'host_side_output_loopback_supported': True,
'media_side_input_loopback_supported': True,
'media_side_output_loopback_supported': True,
'simultaneous_host_media_loopback_supported': True
}),
('host-side-output', {
'simultaneous_host_media_loopback_supported': True,
'per_lane_host_loopback_supported': True,
'per_lane_media_loopback_supported': True
}, True),
(['host-side-output', 0x0f], {
'host_side_input_loopback_supported': True,
'host_side_output_loopback_supported': True,
'media_side_input_loopback_supported': True,
'media_side_output_loopback_supported': True,
'simultaneous_host_media_loopback_supported': False
}),
('media-side-input', {
'simultaneous_host_media_loopback_supported': False,
'per_lane_host_loopback_supported': False,
'per_lane_media_loopback_supported': False
}, False),
(['media-side-input', 0x0f], {
'host_side_input_loopback_supported': True,
'host_side_output_loopback_supported': True,
'media_side_input_loopback_supported': True,
'media_side_output_loopback_supported': True,
'simultaneous_host_media_loopback_supported': False
}),
('media-side-output', {
'simultaneous_host_media_loopback_supported': True,
'per_lane_host_loopback_supported': True,
'per_lane_media_loopback_supported': True
}, True),
(['media-side-output', 0x0f], {
'host_side_input_loopback_supported': True,
'host_side_output_loopback_supported': True,
'media_side_input_loopback_supported': True,
'media_side_output_loopback_supported': True,
'simultaneous_host_media_loopback_supported': True
}),
(
'none', None
)
])
def test_set_loopback_mode(self, input_param, mock_response):
'simultaneous_host_media_loopback_supported': False,
'per_lane_host_loopback_supported': False,
'per_lane_media_loopback_supported': False
}, False),
(['media-side-output', 0x0f], {
'host_side_input_loopback_supported': False,
'host_side_output_loopback_supported': False,
'media_side_input_loopback_supported': False,
'media_side_output_loopback_supported': False,
'simultaneous_host_media_loopback_supported': True,
'per_lane_host_loopback_supported': True,
'per_lane_media_loopback_supported': True
}, False),
(['none', 0x0F], None, False)
])
def test_set_loopback_mode(self, input_param, mock_response, expected):
self.api.get_loopback_capability = MagicMock()
self.api.get_loopback_capability.return_value = mock_response
self.api.set_loopback_mode(input_param)
self.api.xcvr_eeprom.read = MagicMock()
self.api.xcvr_eeprom.read.side_effect = [0xf0,0,0xf0,0]
result = self.api.set_loopback_mode(input_param[0], input_param[1])
assert result == expected

@pytest.mark.parametrize("mock_response, expected",[
(
Expand Down

0 comments on commit a65614b

Please sign in to comment.