Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix redis memory leak issue in PhysicalEntityCacheUpdater #343

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,4 @@ fabric.properties

gh-release.patch
tests/test_cpuUtilizationHandler.py
tests/test-results.xml
2 changes: 1 addition & 1 deletion src/ax_interface/mib.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ async def start(self):
redis_exception_happen = False
except RuntimeError:
# Any unexpected exception or error, log it and keep running
logger.exception("MIBUpdater.start() caught an unexpected exception during update_data()")
logger.exception("MIBUpdater.start() caught a RuntimeError during update_data(), will reinitialize the connections")
# When redis server restart, swsscommon will throw swsscommon.RedisError, redis connection need re-initialize in reinit_data()
# TODO: change to swsscommon.RedisError
redis_exception_happen = True
Expand Down
15 changes: 15 additions & 0 deletions src/sonic_ax_impl/mibs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,21 @@ def get_redis_pubsub(db_conn, db_name, pattern):
return pubsub


def cancel_redis_pubsub(pubsub, db_conn, db_name, pattern):
db = db_conn.get_dbid(db_name)
logger.debug(f"Cancel subscription {db} {pattern}")
pubsub.punsubscribe("__keyspace@{}__:{}".format(db, pattern))
return pubsub


def clear_pubsub_msg(pubsub):
while True:
msg = pubsub.get_message()
logger.debug("Clearing pubsub {}, get and drop message {}".format(pubsub, msg))
if not msg:
break


class RedisOidTreeUpdater(MIBUpdater):
def __init__(self, prefix_str):
super().__init__()
Expand Down
43 changes: 42 additions & 1 deletion src/sonic_ax_impl/mibs/ietf/rfc2737.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,34 @@ def reinit_data(self):
self.physical_name_map[chassis_mgmt_sub_id] = name
self.physical_fru_map[chassis_mgmt_sub_id] = self.NOT_REPLACEABLE

exceptions = []
has_runtime_err = False
# Catch exception in the iteration
# This makes sure if any exception is raised in the mid of loop
# every updater's reinit_data function will be always called
# So that the redis subscriptions always get chance to be cleaned,
# Otherwise if the exception never recover,
# the redis subscription keeps increasing but never got consumed,
# this causes redis memory leak.
for updater in self.physical_entity_updaters:
updater.reinit_data()
try:
updater.reinit_data()
except BaseException as e:
mibs.logger.error(f"Jianquan {e}, type{type(e)}")
yejianquan marked this conversation as resolved.
Show resolved Hide resolved
if isinstance(e, RuntimeError):
has_runtime_err = True
# Log traceback so that we know the original error details
mibs.logger.error(e, exc_info=True)
exceptions.append(e)

# The RuntimeError will be considered as Redis connection error
# And will trigger re-init connection, if the exceptions contain any RuntimeError
# We raise runtime error
if exceptions:
if has_runtime_err:
raise RuntimeError(exceptions)
else:
raise Exception(exceptions)

def update_data(self):
# This code is not executed in unit test, since mockredis
Expand Down Expand Up @@ -648,6 +674,21 @@ def __init__(self, mib_updater):
self.entity_to_oid_map = {}

def reinit_data(self):

# Redis subscriptions are established and consumed in update_data,
# but if there's stable exception during update logic,
# the reinit_data will be called, but the update_data is never called.
# The message is sent into subscription queue, but never got consumed,
# this causes Redis memory leaking.
# Hence clear the message in the subscription and cancel the subscription during reinit_data
for db_index in list(self.pub_sub_dict):
pubsub = self.pub_sub_dict[db_index]
db_conn = self.mib_updater.statedb[db_index]
# clear message in the subscription and cancel the subscription
mibs.clear_pubsub_msg(pubsub)
SuvarnaMeenakshi marked this conversation as resolved.
Show resolved Hide resolved
mibs.cancel_redis_pubsub(pubsub, db_conn, db_conn.STATE_DB, self.get_key_pattern())
del self.pub_sub_dict[db_index]

self.entity_to_oid_map.clear()
# retrieve the initial list of entity in db
key_info = Namespace.dbs_keys(self.mib_updater.statedb, mibs.STATE_DB, self.get_key_pattern())
Expand Down
3 changes: 3 additions & 0 deletions tests/mock_tables/dbconnector.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ def get_message(self):
def psubscribe(self, *args, **kwargs):
pass

def punsubscribe(self, *args, **kwargs):
pass

def __call__(self, *args, **kwargs):
return self

