Skip to content

Commit

Permalink
Add test cases
Browse files Browse the repository at this point in the history
  • Loading branch information
yejianquan committed Dec 17, 2024
1 parent 9632982 commit 9422030
Showing 1 changed file with 110 additions and 3 deletions.
113 changes: 110 additions & 3 deletions tests/test_rfc2737.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,127 @@

class TestPhysicalTableMIBUpdater(TestCase):

@mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.PsuCacheUpdater.reinit_data', side_effect=Exception('mocked error'))
def test_PhysicalTableMIBUpdater_exception_in_reinit_data_wont_block_reinit_iteration(self, mock_psu_reinit_data):
# Given: 5 physical updaters are register into reinit of PhysicalTableMIBUpdater
# When: The first updater(XcvrCacheUpdater) raises exception in the reinit
# Then: The remaining updaters should execute reinit without any affection, and the un-subscription should be called
@mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.XcvrCacheUpdater.reinit_data', side_effect=Exception('mocked error'))
def test_PhysicalTableMIBUpdater_exception_in_reinit_data_wont_block_reinit_iteration_first(self, mock_xcvr_reinit_data):
updater = PhysicalTableMIBUpdater()

with (pytest.raises(Exception) as excinfo,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.XcvrCacheUpdater.reinit_data') as mocked_xcvr_reinit_data,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.PsuCacheUpdater.reinit_data') as mocked_psu_reinit_data,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.FanDrawerCacheUpdater.reinit_data') as mocked_fan_drawer_reinit_data,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.FanCacheUpdater.reinit_data') as mocked_fan_cache_reinit_data,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.ThermalCacheUpdater.reinit_data') as mocked_thermal_reinit_data):
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.ThermalCacheUpdater.reinit_data') as mocked_thermal_reinit_data,
mock.patch('sonic_ax_impl.mibs.cancel_redis_pubsub') as mocked_cancel_redis_pubsub):
updater.reinit_data()
assert str(excinfo.value) == "mocked error"
mocked_xcvr_reinit_data.assert_called()
mocked_psu_reinit_data.assert_called()
mocked_fan_drawer_reinit_data.assert_called()
mocked_fan_cache_reinit_data.assert_called()
mocked_thermal_reinit_data.assert_called()
mocked_cancel_redis_pubsub.assert_called()

# Given: 5 physical updaters are register into reinit of PhysicalTableMIBUpdater
# When: The last updater(ThermalCacheUpdater) raises exception in the reinit
# Then: The remaining updaters should execute reinit without any affection, and the un-subscription should be called
@mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.ThermalCacheUpdater.reinit_data', side_effect=Exception('mocked error'))
def test_PhysicalTableMIBUpdater_exception_in_reinit_data_wont_block_reinit_iteration_last(self, mock_thermal_reinit_data):
updater = PhysicalTableMIBUpdater()

with (pytest.raises(Exception) as excinfo,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.XcvrCacheUpdater.reinit_data') as mocked_xcvr_reinit_data,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.PsuCacheUpdater.reinit_data') as mocked_psu_reinit_data,
mock.patch(
'sonic_ax_impl.mibs.ietf.rfc2737.FanDrawerCacheUpdater.reinit_data') as mocked_fan_drawer_reinit_data,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.FanCacheUpdater.reinit_data') as mocked_fan_cache_reinit_data,
mock.patch(
'sonic_ax_impl.mibs.ietf.rfc2737.ThermalCacheUpdater.reinit_data') as mocked_thermal_reinit_data):
updater.reinit_data()
assert str(excinfo.value) == "mocked error"
mocked_xcvr_reinit_data.assert_called()
mocked_psu_reinit_data.assert_called()
mocked_fan_drawer_reinit_data.assert_called()
mocked_fan_cache_reinit_data.assert_called()
mocked_thermal_reinit_data.assert_called()

# Given: 5 physical updaters are register into reinit of PhysicalTableMIBUpdater
# When: The first updater(XcvrCacheUpdater) raises Runtime exception in the reinit
# Then: The remaining updaters should execute reinit without any affection, and the un-subscription should be called
@mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.ThermalCacheUpdater.reinit_data', side_effect=RuntimeError('mocked runtime error'))
def test_PhysicalTableMIBUpdater_runtime_exc_in_reinit_data_wont_block_reinit_iteration_first(self, mock_xcvr_reinit_data):
updater = PhysicalTableMIBUpdater()

