Skip to content

Commit

Permalink
Fix redis memory leak issue in PhysicalEntityCacheUpdater
Browse files Browse the repository at this point in the history
  • Loading branch information
yejianquan committed Dec 17, 2024
1 parent c5301b2 commit 1aff8bf
Show file tree
Hide file tree
Showing 7 changed files with 201 additions and 3 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,4 @@ fabric.properties

gh-release.patch
tests/test_cpuUtilizationHandler.py
tests/test-results.xml
2 changes: 1 addition & 1 deletion src/ax_interface/mib.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ async def start(self):
redis_exception_happen = False
except RuntimeError:
# Any unexpected exception or error, log it and keep running
logger.exception("MIBUpdater.start() caught an unexpected exception during update_data()")
logger.exception("MIBUpdater.start() caught a RuntimeError during update_data(), will reinitialize the connections")
# When redis server restart, swsscommon will throw swsscommon.RedisError, redis connection need re-initialize in reinit_data()
# TODO: change to swsscommon.RedisError
redis_exception_happen = True
Expand Down
15 changes: 15 additions & 0 deletions src/sonic_ax_impl/mibs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,21 @@ def get_redis_pubsub(db_conn, db_name, pattern):
return pubsub


def cancel_redis_pubsub(pubsub, db_conn, db_name, pattern):
    """
    Cancel a keyspace-notification pattern subscription held by a pubsub object.

    :param pubsub: pubsub object owning the subscription; must provide punsubscribe()
    :param db_conn: DB connector used to resolve db_name through get_dbid()
    :param db_name: name of the database the subscription was made against
    :param pattern: key pattern of the subscription to cancel
    :return: the same pubsub object that was passed in
    """
    db = db_conn.get_dbid(db_name)
    logger.debug(f"Cancel subscription {db} {pattern}")
    # Channel name is built from the resolved db id and the key pattern.
    keyspace_channel = "__keyspace@{}__:{}".format(db, pattern)
    pubsub.punsubscribe(keyspace_channel)
    return pubsub


def clear_pubsub_msg(pubsub):
    """
    Drain a pubsub object by reading and discarding its messages.

    Messages are fetched one at a time with get_message() until a falsy
    value comes back. Every fetch result, including the final falsy one,
    is written to the debug log.

    :param pubsub: pubsub object to drain; must provide get_message()
    """
    pending = True
    while pending:
        dropped = pubsub.get_message()
        logger.debug("Clearing pubsub {}, get and drop message {}".format(pubsub, dropped))
        # A falsy result means there is nothing left to consume.
        pending = bool(dropped)


class RedisOidTreeUpdater(MIBUpdater):
def __init__(self, prefix_str):
super().__init__()
Expand Down
43 changes: 42 additions & 1 deletion src/sonic_ax_impl/mibs/ietf/rfc2737.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,34 @@ def reinit_data(self):
self.physical_name_map[chassis_mgmt_sub_id] = name
self.physical_fru_map[chassis_mgmt_sub_id] = self.NOT_REPLACEABLE

exceptions = []
has_runtime_err = False
# Catch exceptions inside the loop rather than around it.
# This guarantees that even if one updater raises in the middle of the loop,
# every updater's reinit_data() is still called,
# so the redis subscriptions always get a chance to be cleaned up.
# Otherwise, if the failure never recovers,
# redis subscriptions keep accumulating without ever being consumed,
# which leaks redis memory.
for updater in self.physical_entity_updaters:
updater.reinit_data()
try:
updater.reinit_data()
except BaseException as e:
mibs.logger.error(f"Jianquan {e}, type{type(e)}")
if isinstance(e, RuntimeError):
has_runtime_err = True
# Log traceback so that we know the original error details
mibs.logger.error(e, exc_info=True)
exceptions.append(e)