Expand Down
2 changes: 1 addition & 1 deletion tests/test_rfc1213.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def test_NextHopUpdater_redis_exception(self):

# check warning
expected = [
mock.call("MIBUpdater.start() caught an unexpected exception during update_data()")
mock.call("MIBUpdater.start() caught a RuntimeError during update_data(), will reinitialize the connections")
]
mocked_exception.assert_has_calls(expected)

Expand Down
138 changes: 138 additions & 0 deletions tests/test_rfc2737.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
import os
import sys
from unittest import TestCase

import pytest
from sonic_ax_impl.mibs.ietf.rfc2737 import PhysicalTableMIBUpdater


if sys.version_info.major == 3:
from unittest import mock
else:
import mock

modules_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, os.path.join(modules_path, 'src'))


class TestPhysicalTableMIBUpdater(TestCase):

# Given: 5 physical updaters are register into reinit of PhysicalTableMIBUpdater
# When: The first updater(XcvrCacheUpdater) raises exception in the reinit
# Then: The remaining updaters should execute reinit without any affection,
# and the redis un-subscription should be called
@mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.XcvrCacheUpdater.reinit_data', side_effect=Exception('mocked error'))
def test_PhysicalTableMIBUpdater_exception_in_reinit_data_wont_block_reinit_iteration_first(self, mocked_xcvr_reinit_data):
updater = PhysicalTableMIBUpdater()

with (pytest.raises(Exception) as excinfo,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.PsuCacheUpdater.reinit_data') as mocked_psu_reinit_data,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.FanDrawerCacheUpdater.reinit_data') as mocked_fan_drawer_reinit_data,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.FanCacheUpdater.reinit_data') as mocked_fan_cache_reinit_data,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.ThermalCacheUpdater.reinit_data') as mocked_thermal_reinit_data,
mock.patch('sonic_ax_impl.mibs.cancel_redis_pubsub') as mocked_cancel_redis_pubsub):
updater.reinit_data()
mocked_xcvr_reinit_data.assert_called()
mocked_psu_reinit_data.assert_called()
mocked_fan_drawer_reinit_data.assert_called()
mocked_fan_cache_reinit_data.assert_called()
mocked_thermal_reinit_data.assert_called()
mocked_cancel_redis_pubsub.assert_called()
assert str(excinfo.value) == "[Exception('mocked error')]"

# Given: 5 physical updaters are register into reinit of PhysicalTableMIBUpdater
# When: The last updater(ThermalCacheUpdater) raises exception in the reinit
# Then: The remaining updaters should execute reinit without any affection,
# and the redis un-subscription should be called
@mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.ThermalCacheUpdater.reinit_data', side_effect=Exception('mocked error'))
def test_PhysicalTableMIBUpdater_exception_in_reinit_data_wont_block_reinit_iteration_last(self, mocked_thermal_reinit_data):
updater = PhysicalTableMIBUpdater()

with (pytest.raises(Exception) as excinfo,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.XcvrCacheUpdater.reinit_data') as mocked_xcvr_reinit_data,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.PsuCacheUpdater.reinit_data') as mocked_psu_reinit_data,
mock.patch(
'sonic_ax_impl.mibs.ietf.rfc2737.FanDrawerCacheUpdater.reinit_data') as mocked_fan_drawer_reinit_data,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.FanCacheUpdater.reinit_data') as mocked_fan_cache_reinit_data,
mock.patch('sonic_ax_impl.mibs.cancel_redis_pubsub') as mocked_cancel_redis_pubsub):
updater.reinit_data()
mocked_xcvr_reinit_data.assert_called()
mocked_psu_reinit_data.assert_called()
mocked_fan_drawer_reinit_data.assert_called()
mocked_fan_cache_reinit_data.assert_called()
mocked_thermal_reinit_data.assert_called()
mocked_cancel_redis_pubsub.assert_called()
assert str(excinfo.value) == "[Exception('mocked error')]"

# Given: 5 physical updaters are register into reinit of PhysicalTableMIBUpdater
# When: The first updater(XcvrCacheUpdater) raises Runtime exception in the reinit
# Then: The remaining updaters should execute reinit without any affection,
# and the redis un-subscription should be called
@mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.ThermalCacheUpdater.reinit_data', side_effect=RuntimeError('mocked runtime error'))
def test_PhysicalTableMIBUpdater_runtime_exc_in_reinit_data_wont_block_reinit_iteration_first(self, mocked_thermal_reinit_data):
updater = PhysicalTableMIBUpdater()