with (pytest.raises(RuntimeError) as excinfo,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.XcvrCacheUpdater.reinit_data') as mocked_xcvr_reinit_data,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.PsuCacheUpdater.reinit_data') as mocked_psu_reinit_data,
mock.patch(
'sonic_ax_impl.mibs.ietf.rfc2737.FanDrawerCacheUpdater.reinit_data') as mocked_fan_drawer_reinit_data,
mock.patch(
'sonic_ax_impl.mibs.ietf.rfc2737.FanCacheUpdater.reinit_data') as mocked_fan_cache_reinit_data,
mock.patch(
'sonic_ax_impl.mibs.ietf.rfc2737.ThermalCacheUpdater.reinit_data') as mocked_thermal_reinit_data,
mock.patch('sonic_ax_impl.mibs.cancel_redis_pubsub') as mocked_cancel_redis_pubsub):
updater.reinit_data()
assert str(excinfo.value) == "mocked runtime error"
mocked_xcvr_reinit_data.assert_called()
mocked_psu_reinit_data.assert_called()
mocked_fan_drawer_reinit_data.assert_called()
mocked_fan_cache_reinit_data.assert_called()
mocked_thermal_reinit_data.assert_called()
mocked_cancel_redis_pubsub.assert_called()

# Given: 5 physical updaters are register into reinit of PhysicalTableMIBUpdater
# When: The last updater(XcvrCacheUpdater) raises Runtime exception in the reinit
# Then: The remaining updaters should execute reinit without any affection, and the un-subscription should be called
@mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.XcvrCacheUpdater.reinit_data', side_effect=RuntimeError('mocked runtime error'))
def test_PhysicalTableMIBUpdater_runtime_exc_in_reinit_data_wont_block_reinit_iteration_last(self, mock_xcvr_reinit_data):
updater = PhysicalTableMIBUpdater()

with (pytest.raises(RuntimeError) as excinfo,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.XcvrCacheUpdater.reinit_data') as mocked_xcvr_reinit_data,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.PsuCacheUpdater.reinit_data') as mocked_psu_reinit_data,
mock.patch(
'sonic_ax_impl.mibs.ietf.rfc2737.FanDrawerCacheUpdater.reinit_data') as mocked_fan_drawer_reinit_data,
mock.patch(
'sonic_ax_impl.mibs.ietf.rfc2737.FanCacheUpdater.reinit_data') as mocked_fan_cache_reinit_data,
mock.patch(
'sonic_ax_impl.mibs.ietf.rfc2737.ThermalCacheUpdater.reinit_data') as mocked_thermal_reinit_data,
mock.patch('sonic_ax_impl.mibs.cancel_redis_pubsub') as mocked_cancel_redis_pubsub):
updater.reinit_data()
assert str(excinfo.value) == "mocked runtime error"
mocked_xcvr_reinit_data.assert_called()
mocked_psu_reinit_data.assert_called()
mocked_fan_drawer_reinit_data.assert_called()
mocked_fan_cache_reinit_data.assert_called()
mocked_thermal_reinit_data.assert_called()
mocked_cancel_redis_pubsub.assert_called()

# Given: 5 physical updaters are register into reinit of PhysicalTableMIBUpdater
# When: The first(XcvrCacheUpdater) and last updater(ThermalCacheUpdater)
# raises Runtime exception and Exception in the reinit
# Then: The remaining updaters should execute reinit without any affection, and the un-subscription should be called
# Both the RuntimeError and Exception should be caught and combined as RuntimeError then been raised
@mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.XcvrCacheUpdater.reinit_data', side_effect=RuntimeError('mocked runtime error'))
@mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.ThermalCacheUpdater.reinit_data', side_effect=Exception('mocked error'))
def test_PhysicalTableMIBUpdater_multi_exception(self, mock_xcvr_reinit_data, mock_thermal_reinit_data):
updater = PhysicalTableMIBUpdater()

with (pytest.raises(RuntimeError) as excinfo,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.XcvrCacheUpdater.reinit_data') as mocked_xcvr_reinit_data,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.PsuCacheUpdater.reinit_data') as mocked_psu_reinit_data,
mock.patch(
'sonic_ax_impl.mibs.ietf.rfc2737.FanDrawerCacheUpdater.reinit_data') as mocked_fan_drawer_reinit_data,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.FanCacheUpdater.reinit_data') as mocked_fan_cache_reinit_data,
mock.patch(
'sonic_ax_impl.mibs.ietf.rfc2737.ThermalCacheUpdater.reinit_data') as mocked_thermal_reinit_data):
updater.reinit_data()
assert str(excinfo.value) == "mocked runtime error"
mocked_xcvr_reinit_data.assert_called()
mocked_psu_reinit_data.assert_called()
mocked_fan_drawer_reinit_data.assert_called()
mocked_fan_cache_reinit_data.assert_called()
mocked_thermal_reinit_data.assert_called()
mocked_cancel_redis_pubsub.assert_called()

0 comments on commit 9422030

Please sign in to comment.