From 6a5c96d167996620eebb600672d4b508b5b2df6b Mon Sep 17 00:00:00 2001 From: Jianquan Ye Date: Thu, 19 Dec 2024 05:13:06 +1000 Subject: [PATCH] Fix redis memory leak issue in PhysicalEntityCacheUpdater (#343) - What I did Fixes the redis memory leak bug: #342 There's chance that the physical_entity_updater creates subscriptions to redis, and never consume the messages due to exceptions. Then the memory buffer(omem) of redis client starts to increase and never end, redis memory leaks. The reason is all 5 physical entity cache updaters inherit from PhysicalEntityCacheUpdater. In the first update_data, they initialize the psubscription to redis database. self.pub_sub_dict[db_index] = mibs.get_redis_pubsub(db, db.STATE_DB, self.get_key_pattern()) And everytime when the update_data is called again, it get the message from the psub and process. msg = pubsub.get_message() And outside, in the logic of the MIBUpdater, it calls update_data more frequently than reinit_data. A side-effect is, if reinit_data failed forever, the update_counter will not been cleaned, then update_data will not be called forever. self.update_counter = 0 So the problem is, at the begining, the psub is created at the first update_data and all things work well, until an unrecoverable issue happened, PHYSICAL_ENTITY_INFO|PSU * missing in the database (it's a pmon issue) This causes both reinit_data and update_data to be failed, because all of them finally call _update_per_namespace_data, which tries to cast an empty string '' to int and raises ValueError. Then the update_data is not called forever, because reinit_data will never success. But the previously established psubscription is still there, and no one gonna to consume it(the update_data is blocked), then Redis database memory starts to slowly leak. - How I did it Catch the exception during the loop of reinit_data, make sure the reinit_data of every physical_entity_updater will be called Clear message and cancel the subscription in the reinit_data, avoid the message accumulates in the redis subscription queue - How to verify it Tested on Cisco chassis, the memory is not leaking anymore. --- .gitignore | 1 + src/ax_interface/mib.py | 2 +- src/sonic_ax_impl/mibs/__init__.py | 15 +++ src/sonic_ax_impl/mibs/ietf/rfc2737.py | 42 +++++++- tests/mock_tables/dbconnector.py | 3 + tests/test_rfc1213.py | 2 +- tests/test_rfc2737.py | 138 +++++++++++++++++++++++++ 7 files changed, 200 insertions(+), 3 deletions(-) create mode 100644 tests/test_rfc2737.py diff --git a/.gitignore b/.gitignore index b9caeccda..8db9a37d1 100644 --- a/.gitignore +++ b/.gitignore @@ -136,3 +136,4 @@ fabric.properties gh-release.patch tests/test_cpuUtilizationHandler.py +tests/test-results.xml diff --git a/src/ax_interface/mib.py b/src/ax_interface/mib.py index 404e2916a..43bf9b478 100644 --- a/src/ax_interface/mib.py +++ b/src/ax_interface/mib.py @@ -49,7 +49,7 @@ async def start(self): redis_exception_happen = False except RuntimeError: # Any unexpected exception or error, log it and keep running - logger.exception("MIBUpdater.start() caught an unexpected exception during update_data()") + logger.exception("MIBUpdater.start() caught a RuntimeError during update_data(), will reinitialize the connections") # When redis server restart, swsscommon will throw swsscommon.RedisError, redis connection need re-initialize in reinit_data() # TODO: change to swsscommon.RedisError redis_exception_happen = True diff --git a/src/sonic_ax_impl/mibs/__init__.py b/src/sonic_ax_impl/mibs/__init__.py index 38c70b113..1fe6264ec 100644 --- a/src/sonic_ax_impl/mibs/__init__.py +++ b/src/sonic_ax_impl/mibs/__init__.py @@ -496,6 +496,21 @@ def get_redis_pubsub(db_conn, db_name, pattern): return pubsub +def cancel_redis_pubsub(pubsub, db_conn, db_name, pattern): + db = db_conn.get_dbid(db_name) + logger.debug(f"Cancel subscription {db} {pattern}") + pubsub.punsubscribe("__keyspace@{}__:{}".format(db, pattern)) + return pubsub + + +def clear_pubsub_msg(pubsub): + while True: + msg = pubsub.get_message() + logger.debug("Clearing pubsub {}, get and drop message {}".format(pubsub, msg)) + if not msg: + break + + class RedisOidTreeUpdater(MIBUpdater): def __init__(self, prefix_str): super().__init__() diff --git a/src/sonic_ax_impl/mibs/ietf/rfc2737.py b/src/sonic_ax_impl/mibs/ietf/rfc2737.py index d9e5fdec2..0ba772608 100644 --- a/src/sonic_ax_impl/mibs/ietf/rfc2737.py +++ b/src/sonic_ax_impl/mibs/ietf/rfc2737.py @@ -318,8 +318,33 @@ def reinit_data(self): self.physical_name_map[chassis_mgmt_sub_id] = name self.physical_fru_map[chassis_mgmt_sub_id] = self.NOT_REPLACEABLE + exceptions = [] + has_runtime_err = False + # Catch exception in the iteration + # This makes sure if any exception is raised in the mid of loop + # every updater's reinit_data function will be always called + # So that the redis subscriptions always get chance to be cleaned, + # Otherwise if the exception never recover, + # the redis subscription keeps increasing but never got consumed, + # this causes redis memory leak. for updater in self.physical_entity_updaters: - updater.reinit_data() + try: + updater.reinit_data() + except BaseException as e: + if isinstance(e, RuntimeError): + has_runtime_err = True + # Log traceback so that we know the original error details + mibs.logger.error(e, exc_info=True) + exceptions.append(e) + + # The RuntimeError will be considered as Redis connection error + # And will trigger re-init connection, if the exceptions contain any RuntimeError + # We raise runtime error + if exceptions: + if has_runtime_err: + raise RuntimeError(exceptions) + else: + raise Exception(exceptions) def update_data(self): # This code is not executed in unit test, since mockredis @@ -648,6 +673,21 @@ def __init__(self, mib_updater): self.entity_to_oid_map = {} def reinit_data(self): + + # Redis subscriptions are established and consumed in update_data, + # but if there's stable exception during update logic, + # the reinit_data will be called, but the update_data is never called. + # The message is sent into subscription queue, but never got consumed, + # this causes Redis memory leaking. + # Hence clear the message in the subscription and cancel the subscription during reinit_data + for db_index in list(self.pub_sub_dict): + pubsub = self.pub_sub_dict[db_index] + db_conn = self.mib_updater.statedb[db_index] + # clear message in the subscription and cancel the subscription + mibs.clear_pubsub_msg(pubsub) + mibs.cancel_redis_pubsub(pubsub, db_conn, db_conn.STATE_DB, self.get_key_pattern()) + del self.pub_sub_dict[db_index] + self.entity_to_oid_map.clear() # retrieve the initial list of entity in db key_info = Namespace.dbs_keys(self.mib_updater.statedb, mibs.STATE_DB, self.get_key_pattern()) diff --git a/tests/mock_tables/dbconnector.py b/tests/mock_tables/dbconnector.py index 6a5cbd997..e4cf95daa 100644 --- a/tests/mock_tables/dbconnector.py +++ b/tests/mock_tables/dbconnector.py @@ -83,6 +83,9 @@ def get_message(self): def psubscribe(self, *args, **kwargs): pass + def punsubscribe(self, *args, **kwargs): + pass + def __call__(self, *args, **kwargs): return self diff --git a/tests/test_rfc1213.py b/tests/test_rfc1213.py index af5c2c936..0f8e7e2a6 100644 --- a/tests/test_rfc1213.py +++ b/tests/test_rfc1213.py @@ -77,7 +77,7 @@ def test_NextHopUpdater_redis_exception(self): # check warning expected = [ - mock.call("MIBUpdater.start() caught an unexpected exception during update_data()") + mock.call("MIBUpdater.start() caught a RuntimeError during update_data(), will reinitialize the connections") ] mocked_exception.assert_has_calls(expected) diff --git a/tests/test_rfc2737.py b/tests/test_rfc2737.py new file mode 100644 index 000000000..8054df2c9 --- /dev/null +++ b/tests/test_rfc2737.py @@ -0,0 +1,138 @@ +import os +import sys +from unittest import TestCase + +import pytest +from sonic_ax_impl.mibs.ietf.rfc2737 import PhysicalTableMIBUpdater + + +if sys.version_info.major == 3: + from unittest import mock +else: + import mock + +modules_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +sys.path.insert(0, os.path.join(modules_path, 'src')) + + +class TestPhysicalTableMIBUpdater(TestCase): + + # Given: 5 physical updaters are register into reinit of PhysicalTableMIBUpdater + # When: The first updater(XcvrCacheUpdater) raises exception in the reinit + # Then: The remaining updaters should execute reinit without any affection, + # and the redis un-subscription should be called + @mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.XcvrCacheUpdater.reinit_data', side_effect=Exception('mocked error')) + def test_PhysicalTableMIBUpdater_exception_in_reinit_data_wont_block_reinit_iteration_first(self, mocked_xcvr_reinit_data): + updater = PhysicalTableMIBUpdater() + + with (pytest.raises(Exception) as excinfo, + mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.PsuCacheUpdater.reinit_data') as mocked_psu_reinit_data, + mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.FanDrawerCacheUpdater.reinit_data') as mocked_fan_drawer_reinit_data, + mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.FanCacheUpdater.reinit_data') as mocked_fan_cache_reinit_data, + mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.ThermalCacheUpdater.reinit_data') as mocked_thermal_reinit_data, + mock.patch('sonic_ax_impl.mibs.cancel_redis_pubsub') as mocked_cancel_redis_pubsub): + updater.reinit_data() + mocked_xcvr_reinit_data.assert_called() + mocked_psu_reinit_data.assert_called() + mocked_fan_drawer_reinit_data.assert_called() + mocked_fan_cache_reinit_data.assert_called() + mocked_thermal_reinit_data.assert_called() + mocked_cancel_redis_pubsub.assert_called() + assert str(excinfo.value) == "[Exception('mocked error')]" + + # Given: 5 physical updaters are register into reinit of PhysicalTableMIBUpdater + # When: The last updater(ThermalCacheUpdater) raises exception in the reinit + # Then: The remaining updaters should execute reinit without any affection, + # and the redis un-subscription should be called + @mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.ThermalCacheUpdater.reinit_data', side_effect=Exception('mocked error')) + def test_PhysicalTableMIBUpdater_exception_in_reinit_data_wont_block_reinit_iteration_last(self, mocked_thermal_reinit_data): + updater = PhysicalTableMIBUpdater() + + with (pytest.raises(Exception) as excinfo, + mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.XcvrCacheUpdater.reinit_data') as mocked_xcvr_reinit_data, + mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.PsuCacheUpdater.reinit_data') as mocked_psu_reinit_data, + mock.patch( + 'sonic_ax_impl.mibs.ietf.rfc2737.FanDrawerCacheUpdater.reinit_data') as mocked_fan_drawer_reinit_data, + mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.FanCacheUpdater.reinit_data') as mocked_fan_cache_reinit_data, + mock.patch('sonic_ax_impl.mibs.cancel_redis_pubsub') as mocked_cancel_redis_pubsub): + updater.reinit_data() + mocked_xcvr_reinit_data.assert_called() + mocked_psu_reinit_data.assert_called() + mocked_fan_drawer_reinit_data.assert_called() + mocked_fan_cache_reinit_data.assert_called() + mocked_thermal_reinit_data.assert_called() + mocked_cancel_redis_pubsub.assert_called() + assert str(excinfo.value) == "[Exception('mocked error')]" + + # Given: 5 physical updaters are register into reinit of PhysicalTableMIBUpdater + # When: The first updater(XcvrCacheUpdater) raises Runtime exception in the reinit + # Then: The remaining updaters should execute reinit without any affection, + # and the redis un-subscription should be called + @mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.ThermalCacheUpdater.reinit_data', side_effect=RuntimeError('mocked runtime error')) + def test_PhysicalTableMIBUpdater_runtime_exc_in_reinit_data_wont_block_reinit_iteration_first(self, mocked_thermal_reinit_data): + updater = PhysicalTableMIBUpdater() + + with (pytest.raises(RuntimeError) as excinfo, + mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.XcvrCacheUpdater.reinit_data') as mocked_xcvr_reinit_data, + mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.PsuCacheUpdater.reinit_data') as mocked_psu_reinit_data, + mock.patch( + 'sonic_ax_impl.mibs.ietf.rfc2737.FanDrawerCacheUpdater.reinit_data') as mocked_fan_drawer_reinit_data, + mock.patch( + 'sonic_ax_impl.mibs.ietf.rfc2737.FanCacheUpdater.reinit_data') as mocked_fan_cache_reinit_data, + mock.patch('sonic_ax_impl.mibs.cancel_redis_pubsub') as mocked_cancel_redis_pubsub): + updater.reinit_data() + mocked_thermal_reinit_data.assert_called() + mocked_xcvr_reinit_data.assert_called() + mocked_psu_reinit_data.assert_called() + mocked_fan_drawer_reinit_data.assert_called() + mocked_fan_cache_reinit_data.assert_called() + mocked_cancel_redis_pubsub.assert_called() + assert str(excinfo.value) == "[RuntimeError('mocked runtime error')]" + + # Given: 5 physical updaters are register into reinit of PhysicalTableMIBUpdater + # When: The last updater(XcvrCacheUpdater) raises Runtime exception in the reinit + # Then: The remaining updaters should execute reinit without any affection, + # and the redis un-subscription should be called + @mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.XcvrCacheUpdater.reinit_data', side_effect=RuntimeError('mocked runtime error')) + def test_PhysicalTableMIBUpdater_runtime_exc_in_reinit_data_wont_block_reinit_iteration_last(self, mocked_xcvr_reinit_data): + updater = PhysicalTableMIBUpdater() + + with (pytest.raises(RuntimeError) as exc_info, + mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.PsuCacheUpdater.reinit_data') as mocked_psu_reinit_data, + mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.FanDrawerCacheUpdater.reinit_data') as mocked_fan_drawer_reinit_data, + mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.FanCacheUpdater.reinit_data') as mocked_fan_cache_reinit_data, + mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.ThermalCacheUpdater.reinit_data') as mocked_thermal_reinit_data, + mock.patch('sonic_ax_impl.mibs.cancel_redis_pubsub') as mocked_cancel_redis_pubsub): + updater.reinit_data() + mocked_xcvr_reinit_data.assert_called() + mocked_psu_reinit_data.assert_called() + mocked_fan_drawer_reinit_data.assert_called() + mocked_fan_cache_reinit_data.assert_called() + mocked_thermal_reinit_data.assert_called() + mocked_cancel_redis_pubsub.assert_called() + assert str(exc_info.value) == "[RuntimeError('mocked runtime error')]" + + # Given: 5 physical updaters are register into reinit of PhysicalTableMIBUpdater + # When: The first(XcvrCacheUpdater) and last updater(ThermalCacheUpdater) + # raises Runtime exception and Exception in the reinit + # Then: The remaining updaters should execute reinit without any affection, + # and the redis un-subscription should be called + # Both the RuntimeError and Exception should be caught and combined as RuntimeError then been raised + @mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.XcvrCacheUpdater.reinit_data', side_effect=RuntimeError('mocked runtime error')) + @mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.ThermalCacheUpdater.reinit_data', side_effect=Exception('mocked error')) + def test_PhysicalTableMIBUpdater_multi_exception(self, mocked_xcvr_reinit_data, mocked_thermal_reinit_data): + updater = PhysicalTableMIBUpdater() + + with (pytest.raises(RuntimeError) as exc_info, + mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.PsuCacheUpdater.reinit_data') as mocked_psu_reinit_data, + mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.FanDrawerCacheUpdater.reinit_data') as mocked_fan_drawer_reinit_data, + mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.FanCacheUpdater.reinit_data') as mocked_fan_cache_reinit_data, + mock.patch('sonic_ax_impl.mibs.cancel_redis_pubsub') as mocked_cancel_redis_pubsub): + updater.reinit_data() + mocked_xcvr_reinit_data.assert_called() + mocked_psu_reinit_data.assert_called() + mocked_fan_drawer_reinit_data.assert_called() + mocked_fan_cache_reinit_data.assert_called() + mocked_thermal_reinit_data.assert_called() + mocked_cancel_redis_pubsub.assert_called() + assert str(exc_info.value) == "[RuntimeError('mocked runtime error'), Exception('mocked error')]"