with (pytest.raises(RuntimeError) as excinfo,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.XcvrCacheUpdater.reinit_data') as mocked_xcvr_reinit_data,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.PsuCacheUpdater.reinit_data') as mocked_psu_reinit_data,
mock.patch(
'sonic_ax_impl.mibs.ietf.rfc2737.FanDrawerCacheUpdater.reinit_data') as mocked_fan_drawer_reinit_data,
mock.patch(
'sonic_ax_impl.mibs.ietf.rfc2737.FanCacheUpdater.reinit_data') as mocked_fan_cache_reinit_data,
mock.patch('sonic_ax_impl.mibs.cancel_redis_pubsub') as mocked_cancel_redis_pubsub):
updater.reinit_data()
mocked_thermal_reinit_data.assert_called()
mocked_xcvr_reinit_data.assert_called()
mocked_psu_reinit_data.assert_called()
mocked_fan_drawer_reinit_data.assert_called()
mocked_fan_cache_reinit_data.assert_called()
mocked_cancel_redis_pubsub.assert_called()
assert str(excinfo.value) == "[RuntimeError('mocked runtime error')]"

# Given: 5 physical updaters are register into reinit of PhysicalTableMIBUpdater
# When: The last updater(XcvrCacheUpdater) raises Runtime exception in the reinit
# Then: The remaining updaters should execute reinit without any affection,
# and the redis un-subscription should be called
@mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.XcvrCacheUpdater.reinit_data', side_effect=RuntimeError('mocked runtime error'))
def test_PhysicalTableMIBUpdater_runtime_exc_in_reinit_data_wont_block_reinit_iteration_last(self, mocked_xcvr_reinit_data):
updater = PhysicalTableMIBUpdater()

with (pytest.raises(RuntimeError) as exc_info,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.PsuCacheUpdater.reinit_data') as mocked_psu_reinit_data,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.FanDrawerCacheUpdater.reinit_data') as mocked_fan_drawer_reinit_data,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.FanCacheUpdater.reinit_data') as mocked_fan_cache_reinit_data,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.ThermalCacheUpdater.reinit_data') as mocked_thermal_reinit_data,
mock.patch('sonic_ax_impl.mibs.cancel_redis_pubsub') as mocked_cancel_redis_pubsub):
updater.reinit_data()
mocked_xcvr_reinit_data.assert_called()
mocked_psu_reinit_data.assert_called()
mocked_fan_drawer_reinit_data.assert_called()
mocked_fan_cache_reinit_data.assert_called()
mocked_thermal_reinit_data.assert_called()
mocked_cancel_redis_pubsub.assert_called()
assert str(exc_info.value) == "[RuntimeError('mocked runtime error')]"

# Given: 5 physical updaters are register into reinit of PhysicalTableMIBUpdater
# When: The first(XcvrCacheUpdater) and last updater(ThermalCacheUpdater)
# raises Runtime exception and Exception in the reinit
# Then: The remaining updaters should execute reinit without any affection,
# and the redis un-subscription should be called
# Both the RuntimeError and Exception should be caught and combined as RuntimeError then been raised
@mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.XcvrCacheUpdater.reinit_data', side_effect=RuntimeError('mocked runtime error'))
@mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.ThermalCacheUpdater.reinit_data', side_effect=Exception('mocked error'))
def test_PhysicalTableMIBUpdater_multi_exception(self, mocked_xcvr_reinit_data, mocked_thermal_reinit_data):
updater = PhysicalTableMIBUpdater()

with (pytest.raises(RuntimeError) as exc_info,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.PsuCacheUpdater.reinit_data') as mocked_psu_reinit_data,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.FanDrawerCacheUpdater.reinit_data') as mocked_fan_drawer_reinit_data,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.FanCacheUpdater.reinit_data') as mocked_fan_cache_reinit_data,
mock.patch('sonic_ax_impl.mibs.cancel_redis_pubsub') as mocked_cancel_redis_pubsub):
updater.reinit_data()
mocked_xcvr_reinit_data.assert_called()
mocked_psu_reinit_data.assert_called()
mocked_fan_drawer_reinit_data.assert_called()
mocked_fan_cache_reinit_data.assert_called()
mocked_thermal_reinit_data.assert_called()
mocked_cancel_redis_pubsub.assert_called()
assert str(exc_info.value) == "[RuntimeError('mocked runtime error'), Exception('mocked error')]"
Loading