# A RuntimeError is treated as a Redis connection error and triggers
# re-initialization of the connections, so if any of the collected
# exceptions is a RuntimeError, re-raise them as a RuntimeError.
if exceptions:
if has_runtime_err:
raise RuntimeError(exceptions)
else:
raise Exception(exceptions)

def update_data(self):
# This code is not executed in unit test, since mockredis
Expand Down Expand Up @@ -648,6 +674,21 @@ def __init__(self, mib_updater):
self.entity_to_oid_map = {}

def reinit_data(self):

# Redis subscriptions are established and consumed in update_data.
# If an exception keeps recurring in the update logic,
# reinit_data is still called but update_data never is:
# messages keep arriving in the subscription queue but are never consumed,
# which leaks Redis memory.
# Hence drain the pending messages and cancel the subscriptions during reinit_data.
for db_index in list(self.pub_sub_dict):
pubsub = self.pub_sub_dict[db_index]
db_conn = self.mib_updater.statedb[db_index]
# clear message in the subscription and cancel the subscription
mibs.clear_pubsub_msg(pubsub)
mibs.cancel_redis_pubsub(pubsub, db_conn, db_conn.STATE_DB, self.get_key_pattern())
del self.pub_sub_dict[db_index]

self.entity_to_oid_map.clear()
# retrieve the initial list of entity in db
key_info = Namespace.dbs_keys(self.mib_updater.statedb, mibs.STATE_DB, self.get_key_pattern())
Expand Down
3 changes: 3 additions & 0 deletions tests/mock_tables/dbconnector.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ def get_message(self):
def psubscribe(self, *args, **kwargs):
    """Accept any pattern-subscription arguments and do nothing."""
    return None

def punsubscribe(self, *args, **kwargs):
    """Accept any pattern-unsubscription arguments and do nothing."""
    return None

def __call__(self, *args, **kwargs):
    """Return this same object when it is called; all arguments are ignored."""
    return self

Expand Down
2 changes: 1 addition & 1 deletion tests/test_rfc1213.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def test_NextHopUpdater_redis_exception(self):

# check warning
expected = [
mock.call("MIBUpdater.start() caught an unexpected exception during update_data()")
mock.call("MIBUpdater.start() caught a RuntimeError during update_data(), will reinitialize the connections")
]
mocked_exception.assert_has_calls(expected)

Expand Down
138 changes: 138 additions & 0 deletions tests/test_rfc2737.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
import os
import sys
from unittest import TestCase

import pytest
from sonic_ax_impl.mibs.ietf.rfc2737 import PhysicalTableMIBUpdater


if sys.version_info.major == 3:
from unittest import mock
else:
import mock

# Put the 'src' directory, located one level above this file's directory,
# at the front of sys.path.
# NOTE(review): this runs after the `from sonic_ax_impl...` import earlier in
# this file, so it cannot influence how that import is resolved — confirm
# whether it should be moved above that import.
modules_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, os.path.join(modules_path, 'src'))


