diff --git a/pyproject.toml b/pyproject.toml index d52bbe61c..5cda3fc51 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,6 +37,7 @@ watchdog-gevent = "^0.1.1" pip = "22.2.2" pytest-timeout = "^2.1.0" pytest-mock = "^3.10.0" +deprecated = "^1.2.14" [tool.poetry.group.dev.dependencies] pytest = "^6.2.5" diff --git a/src/volttron/client/commands/control.py b/src/volttron/client/commands/control.py index 3364038a0..f534d2bae 100644 --- a/src/volttron/client/commands/control.py +++ b/src/volttron/client/commands/control.py @@ -1553,7 +1553,7 @@ def add_config_to_store(opts): file_contents = opts.infile.read() call( - "manage_store", + "set_config", opts.identity, opts.name, file_contents, @@ -1565,14 +1565,14 @@ def delete_config_from_store(opts): opts.connection.peer = CONFIGURATION_STORE call = opts.connection.call if opts.delete_store: - call("manage_delete_store", opts.identity) + call("delete_store", opts.identity) return if opts.name is None: _stderr.write("ERROR: must specify a configuration when not deleting entire store\n") return - call("manage_delete_config", opts.identity, opts.name) + call("delete_config", opts.identity, opts.name) def list_store(opts): @@ -1580,9 +1580,9 @@ def list_store(opts): call = opts.connection.call results = [] if opts.identity is None: - results = call("manage_list_stores") + results = call("list_stores") else: - results = call("manage_list_configs", opts.identity) + results = call("list_configs", opts.identity) for item in results: _stdout.write(item + "\n") @@ -1591,7 +1591,7 @@ def list_store(opts): def get_config(opts): opts.connection.peer = CONFIGURATION_STORE call = opts.connection.call - results = call("manage_get", opts.identity, opts.name, raw=opts.raw) + results = call("get_config", opts.identity, opts.name, raw=opts.raw) if opts.raw: _stdout.write(results) @@ -1612,7 +1612,7 @@ def edit_config(opts): raw_data = "" else: try: - results = call("manage_get_metadata", opts.identity, opts.name) + results = call("get_metadata", opts.identity, opts.name) config_type = results["type"] raw_data = results["data"] except RemoteError as e: @@ -1648,7 +1648,7 @@ def edit_config(opts): return call( - "manage_store", + "set_config", opts.identity, opts.name, new_raw_data, diff --git a/src/volttron/client/vip/agent/subsystems/configstore.py b/src/volttron/client/vip/agent/subsystems/configstore.py index dc6470698..c59a873e6 100644 --- a/src/volttron/client/vip/agent/subsystems/configstore.py +++ b/src/volttron/client/vip/agent/subsystems/configstore.py @@ -30,7 +30,7 @@ from .base import SubsystemBase from volttron.utils.storeutils import list_unique_links, check_for_config_link - +from volttron.utils import jsonapi # from volttron.client.storeutils import list_unique_links, check_for_config_link from volttron.client.vip.agent import errors from volttron.client.known_identities import CONFIGURATION_STORE @@ -47,7 +47,7 @@ _log = logging.getLogger(__name__) -VALID_ACTIONS = set(["NEW", "UPDATE", "DELETE"]) +VALID_ACTIONS = ("NEW", "UPDATE", "DELETE") class ConfigStore(SubsystemBase): @@ -68,6 +68,7 @@ def __init__(self, owner, core, rpc): self._initial_callbacks_called = False self._process_callbacks_code_object = self._process_callbacks.__code__ + self.vip_identity = self._core().identity def sub_factory(): return defaultdict(set) @@ -77,6 +78,8 @@ def sub_factory(): def onsetup(sender, **kwargs): rpc.export(self._update_config, "config.update") rpc.export(self._initial_update, "config.initial_update") + rpc.allow("config.update", "sync_agent_config") + rpc.allow("config.initial_update", "sync_agent_config") core.onsetup.connect(onsetup, self) core.configuration.connect(self._onconfig, self) @@ -84,7 +87,7 @@ def onsetup(sender, **kwargs): def _onconfig(self, sender, **kwargs): if not self._initialized: try: - self._rpc().call(CONFIGURATION_STORE, "get_configs").get() + self._rpc().call(CONFIGURATION_STORE, "initialize_configs", self.vip_identity).get() except errors.Unreachable as e: _log.error("Connected platform does not support the Configuration Store feature.") return @@ -144,8 +147,7 @@ def _process_links(self, config_contents, already_gathered): elif isinstance(value, str): config_name = check_for_config_link(value) if config_name is not None: - config_contents[key] = self._gather_child_configs( - config_name, already_gathered) + config_contents[key] = self._gather_child_configs(config_name, already_gathered) elif isinstance(config_contents, list): for i, value in enumerate(config_contents): if isinstance(value, (dict, list)): @@ -153,8 +155,7 @@ def _process_links(self, config_contents, already_gathered): elif isinstance(value, str): config_name = check_for_config_link(value) if config_name is not None: - config_contents[i] = self._gather_child_configs( - config_name, already_gathered) + config_contents[i] = self._gather_child_configs(config_name, already_gathered) def _gather_child_configs(self, config_name, already_gathered): if config_name in already_gathered: @@ -285,7 +286,7 @@ def list(self): # Handle case were we are called during "onstart". if not self._initialized: try: - self._rpc().call(CONFIGURATION_STORE, "get_configs").get() + self._rpc().call(CONFIGURATION_STORE, "initialize_configs", self.vip_identity).get() except errors.Unreachable as e: _log.error("Connected platform does not support the Configuration Store feature.") except errors.VIPError as e: @@ -317,7 +318,7 @@ def get(self, config_name="config"): # may be a default configuration to grab. if not self._initialized: try: - self._rpc().call(CONFIGURATION_STORE, "get_configs").get() + self._rpc().call(CONFIGURATION_STORE, "initialize_configs", self.vip_identity).get() except errors.Unreachable as e: _log.error("Connected platform does not support the Configuration Store feature.") except errors.VIPError as e: @@ -333,9 +334,7 @@ def _check_call_from_process_callbacks(self): # Don't create any unneeded references to frame objects. for frame, *_ in frame_records: if self._process_callbacks_code_object is frame.f_code: - raise RuntimeError( - "Cannot request changes to the config store from a configuration callback." - ) + raise RuntimeError("Cannot request changes to the config store from a configuration callback.") finally: del frame_records @@ -349,6 +348,8 @@ def set(self, config_name, contents, trigger_callback=False, send_update=True): :param config_name: Name of configuration to add to store. :param contents: Contents of the configuration. May be a string, dictionary, or list. :param trigger_callback: Tell the platform to trigger callbacks on the agent for this change. + :param send_update: Boolean flag to tell the server if it should call config.update on this agent + after server side update is done :type config_name: str :type contents: str, dict, list @@ -356,14 +357,17 @@ def set(self, config_name, contents, trigger_callback=False, send_update=True): """ self._check_call_from_process_callbacks() - self._rpc().call( - CONFIGURATION_STORE, - "set_config", - config_name, - contents, - trigger_callback=trigger_callback, - send_update=send_update, - ).get(timeout=10.0) + if isinstance(contents, (dict, list)): + config_type = 'json' + raw_data = jsonapi.dumps(contents) + elif isinstance(contents, str): + config_type = 'raw' + raw_data = contents + else: + raise ValueError("Unsupported configuration content type: {}".format(str(type(contents)))) + + self._rpc().call(CONFIGURATION_STORE, "set_config", self.vip_identity, config_name, raw_data, + config_type, trigger_callback=trigger_callback, send_update=send_update).get(timeout=10.0) def set_default(self, config_name, contents): """Called to set the contents of a default configuration file. Default configurations are used if the @@ -422,19 +426,16 @@ def delete(self, config_name, trigger_callback=False, send_update=True): """ self._check_call_from_process_callbacks() - self._rpc().call( - CONFIGURATION_STORE, - "delete_config", - config_name, - trigger_callback=trigger_callback, - send_update=send_update, - ).get(timeout=10.0) + self._rpc().call(CONFIGURATION_STORE, "delete_config", self.vip_identity, config_name, + trigger_callback=trigger_callback, + send_update=send_update).get(timeout=10.0) def subscribe(self, callback, actions=VALID_ACTIONS, pattern="*"): """Subscribe to changes to a configuration. :param callback: Function to call in response to changes to a configuration. - :param actions: Change actions to respond to. Valid values are "NEW", "UPDATE", and "DELETE". May be a single action or a list of actions. + :param actions: Change actions to respond to. Valid values are "NEW", "UPDATE", and "DELETE". + Maybe a single action or a list of actions. :param pattern: Configuration name pattern to match to. Uses Unix style filename pattern matching. :type callback: str @@ -446,9 +447,9 @@ def subscribe(self, callback, actions=VALID_ACTIONS, pattern="*"): actions = set(action.upper() for action in actions) - invalid_actions = actions - VALID_ACTIONS + invalid_actions = actions - set(VALID_ACTIONS) if invalid_actions: - raise ValueError("Invalid actions: " + list(invalid_actions)) + raise ValueError(f"Invalid actions: {invalid_actions}") pattern = pattern.lower() diff --git a/src/volttron/services/auth/auth_service.py b/src/volttron/services/auth/auth_service.py index b119f2eae..27ae19978 100644 --- a/src/volttron/services/auth/auth_service.py +++ b/src/volttron/services/auth/auth_service.py @@ -135,6 +135,17 @@ def topics(): return defaultdict(set) self._user_to_permissions = topics() + entry = AuthEntry( + credentials=self.core.publickey, + user_id=self.core.identity, + capabilities=[{ + "edit_config_store": { + "identity": self.core.identity + } + }], + comments="Automatically added by init of auth service" + ) + AuthFile().add(entry, overwrite=True) @Core.receiver("onsetup") def setup_zap(self, sender, **kwargs): @@ -1154,7 +1165,7 @@ def __init__(self, auth_file=None): @property def version(self): - return {"major": 1, "minor": 2} + return {"major": 1, "minor": 3} def _check_for_upgrade(self): allow_list, deny_list, groups, roles, version = self._read() @@ -1290,6 +1301,10 @@ def upgrade_1_1_to_1_2(allow_list): version["minor"] = 1 if version["major"] == 1 and version["minor"] == 1: allow_list = upgrade_1_1_to_1_2(allow_list) + if version["major"] == 1 and version["minor"] == 2: + # on start a new entry for config.store should have got created automatically + # so just update version + version["minor"] = 3 allow_entries, deny_entries = self._get_entries(allow_list, deny_list) self._write(allow_entries, deny_entries, groups, roles) diff --git a/src/volttron/services/config_store/config_store_service.py b/src/volttron/services/config_store/config_store_service.py index 467410e4f..e66ec2070 100644 --- a/src/volttron/services/config_store/config_store_service.py +++ b/src/volttron/services/config_store/config_store_service.py @@ -29,7 +29,7 @@ import errno from csv import DictReader from io import StringIO - +from deprecated import deprecated import gevent from gevent.lock import Semaphore @@ -47,6 +47,7 @@ from volttron.utils.jsonrpc import RemoteError, MethodNotFound from volttron.utils.storeutils import check_for_recursion, strip_config_name, store_ext from volttron.client.vip.agent import Agent, Core, RPC, Unreachable, VIPError +from volttron.services.auth.auth_service import AuthFile, AuthEntry _log = logging.getLogger(__name__) @@ -68,8 +69,7 @@ def process_store(identity, store): raise ValueError("Recursive configuration references") results[config_name] = processed_config except ValueError as e: - _log.error("Error processing Agent {} config {}: {}".format( - identity, config_name, str(e))) + _log.error("Error processing Agent {} config {}: {}".format(identity, config_name, str(e))) sync_store = True del store[config_name] @@ -107,6 +107,7 @@ def process_raw_config(config_string, config_type="raw"): class ConfigStoreService(ServiceInterface): def __init__(self, **kwargs): + kwargs["enable_store"] = False super(ConfigStoreService, self).__init__(**kwargs) # This agent is started before the router so we need @@ -115,6 +116,14 @@ def __init__(self, **kwargs): self.store = {} self.store_path = os.path.join(os.environ["VOLTTRON_HOME"], "configuration_store") + entry = AuthEntry( + credentials=self.core.publickey, + user_id=self.core.identity, + capabilities="sync_agent_config", + comments="Automatically added by config store service" + ) + AuthFile().add(entry, overwrite=True) + @Core.receiver("onsetup") def _setup(self, sender, **kwargs): @@ -149,25 +158,50 @@ def _setup(self, sender, **kwargs): @RPC.export @RPC.allow("edit_config_store") - def manage_store(self, identity, config_name, raw_contents, config_type="raw"): + @deprecated(reason="Use set_config") + def manage_store(self, identity, config_name, raw_contents, config_type="raw", trigger_callback=True, + send_update=True): + """ + This method is deprecated and will be removed in VOLTTRON 10. Please use set_config instead + """ contents = process_raw_config(raw_contents, config_type) - self._add_config_to_store( - identity, - config_name, - raw_contents, - contents, - config_type, - trigger_callback=True, - ) + self._add_config_to_store(identity, config_name, raw_contents, contents, config_type, + trigger_callback=trigger_callback, send_update=send_update) @RPC.export - @RPC.allow("edit_config_store") - def manage_delete_config(self, identity, config_name): - self.delete(identity, config_name, trigger_callback=True) + @RPC.allow('edit_config_store') + def set_config(self, identity, config_name, raw_contents, config_type="raw", trigger_callback=True, + send_update=True): + contents = process_raw_config(raw_contents, config_type) + self._add_config_to_store(identity, config_name, raw_contents, contents, config_type, + trigger_callback=trigger_callback, send_update=send_update) @RPC.export - @RPC.allow("edit_config_store") + @RPC.allow('edit_config_store') + @deprecated(reason="Use delete_config") + def manage_delete_config(self, identity, config_name, trigger_callback=True, send_update=True): + """ + This method is deprecated and will be removed in VOLTTRON 10. Please use delete_config instead + """ + self.delete(identity, config_name, trigger_callback=trigger_callback, send_update=send_update) + + @RPC.export + @RPC.allow('edit_config_store') + def delete_config(self, identity, config_name, trigger_callback=True, send_update=True): + self.delete(identity, config_name, trigger_callback=trigger_callback, send_update=send_update) + + @RPC.export + @RPC.allow('edit_config_store') + @deprecated(reason="Use delete_store") def manage_delete_store(self, identity): + """ + This method is deprecated and will be removed in VOLTTRON 10. Please use delete_store instead + """ + self.delete_store(identity) + + @RPC.export + @RPC.allow('edit_config_store') + def delete_store(self, identity): agent_store = self.store.get(identity) if agent_store is None: return @@ -184,21 +218,20 @@ def manage_delete_store(self, identity): # Sync will delete the file if the store is empty. agent_disk_store.async_sync() - with agent_store_lock: - try: - self.vip.rpc.call(identity, - "config.update", - "DELETE_ALL", - None, - trigger_callback=True).get(timeout=UPDATE_TIMEOUT) - except Unreachable: - _log.debug("Agent {} not currently running. Configuration update not sent.".format( - identity)) - except RemoteError as e: - _log.error("Agent {} failure when all configurations: {}".format(identity, e)) - except MethodNotFound as e: - _log.error("Agent {} failure when deleting configuration store: {}".format( - identity, e)) + if identity in self.vip.peerlist.peers_list: + with agent_store_lock: + try: + self.vip.rpc.call(identity, + "config.update", + "DELETE_ALL", + None, + trigger_callback=True).get(timeout=UPDATE_TIMEOUT) + except Unreachable: + _log.debug("Agent {} not currently running. Configuration update not sent.".format(identity)) + except RemoteError as e: + _log.error("Agent {} failure when all configurations: {}".format(identity, e)) + except MethodNotFound as e: + _log.error("Agent {} failure when deleting configuration store: {}".format(identity, e)) # If the store is still empty (nothing jumped in and added to it while # we were informing the agent) then remove it from the global store. @@ -206,23 +239,46 @@ def manage_delete_store(self, identity): self.store.pop(identity, None) @RPC.export + @deprecated(reason="Use list_configs") def manage_list_configs(self, identity): + """ + This method is deprecated and will be removed in VOLTTRON 10. Use list_configs instead + """ + return self.list_configs(identity) + + @RPC.export + def list_configs(self, identity): result = list(self.store.get(identity, {}).get("store", {}).keys()) result.sort() return result @RPC.export + @deprecated(reason="Use list_stores") def manage_list_stores(self): + """ + This method is deprecated and will be removed in VOLTTRON 10. Use list_stores instead + """ + return self.list_stores() + + @RPC.export + def list_stores(self): result = list(self.store.keys()) result.sort() return result @RPC.export + @deprecated(reason="Use get_config") def manage_get(self, identity, config_name, raw=True): + """ + This method is deprecated and will be removed in VOLTTRON 10. Use get_config instead + """ + return self.get_config(identity, config_name, raw) + + @RPC.export + def get_config(self, identity, config_name, raw=True): agent_store = self.store.get(identity) if agent_store is None: - raise KeyError('No configuration file "{}" for VIP IDENTIY {}'.format( - config_name, identity)) + raise KeyError('No configuration file "{}" for VIP IDENTITY {}'.format(config_name, identity)) agent_configs = agent_store["configs"] agent_disk_store = agent_store["store"] @@ -232,8 +288,7 @@ def manage_get(self, identity, config_name, raw=True): config_name_lower = config_name.lower() if config_name_lower not in agent_name_map: - raise KeyError('No configuration file "{}" for VIP IDENTIY {}'.format( - config_name, identity)) + raise KeyError('No configuration file "{}" for VIP IDENTITY {}'.format(config_name, identity)) real_config_name = agent_name_map[config_name_lower] @@ -243,11 +298,18 @@ def manage_get(self, identity, config_name, raw=True): return agent_configs[real_config_name] @RPC.export + @deprecated(reason="Use get_metadata") def manage_get_metadata(self, identity, config_name): + """ + This method is deprecated and will be removed in VOLTTRON 10. Please use get_metadata instead + """ + return self.get_metadata(identity, config_name) + + @RPC.export + def get_metadata(self, identity, config_name): agent_store = self.store.get(identity) if agent_store is None: - raise KeyError('No configuration file "{}" for VIP IDENTIY {}'.format( - config_name, identity)) + raise KeyError('No configuration file "{}" for VIP IDENTITY {}'.format(config_name, identity)) agent_disk_store = agent_store["store"] agent_name_map = agent_store["name_map"] @@ -256,8 +318,7 @@ def manage_get_metadata(self, identity, config_name): config_name_lower = config_name.lower() if config_name_lower not in agent_name_map: - raise KeyError('No configuration file "{}" for VIP IDENTIY {}'.format( - config_name, identity)) + raise KeyError('No configuration file "{}" for VIP IDENTITY {}'.format(config_name, identity)) real_config_name = agent_name_map[config_name_lower] @@ -269,24 +330,13 @@ def manage_get_metadata(self, identity, config_name): return real_config + @RPC.allow('edit_config_store') @RPC.export - def set_config(self, config_name, contents, trigger_callback=False, send_update=True): - identity = self.vip.rpc.context.vip_message.peer - self.store_config( - identity, - config_name, - contents, - trigger_callback=trigger_callback, - send_update=send_update, - ) - - @RPC.export - def get_configs(self): + def initialize_configs(self, identity): """ Called by an Agent at startup to trigger initial configuration state push. """ - identity = self.vip.rpc.context.vip_message.peer # We need to create store and lock if it doesn't exist in case someone # tries to add a configuration while we are sending the initial state. @@ -307,46 +357,31 @@ def get_configs(self): agent_configs = agent_store["configs"] agent_disk_store = agent_store["store"] agent_store_lock = agent_store["lock"] - - with agent_store_lock: - try: - self.vip.rpc.call(identity, "config.initial_update", - agent_configs).get(timeout=UPDATE_TIMEOUT) - except Unreachable: - _log.debug("Agent {} not currently running. Configuration update not sent.".format( - identity)) - except RemoteError as e: - _log.error("Agent {} failure when performing initial update: {}".format( - identity, e)) - except MethodNotFound as e: - _log.error("Agent {} failure when performing initial update: {}".format( - identity, e)) - except VIPError as e: - _log.error("VIP Error sending initial agent configuration: {}".format(e)) + if identity in self.vip.peerlist.peers_list: + with agent_store_lock: + try: + self.vip.rpc.call(identity, "config.initial_update", + agent_configs).get(timeout=UPDATE_TIMEOUT) + except Unreachable: + _log.debug("Agent {} not currently running. Configuration update not sent.".format(identity)) + except RemoteError as e: + _log.error("Agent {} failure when performing initial update: {}".format(identity, e)) + except MethodNotFound as e: + _log.error("Agent {} failure when performing initial update: {}".format(identity, e)) + except VIPError as e: + _log.error("VIP Error sending initial agent configuration: {}".format(e)) # If the store is empty (and nothing jumped in and added to it while we # were informing the agent) then remove it from the global store. if not agent_disk_store: self.store.pop(identity, None) - @RPC.export - def delete_config(self, config_name, trigger_callback=False, send_update=True): - """Called by an Agent to delete a configuration.""" - identity = self.vip.rpc.context.vip_message.peer - self.delete( - identity, - config_name, - trigger_callback=trigger_callback, - send_update=send_update, - ) - # Helper method to allow the local services to delete configs before message # bus in online. def delete(self, identity, config_name, trigger_callback=False, send_update=True): agent_store = self.store.get(identity) if agent_store is None: - raise KeyError('No configuration file "{}" for VIP IDENTIY {}'.format( - config_name, identity)) + raise KeyError('No configuration file "{}" for VIP IDENTITY {}'.format(config_name, identity)) agent_configs = agent_store["configs"] agent_disk_store = agent_store["store"] @@ -357,8 +392,7 @@ def delete(self, identity, config_name, trigger_callback=False, send_update=True config_name_lower = config_name.lower() if config_name_lower not in agent_name_map: - raise KeyError('No configuration file "{}" for VIP IDENTIY {}'.format( - config_name, identity)) + raise KeyError('No configuration file "{}" for VIP IDENTITY {}'.format(config_name, identity)) real_config_name = agent_name_map[config_name_lower] @@ -369,7 +403,7 @@ def delete(self, identity, config_name, trigger_callback=False, send_update=True # Sync will delete the file if the store is empty. agent_disk_store.async_sync() - if send_update: + if send_update and identity in self.vip.peerlist.peers_list: with agent_store_lock: try: self.vip.rpc.call( @@ -380,15 +414,12 @@ def delete(self, identity, config_name, trigger_callback=False, send_update=True trigger_callback=trigger_callback, ).get(timeout=UPDATE_TIMEOUT) except Unreachable: - _log.debug( - "Agent {} not currently running. Configuration update not sent.".format( - identity)) + _log.debug("Agent {} not currently running. Configuration update not sent.".format(identity)) except RemoteError as e: - _log.error("Agent {} failure when deleting configuration {}: {}".format( - identity, config_name, e)) + _log.error("Agent {} failure when deleting configuration {}: {}".format(identity, config_name, e)) except MethodNotFound as e: - _log.error("Agent {} failure when adding/updating configuration {}: {}".format( - identity, config_name, e)) + _log.error( + "Agent {} failure when adding/updating configuration {}: {}".format(identity, config_name, e)) # If the store is empty (and nothing jumped in and added to it while we # were informing the agent) then remove it from the global store. @@ -397,12 +428,7 @@ def delete(self, identity, config_name, trigger_callback=False, send_update=True # Helper method to allow the local services to store configs before message # bus is online. - def store_config(self, - identity, - config_name, - contents, - trigger_callback=False, - send_update=True): + def store_config(self, identity, config_name, contents, trigger_callback=False, send_update=True): config_type = None raw_data = None if isinstance(contents, (dict, list)): @@ -412,34 +438,15 @@ def store_config(self, config_type = "raw" raw_data = contents else: - raise ValueError("Unsupported configuration content type: {}".format( - str(type(contents)))) - - self._add_config_to_store( - identity, - config_name, - raw_data, - contents, - config_type, - trigger_callback=trigger_callback, - send_update=send_update, - ) + raise ValueError("Unsupported configuration content type: {}".format(str(type(contents)))) + + self._add_config_to_store(identity, config_name, raw_data, contents, config_type, + trigger_callback=trigger_callback, send_update=send_update) - def _add_config_to_store( - self, - identity, - config_name, - raw, - parsed, - config_type, - trigger_callback=False, - send_update=True, - ): + def _add_config_to_store(self, identity, config_name, raw, parsed, config_type, trigger_callback=False, + send_update=True): """Adds a processed configuration to the store.""" agent_store = self.store.get(identity) - # Make sure that the agent is alive before sending update. - if send_update: - send_update = identity in self.vip.peerlist().get() action = "UPDATE" @@ -486,7 +493,7 @@ def _add_config_to_store( _log.debug("Agent {} config {} stored.".format(identity, config_name)) - if send_update: + if send_update and identity in self.vip.peerlist.peers_list: with agent_store_lock: try: self.vip.rpc.call( @@ -498,18 +505,14 @@ def _add_config_to_store( trigger_callback=trigger_callback, ).get(timeout=UPDATE_TIMEOUT) except Unreachable: - _log.debug( - "Agent {} not currently running. Configuration update not sent.".format( - identity)) + _log.debug("Agent {} not currently running. Configuration update not sent.".format(identity)) except RemoteError as e: - _log.error("Agent {} failure when adding/updating configuration {}: {}".format( - identity, config_name, e)) + _log.error( + "Agent {} failure when adding/updating configuration {}: {}".format(identity, config_name, e)) except MethodNotFound as e: - _log.error("Agent {} failure when adding/updating configuration {}: {}".format( - identity, config_name, e)) + _log.error( + "Agent {} failure when adding/updating configuration {}: {}".format(identity, config_name, e)) except gevent.timeout.Timeout: - _log.error("Config update to agent {} timed out after {} seconds".format( - identity, UPDATE_TIMEOUT)) + _log.error("Config update to agent {} timed out after {} seconds".format(identity, UPDATE_TIMEOUT)) except Exception as e: - _log.error("Unknown error sending update to agent identity {}.: {}".format( - identity, e)) + _log.error("Unknown error sending update to agent identity {}.: {}".format(identity, e)) diff --git a/src/volttron/services/health/health_service.py b/src/volttron/services/health/health_service.py index a09d57034..f136a1655 100644 --- a/src/volttron/services/health/health_service.py +++ b/src/volttron/services/health/health_service.py @@ -34,6 +34,7 @@ # TODO: rmq addition # from volttron.utils.rmq_config_params import RMQConfig # from volttron.utils.rmq_setup import start_rabbit, RabbitMQStartError +from volttron.services.auth.auth_service import AuthFile, AuthEntry _log = logging.getLogger(__name__) @@ -46,6 +47,17 @@ def __init__(self, **kwargs): # Store the health stats for given peers in a dictionary with # keys being the identity of the connected agent. self._health_dict = defaultdict(dict) + entry = AuthEntry( + credentials=self.core.publickey, + user_id=self.core.identity, + capabilities=[{ + "edit_config_store": { + "identity": self.core.identity + } + }], + comments="Automatically added on health service init" + ) + AuthFile().add(entry, overwrite=True) def peer_added(self, peer): """