diff --git a/scripts/caclmgrd b/scripts/caclmgrd index 30b166e7..c415992a 100755 --- a/scripts/caclmgrd +++ b/scripts/caclmgrd @@ -17,6 +17,9 @@ try: import sys import threading import time + import traceback + import signal + from queue import Empty, Queue from sonic_py_common.general import getstatusoutput_noshell_pipe from sonic_py_common import daemon_base, device_info, multi_asic from swsscommon import swsscommon @@ -129,6 +132,7 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): self.update_thread = {} self.lock = {} self.num_changes = {} + self.stop_event = threading.Event() # Initialize update-thread-specific data for default namespace self.update_thread[DEFAULT_NAMESPACE] = None @@ -707,7 +711,7 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): table_ip_version = 4 # Read DST_PORT info from Config DB, insert it back to ACL_SERVICES - if acl_service == 'EXTERNAL_CLIENT' and "L4_DST_PORT" in rule_props: + if acl_service == 'EXTERNAL_CLIENT':# and "L4_DST_PORT" in rule_props: dst_ports = [rule_props["L4_DST_PORT"]] self.ACL_SERVICES[acl_service]["dst_ports"] = dst_ports elif acl_service == 'EXTERNAL_CLIENT' and "L4_DST_PORT_RANGE" in rule_props: @@ -852,7 +856,7 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): self.run_commands(dualtor_iptables_cmds) - def check_and_update_control_plane_acls(self, namespace, num_changes): + def check_and_update_control_plane_acls(self, namespace, num_changes, exception_queue): """ This function is intended to be spawned in a separate thread. Its purpose is to prevent unnecessary iptables updates if we receive @@ -869,6 +873,7 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): while True: # Sleep for our delay interval time.sleep(self.UPDATE_DELAY_SECS) + self.log_info("After delay {}s, checking for ACL table changes in namespace '{}'".format(self.UPDATE_DELAY_SECS, namespace)) with self.lock[namespace]: if self.num_changes[namespace] > num_changes: @@ -890,6 +895,18 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): self.num_changes[namespace] = 0 self.update_thread[namespace] = None return + except Exception as e: + self.log_error("Exception occured at {} thread due to {}".format(threading.current_thread().getName(), repr(e))) + exc_type, exc_value, exc_traceback = sys.exc_info() + msg = traceback.format_exception(exc_type, exc_value, exc_traceback) + for tb_line in msg: + for tb_line_split in tb_line.splitlines(): + self.log_error(tb_line_split) + #self.stop_event.set() + exc_info = traceback.format_exc() + exception_queue.put((namespace, repr(e), exc_info)) # Add the exception to the queue + self.log_error("Exiting thread {}, put it into exception_queue {}".format( + threading.current_thread().getName(), exception_queue)) finally: new_config_db_connector.close("CONFIG_DB") @@ -988,6 +1005,7 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): # set up state_db connector state_db_connector = swsscommon.DBConnector("STATE_DB", 0) config_db_connector = swsscommon.DBConnector("CONFIG_DB", 0) + exception_queue = Queue() if self.DualToR: self.log_info("Dual ToR mode") @@ -1031,103 +1049,121 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): # Get the ACL rule table seprator acl_rule_table_seprator = subscribe_acl_rule_table.getTableNameSeparator() + try: + # Loop on select to see if any event happen on state db or config db of any namespace + while True: + # Periodically check for exceptions from child threads + try: + namespace, error, _ = exception_queue.get_nowait() # Non-blocking + self.log_error(f"Exception in namespace '{namespace}': {error}") + self.log_error("Detect exception in Child thread {} , generating SIGKILL for main thread".format(self.update_thread[namespace].getName())) + os.kill(os.getpid(), signal.SIGKILL) + except Empty: + # No exceptions in the queue + pass + (state, selectableObj) = sel.select(SELECT_TIMEOUT_MS) + # Continue if select is timeout or selectable object is not return + if state != swsscommon.Select.OBJECT: + continue - # Loop on select to see if any event happen on state db or config db of any namespace - while True: - (state, selectableObj) = sel.select(SELECT_TIMEOUT_MS) - # Continue if select is timeout or selectable object is not return - if state != swsscommon.Select.OBJECT: - continue - - # Get the redisselect object from selectable object - redisSelectObj = swsscommon.CastSelectableToRedisSelectObj(selectableObj) - - # Get the corresponding namespace and db_id from redisselect - namespace = redisSelectObj.getDbConnector().getNamespace() - db_id = redisSelectObj.getDbConnector().getDbId() - - if db_id == state_db_id: - while True: - key, op, fvs = subscribe_bfd_session.pop() - if not key: - break + # Get the redisselect object from selectable object + redisSelectObj = swsscommon.CastSelectableToRedisSelectObj(selectableObj) - if op == 'SET' and not self.bfdAllowed: - self.allow_bfd_protocol(namespace) - self.bfdAllowed = True - sel.removeSelectable(subscribe_bfd_session) + # Get the corresponding namespace and db_id from redisselect + namespace = redisSelectObj.getDbConnector().getNamespace() + db_id = redisSelectObj.getDbConnector().getDbId() - if self.DualToR: - '''dhcp packet mark update''' + if db_id == state_db_id: while True: - key, op, fvs = subscribe_dhcp_packet_mark.pop() + key, op, fvs = subscribe_bfd_session.pop() if not key: break - self.log_info("dhcp packet mark update : '%s'" % str((key, op, fvs))) - '''initial value is None''' - pre_mark = None if key not in dhcp_packet_mark_tbl else dhcp_packet_mark_tbl[key] - cur_mark = None if op == 'DEL' else dict(fvs)['mark'] - dhcp_packet_mark_tbl[key] = cur_mark - self.update_dhcp_acl_for_mark_change(key, pre_mark, cur_mark) + if op == 'SET' and not self.bfdAllowed: + self.allow_bfd_protocol(namespace) + self.bfdAllowed = True + sel.removeSelectable(subscribe_bfd_session) + + if self.DualToR: + '''dhcp packet mark update''' + while True: + key, op, fvs = subscribe_dhcp_packet_mark.pop() + if not key: + break + self.log_info("dhcp packet mark update : '%s'" % str((key, op, fvs))) + + '''initial value is None''' + pre_mark = None if key not in dhcp_packet_mark_tbl else dhcp_packet_mark_tbl[key] + cur_mark = None if op == 'DEL' else dict(fvs)['mark'] + dhcp_packet_mark_tbl[key] = cur_mark + self.update_dhcp_acl_for_mark_change(key, pre_mark, cur_mark) + + '''mux cable update''' + while True: + key, op, fvs = subscribe_mux_cable.pop() + if not key: + break + self.log_info("mux cable update : '%s'" % str((key, op, fvs))) + + mark = None if key not in dhcp_packet_mark_tbl else dhcp_packet_mark_tbl[key] + self.update_dhcp_acl(key, op, dict(fvs), mark) + continue - '''mux cable update''' + if db_id == config_db_id: while True: - key, op, fvs = subscribe_mux_cable.pop() + key, op, fvs = subscribe_vxlan_table.pop() if not key: break - self.log_info("mux cable update : '%s'" % str((key, op, fvs))) + if op == 'SET' and not self.VxlanAllowed: + self.allow_vxlan_port(namespace, fvs) + elif op == 'DEL' and self.VxlanAllowed: + self.block_vxlan_port(namespace) - mark = None if key not in dhcp_packet_mark_tbl else dhcp_packet_mark_tbl[key] - self.update_dhcp_acl(key, op, dict(fvs), mark) - continue + ctrl_plane_acl_notification = set() - if db_id == config_db_id: - while True: - key, op, fvs = subscribe_vxlan_table.pop() - if not key: - break - if op == 'SET' and not self.VxlanAllowed: - self.allow_vxlan_port(namespace, fvs) - elif op == 'DEL' and self.VxlanAllowed: - self.block_vxlan_port(namespace) - - ctrl_plane_acl_notification = set() - - # Pop data of both Subscriber Table object of namespace that got config db acl table event - for table in config_db_subscriber_table_map[namespace]: - while True: - (key, op, fvp) = table.pop() - # Pop of table that does not have data so break - if key == '': - break - # ACL Table notification. We will take Control Plane ACTION for any ACL Table Event - # This can be optimize further but we should not have many acl table set/del events in normal - # scenario - if acl_rule_table_seprator not in key: - ctrl_plane_acl_notification.add(namespace) - # Check ACL Rule notification and make sure Rule point to ACL Table which is Controlplane - else: - acl_table = key.split(acl_rule_table_seprator)[0] - if self.config_db_map[namespace].get_table(self.ACL_TABLE)[acl_table]["type"] == self.ACL_TABLE_TYPE_CTRLPLANE: + # Pop data of both Subscriber Table object of namespace that got config db acl table event + for table in config_db_subscriber_table_map[namespace]: + while True: + (key, op, fvp) = table.pop() + # Pop of table that does not have data so break + if key == '': + break + # ACL Table notification. We will take Control Plane ACTION for any ACL Table Event + # This can be optimize further but we should not have many acl table set/del events in normal + # scenario + if acl_rule_table_seprator not in key: ctrl_plane_acl_notification.add(namespace) - - # Update the Control Plane ACL of the namespace that got config db acl table event - for namespace in ctrl_plane_acl_notification: - with self.lock[namespace]: - if self.num_changes[namespace] == 0: - self.log_info("ACL change detected for namespace '{}'".format(namespace)) - - # Increment the number of change events we've received for this namespace - self.num_changes[namespace] += 1 - - # If an update thread is not already spawned for the namespace which we received - # the ACL table update event, spawn one now - if not self.update_thread[namespace]: - self.log_info("Spawning ACL update thread for namepsace '{}' ...".format(namespace)) - self.update_thread[namespace] = threading.Thread(target=self.check_and_update_control_plane_acls, - args=(namespace, self.num_changes[namespace])) - self.update_thread[namespace].start() + # Check ACL Rule notification and make sure Rule point to ACL Table which is Controlplane + else: + acl_table = key.split(acl_rule_table_seprator)[0] + if self.config_db_map[namespace].get_table(self.ACL_TABLE)[acl_table]["type"] == self.ACL_TABLE_TYPE_CTRLPLANE: + ctrl_plane_acl_notification.add(namespace) + + # Update the Control Plane ACL of the namespace that got config db acl table event + for namespace in ctrl_plane_acl_notification: + with self.lock[namespace]: + if self.num_changes[namespace] == 0: + self.log_info("ACL change detected for namespace '{}'".format(namespace)) + + # Increment the number of change events we've received for this namespace + self.num_changes[namespace] += 1 + + # If an update thread is not already spawned for the namespace which we received + # the ACL table update event, spawn one now + if not self.update_thread[namespace]: + self.log_info("Spawning ACL update thread for namepsace '{}' ...".format(namespace)) + self.update_thread[namespace] = threading.Thread(target=self.check_and_update_control_plane_acls, + args=(namespace, self.num_changes[namespace], exception_queue)) + self.update_thread[namespace].start() + except Exception as e: + self.log_error("Exception occured at main thread due to {}".format(repr(e))) + exc_type, exc_value, exc_traceback = sys.exc_info() + msg = traceback.format_exception(exc_type, exc_value, exc_traceback) + for tb_line in msg: + for tb_line_split in tb_line.splitlines(): + self.log_error(tb_line_split) + self.log_error("Catch exception in main thread, generating SIGKILL for main thread") + os.kill(os.getpid(), signal.SIGKILL) # ============================= Functions =============================