class TestPhysicalTableMIBUpdater(TestCase):
    """
    Tests for the error handling of PhysicalTableMIBUpdater.reinit_data().

    Each test replaces reinit_data of the five cache updaters with mocks, makes
    one or two of them raise, and checks that every updater was still visited
    and that the collected errors are re-raised with the expected type/message.

    The call assertions are made after the pytest.raises block has been left:
    statements placed after the raising call inside that block never execute.

    NOTE(review): mibs.cancel_redis_pubsub is still patched, but it is no longer
    asserted to be called. With every updater's reinit_data mocked, the real
    reinit_data that performs the un-subscription is presumably never reached,
    so that assertion could not hold. Covering the un-subscription needs a test
    that runs a real reinit_data with a populated pub_sub_dict — confirm.
    """

    # Given: 5 physical updaters are registered into reinit of PhysicalTableMIBUpdater
    # When: The first updater (XcvrCacheUpdater) raises Exception in the reinit
    # Then: The remaining updaters should still execute reinit,
    #       and the collected error is re-raised as Exception
    @mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.XcvrCacheUpdater.reinit_data', side_effect=Exception('mocked error'))
    def test_PhysicalTableMIBUpdater_exception_in_reinit_data_wont_block_reinit_iteration_first(self, mocked_xcvr_reinit_data):
        updater = PhysicalTableMIBUpdater()

        with (mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.PsuCacheUpdater.reinit_data') as mocked_psu_reinit_data,
              mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.FanDrawerCacheUpdater.reinit_data') as mocked_fan_drawer_reinit_data,
              mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.FanCacheUpdater.reinit_data') as mocked_fan_cache_reinit_data,
              mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.ThermalCacheUpdater.reinit_data') as mocked_thermal_reinit_data,
              mock.patch('sonic_ax_impl.mibs.cancel_redis_pubsub')):
            with pytest.raises(Exception) as excinfo:
                updater.reinit_data()

        # The mocks keep their call records after the patches are undone.
        mocked_xcvr_reinit_data.assert_called()
        mocked_psu_reinit_data.assert_called()
        mocked_fan_drawer_reinit_data.assert_called()
        mocked_fan_cache_reinit_data.assert_called()
        mocked_thermal_reinit_data.assert_called()
        assert str(excinfo.value) == "[Exception('mocked error')]"

    # Given: 5 physical updaters are registered into reinit of PhysicalTableMIBUpdater
    # When: The last updater (ThermalCacheUpdater) raises Exception in the reinit
    # Then: The other updaters should still execute reinit,
    #       and the collected error is re-raised as Exception
    @mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.ThermalCacheUpdater.reinit_data', side_effect=Exception('mocked error'))
    def test_PhysicalTableMIBUpdater_exception_in_reinit_data_wont_block_reinit_iteration_last(self, mocked_thermal_reinit_data):
        updater = PhysicalTableMIBUpdater()

        with (mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.XcvrCacheUpdater.reinit_data') as mocked_xcvr_reinit_data,
              mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.PsuCacheUpdater.reinit_data') as mocked_psu_reinit_data,
              mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.FanDrawerCacheUpdater.reinit_data') as mocked_fan_drawer_reinit_data,
              mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.FanCacheUpdater.reinit_data') as mocked_fan_cache_reinit_data,
              mock.patch('sonic_ax_impl.mibs.cancel_redis_pubsub')):
            with pytest.raises(Exception) as excinfo:
                updater.reinit_data()

        mocked_xcvr_reinit_data.assert_called()
        mocked_psu_reinit_data.assert_called()
        mocked_fan_drawer_reinit_data.assert_called()
        mocked_fan_cache_reinit_data.assert_called()
        mocked_thermal_reinit_data.assert_called()
        assert str(excinfo.value) == "[Exception('mocked error')]"

    # Given: 5 physical updaters are registered into reinit of PhysicalTableMIBUpdater
    # When: The first updater (XcvrCacheUpdater) raises RuntimeError in the reinit
    # Then: The remaining updaters should still execute reinit,
    #       and the collected error is re-raised as RuntimeError
    @mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.XcvrCacheUpdater.reinit_data', side_effect=RuntimeError('mocked runtime error'))
    def test_PhysicalTableMIBUpdater_runtime_exc_in_reinit_data_wont_block_reinit_iteration_first(self, mocked_xcvr_reinit_data):
        updater = PhysicalTableMIBUpdater()

        with (mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.PsuCacheUpdater.reinit_data') as mocked_psu_reinit_data,
              mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.FanDrawerCacheUpdater.reinit_data') as mocked_fan_drawer_reinit_data,
              mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.FanCacheUpdater.reinit_data') as mocked_fan_cache_reinit_data,
              mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.ThermalCacheUpdater.reinit_data') as mocked_thermal_reinit_data,
              mock.patch('sonic_ax_impl.mibs.cancel_redis_pubsub')):
            with pytest.raises(RuntimeError) as excinfo:
                updater.reinit_data()

        mocked_xcvr_reinit_data.assert_called()
        mocked_psu_reinit_data.assert_called()
        mocked_fan_drawer_reinit_data.assert_called()
        mocked_fan_cache_reinit_data.assert_called()
        mocked_thermal_reinit_data.assert_called()
        assert str(excinfo.value) == "[RuntimeError('mocked runtime error')]"

    # Given: 5 physical updaters are registered into reinit of PhysicalTableMIBUpdater
    # When: The last updater (ThermalCacheUpdater) raises RuntimeError in the reinit
    # Then: The other updaters should still execute reinit,
    #       and the collected error is re-raised as RuntimeError
    @mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.ThermalCacheUpdater.reinit_data', side_effect=RuntimeError('mocked runtime error'))
    def test_PhysicalTableMIBUpdater_runtime_exc_in_reinit_data_wont_block_reinit_iteration_last(self, mocked_thermal_reinit_data):
        updater = PhysicalTableMIBUpdater()

        with (mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.XcvrCacheUpdater.reinit_data') as mocked_xcvr_reinit_data,
              mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.PsuCacheUpdater.reinit_data') as mocked_psu_reinit_data,
              mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.FanDrawerCacheUpdater.reinit_data') as mocked_fan_drawer_reinit_data,
              mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.FanCacheUpdater.reinit_data') as mocked_fan_cache_reinit_data,
              mock.patch('sonic_ax_impl.mibs.cancel_redis_pubsub')):
            with pytest.raises(RuntimeError) as exc_info:
                updater.reinit_data()

        mocked_xcvr_reinit_data.assert_called()
        mocked_psu_reinit_data.assert_called()
        mocked_fan_drawer_reinit_data.assert_called()
        mocked_fan_cache_reinit_data.assert_called()
        mocked_thermal_reinit_data.assert_called()
        assert str(exc_info.value) == "[RuntimeError('mocked runtime error')]"

    # Given: 5 physical updaters are registered into reinit of PhysicalTableMIBUpdater
    # When: The first (XcvrCacheUpdater) and last updater (ThermalCacheUpdater)
    #       raise RuntimeError and Exception respectively in the reinit
    # Then: The remaining updaters should still execute reinit,
    #       and both the RuntimeError and the Exception should be caught,
    #       combined and re-raised as a RuntimeError
    # Stacked patch decorators are applied bottom-up, so the mock of the
    # decorator closest to the function (Thermal) is the first argument.
    @mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.XcvrCacheUpdater.reinit_data', side_effect=RuntimeError('mocked runtime error'))
    @mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.ThermalCacheUpdater.reinit_data', side_effect=Exception('mocked error'))
    def test_PhysicalTableMIBUpdater_multi_exception(self, mocked_thermal_reinit_data, mocked_xcvr_reinit_data):
        updater = PhysicalTableMIBUpdater()

        with (mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.PsuCacheUpdater.reinit_data') as mocked_psu_reinit_data,
              mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.FanDrawerCacheUpdater.reinit_data') as mocked_fan_drawer_reinit_data,
              mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.FanCacheUpdater.reinit_data') as mocked_fan_cache_reinit_data,
              mock.patch('sonic_ax_impl.mibs.cancel_redis_pubsub')):
            with pytest.raises(RuntimeError) as exc_info:
                updater.reinit_data()

        mocked_xcvr_reinit_data.assert_called()
        mocked_psu_reinit_data.assert_called()
        mocked_fan_drawer_reinit_data.assert_called()
        mocked_fan_cache_reinit_data.assert_called()
        mocked_thermal_reinit_data.assert_called()
        assert str(exc_info.value) == "[RuntimeError('mocked runtime error'), Exception('mocked error')]"

0 comments on commit 1aff8bf

Please sign in to comment.